File size: 5,746 Bytes
4851501 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 |
import csv
import json
import os
import unicodedata
# Data locations, resolved relative to the project root (the directory one
# level above the directory that contains this script).
_SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
BASE_DIR = os.path.dirname(_SCRIPT_DIR)

# Census table to validate.
CSV_PATH = os.path.join(
    BASE_DIR,
    "data/censo/censo_panama_2023_unificado.csv",
)

# Boundaries file whose feature properties carry adm1/adm2/adm3 names.
GEOJSON_PATH = os.path.join(
    BASE_DIR,
    "data/base/pan_admin3.geojson",
)
def normalize_text(text):
    """Return a lowercase, trimmed, ASCII-only version of *text*.

    Falsy input (``None`` or an empty string) yields ``""``.
    """
    if text:
        # NFKD decomposition separates base letters from their combining
        # marks; encoding to ASCII with "ignore" then drops the marks along
        # with any other character that has no ASCII representation.
        decomposed = unicodedata.normalize('NFKD', text)
        ascii_only = decomposed.encode('ASCII', 'ignore').decode('ASCII')
        return ascii_only.lower().strip()
    return ""
def _find_geo_match(key, geojson_lookup, keys_by_prov_corr, keys_by_prov):
    """Return the GeoJSON lookup key matching a CSV row key, or ``None``.

    Args:
        key: ``(province, district, corregimiento)`` tuple of normalized
            names taken from the CSV row (province already remapped).
        geojson_lookup: dict keyed by the same kind of tuple.
        keys_by_prov_corr: dict mapping ``(province, corregimiento)`` to the
            list of lookup keys sharing that pair.
        keys_by_prov: dict mapping a province to the list of its lookup keys,
            in the lookup's insertion order.

    Matching stages, tried in order:
        1. exact ``(province, district, corregimiento)`` match;
        2. same province and corregimiento in exactly one district;
        3. first entry of the province whose corregimiento name is a prefix
           of the CSV name or vice versa (both names longer than 4 chars).
    """
    # 1. Exact match.
    if key in geojson_lookup:
        return key

    prov, _dist, corr = key

    # 2. Relaxed district match: the corregimiento name is unique within the
    #    province, so the district recorded in the CSV is ignored.
    candidates = keys_by_prov_corr.get((prov, corr), [])
    if len(candidates) == 1:
        return candidates[0]
    # NOTE(review): when several districts of the province share this
    # corregimiento name, the row falls through to the prefix stage below,
    # which returns the first prefix-compatible entry of the province (an
    # identical name counts as a prefix). Confirm that resolving ambiguous
    # names this way is intended.

    # 3. Prefix match, e.g. "El Hato de San Juan de Dios" vs
    #    "El Hato de San Juan". Short names are skipped on both sides.
    if len(corr) <= 4:
        return None
    for candidate in keys_by_prov.get(prov, []):
        geo_corr = candidate[2]
        if len(geo_corr) > 4 and (
            corr.startswith(geo_corr) or geo_corr.startswith(corr)
        ):
            return candidate
    return None


