import ast
import hashlib

# Integer id for every known node category. Categories that are not listed
# here are embedded as 0 (the same id as 'unknown').
_CATEGORY_IDS = {
    'unknown': 0,
    'import': 1,
    'function': 2,
    'class': 3,
    'if': 4,
    'while': 5,
    'for': 6,
    'try': 7,
    'expression': 8,
    'spacer': 9,
    'elif': 10,
    'else': 11,
    'except': 12,
    'return': 13,
    'assigned_variable': 14,
    'variable_def': 15,
}

# Ordered (AST node types, category, fixed label) rules. A fixed label of
# None means "keep the node's own ``name`` attribute" (functions, classes).
_NODE_RULES = (
    ((ast.FunctionDef, ast.AsyncFunctionDef), 'function', None),
    ((ast.ClassDef,), 'class', None),
    ((ast.If,), 'if', "if"),
    ((ast.For, ast.AsyncFor), 'for', "for"),
    ((ast.While,), 'while', "while"),
    ((ast.Return,), 'return', "return"),
    ((ast.Assign, ast.AnnAssign), 'assigned_variable', "assignment"),
    ((ast.Expr,), 'expression', "expr"),
    ((ast.Try,), 'try', "try"),
    ((ast.Import, ast.ImportFrom), 'import', "import"),
)


def get_category_id(category):
    """Map a category name to its integer id for vector embedding.

    Returns 0 for any category that is not in the known mapping.
    """
    return _CATEGORY_IDS.get(category, 0)


def create_vector(category, level, location, total_lines, parent_path):
    """Build the 6-element embedding vector for one graph node.

    Args:
        category: Category name; converted with ``get_category_id``.
        level: Nesting level of the node; stored unchanged.
        location: ``(start_line, end_line)`` pair, 1-based and inclusive.
        total_lines: Number of lines in the source; values below 1 are
            clamped to 1 so the divisions below are always defined.
        parent_path: List of ancestor node-id strings, outermost first.

    Returns:
        ``[cat_id, level, center, span, parent_depth, parent_weight]``.
        ``center`` and ``span`` are fractions of ``total_lines`` and
        ``parent_weight`` lies in ``[0, 0.99]``; these three floats are
        rounded to 4 decimals to keep the JSON payload small. ``cat_id``,
        ``level`` and ``parent_depth`` are plain integers (not normalized).
    """
    cat_id = get_category_id(category)
    start, end = location
    total_lines = max(1, total_lines)

    # Fraction of the file the node covers, and where its midpoint sits.
    span = (end - start + 1) / total_lines
    center = ((start + end) / 2) / total_lines
    parent_depth = len(parent_path)

    # Ancestry weight: a deterministic bucket (0..99) derived from the
    # concatenated ancestor ids. MD5 is only used as a stable hash here.
    path_str = "".join(parent_path)
    digest = hashlib.md5(path_str.encode()).hexdigest()
    parent_weight = (int(digest, 16) % 100) / 100.0

    return [
        cat_id,
        level,
        round(center, 4),
        round(span, 4),
        parent_depth,
        round(parent_weight, 4),
    ]


def _classify(node):
    """Return ``(category, name)`` for an AST node.

    Nodes that match no rule get the category ``'other'`` and whatever
    ``name`` attribute they happen to carry (or None).
    """
    name = getattr(node, 'name', None)
    for node_types, category, fixed_name in _NODE_RULES:
        if isinstance(node, node_types):
            return category, (name if fixed_name is None else fixed_name)
    return 'other', name


def parse_source_to_graph(code):
    """Parse Python source into a flat node list plus parent/child edges.

    Args:
        code: Python source text.

    Returns:
        On success, ``{"nodes": [...], "connections": [...]}``. Each node is
        a dict with the short keys ``id``, ``lbl`` (label), ``type``
        (category), ``loc`` (``[start_line, end_line]``), ``vec`` (see
        ``create_vector``), ``lvl`` (nesting level among emitted nodes) and
        ``pid`` (id of the nearest emitted ancestor, or None). Nodes are
        sorted by start line. Each connection is ``{"f": pid, "t": id}``.

        If the source does not parse, ``{"error": "<message>"}`` is returned
        instead of raising.
    """
    try:
        tree = ast.parse(code)
    except SyntaxError as e:
        return {"error": f"Syntax Error on line {e.lineno}: {e.msg}"}

    total_lines = len(code.splitlines(keepends=True))
    nodes = []

    def visit(node, parent_path, level, parent_id):
        category, name = _classify(node)
        lineno = getattr(node, 'lineno', 0)

        # Only categorized nodes that carry a position are emitted. Helper
        # nodes without a line number (e.g. ``match_case``) are skipped
        # themselves, but their children are still visited so statements
        # nested below them are not lost.
        if category != 'other' and lineno != 0:
            end_lineno = getattr(node, 'end_lineno', lineno)
            node_id = (
                f"{type(node).__name__}_{lineno}_"
                f"{getattr(node, 'col_offset', 0)}"
            )

            label = name if name else category
            if category == 'assigned_variable':
                # Assign has ``targets`` (list); AnnAssign has ``target``.
                targets = (
                    getattr(node, 'targets', [])
                    or [getattr(node, 'target', None)]
                )
                if targets and isinstance(targets[0], ast.Name):
                    label = f"{targets[0].id} ="

            nodes.append({
                "id": node_id,
                "lbl": label,
                "type": category,
                # Only start/end lines are sent, not the source text.
                "loc": [lineno, end_lineno],
                "vec": create_vector(
                    category, level, (lineno, end_lineno),
                    total_lines, parent_path,
                ),
                "lvl": level,
                "pid": parent_id,
            })

            # Children of an emitted node sit one level deeper under it.
            child_path = parent_path + [node_id]
            child_level = level + 1
            child_parent = node_id
        else:
            # Transparent node: children inherit the current context.
            child_path = parent_path
            child_level = level
            child_parent = parent_id

        for child in ast.iter_child_nodes(node):
            visit(child, child_path, child_level, child_parent)

    for stmt in tree.body:
        visit(stmt, [], 0, None)

    # list.sort is stable, so nodes starting on the same line keep their
    # traversal (pre-order) order.
    nodes.sort(key=lambda n: n['loc'][0])

    node_ids = {n['id'] for n in nodes}
    connections = [
        {"f": n['pid'], "t": n['id']}
        for n in nodes
        if n['pid'] and n['pid'] in node_ids
    ]

    return {"nodes": nodes, "connections": connections}


def generate_connections(nodes):
    """Build verbose structural-hierarchy edges for a list of node dicts.

    Args:
        nodes: Node dicts with an ``id`` and a parent reference stored under
            either ``parent_id`` or the short key ``pid`` (the key used by
            ``parse_source_to_graph``).

    Returns:
        A list of ``{"from": parent, "to": child, "type": "hierarchy"}``
        dicts, one per node whose parent is present in ``nodes``.
    """
    known_ids = {n['id'] for n in nodes}
    connections = []
    for node in nodes:
        # Accept both key spellings so output of parse_source_to_graph
        # can be passed in directly.
        parent = node.get('parent_id') or node.get('pid')
        if parent and parent in known_ids:
            connections.append({
                "from": parent,
                "to": node['id'],
                "type": "hierarchy",
            })
    return connections