# -*- coding: utf-8 -*-
# Copyright (c) 2024 OSU Natural Language Processing Group
#
# Licensed under the OpenRAIL-S License;
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.licenses.ai/ai-pubs-open-rails-vz1
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import base64
import os
import time

import backoff
import litellm
import openai
import requests
from dotenv import load_dotenv
from openai import (
    APIConnectionError,
    APIError,
    RateLimitError,
)

EMPTY_API_KEY = "Your API KEY Here"

def load_openai_api_key():
    load_dotenv()
    assert (
        os.getenv("OPENAI_API_KEY") is not None
        and os.getenv("OPENAI_API_KEY") != EMPTY_API_KEY
    ), "must pass the api_key or set OPENAI_API_KEY in the environment"
    return os.getenv("OPENAI_API_KEY")


def load_gemini_api_key():
    load_dotenv()
    assert (
        os.getenv("GEMINI_API_KEY") is not None
        and os.getenv("GEMINI_API_KEY") != EMPTY_API_KEY
    ), "must pass the api_key or set GEMINI_API_KEY in the environment"
    return os.getenv("GEMINI_API_KEY")

def encode_image(image_path):
    """Read an image file and return its contents as a base64-encoded string."""
    with open(image_path, "rb") as image_file:
        return base64.b64encode(image_file.read()).decode("utf-8")

def engine_factory(api_key=None, model=None, **kwargs):
    """Return the engine matching the requested model, loading its API key if needed."""
    model = model.lower()
    if model in ["gpt-4-vision-preview", "gpt-4-turbo", "gpt-4o"]:
        if api_key and api_key != EMPTY_API_KEY:
            os.environ["OPENAI_API_KEY"] = api_key
        else:
            load_openai_api_key()
        return OpenAIEngine(model=model, **kwargs)
    elif model in ["gemini-1.5-pro-latest", "gemini-1.5-flash"]:
        if api_key and api_key != EMPTY_API_KEY:
            os.environ["GEMINI_API_KEY"] = api_key
        else:
            load_gemini_api_key()
        model = f"gemini/{model}"
        return GeminiEngine(model=model, **kwargs)
    elif model == "llava":
        return OllamaEngine(model=model, **kwargs)
    raise Exception(
        f"Unsupported model: {model}, currently supported models: "
        "gpt-4-vision-preview, gpt-4-turbo, gpt-4o, "
        "gemini-1.5-pro-latest, gemini-1.5-flash, llava"
    )
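
# Hypothetical usage sketch (not from the original source): engine_factory maps the
# model string to an engine class and falls back to the environment for the API key.
# The model name, rate limit, prompt strings, and screenshot path are placeholders.
def _example_factory_usage():
    engine = engine_factory(model="gpt-4o", rate_limit=10)
    return engine.generate(
        prompt=["system prompt", "user prompt about the screenshot", "follow-up prompt"],
        image_path="screenshot.jpg",
        turn_number=0,
    )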

class Engine:
    def __init__(
        self,
        stop=["\n\n"],
        rate_limit=-1,
        model=None,
        temperature=0,
        **kwargs,
    ) -> None:
        """
        Base class to init an engine

        Args:
            api_key (_type_, optional): Auth key from OpenAI. Defaults to None.
            stop (list, optional): Tokens indicating the end of a sequence. Defaults to ["\\n\\n"].
            rate_limit (int, optional): Max number of requests per minute. Defaults to -1 (no limit).
            model (_type_, optional): Model family. Defaults to None.
            temperature (float, optional): Sampling temperature. Defaults to 0.
        """
        self.time_slots = [0]
        self.stop = stop
        self.temperature = temperature
        self.model = model
        # convert the rate limit into a minimum interval (in seconds) between requests
        self.request_interval = 0 if rate_limit == -1 else 60.0 / rate_limit
        self.next_avil_time = [0] * len(self.time_slots)
        self.current_key_idx = 0
        print(f"Initializing model {self.model}")

    def tokenize(self, input):
        return self.tokenizer(input)

class OllamaEngine(Engine):
    def __init__(self, **kwargs) -> None:
        """
        Init an Ollama engine
        To use Ollama, download and install Ollama from https://ollama.com/
        After Ollama starts, pull llava with the command: ollama pull llava
        """
        super().__init__(**kwargs)
        self.api_url = "http://localhost:11434/api/chat"

    def generate(self, prompt: list = None, max_new_tokens=4096, temperature=None, model=None, image_path=None,
                 ouput_0=None, turn_number=0, **kwargs):
        self.current_key_idx = (self.current_key_idx + 1) % len(self.time_slots)
        start_time = time.time()
        if (
            self.request_interval > 0
            and start_time < self.next_avil_time[self.current_key_idx]
        ):
            wait_time = self.next_avil_time[self.current_key_idx] - start_time
            print(f"Wait {wait_time} for rate limiting")
            time.sleep(wait_time)

        prompt0, prompt1, prompt2 = prompt
        base64_image = encode_image(image_path)
        if turn_number == 0:
            # Assume one turn dialogue
            prompt_input = [
                {"role": "assistant", "content": prompt0},
                {"role": "user", "content": prompt1, "images": [f"{base64_image}"]},
            ]
        elif turn_number == 1:
            prompt_input = [
                {"role": "assistant", "content": prompt0},
                {"role": "user", "content": prompt1, "images": [f"{base64_image}"]},
                {"role": "assistant", "content": f"\n\n{ouput_0}"},
                {"role": "user", "content": prompt2},
            ]

        options = {"temperature": self.temperature, "num_predict": max_new_tokens}
        data = {
            "model": self.model,
            "messages": prompt_input,
            "options": options,
            "stream": False,
        }
        _request = {
            "url": f"{self.api_url}",
            "json": data,
        }
        response = requests.post(**_request)  # type: ignore
        if response.status_code != 200:
            raise Exception(f"Ollama API Error: {response.status_code}, {response.text}")
        response_json = response.json()
        return response_json["message"]["content"]
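
# Hypothetical usage sketch (not from the original source): assumes a local Ollama
# server is running at http://localhost:11434 and "llava" has been pulled; the prompt
# strings and image path are placeholders.
def _example_ollama_usage():
    engine = OllamaEngine(model="llava")
    return engine.generate(
        prompt=["system prompt", "user prompt about the screenshot", ""],
        image_path="screenshot.png",
        turn_number=0,
    )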

class GeminiEngine(Engine):
    def __init__(self, **kwargs) -> None:
        """
        Init a Gemini engine
        To use this engine, please provide the GEMINI_API_KEY in the environment

        Supported Model          Rate Limit
        gemini-1.5-pro-latest    2 queries per minute, 1000 queries per day
        """
        super().__init__(**kwargs)

    def generate(self, prompt: list = None, max_new_tokens=4096, temperature=None, model=None, image_path=None,
                 ouput_0=None, turn_number=0, **kwargs):
        self.current_key_idx = (self.current_key_idx + 1) % len(self.time_slots)
        start_time = time.time()
        if (
            self.request_interval > 0
            and start_time < self.next_avil_time[self.current_key_idx]
        ):
            wait_time = self.next_avil_time[self.current_key_idx] - start_time
            print(f"Wait {wait_time} for rate limiting")
            time.sleep(wait_time)

        prompt0, prompt1, prompt2 = prompt
        litellm.set_verbose = True
        base64_image = encode_image(image_path)
        if turn_number == 0:
            # Assume one turn dialogue
            prompt_input = [
                {"role": "system", "content": prompt0},
                {"role": "user",
                 "content": [{"type": "text", "text": prompt1},
                             {"type": "image_url",
                              "image_url": {"url": image_path, "detail": "high"}}]},
            ]
        elif turn_number == 1:
            prompt_input = [
                {"role": "system", "content": prompt0},
                {"role": "user",
                 "content": [{"type": "text", "text": prompt1},
                             {"type": "image_url",
                              "image_url": {"url": image_path, "detail": "high"}}]},
                {"role": "assistant", "content": [{"type": "text", "text": f"\n\n{ouput_0}"}]},
                {"role": "user", "content": [{"type": "text", "text": prompt2}]},
            ]
        response = litellm.completion(
            model=model if model else self.model,
            messages=prompt_input,
            max_tokens=max_new_tokens if max_new_tokens else 4096,
            temperature=temperature if temperature else self.temperature,
            **kwargs,
        )
        return [choice["message"]["content"] for choice in response.choices][0]
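
# Hypothetical usage sketch (not from the original source): rate_limit=2 mirrors the
# per-minute quota noted in the class docstring, GEMINI_API_KEY is assumed to be set
# in the environment, and the prompt strings and image path are placeholders.
def _example_gemini_usage():
    engine = GeminiEngine(model="gemini/gemini-1.5-pro-latest", rate_limit=2)
    return engine.generate(
        prompt=["system prompt", "user prompt about the screenshot", ""],
        image_path="screenshot.png",
        turn_number=0,
    )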

class OpenAIEngine(Engine):
    def __init__(self, **kwargs) -> None:
        """
        Init an OpenAI GPT/Codex engine
        To find your OpenAI API key, visit https://platform.openai.com/api-keys
        """
        super().__init__(**kwargs)

    def generate(self, prompt: list = None, max_new_tokens=4096, temperature=None, model=None, image_path=None,
                 ouput_0=None, turn_number=0, **kwargs):
        self.current_key_idx = (self.current_key_idx + 1) % len(self.time_slots)
        start_time = time.time()
        if (
            self.request_interval > 0
            and start_time < self.next_avil_time[self.current_key_idx]
        ):
            time.sleep(self.next_avil_time[self.current_key_idx] - start_time)

        prompt0, prompt1, prompt2 = prompt
        # litellm.set_verbose=True
        base64_image = encode_image(image_path)
        if turn_number == 0:
            # Assume one turn dialogue
            prompt_input = [
                {"role": "system", "content": [{"type": "text", "text": prompt0}]},
                {"role": "user",
                 "content": [{"type": "text", "text": prompt1},
                             {"type": "image_url",
                              "image_url": {"url": f"data:image/jpeg;base64,{base64_image}",
                                            "detail": "high"}}]},
            ]
        elif turn_number == 1:
            prompt_input = [
                {"role": "system", "content": [{"type": "text", "text": prompt0}]},
                {"role": "user",
                 "content": [{"type": "text", "text": prompt1},
                             {"type": "image_url",
                              "image_url": {"url": f"data:image/jpeg;base64,{base64_image}",
                                            "detail": "high"}}]},
                {"role": "assistant", "content": [{"type": "text", "text": f"\n\n{ouput_0}"}]},
                {"role": "user", "content": [{"type": "text", "text": prompt2}]},
            ]
        response = litellm.completion(
            model=model if model else self.model,
            messages=prompt_input,
            max_tokens=max_new_tokens if max_new_tokens else 4096,
            temperature=temperature if temperature else self.temperature,
            **kwargs,
        )
        return [choice["message"]["content"] for choice in response.choices][0]
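
# Hypothetical usage sketch (not from the original source) of the two-turn path: the
# first call's output is passed back as ouput_0 with turn_number=1; all prompt strings
# and the image path are placeholders, and OPENAI_API_KEY is assumed to be set.
def _example_openai_two_turn_usage():
    engine = OpenAIEngine(model="gpt-4o")
    prompts = ["system prompt", "user prompt about the screenshot", "follow-up prompt"]
    first = engine.generate(prompt=prompts, image_path="screenshot.jpg", turn_number=0)
    return engine.generate(prompt=prompts, image_path="screenshot.jpg",
                           ouput_0=first, turn_number=1)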

class OpenaiEngine_MindAct(Engine):
    def __init__(self, **kwargs) -> None:
        """Init an OpenAI GPT/Codex engine

        Args:
            api_key (_type_, optional): Auth key from OpenAI. Defaults to None.
            stop (list, optional): Tokens indicating the end of a sequence. Defaults to ["\\n\\n"].
            rate_limit (int, optional): Max number of requests per minute. Defaults to -1 (no limit).
            model (_type_, optional): Model family. Defaults to None.
        """
        super().__init__(**kwargs)

    def generate(self, prompt, max_new_tokens=50, temperature=0, model=None, **kwargs):
        self.current_key_idx = (self.current_key_idx + 1) % len(self.time_slots)
        start_time = time.time()
        if (
            self.request_interval > 0
            and start_time < self.next_avil_time[self.current_key_idx]
        ):
            time.sleep(self.next_avil_time[self.current_key_idx] - start_time)
        if isinstance(prompt, str):
            # Assume one turn dialogue
            prompt = [
                {"role": "user", "content": prompt},
            ]
        response = litellm.completion(
            model=model if model else self.model,
            messages=prompt,
            max_tokens=max_new_tokens,
            temperature=temperature,
            **kwargs,
        )
        if self.request_interval > 0:
            self.next_avil_time[self.current_key_idx] = (
                max(start_time, self.next_avil_time[self.current_key_idx])
                + self.request_interval
            )
        return [choice["message"]["content"] for choice in response["choices"]]
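
# Hypothetical usage sketch (not from the original source): a plain-text prompt is
# wrapped into a single user message before the completion call; the model name and
# prompt text are placeholders.
def _example_mindact_usage():
    engine = OpenaiEngine_MindAct(model="gpt-4-turbo", rate_limit=10)
    return engine.generate("Summarize the next action for the agent.", max_new_tokens=50)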