def validate_censo_integration():
    """Report how many census CSV rows can be matched to a GeoJSON feature.

    Reads the CSV at ``CSV_PATH`` and the GeoJSON at ``GEOJSON_PATH``, indexes
    the features by their normalized ``adm1_name``/``adm2_name``/``adm3_name``
    properties, and tries to match every CSV row through its ``nomb_prov``/
    ``nomb_dist``/``nomb_corr`` columns. Matched rows get a ``geo_match_id``
    entry (the feature's ``adm3_pcode``); unmatched rows get a ``lookup_key``
    entry holding the key that was searched. A summary is printed to stdout.

    Returns:
        None. If either input cannot be loaded, an error line is printed and
        the function returns early.
    """
    print(f"Loading CSV from {CSV_PATH}...")
    try:
        # newline='' is what the csv module expects for correct handling of
        # newlines embedded in quoted fields; utf-8-sig also accepts plain
        # UTF-8 and strips a BOM that would otherwise corrupt the first
        # header name.
        with open(CSV_PATH, mode='r', encoding='utf-8-sig', newline='') as f:
            csv_data = list(csv.DictReader(f))
    except (OSError, UnicodeError, csv.Error) as e:
        print(f"Error loading CSV: {e}")
        return

    print(f"Loading GeoJSON from {GEOJSON_PATH}...")
    try:
        # Explicit encoding: the platform default is not always UTF-8, which
        # would garble accented place names.
        with open(GEOJSON_PATH, 'r', encoding='utf-8') as f:
            geojson = json.load(f)
    except (OSError, ValueError) as e:
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
        print(f"Error loading GeoJSON: {e}")
        return

    features = geojson.get('features') if isinstance(geojson, dict) else None
    if not isinstance(features, list):
        print("Error loading GeoJSON: top-level 'features' list not found")
        return

    # Helper to handle common name variations found in Panama data
    # (can add more rules as we discover mismatches).
    def clean_name(name):
        # Hook for future rules such as removing "distrito de" or "comarca".
        return normalize_text(name)

    # Build GeoJSON lookup map: (norm_prov, norm_dist, norm_corr) -> properties
    print("Building GeoJSON lookup table...")
    geojson_lookup = {}
    for feature in features:
        # "properties" may be present but null, so fall back to an empty dict.
        props = feature.get('properties') or {}
        key = (
            clean_name(props.get('adm1_name')),
            clean_name(props.get('adm2_name')),
            clean_name(props.get('adm3_name')),
        )
        if key in geojson_lookup:
            print(f"Duplicate key in GeoJSON: {key}")
        # On duplicates the later feature wins.
        geojson_lookup[key] = props
    print(f"GeoJSON lookup size: {len(geojson_lookup)}")

    # Secondary indexes, built once so each CSV row does not have to rescan
    # every key. Iterating the lookup keeps its insertion order, which decides
    # which entry the prefix stage finds first.
    keys_by_prov = {}
    keys_by_prov_corr = {}
    for key in geojson_lookup:
        keys_by_prov.setdefault(key[0], []).append(key)
        keys_by_prov_corr.setdefault((key[0], key[2]), []).append(key)

    # Heuristics for Province Mapping (New -> Old)
    PROV_MAPPING = {
        "panama oeste": "panama",
        "comarca naso tjer di": "bocas del toro"  # Naso was part of Bocas
    }

    print("\nValidating CSV via Name Matching with Heuristics...")
    matches = []
    mismatches = []
    for row in csv_data:
        # CSV headers: nomb_prov, nomb_dist, nomb_corr
        p_name = clean_name(row.get('nomb_prov'))
        d_name = clean_name(row.get('nomb_dist'))
        c_name = clean_name(row.get('nomb_corr'))

        # Apply Province Mapping
        search_p_name = PROV_MAPPING.get(p_name, p_name)
        key = (search_p_name, d_name, c_name)

        match_key = _find_geo_match(
            key, geojson_lookup, keys_by_prov_corr, keys_by_prov
        )
        if match_key is not None:
            matches.append(row)
            row['geo_match_id'] = geojson_lookup[match_key].get('adm3_pcode')
        else:
            mismatches.append(row)
            row['lookup_key'] = key

    total_rows = len(csv_data)
    # Guard against an empty CSV, which would otherwise divide by zero.
    match_rate = len(matches) / total_rows * 100 if total_rows else 0.0
    print(f"Total rows in CSV: {total_rows}")
    print(f"Matches found: {len(matches)}")
    print(f"Mismatches found: {len(mismatches)}")
    print(f"Match Rate: {match_rate:.1f}%")

    if mismatches:
        print("\nMismatch Details (First 20):")
        print(f"{'CSV Key (Prov, Dist, Corr)':<60} {'Closest Match?':<20}")
        print("-" * 85)
        for row in mismatches[:20]:
            print(f"{str(row['lookup_key']):<60}")

        # Analyze mismatches by Province (using the raw CSV province value).
        print("\nAnalyzing remaining mismatches by Province:")
        prov_mismatches = {}
        for row in mismatches:
            p = row.get('nomb_prov')
            prov_mismatches[p] = prov_mismatches.get(p, 0) + 1
        for p, count in prov_mismatches.items():
            print(f"{p}: {count}")
# Run the validation only when this file is executed as a script, not when
# it is imported as a module.
if __name__ == "__main__":
    validate_censo_integration()
|