Files
WulaFallenEmpireRW/MCP/web_api_server.py
2025-08-24 21:15:38 +08:00

249 lines
8.9 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
RimWorld知识库Web API服务器
提供HTTP接口可以通过浏览器或HTTP客户端访问
"""
import os
import sys
import json
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.parse import urlparse, parse_qs
import threading
import webbrowser
# Make the bundled MCP python-sdk sources importable: the SDK is expected
# in "python-sdk/src" next to this file, and is put at the front of the
# import path unless it is already listed there.
MCP_DIR = os.path.split(os.path.abspath(__file__))[0]
SDK_PATH = os.path.join(MCP_DIR, 'python-sdk', 'src')
if sys.path.count(SDK_PATH) == 0:
    sys.path.insert(0, SDK_PATH)
class RimWorldAPIHandler(BaseHTTPRequestHandler):
    """HTTP request handler for the RimWorld knowledge-base web API.

    Routes:
        GET  /               the single-page web interface (HTML)
        GET  /query?q=...    the answer as plain text
        GET  /api/query?q=.. the answer as JSON
        POST /api/query      the answer as JSON; the request body is a JSON
                             object of the form {"question": "..."}

    JSON replies are always sent with status 200 and have the shape
    {"success": true, "result": ...} or {"success": false, "error": ...};
    clients are expected to inspect the "success" field.
    """

    def do_GET(self):
        """Dispatch a GET request by URL path; unknown paths get a 404."""
        parsed_url = urlparse(self.path)
        if parsed_url.path == '/':
            self.serve_web_interface()
        elif parsed_url.path == '/query':
            self.handle_query_get(parsed_url)
        elif parsed_url.path == '/api/query':
            self.handle_api_query_get(parsed_url)
        else:
            self.send_error(404, "Not Found")

    def do_POST(self):
        """Dispatch a POST request by URL path; unknown paths get a 404."""
        # Compare the parsed path (as do_GET does) so that a query string
        # appended to the URL does not turn a valid request into a 404.
        if urlparse(self.path).path == '/api/query':
            self.handle_api_query_post()
        else:
            self.send_error(404, "Not Found")

    def serve_web_interface(self):
        """Send the embedded single-page web UI as UTF-8 HTML."""
        html = """
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>RimWorld 知识库</title>
<style>
body { font-family: Arial, sans-serif; max-width: 1200px; margin: 0 auto; padding: 20px; }
.header { text-align: center; margin-bottom: 30px; }
.query-box { margin-bottom: 20px; }
input[type="text"] { width: 70%; padding: 10px; font-size: 16px; }
button { padding: 10px 20px; font-size: 16px; background: #007cba; color: white; border: none; cursor: pointer; }
button:hover { background: #005a8b; }
.result { margin-top: 20px; padding: 15px; background: #f5f5f5; border-radius: 5px; white-space: pre-wrap; }
.loading { color: #666; font-style: italic; }
.examples { margin-top: 20px; }
.example { cursor: pointer; color: #007cba; margin: 5px 0; }
.example:hover { text-decoration: underline; }
</style>
</head>
<body>
<div class="header">
<h1>🎮 RimWorld 知识库</h1>
<p>直接查询RimWorld游戏的C#源码和XML定义</p>
</div>
<div class="query-box">
<input type="text" id="queryInput" placeholder="输入您的问题例如ThingDef是什么" onkeypress="handleKeyPress(event)">
<button onclick="performQuery()">🔍 查询</button>
</div>
<div class="examples">
<h3>💡 查询示例:</h3>
<div class="example" onclick="setQuery('ThingDef的定义和用法')">• ThingDef的定义和用法</div>
<div class="example" onclick="setQuery('如何创建Building')">• 如何创建Building</div>
<div class="example" onclick="setQuery('Pawn类的主要方法')">• Pawn类的主要方法</div>
<div class="example" onclick="setQuery('CompPower的使用')">• CompPower的使用</div>
</div>
<div id="result" class="result" style="display: none;"></div>
<script>
async function performQuery() {
const input = document.getElementById('queryInput');
const result = document.getElementById('result');
const query = input.value.trim();
if (!query) {
alert('请输入查询问题');
return;
}
result.style.display = 'block';
result.textContent = '🔄 正在查询,请稍候...';
result.className = 'result loading';
try {
const response = await fetch('/api/query', {
method: 'POST',
headers: {'Content-Type': 'application/json'},
body: JSON.stringify({question: query})
});
const data = await response.json();
if (data.success) {
result.textContent = data.result;
result.className = 'result';
} else {
result.textContent = '❌ 查询失败: ' + data.error;
result.className = 'result';
}
} catch (error) {
result.textContent = '❌ 网络错误: ' + error.message;
result.className = 'result';
}
}
function setQuery(query) {
document.getElementById('queryInput').value = query;
}
function handleKeyPress(event) {
if (event.key === 'Enter') {
performQuery();
}
}
</script>
</body>
</html>
"""
        self._send_body(200, 'text/html; charset=utf-8', html.encode('utf-8'))

    def handle_query_get(self, parsed_url):
        """Answer ``GET /query?q=...`` with the result as plain text.

        Args:
            parsed_url: The request URL as returned by ``urlparse``.

        Sends 400 when ``q`` is missing or empty and 500 when the lookup
        fails; otherwise 200 with the UTF-8 encoded answer.
        """
        question = self._question_from_query(parsed_url)
        if not question:
            self.send_error(400, "Missing 'q' parameter")
            return
        try:
            payload = self._answer(question).encode('utf-8')
        except Exception as e:
            # The short message ends up in the HTTP status line, which is
            # encoded as latin-1 and must be a single line. Exception text
            # (possibly non-latin or multi-line) therefore goes into the
            # "explain" part, which is only rendered in the error body.
            self.send_error(500, "Query failed", str(e))
            return
        self._send_body(200, 'text/plain; charset=utf-8', payload)

    def handle_api_query_get(self, parsed_url):
        """Answer ``GET /api/query?q=...`` with a JSON reply.

        Args:
            parsed_url: The request URL as returned by ``urlparse``.
        """
        question = self._question_from_query(parsed_url)
        if not question:
            response = {"success": False, "error": "Missing 'q' parameter"}
        else:
            try:
                response = {"success": True, "result": self._answer(question)}
            except Exception as e:
                response = {"success": False, "error": str(e)}
        self._send_json(response)

    def handle_api_query_post(self):
        """Answer ``POST /api/query`` with a JSON reply.

        The body must be a JSON object with a non-empty ``question`` field.
        Any failure (unreadable body, invalid JSON, lookup error) is reported
        as ``{"success": false, "error": ...}`` rather than raised, so the
        client always receives a well-formed JSON response.
        """
        try:
            data = self._read_json_body()
            question = data.get('question', '')
            if not question:
                response = {"success": False, "error": "Missing 'question' field"}
            else:
                response = {"success": True, "result": self._answer(question)}
        except Exception as e:
            response = {"success": False, "error": str(e)}
        self._send_json(response, allow_any_origin=True)

    def log_message(self, format, *args):
        """Print request log lines to stdout, prefixed with the client address."""
        print(f"[{self.address_string()}] {format % args}")

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _question_from_query(parsed_url):
        """Return the first ``q`` query parameter, or '' when absent."""
        return parse_qs(parsed_url.query).get('q', [''])[0]

    @staticmethod
    def _answer(question):
        """Look up *question* via ``mcpserver_stdio.get_context``.

        The import is kept function-local so that a failure to import the
        module is reported per request instead of preventing the server
        from starting.
        """
        from mcpserver_stdio import get_context
        return get_context(question)

    def _read_json_body(self):
        """Read the request body and decode it as a JSON object.

        Returns:
            The decoded ``dict``.

        Raises:
            ValueError: If ``Content-Length`` is not a non-negative integer,
                the body is not valid UTF-8 JSON, or the JSON value is not
                an object.
        """
        raw_length = self.headers.get('Content-Length')
        try:
            # A missing header is treated as an empty body.
            length = int(raw_length) if raw_length is not None else 0
        except ValueError:
            raise ValueError(f"Invalid Content-Length header: {raw_length!r}") from None
        if length < 0:
            # A negative size would make read() consume the stream to EOF.
            raise ValueError(f"Invalid Content-Length header: {raw_length!r}")
        data = json.loads(self.rfile.read(length).decode('utf-8'))
        if not isinstance(data, dict):
            raise ValueError("Request body must be a JSON object")
        return data

    def _send_body(self, status, content_type, payload, extra_headers=()):
        """Send a complete response: status line, headers and *payload* bytes."""
        self.send_response(status)
        self.send_header('Content-Type', content_type)
        self.send_header('Content-Length', str(len(payload)))
        for name, value in extra_headers:
            self.send_header(name, value)
        self.end_headers()
        self.wfile.write(payload)

    def _send_json(self, response, allow_any_origin=False):
        """Serialize *response* as UTF-8 JSON and send it with status 200.

        Args:
            response: A JSON-serializable object.
            allow_any_origin: When true, add ``Access-Control-Allow-Origin: *``.
        """
        payload = json.dumps(response, ensure_ascii=False).encode('utf-8')
        extra = [('Access-Control-Allow-Origin', '*')] if allow_any_origin else []
        self._send_body(200, 'application/json; charset=utf-8', payload, extra)
def start_server(port=8080, open_browser=True):
    """Start the web server and block until it is interrupted with Ctrl+C.

    Args:
        port: TCP port to listen on.
        open_browser: When true, open the web interface in the default
            browser roughly one second after the server starts.
    """
    # NOTE(review): the empty host binds to every network interface, so the
    # server is reachable from other machines — confirm this is intended.
    server_address = ('', port)

    # Used as a context manager so that server_close() always runs and the
    # listening socket is released, including after Ctrl+C. shutdown() is
    # deliberately not called: it is meant to stop serve_forever() from a
    # different thread, and here serve_forever() has already returned.
    with HTTPServer(server_address, RimWorldAPIHandler) as httpd:
        print("🌐 RimWorld知识库Web服务器启动")
        print(f"📍 服务地址: http://localhost:{port}")
        print(f"🔍 查询API: http://localhost:{port}/api/query?q=您的问题")
        print(f"💻 Web界面: http://localhost:{port}")
        print("按 Ctrl+C 停止服务器")

        if open_browser:
            # Delay the browser launch slightly so the server is already
            # accepting connections; daemon so it never blocks interpreter exit.
            opener = threading.Timer(1, webbrowser.open, args=(f'http://localhost:{port}',))
            opener.daemon = True
            opener.start()

        try:
            httpd.serve_forever()
        except KeyboardInterrupt:
            print("\n🛑 服务器已停止")
def _main():
    """Parse the command-line options and launch the server."""
    import argparse

    cli = argparse.ArgumentParser(description='RimWorld知识库Web API服务器')
    cli.add_argument('--port', '-p', type=int, default=8080, help='服务器端口 (默认: 8080)')
    cli.add_argument('--no-browser', action='store_true', help='不自动打开浏览器')
    options = cli.parse_args()

    # The flag is negative ("--no-browser"); start_server expects the
    # positive form as its second positional argument.
    start_server(options.port, not options.no_browser)


if __name__ == "__main__":
    _main()