# LaunchLLM / data_aggregation/collectors.py
# Bmccloud22's picture
# Deploy LaunchLLM - Production AI Training Platform
# ec8f374 verified
"""
Data Collectors Module
Provides various data collection methods for training data.
"""
import json
from pathlib import Path
from typing import List, Dict, Any, Optional
from abc import ABC, abstractmethod
class DataCollector(ABC):
    """Base class for data collectors.

    Subclasses must implement :meth:`collect`; because that method is
    abstract, this class cannot be instantiated directly.
    """

    @abstractmethod
    def collect(self) -> List[Dict[str, Any]]:
        """
        Collect data from source.

        Returns:
            List of data examples
        """
class TextDataCollector(DataCollector):
    """Collect data from a text file, one example per non-blank line."""

    def __init__(self, file_path: str):
        """
        Initialize text data collector.

        Args:
            file_path: Path to text file
        """
        self.file_path = Path(file_path)

    def collect(self) -> List[Dict[str, Any]]:
        """
        Collect data from text file.

        Each line is stripped of surrounding whitespace; lines that are
        empty after stripping are skipped.

        Returns:
            List of dicts with the keys:
                "id": zero-based index of the line within the file
                    (blank lines are counted, so ids may have gaps),
                "text": the stripped line,
                "source": the file path as a string.

        Raises:
            FileNotFoundError: If the file does not exist.
        """
        if not self.file_path.exists():
            raise FileNotFoundError(f"File not found: {self.file_path}")

        source = str(self.file_path)
        data = []
        # 'utf-8-sig' decodes plain UTF-8 unchanged but drops a leading
        # byte-order mark, which would otherwise end up inside the first
        # example's text (str.strip() does not remove U+FEFF).
        with open(self.file_path, 'r', encoding='utf-8-sig') as f:
            # Iterate the file object directly so the whole file is never
            # held in memory as a list of raw lines.
            for i, line in enumerate(f):
                line = line.strip()
                if line:
                    data.append({
                        "id": i,
                        "text": line,
                        "source": source,
                    })
        return data
class JSONDataCollector(DataCollector):
    """Collect data from JSON files."""

    def __init__(self, file_path: str):
        """
        Initialize JSON data collector.

        Args:
            file_path: Path to JSON file
        """
        self.file_path = Path(file_path)

    def collect(self) -> List[Dict[str, Any]]:
        """
        Collect data from JSON file.

        Expects format:
            [
                {"instruction": "...", "input": "...", "output": "..."},
                ...
            ]

        A top-level object is also accepted: if it holds a list under the
        key "data" or "examples" (checked in that order) that list is
        returned; otherwise the object itself is treated as a single
        example and wrapped in a list.

        Returns:
            List of data examples

        Raises:
            FileNotFoundError: If the file does not exist.
            ValueError: If the file is not valid JSON
                (json.JSONDecodeError is a ValueError subclass) or its
                top-level value is neither a list nor an object.
        """
        if not self.file_path.exists():
            raise FileNotFoundError(f"File not found: {self.file_path}")

        # 'utf-8-sig' tolerates a leading byte-order mark, which json.load
        # rejects when the file is decoded as plain UTF-8.
        with open(self.file_path, 'r', encoding='utf-8-sig') as f:
            data = json.load(f)

        if isinstance(data, list):
            return data

        if isinstance(data, dict):
            # Only unwrap a container key when it really holds a list, so
            # the return value is always a list as the signature promises.
            for key in ("data", "examples"):
                nested = data.get(key)
                if isinstance(nested, list):
                    return nested
            # Wrap single example in list
            return [data]

        raise ValueError(
            f"Unsupported JSON structure in {self.file_path}: expected a "
            f"list or an object, got {type(data).__name__}"
        )
class APIDataCollector(DataCollector):
    """Collect data from API endpoints."""

    def __init__(
        self,
        api_url: str,
        headers: Optional[Dict] = None,
        timeout: Optional[float] = 30.0,
    ):
        """
        Initialize API data collector.

        Args:
            api_url: API endpoint URL
            headers: Optional HTTP headers
            timeout: Seconds to wait for the server before giving up.
                Pass None to wait indefinitely.
        """
        self.api_url = api_url
        self.headers = headers or {}
        self.timeout = timeout

    def collect(self) -> List[Dict[str, Any]]:
        """
        Collect data from API.

        Issues a GET request to ``api_url`` and decodes the response body
        as JSON. A JSON array is returned as-is. For a JSON object, the
        first of the keys "data", "results", "items", "examples" that
        holds a list is returned; if none does, the object is wrapped in
        a single-element list. Any other JSON value yields an empty list.

        Returns:
            List of data examples

        Raises:
            requests.HTTPError: If the response status signals an error
                (raised by ``response.raise_for_status()``).
        """
        # Imported here so that `requests` is only required when an API
        # collection is actually performed.
        import requests

        # Without a timeout, requests may block forever on a server that
        # never responds.
        response = requests.get(
            self.api_url,
            headers=self.headers,
            timeout=self.timeout,
        )
        response.raise_for_status()
        data = response.json()

        # Handle different response formats
        if isinstance(data, list):
            return data

        if isinstance(data, dict):
            # Try common envelope keys, but only unwrap real lists so the
            # return value is always a list.
            for key in ("data", "results", "items", "examples"):
                nested = data.get(key)
                if isinstance(nested, list):
                    return nested
            # Otherwise wrap in list
            return [data]

        return []
class CSVDataCollector(DataCollector):
    """Collect data from CSV files."""

    def __init__(self, file_path: str):
        """
        Initialize CSV data collector.

        Args:
            file_path: Path to CSV file
        """
        self.file_path = Path(file_path)

    def collect(self) -> List[Dict[str, Any]]:
        """
        Collect data from CSV file.

        The first row is used as the header; every following row becomes
        a dict keyed by the header names.

        Returns:
            List of data examples (dict per row)

        Raises:
            FileNotFoundError: If the file does not exist.
        """
        import csv

        if not self.file_path.exists():
            raise FileNotFoundError(f"File not found: {self.file_path}")

        # newline='' is required by the csv module so that newlines
        # embedded in quoted fields are passed through untranslated.
        # 'utf-8-sig' drops a leading byte-order mark that would otherwise
        # become part of the first column name.
        with open(
            self.file_path, 'r', encoding='utf-8-sig', newline=''
        ) as f:
            return [dict(row) for row in csv.DictReader(f)]