暂存
This commit is contained in:
@@ -3,6 +3,7 @@ import os
|
||||
import sys
|
||||
import logging
|
||||
import json
|
||||
import re
|
||||
|
||||
# 1. --- 导入库 ---
|
||||
# mcp 库已通过 'pip install -e' 安装,无需修改 sys.path
|
||||
@@ -208,10 +209,10 @@ def extract_xml_def(lines, start_index):
|
||||
return ""
|
||||
|
||||
# 5. --- 核心功能函数 ---
|
||||
def find_files_with_keyword(roots, keyword, extensions=['.xml', '.cs', '.txt']):
|
||||
"""在指定目录中查找包含关键字的文件名和内容。"""
|
||||
found_files = []
|
||||
keyword_lower = keyword.lower()
|
||||
def find_files_with_keyword(roots, keywords: list[str], extensions=['.xml', '.cs', '.txt']):
|
||||
"""在指定目录中查找包含任何一个关键字的文件。"""
|
||||
found_files = set()
|
||||
keywords_lower = [k.lower() for k in keywords]
|
||||
for root_path in roots:
|
||||
if not os.path.isdir(root_path):
|
||||
logging.warning(f"知识库路径不存在或不是一个目录: {root_path}")
|
||||
@@ -222,58 +223,68 @@ def find_files_with_keyword(roots, keyword, extensions=['.xml', '.cs', '.txt']):
|
||||
file_path = os.path.join(dirpath, filename)
|
||||
try:
|
||||
with open(file_path, 'r', encoding='utf-8') as f:
|
||||
content = f.read()
|
||||
# 使用不区分大小写的子字符串搜索
|
||||
if keyword_lower in content.lower():
|
||||
found_files.append(file_path)
|
||||
content_lower = f.read().lower()
|
||||
# 如果任何一个关键词在内容中,就添加文件
|
||||
if any(kw in content_lower for kw in keywords_lower):
|
||||
found_files.add(file_path)
|
||||
except Exception as e:
|
||||
logging.error(f"读取文件时出错 {file_path}: {e}")
|
||||
return found_files
|
||||
return list(found_files)
|
||||
|
||||
def find_keyword_in_question(question: str) -> str:
|
||||
"""从问题中提取最有可能的单个关键词 (通常是类型名或defName)。"""
|
||||
# 排除常见但非特定的术语
|
||||
excluded_keywords = {"XML", "C#", "DEF", "CS"}
|
||||
def find_keywords_in_question(question: str) -> list[str]:
|
||||
"""从问题中提取所有可能的关键词 (类型名, defName等)。"""
|
||||
# 正则表达式优先,用于精确匹配定义
|
||||
# 匹配 C# class, struct, enum, interface 定义, 例如 "public class MyClass : Base"
|
||||
csharp_def_pattern = re.compile(r'\b(?:public|private|internal|protected|sealed|abstract|static|new)\s+(?:class|struct|enum|interface)\s+([A-Za-z_][A-Za-z0-9_]*)')
|
||||
# 匹配 XML Def, 例如 "<ThingDef Name="MyDef">" or "<MyCustomDef>"
|
||||
xml_def_pattern = re.compile(r'<([A-Za-z_][A-Za-z0-9_]*Def)\b')
|
||||
|
||||
# 使用更精确的规则来识别关键词
|
||||
# 启发式规则,用于匹配独立的关键词
|
||||
# 规则1: 包含下划线 (很可能是 defName)
|
||||
# 规则2: 混合大小写 (很可能是 C# 类型名)
|
||||
# 规则3: 全大写但不在排除列表中
|
||||
# 规则3: 多个大写字母(例如 CompPsychicScaling,但要排除纯大写缩写词)
|
||||
|
||||
parts = question.replace('"', ' ').replace("'", ' ').replace('`', ' ').split()
|
||||
# 排除常见但非特定的术语
|
||||
excluded_keywords = {"XML", "C#", "DEF", "CS", "CLASS", "PUBLIC"}
|
||||
|
||||
found_keywords = set()
|
||||
|
||||
# 1. 正则匹配
|
||||
csharp_matches = csharp_def_pattern.findall(question)
|
||||
xml_matches = xml_def_pattern.findall(question)
|
||||
|
||||
for match in csharp_matches:
|
||||
found_keywords.add(match)
|
||||
for match in xml_matches:
|
||||
found_keywords.add(match)
|
||||
|
||||
# 2. 启发式单词匹配
|
||||
parts = re.split(r'[\s,.:;\'"`()<>]+', question)
|
||||
|
||||
potential_keywords = []
|
||||
for part in parts:
|
||||
part = part.strip(',.?;:')
|
||||
if not part:
|
||||
continue
|
||||
|
||||
# 检查是否在排除列表中
|
||||
if part.upper() in excluded_keywords:
|
||||
if not part or part.upper() in excluded_keywords:
|
||||
continue
|
||||
|
||||
# 规则1: 包含下划线
|
||||
if '_' in part:
|
||||
potential_keywords.append((part, 3)) # 最高优先级
|
||||
found_keywords.add(part)
|
||||
# 规则2: 驼峰命名或混合大小写
|
||||
elif any(c.islower() for c in part) and any(c.isupper() for c in part):
|
||||
potential_keywords.append((part, 2)) # 次高优先级
|
||||
# 规则3: 多个大写字母(例如 CompPsychicScaling,但要排除纯大写缩写词)
|
||||
elif any(c.islower() for c in part) and any(c.isupper() for c in part) and len(part) > 3:
|
||||
found_keywords.add(part)
|
||||
# 规则3: 多个大写字母
|
||||
elif sum(1 for c in part if c.isupper()) > 1 and not part.isupper():
|
||||
potential_keywords.append((part, 2))
|
||||
# 备用规则:如果之前的规则都没匹配上,就找一个看起来像专有名词的
|
||||
elif part[0].isupper() and len(part) > 4: # 长度大于4以避免像 'A' 'I' 这样的词
|
||||
potential_keywords.append((part, 1)) # 较低优先级
|
||||
found_keywords.add(part)
|
||||
# 备用规则: 大写字母开头且较长
|
||||
elif part[0].isupper() and len(part) > 4:
|
||||
found_keywords.add(part)
|
||||
|
||||
# 如果找到了关键词,按优先级排序并返回最高优先级的那个
|
||||
if potential_keywords:
|
||||
potential_keywords.sort(key=lambda x: x[1], reverse=True)
|
||||
logging.info(f"找到的潜在关键词: {potential_keywords}")
|
||||
return potential_keywords[0][0]
|
||||
if not found_keywords:
|
||||
logging.warning(f"在 '{question}' 中未找到合适的关键词。")
|
||||
return []
|
||||
|
||||
logging.info(f"找到的潜在关键词: {list(found_keywords)}")
|
||||
return list(found_keywords)
|
||||
|
||||
# 如果没有找到,返回空字符串
|
||||
logging.warning(f"在 '{question}' 中未找到合适的关键词。")
|
||||
return ""
|
||||
|
||||
# 5. --- 创建和配置 MCP 服务器 ---
|
||||
# 使用 FastMCP 创建服务器实例
|
||||
@@ -289,31 +300,74 @@ def get_context(question: str) -> str:
|
||||
并将其整合后返回。
|
||||
"""
|
||||
logging.info(f"收到问题: {question}")
|
||||
keyword = find_keyword_in_question(question)
|
||||
if not keyword:
|
||||
keywords = find_keywords_in_question(question)
|
||||
if not keywords:
|
||||
logging.warning("无法从问题中提取关键词。")
|
||||
return "无法从问题中提取关键词,请提供更具体的信息。"
|
||||
|
||||
logging.info(f"提取到关键词: {keyword}")
|
||||
logging.info(f"提取到关键词: {keywords}")
|
||||
|
||||
# 基于所有关键词创建缓存键
|
||||
cache_key = "-".join(sorted(keywords))
|
||||
|
||||
# 1. 检查缓存 (新逻辑)
|
||||
cached_result = load_cache_for_keyword(keyword)
|
||||
# 1. 检查缓存
|
||||
cached_result = load_cache_for_keyword(cache_key)
|
||||
if cached_result:
|
||||
logging.info(f"缓存命中: 关键词 '{keyword}'")
|
||||
logging.info(f"缓存命中: 关键词 '{cache_key}'")
|
||||
return cached_result
|
||||
|
||||
logging.info(f"缓存未命中,开始实时搜索: {keyword}")
|
||||
logging.info(f"缓存未命中,开始实时搜索: {cache_key}")
|
||||
|
||||
# 2. 关键词文件搜索 (初步筛选)
|
||||
# 2. 关键词文件搜索 (分层智能筛选)
|
||||
try:
|
||||
candidate_files = find_files_with_keyword(KNOWLEDGE_BASE_PATHS, keyword)
|
||||
# 优先使用最长的(通常最具体)的关键词进行搜索
|
||||
specific_keywords = sorted(keywords, key=len, reverse=True)
|
||||
candidate_files = find_files_with_keyword(KNOWLEDGE_BASE_PATHS, [specific_keywords[0]])
|
||||
|
||||
# 如果最具体的关键词找不到文件,再尝试所有关键词
|
||||
if not candidate_files and len(keywords) > 1:
|
||||
logging.info(f"使用最具体的关键词 '{specific_keywords[0]}' 未找到文件,尝试所有关键词...")
|
||||
candidate_files = find_files_with_keyword(KNOWLEDGE_BASE_PATHS, keywords)
|
||||
|
||||
if not candidate_files:
|
||||
logging.info(f"未找到与 '{keyword}' 相关的文件。")
|
||||
return f"未在知识库中找到与 '{keyword}' 相关的文件定义。"
|
||||
logging.info(f"未找到与 '{keywords}' 相关的文件。")
|
||||
return f"未在知识库中找到与 '{keywords}' 相关的文件定义。"
|
||||
|
||||
logging.info(f"找到 {len(candidate_files)} 个候选文件,开始向量化处理...")
|
||||
|
||||
# 新增:文件名精确匹配优先
|
||||
priority_results = []
|
||||
remaining_files = []
|
||||
for file_path in candidate_files:
|
||||
filename_no_ext = os.path.splitext(os.path.basename(file_path))[0]
|
||||
is_priority = False
|
||||
for keyword in keywords:
|
||||
if filename_no_ext.lower() == keyword.lower():
|
||||
logging.info(f"文件名精确匹配: {file_path}")
|
||||
code_block = extract_relevant_code(file_path, keyword)
|
||||
if code_block:
|
||||
lang = "csharp" if file_path.endswith(('.cs', '.txt')) else "xml"
|
||||
priority_results.append(
|
||||
f"---\n"
|
||||
f"**文件路径 (精确匹配):** `{file_path}`\n\n"
|
||||
f"```{lang}\n"
|
||||
f"{code_block}\n"
|
||||
f"```"
|
||||
)
|
||||
is_priority = True
|
||||
break # 已处理该文件,跳出内层循环
|
||||
if not is_priority:
|
||||
remaining_files.append(file_path)
|
||||
|
||||
candidate_files = remaining_files # 更新候选文件列表,排除已优先处理的文件
|
||||
|
||||
# 3. 向量化和相似度计算 (精准筛选)
|
||||
# 增加超时保护:限制向量化的文件数量
|
||||
MAX_FILES_TO_VECTORIZE = 25
|
||||
if len(candidate_files) > MAX_FILES_TO_VECTORIZE:
|
||||
logging.warning(f"候选文件过多 ({len(candidate_files)}),仅处理前 {MAX_FILES_TO_VECTORIZE} 个。")
|
||||
candidate_files = candidate_files[:MAX_FILES_TO_VECTORIZE]
|
||||
|
||||
question_embedding = get_embedding(question)
|
||||
if not question_embedding:
|
||||
return "无法生成问题向量,请检查API连接或问题内容。"
|
||||
@@ -323,7 +377,7 @@ def get_context(question: str) -> str:
|
||||
try:
|
||||
with open(file_path, 'r', encoding='utf-8') as f:
|
||||
content = f.read()
|
||||
file_embedding = get_embedding(content[:8000])
|
||||
file_embedding = get_embedding(content[:8000]) # 限制内容长度以提高效率
|
||||
if file_embedding:
|
||||
file_embeddings.append({'path': file_path, 'embedding': file_embedding})
|
||||
except Exception as e:
|
||||
@@ -333,47 +387,45 @@ def get_context(question: str) -> str:
|
||||
return "无法为任何候选文件生成向量。"
|
||||
|
||||
# 找到最相似的多个文件
|
||||
best_matches = find_most_similar_files(question_embedding, file_embeddings, top_n=3)
|
||||
best_matches = find_most_similar_files(question_embedding, file_embeddings, top_n=5) # 增加返回数量
|
||||
|
||||
if not best_matches:
|
||||
return "计算向量相似度失败或没有找到足够相似的文件。"
|
||||
|
||||
# 4. 提取代码并格式化输出
|
||||
output_parts = [f"根据向量相似度分析,与 '{keyword}' 最相关的代码定义如下:\n"]
|
||||
output_parts = [f"根据向量相似度分析,与 '{', '.join(keywords)}' 最相关的代码定义如下:\n"]
|
||||
output_parts.extend(priority_results) # 将优先结果放在最前面
|
||||
|
||||
extracted_blocks = set() # 用于防止重复提取相同的代码块
|
||||
|
||||
for match in best_matches:
|
||||
file_path = match['path']
|
||||
similarity = match['similarity']
|
||||
|
||||
# 智能提取代码块
|
||||
code_block = extract_relevant_code(file_path, keyword)
|
||||
|
||||
# 如果提取失败,则跳过这个文件
|
||||
if not code_block or code_block.startswith("# Error"):
|
||||
logging.warning(f"未能从 {file_path} 提取到完整的代码块。")
|
||||
continue
|
||||
|
||||
# 确定语言类型用于markdown高亮
|
||||
lang = "csharp" if file_path.endswith(('.cs', '.txt')) else "xml"
|
||||
|
||||
output_parts.append(
|
||||
f"---\n"
|
||||
f"**文件路径:** `{file_path}`\n"
|
||||
f"**相似度:** {similarity:.4f}\n\n"
|
||||
f"```{lang}\n"
|
||||
f"{code_block}\n"
|
||||
f"```"
|
||||
)
|
||||
# 对每个关键词都尝试提取代码
|
||||
for keyword in keywords:
|
||||
code_block = extract_relevant_code(file_path, keyword)
|
||||
|
||||
if code_block and code_block not in extracted_blocks:
|
||||
extracted_blocks.add(code_block)
|
||||
lang = "csharp" if file_path.endswith(('.cs', '.txt')) else "xml"
|
||||
output_parts.append(
|
||||
f"---\n"
|
||||
f"**文件路径:** `{file_path}`\n"
|
||||
f"**相似度:** {similarity:.4f}\n\n"
|
||||
f"```{lang}\n"
|
||||
f"{code_block}\n"
|
||||
f"```"
|
||||
)
|
||||
|
||||
# 如果没有任何代码块被成功提取
|
||||
if len(output_parts) <= 1:
|
||||
return f"虽然找到了相似的文件,但无法在其中提取到关于 '{keyword}' 的完整代码块。"
|
||||
return f"虽然找到了相似的文件,但无法在其中提取到关于 '{', '.join(keywords)}' 的完整代码块。"
|
||||
|
||||
final_output = "\n".join(output_parts)
|
||||
|
||||
# 5. 更新缓存并返回结果
|
||||
logging.info(f"向量搜索完成。找到了 {len(best_matches)} 个匹配项并成功提取了代码。")
|
||||
save_cache_for_keyword(keyword, final_output)
|
||||
save_cache_for_keyword(cache_key, final_output)
|
||||
|
||||
return final_output
|
||||
|
||||
|
||||
Reference in New Issue
Block a user