akseljoonas HF Staff commited on
Commit
f8d6755
·
1 Parent(s): 8d46a58

subprocess uploading

Browse files
.gitignore CHANGED
@@ -15,4 +15,5 @@ wheels/
15
  *.csv
16
  /logs
17
  hf-agent-leaderboard/
18
- .cursor/
 
 
15
  *.csv
16
  /logs
17
  hf-agent-leaderboard/
18
+ .cursor/
19
+ session_logs/
agent/config.py CHANGED
@@ -21,6 +21,7 @@ class Config(BaseModel):
21
  mcpServers: dict[str, MCPServerConfig] = {}
22
  save_sessions: bool = True
23
  session_dataset_repo: str = "smolagents/hf-agent-sessions"
 
24
 
25
 
26
  def substitute_env_vars(obj: Any) -> Any:
 
21
  mcpServers: dict[str, MCPServerConfig] = {}
22
  save_sessions: bool = True
23
  session_dataset_repo: str = "smolagents/hf-agent-sessions"
24
+ auto_save_interval: int = 3 # Save every N user turns (0 = disabled)
25
 
26
 
27
  def substitute_env_vars(obj: Any) -> Any:
agent/core/agent_loop.py CHANGED
@@ -255,6 +255,11 @@ class Handlers:
255
  data={"history_size": len(session.context_manager.items)},
256
  )
257
  )
 
 
 
 
 
258
  return final_response
259
 
260
  @staticmethod
@@ -414,15 +419,13 @@ class Handlers:
414
  @staticmethod
415
  async def shutdown(session: Session) -> bool:
416
  """Handle shutdown (like shutdown in codex.rs:1329)"""
417
- # Save session trajectory if enabled
418
  if session.config.save_sessions:
419
- print("💾 Saving session trajectory...")
420
  repo_id = session.config.session_dataset_repo
421
- url = await session.push_to_dataset(repo_id)
422
- if url:
423
- print(f"✅ Session saved to: {url}")
424
- else:
425
- print("❌ Failed to save session")
426
 
427
  session.is_running = False
428
  await session.send_event(Event(event_type="shutdown"))
@@ -484,26 +487,47 @@ async def submission_loop(
484
  session = Session(event_queue, config=config, tool_router=tool_router)
485
  print("Agent loop started")
486
 
487
- # Main processing loop
488
- async with tool_router:
489
- # Emit ready event after initialization
490
- await session.send_event(
491
- Event(event_type="ready", data={"message": "Agent initialized"})
492
  )
493
 
494
- while session.is_running:
495
- submission = await submission_queue.get()
 
 
 
 
 
496
 
497
- try:
498
- should_continue = await process_submission(session, submission)
499
- if not should_continue:
 
 
 
 
 
 
500
  break
501
- except asyncio.CancelledError:
502
- break
503
- except Exception as e:
504
- print(f" Error in agent loop: {e}")
505
- await session.send_event(
506
- Event(event_type="error", data={"error": str(e)})
507
- )
508
 
509
- print("🛑 Agent loop exited")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
255
  data={"history_size": len(session.context_manager.items)},
256
  )
257
  )
258
+
259
+ # Increment turn counter and check for auto-save
260
+ session.increment_turn()
261
+ await session.auto_save_if_needed()
262
+
263
  return final_response
264
 
265
  @staticmethod
 
419
  @staticmethod
420
  async def shutdown(session: Session) -> bool:
421
  """Handle shutdown (like shutdown in codex.rs:1329)"""
422
+ # Save session trajectory if enabled (fire-and-forget, returns immediately)
423
  if session.config.save_sessions:
424
+ print("💾 Saving session...")
425
  repo_id = session.config.session_dataset_repo
426
+ local_path = session.save_and_upload_detached(repo_id)
427
+ if local_path:
428
+ print("✅ Session saved locally, upload in progress")
 
 
429
 
430
  session.is_running = False
431
  await session.send_event(Event(event_type="shutdown"))
 
487
  session = Session(event_queue, config=config, tool_router=tool_router)
488
  print("Agent loop started")
489
 
490
+ # Retry any failed uploads from previous sessions (fire-and-forget)
491
+ if config and config.save_sessions:
492
+ Session.retry_failed_uploads_detached(
493
+ directory="session_logs", repo_id=config.session_dataset_repo
 
494
  )
