GeoQuery / backend /scripts /validate_censo.py
GerardCB's picture
Deploy to Spaces (Final Clean)
4851501
import csv
import json
import os
import unicodedata
# Define paths
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
CSV_PATH = os.path.join(BASE_DIR, "data/censo/censo_panama_2023_unificado.csv")
GEOJSON_PATH = os.path.join(BASE_DIR, "data/base/pan_admin3.geojson")
def normalize_text(text):
if not text:
return ""
# Normalize unicode characters to ASCII (remove accents)
text = unicodedata.normalize('NFKD', text).encode('ASCII', 'ignore').decode('ASCII')
return text.lower().strip()
def validate_censo_integration():
print(f"Loading CSV from {CSV_PATH}...")
csv_data = []
try:
with open(CSV_PATH, mode='r', encoding='utf-8') as f:
reader = csv.DictReader(f)
for row in reader:
csv_data.append(row)
except Exception as e:
print(f"Error loading CSV: {e}")
return
print(f"Loading GeoJSON from {GEOJSON_PATH}...")
try:
with open(GEOJSON_PATH, 'r') as f:
geojson = json.load(f)
except Exception as e:
print(f"Error loading GeoJSON: {e}")
return
# Build GeoJSON Lookup Map: (norm_prov, norm_dist, norm_corr) -> properties
geojson_lookup = {}
# Helper to handle common name variations found in Panama data
# (can add more rules as we discover mismatches)
def clean_name(name):
n = normalize_text(name)
# remove "distrito de", "comarca", etc if needed
return n
print("Building GeoJSON lookup table...")
for feature in geojson['features']:
props = feature.get('properties', {})
p_name = clean_name(props.get('adm1_name'))
d_name = clean_name(props.get('adm2_name'))
c_name = clean_name(props.get('adm3_name'))
key = (p_name, d_name, c_name)
if key in geojson_lookup:
print(f"Duplicate key in GeoJSON: {key}")
geojson_lookup[key] = props
print(f"GeoJSON lookup size: {len(geojson_lookup)}")
# Heuristics for Province Mapping (New -> Old)
PROV_MAPPING = {
"panama oeste": "panama",
"comarca naso tjer di": "bocas del toro" # Naso was part of Bocas
}
print("\nValidating CSV via Name Matching with Heuristics...")
matches = []
mismatches = []
for row in csv_data:
# CSV headers: nomb_prov, nomb_dist, nomb_corr
p_name = clean_name(row.get('nomb_prov'))
d_name = clean_name(row.get('nomb_dist'))
c_name = clean_name(row.get('nomb_corr'))
# Apply Province Mapping
search_p_name = PROV_MAPPING.get(p_name, p_name)
# 1. Try Exact Match (with mapped province)
key = (search_p_name, d_name, c_name)
if key in geojson_lookup:
matches.append(row)
row['geo_match_id'] = geojson_lookup[key].get('adm3_pcode')
continue
# 2. Relaxed District Match: Search in Province
# Find any entry in this province with the same corregimiento name
candidates = [k for k in geojson_lookup.keys() if k[0] == search_p_name and k[2] == c_name]
if len(candidates) == 1:
# Single match found in another district!
match_key = candidates[0]
matches.append(row)
row['geo_match_id'] = geojson_lookup[match_key].get('adm3_pcode')
# print(f"Relaxed Match: {c_name} (CSV Dist: {d_name}) -> (Geo Dist: {match_key[1]})")
continue
elif len(candidates) > 1:
# Ambiguous (same corregimiento name in multiple districts of same province - rare but possible)
# print(f"Ambiguous: {c_name} found in districts {[k[1] for k in candidates]}")
pass
# 3. Fuzzy/Typo Fixes (Specific hardcodes for common mismatch types if needed)
# E.g. "El Hato de San Juan de Dios" vs "El Hato de San Juan"
# We can perform a primitive "contains" check
best_candidate = None
# Get all corregimientos in this province
prov_corrs = [k for k in geojson_lookup.keys() if k[0] == search_p_name]
for k in prov_corrs:
geo_c = k[2]
# Check if one contains the other
if (c_name in geo_c or geo_c in c_name) and len(c_name) > 4 and len(geo_c) > 4:
# Check if starts matching
if c_name.startswith(geo_c) or geo_c.startswith(c_name):
best_candidate = k
break
if best_candidate:
matches.append(row)
row['geo_match_id'] = geojson_lookup[best_candidate].get('adm3_pcode')
# print(f"Fuzzy Match: '{c_name}' ~= '{best_candidate[2]}'")
continue
# No match
mismatches.append(row)
row['lookup_key'] = (search_p_name, d_name, c_name)
print(f"Total rows in CSV: {len(csv_data)}")
print(f"Matches found: {len(matches)}")
print(f"Mismatches found: {len(mismatches)}")
print(f"Match Rate: {len(matches)/len(csv_data)*100:.1f}%")
if mismatches:
print("\nMismatch Details (First 20):")
print(f"{'CSV Key (Prov, Dist, Corr)':<60} {'Closest Match?':<20}")
print("-" * 85)
for row in mismatches[:20]:
key = row['lookup_key']
print(f"{str(key):<60}")
# Analyze mismatches by Province
print("\nAnalyzing remaining mismatches by Province:")
prov_mismatches = {}
for row in mismatches:
p = row['nomb_prov']
prov_mismatches[p] = prov_mismatches.get(p, 0) + 1
for p, count in prov_mismatches.items():
print(f"{p}: {count}")
if __name__ == "__main__":
validate_censo_integration()