Spaces:
Sleeping
Sleeping
| from typing import Any | |
| from mcp.server.fastmcp import FastMCP | |
| import http.client | |
| import json | |
| mindv = FastMCP("mindverse") | |
| url = "localhost:8002" | |
| path = "/api/kernel2/chat" | |
| messages =[] | |
| async def get_response(query:str) -> str | None | Any: | |
| """ | |
| Received a response based on local secondme model. | |
| Args: | |
| query (str): Questions raised by users regarding the secondme model | |
| """ | |
| headers = { | |
| "Content-Type": "application/json", | |
| "Accept": "text/event-stream" | |
| } | |
| messages.append({"role": "user", "content": query}) | |
| data={ | |
| "messages": messages, | |
| "stream": True | |
| } | |
| conn = http.client.HTTPConnection(url) | |
| # Send the POST request | |
| conn.request("POST", path, body=json.dumps(data), headers=headers) | |
| # Get the response | |
| response = conn.getresponse() | |
| full_content="" | |
| for line in response: | |
| if line: | |
| decoded_line = line.decode('utf-8').strip() | |
| if decoded_line == 'data: [DONE]': | |
| break | |
| if decoded_line.startswith('data: '): | |
| try: | |
| json_str = decoded_line[6:] | |
| chunk = json.loads(json_str) | |
| content = chunk['choices'][0]['delta'].get('content', '') | |
| if content: | |
| full_content+=content | |
| except json.JSONDecodeError: | |
| pass | |
| conn.close() | |
| if full_content: | |
| messages.append({"role": "system", "content": full_content}) | |
| return full_content | |
| else: | |
| return None | |
| if __name__ == "__main__": | |
| # Initialize and run the server | |
| mindv.run(transport='stdio') | |