如何像 OpenCode 和 Claude Code 一样调用 Skill
一、OpenCode Skill 调用机制解析
1.1 OpenCode 架构特点
OpenCode 采用 文件系统扫描 + YAML 定义 + 脚本执行 的方式管理 Skill:
.opencode/
├── skills/
│ ├── halo-article-publisher/
│ │ ├── SKILL.md # Skill 定义(Markdown格式)
│ │ ├── publish.py # 执行脚本
│ │ └── requirements.txt # Python依赖
│ └── another-skill/
│ ├── SKILL.md
│ └── main.py
Skill 定义示例(SKILL.md):
---
name: halo-article-publisher
description: 发布 Markdown 文章到 Halo 博客
version: 1.0.0
author: OpenCode Team
# 入参规范
parameters:
type: object
properties:
title:
type: string
description: 文章标题
content:
type: string
description: Markdown 内容(支持 SVG)
cover:
type: string
description: 封面图片 URL 或本地路径
tags:
type: array
items:
type: string
description: 标签列表
required: [title, content]
# 执行方式
executor:
type: python
script: publish.py
# 能力声明
capabilities:
- article_publishing
- image_upload
- remote_fetch
# 权限要求
permissions:
- halo:post:create
- halo:file:upload
---
1.2 OpenCode Skill 调用流程
步骤1:Skill 注册与发现
# OpenCode 启动时扫描 skills 目录
class SkillRegistry:
def __init__(self, skills_dir=".opencode/skills"):
self.skills = {}
for skill_dir in Path(skills_dir).iterdir():
if skill_dir.is_dir():
skill_file = skill_dir / "SKILL.md"
if skill_file.exists():
skill = self.parse_skill(skill_file)
self.skills[skill.name] = skill
def parse_skill(self, skill_file):
# 解析 frontmatter(YAML部分)
with open(skill_file) as f:
content = f.read()
frontmatter = yaml.safe_load(content.split('---')[1])
return Skill(**frontmatter)
步骤2:Skill 匹配
class SkillMatcher:
def match(self, user_query: str, available_skills: List[Skill]) -> Optional[Skill]:
# 方法1:关键词匹配
keywords = extract_keywords(user_query)
for skill in available_skills:
if any(kw in skill.capabilities for kw in keywords):
return skill
# 方法2:Embedding 相似度
query_embedding = embed(user_query)
similarities = [
(skill, cosine_similarity(query_embedding, skill.embedding))
for skill in available_skills
]
return max(similarities, key=lambda x: x[1])[0]
# 方法3:LLM 判断(类似 Function Calling)
prompt = f"""
用户查询:{user_query}
可用技能:{[s.name + ': ' + s.description for s in available_skills]}
选择最合适的技能名称(如无匹配返回"none"):
"""
skill_name = llm.generate(prompt).strip()
return self.skills.get(skill_name)
步骤3:参数提取
def extract_parameters(query: str, skill: Skill) -> dict:
prompt = f"""
从用户查询中提取 {skill.name} 所需的参数。
Skill描述:{skill.description}
参数Schema:
{json.dumps(skill.parameters, indent=2)}
用户查询:{query}
输出JSON格式参数:
"""
response = llm.generate(prompt)
return json.loads(response)
步骤4:执行 Skill
class SkillExecutor:
def execute(self, skill: Skill, params: dict) -> SkillResult:
if skill.executor.type == "python":
return self.execute_python(skill, params)
elif skill.executor.type == "http":
return self.execute_http(skill, params)
elif skill.executor.type == "shell":
return self.execute_shell(skill, params)
def execute_python(self, skill: Skill, params: dict):
script_path = Path(skill.executor.script)
# 将参数写入临时文件
with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f:
json.dump(params, f)
param_file = f.name
# 执行脚本
result = subprocess.run(
["python", str(script_path), param_file],
capture_output=True,
text=True,
timeout=skill.executor.get('timeout', 30)
)
os.unlink(param_file)
return SkillResult(
stdout=result.stdout,
stderr=result.stderr,
return_code=result.returncode
)
步骤5:结果格式化
def format_result(skill_result: SkillResult, skill: Skill) -> str:
# 使用 skill 定义的 output_template 格式化输出
if skill.output_template:
try:
data = json.loads(skill_result.stdout)
return skill.output_template.format(**data)
except:
return skill_result.stdout
return skill_result.stdout
二、Claude Code Skill 调用机制
2.1 Claude Code 的 Tool Use API
Claude Code 使用 Anthropic 官方 Tool Use API(Function Calling):
import anthropic
client = anthropic.Anthropic(api_key="...")
# 1. 注册工具
tools = [
{
"name": "read_file",
"description": "Read file content",
"input_schema": {
"type": "object",
"properties": {
"path": {"type": "string", "description": "File path"}
},
"required": ["path"]
}
},
{
"name": "edit_file",
"description": "Edit file content",
"input_schema": {...}
}
]
# 2. 发送请求
message = client.messages.create(
model="claude-3-opus-20240229",
max_tokens=1024,
messages=[
{"role": "user", "content": "Read the file README.md"}
],
tools=tools
)
# 3. 处理工具调用
if message.stop_reason == "tool_use":
for block in message.content:
if block.type == "tool_use":
tool_name = block.name
tool_input = block.input
# 执行工具
if tool_name == "read_file":
result = read_file(**tool_input)
elif tool_name == "edit_file":
result = edit_file(**tool_input)
# 4. 回传结果
message = client.messages.create(
model="claude-3-opus-20240229",
max_tokens=1024,
messages=[
{"role": "user", "content": "Read the file README.md"},
{"role": "assistant", "content": message.content},
{"role": "user", "content": [
{
"type": "tool_result",
"tool_use_id": block.id,
"content": result
}
]}
]
)
print(message.content[0].text)
2.2 Claude Code 内置工具
Claude Code 预定义了开发工具,无需注册即可使用:
| 工具名 | 描述 | 参数示例 |
|---|---|---|
Bash |
执行 shell 命令 | {"command": "ls -la"} |
Glob |
文件模式匹配 | {"pattern": "**/*.java"} |
Grep |
内容搜索 | {"pattern": "class.*Service", "paths": ["src/"]} |
Read |
读取文件 | {"file": {"path": "README.md"}} |
Edit |
编辑文件 | {"file": {...}, "old_string": "...", "new_string": "..."} |
Write |
写入文件 | {"file": {...}, "content": "..."} |
Task |
创建子任务 | {"description": "...", "prompt": "..."} |
这些工具是 Claude Code 的核心能力,用户无需配置。
三、通用 Skill 调用实现方案
3.1 Skill 标准定义(推荐格式)
# skill.yaml - 统一Skill定义格式
name: search-web
version: 1.0.0
description: 搜索互联网获取实时信息
author: Your Team
# 能力标签(用于匹配)
capabilities:
- search
- internet
- realtime
# 参数定义(JSON Schema 格式)
parameters:
type: object
properties:
query:
type: string
description: 搜索关键词
num_results:
type: integer
description: 返回结果数量
default: 5
language:
type: string
description: 搜索语言
enum: [zh, en, ja]
default: zh
required: [query]
# 执行器配置
executor:
type: http # 或 python、java、shell
endpoint: https://api.search.com/v1/search
method: POST
headers:
Authorization: "Bearer ${SEARCH_API_KEY}"
Content-Type: "application/json"
timeout: 10
# HTTP执行器的body模板
body_template: |
{
"q": "{query}",
"limit": {num_results},
"lang": "{language}"
}
# 或者Python脚本方式
# executor:
# type: python
# script: skills/search-web/search.py
# requirements:
# - requests>=2.28.0
# - beautifulsoup4
# 输出格式化模板(可选)
output_template: |
搜索结果(前{num_results}条):
{results}
共找到 {total} 条结果
# 示例(用于LLM理解)
examples:
- user: "搜索 Java 教程"
params:
query: "Java tutorial"
num_results: 10
language: "zh"
- user: "Find Python examples"
params:
query: "Python examples"
num_results: 5
language: "en"
# 权限要求
permissions:
- internet:outbound
- api:search
# 成本估算(每次调用token数或费用)
cost:
type: token
estimate: 100
3.2 Skill 注册中心实现
# skill_registry.py
from dataclasses import dataclass, asdict
from typing import Dict, List, Optional, Any
import yaml
import os
import json
from pathlib import Path
import hashlib
@dataclass
class Skill:
name: str
version: str
description: str
parameters: dict
executor: dict
capabilities: List[str]
output_template: Optional[str] = None
examples: Optional[List[dict]] = None
_embedding: Optional[List[float]] = None
def execute(self, params: dict) -> 'SkillResult':
"""执行Skill"""
executor_type = self.executor.get('type')
if executor_type == 'http':
return self._execute_http(params)
elif executor_type == 'python':
return self._execute_python(params)
elif executor_type == 'java':
return self._execute_java(params)
else:
raise ValueError(f"Unsupported executor type: {executor_type}")
def _execute_http(self, params: dict) -> 'SkillResult':
"""HTTP调用"""
import requests
import time
endpoint = self.executor['endpoint']
method = self.executor.get('method', 'POST').upper()
headers = self._render_headers()
body_template = self.executor.get('body_template')
# 渲染body
if body_template:
body = self._render_template(body_template, params)
else:
body = json.dumps(params)
start_time = time.time()
try:
resp = requests.request(
method=method,
url=endpoint,
headers=headers,
data=body,
timeout=self.executor.get('timeout', 30)
)
duration = time.time() - start_time
return SkillResult(
stdout=resp.text,
status_code=resp.status_code,
headers=dict(resp.headers),
duration=duration
)
except Exception as e:
return SkillResult(
stdout="",
stderr=str(e),
return_code=1,
duration=time.time() - start_time
)
def _execute_python(self, params: dict) -> 'SkillResult':
"""执行Python脚本"""
import subprocess
import tempfile
script = self.executor['script']
timeout = self.executor.get('timeout', 30)
# 将参数写入临时文件
with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f:
json.dump(params, f)
param_file = f.name
try:
result = subprocess.run(
['python', script, param_file],
capture_output=True,
text=True,
timeout=timeout
)
return SkillResult(
stdout=result.stdout,
stderr=result.stderr,
return_code=result.returncode
)
except subprocess.TimeoutExpired:
return SkillResult(
stdout="",
stderr=f"Script timeout after {timeout}s",
return_code=124
)
finally:
os.unlink(param_file)
def _render_headers(self) -> dict:
"""渲染headers中的环境变量"""
headers = self.executor.get('headers', {})
return {
k: os.path.expandvars(v) # 替换 ${VAR}
for k, v in headers.items()
}
def _render_template(self, template: str, params: dict) -> str:
"""简单模板渲染 {field}"""
import string
return string.Template(template).safe_substitute(params)
@property
def embedding(self) -> List[float]:
"""获取或计算embedding"""
if self._embedding is None:
text = f"{self.name}: {self.description} {' '.join(self.capabilities)}"
self._embedding = compute_embedding(text) # 实现compute_embedding
return self._embedding
@dataclass
class SkillResult:
stdout: str
stderr: str = ""
return_code: int = 0
status_code: int = 200
headers: Optional[dict] = None
duration: float = 0.0
def is_success(self) -> bool:
return self.return_code == 0 and (200 <= self.status_code < 300)
def to_dict(self) -> dict:
return asdict(self)
class SkillRegistry:
"""Skill注册中心"""
def __init__(self, skills_dir: str = "skills"):
self.skills_dir = Path(skills_dir)
self._skills: Dict[str, Skill] = {}
self.load_all_skills()
def load_all_skills(self):
"""扫描目录加载所有Skill"""
if not self.skills_dir.exists():
return
for skill_dir in self.skills_dir.iterdir():
if skill_dir.is_dir():
skill_file = skill_dir / "skill.yaml"
if skill_file.exists():
try:
skill = self.load_skill(skill_file)
self._skills[skill.name] = skill
print(f"Loaded skill: {skill.name}")
except Exception as e:
print(f"Failed to load skill {skill_dir.name}: {e}")
def load_skill(self, path: Path) -> Skill:
"""从YAML加载Skill"""
with open(path) as f:
data = yaml.safe_load(f)
# 验证必需字段
required = ['name', 'description', 'executor']
for field in required:
if field not in data:
raise ValueError(f"Skill {path} missing required field: {field}")
return Skill(**data)
def get(self, name: str) -> Optional[Skill]:
return self._skills.get(name)
def list(self) -> List[Skill]:
return list(self._skills.values())
def search_by_capability(self, capability: str) -> List[Skill]:
"""根据能力标签搜索"""
return [
skill for skill in self._skills.values()
if capability in skill.capabilities
]
def semantic_search(self, query: str, top_k: int = 3) -> List[Skill]:
"""语义搜索最相关的Skills"""
query_embedding = compute_embedding(query)
scored = []
for skill in self._skills.values():
score = cosine_similarity(query_embedding, skill.embedding)
scored.append((score, skill))
scored.sort(key=lambda x: x[0], reverse=True)
return [skill for _, skill in scored[:top_k]]
def match(self, query: str, method: str = 'semantic') -> Optional[Skill]:
"""匹配最合适的Skill"""
if method == 'keyword':
keywords = extract_keywords(query)
for skill in self._skills.values():
if any(kw in ' '.join(skill.capabilities) for kw in keywords):
return skill
elif method == 'semantic':
results = self.semantic_search(query, top_k=1)
return results[0] if results else None
elif method == 'llm':
return self._llm_match(query)
return None
def _llm_match(self, query: str) -> Optional[Skill]:
"""使用LLM匹配Skill"""
skills_list = "\n".join([
f"- {s.name}: {s.description} (capabilities: {', '.join(s.capabilities)})"
for s in self._skills.values()
])
prompt = f"""
用户查询:{query}
可用技能:
{skills_list}
选择最合适的技能名称,只返回技能名,如无匹配返回"none":
"""
skill_name = llm.generate(prompt).strip()
return self._skills.get(skill_name) if skill_name != 'none' else None
# 辅助函数
def compute_embedding(text: str) -> List[float]:
"""计算文本embedding(需实现)"""
# 使用本地模型或OpenAI API
# 示例:return openai.Embedding.create(input=text, model="text-embedding-ada-002")['data'][0]['embedding']
return [0.0] * 1536 # 占位符
def cosine_similarity(a: List[float], b: List[float]) -> float:
"""计算余弦相似度"""
import numpy as np
a = np.array(a)
b = np.array(b)
return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))
def extract_keywords(text: str) -> List[str]:
"""提取关键词(简化实现)"""
# 实际应使用NLP库如jieba、spacy
words = text.lower().split()
return [w for w in words if len(w) > 1]
3.3 Agent 集成(调用Skill的完整流程)
# agent.py
class AgentSkillSystem:
"""类似OpenCode/Claude Code的Agent系统"""
def __init__(
self,
llm, # LLM实例(OpenAI、Claude等)
skill_registry: SkillRegistry,
max_iterations: int = 10
):
self.llm = llm
self.skill_registry = skill_registry
self.max_iterations = max_iterations
self.conversation_history = []
def chat(self, user_message: str) -> str:
"""主对话流程"""
# 记录用户消息
self.conversation_history.append({"role": "user", "content": user_message})
# 循环:匹配Skill → 执行 → 生成回答
for iteration in range(self.max_iterations):
# 1. 判断是否需要调用Skill
skill = self._decide_skill(user_message)
if not skill:
# 直接回答
response = self.llm.generate(self._build_prompt())
self.conversation_history.append({"role": "assistant", "content": response})
return response
# 2. 提取参数
params = self._extract_parameters(user_message, skill)
# 3. 执行Skill
print(f"Executing skill: {skill.name} with params: {params}")
result = skill.execute(params)
if not result.is_success():
error_msg = f"Skill执行失败: {result.stderr}"
self.conversation_history.append({"role": "system", "content": error_msg})
return error_msg
# 4. 格式化结果并返回
formatted = self._format_result(result, skill)
self.conversation_history.append({"role": "assistant", "content": formatted})
return formatted
return "达到最大迭代次数,停止执行。"
def _decide_skill(self, query: str) -> Optional[Skill]:
"""决定是否调用Skill"""
# 方法1:规则判断
if any(kw in query for kw in ["搜索", "查询", "查找", "search", "find"]):
return self.skill_registry.get("search-web")
# 方法2:使用LLM + Function Calling
tools = [
{
"name": skill.name,
"description": skill.description,
"parameters": skill.parameters
}
for skill in self.skill_registry.list()
]
# 构造包含工具定义的prompt
prompt = self._build_prompt_with_tools(tools)
response = self.llm.chat(prompt) # 假设LLM支持function calling
# 解析LLM返回的工具调用
if response.has_tool_calls():
tool_name = response.tool_calls[0].name
return self.skill_registry.get(tool_name)
return None
def _extract_parameters(self, query: str, skill: Skill) -> dict:
"""从query提取参数"""
# 方法1:使用LLM提取
prompt = f"""
从用户查询中提取 {skill.name} 的参数。
Skill描述:{skill.description}
参数Schema:
{json.dumps(skill.parameters, indent=2)}
用户查询:{query}
提取参数(只输出JSON):
"""
response = self.llm.generate(prompt)
try:
return json.loads(response)
except json.JSONDecodeError:
# 尝试从响应中提取JSON部分
import re
match = re.search(r'\{.*\}', response, re.DOTALL)
if match:
return json.loads(match.group())
raise ValueError(f"Failed to parse parameters: {response}")
def _format_result(self, result: SkillResult, skill: Skill) -> str:
"""格式化Skill执行结果"""
if skill.output_template:
try:
data = json.loads(result.stdout) if result.stdout else {}
# 同时支持变量插值
return skill.output_template.format(**data)
except Exception as e:
print(f"Template formatting failed: {e}")
return result.stdout
return result.stdout
def _build_prompt(self) -> str:
"""构建对话prompt"""
prompt = "你是一个智能助手,可以使用以下技能:\n\n"
for skill in self.skill_registry.list():
prompt += f"- {skill.name}: {skill.description}\n"
prompt += "\n对话历史:\n"
for msg in self.conversation_history[-10:]: # 最近10条
role = "用户" if msg['role'] == 'user' else "助手"
prompt += f"{role}: {msg['content']}\n"
prompt += "\n助手:"
return prompt
def _build_prompt_with_tools(self, tools: List[dict]) -> str:
"""构建包含工具定义的prompt(类似OpenAI Function Calling)"""
tools_json = json.dumps(tools, ensure_ascii=False, indent=2)
prompt = f"""
你是一个智能助手,可以使用以下工具:
{tools_json}
根据用户查询,决定是否调用工具。如果需要调用,输出:
{{
"tool": "工具名",
"parameters": {{...}}
}}
如果不需要调用工具,直接回复用户。
用户查询:{{query}}
"""
return prompt
# 使用示例
def main():
# 1. 初始化
registry = SkillRegistry("./skills")
llm = OpenAI(model="gpt-4") # 或使用其他LLM
agent = AgentSkillSystem(llm, registry)
# 2. 对话循环
while True:
user_input = input("You: ")
if user_input.lower() in ['exit', 'quit']:
break
response = agent.chat(user_input)
print(f"Agent: {response}")
if __name__ == "__main__":
main()
四、Spring AI 集成实现
4.1 使用 @Tool 注解(Spring AI 0.8.0+)
// 1. 定义Skill为Spring Bean
@Service
public class WeatherService {
@Tool(description = "获取指定城市的天气信息")
public WeatherResponse getWeather(
@ToolParam(description = "城市名称,如:北京、上海") String city,
@ToolParam(description = "温度单位:celsius或fahrenheit", defaultValue = "celsius") String unit
) {
// 调用实际天气API
return weatherClient.getWeather(city, unit);
}
@Tool(description = "搜索网络信息")
public SearchResponse searchWeb(
@ToolParam(description = "搜索关键词") String query,
@ToolParam(description = "返回结果数量", defaultValue = "5") int numResults
) {
return searchService.search(query, numResults);
}
}
// 2. 自动注册到ChatClient
@Configuration
public class AiConfig {
@Bean
public ChatClient chatClient(
OpenAiChatModel chatModel,
WeatherService weatherService,
SearchService searchService) {
// Spring AI 0.8.0+ 自动扫描 @Tool 注解
// 或手动注册
List<FunctionCallback> tools = new ArrayList<>();
tools.addAll(ToolFunctionUtils.getToolCallbacks(weatherService));
tools.addAll(ToolFunctionUtils.getToolCallbacks(searchService));
return ChatClient.builder(chatModel)
.defaultFunctionCallbacks(tools)
.build();
}
}
// 3. 使用
@RestController
public class ChatController {
@Autowired
private ChatClient chatClient;
@PostMapping("/chat")
public String chat(@RequestBody ChatRequest request) {
return chatClient.call(request.getMessage());
}
}
// 用户查询:"北京天气怎么样?"
// 自动触发 getWeather(city="北京", unit="celsius")
4.2 动态加载Skill(从YAML文件)
// 1. 动态加载Skill定义并注册
@Component
public class DynamicSkillLoader {
@Autowired
private FunctionCallbackRegistry registry;
@Value("${skills.dir:skills}")
private String skillsDir;
@PostConstruct
public void loadSkills() throws IOException {
File dir = new File(skillsDir);
if (!dir.exists()) return;
for (File skillDir : dir.listFiles(File::isDirectory)) {
File skillFile = new File(skillDir, "skill.yaml");
if (skillFile.exists()) {
registerSkill(skillFile);
}
}
}
private void registerSkill(File yamlFile) throws IOException {
Yaml yaml = new Yaml();
Map<String, Object> skillData = yaml.load(new FileInputStream(yamlFile));
String name = (String) skillData.get("name");
String description = (String) skillData.get("description");
Map<String, Object> params = (Map<String, Object>) skillData.get("parameters");
Map<String, Object> executor = (Map<String, Object>) skillData.get("executor");
FunctionCallback callback = FunctionCallback.builder()
.withName(name)
.withDescription(description)
.withInputType(createParameterClass(params)) // 动态生成或使用Map
.withInvoker(request -> executeSkill(executor, request))
.build();
registry.register(callback);
}
private Object executeSkill(Map<String, Object> executor, Object request) {
String type = (String) executor.get("type");
if ("http".equals(type)) {
return executeHttpSkill(executor, request);
} else if ("python".equals(type)) {
return executePythonSkill(executor, request);
}
throw new IllegalArgumentException("Unsupported executor type: " + type);
}
private String executeHttpSkill(Map<String, Object> executor, Object request) {
String endpoint = (String) executor.get("endpoint");
String method = (String) executor.get("method");
Map<String, Object> bodyTemplate = (Map<String, Object>) executor.get("body_template");
// 将请求参数映射到模板
Map<String, Object> body = new HashMap<>();
if (bodyTemplate != null) {
// 实现模板替换逻辑
body = renderTemplate(bodyTemplate, request);
} else {
body = toMap(request);
}
// 发送HTTP请求
return restTemplate.postForObject(endpoint, body, String.class);
}
private String executePythonSkill(Map<String, Object> executor, Object request)
throws IOException, InterruptedException {
String script = (String) executor.get("script");
// 将参数写入JSON文件
String paramFile = createTempJsonFile(request);
ProcessBuilder pb = new ProcessBuilder("python", script, paramFile);
Process process = pb.start();
String stdout = new String(process.getInputStream().readAllBytes(), StandardCharsets.UTF_8);
String stderr = new String(process.getErrorStream().readAllBytes(), StandardCharsets.UTF_8);
int exitCode = process.waitFor();
if (exitCode != 0) {
throw new RuntimeException("Python script failed: " + stderr);
}
return stdout;
}
}
// 2. ChatClient自动使用已注册的Skill
@Bean
public ChatClient chatClient(OpenAiChatModel chatModel, FunctionCallbackRegistry registry) {
return ChatClient.builder(chatModel)
.defaultFunctionCallbacks(registry.getAllCallbacks())
.build();
}
4.3 参数类型处理
// 方案1:使用Map作为通用参数类型
@Bean
public FunctionCallback genericSkillCallback(SkillDefinition skill) {
return FunctionCallback.builder()
.withName(skill.getName())
.withDescription(skill.getDescription())
.withInputType(Map.class) // 通用Map
.withInvoker(params -> {
// params 是 Map<String, Object>
return invokeSkill(skill, params);
})
.build();
}
// 方案2:动态生成POJO类(复杂,需使用字节码生成或反射)
// 建议:使用Map或定义通用接口
public interface SkillParameters {
Map<String, Object> toMap();
}
// 方案3:为每个Skill定义具体的Request类
public class WeatherRequest {
private String city;
private String unit = "celsius";
// getters & setters
}
@Bean
public FunctionCallback weatherSkill() {
return FunctionCallback.builder()
.withName("get_weather")
.withDescription("Get weather for a city")
.withInputType(WeatherRequest.class)
.withInvoker(req -> {
WeatherRequest request = (WeatherRequest) req;
return weatherService.getWeather(request.getCity(), request.getUnit());
})
.build();
}
五、LangChain4J 集成实现
5.1 实现 Tool 接口
// 1. 自定义Tool
public class WeatherTool implements Tool {
private final WeatherService weatherService;
public WeatherTool(WeatherService weatherService) {
this.weatherService = weatherService;
}
@Override
public String name() {
return "get_weather";
}
@Override
public String description() {
return "获取指定城市的当前天气信息";
}
@Override
public String execute(Map<String, Object> parameters) {
String city = (String) parameters.get("city");
String unit = (String) parameters.getOrDefault("unit", "celsius");
try {
Weather weather = weatherService.getWeather(city, unit);
return String.format(
"城市:%s\n温度:%d°%s\n天气:%s\n湿度:%d%%",
weather.getCity(), weather.getTemp(), unit,
weather.getCondition(), weather.getHumidity()
);
} catch (Exception e) {
return "查询天气失败:" + e.getMessage();
}
}
@Override
public ToolParameter[] parameters() {
return new ToolParameter[]{
ToolParameter.builder()
.name("city")
.description("城市名称,如:北京、上海、New York")
.type(ToolParameterType.STRING)
.required(true)
.build(),
ToolParameter.builder()
.name("unit")
.description("温度单位:celsius 或 fahrenheit")
.type(ToolParameterType.STRING)
.required(false)
.defaultValue("celsius")
.build()
};
}
}
// 2. 注册并创建Agent
@Service
public class AgentService {
@Autowired
private WeatherService weatherService;
public String chat(String userMessage) {
// 构建Tool列表
List<Tool> tools = List.of(
new WeatherTool(weatherService),
new SearchTool(searchService),
new CalculatorTool()
);
// 创建Agent
ChatLanguageModel model = new OpenAiChatModel(
OpenAiChatModelOptions.builder()
.apiKey(System.getenv("OPENAI_API_KEY"))
.modelName("gpt-4-turbo-preview")
.build()
);
Agent agent = Agent.builder()
.chatLanguageModel(model)
.tools(tools)
.maxIterations(10)
.build();
// 执行
AgentExecutionResult result = agent.execute(userMessage);
return result.outputText();
}
}
5.2 动态加载Tool(从YAML)
@Component
public class DynamicToolLoader {
private final ToolRegistry registry = new SimpleToolRegistry();
private final RestTemplate restTemplate = new RestTemplate();
@Value("${skills.dir:skills}")
private String skillsDir;
@PostConstruct
public void init() throws IOException {
loadTools();
}
@Scheduled(fixedDelay = 300000) // 每5分钟刷新
public void reload() throws IOException {
registry.clear();
loadTools();
}
private void loadTools() throws IOException {
File dir = new File(skillsDir);
if (!dir.exists()) return;
for (File skillDir : dir.listFiles(File::isDirectory)) {
File skillFile = new File(skillDir, "skill.yaml");
if (skillFile.exists()) {
Tool tool = loadTool(skillFile);
registry.register(tool);
}
}
}
private Tool loadTool(File yamlFile) throws IOException {
Yaml yaml = new Yaml();
Map<String, Object> skill = yaml.load(new FileInputStream(yamlFile));
String name = (String) skill.get("name");
String description = (String) skill.get("description");
Map<String, Object> executor = (Map<String, Object>) skill.get("executor");
return createTool(name, description, executor);
}
private Tool createTool(String name, String description, Map<String, Object> executor) {
String type = (String) executor.get("type");
if ("http".equals(type)) {
return createHttpTool(name, description, executor);
} else if ("java".equals(type)) {
return createJavaMethodTool(name, description, executor);
}
throw new IllegalArgumentException("Unknown executor type: " + type);
}
private Tool createHttpTool(String name, String description, Map<String, Object> executor) {
String endpoint = (String) executor.get("endpoint");
String method = (String) executor.get("method", "POST");
Map<String, Object> paramsSchema = (Map<String, Object>) executor.get("parameters");
return HttpRequestTool.builder()
.name(name)
.description(description)
.url(endpoint)
.httpMethod(HttpMethod.valueOf(method))
.queryParams(extractQueryParams(paramsSchema))
.headers(extractHeaders(executor))
.build();
}
private Map<String, String> extractQueryParams(Map<String, Object> schema) {
Map<String, String> params = new HashMap<>();
if (schema == null) return params;
Map<String, Object> properties = (Map<String, Object>) schema.get("properties");
if (properties == null) return params;
for (String key : properties.keySet()) {
params.put(key, "${" + key + "}"); // 占位符
}
return params;
}
public ToolRegistry getRegistry() {
return registry;
}
}
// 使用
@Service
public class ChatService {
@Autowired
private DynamicToolLoader toolLoader;
@Autowired
private ChatLanguageModel chatModel;
public String chat(String userMessage) {
Agent agent = Agent.builder()
.chatLanguageModel(chatModel)
.tools(toolLoader.getRegistry().getAll())
.maxIterations(10)
.build();
AgentExecutionResult result = agent.execute(userMessage);
return result.outputText();
}
}
六、统一Skill网关(企业级方案)
6.1 架构设计
┌─────────────────┐
│ Skill Gateway │ (OpenAI兼容接口)
│ (Spring Boot) │
└────────┬────────┘
│ OpenAI Function Calling
┌─────────────┼─────────────┐
│ │ │
┌──────▼──────┐ ┌────▼─────┐ ┌───▼────┐
│ Skill A │ │ Skill B │ │ External│
│ (HTTP) │ │ (Python) │ │ Platform│
└─────────────┘ └──────────┘ └─────────┘
┌──────────────────────────────────────┐
│ Skill Management API │
│ (列出Skill、动态注册、热加载) │
└──────────────────────────────────────┘
Skill Gateway 功能:
- 统一入口:提供 OpenAI 兼容接口
- 动态注册:从文件系统或API加载Skill
- 协议转换:OpenAI Function Call → 平台特定协议
- 认证鉴权:统一权限控制
- 监控审计:调用日志、性能指标
- 熔断降级:Skill故障隔离
- 缓存:缓存Skill执行结果
6.2 Skill Gateway 实现
@SpringBootApplication
public class SkillGatewayApplication {
public static void main(String[] args) {
SpringApplication.run(SkillGatewayApplication.class, args);
}
// 动态加载所有Skill并注册为FunctionCallback
@Bean
public FunctionCallbackRegistry functionCallbackRegistry(
DynamicSkillLoader skillLoader) {
FunctionCallbackRegistry registry = new FunctionCallbackRegistry();
// 加载所有Skill
List<SkillDefinition> definitions = skillLoader.loadAllSkills();
for (SkillDefinition def : definitions) {
FunctionCallback callback = createCallback(def, skillLoader);
registry.register(callback);
}
return registry;
}
private FunctionCallback createCallback(
SkillDefinition def,
DynamicSkillLoader skillLoader) {
return FunctionCallback.builder()
.withName(def.getName())
.withDescription(def.getDescription())
.withInputType(createParameterType(def.getParameters()))
.withInvoker(params -> {
// 调用Skill执行器
return skillLoader.execute(def.getName(), params);
})
.build();
}
}
// OpenAI兼容接口
@RestController
@RequestMapping("/v1")
public class OpenAICompatibleController {
@Autowired
private ChatModel chatModel; // Spring AI ChatModel
@PostMapping("/chat/completions")
public ChatResponse chat(@RequestBody OpenAIChatRequest request) {
// 自动处理Function Calling
return chatModel.chat(request.toChatRequest());
}
// 列出所有可用的Skill(工具)
@GetMapping("/skills")
public List<FunctionDefinition> listSkills() {
return chatModel.getTools(); // 返回注册的所有FunctionDefinition
}
// Skill执行接口(供ChatModel回调使用)
@PostMapping("/skills/{name}/execute")
public SkillExecuteResponse executeSkill(
@PathVariable String name,
@RequestBody Map<String, Object> parameters) {
Object result = skillExecutor.execute(name, parameters);
return SkillExecuteResponse.success(result);
}
}
// Skill管理端点
@RestController
@RequestMapping("/admin/skills")
public class SkillManagementController {
@Autowired
private DynamicSkillLoader skillLoader;
@GetMapping
public List<SkillInfo> list() {
return skillLoader.listSkills();
}
@PostMapping("/reload")
public ResponseEntity<String> reload() throws IOException {
skillLoader.reload();
return ResponseEntity.ok("Skills reloaded");
}
@PostMapping("/register")
public ResponseEntity<?> register(@RequestBody SkillDefinition definition) {
skillLoader.register(definition);
return ResponseEntity.ok("Skill registered");
}
}
// 配置文件
@ConfigurationProperties(prefix = "skill.gateway")
public class SkillGatewayProperties {
private boolean authEnabled = true;
private String jwtSecret;
private String skillsDir = "./skills";
private boolean enableAudit = true;
// getters & setters
}
6.3 客户端调用示例
// 任何支持OpenAI SDK的客户端都可以调用Skill Gateway
public class SkillGatewayClient {
public static void main(String[] args) {
OpenAiChatModel model = new OpenAiChatModel(
OpenAiChatModelOptions.builder()
.baseUrl("http://localhost:8080/v1") // Skill Gateway地址
.apiKey("sk-dummy") // 认证密钥
.modelName("auto") // 自动匹配Skill
.build()
);
String response = model.generate("搜索最新的Java新闻");
System.out.println(response);
// 或使用ChatClient
ChatClient client = ChatClient.create(model);
String chatResponse = client.call("发布一篇文章到博客,标题是'Hello World'");
System.out.println(chatResponse);
}
}
6.4 监控与审计
// 审计切面
@Aspect
@Component
public class SkillAuditAspect {
@Autowired
private AuditLogService auditLogService;
@Around("@annotation(org.springframework.ai.tool.Skill)")
public Object auditSkillExecution(ProceedingJoinPoint pjp) throws Throwable {
String skillName = pjp.getSignature().getName();
String userId = SecurityContext.getCurrentUserId();
Map<String, Object> args = extractArguments(pjp);
long start = System.currentTimeMillis();
Object result = pjp.proceed();
long duration = System.currentTimeMillis() - start;
auditLogService.log(SkillAudit.builder()
.userId(userId)
.skillName(skillName)
.parameters(args)
.success(result != null && !(result instanceof ErrorResult))
.duration(duration)
.timestamp(Instant.now())
.build());
return result;
}
}
// 监控指标
@Component
public class SkillMetrics {
private final MeterRegistry meterRegistry;
public SkillMetrics(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
}
@CounterMetric(name = "skill.execution.count", tags = {"skill", "${skillName}"})
public void incrementExecutionCount(String skillName) {
// Micrometer自动记录
}
@TimerMetric(name = "skill.execution.duration", tags = {"skill", "${skillName}"})
public void recordDuration(String skillName, long duration) {
// 记录耗时
}
}
七、对比总结与实际选型
7.1 四种方案对比
| 维度 | OpenCode | Claude Code | 自定义实现(推荐) | Spring AI | LangChain4J |
|---|---|---|---|---|---|
| Skill定义 | Markdown/YAML | OpenAI Function | YAML(标准) | @Tool注解/FunctionCallback |
Tool接口 |
| 匹配方式 | 规则+Embedding | LLM自动选择 | LLM自动选择 | LLM自动选择 | LLM自动选择 |
| 执行方式 | Python脚本/HTTP | 工具调用 | HTTP/Python/Java | Java方法/HTTP | Java方法/HTTP |
| 参数提取 | LLM | LLM | LLM | LLM(OpenAI) | LLM |
| 热加载 | ✅(文件监控) | ❌ | ✅ | ✅(@RefreshScope) | ✅(reload registry) |
| 权限控制 | ❌ | ❌ | ✅(需实现) | ✅(Spring Security) | ✅(自定义) |
| 监控审计 | 日志 | 日志 | ✅(需实现) | ✅(Callback) | ✅(Callback) |
| 多平台对接 | 单一类型 | 单一类型 | ✅(网关模式) | ✅(FunctionCallback) | ✅(Tool) |
| 易用性 | ⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐ |
| 生产就绪 | ⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐ |
7.2 技术选型建议
场景1:快速验证,已有Skill平台
推荐:使用OpenAI兼容API直接调用
实现:Spring AI ChatClient 或 LangChain4J ChatLanguageModel
配置baseUrl指向Skill平台即可
场景2:Spring Boot项目,自建Skill
推荐:Spring AI + @Tool注解
优势:与Spring生态无缝集成、类型安全、生产特性齐全
场景3:复杂Agent工作流,多工具选择
推荐:LangChain4J + Tool
优势:Agent能力更强(ReAct、多步推理)、回调完善
场景4:多租户Skill平台,统一管理
推荐:自建Skill Gateway(OpenAI兼容接口)
优势:统一认证、协议转换、监控、多平台整合
7.3 实施路线图
阶段1:定义标准
- 确定Skill YAML格式(参考上文)
- 定义参数Schema(JSON Schema)
- 确定执行器类型(HTTP/Python/Java)
阶段2:实现注册中心
- 扫描skills目录或API拉取
- 生成Skill对象
- 计算embedding用于语义匹配
阶段3:集成到LLM调用链
- Spring AI:注册FunctionCallback到ChatClient
- LangChain4J:注册Tool到ToolRegistry/Agent
阶段4:生产特性
- 权限控制(Spring Security或自定义拦截器)
- 审计日志(AOP记录Skill调用)
- 监控指标(Micrometer指标)
- 熔断降级(Resilience4j)
- 缓存(Caffeine/Redis)
阶段5:Skill网关(可选)
- 提供OpenAI兼容API
- 动态注册/卸载Skill
- 多租户隔离
- 计费与配额管理
八、完整示例:自建Skill网关
// application.yml
skill:
gateway:
port: 8080
skills-dir: ./skills
auth:
enabled: true
jwt-secret: ${JWT_SECRET}
metrics:
enabled: true
// Skill网关主类
@SpringBootApplication
@EnableDiscoveryClient
public class SkillGatewayApplication {
public static void main(String[] args) {
SpringApplication.run(SkillGatewayApplication.class, args);
}
}
// 动态加载Skill
@Component
public class SkillAutoConfiguration {
@Bean
public FunctionCallbackRegistry functionCallbackRegistry(
@Value("${skill.gateway.skills-dir}") String skillsDir,
RestTemplate restTemplate) throws IOException {
FunctionCallbackRegistry registry = new FunctionCallbackRegistry();
File dir = new File(skillsDir);
if (dir.exists()) {
for (File skillDir : dir.listFiles(File::isDirectory)) {
File skillFile = new File(skillDir, "skill.yaml");
if (skillFile.exists()) {
SkillDefinition def = loadSkillDefinition(skillFile);
FunctionCallback callback = createCallback(def, restTemplate);
registry.register(callback);
}
}
}
return registry;
}
}
// OpenAI兼容接口
@RestController
@RequestMapping("/v1")
public class OpenAICompatibleController {
@Autowired
private ChatModel chatModel;
@PostMapping("/chat/completions")
public ChatResponse chat(@RequestBody OpenAIChatRequest request) {
return chatModel.chat(request.toChatRequest());
}
@GetMapping("/models")
public ModelsResponse models() {
// 返回所有Skill作为model
return ModelsResponse.of(chatModel.getTools().stream()
.map(tool -> new Model(tool.getName(), tool.getDescription()))
.collect(Collectors.toList())
);
}
}
// 客户端示例(任何语言)
public class Client {
public static void main(String[] args) {
// Java
OpenAiChatModel model = new OpenAiChatModel(
OpenAiChatModelOptions.builder()
.baseUrl("http://localhost:8080/v1")
.apiKey("sk-dummy")
.modelName("search-web") // 指定Skill
.build()
);
String response = model.generate("搜索Java最新版本");
System.out.println(response);
// Python
// from openai import OpenAI
// client = OpenAI(base_url="http://localhost:8080/v1", api_key="sk-dummy")
// response = client.chat.completions.create(model="search", messages=[...])
}
}
九、最佳实践与陷阱
9.1 最佳实践
✅ Skill设计
- 参数明确,使用JSON Schema详细定义
- 支持默认值,减少必填参数
- 提供输出模板,格式化结果
- 幂等性设计,相同输入相同输出
- 快速失败,参数验证前置
✅ 错误处理
public Object invokeSkill(SkillDefinition skill, Map<String, Object> params) {
try {
// 参数验证
validate(skill.getParameters(), params);
// 执行
Object result = doInvoke(skill, params);
// 结果验证
validateOutput(skill.getOutputSchema(), result);
return result;
} catch (ValidationException e) {
log.warn("Skill {} parameter validation failed: {}", skill.getName(), e.getMessage());
return Map.of("error", "参数错误:" + e.getMessage());
} catch (Exception e) {
log.error("Skill {} execution failed", skill.getName(), e);
return Map.of("error", "执行失败,请稍后重试");
}
}
✅ 性能优化
- HTTP请求池化
- 设置合理的超时
- 实现缓存(Redis)
- 批量操作支持(如batch_search)
✅ 安全考虑
- 参数白名单验证
- 敏感信息脱敏
- 调用频率限制(Rate Limit)
- 权限控制(RBAC)
9.2 常见陷阱
❌ 陷阱1:参数定义模糊
# 错误
parameters:
- name: data
description: 数据
# 正确
parameters:
- name: query
type: string
description: 搜索关键词,100字符以内
maxLength: 100
❌ 陷阱2:无超时控制
// 错误
String result = restTemplate.postForObject(url, params, String.class);
// 正确
RequestConfig config = RequestConfig.custom()
.setConnectTimeout(5000)
.setSocketTimeout(10000)
.build();
❌ 陷阱3:忽略错误处理
# 错误
def execute_skill(params):
return requests.post(url, json=params).json()
# 正确
def execute_skill(params):
try:
resp = requests.post(url, json=params, timeout=10)
resp.raise_for_status()
return resp.json()
except requests.Timeout:
return {"error": "请求超时"}
except requests.RequestException as e:
return {"error": f"请求失败:{str(e)}"}
❌ 陷阱4:无缓存导致重复调用
// 添加缓存
@Cacheable(value = "skill-results", key = "#skillName + ':' + #params.hashCode()")
public Object executeSkill(String skillName, Map<String, Object> params) {
return actualExecute(skillName, params);
}
十、总结
OpenCode 模式:
- 文件系统扫描(skills目录)
- YAML/Markdown定义Skill
- Python脚本或HTTP执行
- LLM提取参数并匹配
Claude Code 模式:
- 预定义工具列表
- Function Calling API
- 自动工具选择与执行
- 结果回传生成最终答案
自定义实现(推荐):
- 定义标准:YAML格式,包含name、description、parameters、executor、output_template
- 注册中心:扫描目录/API拉取,计算embedding用于语义匹配
- 执行器:支持HTTP、Python、Java三种方式
- LLM集成:
- Spring AI:
FunctionCallback注册,ChatClient自动调用 - LangChain4J:实现
Tool接口,注册到ToolRegistry
- Spring AI:
- 生产特性:权限、审计、监控、缓存、熔断
Skill网关(企业级):
- 提供OpenAI兼容接口(
/v1/chat/completions) - 动态加载Skill并转换为Function Calling
- 统一认证、监控、多租户
核心要点:
- ✅ 使用OpenAI Function Calling标准(事实标准)
- ✅ LLM自动匹配Skill和提取参数
- ✅ 支持多种执行方式(HTTP最通用)
- ✅ 生产环境必须:权限、审计、监控、缓存
- ✅ 热加载支持Skill动态更新
文档版本:v1.0
最后更新:2025年3月
评论区