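"""ROS 2 node that receives task strings on configured input topics, forwards
them to an LLM via LLMRequestHandler, and publishes the processed JSON response."""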
import rclpy
from std_msgs.msg import String
from loguru import logger
import asyncio
from llm_request_handler import LLMRequestHandler
from ros_node_publisher import RosNodePublisher
from json_processor import JsonProcessor
from config import MODE_CONFIG, ROS_MESSAGE_MODE


class RosMessageServer(RosNodePublisher):
    def __init__(self):
        super().__init__(ROS_MESSAGE_MODE)
        self.mode_config = MODE_CONFIG[ROS_MESSAGE_MODE]
        self._subscriptions = []
        self.received_tasks = []
        self.llm_handler = LLMRequestHandler(
            model_version=self.mode_config["model_version"],
            max_tokens=self.mode_config["max_tokens"],
            temperature=self.mode_config["temperature"],
            frequency_penalty=self.mode_config["frequency_penalty"],
            list_navigation_once=True,
        )
        self.initial_messages = self.llm_handler.build_initial_messages(
            self.mode_config["prompt_file"], ROS_MESSAGE_MODE
        )
        self.json_processor = JsonProcessor()

        # Initialize one subscription per configured input topic, all sharing the same callback.
        for input_topic in self.mode_config["input_topics"]:
            subscription = self.create_subscription(
                String,
                input_topic,
                self.listener_callback,
                10,
            )
            self._subscriptions.append(subscription)
        logger.info(f"{self.mode_config['display_name']} node initialized.")

    def listener_callback(self, msg):
        task_data = msg.data
        logger.debug(f"Received raw task data: {task_data}")
        try:
            # Strip surrounding quotes from the raw message and queue the task.
            task_data_clean = task_data.strip('"')
            self.received_tasks.append({"task": task_data_clean})
            # Note: asyncio.run() blocks the ROS executor until the task queue is drained.
            asyncio.run(self.process_tasks())
        except Exception as e:
            logger.error(f"Unexpected error: {e}")

    async def process_tasks(self):
        # Drain the queue in FIFO order, sending each task to the LLM.
        while self.received_tasks:
            task = self.received_tasks.pop(0)
            logger.debug(f"Processing task: {task}")
            await self.send_to_gpt(task)

    async def send_to_gpt(self, task):
        # Interpolate the task text (not the wrapping dict) into the prompt.
        prompt = f"# Task: {task['task']}"
        messages = self.initial_messages + [{"role": "user", "content": prompt}]
        response = await self.llm_handler.make_completion(messages)
        if response:
            self.publish_response(response)
        else:
            self.publish_response("Error: Unable to get response.")

    def publish_response(self, response):
        # Convert the raw LLM response to structured JSON before publishing via the parent class.
        response_json = self.json_processor.process_response(response)
        super().publish_response(response_json)


def main(args=None):
    rclpy.init(args=args)
    ros_message_server = RosMessageServer()
    rclpy.spin(ros_message_server)
    ros_message_server.destroy_node()
    rclpy.shutdown()


if __name__ == "__main__":
    main()