495
 
496
+ try:
497
+ # Main processing loop
498
+ async with tool_router:
499
+ # Emit ready event after initialization
500
+ await session.send_event(
501
+ Event(event_type="ready", data={"message": "Agent initialized"})
502
+ )
503
 
504
+ while session.is_running:
505
+ submission = await submission_queue.get()
506
+
507
+ try:
508
+ should_continue = await process_submission(session, submission)
509
+ if not should_continue:
510
+ break
511
+ except asyncio.CancelledError:
512
+ print("\n⚠️ Agent loop cancelled")
513
  break
514
+ except Exception as e:
515
+ print(f"❌ Error in agent loop: {e}")
516
+ await session.send_event(
517
+ Event(event_type="error", data={"error": str(e)})
518
+ )
 
 
519
 
520
+ print("🛑 Agent loop exited")
521
+
522
+ finally:
523
+ # Emergency save if session saving is enabled and shutdown wasn't called properly
524
+ if session.config.save_sessions and session.is_running:
525
+ print("\n💾 Emergency save: preserving session before exit...")
526
+ try:
527
+ local_path = session.save_and_upload_detached(
528
+ session.config.session_dataset_repo
529
+ )
530
+ if local_path:
531
+ print("✅ Emergency save successful, upload in progress")
532
+ except Exception as e:
533
+ print(f"❌ Emergency save failed: {e}")
agent/core/session.py CHANGED
@@ -1,9 +1,12 @@
1
  import asyncio
2
  import json
 
 
3
  import uuid
4
  from dataclasses import dataclass
5
  from datetime import datetime
6
  from enum import Enum
 
7
  from typing import Any, Optional
8
 
9
  from litellm import get_max_tokens
@@ -60,6 +63,8 @@ class Session:
60
  # Session trajectory logging
61
  self.logged_events: list[dict] = []
62
  self.session_start_time = datetime.now().isoformat()
 
 
63
 
64
  async def send_event(self, event: Event) -> None:
65
  """Send event back to client and log to trajectory"""
@@ -79,6 +84,26 @@ class Session:
79
  if self.current_task and not self.current_task.done():
80
  self.current_task.cancel()
81
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
82
  def get_trajectory(self) -> dict:
83
  """Serialize complete session trajectory for logging"""
84
  return {
@@ -90,51 +115,121 @@ class Session:
90
  "events": self.logged_events,
91
  }
92
 
93
- async def push_to_dataset(self, repo_id: str) -> Optional[str]:
 
 
 
 
 
94
  """
95
- Push session trajectory to Hugging Face dataset
96
 
97
  Args:
98
- repo_id: HuggingFace dataset repo ID (e.g. 'username/dataset-name')
 
 
99
 
100
  Returns:
101
- URL to the uploaded file if successful, None otherwise
102
  """
103
  try:
104
- import os
105
-
106
- from datasets import Dataset
107
 
108
- # Get trajectory data
109
  trajectory = self.get_trajectory()
110
 
111
- # Convert to dataset row format
112
- row = {
113
- "session_id": trajectory["session_id"],
114
- "session_start_time": trajectory["session_start_time"],
115
- "session_end_time": trajectory["session_end_time"],
116
- "model_name": trajectory["model_name"],
117
- "messages": json.dumps(trajectory["messages"]),
118
- "events": json.dumps(trajectory["events"]),
119
- }
120
 
121
- # Try to load existing dataset and append
122
- try:
123
- from datasets import load_dataset
124
 
125
- existing_dataset = load_dataset(repo_id, split="train")
126
- new_dataset = Dataset.from_dict(
127
- {k: list(existing_dataset[k]) + [v] for k, v in row.items()}
128
- )
129
- except Exception:
130
- # Dataset doesn't exist yet, create new one
131
- new_dataset = Dataset.from_dict({k: [v] for k, v in row.items()})
 
 
 
 
 
132
 
133
- # Push to hub
134
- new_dataset.push_to_hub(repo_id, private=True, token=os.getenv("HF_TOKEN"))
 
135
 
136
- return f"https://huggingface.co/datasets/{repo_id}"
 
