|
|
|
|
|
"""
Extract individual functions from enhanced_dataset.csv and create a new dataset.

Each function becomes a separate row in the new dataset.
"""
|
|
|
|
|
import csv |
|
|
import json |
|
|
from collections import defaultdict |
|
|
import sys |
|
|
|
|
|
def extract_function_content(text, start_line, end_line):
    """
    Extract function content from text based on line number range.

    Args:
        text: The full code text. ``None`` or an empty string yields ``""``.
        start_line: Starting line number (1-indexed, inclusive)
        end_line: Ending line number (1-indexed, inclusive)

    Returns:
        Extracted function content as string. Out-of-range bounds are
        clamped to the text; an empty or inverted range yields ``""``.
    """
    # csv.DictReader fills missing cells with None, so tolerate it here
    # instead of failing on None.split().
    if not text:
        return ""

    lines = text.split('\n')

    # Convert the 1-indexed inclusive range into a clamped half-open slice.
    start_idx = max(0, start_line - 1)
    end_idx = min(len(lines), end_line)

    # Guard explicitly: a negative end index would otherwise be interpreted
    # by the slice as "count from the end of the file".
    if end_idx <= start_idx:
        return ""

    return '\n'.join(lines[start_idx:end_idx])
|
|
|
|
|
|
|
|
def _parse_function_info(raw):
    """Parse a serialized ``function_info`` cell: JSON first, then a Python literal.

    Args:
        raw: The non-empty string stored in the ``function_info`` column.

    Returns:
        The decoded object (the caller checks that it is a list).

    Raises:
        ValueError: If ``raw`` is neither valid JSON nor a Python literal.
    """
    import ast  # function-scope import: only this fallback parser needs it

    try:
        return json.loads(raw)
    except ValueError:
        # json.JSONDecodeError is a ValueError subclass; fall through to the
        # Python-literal parser (handles single-quoted repr() output).
        pass

    try:
        return ast.literal_eval(raw)
    except (ValueError, TypeError, SyntaxError, MemoryError, RecursionError) as err:
        raise ValueError("function_info is neither JSON nor a Python literal") from err


def _print_score_report(score_distribution, scores):
    """Print the score histogram (buckets of 10) and summary statistics."""
    total_functions = len(scores)

    print("\n" + "="*60)
    print("SCORE DISTRIBUTION")
    print("="*60)
    print(f"{'Score Range':<20} {'Count':<10} {'Percentage':<10} {'Bar'}")
    print("-"*60)

    for score_bucket, count in sorted(score_distribution.items(), reverse=True):
        percentage = (count / total_functions * 100) if total_functions > 0 else 0
        bar = '█' * int(percentage / 2)
        # Pad the complete "lo-hi" label (not only the upper bound) so the
        # columns line up with the header row.
        label = f"{score_bucket}-{score_bucket + 9}"
        percentage_text = f"{percentage:>6.2f}%"
        print(f"{label:<20} {count:<10} {percentage_text:<10} {bar}")

    print("-"*60)
    print(f"{'Total':<20} {total_functions:<10} {'100.00%':<10}")
    print("="*60)

    if total_functions > 0:
        avg_score = sum(scores) / len(scores)
        print("\nScore Statistics:")
        print(f"  Average Score: {avg_score:.2f}")
        print(f"  Maximum Score: {max(scores)}")
        print(f"  Minimum Score: {min(scores)}")
        print(f"  Total Functions: {total_functions}")


