import os import numpy as np import pandas as pd from dotenv import load_dotenv from langdetect import detect from loguru import logger from sklearn.model_selection import train_test_split from time import sleep from transformers import BertModel, AutoTokenizer from tqdm import tqdm import torch from config import DEVICE from src.utils.text_functions import clean_text, detect_language from src.utils import ( get_sentiment, detect_language, ) from src.regression.PL import ( get_bert_embedding, get_concat_embedding, ) from src.utils.s3 import read_csv, save_csv load_dotenv() class RegressionDataset: def __init__( self, s3: bool = False, bucket: str = "lebesgue-data-science", folder: str = os.getenv("GLOBAL_PATH_TO_REPO") + "/data", s3_folder: str = "transformers/data", ): self.s3 = s3 self.bucket = bucket if self.s3: self.folder = s3_folder else: self.folder = folder self.original_path = f"{self.folder}/original.csv" self.untrimmed_path = f"{self.folder}/untrimmed.csv" self.normalized_path = f"{self.folder}/normalized.csv" self.trimmed_path = f"{self.folder}/trimmed.csv" self.train_path = f"{self.folder}/train.csv" self.val_path = f"{self.folder}/val.csv" self.test_path = f"{self.folder}/test.csv" self.text_types = ["primary", "title", "description"] self.col_func_dict = { "number": len, "len": lambda texts: np.mean([len(text) for text in texts]), } @property def original(self) -> pd.DataFrame: df = read_csv(path=self.original_path, s3=self.s3, s3_args={"bucket": self.bucket}) return df @property def untrimmed(self) -> pd.DataFrame: df = read_csv(path=self.untrimmed_path, s3=self.s3, s3_args={"bucket": self.bucket}) return df @property def normalized(self) -> pd.DataFrame: df = read_csv(path=self.normalized_path, s3=self.s3, s3_args={"bucket": self.bucket}) return df @property def trimmed(self) -> pd.DataFrame: df = read_csv(path=self.trimmed_path, s3=self.s3, s3_args={"bucket": self.bucket}) return df @property def train(self) -> pd.DataFrame: df = read_csv(path=self.train_path, s3=self.s3, s3_args={"bucket": self.bucket}) return df @property def val(self) -> pd.DataFrame: df = read_csv(path=self.val_path, s3=self.s3, s3_args={"bucket": self.bucket}) return df @property def test(self) -> pd.DataFrame: df = read_csv(path=self.test_path, s3=self.s3, s3_args={"bucket": self.bucket}) return df def normalize_untrimmed(self, group_cols: list[str] = ["text", "target", "shop_id"]) -> pd.DataFrame: df = self.untrimmed grouped = df.groupby(group_cols) filters_df = grouped.agg({"impr": "sum", "spend": "sum"}).reset_index() ctr = grouped.apply(lambda df: df.link_clicks.sum() / df.impr.sum()) ctr_df = pd.DataFrame(ctr, columns=["ctr"]).reset_index() normalised = filters_df.merge(ctr_df, on=group_cols) merged = df.merge(normalised, on=group_cols, validate="m:1", suffixes=["___", None]) merged.drop(list([col for col in merged.columns if "___" in col]), inplace=True, axis=1) final = merged.drop_duplicates(group_cols) save_csv( df=final, path=self.normalized_path, s3=self.s3, s3_args={"bucket": self.bucket}, ) return df def expand_untrimmed(self, update_existing_columns: bool = False) -> pd.DataFrame: df = self.untrimmed # normalise target by adset # df["ctr_norm"] = ( # df.groupby(["shop_id", "adset_id"]) # .ctr.transform(lambda x: (x - x.mean()) / x.std()) # .count() # ) new_col_func_dict = self.col_func_dict if not update_existing_columns: new_col_func_dict = { col: fun for col, fun in new_col_func_dict.items() if "primary_" + col not in df.columns } # get extra columns for col, func in new_col_func_dict.items(): logger.debug(col) for text_type in self.text_types: df[f"{text_type}_{col}"] = df[text_type].apply(func) df["has_text"] = df.apply( lambda df: bool(df.primary_number + df.title_number + df.description_number), axis=1, ) # text columns df = df.apply(_get_text, axis=1) df = df.apply(_get_concatinated_text, axis=1) df["language"] = df.text.apply(detect_language) df = df[df.language == "en"] df = df[df.ctr.notna()] save_csv(df=df, path=self.untrimmed_path, s3=self.s3, s3_args={"bucket": self.bucket}) return df def trim(self, min_impr: int = 900, min_spend: float = 90) -> pd.DataFrame: df = self.normalized df = df[(df.impr >= min_impr) & (df.spend >= min_spend)] df = df[df.target == "acquisition"] df = df[df.aov.notna()] df = df[df.has_text == True] save_csv(df=df, path=self.trimmed_path, s3=self.s3, s3_args={"bucket": self.bucket}) return df def expand_trimmed( self, bert: BertModel = None, tokenizer: AutoTokenizer = None, add_bert_embeddings_bool: bool = False ) -> pd.DataFrame: df = self.trimmed # clean text for col in ["text", "concat_text"]: df[f"{col}_clean"] = df[col].apply(clean_text) df["text_clean_sentiment"] = df.text_clean.apply(get_sentiment) if add_bert_embeddings_bool: if tokenizer is None or bert is None: raise ValueError("tokenizer or bert is None") layer_dict = {"bert": bert, "tokenizer": tokenizer} df = add_bert_embeddings(df=df, save_path=self.trimmed_path, layer_dict=layer_dict) df = df.apply(add_concat_embeddings, axis=1) save_csv(df=df, path=self.trimmed_path, s3=self.s3, s3_args={"bucket": self.bucket}) return df def split_into_train_and_test( self, ) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]: df = self.trimmed train, test = train_test_split(df, train_size=0.9, random_state=42) train, val = train_test_split(train, train_size=0.85, random_state=42) save_csv(df=train, path=self.train_path, s3=self.s3, s3_args={"bucket": self.bucket}) save_csv(df=val, path=self.val_path, s3=self.s3, s3_args={"bucket": self.bucket}) save_csv(df=test, path=self.test_path, s3=self.s3, s3_args={"bucket": self.bucket}) return train, val, test def expand_normalise_trim_split( self, update_existing_columns: bool = False, group_cols=["text", "target", "shop_id"], ) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]: self.expand_untrimmed(update_existing_columns=update_existing_columns) self.normalize_untrimmed(group_cols=group_cols) self.trim() self.expand_trimmed() train, val, test = self.split_into_train_and_test() return train, val, test def _get_text(ad: pd.Series) -> pd.Series: if ad.primary_number > 0: ad["text"] = ad.primary[0] elif ad.description_number > 0: ad["text"] = ad.description[0] elif ad.title_number > 0: ad["text"] = ad.title[0] else: ad["text"] = None return ad def _get_concatinated_text(ad: pd.Series) -> pd.Series: concat_text = "" if ad.primary_number > 0: concat_text = concat_text + ad.primary[0] if ad.description_number > 0: concat_text = concat_text + ad.description[0] if ad.title_number > 0: concat_text = concat_text + ad.title[0] ad["concat_text"] = concat_text return ad regression_dataset = RegressionDataset() regression_dataset_s3 = RegressionDataset(s3=True) def add_bert_embeddings(df: pd.DataFrame, save_path: str, layer_dict: dict = {}, device=DEVICE) -> pd.DataFrame: if device == torch.device("cuda"): df["my_bert_cls_embedding"] = df.text_clean.apply( lambda text: get_bert_embedding(text=text, cls=True, layer_dict=layer_dict) ) df["my_bert_mean_embedding"] = df.text_clean.apply( lambda text: get_bert_embedding(text=text, cls=False, layer_dict=layer_dict) ) return df if "my_bert_cls_embedding" not in df.columns: df["my_bert_cls_embedding"] = None if "my_bert_mean_embedding" not in df.columns: df["my_bert_mean_embedding"] = None counter = 0 df["my_bert_cls_embedding"] = df["my_bert_cls_embedding"].astype(object) df["my_bert_mean_embedding"] = df["my_bert_mean_embedding"].astype(object) for i in tqdm(range(len(df))): if df.at[i, "my_bert_cls_embedding"] is not None: df.at[i, "my_bert_cls_embedding"] = get_bert_embedding( text=df.at[i, "text_clean"], cls=False, layer_dict=layer_dict ) counter = counter + 1 sleep(0.5) if df.at[i, "my_bert_mean_embedding"] is not None: df.at[i, "my_bert_mean_embedding"] = get_bert_embedding( text=df.at[i, "text_clean"], cls=True, layer_dict=layer_dict ) counter = counter + 1 sleep(0.5) if counter % 50 in [0, 1]: df.to_csv(save_path, index=False) df.to_csv(save_path, index=False) return df def add_concat_embeddings(series: pd.DataFrame) -> pd.Series: other_features = {"aov": series["aov"]} | series["text_clean_sentiment"] for type in ["cls", "mean"]: bert_embedding = series[f"my_bert_{type}_embedding"] series[f"my_full_{type}_embedding"] = get_concat_embedding( bert_embedding=bert_embedding, other_features=other_features ) return series