137
 
 
138
  except Exception as e:
139
- print(f"Failed to push session to dataset: {e}")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
140
  return None
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
  import asyncio
2
  import json
3
+ import subprocess
4
+ import sys
5
  import uuid
6
  from dataclasses import dataclass
7
  from datetime import datetime
8
  from enum import Enum
9
+ from pathlib import Path
10
  from typing import Any, Optional
11
 
12
  from litellm import get_max_tokens
 
63
  # Session trajectory logging
64
  self.logged_events: list[dict] = []
65
  self.session_start_time = datetime.now().isoformat()
66
+ self.turn_count: int = 0
67
+ self.last_auto_save_turn: int = 0
68
 
69
  async def send_event(self, event: Event) -> None:
70
  """Send event back to client and log to trajectory"""
 
84
  if self.current_task and not self.current_task.done():
85
  self.current_task.cancel()
86
 
87
+ def increment_turn(self) -> None:
88
+ """Increment turn counter (called after each user interaction)"""
89
+ self.turn_count += 1
90
+
91
+ async def auto_save_if_needed(self) -> None:
92
+ """Check if auto-save should trigger and save if so (completely non-blocking)"""
93
+ if not self.config.save_sessions:
94
+ return
95
+
96
+ interval = self.config.auto_save_interval
97
+ if interval <= 0:
98
+ return
99
+
100
+ turns_since_last_save = self.turn_count - self.last_auto_save_turn
101
+ if turns_since_last_save >= interval:
102
+ print(f"\n💾 Auto-saving session (turn {self.turn_count})...")
103
+ # Fire-and-forget save - returns immediately
104
+ self.save_and_upload_detached(self.config.session_dataset_repo)
105
+ self.last_auto_save_turn = self.turn_count
106
+
107
  def get_trajectory(self) -> dict:
108
  """Serialize complete session trajectory for logging"""
109
  return {
 
115
  "events": self.logged_events,
116
  }
117
 
118
+ def save_trajectory_local(
119
+ self,
120
+ directory: str = "session_logs",
121
+ upload_status: str = "pending",
122
+ dataset_url: Optional[str] = None,
123
+ ) -> Optional[str]:
124
  """
125
+ Save trajectory to local JSON file as backup with upload status
126
 
127
  Args:
128
+ directory: Directory to save logs (default: "session_logs")
129
+ upload_status: Status of upload attempt ("pending", "success", "failed")
130
+ dataset_url: URL of dataset if upload succeeded
131
 
132
  Returns:
133
+ Path to saved file if successful, None otherwise
134
  """
135
  try:
136
+ log_dir = Path(directory)
137
+ log_dir.mkdir(parents=True, exist_ok=True)
 
138
 
 
139
  trajectory = self.get_trajectory()
140
 
141
+ # Add upload metadata
142
+ trajectory["upload_status"] = upload_status
143
+ trajectory["upload_url"] = dataset_url
144
+ trajectory["last_save_time"] = datetime.now().isoformat()
145
+
146
+ filename = f"session_{self.session_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
147
+ filepath = log_dir / filename
 
 
148
 
149
+ with open(filepath, "w") as f:
150
+ json.dump(trajectory, f, indent=2)
 
151
 
152
+ return str(filepath)
153
+ except Exception as e:
154
+ print(f"Failed to save session locally: {e}")
155
+ return None
156
+
157
+ def update_local_save_status(
158
+ self, filepath: str, upload_status: str, dataset_url: Optional[str] = None
159
+ ) -> bool:
160
+ """Update the upload status of an existing local save file"""
161
+ try:
162
+ with open(filepath, "r") as f:
163
+ data = json.load(f)
164
 
165
+ data["upload_status"] = upload_status
166
+ data["upload_url"] = dataset_url
167
+ data["last_save_time"] = datetime.now().isoformat()
168
 
169
+ with open(filepath, "w") as f:
170
+ json.dump(data, f, indent=2)
171
 
172
+ return True
173
  except Exception as e:
