#!/usr/bin/env python3
"""
Extract individual functions from enhanced_dataset.csv and create a new dataset.
Each function becomes a separate row in the new dataset.

Version 2: Better handling of malformed CSV/JSON
"""

import ast
import csv
import json
import re
import sys
import traceback
from collections import defaultdict

DEFAULT_INPUT_FILE = "enhanced_dataset.csv"
DEFAULT_OUTPUT_FILE = "function_dataset_v2.csv"

# Columns copied unchanged from each input row to every function row.
PASSTHROUGH_COLUMNS = [
    'repo_name',
    'path',
    'language',
    'license',
    'keyword',
    'text_hash',
    'config',
    'split',
    'repo_path',
    'ds_source',
]

# Column order of the output CSV.
OUTPUT_FIELDNAMES = (
    [
        'original_index',  # Original row number
        'function_index',  # Index within the file
    ]
    + PASSTHROUGH_COLUMNS
    + [
        'function_name',
        'function_start_line',
        'function_end_line',
        'doc_start_line',
        'doc_end_line',
        'relevance_score',
        'relevance_reason',
        'function_content',
    ]
)

# Known key names and the (whitespace-tolerant) patterns that repair them.
# Compiled once because clean_json_string() runs for every input row.
_KEY_REPAIRS = tuple(
    (re.compile(pattern), replacement)
    for pattern, replacement in (
        (r'"\s*function_nam\s*e\s*"', '"function_name"'),
        (r'"\s*function_start_line\s*"', '"function_start_line"'),
        (r'"\s*function_end_line\s*"', '"function_end_line"'),
        (r'"\s*relevance_score\s*"', '"relevance_score"'),
        (r'"\s*relevance_reason\s*"', '"relevance_reason"'),
        (r'"\s*doc_start_line\s*"', '"doc_start_line"'),
        (r'"\s*doc_end_line\s*"', '"doc_end_line"'),
    )
)

# Generic repair: a lowercase key split into up to three whitespace-separated
# pieces is glued back together.
_SPLIT_KEY_RE = re.compile(r'"\s*([a-z_]+)\s*([a-z_]+)\s*([a-z_]*)\s*":')


def clean_json_string(json_str):
    """
    Clean up malformed JSON strings that may have been corrupted by CSV formatting.

    Three heuristic passes are applied, in this order:
      1. known key names get stray whitespace removed,
      2. every '**' (markdown bold marker) is deleted,
      3. any lowercase key followed by ':' has inner/outer whitespace removed.

    Args:
        json_str: The raw function_info string.

    Returns:
        The repaired string. It is not guaranteed to be valid JSON.
    """
    for pattern, replacement in _KEY_REPAIRS:
        json_str = pattern.sub(replacement, json_str)

    # Remove markdown bold markers that might have been inserted.
    # NOTE: this is risky but deliberate - it also strips '**' inside values.
    json_str = json_str.replace('**', '')

    # Group 3 always participates in a match (it may be empty), so a plain
    # backreference template is safe here.
    return _SPLIT_KEY_RE.sub(r'"\1\2\3":', json_str)


def extract_function_content(text, start_line, end_line):
    """
    Extract function content from text based on line number range.

    Args:
        text: The full code text
        start_line: Starting line number (1-indexed)
        end_line: Ending line number (1-indexed, inclusive)

    Returns:
        Extracted function content as string. Out-of-range bounds are clamped;
        an inverted range yields an empty string.
    """
    lines = text.split('\n')
    first = max(0, start_line - 1)   # 1-indexed -> 0-indexed
    last = min(len(lines), end_line)  # inclusive end == exclusive slice bound
    return '\n'.join(lines[first:last])


def _to_int(value, default=0):
    """Convert *value* to int; falsy -> 0, unconvertible -> *default*."""
    if not value:
        return 0
    try:
        return int(value)
    except ValueError:
        pass  # may still be a float-looking string such as "85.5"
    except (TypeError, OverflowError):
        # OverflowError: json.loads accepts Infinity, int(inf) overflows.
        return default
    try:
        return int(float(value))
    except (TypeError, ValueError, OverflowError):
        return default


