|
|
import csv |
|
|
import json |
|
|
import os |
|
|
import unicodedata |
|
|
|
|
|
|
|
|
# Project root: this file lives one directory level below it.
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))

# Paths are built component-wise so os.path.join applies the platform's
# separator throughout (previously "data/censo/..." hard-coded "/" inside
# a joined segment, yielding mixed separators on Windows).

# Input: unified 2023 Panama census CSV.
CSV_PATH = os.path.join(BASE_DIR, "data", "censo", "censo_panama_2023_unificado.csv")

# Output: the same census data enriched with an adm3_pcode column.
OUTPUT_PATH = os.path.join(BASE_DIR, "data", "censo", "censo_2023_enriched.csv")

# Admin-level-3 boundary GeoJSON used as the pcode lookup source.
GEOJSON_PATH = os.path.join(BASE_DIR, "data", "base", "pan_admin3.geojson")
|
|
|
|
|
def normalize_text(text):
    """Return *text* with accents stripped, whitespace trimmed, lowercased.

    Falsy input (None, "") yields "". Decomposes the string (NFKD) and
    drops every non-ASCII code point, so e.g. "Panamá" -> "panama".
    """
    if not text:
        return ""
    ascii_only = (
        unicodedata.normalize('NFKD', text)
        .encode('ASCII', 'ignore')
        .decode('ASCII')
    )
    # strip/lower commute on the ASCII-only result.
    return ascii_only.strip().lower()
|
|
|
|
|
# Census provinces with no direct admin-1 name match in the GeoJSON.
_PROV_MAPPING = {
    "panama oeste": "panama",
    "comarca naso tjer di": "bocas del toro",
}


def _build_lookup(geojson):
    """Map normalized (province, district, corregimiento) -> feature properties.

    Later features silently overwrite earlier ones on a duplicate key,
    matching the original behavior.
    """
    lookup = {}
    for feature in geojson['features']:
        props = feature.get('properties', {})
        key = (
            normalize_text(props.get('adm1_name')),
            normalize_text(props.get('adm2_name')),
            normalize_text(props.get('adm3_name')),
        )
        lookup[key] = props
    return lookup


def _match_row(row, geojson_lookup):
    """Return the adm3_pcode for one census row, or None when unmatched.

    Matching strategy, in order:
      1. exact (province, district, corregimiento) key;
      2. unique (province, corregimiento) pair, ignoring the district;
      3. substring overlap on corregimiento names within the province.
    """
    p_name = normalize_text(row.get('nomb_prov'))
    d_name = normalize_text(row.get('nomb_dist'))
    c_name = normalize_text(row.get('nomb_corr'))
    # Remap provinces the GeoJSON names differently.
    search_p_name = _PROV_MAPPING.get(p_name, p_name)

    # 1. Exact match.
    props = geojson_lookup.get((search_p_name, d_name, c_name))
    if props is not None:
        return props.get('adm3_pcode')

    # 2. District names often disagree between sources; accept a
    # province+corregimiento pair only when it is unambiguous.
    candidates = [
        k for k in geojson_lookup
        if k[0] == search_p_name and k[2] == c_name
    ]
    if len(candidates) == 1:
        return geojson_lookup[candidates[0]].get('adm3_pcode')

    # 3. Fuzzy fallback: substring overlap either way. The length guard
    # (> 4 chars) avoids spurious hits on very short names.
    if len(c_name) > 4:
        for k in geojson_lookup:
            if k[0] == search_p_name and (c_name in k[2] or k[2] in c_name):
                return geojson_lookup[k].get('adm3_pcode')

    return None


def process_censo_data():
    """Enrich the unified census CSV with adm3_pcode values from the
    admin-3 GeoJSON and write the result to OUTPUT_PATH.

    Prints progress and a match-rate summary; returns None. Load/save
    errors are reported to stdout and abort the run without raising.
    """
    print(f"Loading CSV from {CSV_PATH}...")
    try:
        with open(CSV_PATH, mode='r', encoding='utf-8') as f:
            reader = csv.DictReader(f)
            headers = reader.fieldnames
            csv_data = list(reader)
    except Exception as e:
        print(f"Error loading CSV: {e}")
        return

    if not csv_data or headers is None:
        # Guard: the original crashed here on an empty CSV — a
        # ZeroDivisionError in the match-rate report (and a TypeError on
        # ['adm3_pcode'] + None when the file had no header at all).
        print("CSV contained no data rows; nothing to do.")
        return

    print(f"Loading GeoJSON from {GEOJSON_PATH}...")
    try:
        # Explicit encoding: GeoJSON is UTF-8 by spec (RFC 7946).
        with open(GEOJSON_PATH, 'r', encoding='utf-8') as f:
            geojson = json.load(f)
    except Exception as e:
        print(f"Error loading GeoJSON: {e}")
        return

    print("Building GeoJSON lookup table...")
    geojson_lookup = _build_lookup(geojson)

    print("Enriching CSV data...")
    matches = 0
    for row in csv_data:
        code = _match_row(row, geojson_lookup)
        row['adm3_pcode'] = code or ""
        if code:
            matches += 1

    print(f"Enrichment Complete. Matches: {matches}/{len(csv_data)} ({matches/len(csv_data)*100:.1f}%)")

    # New pcode column goes first, followed by the original columns.
    new_headers = ['adm3_pcode'] + list(headers)
    print(f"Saving to {OUTPUT_PATH}...")
    try:
        with open(OUTPUT_PATH, mode='w', encoding='utf-8', newline='') as f:
            writer = csv.DictWriter(f, fieldnames=new_headers)
            writer.writeheader()
            writer.writerows(csv_data)
        print("File saved successfully.")
    except Exception as e:
        print(f"Error saving CSV: {e}")
|
|
|
|
|
# Run the enrichment pipeline only when executed as a script, not when
# this module is imported.
if __name__ == "__main__":
    process_censo_data()
|
|
|