"""
Export repository files to CSV datasets grouped by keyword.

This script processes all files in the repos_filtered directory, groups them by
keyword from repos_check_history.csv, and exports each keyword's files to a
separate CSV file.
"""

import os
import csv
import re
import logging
from pathlib import Path
from collections import defaultdict
from typing import Dict, List, Optional, Tuple

import pandas as pd
from tqdm import tqdm

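# Log to both a file and the console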
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('export_files_to_csv.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

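# Input/output locations (site-specific absolute paths)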
REPOS_FILTERED_DIR = Path("/home/weifengsun/tangou1/domain_code/src/workdir/repos_filtered")
REPOS_CHECK_HISTORY_CSV = Path("/home/weifengsun/tangou1/domain_code/src/workdir/repos_check_history.csv")
OUTPUT_DIR = Path("/home/weifengsun/tangou1/domain_code/src/workdir/dataset_csv")
MAX_FILE_SIZE = None  # Currently unused; reserved for a per-file size cap in bytes

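# Directory names that are never traversed; .egg-info directories are matched
# by suffix in the filters below, since set membership cannot match a glob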
SKIP_DIRS = {'.git', 'node_modules', '__pycache__', '.pytest_cache', '.mypy_cache',
             'venv', 'env', '.venv', '.env', 'dist', 'build', '.eggs'}

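# Extensions that are always treated as binary and skipped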
BINARY_EXTENSIONS = {'.png', '.jpg', '.jpeg', '.gif', '.bmp', '.ico', '.svg',
                     '.pdf', '.zip', '.tar', '.gz', '.bz2', '.xz', '.7z',
                     '.exe', '.dll', '.so', '.dylib', '.bin', '.o', '.a',
                     '.pyc', '.pyo', '.pyd', '.class', '.jar', '.war',
                     '.mp3', '.mp4', '.avi', '.mov', '.wav', '.flac',
                     '.db', '.sqlite', '.sqlite3', '.h5', '.hdf5', '.pkl', '.pickle'}

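# Map file extensions to the value written in the 'language' column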
LANGUAGE_MAP = {
    '.py': 'Python',
    '.js': 'JavaScript',
    '.ts': 'TypeScript',
    '.java': 'Java',
    '.cpp': 'C++',
    '.c': 'C',
    '.cs': 'C#',
    '.go': 'Go',
    '.rs': 'Rust',
    '.rb': 'Ruby',
    '.php': 'PHP',
    '.swift': 'Swift',
    '.kt': 'Kotlin',
    '.scala': 'Scala',
    '.r': 'R',
    '.m': 'MATLAB',
    '.jl': 'Julia',
    '.sh': 'Shell',
    '.bash': 'Bash',
    '.zsh': 'Zsh',
    '.sql': 'SQL',
    '.html': 'HTML',
    '.css': 'CSS',
    '.xml': 'XML',
    '.json': 'JSON',
    '.yaml': 'YAML',
    '.yml': 'YAML',
    '.md': 'Markdown',
    '.tex': 'LaTeX',
    '.f90': 'Fortran',
    '.f': 'Fortran',
    '.f77': 'Fortran',
    '.f95': 'Fortran',
    '.cu': 'CUDA',
    '.cl': 'OpenCL',
    '.hs': 'Haskell',
    '.ml': 'OCaml',
    '.fs': 'F#',
    '.vb': 'Visual Basic',
    '.pl': 'Perl',
    '.pm': 'Perl',
    '.lua': 'Lua',
    '.vim': 'Vim script',
    '.cmake': 'CMake',
    # Files named exactly 'Makefile' or 'Dockerfile' have no suffix and thus
    # map to 'Unknown'; these entries only catch explicit extensions
    '.makefile': 'Makefile',
    '.dockerfile': 'Dockerfile',
}


def sanitize_keyword(keyword: str) -> str:
    """Sanitize a keyword for use in a filename."""
    # Replace characters that are unsafe in filenames
    sanitized = re.sub(r'[^\w\s-]', '_', keyword)
    # Convert runs of whitespace and hyphens to underscores
    sanitized = re.sub(r'[\s-]+', '_', sanitized)
    # Collapse repeated underscores
    sanitized = re.sub(r'_+', '_', sanitized)
    # Trim leading/trailing underscores
    return sanitized.strip('_')


def load_keyword_mapping() -> Dict[str, str]:
    """Load the repo full_name -> keyword mapping from repos_check_history.csv."""
    logger.info(f"Loading keyword mapping from {REPOS_CHECK_HISTORY_CSV}")

    mapping: Dict[str, str] = {}
    try:
        # Read in chunks so very large CSVs do not need to fit in memory at once
        chunk_size = 100000
        for chunk in pd.read_csv(REPOS_CHECK_HISTORY_CSV, chunksize=chunk_size):
            # Zip over the two columns directly; much faster than iterrows()
            mapping.update(zip(chunk['full_name'], chunk['keyword']))

        logger.info(f"Loaded {len(mapping)} keyword mappings")
        return mapping
    except Exception as e:
        logger.error(f"Error loading keyword mapping: {e}")
        raise


def is_binary_file(file_path: Path) -> bool:
    """Check whether a file is binary, by extension first, then by content.

    Directory-based skipping is handled in should_skip_file().
    """
    if file_path.suffix.lower() in BINARY_EXTENSIONS:
        return True

    # Sniff the first 512 bytes: a NUL byte or undecodable UTF-8 means binary
    try:
        with open(file_path, 'rb') as f:
            chunk = f.read(512)

        if b'\x00' in chunk:
            return True

        try:
            chunk.decode('utf-8')
        except UnicodeDecodeError as e:
            # Tolerate a multi-byte character cut off at the 512-byte boundary;
            # only an error well inside the chunk indicates real binary data
            if e.start < len(chunk) - 4:
                return True
    except Exception:
        # Unreadable files are treated as binary so they get skipped
        return True

    return False


def should_skip_file(file_path: Path) -> bool:
    """Determine whether a file should be skipped."""
    # Skip anything inside a vendored, hidden, or build directory
    for part in file_path.parts:
        if part in SKIP_DIRS or part.endswith('.egg-info'):
            return True
        if part.startswith('.') and part != '.':
            return True

    # Skip README files (prose, not code)
    file_name = file_path.name.lower()
    if file_name.startswith('readme') and file_path.suffix.lower() in {'.md', '.markdown', '.txt'}:
        return True

    # Finally, skip binary files
    return is_binary_file(file_path)


def get_language(file_path: Path) -> str:
    """Get the programming language from the file extension."""
    return LANGUAGE_MAP.get(file_path.suffix.lower(), 'Unknown')


def read_file_content(file_path: Path) -> Optional[str]:
    """Read file content, handling encoding issues."""
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            return f.read()
    except UnicodeDecodeError:
        # Try cp1252 before latin-1: latin-1 maps every byte, so it never
        # fails and must come last or nothing after it would ever be tried
        for encoding in ('cp1252', 'latin-1'):
            try:
                with open(file_path, 'r', encoding=encoding) as f:
                    content = f.read()
                logger.warning(f"Read {file_path} with {encoding} encoding")
                return content
            except (UnicodeDecodeError, LookupError):
                continue

        logger.warning(f"Could not decode {file_path}, skipping")
        return None
    except Exception as e:
        logger.error(f"Error reading {file_path}: {e}")
        return None


def process_file(file_path: Path, repo_name: str, keyword: str) -> Optional[Dict]:
    """Process a single file and return its metadata and content."""
    if should_skip_file(file_path):
        return None

    try:
        file_size = file_path.stat().st_size

        # Record the path relative to the repo root
        repo_dir = REPOS_FILTERED_DIR / repo_name
        try:
            relative_path = file_path.relative_to(repo_dir)
        except ValueError:
            # File is not under this repo directory; ignore it
            return None

        content = read_file_content(file_path)
        if content is None:
            return None

        return {
            'keyword': keyword,
            'repo_name': repo_name.replace('___', '/'),
            'file_path': str(relative_path),
            'file_extension': file_path.suffix,
            'file_size': file_size,
            # splitlines() counts lines exactly; counting '\n' + 1 would
            # overcount files that end with a trailing newline
            'line_count': len(content.splitlines()),
            'content': content,
            'language': get_language(file_path),
        }
    except Exception as e:
        logger.error(f"Error processing {file_path}: {e}")
        return None


def process_repo(repo_name: str, keyword_mapping: Dict[str, str]) -> List[Dict]:
    """Process all files in a repository."""
    repo_dir = REPOS_FILTERED_DIR / repo_name

    if not repo_dir.exists() or not repo_dir.is_dir():
        return []

    # Directory names use '___' in place of '/' in the GitHub full name
    full_name = repo_name.replace('___', '/')
    keyword = keyword_mapping.get(full_name)

    if not keyword:
        logger.debug(f"No keyword found for {full_name}, skipping")
        return []

    results = []

    try:
        for root, dirs, files in os.walk(repo_dir):
            # Prune skipped/hidden/egg-info directories in place so os.walk
            # never descends into them
            dirs[:] = [d for d in dirs
                       if d not in SKIP_DIRS
                       and not d.startswith('.')
                       and not d.endswith('.egg-info')]

            for file in files:
                result = process_file(Path(root) / file, repo_name, keyword)
                if result:
                    results.append(result)
    except Exception as e:
        logger.error(f"Error walking {repo_dir}: {e}")

    return results


class CSVWriterManager:
    """Manages per-keyword CSV writers: opening, writing, and closing files."""

    def __init__(self, output_dir: Path):
        self.output_dir = output_dir
        self.writers = {}  # keyword -> (file handle, csv.DictWriter)
        self.file_counts = defaultdict(int)
        self.fieldnames = ['keyword', 'repo_name', 'file_path', 'file_extension',
                           'file_size', 'line_count', 'content', 'language']

    def get_writer(self, keyword: str):
        """Get or lazily create the CSV writer for a keyword."""
        if keyword not in self.writers:
            sanitized_keyword = sanitize_keyword(keyword)
            output_file = self.output_dir / f"dataset_{sanitized_keyword}.csv"

            # QUOTE_ALL keeps multi-line 'content' fields intact
            file_handle = open(output_file, 'w', newline='', encoding='utf-8')
            writer = csv.DictWriter(file_handle, fieldnames=self.fieldnames, quoting=csv.QUOTE_ALL)
            writer.writeheader()

            self.writers[keyword] = (file_handle, writer)

        return self.writers[keyword][1]

    def write_row(self, keyword: str, row: Dict):
        """Write a row to the appropriate CSV file."""
        writer = self.get_writer(keyword)
        writer.writerow(row)
        self.file_counts[keyword] += 1

    def close_all(self):
        """Close all open file handles."""
        for keyword, (file_handle, _) in self.writers.items():
            file_handle.close()
            logger.info(f"Closed dataset_{sanitize_keyword(keyword)}.csv with {self.file_counts[keyword]} files")

    def get_stats(self) -> Tuple[int, int]:
        """Return (total_keywords, total_files)."""
        return len(self.writers), sum(self.file_counts.values())


def main():
    """Main entry point; streams rows to per-keyword CSVs to keep memory flat."""
    logger.info("Starting file export to CSV (streaming mode)")

    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
    logger.info(f"Output directory: {OUTPUT_DIR}")

    keyword_mapping = load_keyword_mapping()

    logger.info("Scanning repository directories...")
    repo_dirs = [d.name for d in REPOS_FILTERED_DIR.iterdir() if d.is_dir()]
    logger.info(f"Found {len(repo_dirs)} repositories")

    csv_manager = CSVWriterManager(OUTPUT_DIR)

    logger.info("Processing repositories (streaming mode - writing as we go)...")

    total_files_processed = 0
    repos_processed = 0
    repos_with_no_keyword = 0

    try:
        with tqdm(total=len(repo_dirs), desc="Processing repos") as pbar:
            for repo_name in repo_dirs:
                # Skip repos with no keyword mapping up front
                full_name = repo_name.replace('___', '/')
                keyword = keyword_mapping.get(full_name)

                if not keyword:
                    repos_with_no_keyword += 1
                    pbar.update(1)
                    continue

                results = process_repo(repo_name, keyword_mapping)

                if results:
                    # Write rows immediately instead of accumulating them
                    for result in results:
                        csv_manager.write_row(result['keyword'], result)
                        total_files_processed += 1
                    repos_processed += 1

                    # Log only when the counter actually reaches a multiple of
                    # 1000; checking outside this block would repeat the same
                    # message for every repo that yields no results
                    if repos_processed % 1000 == 0:
                        logger.info(f"Progress: {repos_processed} repos, {total_files_processed} files")

                pbar.update(1)
    finally:
        # Always close the CSV handles, even if processing raised
        csv_manager.close_all()

    total_keywords, total_files = csv_manager.get_stats()

    logger.info("=" * 60)
    logger.info("Export completed!")
    logger.info(f"Repositories processed: {repos_processed}")
    logger.info(f"Repositories with no keyword mapping: {repos_with_no_keyword}")
    logger.info(f"Total keywords: {total_keywords}")
    logger.info(f"Total files exported: {total_files}")
    logger.info(f"Output directory: {OUTPUT_DIR}")
    logger.info("=" * 60)


if __name__ == "__main__":
    main()