def _parse_function_info(raw):
    """Decode a function_info cell; raise ValueError if it cannot be parsed."""
    cleaned = clean_json_string(raw)
    try:
        return json.loads(cleaned)
    except ValueError:  # json.JSONDecodeError is a ValueError subclass
        pass
    # Backup for Python-literal syntax (single quotes, True/None, ...).
    # literal_eval only evaluates literals, so it is safe on file content.
    try:
        return ast.literal_eval(cleaned)
    except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError) as exc:
        raise ValueError("unparseable function_info") from exc


def _raise_csv_field_limit():
    """Lift the csv module's 128 KiB per-field cap as far as the platform allows."""
    limit = sys.maxsize
    while limit > 0:
        try:
            csv.field_size_limit(limit)
            return
        except OverflowError:
            # The limit must fit in a C long, which is 32-bit on some platforms.
            limit //= 2


def _print_score_report(score_distribution, scores, total_functions):
    """Print the bucketed score histogram and summary statistics."""
    def share(count):
        return (count / total_functions * 100) if total_functions > 0 else 0

    print("\n" + "=" * 70)
    print("SCORE DISTRIBUTION")
    print("=" * 70)
    print(f"{'Score Range':<15} {'Count':<12} {'Percentage':<12} {'Visualization'}")
    print("-" * 70)

    # Highest bucket first. Buckets outside 0-100 are reported separately so
    # the histogram agrees with the 0-100 filter used for the statistics.
    buckets = sorted(score_distribution.items(), reverse=True)
    normal_scores = [(b, c) for b, c in buckets if 0 <= b <= 100]
    anomalous_scores = [(b, c) for b, c in buckets if not 0 <= b <= 100]

    for score_bucket, count in normal_scores:
        percentage = share(count)
        bar = '\u2588' * min(50, int(percentage / 2))  # Scale bar to fit
        print(f"{score_bucket:>3}-{score_bucket+9:<9} {count:<12} {percentage:>6.2f}% {bar}")

    if anomalous_scores:
        print("\nAnomalous scores (negative or out of range):")
        for score_bucket, count in anomalous_scores:
            percentage = share(count)
            print(f"{score_bucket:>15} {count:<12} {percentage:>6.2f}%")

    print("-" * 70)
    print(f"{'Total':<15} {total_functions:<12} {'100.00%':<12}")
    print("=" * 70)

    if total_functions <= 0:
        return
    valid_scores = [score for score in scores if 0 <= score <= 100]
    if not valid_scores:
        return
    avg_score = sum(valid_scores) / len(valid_scores)
    print(f"\nScore Statistics (valid scores 0-100 only):")
    print(f" Average Score: {avg_score:.2f}")
    print(f" Maximum Score: {max(valid_scores)}")
    print(f" Minimum Score: {min(valid_scores)}")
    print(f" Valid Functions: {len(valid_scores)} / {total_functions}")


