File size: 4,623 Bytes
ebdad37
 
222f6b9
6bf4af7
ebdad37
 
 
cb053ff
5123466
ebdad37
 
35d4a05
 
1d5163f
35d4a05
 
 
 
 
88e4347
 
 
 
ebdad37
5123466
 
 
1d5163f
5123466
 
 
 
1d5163f
 
 
 
 
 
 
5123466
 
 
 
 
 
 
8aa7030
1d5163f
 
 
 
5123466
 
 
 
 
 
6bf4af7
5123466
 
1d5163f
 
5123466
 
 
 
 
 
 
 
 
 
 
 
 
 
ebdad37
5123466
 
 
9a8a36f
 
5123466
 
ebdad37
5123466
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ebdad37
5123466
 
 
 
ebdad37
5123466
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
from agent import AgentContext, UserMessage
from python.helpers.api import ApiHandler, Request, Response
from python.helpers import files, dotenv
from initialize import initialize_agent
import os
import json
import base64
import queue
import traceback

class Stream(ApiHandler):
    """API handler that runs one agent message and streams the output as SSE.

    The request payload is a dict; the keys read here are ``message``/``text``,
    ``context``, ``subagent``/``profile``, ``file`` and ``file_name``.  The
    response body is a ``text/event-stream`` made of ``data: <json>`` frames.
    """

    @classmethod
    def requires_auth(cls) -> bool:
        """Return ``False``: this handler opts out of the auth requirement."""
        return False

    @classmethod
    def requires_csrf(cls) -> bool:
        """Return ``False``: this handler opts out of the CSRF requirement."""
        return False

    @classmethod
    def requires_api_key(cls) -> bool:
        """Return ``True``: this handler asks for API-key protection."""
        return True

    @staticmethod
    def _format_sse_event(payload) -> str:
        """Serialize *payload* to JSON and wrap it in one SSE ``data:`` frame."""
        return f"data: {json.dumps(payload)}\n\n"

    @staticmethod
    def _store_uploaded_file(file_data, file_name) -> str:
        """Write an uploaded file into ``knowledge/custom`` and return its path.

        The payload is treated as base64, optionally preceded by a
        comma-terminated prefix (e.g. a data-URL header).  When it cannot be
        decoded, the *original* value is written as text instead, so plain-text
        uploads that merely contain a comma are not truncated.

        Args:
            file_data: Upload payload taken from the request.
            file_name: Client-supplied file name; only its final path
                component is used.

        Returns:
            Absolute path of the file that was written.

        Raises:
            OSError: If the target directory or file cannot be written.
        """
        # Keep only the last path component to prevent path traversal, and
        # fall back to a default when nothing usable remains (None, "", a
        # trailing slash, "." or "..") - those would resolve to a directory.
        safe_name = os.path.basename(str(file_name)) if file_name else ""
        if safe_name in ("", ".", ".."):
            safe_name = "uploaded_file"

        knowledge_dir = files.get_abs_path("knowledge/custom")
        os.makedirs(knowledge_dir, exist_ok=True)
        save_path = os.path.join(knowledge_dir, safe_name)

        # Decode only what follows the first comma, but keep the original
        # value around for the plain-text fallback below.
        payload = file_data
        if isinstance(file_data, str) and "," in file_data:
            payload = file_data.split(",", 1)[1]

        try:
            # binascii.Error is a ValueError subclass; TypeError covers
            # payloads that are not str/bytes-like.
            decoded_data = base64.b64decode(payload)
        except (ValueError, TypeError):
            # Best effort: not base64, so store the data verbatim as text.
            with open(save_path, "w", encoding="utf-8") as f:
                f.write(str(file_data))
        else:
            with open(save_path, "wb") as f:
                f.write(decoded_data)
        return save_path

    async def process(self, input: dict, request: Request) -> Response:
        """Start the agent on the submitted message and stream its output.

        Args:
            input: Request payload (see the class docstring for the keys read).
            request: The incoming request object; not used by this handler.

        Returns:
            A ``text/event-stream`` response whose frames are the queued
            chunks followed by one ``final`` (or ``error``) event, or a
            status-500 response if setup fails before streaming starts.
        """
        try:
            text = input.get("message") or input.get("text") or ""
            ctxid = input.get("context")
            subagent = input.get("subagent") or input.get("profile")
            file_data = input.get("file")
            file_name = input.get("file_name", "uploaded_file")

            dotenv.load_dotenv()

            # Automatically use BLABLADOR_API_KEY for 'other' provider if available
            blablador_key = os.getenv("BLABLADOR_API_KEY")
            if blablador_key:
                # setdefault: never override a key that is already configured.
                os.environ.setdefault("OTHER_API_KEY", blablador_key)
                os.environ.setdefault("API_KEY_OTHER", blablador_key)

            context = self.get_context(ctxid)
            config = initialize_agent()

            # Map the display name of the provider onto the "other" identifier.
            for model in (config.chat_model, config.utility_model):
                if model.provider == "Other OpenAI compatible":
                    model.provider = "other"

            if subagent:
                config.profile = subagent
                if subagent not in config.knowledge_subdirs:
                    config.knowledge_subdirs.append(subagent)

            # Apply the fresh config to the context and to every agent in the
            # subordinate chain hanging off agent0.
            context.config = config
            curr_agent = context.agent0
            while curr_agent:
                curr_agent.config = config
                curr_agent = curr_agent.data.get(curr_agent.DATA_NAME_SUBORDINATE)

            attachment_paths = []
            if file_data:
                attachment_paths.append(
                    self._store_uploaded_file(file_data, file_name)
                )

            # NOTE(review): the producer side is not visible in this file;
            # presumably the running agent pushes chunks onto
            # context.stream_queue - confirm against the agent code.
            sync_queue = queue.Queue()
            context.stream_queue = sync_queue

            def release_queue():
                # Only remove the queue if it is still ours, so a newer
                # request on the same context does not lose its queue.
                if getattr(context, "stream_queue", None) is sync_queue:
                    delattr(context, "stream_queue")

            try:
                msg = UserMessage(text, attachment_paths)
                task = context.communicate(msg)
            except Exception:
                # The generator (and its cleanup) never runs on this path.
                release_queue()
                raise

            def generate():
                try:
                    while task.is_alive() or not sync_queue.empty():
                        try:
                            chunk = sync_queue.get(timeout=0.1)
                            yield self._format_sse_event(chunk)
                        except queue.Empty:
                            # Re-evaluate the loop condition instead of
                            # breaking here: the task may have queued a last
                            # chunk and exited right after the timeout, and
                            # that chunk must still be delivered.
                            continue
                        except Exception as e:
                            yield self._format_sse_event({'type': 'error', 'text': str(e)})
                            break

                    try:
                        result = task.result_sync(timeout=300)
                        yield self._format_sse_event({'type': 'final', 'text': result})
                    except Exception as e:
                        yield self._format_sse_event({'type': 'error', 'text': f'Result error: {str(e)}'})
                except Exception as e:
                    yield self._format_sse_event({'type': 'error', 'text': f'Generator error: {str(e)}'})
                finally:
                    release_queue()

            return Response(generate(), mimetype='text/event-stream')
        except Exception as e:
            # NOTE(review): the traceback is echoed to the client; consider
            # logging it server-side instead if callers do not rely on it.
            return Response(f"Error: {str(e)}\n{traceback.format_exc()}", status=500)