File size: 4,922 Bytes
1684743
ba7e5cb
 
 
 
 
 
 
 
 
 
 
a4421ce
4fa7df9
ba7e5cb
5bed3d1
ba7e5cb
 
 
 
 
5bed3d1
ba7e5cb
 
4fa7df9
ba7e5cb
5bed3d1
ba7e5cb
 
 
5bed3d1
ba7e5cb
 
 
5bed3d1
 
ba7e5cb
5bed3d1
ba7e5cb
 
 
1684743
 
ba7e5cb
 
1684743
ba7e5cb
 
 
 
 
 
 
 
 
 
5bed3d1
ba7e5cb
 
 
 
 
 
 
 
5bed3d1
ba7e5cb
 
 
 
5bed3d1
ba7e5cb
 
 
 
 
 
 
 
 
5bed3d1
 
ba7e5cb
 
5bed3d1
ba7e5cb
5bed3d1
 
 
 
ba7e5cb
 
 
 
 
 
 
 
 
 
 
1684743
ba7e5cb
 
1684743
ba7e5cb
 
 
5bed3d1
ba7e5cb
5bed3d1
 
 
 
 
 
 
 
 
ba7e5cb
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
import ast
import hashlib

def get_category_id(category):
    """
    Returns the integer code used for a node category in the embedding vector.

    The code of each known category is its position in the ordered tuple
    below; any category that is not listed (for example 'other') yields 0,
    the same code as 'unknown'.
    """
    ordered_names = (
        'unknown', 'import', 'function', 'class',
        'if', 'while', 'for', 'try', 'expression',
        'spacer', 'elif', 'else', 'except',
        'return', 'assigned_variable', 'variable_def',
    )
    codes = {label: code for code, label in enumerate(ordered_names)}
    try:
        return codes[category]
    except KeyError:
        # Unlisted categories share the 'unknown' code.
        return 0

def create_vector(category, level, location, total_lines, parent_path):
    """
    Builds the 6-element feature vector describing one node.

    Elements, in order:
      0. integer category code (from get_category_id)
      1. nesting level, passed through unchanged
      2. centre line of the node divided by the file's line count
      3. number of lines the node spans divided by the file's line count
      4. number of ancestors in parent_path
      5. ancestry weight in [0.0, 0.99], derived from an MD5 of the joined path

    `location` is a (start_line, end_line) pair. The fractional elements are
    rounded to 4 decimals to keep the serialized JSON small.
    """
    category_code = get_category_id(category)
    first_line, last_line = location
    # Guard against division by zero for empty sources.
    line_count = max(1, total_lines)

    coverage = (last_line - first_line + 1) / line_count
    midpoint = ((first_line + last_line) / 2) / line_count
    ancestor_count = len(parent_path)

    # Deterministic pseudo-weight: hash the concatenated ancestor ids and
    # fold the digest into one of 100 buckets.
    digest = hashlib.md5("".join(parent_path).encode()).hexdigest()
    ancestry_weight = (int(digest, 16) % 100) / 100.0

    vector = [category_code, level]
    vector.append(round(midpoint, 4))
    vector.append(round(coverage, 4))
    vector.append(ancestor_count)
    vector.append(round(ancestry_weight, 4))
    return vector

