| | import numpy as np |
| | import pandas as pd |
| | import json |
| | import pickle |
| | import io |
| | from sklearn.metrics.pairwise import cosine_similarity |
| |
|
class _InertObject:
    """Harmless stand-in for any non-allowlisted global referenced by a pickle.

    Instances accept whatever constructor arguments, state, or items the
    pickle stream supplies and merely store them, so loading never imports or
    runs code that belongs to the pickled classes.
    """

    def __new__(cls, *args, **kwargs):
        # NEWOBJ / REDUCE forward the original constructor arguments; drop them.
        return super().__new__(cls)

    def __init__(self, *args, **kwargs):
        pass

    def __setstate__(self, state):
        # BUILD opcode: expose dict state as attributes so it can be inspected.
        if isinstance(state, dict):
            self.__dict__.update(state)
        else:
            self.__dict__["_state"] = state

    def __setitem__(self, key, value):
        # SETITEM(S) on a stubbed mapping type (e.g. OrderedDict/defaultdict).
        self.__dict__.setdefault("_items", {})[key] = value


class _VocabUnpickler(pickle.Unpickler):
    """Unpickler that resolves only a few builtin types; the rest become stubs."""

    _SAFE_GLOBALS = {
        ("builtins", "bool"): bool,
        ("builtins", "bytes"): bytes,
        ("builtins", "dict"): dict,
        ("builtins", "float"): float,
        ("builtins", "frozenset"): frozenset,
        ("builtins", "int"): int,
        ("builtins", "list"): list,
        ("builtins", "set"): set,
        ("builtins", "str"): str,
        ("builtins", "tuple"): tuple,
    }

    def find_class(self, module, name):
        return self._SAFE_GLOBALS.get((module, name), _InertObject)