174
+ print(f"Failed to update local save status: {e}")
175
+ return False
176
+
177
+ def save_and_upload_detached(self, repo_id: str) -> Optional[str]:
178
+ """
179
+ Save session locally and spawn detached subprocess for upload (fire-and-forget)
180
+
181
+ Args:
182
+ repo_id: HuggingFace dataset repo ID
183
+
184
+ Returns:
185
+ Path to local save file
186
+ """
187
+ # Save locally first (fast, synchronous)
188
+ local_path = self.save_trajectory_local(upload_status="pending")
189
+ if not local_path:
190
  return None
191
+
192
+ # Spawn detached subprocess for upload (fire-and-forget)
193
+ try:
194
+ uploader_script = Path(__file__).parent / "session_uploader.py"
195
+
196
+ # Use Popen with detached process
197
+ subprocess.Popen(
198
+ [sys.executable, str(uploader_script), "upload", local_path, repo_id],
199
+ stdin=subprocess.DEVNULL,
200
+ stdout=subprocess.DEVNULL,
201
+ stderr=subprocess.DEVNULL,
202
+ start_new_session=True, # Detach from parent
203
+ )
204
+ except Exception as e:
205
+ print(f"⚠️ Failed to spawn upload subprocess: {e}")
206
+
207
+ return local_path
208
+
209
+ @staticmethod
210
+ def retry_failed_uploads_detached(
211
+ directory: str = "session_logs", repo_id: Optional[str] = None
212
+ ) -> None:
213
+ """
214
+ Spawn detached subprocess to retry failed/pending uploads (fire-and-forget)
215
+
216
+ Args:
217
+ directory: Directory containing session logs
218
+ repo_id: Target dataset repo ID
219
+ """
220
+ if not repo_id:
221
+ return
222
+
223
+ try:
224
+ uploader_script = Path(__file__).parent / "session_uploader.py"
225
+
226
+ # Spawn detached subprocess for retry
227
+ subprocess.Popen(
228
+ [sys.executable, str(uploader_script), "retry", directory, repo_id],
229
+ stdin=subprocess.DEVNULL,
230
+ stdout=subprocess.DEVNULL,
231
+ stderr=subprocess.DEVNULL,
232
+ start_new_session=True, # Detach from parent
233
+ )
234
+ except Exception as e:
235
+ print(f"⚠️ Failed to spawn retry subprocess: {e}")
agent/core/session_uploader.py ADDED
@@ -0,0 +1,142 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ #!/usr/bin/env python3
2
+ """
3
+ Standalone script for uploading session trajectories to HuggingFace.
4
+ This runs as a separate process to avoid blocking the main agent.
5
+ """
6
+
7
+ import json
8
+ import os
9
+ import sys
10
+ from pathlib import Path
11
+
12
+
13
def upload_session_to_dataset(session_file: str, repo_id: str, max_retries: int = 3):
    """Upload a single session trajectory file to a HuggingFace dataset.

    Reads the local session JSON, appends it as one row to the dataset at
    *repo_id*, and writes the outcome back into the file's ``upload_status``
    / ``upload_url`` fields so later retry passes can skip or re-attempt it.

    Args:
        session_file: Path to a local session JSON file.
        repo_id: HuggingFace dataset repo ID (e.g. 'username/dataset-name').
        max_retries: Number of push attempts, with exponential backoff.

    Returns:
        True if the row is on the Hub (or was already uploaded), False otherwise.
    """
    # Load the session data first: the already-uploaded fast path should not
    # require the heavy `datasets` import at all.
    try:
        with open(session_file, "r") as f:
            data = json.load(f)
    except (OSError, json.JSONDecodeError) as e:
        print(f"Error reading session file: {e}", file=sys.stderr)
        return False

    # Skip files a previous run already pushed successfully.
    if data.get("upload_status") == "success":
        return True

    try:
        from datasets import Dataset, load_dataset
    except ImportError:
        print("Error: datasets library not available", file=sys.stderr)
        return False

    def _record_status(status: str, url=None) -> None:
        # Persist the outcome so retry runs know whether to try again.
        data["upload_status"] = status
        data["upload_url"] = url
        with open(session_file, "w") as f:
            json.dump(data, f, indent=2)

    try:
        # Flatten nested structures to JSON strings so the row schema stays flat.
        row = {
            "session_id": data["session_id"],
            "session_start_time": data["session_start_time"],
            "session_end_time": data["session_end_time"],
            "model_name": data["model_name"],
            "messages": json.dumps(data["messages"]),
            "events": json.dumps(data["events"]),
        }

        hf_token = os.getenv("HF_TOKEN")
        if not hf_token:
            print("Error: HF_TOKEN not set; cannot upload", file=sys.stderr)
            _record_status("failed")
            return False

        for attempt in range(max_retries):
            try:
                # NOTE(review): load-append-push is last-writer-wins; concurrent
                # uploads to the same repo can drop rows — confirm this is
                # acceptable for best-effort session logs.
                try:
                    existing_dataset = load_dataset(repo_id, split="train")
                    new_dataset = Dataset.from_dict(
                        {k: list(existing_dataset[k]) + [v] for k, v in row.items()}
                    )
                except Exception:
                    # Dataset doesn't exist yet, create new one
                    new_dataset = Dataset.from_dict({k: [v] for k, v in row.items()})

                new_dataset.push_to_hub(repo_id, private=True, token=hf_token)

                _record_status(
                    "success", f"https://huggingface.co/datasets/{repo_id}"
                )
                return True

            except Exception as e:
                if attempt < max_retries - 1:
                    time.sleep(2**attempt)  # exponential backoff: 1s, 2s, 4s, ...
                else:
                    # Final attempt failed: report it instead of failing silently.
                    print(
                        f"Upload failed after {max_retries} attempts: {e}",
                        file=sys.stderr,
                    )
                    _record_status("failed")
                    return False

    except Exception as e:
        print(f"Error uploading session: {e}", file=sys.stderr)
        return False