def parse_source_to_graph(code):
    """
    Parses Python source into a flat list of structural nodes plus the
    parent -> child connections between them.

    Only a fixed set of statement kinds become graph nodes (functions,
    classes, if/for/while/try, return, assignments, expression statements
    and imports). Every other AST node is transparent: it produces no graph
    node, but its children are still visited and attach to the nearest
    categorized ancestor.

    Returns:
        On success, {"nodes": [...], "connections": [...]} where each node is
        a dict with the short keys
            id   - "<AstTypeName>_<lineno>_<col_offset>"
            lbl  - display label (definition name, "<target> =", or a keyword)
            type - category string
            loc  - [start_line, end_line]
            vec  - feature vector from create_vector
            lvl  - nesting depth counted in categorized ancestors
            pid  - id of the nearest categorized ancestor, or None
        and each connection is {"f": parent_id, "t": child_id}. Nodes are
        sorted by start line.
        On a syntax error, {"error": "Syntax Error on line N: <msg>"}.
    """
    try:
        tree = ast.parse(code)
    except SyntaxError as e:
        return {"error": f"Syntax Error on line {e.lineno}: {e.msg}"}

    total_lines = len(code.splitlines(keepends=True))
    nodes = []

    def traverse(node, parent_path=None, level=0, parent_id=None):
        # Use None instead of a shared mutable default list.
        if parent_path is None:
            parent_path = []

        category = 'other'
        name = getattr(node, 'name', None)

        # Categorization logic
        if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
            category = 'function'
        elif isinstance(node, ast.ClassDef):
            category = 'class'
        elif isinstance(node, ast.If):
            category, name = 'if', "if"
        elif isinstance(node, (ast.For, ast.AsyncFor)):
            category, name = 'for', "for"
        elif isinstance(node, ast.While):
            category, name = 'while', "while"
        elif isinstance(node, ast.Return):
            category, name = 'return', "return"
        elif isinstance(node, (ast.Assign, ast.AnnAssign)):
            category, name = 'assigned_variable', "assignment"
        elif isinstance(node, ast.Expr):
            category, name = 'expression', "expr"
        elif isinstance(node, ast.Try):
            category, name = 'try', "try"
        elif isinstance(node, (ast.Import, ast.ImportFrom)):
            category, name = 'import', "import"

        lineno = getattr(node, 'lineno', 0)
        end_lineno = getattr(node, 'end_lineno', lineno)

        # Uncategorized nodes, and nodes that carry no source position, emit
        # nothing themselves but must still be descended into. Some
        # position-less containers (e.g. the case clauses of a `match`
        # statement) hold statement bodies that would otherwise be dropped.
        if category == 'other' or lineno == 0:
            for child in ast.iter_child_nodes(node):
                traverse(child, parent_path, level, parent_id)
            return

        node_id = f"{type(node).__name__}_{lineno}_{getattr(node, 'col_offset', 0)}"

        label = name if name else category
        if category == 'assigned_variable':
            # Assign has a `targets` list; AnnAssign has a single `target`.
            targets = getattr(node, 'targets', []) or [getattr(node, 'target', None)]
            if targets and isinstance(targets[0], ast.Name):
                label = f"{targets[0].id} ="

        # The vector is only needed for nodes that are actually emitted.
        vector = create_vector(category, level, (lineno, end_lineno), total_lines, parent_path)

        # Send 'loc' (line range) instead of the source text, and use short
        # keys, to keep the payload small.
        nodes.append({
            "id": node_id,
            "lbl": label,
            "type": category,
            "loc": [lineno, end_lineno],
            "vec": vector,
            "lvl": level,
            "pid": parent_id,
        })

        child_path = parent_path + [node_id]
        for child in ast.iter_child_nodes(node):
            traverse(child, child_path, level + 1, node_id)

    for node in tree.body:
        traverse(node)

    nodes.sort(key=lambda x: x['loc'][0])

    # Emit one edge per node whose parent is itself present in the graph.
    node_ids = {n['id'] for n in nodes}
    connections = [
        {"f": n['pid'], "t": n['id']}
        for n in nodes
        if n['pid'] and n['pid'] in node_ids
    ]

    return {"nodes": nodes, "connections": connections}


def generate_connections(nodes):
    """
    Builds the structural (tree) edges for a list of graph nodes.

    Each node is a dict with an 'id' and a parent reference. The parent is
    read from 'parent_id' when that key is present, otherwise from the short
    'pid' key that parse_source_to_graph emits, so both node schemas work.

    Returns:
        A list of {"from": parent_id, "to": node_id, "type": "hierarchy"}
        dicts, one per node whose parent id is non-empty and belongs to a
        node in `nodes`. Order follows the order of `nodes`.
    """
    # Only membership is needed, so a set of ids is enough.
    known_ids = {n['id'] for n in nodes}

    connections = []
    for node in nodes:
        parent = node['parent_id'] if 'parent_id' in node else node.get('pid')
        # 1. Structural Hierarchy (Tree)
        if parent and parent in known_ids:
            connections.append({
                "from": parent,
                "to": node['id'],
                "type": "hierarchy"
            })

    return connections