#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
RimWorld knowledge-base Web API server.

Provides an HTTP interface that can be used from a browser or any HTTP client.
"""

import os
import sys
import json
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.parse import urlparse, parse_qs
import threading
import webbrowser

# Put the python-sdk/src directory that sits next to this file on sys.path.
MCP_DIR = os.path.dirname(os.path.abspath(__file__))
SDK_PATH = os.path.join(MCP_DIR, 'python-sdk', 'src')
if SDK_PATH not in sys.path:
    sys.path.insert(0, SDK_PATH)

# Text served at "/". It is sent with Content-Type text/html.
# NOTE(review): this text contains no HTML markup in the version under
# review -- confirm the page template was not stripped somewhere upstream.
_INDEX_PAGE = (
    " RimWorld 知识库\n"
    "\n"
    "🎮 RimWorld 知识库\n"
    "\n"
    "直接查询RimWorld游戏的C#源码和XML定义\n"
    "\n"
    "💡 查询示例:\n"
    "\n"
    "• ThingDef的定义和用法\n"
    "• 如何创建Building\n"
    "• Pawn类的主要方法\n"
    "• CompPower的使用\n"
)


class RimWorldAPIHandler(BaseHTTPRequestHandler):
    """HTTP request handler for the web page and the query endpoints.

    Routes:
        GET  /            -> the web page
        GET  /query?q=    -> plain-text answer
        GET  /api/query?q= -> JSON answer
        POST /api/query   -> JSON answer, question read from a JSON body
    """

    def do_GET(self):
        """Dispatch a GET request on the path component of the URL."""
        parsed_url = urlparse(self.path)
        route = parsed_url.path
        if route == '/':
            self.serve_web_interface()
        elif route == '/query':
            self.handle_query_get(parsed_url)
        elif route == '/api/query':
            self.handle_api_query_get(parsed_url)
        else:
            self.send_error(404, "Not Found")

    def do_POST(self):
        """Dispatch a POST request on the path component of the URL."""
        # Route on the parsed path (as do_GET does) so that a query string
        # appended to the URL does not turn a valid endpoint into a 404.
        if urlparse(self.path).path == '/api/query':
            self.handle_api_query_post()
        else:
            self.send_error(404, "Not Found")

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _query(question):
        """Return ``get_context(question)`` from ``mcpserver_stdio``.

        The import stays inside the call, as in the original handlers, so
        importing this module does not require ``mcpserver_stdio``.
        """
        from mcpserver_stdio import get_context
        return get_context(question)

    @staticmethod
    def _first_param(parsed_url, name):
        """Return the first value of query parameter *name*, or ``''``."""
        return parse_qs(parsed_url.query).get(name, [''])[0]

    def _send_body(self, status, content_type, body, cors=False):
        """Write a complete response with the given status, type and bytes."""
        self.send_response(status)
        self.send_header('Content-Type', content_type)
        self.send_header('Content-Length', str(len(body)))
        if cors:
            self.send_header('Access-Control-Allow-Origin', '*')
        self.end_headers()
        self.wfile.write(body)

    def _send_json(self, response):
        """Send *response* as a UTF-8 JSON document with status 200.

        Both API endpoints report failures inside the JSON payload
        (``"success": False``) rather than through the HTTP status.
        """
        body = json.dumps(response, ensure_ascii=False).encode('utf-8')
        self._send_body(200, 'application/json; charset=utf-8', body, cors=True)

    # ------------------------------------------------------------------
    # Endpoints
    # ------------------------------------------------------------------

    def serve_web_interface(self):
        """Serve the web page."""
        html = _INDEX_PAGE
        self._send_body(200, 'text/html; charset=utf-8', html.encode('utf-8'))

    def handle_query_get(self, parsed_url):
        """Handle ``GET /query``: answer the ``q`` parameter as plain text.

        Responds 400 when ``q`` is missing or empty and 500 when the query
        raises.
        """
        question = self._first_param(parsed_url, 'q')
        if not question:
            self.send_error(400, "Missing 'q' parameter")
            return

        try:
            body = self._query(question).encode('utf-8')
        except Exception as e:
            # The status line is encoded as latin-1, so the exception text
            # (which may contain non-latin-1 characters or newlines) must not
            # be placed in it. It goes into the UTF-8 error body instead.
            self.log_error("Query failed: %s", e)
            self.send_error(500, "Query failed", explain=str(e))
            return

        self._send_body(200, 'text/plain; charset=utf-8', body)

    def handle_api_query_get(self, parsed_url):
        """Handle ``GET /api/query``: answer the ``q`` parameter as JSON."""
        question = self._first_param(parsed_url, 'q')
        if not question:
            response = {"success": False, "error": "Missing 'q' parameter"}
        else:
            try:
                response = {"success": True, "result": self._query(question)}
            except Exception as e:
                response = {"success": False, "error": str(e)}
        self._send_json(response)

    def handle_api_query_post(self):
        """Handle ``POST /api/query``: read ``{"question": ...}`` from the body.

        A missing, non-numeric or negative Content-Length, a body that is not
        valid UTF-8 JSON, and a body that is not a JSON object are all
        reported as ``"success": False`` instead of raising out of the
        handler.
        """
        try:
            content_length = int(self.headers.get('Content-Length', ''))
        except (TypeError, ValueError):
            content_length = -1
        if content_length < 0:
            # Reading with a negative size would read until EOF.
            self._send_json({
                "success": False,
                "error": "Missing or invalid Content-Length header",
            })
            return

        post_data = self.rfile.read(content_length)
        try:
            data = json.loads(post_data.decode('utf-8'))
            if not isinstance(data, dict):
                response = {"success": False,
                            "error": "Request body must be a JSON object"}
            else:
                question = data.get('question', '')
                if not question:
                    response = {"success": False,
                                "error": "Missing 'question' field"}
                else:
                    response = {"success": True,
                                "result": self._query(question)}
        except Exception as e:
            response = {"success": False, "error": str(e)}
        self._send_json(response)

    def log_message(self, format, *args):
        """Print log lines to stdout, prefixed with the client address."""
        print(f"[{self.address_string()}] {format % args}")


def start_server(port=8080, open_browser=True):
    """Start the web server and block until interrupted with Ctrl+C.

    Args:
        port: TCP port to listen on.
        open_browser: When true, open the local URL in the default browser
            about one second after start-up.
    """
    # An empty host string makes the server listen on all interfaces.
    server_address = ('', port)
    httpd = HTTPServer(server_address, RimWorldAPIHandler)

    print("🌐 RimWorld知识库Web服务器启动")
    print(f"📍 服务地址: http://localhost:{port}")
    print(f"🔍 查询API: http://localhost:{port}/api/query?q=您的问题")
    print(f"💻 Web界面: http://localhost:{port}")
    print("按 Ctrl+C 停止服务器")

    if open_browser:
        # Open the browser after a short delay, from a daemon thread so it
        # never keeps the process alive.
        opener = threading.Timer(
            1, webbrowser.open, args=(f'http://localhost:{port}',)
        )
        opener.daemon = True
        opener.start()

    try:
        httpd.serve_forever()
    except KeyboardInterrupt:
        print("\n🛑 服务器已停止")
    finally:
        # serve_forever() has already returned at this point, so what is
        # left to do is release the listening socket.
        httpd.server_close()


def main(argv=None):
    """Parse command-line arguments and run the server."""
    import argparse

    parser = argparse.ArgumentParser(description='RimWorld知识库Web API服务器')
    parser.add_argument('--port', '-p', type=int, default=8080,
                        help='服务器端口 (默认: 8080)')
    parser.add_argument('--no-browser', action='store_true',
                        help='不自动打开浏览器')
    args = parser.parse_args(argv)

    start_server(args.port, not args.no_browser)


if __name__ == "__main__":
    main()