"""Financial Health Score Service (FastAPI).

Loads a user's budgets, recent transactions and credit cards from MongoDB,
asks an OpenAI chat model to rate them, and returns a 0-100 financial health
score with a short explanation. Every scoring request is recorded in the
``api_logs`` collection on a best-effort basis.
"""

import json
import logging
import os
import time
from collections.abc import Mapping
from datetime import datetime, timedelta, timezone

from bson import ObjectId
from bson.decimal128 import Decimal128
from bson.errors import InvalidId
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException
from openai import OpenAI
from pydantic import BaseModel
from pymongo import MongoClient
from pymongo.errors import PyMongoError

logger = logging.getLogger(__name__)

# ===========================
# ENV SETUP
# ===========================

load_dotenv()

MONGO_URI = os.getenv("MONGO_URI")
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")

# Fail fast at import time: the service cannot do anything useful without
# both of these settings.
if not MONGO_URI:
    raise RuntimeError("❌ MONGO_URI missing")
if not OPENAI_API_KEY:
    raise RuntimeError("❌ OPENAI_API_KEY missing")

mongo_client = MongoClient(MONGO_URI)
default_db = mongo_client.get_default_database()

budget_collection = default_db["budgets"]
transaction_collection = default_db["transactions"]
currencies_collection = default_db["currencies"]
creditcards_collection = default_db["creditcards"]
api_logs_collection = default_db["api_logs"]

openai = OpenAI(api_key=OPENAI_API_KEY)

app = FastAPI(title="Financial Health Score Service")

# ===========================
# CONSTANTS
# ===========================

# Fixed-offset zone so log timestamps are really IST regardless of the
# server's local timezone (India observes no DST, so a fixed offset is exact).
IST = timezone(timedelta(hours=5, minutes=30), name="IST")

OPENAI_MODEL = "gpt-4o-mini"
OPENAI_TEMPERATURE = 0.6

# Only transactions from this window, newest first, capped at the limit below,
# are sent to the model.
TRANSACTION_WINDOW = timedelta(days=30)
TRANSACTION_LIMIT = 200

# Structured-output schema the model is asked to follow.
SCORE_RESPONSE_FORMAT = {
    "type": "json_schema",
    "json_schema": {
        "name": "financial_score",
        "schema": {
            "type": "object",
            "properties": {
                "score": {"type": "number"},
                "explanation": {"type": "string"},
            },
            "required": ["score", "explanation"],
        },
    },
}

# ===========================
# MODELS
# ===========================


class ScoreRequest(BaseModel):
    """Request body for ``POST /financial-score``."""

    # Hex string form of the user's MongoDB ObjectId.
    userId: str


# ===========================
# HELPERS
# ===========================


def _utc_now() -> datetime:
    """Return the current UTC time as a naive ``datetime``.

    Equivalent to the deprecated ``datetime.utcnow()``; kept naive so the
    values produced (ISO strings, Mongo query bounds) are unchanged.
    """
    return datetime.now(timezone.utc).replace(tzinfo=None)


def ist_now() -> str:
    """Return the current Indian Standard Time as ``DD-MM-YYYY HH:MM:SS:IST``."""
    return datetime.now(IST).strftime("%d-%m-%Y %H:%M:%S:IST")


def log_api_event(
    *,
    status: str,
    response_time: float,
    user_id: str | None = None,
    error_message: str | None = None,
) -> None:
    """Record one API call in the ``api_logs`` collection (best effort).

    Args:
        status: Outcome label stored with the entry (callers pass
            ``"success"`` or ``"fail"``).
        response_time: Elapsed seconds; stored rounded to milliseconds.
        user_id: Optional user identifier; omitted from the entry when falsy.
        error_message: Optional error text; omitted from the entry when falsy.

    Never raises: a failure to write the log entry is reported through the
    module logger and otherwise ignored, so logging cannot break the API.
    """
    entry = {
        "name": "Financial Health Score",
        "status": status,
        "date": ist_now(),
        "response_time": round(response_time, 3),
    }
    if user_id:
        entry["user_id"] = user_id
    if error_message:
        entry["error_message"] = error_message

    try:
        api_logs_collection.insert_one(entry)
    except Exception:  # logging must never break API
        logger.warning("Could not persist API log entry", exc_info=True)


def safe_number(v) -> float | None:
    """Convert *v* to ``float``, returning ``None`` when it is not numeric.

    BSON ``Decimal128`` values are converted through ``Decimal`` first,
    because ``float()`` does not accept them directly.
    """
    if isinstance(v, Decimal128):
        v = v.to_decimal()
    try:
        return float(v)
    except (TypeError, ValueError, OverflowError):
        return None


