#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
RimWorld知识库Web API服务器
提供HTTP接口,可以通过浏览器或HTTP客户端访问
"""
import os
import sys
import json
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.parse import urlparse, parse_qs
import threading
import webbrowser
# 添加MCP路径
MCP_DIR = os.path.dirname(os.path.abspath(__file__))
SDK_PATH = os.path.join(MCP_DIR, 'python-sdk', 'src')
if SDK_PATH not in sys.path:
sys.path.insert(0, SDK_PATH)
class RimWorldAPIHandler(BaseHTTPRequestHandler):
"""HTTP请求处理器"""
def do_GET(self):
"""处理GET请求"""
parsed_url = urlparse(self.path)
if parsed_url.path == '/':
self.serve_web_interface()
elif parsed_url.path == '/query':
self.handle_query_get(parsed_url)
elif parsed_url.path == '/api/query':
self.handle_api_query_get(parsed_url)
else:
self.send_error(404, "Not Found")
def do_POST(self):
"""处理POST请求"""
if self.path == '/api/query':
self.handle_api_query_post()
else:
self.send_error(404, "Not Found")
def serve_web_interface(self):
"""提供Web界面"""
html = """
RimWorld 知识库
💡 查询示例:
• ThingDef的定义和用法
• 如何创建Building
• Pawn类的主要方法
• CompPower的使用
"""
self.send_response(200)
self.send_header('Content-Type', 'text/html; charset=utf-8')
self.end_headers()
self.wfile.write(html.encode('utf-8'))
def handle_query_get(self, parsed_url):
"""处理GET查询请求"""
params = parse_qs(parsed_url.query)
question = params.get('q', [''])[0]
if not question:
self.send_error(400, "Missing 'q' parameter")
return
try:
from mcpserver_stdio import get_context
result = get_context(question)
self.send_response(200)
self.send_header('Content-Type', 'text/plain; charset=utf-8')
self.end_headers()
self.wfile.write(result.encode('utf-8'))
except Exception as e:
self.send_error(500, f"Query failed: {e}")
def handle_api_query_get(self, parsed_url):
"""处理API GET查询"""
params = parse_qs(parsed_url.query)
question = params.get('q', [''])[0]
if not question:
response = {"success": False, "error": "Missing 'q' parameter"}
else:
try:
from mcpserver_stdio import get_context
result = get_context(question)
response = {"success": True, "result": result}
except Exception as e:
response = {"success": False, "error": str(e)}
self.send_response(200)
self.send_header('Content-Type', 'application/json; charset=utf-8')
self.end_headers()
self.wfile.write(json.dumps(response, ensure_ascii=False).encode('utf-8'))
def handle_api_query_post(self):
"""处理API POST查询"""
content_length = int(self.headers['Content-Length'])
post_data = self.rfile.read(content_length)
try:
data = json.loads(post_data.decode('utf-8'))
question = data.get('question', '')
if not question:
response = {"success": False, "error": "Missing 'question' field"}
else:
from mcpserver_stdio import get_context
result = get_context(question)
response = {"success": True, "result": result}
except Exception as e:
response = {"success": False, "error": str(e)}
self.send_response(200)
self.send_header('Content-Type', 'application/json; charset=utf-8')
self.send_header('Access-Control-Allow-Origin', '*')
self.end_headers()
self.wfile.write(json.dumps(response, ensure_ascii=False).encode('utf-8'))
def log_message(self, format, *args):
"""自定义日志输出"""
print(f"[{self.address_string()}] {format % args}")
def start_server(port=8080, open_browser=True):
"""启动Web服务器"""
server_address = ('', port)
httpd = HTTPServer(server_address, RimWorldAPIHandler)
print(f"🌐 RimWorld知识库Web服务器启动")
print(f"📍 服务地址: http://localhost:{port}")
print(f"🔍 查询API: http://localhost:{port}/api/query?q=您的问题")
print(f"💻 Web界面: http://localhost:{port}")
print("按 Ctrl+C 停止服务器")
if open_browser:
# 延迟打开浏览器
def open_browser_delayed():
import time
time.sleep(1)
webbrowser.open(f'http://localhost:{port}')
thread = threading.Thread(target=open_browser_delayed)
thread.daemon = True
thread.start()
try:
httpd.serve_forever()
except KeyboardInterrupt:
print("\n🛑 服务器已停止")
httpd.shutdown()
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description='RimWorld知识库Web API服务器')
parser.add_argument('--port', '-p', type=int, default=8080, help='服务器端口 (默认: 8080)')
parser.add_argument('--no-browser', action='store_true', help='不自动打开浏览器')
args = parser.parse_args()
start_server(args.port, not args.no_browser)