class MovieRecommender:
    """Recommend movies by cosine similarity between a prompt and embeddings.

    Attributes:
        embeddings: Array loaded from ``embeddings.npy`` with NaNs replaced.
        tokenizer: Mapping from token string to integer id.
        movies: DataFrame loaded from ``movies.json``.
    """

    # Columns returned by `recommend`, in output order.
    _RESULT_COLUMNS = ["title", "release_date", "vote_average", "vote_count", "status"]

    # Maximum number of whitespace-separated tokens taken from a prompt.
    _MAX_TOKENS = 32

    def __init__(self, model_path="."):
        """Load embeddings, vocabulary and movie metadata from *model_path*.

        The vocabulary is read from ``tokenizer_vocab.json``; when that file is
        missing it is recovered from ``tokenizer.pkl`` and, if the recovery
        produced a non-empty vocabulary, cached back to the JSON file.

        Args:
            model_path: Directory containing ``embeddings.npy``,
                ``movies.json`` and one of the two tokenizer files.

        Raises:
            FileNotFoundError: If a required file (including the pickle
                fallback) does not exist.
        """
        self.embeddings = np.nan_to_num(np.load(f"{model_path}/embeddings.npy"))

        vocab_path = f"{model_path}/tokenizer_vocab.json"
        try:
            with open(vocab_path, "r", encoding="utf-8") as f:
                self.tokenizer = json.load(f)
        except FileNotFoundError:
            self.tokenizer = self._extract_vocab_from_pickle(f"{model_path}/tokenizer.pkl")
            self._cache_vocab(vocab_path)

        self.movies = pd.read_json(f"{model_path}/movies.json")

    def _cache_vocab(self, vocab_path):
        """Write the current vocabulary to *vocab_path* when it is worth caching."""
        # An empty vocabulary means extraction failed; caching it would make
        # every later start-up silently load the failure without a warning.
        if not self.tokenizer:
            return
        # Serialize before opening the file so a failure cannot leave a
        # truncated cache behind.
        try:
            payload = json.dumps(self.tokenizer)
        except (TypeError, ValueError) as exc:
            print(f"Warning: vocabulary is not JSON-serializable; not caching ({exc}).")
            return
        with open(vocab_path, "w", encoding="utf-8") as f:
            f.write(payload)

    def _extract_vocab_from_pickle(self, filepath):
        """Extract the vocabulary dictionary from a pickled tokenizer.

        The pickle is loaded with a restricted unpickler: only a few builtin
        types are resolved and every other referenced class is replaced by an
        inert placeholder, so no code from the pickled classes is imported or
        executed.

        Args:
            filepath: Path to the pickle file.

        Returns:
            The vocabulary dict, or ``{}`` (after printing a warning) when no
            vocabulary could be recovered.

        Raises:
            FileNotFoundError: If *filepath* does not exist.
        """
        with open(filepath, "rb") as f:
            try:
                loaded = _VocabUnpickler(f).load()
            except Exception as exc:
                # Unpickling arbitrary bytes can raise almost any exception
                # type; this is a best-effort fallback, so report and go on.
                print(f"Warning: failed to read tokenizer pickle: {exc!r}")
                loaded = None

        vocab = self._vocab_from_unpickled(loaded)
        if vocab is not None:
            return vocab

        print("Warning: Could not extract vocabulary from pickle. Using empty tokenizer.")
        print("Recommendation quality will be limited.")
        return {}

    @staticmethod
    def _vocab_from_unpickled(loaded):
        """Return the vocabulary held by an unpickled object, or None."""
        if isinstance(loaded, dict):
            return loaded
        if isinstance(loaded, _InertObject):
            # NOTE(review): assumes a Keras-style tokenizer whose vocabulary
            # lives in a `word_index` attribute -- confirm against the
            # script that produced tokenizer.pkl.
            word_index = vars(loaded).get("word_index")
            if isinstance(word_index, dict):
                return word_index
        return None

    def _tokenize(self, prompt):
        """Lower-case *prompt* and split it on whitespace, capped in length."""
        return prompt.lower().split()[: self._MAX_TOKENS]

    @staticmethod
    def _is_valid_id(token_id, n_rows):
        """Return True when *token_id* is an integer row index below *n_rows*."""
        return isinstance(token_id, (int, np.integer)) and 0 <= token_id < n_rows

    def _encode(self, prompt):
        """Encode *prompt* as an integer id array of shape ``(1, n_tokens)``.

        Unknown tokens and ids that are not valid rows of the embedding table
        (negative, too large, or non-integer) are mapped to id 0.
        """
        n_rows = len(self.embeddings)
        ids = [self.tokenizer.get(token, 0) for token in self._tokenize(prompt)]
        ids = [i if self._is_valid_id(i, n_rows) else 0 for i in ids]
        # Explicit dtype keeps an empty prompt from producing a float array.
        return np.array(ids, dtype=np.int64)[None, :]

    def _query_ids(self, prompt):
        """Return the valid embedding-row ids for the known tokens of *prompt*."""
        if hasattr(self.tokenizer, "texts_to_sequences"):
            # Still honour a tokenizer object assigned by the caller.
            raw_ids = self.tokenizer.texts_to_sequences([prompt])[0]
        else:
            # Unknown tokens are dropped rather than mapped to 0 so they do
            # not pull the averaged query vector towards row 0.
            raw_ids = [
                self.tokenizer[token]
                for token in self._tokenize(prompt)
                if token in self.tokenizer
            ]
        n_rows = len(self.embeddings)
        return [i for i in raw_ids if self._is_valid_id(i, n_rows)]

    def recommend(self, prompt, topk=10):
        """Return the *topk* movies most similar to *prompt*.

        Args:
            prompt: Free-text query.
            topk: Maximum number of rows to return.

        Returns:
            A DataFrame with the columns title, release_date, vote_average,
            vote_count and status, ordered by decreasing cosine similarity.
            It is empty when none of the prompt's tokens is in the vocabulary.
        """
        q_ids = np.array(self._query_ids(prompt), dtype=np.int64)
        if q_ids.size == 0:
            # The mean of an empty selection is NaN, which the similarity
            # computation rejects; an empty result is the honest answer.
            return self.movies.iloc[:0][self._RESULT_COLUMNS]

        query_vec = self.embeddings[q_ids].mean(axis=0, keepdims=True)
        sims = cosine_similarity(query_vec, self.embeddings).flatten()
        idx = sims.argsort()[::-1][:topk]
        # NOTE(review): the same `embeddings` rows are indexed by token id
        # above and by movie position here; if the table has more rows than
        # `movies`, this lookup raises IndexError -- confirm the intended
        # layout of embeddings.npy.
        return self.movies.iloc[idx][self._RESULT_COLUMNS]
| | |