def _iso_date(value) -> str | None:
    """Return the ``YYYY-MM-DD`` date of a ``datetime``, else ``None``."""
    return value.date().isoformat() if isinstance(value, datetime) else None


def _to_object_id(value) -> ObjectId | None:
    """Coerce *value* to an ``ObjectId``; ``None`` if missing or invalid.

    ``None`` is rejected explicitly because ``ObjectId(None)`` does not fail:
    it generates a brand-new id, which can never match a stored document.
    """
    if value is None:
        return None
    if isinstance(value, ObjectId):
        return value
    try:
        return ObjectId(value)
    except (InvalidId, TypeError):
        return None


def _currency_codes(currency_ids) -> dict:
    """Map each currency ``_id`` to its code using a single query.

    Returns an empty mapping when there is nothing to look up or when the
    lookup fails; a missing currency code is not worth failing a score for.
    """
    if not currency_ids:
        return {}
    try:
        cursor = currencies_collection.find({"_id": {"$in": list(currency_ids)}})
        return {
            doc["_id"]: doc.get("code") or doc.get("currency") for doc in cursor
        }
    except PyMongoError:
        logger.warning("Currency lookup failed", exc_info=True)
        return {}


def normalize_budgets(budgets) -> list[dict]:
    """Reduce raw budget documents to the JSON-friendly fields used for scoring.

    Amount fields are passed through ``safe_number``; a missing or null
    ``headCategories`` / ``notifications`` becomes an empty list.
    """
    normalized = []
    for budget in budgets:
        head_categories = [
            {
                "spendLimitType": head.get("spendLimitType"),
                "spendAmount": safe_number(head.get("spendAmount")),
                "maxAmount": safe_number(head.get("maxAmount")),
                "remainingAmount": safe_number(head.get("remainingAmount")),
                "notifications": head.get("notifications") or [],
            }
            for head in budget.get("headCategories") or []
        ]
        normalized.append(
            {
                "name": budget.get("name"),
                "status": budget.get("status"),
                "period": budget.get("period"),
                "maxAmount": safe_number(budget.get("maxAmount")),
                "spendAmount": safe_number(budget.get("spendAmount")),
                "remainingAmount": safe_number(budget.get("remainingAmount")),
                "rollover": budget.get("rollover"),
                "notifications": budget.get("notifications") or [],
                "headCategories": head_categories,
            }
        )
    return normalized


def normalize_transactions(txns) -> list[dict]:
    """Reduce raw transaction documents to type, amount, currency code and date.

    Currency references are resolved with one batched query instead of one
    query per transaction; unresolved references yield ``"currency": None``.
    """
    txns = list(txns)  # iterated twice below; accept any iterable
    currency_ids = [_to_object_id(txn.get("currency")) for txn in txns]
    codes = _currency_codes({cid for cid in currency_ids if cid is not None})

    return [
        {
            "type": txn.get("type"),
            "amount": safe_number(txn.get("amount")),
            "currency": codes.get(cid),
            "date": _iso_date(txn.get("date")),
        }
        for txn, cid in zip(txns, currency_ids)
    ]


def normalize_creditcards(cards) -> list[dict]:
    """Reduce raw credit-card documents to the fields used for scoring.

    Each date is converted independently, so a null or malformed
    ``billing_cycle`` no longer discards a valid ``due_date``.
    """
    normalized = []
    for card in cards:
        cycle = card.get("billing_cycle")
        if not isinstance(cycle, Mapping):
            cycle = {}
        normalized.append(
            {
                "card_name": card.get("card_name"),
                "credit_limit": safe_number(card.get("credit_limit")),
                "current_balance": safe_number(card.get("current_balance")),
                "total_due_amount": safe_number(card.get("total_due_amount")),
                "minimum_due": safe_number(card.get("minimum_due")),
                "billing_cycle_start": _iso_date(cycle.get("start_date")),
                "billing_cycle_end": _iso_date(cycle.get("end_date")),
                "due_date": _iso_date(card.get("due_date")),
            }
        )
    return normalized


def build_prompt(budgets, transactions, creditcards) -> str:
    """Build the user prompt containing the normalized financial data as JSON."""

    def dump(data) -> str:
        # default=str: some fields (e.g. notifications) are passed through
        # from MongoDB untouched and may hold ObjectId/datetime values that
        # json cannot encode. The output is prompt text, so a plain string
        # form is acceptable.
        return json.dumps(data, indent=2, default=str)

    return f"""
You are a financial health scoring engine.

Compute a score (0–100) considering:
- Budget usage
- Spending behavior
- Credit card utilization and dues

Return JSON only.

Budgets:
{dump(budgets)}

Transactions:
{dump(transactions)}

Credit Cards:
{dump(creditcards)}
"""


