from fastapi import FastAPI import uvicorn from langchain_core.prompts import ChatPromptTemplate from pydantic import BaseModel, Field import pygsheets import json from langgraph.graph import StateGraph, END from typing import TypedDict, Annotated import operator from langchain_core.messages import SystemMessage, HumanMessage, AnyMessage from langchain_ollama import ChatOllama from langgraph.pregel import RetryPolicy import json import pandas as pd from google.oauth2 import service_account import os # GOOGLESHEETS_CREDENTIALS = json.dumps({"type": "service_account","project_id": "spendstracker-457904","private_key_id": "91bb3866226ab9f6734aa3fcc861faf17897d79d","private_key": "-----BEGIN PRIVATE KEY-----\nMIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQCy2v9irZ+/f+hF\nGUW/ikqnQUGMTapLulH3I6+vdDoQBtFa3tNc5BcK991Qbbk8NG2Iv6Ni7H/dY4jv\nWlBr//WTcThKSa23HzjztDxFtCbw5BsaNcXzt3HlAb/ldZNjZkt8CJdzLDpDuiLc\n9PbJEUxNZZYZcGrY7rhse1Z5QCn/gX1GlTCqMORPoEcrCmjklabjht4le9SoUaCA\n9qABNt8SXaTPw1l4HxxxtMEQE466COxWn89lEDh+DqDjU+oXVwXUFscDUSrKtDFt\nNNjIy0yMlKTtLwtMaHMK6DdT+hqAjs4xaOFF2YQFDPLuXAX1PaLEi7J/ircese1s\nlQAcxrThAgMBAAECggEAAIvnQl2t8oeZRdbHLgfl5P9vzBYzqkISPItVHtff0os2\nygyKwEqpIF00BaokKgEuAYA2Z1e6J0rF0RdpTf8s+KucpKt3dqsHgUUgdwUPJmbI\nB2s5JN6/YBgChli239Og8OrUzaMJtYnE0ACGnYQqQ8VG8WJ0zR9jnF8/GyU9S5zQ\nGlUO04y6ftetQCaFaVIxxy9B/pOtSuIZi4qoJjGuOebMAcYWusrAWID3uewhsZJW\nqCQUMLyADK5ujEAybo7dpohph6YYwV/6bmCl/8rfnBiXn8NsuFfE9m/B7W9TBXku\n0AB/EF8rnjI2kEMXgUogLUj1DBm4pkVEpce+vd2LzQKBgQD7UJg35cLFCcjylumP\n/5f7d4V25ryWpfHMYSs9zefkRDnVrmwumanHy+IOSLgOMUC5dIcJOjQPutaUnKN5\nH4oW2q4K6Pmp4vlDeTNp9bHaZMokuQPVhMZvsWWcpFe3v6nMHA3tvshiWKiPVApo\nHaaXeNETateCpwGBn+9Eu5VAtQKBgQC2MJbQ/85DndSnk3YZ+aEtp2fEJABJwfPl\nM6vRnKy5gHKMPk1BZLb+B4osr+OA6eZEzp6VL2kOpI+lIZ9E20uMWOOzZUk+du6i\nEpXimHSoNPS56pksfS3tWoHRuyM9nm+rPEV0LPWotyenEfhtGE1CYZwnef1aMNfW\nKhui8uH6/QKBgQCLvoMGAhLNseU1T8lMMxn10L48IY2YT2om9Zkv4sEhYvat5TFu\nsC+CU9K9kp4V9jlBZpR4Aw9T99a+CGO2RF1q2+qPUoERgI6OgGSgdOiSwhzNUrvZ\nDN2y2ffgpFnKaR8nyinMm5udZCNGn7qxrlsmOx43J9/yXJ8vzxkjJROXSQKBgGsW\n4G91DU7dZPQjT1YxTzZAolO+PZUdNjlRR/trtnNLNwmMTWjUxGNJF0TxFi7eTYXA\nVaKnPX9n5y9PNgkJRbz3OtBmBsl6qwYFGqkYp+l/RyJI7UQjSG2tt4UKFMrRaB4k\nzUZebv9+uQYRIA8wK6mLKnhh0jPDZfrywU/kqEQZAoGAbywkV/FjFeAhmqY/yLQo\nho3E5T5jEc45kDadsK2OHVnCRb0tWh6VTdr93XQ9z5mygub7Wu16jp/FnHtA7f+E\no743SU8sYHxPGbVavlM/iRzbzhaF8o5lTHqqzLqrPvMOBbUpP/cRXugoxt7m7o4v\nvcRikUgNG7O8ipxnN28L8dg=\n-----END PRIVATE KEY-----\n","client_email": "spendstracker@spendstracker-457904.iam.gserviceaccount.com","client_id": "117285083911847446748","auth_uri": "https://accounts.google.com/o/oauth2/auth","token_uri": "https://oauth2.googleapis.com/token","auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs","client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/spendstracker%40spendstracker-457904.iam.gserviceaccount.com","universe_domain": "googleapis.com"}) sheet_url = "https://docs.google.com/spreadsheets/d/1t4bOM4fULdaVsjDDnqEG1g8Zey6M00UuFhTZC03_4xo/edit?gid=0#gid=0" GOOGLESHEETS_CREDENTIALS = os.getenv("GOOGLESHEETS_CREDENTIALS") class TransactionParser(BaseModel): """This Pydantic class is used to parse the transaction message.""" amount: str = Field(description="The amount of the transaction in decimal format. If the transaction is a credit or a reversal, then include negative sign. DO not insert currency.", example="123.45") dr_or_cr: str = Field(description="Identify if the transaction was debit (spent) or credit (received). Strictly choose one of the values - Debit or Credit") receiver: str = Field(description="The recipient of the transaction. Identify the Merchant Name from the value.") category: str = Field(description="The category of the transaction. The category of the transaction is linked to the Merchant Name. Strictly choose from one the of values - Shopping,EMI,Education,Miscellaneous,Grocery,Utility,House Help,Travel,Transport") transaction_date: str = Field(description="The date of the transaction in yyyy-mm-dd format. If the year is not provided then use current year.") transaction_origin: str = Field(description="The origin of the transaction. Provide the card or account number as well.") class AgentState(TypedDict): messages: Annotated[list[AnyMessage], operator.add] class Agent: def __init__(self, model, system=""): self.system = system graph = StateGraph(AgentState) graph.add_node("classify_txn_type", self.classify_txn_type) graph.add_node("parse_message", self.parse_message) graph.add_node("write_message", self.write_message) graph.add_conditional_edges( "classify_txn_type", self.check_txn_and_decide, {True: "parse_message", False: END} ) graph.add_edge("parse_message", "write_message") graph.add_edge("write_message", END) graph.set_entry_point("classify_txn_type") self.graph = graph.compile() self.model = model def classify_txn_type(self, state: AgentState) -> AgentState: print("Classifying transaction type...") messages = state["messages"] if self.system: messages = [SystemMessage(content=self.system)] + messages message = self.model.invoke(messages) print("Classifying transaction type completed.") return {"messages": [message]} def parse_message(self, state: AgentState) -> AgentState: print("Parsing transaction message...") message = state["messages"][0]#.content system = """ You are a helpful assistant skilled at parsing transaction messages and providing structured responses. """ human = "Categorize the transaction message and provide the output in a structed format: {topic}" prompt = ChatPromptTemplate.from_messages([("system", system), ("human", human)]) chain = prompt | self.model.with_structured_output(TransactionParser) result = chain.invoke({"topic": message}) print("Parsing transaction message completed.") return {"messages": [result]} def write_message(self, state: AgentState) -> AgentState: print("Writing transaction message to Google Sheets...") result = state["messages"][-1] SCOPES = ('https://www.googleapis.com/auth/spreadsheets', 'https://www.googleapis.com/auth/drive') service_account_info = json.loads(GOOGLESHEETS_CREDENTIALS) my_credentials = service_account.Credentials.from_service_account_info(service_account_info, scopes=SCOPES) client = pygsheets.authorize(custom_credentials=my_credentials) # client = pygsheets.authorize(service_account_json=GOOGLESHEETS_CREDENTIALS) # client = pygsheets.authorize(service_account_file="serviceaccount.json") worksheet = client.open_by_url(sheet_url) wk = worksheet[0] # Get number of rows in the worksheet df = wk.get_as_df(start='A1', end='G999') nrows = df.shape[0] wk.update_value(f'A{nrows+2}', result.amount) wk.update_value(f'B{nrows+2}', result.dr_or_cr) wk.update_value(f'C{nrows+2}', result.receiver) wk.update_value(f'D{nrows+2}', result.category) wk.update_value(f'E{nrows+2}', result.transaction_date) wk.update_value(f'F{nrows+2}', result.transaction_origin) wk.update_value(f'G{nrows+2}', state["messages"][0]) print("Writing transaction message to Google Sheets completed.") return {"messages": ["Transaction Completed"]} def check_txn_and_decide(self, state: AgentState): try: result = json.loads(state['messages'][-1].content)['classification'] except json.JSONDecodeError: result = state['messages'][-1].content.strip() return result == "Transaction" prompt = """You are a smart assistant adept at classifying different messages. \ You will be penalized heavily for incorrect classification. \ Your task is to classify the message into one of the following categories: \ Transaction, OTP, Promotional, Scheduled. \ Output the classification in a structured format like below. \ {"classification": "OTP"} \ """ app = FastAPI() @app.get("/") def greetings(): return {"message": "Hello, this is a transaction bot. Please send a POST request to /write_message with the transaction data."} @app.post("/write_message") def write_message(data: dict): message = data['message'] model = ChatOllama(model="gemma3:4b", temperature=1) transaction_bot = Agent(model, system=prompt) result = transaction_bot.graph.invoke({"messages": [message]}) return {"message": "Transaction completed successfully"} @app.get("/ask") def ask(prompt: str): model = ChatOllama(model="gemma3:4b", temperature=1) return model.invoke(prompt) if __name__ == "__main__": uvicorn.run(app, host="0.0.0.0", port=7860, log_level="info")