|
|
import os |
|
|
|
|
|
import numpy as np |
|
|
import pandas as pd |
|
|
from dotenv import load_dotenv |
|
|
from langdetect import detect |
|
|
from loguru import logger |
|
|
from sklearn.model_selection import train_test_split |
|
|
from time import sleep |
|
|
from transformers import BertModel, AutoTokenizer |
|
|
from tqdm import tqdm |
|
|
import torch |
|
|
from config import DEVICE |
|
|
|
|
|
from src.utils.text_functions import clean_text, detect_language |
|
|
from src.utils import ( |
|
|
get_sentiment, |
|
|
detect_language, |
|
|
) |
|
|
|
|
|
from src.regression.PL import ( |
|
|
get_bert_embedding, |
|
|
get_concat_embedding, |
|
|
) |
|
|
|
|
|
from src.utils.s3 import read_csv, save_csv |
|
|
|
|
|
|
|
|
load_dotenv() |
|
|
|
|
|
|
|
|
class RegressionDataset: |
|
|
def __init__( |
|
|
self, |
|
|
s3: bool = False, |
|
|
bucket: str = "lebesgue-data-science", |
|
|
folder: str = os.getenv("GLOBAL_PATH_TO_REPO") + "/data", |
|
|
s3_folder: str = "transformers/data", |
|
|
): |
|
|
self.s3 = s3 |
|
|
self.bucket = bucket |
|
|
|
|
|
if self.s3: |
|
|
self.folder = s3_folder |
|
|
else: |
|
|
self.folder = folder |
|
|
|
|
|
self.original_path = f"{self.folder}/original.csv" |
|
|
self.untrimmed_path = f"{self.folder}/untrimmed.csv" |
|
|
self.normalized_path = f"{self.folder}/normalized.csv" |
|
|
self.trimmed_path = f"{self.folder}/trimmed.csv" |
|
|
|
|
|
self.train_path = f"{self.folder}/train.csv" |
|
|
self.val_path = f"{self.folder}/val.csv" |
|
|
self.test_path = f"{self.folder}/test.csv" |
|
|
|
|
|
self.text_types = ["primary", "title", "description"] |
|
|
|
|
|
self.col_func_dict = { |
|
|
"number": len, |
|
|
"len": lambda texts: np.mean([len(text) for text in texts]), |
|
|
} |
|
|
|
|
|
@property |
|
|
def original(self) -> pd.DataFrame: |
|
|
df = read_csv(path=self.original_path, s3=self.s3, s3_args={"bucket": self.bucket}) |
|
|
return df |
|
|
|
|
|
@property |
|
|
def untrimmed(self) -> pd.DataFrame: |
|
|
df = read_csv(path=self.untrimmed_path, s3=self.s3, s3_args={"bucket": self.bucket}) |
|
|
return df |
|
|
|
|
|
@property |
|
|
def normalized(self) -> pd.DataFrame: |
|
|
df = read_csv(path=self.normalized_path, s3=self.s3, s3_args={"bucket": self.bucket}) |
|
|
return df |
|
|
|
|
|
@property |
|
|
def trimmed(self) -> pd.DataFrame: |
|
|
df = read_csv(path=self.trimmed_path, s3=self.s3, s3_args={"bucket": self.bucket}) |
|
|
return df |
|
|
|
|
|
@property |
|
|
def train(self) -> pd.DataFrame: |
|
|
df = read_csv(path=self.train_path, s3=self.s3, s3_args={"bucket": self.bucket}) |
|
|
return df |
|
|
|
|
|
@property |
|
|
def val(self) -> pd.DataFrame: |
|
|
df = read_csv(path=self.val_path, s3=self.s3, s3_args={"bucket": self.bucket}) |
|
|
return df |
|
|
|
|
|
@property |
|
|
def test(self) -> pd.DataFrame: |
|
|
df = read_csv(path=self.test_path, s3=self.s3, s3_args={"bucket": self.bucket}) |
|
|
return df |
|
|
|
|
|
def normalize_untrimmed(self, group_cols: list[str] = ["text", "target", "shop_id"]) -> pd.DataFrame: |
|
|
df = self.untrimmed |
|
|
grouped = df.groupby(group_cols) |
|
|
|
|
|
filters_df = grouped.agg({"impr": "sum", "spend": "sum"}).reset_index() |
|
|
ctr = grouped.apply(lambda df: df.link_clicks.sum() / df.impr.sum()) |
|
|
ctr_df = pd.DataFrame(ctr, columns=["ctr"]).reset_index() |
|
|
normalised = filters_df.merge(ctr_df, on=group_cols) |
|
|
|
|
|
merged = df.merge(normalised, on=group_cols, validate="m:1", suffixes=["___", None]) |
|
|
merged.drop(list([col for col in merged.columns if "___" in col]), inplace=True, axis=1) |
|
|
final = merged.drop_duplicates(group_cols) |
|
|
save_csv( |
|
|
df=final, |
|
|
path=self.normalized_path, |
|
|
s3=self.s3, |
|
|
s3_args={"bucket": self.bucket}, |
|
|
) |
|
|
return df |
|
|
|
|
|
def expand_untrimmed(self, update_existing_columns: bool = False) -> pd.DataFrame: |
|
|
|
|
|
df = self.untrimmed |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
new_col_func_dict = self.col_func_dict |
|
|
|
|
|
if not update_existing_columns: |
|
|
new_col_func_dict = { |
|
|
col: fun for col, fun in new_col_func_dict.items() if "primary_" + col not in df.columns |
|
|
} |
|
|
|
|
|
|
|
|
for col, func in new_col_func_dict.items(): |
|
|
logger.debug(col) |
|
|
for text_type in self.text_types: |
|
|
df[f"{text_type}_{col}"] = df[text_type].apply(func) |
|
|
|
|
|
df["has_text"] = df.apply( |
|
|
lambda df: bool(df.primary_number + df.title_number + df.description_number), |
|
|
axis=1, |
|
|
) |
|
|
|
|
|
|
|
|
df = df.apply(_get_text, axis=1) |
|
|
df = df.apply(_get_concatinated_text, axis=1) |
|
|
|
|
|
df["language"] = df.text.apply(detect_language) |
|
|
df = df[df.language == "en"] |
|
|
df = df[df.ctr.notna()] |
|
|
|
|
|
save_csv(df=df, path=self.untrimmed_path, s3=self.s3, s3_args={"bucket": self.bucket}) |
|
|
|
|
|
return df |
|
|
|
|
|
def trim(self, min_impr: int = 900, min_spend: float = 90) -> pd.DataFrame: |
|
|
df = self.normalized |
|
|
df = df[(df.impr >= min_impr) & (df.spend >= min_spend)] |
|
|
df = df[df.target == "acquisition"] |
|
|
df = df[df.aov.notna()] |
|
|
|
|
|
df = df[df.has_text == True] |
|
|
|
|
|
save_csv(df=df, path=self.trimmed_path, s3=self.s3, s3_args={"bucket": self.bucket}) |
|
|
|
|
|
return df |
|
|
|
|
|
def expand_trimmed( |
|
|
self, bert: BertModel = None, tokenizer: AutoTokenizer = None, add_bert_embeddings_bool: bool = False |
|
|
) -> pd.DataFrame: |
|
|
df = self.trimmed |
|
|
|
|
|
|
|
|
for col in ["text", "concat_text"]: |
|
|
df[f"{col}_clean"] = df[col].apply(clean_text) |
|
|
|
|
|
df["text_clean_sentiment"] = df.text_clean.apply(get_sentiment) |
|
|
|
|
|
if add_bert_embeddings_bool: |
|
|
if tokenizer is None or bert is None: |
|
|
raise ValueError("tokenizer or bert is None") |
|
|
layer_dict = {"bert": bert, "tokenizer": tokenizer} |
|
|
df = add_bert_embeddings(df=df, save_path=self.trimmed_path, layer_dict=layer_dict) |
|
|
|
|
|
df = df.apply(add_concat_embeddings, axis=1) |
|
|
|
|
|
save_csv(df=df, path=self.trimmed_path, s3=self.s3, s3_args={"bucket": self.bucket}) |
|
|
return df |
|
|
|
|
|
def split_into_train_and_test( |
|
|
self, |
|
|
) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]: |
|
|
df = self.trimmed |
|
|
train, test = train_test_split(df, train_size=0.9, random_state=42) |
|
|
train, val = train_test_split(train, train_size=0.85, random_state=42) |
|
|
save_csv(df=train, path=self.train_path, s3=self.s3, s3_args={"bucket": self.bucket}) |
|
|
save_csv(df=val, path=self.val_path, s3=self.s3, s3_args={"bucket": self.bucket}) |
|
|
save_csv(df=test, path=self.test_path, s3=self.s3, s3_args={"bucket": self.bucket}) |
|
|
return train, val, test |
|
|
|
|
|
def expand_normalise_trim_split( |
|
|
self, |
|
|
update_existing_columns: bool = False, |
|
|
group_cols=["text", "target", "shop_id"], |
|
|
) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]: |
|
|
self.expand_untrimmed(update_existing_columns=update_existing_columns) |
|
|
self.normalize_untrimmed(group_cols=group_cols) |
|
|
self.trim() |
|
|
self.expand_trimmed() |
|
|
train, val, test = self.split_into_train_and_test() |
|
|
return train, val, test |
|
|
|
|
|
|
|
|
def _get_text(ad: pd.Series) -> pd.Series: |
|
|
|
|
|
if ad.primary_number > 0: |
|
|
ad["text"] = ad.primary[0] |
|
|
|
|
|
elif ad.description_number > 0: |
|
|
ad["text"] = ad.description[0] |
|
|
|
|
|
elif ad.title_number > 0: |
|
|
ad["text"] = ad.title[0] |
|
|
|
|
|
else: |
|
|
ad["text"] = None |
|
|
|
|
|
return ad |
|
|
|
|
|
|
|
|
def _get_concatinated_text(ad: pd.Series) -> pd.Series: |
|
|
|
|
|
concat_text = "" |
|
|
|
|
|
if ad.primary_number > 0: |
|
|
concat_text = concat_text + ad.primary[0] |
|
|
|
|
|
if ad.description_number > 0: |
|
|
concat_text = concat_text + ad.description[0] |
|
|
|
|
|
if ad.title_number > 0: |
|
|
concat_text = concat_text + ad.title[0] |
|
|
|
|
|
ad["concat_text"] = concat_text |
|
|
|
|
|
return ad |
|
|
|
|
|
|
|
|
regression_dataset = RegressionDataset() |
|
|
|
|
|
regression_dataset_s3 = RegressionDataset(s3=True) |
|
|
|
|
|
|
|
|
def add_bert_embeddings(df: pd.DataFrame, save_path: str, layer_dict: dict = {}, device=DEVICE) -> pd.DataFrame: |
|
|
|
|
|
if device == torch.device("cuda"): |
|
|
df["my_bert_cls_embedding"] = df.text_clean.apply( |
|
|
lambda text: get_bert_embedding(text=text, cls=True, layer_dict=layer_dict) |
|
|
) |
|
|
df["my_bert_mean_embedding"] = df.text_clean.apply( |
|
|
lambda text: get_bert_embedding(text=text, cls=False, layer_dict=layer_dict) |
|
|
) |
|
|
return df |
|
|
|
|
|
if "my_bert_cls_embedding" not in df.columns: |
|
|
df["my_bert_cls_embedding"] = None |
|
|
|
|
|
if "my_bert_mean_embedding" not in df.columns: |
|
|
df["my_bert_mean_embedding"] = None |
|
|
|
|
|
counter = 0 |
|
|
|
|
|
df["my_bert_cls_embedding"] = df["my_bert_cls_embedding"].astype(object) |
|
|
df["my_bert_mean_embedding"] = df["my_bert_mean_embedding"].astype(object) |
|
|
|
|
|
for i in tqdm(range(len(df))): |
|
|
|
|
|
if df.at[i, "my_bert_cls_embedding"] is not None: |
|
|
df.at[i, "my_bert_cls_embedding"] = get_bert_embedding( |
|
|
text=df.at[i, "text_clean"], cls=False, layer_dict=layer_dict |
|
|
) |
|
|
counter = counter + 1 |
|
|
sleep(0.5) |
|
|
|
|
|
if df.at[i, "my_bert_mean_embedding"] is not None: |
|
|
df.at[i, "my_bert_mean_embedding"] = get_bert_embedding( |
|
|
text=df.at[i, "text_clean"], cls=True, layer_dict=layer_dict |
|
|
) |
|
|
counter = counter + 1 |
|
|
sleep(0.5) |
|
|
|
|
|
if counter % 50 in [0, 1]: |
|
|
df.to_csv(save_path, index=False) |
|
|
|
|
|
df.to_csv(save_path, index=False) |
|
|
|
|
|
return df |
|
|
|
|
|
|
|
|
def add_concat_embeddings(series: pd.DataFrame) -> pd.Series: |
|
|
other_features = {"aov": series["aov"]} | series["text_clean_sentiment"] |
|
|
|
|
|
for type in ["cls", "mean"]: |
|
|
bert_embedding = series[f"my_bert_{type}_embedding"] |
|
|
series[f"my_full_{type}_embedding"] = get_concat_embedding( |
|
|
bert_embedding=bert_embedding, other_features=other_features |
|
|
) |
|
|
|
|
|
return series |
|
|
|