def _clamp_score(raw) -> int:
    """Convert the model's score to an ``int`` clamped to the 0-100 range.

    Raises ``TypeError``/``ValueError`` when *raw* is not numeric.
    """
    return max(0, min(100, int(float(raw))))


# ===========================
# HEALTH ENDPOINT
# ===========================


@app.get("/health")
def health():
    """Report service health, probing MongoDB (ping) and OpenAI (model list).

    ``status`` is ``"healthy"`` when both probes succeed and ``"degraded"``
    otherwise; per-dependency results are under ``checks``.
    """
    report = {
        "service": "Financial Health Score Service",
        "status": "healthy",
        "checks": {},
    }

    try:
        mongo_client.admin.command("ping")
        report["checks"]["mongodb"] = "ok"
    except PyMongoError as e:
        report["checks"]["mongodb"] = f"fail: {e}"
        report["status"] = "degraded"

    try:
        openai.models.list()
        report["checks"]["openai"] = "ok"
    except Exception as e:
        report["checks"]["openai"] = f"fail: {e}"
        report["status"] = "degraded"

    report["timestamp"] = _utc_now().isoformat()
    return report


# ===========================
# FINANCIAL SCORE
# ===========================


@app.post("/financial-score")
def financial_score(payload: ScoreRequest):
    """Compute the financial health score for ``payload.userId``.

    Returns a dict with ``status``, ``message``, ``userId``, ``score`` (int,
    0-100) and ``explanation``. A user with no budgets, transactions or cards
    gets a score of 0 without calling the model.

    Raises:
        HTTPException: 400 when ``userId`` is not a valid ObjectId; 502 when
            loading data, calling the model or parsing its reply fails.
    """
    started = time.perf_counter()  # monotonic: immune to wall-clock changes

    def elapsed() -> float:
        return time.perf_counter() - started

    try:
        user_oid = ObjectId(payload.userId)
    except (InvalidId, TypeError):
        log_api_event(
            status="fail",
            response_time=elapsed(),
            user_id=payload.userId,
            error_message="Invalid userId",
        )
        raise HTTPException(status_code=400, detail="Invalid userId") from None

    try:
        budgets = normalize_budgets(
            list(budget_collection.find({"createdBy": user_oid}))
        )

        window_start = _utc_now() - TRANSACTION_WINDOW
        txns = normalize_transactions(
            list(
                transaction_collection.find(
                    {"user": user_oid, "date": {"$gte": window_start}}
                )
                .sort("date", -1)
                .limit(TRANSACTION_LIMIT)
            )
        )

        cards = normalize_creditcards(
            list(creditcards_collection.find({"user_id": user_oid}))
        )

        if not budgets and not txns and not cards:
            log_api_event(
                status="success",
                response_time=elapsed(),
                user_id=payload.userId,
            )
            return {
                "status": "success",
                "message": "No financial data found",
                "userId": payload.userId,
                "score": 0,
                "explanation": "No financial data available.",
            }

        response = openai.chat.completions.create(
            model=OPENAI_MODEL,
            temperature=OPENAI_TEMPERATURE,
            response_format=SCORE_RESPONSE_FORMAT,
            messages=[
                {"role": "system", "content": "You calculate financial health."},
                {"role": "user", "content": build_prompt(budgets, txns, cards)},
            ],
        )

        content = response.choices[0].message.content
        if not content:
            # e.g. a refusal: give a readable error instead of a json TypeError
            raise ValueError("OpenAI returned an empty response")

        parsed = json.loads(content)
        score = _clamp_score(parsed.get("score", 0))

        log_api_event(
            status="success",
            response_time=elapsed(),
            user_id=payload.userId,
        )

        return {
            "status": "success",
            "message": "Financial health score calculated successfully",
            "userId": payload.userId,
            "score": score,
            "explanation": parsed.get("explanation"),
        }

    except Exception as exc:
        log_api_event(
            status="fail",
            response_time=elapsed(),
            user_id=payload.userId,
            error_message=str(exc),
        )
        # NOTE(review): detail echoes the raw exception text to the client,
        # which may expose internal details; kept for backward compatibility.
        raise HTTPException(status_code=502, detail=str(exc)) from exc