"""
Generate programming problems from function_dataset_v2.csv using Gemini API.

Filters by relevance score and controls API cost.
"""

import csv
import json
import os
import sys
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from typing import Dict, Optional, Tuple

import vertexai
from vertexai.generative_models import GenerativeModel

# Configuration
PROJECT_ID = "tangou"
MODEL_NAME = "gemini-2.5-flash-lite"
MIN_RELEVANCE_SCORE = 1
MAX_BUDGET_USD = 50.0

# Pricing for the configured model, in USD per million tokens (used for the
# cost estimates below).
INPUT_PRICE_PER_MILLION = 0.1
OUTPUT_PRICE_PER_MILLION = 0.4
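
# Cost estimate per request (worked example, based on the constants above):
#   cost_usd = input_tokens / 1e6 * INPUT_PRICE_PER_MILLION
#            + output_tokens / 1e6 * OUTPUT_PRICE_PER_MILLION
# e.g. 2,000 input tokens and 1,000 output tokens:
#   2000/1e6 * 0.1 + 1000/1e6 * 0.4 = 0.0002 + 0.0004 = $0.0006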


PROMPT_TEMPLATE = """You are an expert in scientific computing and computational chemistry/biology/physics. Please create a high-quality programming problem inspired by the following code snippet from a real scientific computing project.

The problem should focus on scientific computing concepts such as:
- Numerical algorithms and simulations
- Data analysis and visualization
- Mathematical modeling
- Scientific data processing
- Computational methods in chemistry, biology, or physics

Code snippet for inspiration:
```python
{code}
```

Present your output in two distinct sections:

[Problem Description]
Create a **completely self-contained** problem description that:
- Does NOT directly reference the code snippet above
- Provides all necessary context and background
- Clearly states what needs to be implemented
- Specifies input/output format and constraints
- Is inspired by the scientific computing concepts in the code but creates a NEW, interesting problem
- Assumes common programming knowledge but explains any domain-specific concepts

[Solution]
Provide a comprehensive, **correct** Python solution that:
- Accurately solves the problem described
- Includes clear comments explaining the approach
- Uses appropriate scientific computing libraries (numpy, scipy, etc.) when relevant
- Is complete and runnable
- Follows best practices for scientific computing

Remember: The problem should be INSPIRED by the code, not a direct copy. Create something educational and interesting for scientific computing practitioners."""


class GeminiAPIClient:
    """Client for Gemini API with cost tracking."""

    def __init__(self, project_id: str, model_name: str):
        """Initialize Gemini API client.

        Args:
            project_id: Google Cloud project ID
            model_name: Name of the Gemini model to use
        """
        vertexai.init(project=project_id)
        self.model = GenerativeModel(model_name)
        self.total_input_tokens = 0
        self.total_output_tokens = 0
        self.total_requests = 0
        self.total_cost = 0.0
        self._lock = threading.Lock()

    def generate_content(self, prompt: str) -> Tuple[str, Dict]:
        """Generate content using Gemini API and track usage.

        Args:
            prompt: The prompt to send to the API

        Returns:
            Tuple of (response_text, usage_info)
            usage_info contains: input_tokens, output_tokens, cost
        """
        try:
            response = self.model.generate_content(prompt)
            usage_metadata = response.usage_metadata

            input_tokens = usage_metadata.prompt_token_count
            output_tokens = usage_metadata.candidates_token_count

            # Estimate the cost of this request from the token counts.
            input_cost = (input_tokens / 1_000_000) * INPUT_PRICE_PER_MILLION
            output_cost = (output_tokens / 1_000_000) * OUTPUT_PRICE_PER_MILLION
            request_cost = input_cost + output_cost

            # Update shared counters under the lock (workers call this concurrently).
            with self._lock:
                self.total_input_tokens += input_tokens
                self.total_output_tokens += output_tokens
                self.total_requests += 1
                self.total_cost += request_cost

            usage_info = {
                'input_tokens': input_tokens,
                'output_tokens': output_tokens,
                'total_tokens': input_tokens + output_tokens,
                'input_cost': input_cost,
                'output_cost': output_cost,
                'request_cost': request_cost
            }

            return response.text, usage_info

        except Exception as e:
            print(f"Error generating content: {e}")
            raise

    def get_total_usage(self) -> Dict:
        """Get total usage statistics.

        Returns:
            Dictionary with total usage information
        """
        return {
            'total_requests': self.total_requests,
            'total_input_tokens': self.total_input_tokens,
            'total_output_tokens': self.total_output_tokens,
            'total_tokens': self.total_input_tokens + self.total_output_tokens,
            'total_cost': self.total_cost
        }

    def print_usage_summary(self):
        """Print a summary of API usage and costs."""
        usage = self.get_total_usage()
        print("\n" + "="*70)
        print("API USAGE SUMMARY")
        print("="*70)
        print(f"Total Requests: {usage['total_requests']}")
        print(f"Total Input Tokens: {usage['total_input_tokens']:,}")
        print(f"Total Output Tokens: {usage['total_output_tokens']:,}")
        print(f"Total Tokens: {usage['total_tokens']:,}")
        print(f"\nTotal Cost: ${usage['total_cost']:.6f}")
        print(f"Budget Remaining: ${MAX_BUDGET_USD - usage['total_cost']:.6f}")
        print("="*70)


def process_function_dataset(
    input_file: str,
    output_file: str,
    min_score: int = MIN_RELEVANCE_SCORE,
    max_budget: float = MAX_BUDGET_USD,
    max_samples: Optional[int] = None,
    start_from: int = 0,
    max_workers: int = 5
):
    """Process function dataset and generate programming problems.

    Args:
        input_file: Path to function_dataset_v2.csv
        output_file: Path to output JSONL file
        min_score: Minimum relevance score to process
        max_budget: Maximum budget in USD
        max_samples: Maximum number of samples to process (None for all)
        start_from: Skip first N rows (for resuming)
        max_workers: Maximum number of concurrent workers (default: 5)
    """
    print("Starting programming problem generation...")
    print(f"Input: {input_file}")
    print(f"Output: {output_file}")
    print(f"Min Relevance Score: {min_score}")
    print(f"Max Budget: ${max_budget:.2f}")
    print(f"Max Workers: {max_workers}")
    if max_samples:
        print(f"Max Samples: {max_samples}")
    print(f"Starting from row: {start_from}")
    print()

    # Resume support: collect row numbers already written to the output file so
    # they can be skipped.
    processed_rows = set()
    if os.path.exists(output_file):
        print("Checking existing output file for already processed rows...")
        try:
            with open(output_file, 'r', encoding='utf-8') as f:
                for line in f:
                    try:
                        data = json.loads(line.strip())
                        if 'row_number' in data:
                            processed_rows.add(data['row_number'])
                    except json.JSONDecodeError:
                        continue
            print(f"Found {len(processed_rows)} already processed rows. These will be skipped.")
        except Exception as e:
            print(f"Warning: Could not read existing output file: {e}")
    else:
        print("No existing output file found. Will create new file.")
    print()

    client = GeminiAPIClient(PROJECT_ID, MODEL_NAME)

    # Statistics
    total_rows = 0
    processed = 0
    skipped_low_score = 0
    skipped_no_code = 0
    skipped_already_processed = 0
    errors = 0

    # First pass: read the CSV and collect the tasks to run.
    tasks = []

    with open(input_file, 'r', encoding='utf-8') as infile:
        reader = csv.DictReader(infile)

        for row in reader:
            total_rows += 1

            # Skip rows before the resume point.
            if total_rows <= start_from:
                continue

            # Skip rows already present in the output file.
            if total_rows in processed_rows:
                skipped_already_processed += 1
                continue

            # Respect the sample limit.
            if max_samples and len(tasks) >= max_samples:
                break

            # Filter by relevance score.
            try:
                relevance_score = int(row.get('relevance_score', 0))
            except (ValueError, TypeError):
                relevance_score = 0

            if relevance_score < min_score:
                skipped_low_score += 1
                continue

            # Skip rows with missing or very short code.
            function_content = row.get('function_content', '').strip()
            if not function_content or len(function_content) < 50:
                skipped_no_code += 1
                continue

            metadata = {
                'original_index': row.get('original_index'),
                'function_name': row.get('function_name'),
                'repo_name': row.get('repo_name'),
                'path': row.get('path'),
                'language': row.get('language'),
                'relevance_score': relevance_score,
                'function_start_line': row.get('function_start_line'),
                'function_end_line': row.get('function_end_line'),
            }

            prompt = PROMPT_TEMPLATE.format(code=function_content)

            tasks.append({
                'row_number': total_rows,
                'metadata': metadata,
                'prompt': prompt,
                'function_content': function_content
            })
print(f"Total rows read: {total_rows}") |
|
|
print(f"Tasks to process: {len(tasks)}") |
|
|
print(f"Skipped (low score): {skipped_low_score}") |
|
|
print(f"Skipped (no/short code): {skipped_no_code}") |
|
|
print(f"\nStarting concurrent processing with {max_workers} workers...\n") |
|
|
|
|
|
|
|
|

    def process_task(task):
        """Process a single task."""
        try:
            row_number = task['row_number']
            metadata = task['metadata']
            prompt = task['prompt']

            print(f"Processing row {row_number} (score={metadata['relevance_score']}, func={metadata['function_name']})...", end=' ')

            response_text, usage_info = client.generate_content(prompt)

            print(f"✓ (${usage_info['request_cost']:.6f}, {usage_info['total_tokens']} tokens)")

            return {
                'success': True,
                'data': {
                    'metadata': metadata,
                    'prompt': prompt,
                    'response': response_text,
                    'usage': usage_info,
                    'timestamp': datetime.now().isoformat(),
                    'row_number': row_number
                }
            }

        except Exception as e:
            print(f"✗ Error: {e}")
            return {
                'success': False,
                'error': str(e),
                'row_number': task['row_number']
            }

    # Append when resuming (either via --start-from or because already processed
    # rows were found in the output file); otherwise start a fresh file.
    mode = 'a' if (start_from > 0 or processed_rows) else 'w'

    with open(output_file, mode, encoding='utf-8') as outfile:
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            # Submit all tasks up front.
            future_to_task = {executor.submit(process_task, task): task for task in tasks}

            for future in as_completed(future_to_task):
                # Stop once the budget is exhausted; only tasks that have not yet
                # started can actually be cancelled.
                if client.total_cost >= max_budget:
                    print(f"\n⚠️ Budget limit reached (${client.total_cost:.6f} >= ${max_budget:.2f})")
                    print("Cancelling remaining tasks...")
                    for f in future_to_task:
                        f.cancel()
                    break

                result = future.result()

                if result['success']:
                    # Write each result as one JSON line and flush immediately so
                    # progress survives interruptions.
                    outfile.write(json.dumps(result['data'], ensure_ascii=False) + '\n')
                    outfile.flush()

                    processed += 1

                    if processed % 10 == 0:
                        print(f"\n--- Progress: {processed} problems generated, ${client.total_cost:.6f} spent ---\n")
                else:
                    errors += 1
print("\n" + "="*70) |
|
|
print("PROCESSING COMPLETE") |
|
|
print("="*70) |
|
|
print(f"Total rows read: {total_rows}") |
|
|
print(f"Successfully processed: {processed}") |
|
|
print(f"Skipped (low score): {skipped_low_score}") |
|
|
print(f"Skipped (no/short code): {skipped_no_code}") |
|
|
print(f"Errors: {errors}") |
|
|
|
|
|
client.print_usage_summary() |
|
|
|
|
|
print(f"\nResults saved to: {output_file}") |
|
|
|
|
|
return processed |
|
|
|
|
|
|
|
|
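

# Programmatic use (sketch, not part of the CLI below): the generator can also be
# called directly with illustrative arguments, e.g.
#   process_function_dataset("function_dataset_v2.csv", "programming_problems.jsonl",
#                            min_score=2, max_budget=10.0, max_samples=100)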
if __name__ == "__main__": |
|
|
import argparse |
|
|
|
|
|
parser = argparse.ArgumentParser( |
|
|
description='Generate programming problems from function dataset using Gemini API' |
|
|
) |
|
|
parser.add_argument( |
|
|
'--input', |
|
|
default='function_dataset_v2.csv', |
|
|
help='Input CSV file (default: function_dataset_v2.csv)' |
|
|
) |
|
|
parser.add_argument( |
|
|
'--output', |
|
|
default='programming_problems.jsonl', |
|
|
help='Output JSONL file (default: programming_problems.jsonl)' |
|
|
) |
|
|
parser.add_argument( |
|
|
'--min-score', |
|
|
type=int, |
|
|
default=MIN_RELEVANCE_SCORE, |
|
|
help=f'Minimum relevance score (default: {MIN_RELEVANCE_SCORE})' |
|
|
) |
|
|
parser.add_argument( |
|
|
'--max-budget', |
|
|
type=float, |
|
|
default=MAX_BUDGET_USD, |
|
|
help=f'Maximum budget in USD (default: {MAX_BUDGET_USD})' |
|
|
) |
|
|
parser.add_argument( |
|
|
'--max-samples', |
|
|
type=int, |
|
|
default=None, |
|
|
help='Maximum number of samples to process (default: no limit)' |
|
|
) |
|
|
parser.add_argument( |
|
|
'--start-from', |
|
|
type=int, |
|
|
default=0, |
|
|
help='Start from row N (for resuming, default: 0)' |
|
|
) |
|
|
parser.add_argument( |
|
|
'--max-workers', |
|
|
type=int, |
|
|
default=10, |
|
|
help='Maximum number of concurrent workers (default: 10)' |
|
|
) |
|
|
|
|
|
args = parser.parse_args() |
|
|
|
|
|
|
|
|
if not os.path.exists(args.input): |
|
|
print(f"Error: Input file not found: {args.input}") |
|
|
sys.exit(1) |
|
|
|
|
|
try: |
|
|
process_function_dataset( |
|
|
input_file=args.input, |
|
|
output_file=args.output, |
|
|
min_score=args.min_score, |
|
|
max_budget=args.max_budget, |
|
|
max_samples=args.max_samples, |
|
|
start_from=args.start_from, |
|
|
max_workers=args.max_workers |
|
|
) |
|
|
print("\n✅ Success!") |
|
|
except KeyboardInterrupt: |
|
|
print("\n\n⚠️ Interrupted by user. Progress has been saved to output file.") |
|
|
print(" You can resume by using --start-from <row_number>") |
|
|
sys.exit(0) |
|
|
except Exception as e: |
|
|
print(f"\n❌ Error: {e}") |
|
|
import traceback |
|
|
traceback.print_exc() |
|
|
sys.exit(1) |
|
|
|