def process_dataset(input_file, output_file):
    """
    Process enhanced_dataset.csv and extract functions.

    Every entry of a row's 'function_info' list becomes one output row holding
    the row's metadata, the entry's fields and the source lines it points at.
    Output rows are sorted by relevance_score, highest first. Rows whose
    function_info is empty, unparseable or not a list are skipped and counted.
    Progress and a score report are printed to stdout.

    Args:
        input_file: Path to enhanced_dataset.csv
        output_file: Path to output CSV file

    Raises:
        FileNotFoundError: if either path cannot be opened.
    """
    print(f"Reading from: {input_file}")
    print(f"Writing to: {output_file}")

    # The 'text' column holds whole source files, which routinely exceed the
    # csv module's default field size limit.
    _raise_csv_field_limit()

    # Statistics
    total_rows = 0
    total_functions = 0
    score_distribution = defaultdict(int)
    skipped_rows = 0
    parse_errors = 0
    empty_function_info = 0

    with open(input_file, 'r', encoding='utf-8') as infile, \
            open(output_file, 'w', encoding='utf-8', newline='') as outfile:
        reader = csv.DictReader(infile)
        writer = csv.DictWriter(outfile, fieldnames=OUTPUT_FIELDNAMES)
        writer.writeheader()

        # Rows are buffered because they are written in score order.
        all_function_rows = []

        print("\nProcessing rows...")
        for row in reader:
            total_rows += 1
            if total_rows % 1000 == 0:
                print(f"Processed {total_rows} rows, extracted {total_functions} functions, errors: {parse_errors}...", end='\r')

            function_info_str = row.get('function_info', '[]')
            if not function_info_str or not function_info_str.strip():
                empty_function_info += 1
                skipped_rows += 1
                continue

            try:
                function_info_list = _parse_function_info(function_info_str)
            except ValueError:
                parse_errors += 1
                if parse_errors <= 5:  # Only print first 5 errors
                    print(f"\nWarning: Failed to parse function_info in row {total_rows}")
                    if parse_errors == 5:
                        print("(Suppressing further parse error messages...)")
                skipped_rows += 1
                continue

            if not isinstance(function_info_list, list):
                skipped_rows += 1
                continue

            # DictReader fills missing trailing columns with None.
            text = row.get('text') or ''
            original_index = row.get('Unnamed: 0', row.get('Unnamed: 0.1', total_rows - 1))

            for func_idx, func_info in enumerate(function_info_list):
                if not isinstance(func_info, dict):
                    continue

                start_line = _to_int(func_info.get('function_start_line', 0), default=None)
                end_line = _to_int(func_info.get('function_end_line', 0), default=None)
                if start_line is None or end_line is None:
                    # One bad bound invalidates the whole range.
                    start_line = end_line = 0

                if start_line > 0 and end_line > 0:
                    function_content = extract_function_content(text, start_line, end_line)
                else:
                    function_content = ""

                relevance_score = _to_int(func_info.get('relevance_score', 0))

                # Track score distribution (in buckets of 10)
                score_distribution[(relevance_score // 10) * 10] += 1

                new_row = {
                    'original_index': original_index,
                    'function_index': func_idx,
                    'function_name': func_info.get('function_name', ''),
                    'function_start_line': start_line,
                    'function_end_line': end_line,
                    'doc_start_line': func_info.get('doc_start_line', ''),
                    'doc_end_line': func_info.get('doc_end_line', ''),
                    'relevance_score': relevance_score,
                    'relevance_reason': func_info.get('relevance_reason', ''),
                    'function_content': function_content,
                }
                for column in PASSTHROUGH_COLUMNS:
                    new_row[column] = row.get(column, '')

                all_function_rows.append(new_row)
                total_functions += 1

        print(f"\n\nTotal rows processed: {total_rows}")
        print(f"Total functions extracted: {total_functions}")
        print(f"Skipped rows:")
        print(f" - Empty function_info: {empty_function_info}")
        print(f" - Parse errors: {parse_errors}")
        print(f" - Total skipped: {skipped_rows}")

        # Highest score first; the sort is stable, so ties keep input order.
        print("\nSorting by relevance score...")
        all_function_rows.sort(key=lambda item: item['relevance_score'], reverse=True)

        print("Writing sorted data to output file...")
        writer.writerows(all_function_rows)

    print(f"\nSuccessfully written {total_functions} functions to {output_file}")

    _print_score_report(
        score_distribution,
        [item['relevance_score'] for item in all_function_rows],
        total_functions,
    )


def main(argv=None):
    """
    Command-line entry point.

    Args:
        argv: Optional argument list ``[input_file, output_file]``; defaults
            to ``sys.argv[1:]``. Missing entries fall back to the defaults.

    Returns:
        Process exit status: 0 on success, 1 on any error.
    """
    args = sys.argv[1:] if argv is None else list(argv)
    input_file = args[0] if len(args) > 0 else DEFAULT_INPUT_FILE
    output_file = args[1] if len(args) > 1 else DEFAULT_OUTPUT_FILE

    try:
        process_dataset(input_file, output_file)
    except FileNotFoundError as exc:
        # Report the path that actually failed; it may be the output path.
        missing = exc.filename or input_file
        print(f"\u274c Error: File '{missing}' not found.")
        return 1
    except Exception as exc:  # top-level boundary: report and exit non-zero
        print(f"\u274c Error: {exc}")
        traceback.print_exc()
        return 1

    print("\n\u2705 Processing complete!")
    return 0


if __name__ == "__main__":
    sys.exit(main())