def process_dataset(input_file, output_file):
    """
    Process enhanced_dataset.csv and extract functions.

    Every entry of a row's ``function_info`` list becomes one output row that
    carries the source row's metadata, the function's line range and
    relevance data, and the function's source text. Output rows are written
    sorted by ``relevance_score`` in descending order, and a score
    distribution report is printed at the end.

    Rows whose ``function_info`` is empty, unparseable, or not a list are
    counted as skipped; list entries that are not dicts are ignored.

    Args:
        input_file: Path to enhanced_dataset.csv
        output_file: Path to output CSV file
    """
    print(f"Reading from: {input_file}")
    print(f"Writing to: {output_file}")

    # Columns copied unchanged from the input row to each output row.
    passthrough_columns = [
        'repo_name',
        'path',
        'language',
        'license',
        'keyword',
        'text_hash',
        'config',
        'split',
        'repo_path',
        'ds_source',
    ]
    fieldnames = (
        ['original_index', 'function_index']
        + passthrough_columns
        + [
            'function_name',
            'function_start_line',
            'function_end_line',
            'doc_start_line',
            'doc_end_line',
            'relevance_score',
            'relevance_reason',
            'function_content',
        ]
    )

    total_rows = 0
    skipped_rows = 0
    score_distribution = defaultdict(int)
    # All rows are buffered because the output is sorted before writing.
    all_function_rows = []

    with open(input_file, 'r', encoding='utf-8') as infile, \
            open(output_file, 'w', encoding='utf-8', newline='') as outfile:

        reader = csv.DictReader(infile)
        writer = csv.DictWriter(outfile, fieldnames=fieldnames)
        writer.writeheader()

        print("\nProcessing rows...")
        for row in reader:
            total_rows += 1

            if total_rows % 100 == 0:
                print(f"Processed {total_rows} rows, extracted {len(all_function_rows)} functions...", end='\r')

            function_info_str = row.get('function_info', '[]')
            if not function_info_str or not function_info_str.strip():
                skipped_rows += 1
                continue

            try:
                function_info_list = _parse_function_info(function_info_str)
            except ValueError:
                # Only warn for the first rows to avoid flooding the console.
                if total_rows <= 20:
                    print(f"\nWarning: Failed to parse function_info in row {total_rows}")
                skipped_rows += 1
                continue

            if not isinstance(function_info_list, list):
                skipped_rows += 1
                continue

            # DictReader yields None for cells missing from short rows.
            text = row.get('text') or ''

            for func_idx, func_info in enumerate(function_info_list):
                if not isinstance(func_info, dict):
                    continue

                raw_start = func_info.get('function_start_line', 0)
                raw_end = func_info.get('function_end_line', 0)

                # Both bounds are reset together: a half-valid range is unusable.
                try:
                    start_line = int(raw_start) if raw_start else 0
                    end_line = int(raw_end) if raw_end else 0
                except (ValueError, TypeError, OverflowError):
                    start_line = 0
                    end_line = 0

                if start_line > 0 and end_line > 0:
                    function_content = extract_function_content(text, start_line, end_line)
                else:
                    function_content = ""

                raw_score = func_info.get('relevance_score', 0)
                # OverflowError covers int(float('inf')), which json.loads
                # produces for the non-standard token Infinity.
                try:
                    relevance_score = int(raw_score) if raw_score else 0
                except (ValueError, TypeError, OverflowError):
                    relevance_score = 0

                score_bucket = (relevance_score // 10) * 10
                score_distribution[score_bucket] += 1

                new_row = {column: row.get(column, '') for column in passthrough_columns}
                new_row.update({
                    # Prefer a pandas-style index column; otherwise use the
                    # 0-based position of the row in the input file.
                    'original_index': row.get('Unnamed: 0', row.get('Unnamed: 0.1', total_rows - 1)),
                    'function_index': func_idx,
                    'function_name': func_info.get('function_name', ''),
                    'function_start_line': start_line,
                    'function_end_line': end_line,
                    'doc_start_line': func_info.get('doc_start_line', ''),
                    'doc_end_line': func_info.get('doc_end_line', ''),
                    'relevance_score': relevance_score,
                    'relevance_reason': func_info.get('relevance_reason', ''),
                    'function_content': function_content,
                })
                all_function_rows.append(new_row)

        total_functions = len(all_function_rows)

        print(f"\n\nTotal rows processed: {total_rows}")
        print(f"Total functions extracted: {total_functions}")
        print(f"Skipped rows (no valid function_info): {skipped_rows}")

        print("\nSorting by relevance score...")
        # list.sort is stable, so equal scores keep their input order.
        all_function_rows.sort(key=lambda x: x['relevance_score'], reverse=True)

        print("Writing sorted data to output file...")
        writer.writerows(all_function_rows)

        print(f"\nSuccessfully written {total_functions} functions to {output_file}")

    _print_score_report(
        score_distribution,
        [function_row['relevance_score'] for function_row in all_function_rows],
    )
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Defaults, overridable by positional arguments: [input_file [output_file]]
    input_file = "enhanced_dataset.csv"
    output_file = "function_dataset.csv"

    if len(sys.argv) > 1:
        input_file = sys.argv[1]
    if len(sys.argv) > 2:
        output_file = sys.argv[2]

    try:
        process_dataset(input_file, output_file)
        print("\n✅ Processing complete!")
    except FileNotFoundError as e:
        # The missing path may be the output file's directory rather than the
        # input file, so report the path carried by the exception.
        missing_path = e.filename if e.filename is not None else input_file
        print(f"❌ Error: File '{missing_path}' not found.")
        sys.exit(1)
    except Exception as e:
        # Top-level boundary: report any other failure with its traceback
        # and exit with a non-zero status.
        print(f"❌ Error: {e}")
        import traceback
        traceback.print_exc()
        sys.exit(1)
|
|
|