# GeoQuery / backend / scripts / enrich_censo.py
# GerardCB's picture
# Deploy to Spaces (Final Clean)
# 4851501
import csv
import json
import os
import unicodedata
# Filesystem locations used by this script. Everything is resolved relative
# to BASE_DIR, the directory two levels above this file.
_THIS_FILE = os.path.abspath(__file__)
BASE_DIR = os.path.dirname(os.path.dirname(_THIS_FILE))

# Inputs: the unified census CSV and the admin-level-3 boundaries GeoJSON.
GEOJSON_PATH = os.path.join(BASE_DIR, "data/base/pan_admin3.geojson")
CSV_PATH = os.path.join(BASE_DIR, "data/censo/censo_panama_2023_unificado.csv")

# Output: the census CSV with an extra adm3_pcode column.
OUTPUT_PATH = os.path.join(BASE_DIR, "data/censo/censo_2023_enriched.csv")
def normalize_text(text):
    """Return *text* folded to a lowercase, ASCII-only comparison key.

    The string is NFKD-decomposed and every character that cannot be
    encoded as ASCII (for example the combining accents produced by the
    decomposition) is dropped. The result is lowercased and stripped of
    leading and trailing whitespace.

    Falsy input such as ``None`` or ``""`` yields ``""``.
    """
    if text:
        decomposed = unicodedata.normalize('NFKD', text)
        ascii_only = decomposed.encode('ASCII', 'ignore').decode('ASCII')
        return ascii_only.lower().strip()
    return ""
def _find_adm3_pcode(prov, dist, corr, geojson_lookup, keys_by_province):
    """Return the ``adm3_pcode`` for a normalized (province, district, corregimiento).

    Three strategies are tried in order; the first one that selects a
    feature decides the result:

    1. Exact match on the full ``(prov, dist, corr)`` key.
    2. Same province and corregimiento, ignoring the district, accepted only
       when exactly one feature qualifies.
    3. Substring match within the province (either name contains the other),
       attempted only when ``corr`` is longer than 4 characters.

    Args:
        prov, dist, corr: Already-normalized names to look up.
        geojson_lookup: Maps ``(prov, dist, corr)`` keys to feature properties.
        keys_by_province: Maps a province name to the list of
            ``geojson_lookup`` keys in that province, in lookup order.

    Returns:
        The selected feature's ``adm3_pcode`` value, or ``None`` when no
        feature is selected or the selected feature carries no such property.
    """
    # Strategy 1: Exact Match
    exact_key = (prov, dist, corr)
    if exact_key in geojson_lookup:
        return geojson_lookup[exact_key].get('adm3_pcode')

    province_keys = keys_by_province.get(prov, ())

    # Strategy 2: Relaxed District Search
    candidates = [key for key in province_keys if key[2] == corr]
    if len(candidates) == 1:
        return geojson_lookup[candidates[0]].get('adm3_pcode')

    # Strategy 3: Fuzzy substring check. Very short names are skipped because
    # they would be contained in too many unrelated names.
    # NOTE(review): this pass also runs when Strategy 2 found several
    # candidates, in which case the first one in lookup order wins -- confirm
    # that picking an arbitrary district is acceptable.
    if len(corr) <= 4:
        return None
    for key in province_keys:
        geo_corr = key[2]
        # An empty GeoJSON name is a substring of every string, so it must be
        # excluded or a feature without adm3_name would match anything.
        if geo_corr and (corr in geo_corr or geo_corr in corr):
            return geojson_lookup[key].get('adm3_pcode')
    return None


def process_censo_data():
    """Add an ``adm3_pcode`` column to the census CSV and write the result.

    Reads the census rows from ``CSV_PATH`` and the boundary features from
    ``GEOJSON_PATH``. Each row's ``nomb_prov`` / ``nomb_dist`` / ``nomb_corr``
    names are matched against the features' ``adm1_name`` / ``adm2_name`` /
    ``adm3_name`` properties, after ``normalize_text`` has been applied to
    both sides. The rows are then written to ``OUTPUT_PATH`` with
    ``adm3_pcode`` as the first column (empty string when nothing matched).

    Progress and errors are reported with ``print``. If either input cannot
    be loaded the function prints the error and returns without writing
    anything. Always returns ``None``.
    """
    print(f"Loading CSV from {CSV_PATH}...")
    try:
        # utf-8-sig reads plain UTF-8 unchanged but drops a leading BOM, which
        # would otherwise end up inside the first header name. newline='' is
        # what the csv module expects so quoted embedded newlines survive.
        with open(CSV_PATH, mode='r', encoding='utf-8-sig', newline='') as f:
            reader = csv.DictReader(f)
            csv_data = list(reader)
            # fieldnames is None for a completely empty file.
            headers = list(reader.fieldnames or [])
    except Exception as e:
        print(f"Error loading CSV: {e}")
        return

    print(f"Loading GeoJSON from {GEOJSON_PATH}...")
    try:
        # GeoJSON is UTF-8; do not depend on the platform default encoding.
        with open(GEOJSON_PATH, 'r', encoding='utf-8') as f:
            geojson = json.load(f)
    except Exception as e:
        print(f"Error loading GeoJSON: {e}")
        return

    # Build GeoJSON Lookup Map: properties keyed by (Prov, Dist, Corr)
    print("Building GeoJSON lookup table...")
    geojson_lookup = {}
    for feature in geojson['features']:
        # "properties" may legitimately be null in GeoJSON.
        props = feature.get('properties') or {}
        key = (
            normalize_text(props.get('adm1_name')),
            normalize_text(props.get('adm2_name')),
            normalize_text(props.get('adm3_name')),
        )
        geojson_lookup[key] = props

    # Index the keys by province once so the fallback strategies do not have
    # to rescan every feature for every CSV row.
    keys_by_province = {}
    for key in geojson_lookup:
        keys_by_province.setdefault(key[0], []).append(key)

    # Province Mapping Heuristics: census province names that are looked up
    # under a different province name in the GeoJSON.
    # NOTE(review): presumably the boundary file predates these provinces --
    # confirm against the data.
    province_aliases = {
        "panama oeste": "panama",
        "comarca naso tjer di": "bocas del toro",
    }

    print("Enriching CSV data...")
    matches = 0
    for row in csv_data:
        prov = normalize_text(row.get('nomb_prov'))
        dist = normalize_text(row.get('nomb_dist'))
        corr = normalize_text(row.get('nomb_corr'))
        search_prov = province_aliases.get(prov, prov)

        found_code = _find_adm3_pcode(
            search_prov, dist, corr, geojson_lookup, keys_by_province
        )

        # Assign found code or empty string
        row['adm3_pcode'] = found_code or ""
        if found_code:
            matches += 1

    total = len(csv_data)
    # Guard against an input with a header but no data rows.
    match_pct = matches / total * 100 if total else 0.0
    print(f"Enrichment Complete. Matches: {matches}/{total} ({match_pct:.1f}%)")

    # Save Enriched CSV. If the input already had an adm3_pcode column, keep
    # a single copy (in first position) instead of emitting it twice.
    new_headers = ['adm3_pcode'] + [h for h in headers if h != 'adm3_pcode']
    print(f"Saving to {OUTPUT_PATH}...")
    try:
        with open(OUTPUT_PATH, mode='w', encoding='utf-8', newline='') as f:
            writer = csv.DictWriter(f, fieldnames=new_headers)
            writer.writeheader()
            writer.writerows(csv_data)
        print("File saved successfully.")
    except Exception as e:
        print(f"Error saving CSV: {e}")
# Script entry point: run the enrichment only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    process_censo_data()