MCPserver2
This commit is contained in:
@@ -7,20 +7,105 @@ import json
|
||||
# 1. --- 导入库 ---
|
||||
# mcp 库已通过 'pip install -e' 安装,无需修改 sys.path
|
||||
from mcp.server.fastmcp import FastMCP
|
||||
# 新增:阿里云模型服务和向量计算库
|
||||
import dashscope
|
||||
from dashscope.api_entities.dashscope_response import Role
|
||||
from tenacity import retry, stop_after_attempt, wait_random_exponential
|
||||
from sklearn.metrics.pairwise import cosine_similarity
|
||||
import numpy as np
|
||||
from dotenv import load_dotenv
|
||||
|
||||
# 2. --- 日志和知识库配置 ---
|
||||
# 2. --- 日志、缓存和知识库配置 ---
|
||||
MCP_DIR = os.path.dirname(os.path.abspath(__file__))
|
||||
LOG_FILE_PATH = os.path.join(MCP_DIR, 'mcpserver.log')
|
||||
CACHE_DIR = os.path.join(MCP_DIR, 'vector_cache')
|
||||
CACHE_FILE_PATH = os.path.join(CACHE_DIR, 'knowledge_cache.json')
|
||||
os.makedirs(CACHE_DIR, exist_ok=True)
|
||||
|
||||
logging.basicConfig(filename=LOG_FILE_PATH, level=logging.INFO,
|
||||
format='%(asctime)s - %(levelname)s - %(message)s',
|
||||
encoding='utf-8')
|
||||
format='%(asctime)s - %(levelname)s - %(message)s',
|
||||
encoding='utf-8')
|
||||
|
||||
# 新增: 加载 .env 文件并设置 API Key
|
||||
# 指定 .env 文件的确切路径,以确保脚本在任何工作目录下都能正确加载
|
||||
env_path = os.path.join(MCP_DIR, '.env')
|
||||
load_dotenv(dotenv_path=env_path)
|
||||
|
||||
dashscope.api_key = os.getenv("DASHSCOPE_API_KEY")
|
||||
|
||||
if not dashscope.api_key:
|
||||
logging.error("错误:未在 .env 文件中找到或加载 DASHSCOPE_API_KEY。")
|
||||
# 如果没有Key,服务器无法工作,可以选择退出或继续运行但功能受限
|
||||
# sys.exit("错误:API Key 未配置。")
|
||||
else:
|
||||
logging.info("成功加载 DASHSCOPE_API_KEY。")
|
||||
|
||||
# 定义知识库路径
|
||||
KNOWLEDGE_BASE_PATHS = [
|
||||
r"C:\Steam\steamapps\common\RimWorld\Data"
|
||||
r"C:\Steam\steamapps\common\RimWorld\Data"
|
||||
]
|
||||
|
||||
# 4. --- 核心功能函数 ---
|
||||
# 3. --- 缓存管理 ---
|
||||
def load_cache():
|
||||
"""加载缓存文件"""
|
||||
if os.path.exists(CACHE_FILE_PATH):
|
||||
try:
|
||||
with open(CACHE_FILE_PATH, 'r', encoding='utf-8') as f:
|
||||
return json.load(f)
|
||||
except (json.JSONDecodeError, IOError) as e:
|
||||
logging.error(f"读取缓存文件失败: {e}")
|
||||
return {}
|
||||
return {}
|
||||
|
||||
def save_cache(cache_data):
|
||||
"""保存缓存到文件"""
|
||||
try:
|
||||
with open(CACHE_FILE_PATH, 'w', encoding='utf-8') as f:
|
||||
json.dump(cache_data, f, ensure_ascii=False, indent=4)
|
||||
except IOError as e:
|
||||
logging.error(f"写入缓存文件失败: {e}")
|
||||
|
||||
# 加载初始缓存
|
||||
knowledge_cache = load_cache()
|
||||
|
||||
# 4. --- 向量化与相似度计算 ---
|
||||
@retry(wait=wait_random_exponential(min=1, max=60), stop=stop_after_attempt(6))
|
||||
def get_embedding(text: str):
|
||||
"""获取文本的向量嵌入"""
|
||||
try:
|
||||
# 根据用户文档,选用v4模型,更适合代码和文本
|
||||
response = dashscope.TextEmbedding.call(
|
||||
model='text-embedding-v4',
|
||||
input=text
|
||||
)
|
||||
if response.status_code == 200:
|
||||
return response.output['embeddings'][0]['embedding']
|
||||
else:
|
||||
logging.error(f"获取向量失败: {response.message}")
|
||||
return None
|
||||
except Exception as e:
|
||||
logging.error(f"调用向量API时出错: {e}", exc_info=True)
|
||||
raise
|
||||
|
||||
def find_most_similar_file(question_embedding, file_embeddings):
|
||||
"""在文件向量中找到与问题向量最相似的一个"""
|
||||
if not question_embedding or not file_embeddings:
|
||||
return None
|
||||
|
||||
# 将文件嵌入列表转换为NumPy数组
|
||||
file_vectors = np.array([emb['embedding'] for emb in file_embeddings])
|
||||
question_vector = np.array(question_embedding).reshape(1, -1)
|
||||
|
||||
# 计算余弦相似度
|
||||
similarities = cosine_similarity(question_vector, file_vectors)[0]
|
||||
|
||||
# 找到最相似的文件的索引
|
||||
most_similar_index = np.argmax(similarities)
|
||||
|
||||
# 返回最相似的文件路径
|
||||
return file_embeddings[most_similar_index]['path']
|
||||
|
||||
# 5. --- 核心功能函数 ---
|
||||
def find_files_with_keyword(roots, keyword, extensions=['.xml', '.cs', '.txt']):
|
||||
"""在指定目录中查找包含关键字的文件名和内容。"""
|
||||
found_files = []
|
||||
@@ -97,35 +182,75 @@ mcp = FastMCP(
|
||||
|
||||
@mcp.tool()
|
||||
def get_context(question: str) -> str:
|
||||
"""
|
||||
根据问题中的关键词,在RimWorld知识库中搜索相关的XML或C#文件。
|
||||
返回找到的文件路径列表。
|
||||
"""
|
||||
logging.info(f"收到问题: {question}")
|
||||
keyword = find_keyword_in_question(question)
|
||||
if not keyword:
|
||||
logging.warning("无法从问题中提取关键词。")
|
||||
return "无法从问题中提取关键词,请提供更具体的信息。"
|
||||
"""
|
||||
根据问题中的关键词和向量相似度,在RimWorld知识库中搜索最相关的XML或C#文件。
|
||||
返回最匹配的文件路径。
|
||||
"""
|
||||
logging.info(f"收到问题: {question}")
|
||||
keyword = find_keyword_in_question(question)
|
||||
if not keyword:
|
||||
logging.warning("无法从问题中提取关键词。")
|
||||
return "无法从问题中提取关键词,请提供更具体的信息。"
|
||||
|
||||
logging.info(f"提取到关键词: {keyword}")
|
||||
|
||||
try:
|
||||
found_files = find_files_with_keyword(KNOWLEDGE_BASE_PATHS, keyword)
|
||||
if not found_files:
|
||||
logging.info(f"未找到与 '{keyword}' 相关的文件。")
|
||||
return f"未在知识库中找到与 '{keyword}' 相关的文件定义。"
|
||||
|
||||
logging.info(f"找到了 {len(found_files)} 个相关文件。")
|
||||
# 将文件列表格式化为字符串返回
|
||||
context = f"根据关键词 '{keyword}',在知识库中找到了以下 {len(found_files)} 个相关文件:\n\n" + "\n".join(found_files)
|
||||
return context
|
||||
except Exception as e:
|
||||
logging.error(f"处理请求时发生意外错误: {e}", exc_info=True)
|
||||
return f"处理您的请求时发生错误: {e}"
|
||||
logging.info(f"提取到关键词: {keyword}")
|
||||
|
||||
# 1. 检查缓存
|
||||
if keyword in knowledge_cache:
|
||||
cached_path = knowledge_cache[keyword]
|
||||
logging.info(f"缓存命中: 关键词 '{keyword}' -> {cached_path}")
|
||||
return f"根据知识库缓存,与 '{keyword}' 最相关的定义文件是:\n{cached_path}"
|
||||
|
||||
logging.info(f"缓存未命中,开始实时搜索: {keyword}")
|
||||
|
||||
# 2. 关键词文件搜索 (初步筛选)
|
||||
try:
|
||||
candidate_files = find_files_with_keyword(KNOWLEDGE_BASE_PATHS, keyword)
|
||||
if not candidate_files:
|
||||
logging.info(f"未找到与 '{keyword}' 相关的文件。")
|
||||
return f"未在知识库中找到与 '{keyword}' 相关的文件定义。"
|
||||
|
||||
logging.info(f"找到 {len(candidate_files)} 个候选文件,开始向量化处理...")
|
||||
|
||||
# 3. 向量化和相似度计算 (精准筛选)
|
||||
question_embedding = get_embedding(question)
|
||||
if not question_embedding:
|
||||
return "无法生成问题向量,请检查API连接或问题内容。"
|
||||
|
||||
file_embeddings = []
|
||||
for file_path in candidate_files:
|
||||
try:
|
||||
with open(file_path, 'r', encoding='utf-8') as f:
|
||||
content = f.read()
|
||||
# v4模型支持更长的输入
|
||||
file_embedding = get_embedding(content[:8000])
|
||||
if file_embedding:
|
||||
file_embeddings.append({'path': file_path, 'embedding': file_embedding})
|
||||
except Exception as e:
|
||||
logging.error(f"处理文件 {file_path} 时出错: {e}")
|
||||
|
||||
if not file_embeddings:
|
||||
return "无法为任何候选文件生成向量。"
|
||||
|
||||
# 找到最相似的文件
|
||||
best_match_path = find_most_similar_file(question_embedding, file_embeddings)
|
||||
|
||||
if not best_match_path:
|
||||
return "计算向量相似度失败。"
|
||||
|
||||
# 4. 更新缓存并返回结果
|
||||
logging.info(f"向量搜索完成。最匹配的文件是: {best_match_path}")
|
||||
knowledge_cache[keyword] = best_match_path
|
||||
save_cache(knowledge_cache)
|
||||
|
||||
return f"根据向量相似度分析,与 '{keyword}' 最相关的定义文件是:\n{best_match_path}"
|
||||
|
||||
except Exception as e:
|
||||
logging.error(f"处理请求时发生意外错误: {e}", exc_info=True)
|
||||
return f"处理您的请求时发生错误: {e}"
|
||||
|
||||
# 6. --- 启动服务器 ---
|
||||
# FastMCP 实例可以直接运行
|
||||
if __name__ == "__main__":
|
||||
logging.info("RimWorld 本地知识库 (FastMCP版, v1.2 关键词修正) 正在启动...")
|
||||
# 使用 'stdio' 传输协议
|
||||
mcp.run(transport="stdio")
|
||||
logging.info("RimWorld 向量知识库 (FastMCP版, v2.1-v4-model) 正在启动...")
|
||||
# 使用 'stdio' 传输协议
|
||||
mcp.run(transport="stdio")
|
||||
1
Source/MCP/python-sdk
Submodule
1
Source/MCP/python-sdk
Submodule
Submodule Source/MCP/python-sdk added at 959d4e39ae
Reference in New Issue
Block a user