akseljoonas HF Staff committed on
Commit
bc8323d
·
1 Parent(s): f8d6755

better file uploading

Browse files
Files changed (1) hide show
  1. agent/core/session_uploader.py +96 -44
agent/core/session_uploader.py CHANGED
@@ -2,20 +2,34 @@
2
  """
3
  Standalone script for uploading session trajectories to HuggingFace.
4
  This runs as a separate process to avoid blocking the main agent.
 
5
  """
6
 
7
  import json
8
  import os
9
  import sys
 
10
  from pathlib import Path
11
 
12
 
13
- def upload_session_to_dataset(session_file: str, repo_id: str, max_retries: int = 3):
14
- """Upload a single session file to HuggingFace dataset"""
 
 
 
 
 
 
 
 
 
 
 
 
15
  try:
16
- from datasets import Dataset, load_dataset
17
  except ImportError:
18
- print("Error: datasets library not available", file=sys.stderr)
19
  return False
20
 
21
  try:
@@ -28,8 +42,17 @@ def upload_session_to_dataset(session_file: str, repo_id: str, max_retries: int
28
  if upload_status == "success":
29
  return True
30
 
31
- # Prepare row for upload
32
- row = {
 
 
 
 
 
 
 
 
 
33
  "session_id": data["session_id"],
34
  "session_start_time": data["session_start_time"],
35
  "session_end_time": data["session_end_time"],
@@ -38,49 +61,78 @@ def upload_session_to_dataset(session_file: str, repo_id: str, max_retries: int
38
  "events": json.dumps(data["events"]),
39
  }
40
 
41
- hf_token = os.getenv("HF_TOKEN")
42
- if not hf_token:
43
- # Update status to failed
44
- data["upload_status"] = "failed"
45
- with open(session_file, "w") as f:
46
- json.dump(data, f, indent=2)
47
- return False
48
 
49
- # Try to load existing dataset and append
50
- for attempt in range(max_retries):
51
- try:
 
 
 
 
 
 
 
 
 
 
 
 
 
 
52
  try:
53
- existing_dataset = load_dataset(repo_id, split="train")
54
- new_dataset = Dataset.from_dict(
55
- {k: list(existing_dataset[k]) + [v] for k, v in row.items()}
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
56
  )
57
- except Exception:
58
- # Dataset doesn't exist yet, create new one
59
- new_dataset = Dataset.from_dict({k: [v] for k, v in row.items()})
60
 
61
- # Push to hub
62
- new_dataset.push_to_hub(repo_id, private=True, token=hf_token)
63
-
64
- # Update status to success
65
- data["upload_status"] = "success"
66
- data["upload_url"] = f"https://huggingface.co/datasets/{repo_id}"
67
- with open(session_file, "w") as f:
68
- json.dump(data, f, indent=2)
69
 
70
- return True
71
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
72
  except Exception:
73
- if attempt < max_retries - 1:
74
- import time
75
-
76
- wait_time = 2**attempt
77
- time.sleep(wait_time)
78
- else:
79
- # Final attempt failed
80
- data["upload_status"] = "failed"
81
- with open(session_file, "w") as f:
82
- json.dump(data, f, indent=2)
83
- return False
84
 
85
  except Exception as e:
86
  print(f"Error uploading session: {e}", file=sys.stderr)
@@ -104,7 +156,7 @@ def retry_failed_uploads(directory: str, repo_id: str):
104
 
105
  # Only retry pending or failed uploads
106
  if upload_status in ["pending", "failed"]:
107
- upload_session_to_dataset(str(filepath), repo_id)
108
 
109
  except Exception:
110
  pass
@@ -124,7 +176,7 @@ if __name__ == "__main__":
124
  sys.exit(1)
125
  session_file = sys.argv[2]
126
  repo_id = sys.argv[3]
127
- success = upload_session_to_dataset(session_file, repo_id)
128
  sys.exit(0 if success else 1)
129
 
130
  elif command == "retry":
 
2
  """
3
  Standalone script for uploading session trajectories to HuggingFace.
4
  This runs as a separate process to avoid blocking the main agent.
5
+ Uses individual file uploads to avoid race conditions.
6
  """
7
 
8
  import json
9
  import os
10
  import sys
11
+ from datetime import datetime
12
  from pathlib import Path
13
 
14
 
15
def upload_session_as_file(
    session_file: str, repo_id: str, max_retries: int = 3
) -> bool:
    """
    Upload a single session as an individual JSONL file (no race conditions).

    Each session becomes its own file under sessions/YYYY-MM-DD/<session_id>.jsonl
    in the dataset repo, so concurrent uploaders never rewrite a shared dataset.

    Args:
        session_file: Path to local session JSON file
        repo_id: HuggingFace dataset repo ID
        max_retries: Number of upload attempts (exponential backoff between them)

    Returns:
        True if successful (or already uploaded), False otherwise
    """
    try:
        from huggingface_hub import HfApi
    except ImportError:
        print("Error: huggingface_hub library not available", file=sys.stderr)
        return False

    try:
        # NOTE(review): the diff elides the original load of `data`; this is
        # reconstructed from how it is used below — confirm against the file.
        with open(session_file) as f:
            data = json.load(f)

        if data.get("upload_status") == "success":
            # Already uploaded; nothing to do.
            return True

        def _persist_status(status, url=None):
            # Record the outcome in the local session file so a later
            # retry pass can skip successes and pick up failures.
            data["upload_status"] = status
            if url is not None:
                data["upload_url"] = url
            with open(session_file, "w") as f:
                json.dump(data, f, indent=2)

        hf_token = os.getenv("HF_TOKEN")
        if not hf_token:
            _persist_status("failed")
            return False

        # Prepare the row. Store events as a JSON string to avoid schema
        # conflicts between sessions with differing event structures.
        session_row = {
            "session_id": data["session_id"],
            "session_start_time": data["session_start_time"],
            "session_end_time": data["session_end_time"],
            # TODO(review): the diff hides one or more fields between
            # "session_end_time" and "events" — restore them from the full file.
            "events": json.dumps(data["events"]),
        }

        # Write the row as single-line JSON (JSONL) to a temp file for upload.
        import tempfile

        with tempfile.NamedTemporaryFile(
            mode="w", suffix=".jsonl", delete=False
        ) as tmp:
            json.dump(session_row, tmp)  # Single line JSON
            tmp_path = tmp.name

        try:
            # Unique path in repo: sessions/YYYY-MM-DD/<session_id>.jsonl
            session_id = data["session_id"]
            date_str = datetime.fromisoformat(
                data["session_start_time"]
            ).strftime("%Y-%m-%d")
            repo_path = f"sessions/{date_str}/{session_id}.jsonl"

            api = HfApi()

            # Ensure the (private) dataset repo exists exactly once, before
            # the retry loop; exist_ok=True makes this idempotent. Log rather
            # than silently swallow — upload_file will surface a real problem.
            try:
                api.create_repo(
                    repo_id=repo_id,
                    repo_type="dataset",
                    private=True,
                    token=hf_token,
                    exist_ok=True,  # Don't fail if already exists
                )
            except Exception as e:
                print(f"create_repo warning: {e}", file=sys.stderr)

            import time

            last_error = None
            for attempt in range(max_retries):
                try:
                    # Upload the session file
                    api.upload_file(
                        path_or_fileobj=tmp_path,
                        path_in_repo=repo_path,
                        repo_id=repo_id,
                        repo_type="dataset",
                        token=hf_token,
                        commit_message=f"Add session {session_id}",
                    )
                    _persist_status(
                        "success",
                        f"https://huggingface.co/datasets/{repo_id}",
                    )
                    return True
                except Exception as e:
                    last_error = e
                    if attempt < max_retries - 1:
                        time.sleep(2**attempt)  # exponential backoff

            # All attempts failed — say why instead of failing silently.
            print(
                f"Upload failed after {max_retries} attempts: {last_error}",
                file=sys.stderr,
            )
            _persist_status("failed")
            return False

        finally:
            # Clean up temp file even when the upload failed.
            try:
                os.unlink(tmp_path)
            except OSError:
                pass

    except Exception as e:
        print(f"Error uploading session: {e}", file=sys.stderr)
        return False
 
156
 
157
  # Only retry pending or failed uploads
158
  if upload_status in ["pending", "failed"]:
159
+ upload_session_as_file(str(filepath), repo_id)
160
 
161
  except Exception:
162
  pass
 
176
  sys.exit(1)
177
  session_file = sys.argv[2]
178
  repo_id = sys.argv[3]
179
+ success = upload_session_as_file(session_file, repo_id)
180
  sys.exit(0 if success else 1)
181
 
182
  elif command == "retry":