88
+
89
+
90
def retry_failed_uploads(directory: str, repo_id: str):
    """Retry every pending/failed session upload found in *directory*.

    Scans for ``session_*.json`` files and re-attempts the upload of any whose
    recorded ``upload_status`` is still "pending" or "failed". Best-effort:
    a corrupt or unreadable file is reported and skipped, never fatal.

    Args:
        directory: Directory containing saved session logs.
        repo_id: Target HuggingFace dataset repo ID.
    """
    log_dir = Path(directory)
    if not log_dir.exists():
        return

    # Iterate the glob directly; no need to materialize a list first.
    for filepath in log_dir.glob("session_*.json"):
        try:
            with open(filepath, "r") as f:
                data = json.load(f)

            # Only retry uploads that have not already succeeded.
            if data.get("upload_status", "unknown") in ("pending", "failed"):
                upload_session_to_dataset(str(filepath), repo_id)

        except Exception as e:
            # Log instead of swallowing silently so stuck files are visible.
            print(f"Skipping session log {filepath}: {e}", file=sys.stderr)
111
+
112
+
113
def _cli(argv) -> int:
    """Dispatch the session_uploader command line; return the exit code."""
    if len(argv) < 3:
        print("Usage: session_uploader.py <command> <args...>")
        return 1

    command = argv[1]

    if command == "upload":
        # python session_uploader.py upload <session_file> <repo_id>
        if len(argv) < 4:
            print("Usage: session_uploader.py upload <session_file> <repo_id>")
            return 1
        return 0 if upload_session_to_dataset(argv[2], argv[3]) else 1

    if command == "retry":
        # python session_uploader.py retry <directory> <repo_id>
        if len(argv) < 4:
            print("Usage: session_uploader.py retry <directory> <repo_id>")
            return 1
        retry_failed_uploads(argv[2], argv[3])
        return 0

    print(f"Unknown command: {command}")
    return 1


if __name__ == "__main__":
    sys.exit(_cli(sys.argv))
agent/main.py CHANGED
@@ -415,8 +415,7 @@ async def main():
415
  )
416
  await submission_queue.put(shutdown_submission)
417
 
418
- # Wait for tasks to complete (longer timeout to allow for session save)
419
- await asyncio.wait_for(agent_task, timeout=30.0)
420
  listener_task.cancel()
421
 
422
  print("✨ Goodbye!\n")
 
415
  )
416
  await submission_queue.put(shutdown_submission)
417
 
418
+ await asyncio.wait_for(agent_task, timeout=5.0)
 
419
  listener_task.cancel()
420
 
421
  print("✨ Goodbye!\n")