Spaces:
Sleeping
Sleeping
| from time import time, sleep | |
| import datetime | |
| import dotenv | |
| import os | |
| import openai | |
| import json | |
| import pinecone | |
| from uuid import uuid4 | |
| from helper import open_file, save_file | |
| import re | |
| from langchain.memory import VectorStoreRetrieverMemory | |
| ## Read the environment variables | |
| dotenv.load_dotenv('.env') | |
| openai.api_key = os.getenv('OPENAI_API_KEY') | |
| embedding_model = os.getenv('EMBEDDING_ENGINE') | |
| convo_length = int(os.getenv('CONVO_LENGTH_TO_FETCH')) | |
| llm_model = os.getenv('LLM_MODEL') | |
| debug=False | |
| if os.getenv('DEBUG') == 'True': | |
| debug=True | |
| pinecone_api_key = os.getenv('PINECONE_API_KEY') | |
| pinecone_env = os.getenv('PINECONE_REGION') | |
| pinecone_index = os.getenv('PINECONE_INDEX') | |
| pinecone.init( | |
| api_key=pinecone_api_key, | |
| environment=pinecone_env | |
| ) | |
| vector_db = pinecone.Index(pinecone_index) | |
| file_path = os.getenv('GAME_DOCS_FOLDER') | |
| file_name = os.getenv('GAME_DOCS_FILE') | |
| game_index = os.getenv('GAME_ID_INDEX') | |
| def timestamp_to_datetime(unix_time): | |
| return datetime.datetime.fromtimestamp(unix_time).strftime("%A, %B %d, %Y at %I:%M%p %Z") | |
| def perform_embedding(content): | |
| content = content.encode(encoding='ASCII',errors='ignore').decode() | |
| response = openai.Embedding.create(model=embedding_model, input=content) | |
| vector = response['data'][0]['embedding'] | |
| return vector | |
| def load_conversation(results): | |
| result = list() | |
| for m in results['matches']: | |
| result.append({'time1': m['metadata']['timestring'], 'text': m['metadata']['text']}) | |
| ordered = sorted(result, key=lambda d: d['time1'], reverse = False) | |
| messages = [i['text'] for i in ordered] | |
| message_block = '\n'.join(messages).strip() | |
| return message_block | |
| def call_gpt(prompt): | |
| max_retry = 5 | |
| retry = 0 | |
| prompt = prompt.encode(encoding='ASCII',errors='ignore').decode() | |
| while True: | |
| try: | |
| response = openai.ChatCompletion.create( | |
| model=llm_model, | |
| temperature=0.9, | |
| messages=[ | |
| {"role": "user", "content": prompt} | |
| ] | |
| ) | |
| text = response.choices[0].message.content | |
| text = re.sub('[\r\n]+', '\n', text) | |
| text = re.sub('[\t ]+', ' ', text) | |
| filename = '%s_gpt3.txt' % time() | |
| if not os.path.exists('gpt3_logs'): | |
| os.makedirs('gpt3_logs') | |
| save_file('gpt3_logs/%s' % filename, prompt + '\n\n==========\n\n' + text) | |
| response.choices[0].message.content = text | |
| return response | |
| except Exception as oops: | |
| retry += 1 | |
| if retry >= max_retry: | |
| return "GPT3 error: %s" % oops | |
| print('Error communicating with OpenAI:', oops) | |
| sleep(1) | |
| def start_game(game_id, user_id, user_input): | |
| payload = list() | |
| # Get user input, save it, vectorize it and save to pinecone | |
| timestamp = time() | |
| timestring = timestamp_to_datetime(timestamp) | |
| unique_id = str(uuid4()) | |
| vector = perform_embedding(user_input) | |
| metadata = {'speaker': 'USER', 'user_id': user_id, 'game_id': game_id, 'timestring': timestring, 'text': user_input} | |
| payload.append((unique_id, vector, metadata)) | |
| # Search for relevant messages and return a response | |
| results=vector_db.query(vector=vector, top_k=convo_length, include_metadata=True, | |
| filter={ | |
| "$and": [{ "user_id": { "$eq": user_id } }, { "game_id": { "$eq": game_id } }] | |
| } | |
| ) | |
| conversation = load_conversation(results) | |
| # Populate prompt | |
| prompt_text = open_file(f"prompt_{game_id}_{user_id}.txt") | |
| prompt = open_file('prompt_response.txt').replace('<<PROMPT_VALUE>>', prompt_text).replace('<<CONVERSATION>>', conversation).replace('<<USER_MSG>>', user_input).replace('<<USER_VAL>>', user_id) | |
| # Generate response, vectorize | |
| llm_output_msg = call_gpt(prompt) | |
| llm_output = llm_output_msg.choices[0].message.content | |
| timestamp_op = time() | |
| timestring_op = timestamp_to_datetime(timestamp) | |
| vector_op = perform_embedding(llm_output) | |
| unique_id_op = str(uuid4) | |
| metadata_op = {'speaker': 'BOT', 'user_id': user_id, 'game_id': game_id, 'timestring': timestring, 'text': llm_output} | |
| payload.append((unique_id_op, vector_op, metadata_op)) | |
| # Upsert into the vector database | |
| vector_db.upsert(payload) | |
| return(llm_output) | |
| def get_game_details(game_id): | |
| file_data = open_file(f"{file_path}/{game_index}") | |
| tmp_json = json.loads(file_data) | |
| for json_item in tmp_json["game_details"]: | |
| if json_item["game_id"] == game_id: | |
| return json_item | |
| return "Not Found" | |
| def populate_prompt(game_id, splits): | |
| prompt_text = list() | |
| idlist = [] | |
| for j in range(int(splits)): | |
| idlist.append(game_id + "-" + str(j)) | |
| results=vector_db.fetch(ids=idlist) | |
| for ids in idlist: | |
| prompt_text.append(results['vectors'][ids]["metadata"]["text"]) | |
| whole_prompt = ' '.join(prompt_text).strip() | |
| return whole_prompt | |
| def initialize_game(game_id, user_id, user_input): | |
| game_details = get_game_details(game_id) | |
| whole_prompt = populate_prompt(game_id, game_details["splits"]) | |
| if debug: | |
| print(whole_prompt[:1000]) | |
| whole_prompt = whole_prompt.replace("<<USER_INPUT_MSG>>", user_input) | |
| if debug: | |
| print(whole_prompt[:1000]) | |
| llm_prompt_op = call_gpt(whole_prompt) | |
| #print(llm_prompt_op.choices[0]["message"]["content"]) | |
| fname="prompt_" + game_id + "_" + user_id + ".txt" | |
| save_file(fname, llm_prompt_op.choices[0]["message"]["content"]) | |
| return llm_prompt_op.choices[0]["message"]["content"] | |
| if __name__ == '__main__': | |
| print("main") |