import os
import numpy as np
import pandas as pd
from dotenv import load_dotenv
from loguru import logger
from sklearn.model_selection import train_test_split
from time import sleep
from transformers import BertModel, AutoTokenizer
from tqdm import tqdm
import torch
from config import DEVICE
from src.utils.text_functions import clean_text
from src.utils import (
    get_sentiment,
    detect_language,
)
from src.regression.PL import (
get_bert_embedding,
get_concat_embedding,
)
from src.utils.s3 import read_csv, save_csv
load_dotenv()
class RegressionDataset:
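    """Builds and serves the CSV artifacts of the CTR regression pipeline
    (original, untrimmed, normalized, trimmed, train/val/test), stored either
    on local disk or on S3."""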
def __init__(
self,
s3: bool = False,
bucket: str = "lebesgue-data-science",
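        # NOTE: GLOBAL_PATH_TO_REPO must be set; otherwise this default raises a TypeError at import time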
folder: str = os.getenv("GLOBAL_PATH_TO_REPO") + "/data",
s3_folder: str = "transformers/data",
):
self.s3 = s3
self.bucket = bucket
if self.s3:
self.folder = s3_folder
else:
self.folder = folder
self.original_path = f"{self.folder}/original.csv"
self.untrimmed_path = f"{self.folder}/untrimmed.csv"
self.normalized_path = f"{self.folder}/normalized.csv"
self.trimmed_path = f"{self.folder}/trimmed.csv"
self.train_path = f"{self.folder}/train.csv"
self.val_path = f"{self.folder}/val.csv"
self.test_path = f"{self.folder}/test.csv"
self.text_types = ["primary", "title", "description"]
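        # per-text-type derived columns: "<type>_number" (count of texts) and "<type>_len" (mean text length)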
self.col_func_dict = {
"number": len,
"len": lambda texts: np.mean([len(text) for text in texts]),
}
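    # each property re-reads the corresponding CSV (local disk or S3) on every access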
@property
def original(self) -> pd.DataFrame:
df = read_csv(path=self.original_path, s3=self.s3, s3_args={"bucket": self.bucket})
return df
@property
def untrimmed(self) -> pd.DataFrame:
df = read_csv(path=self.untrimmed_path, s3=self.s3, s3_args={"bucket": self.bucket})
return df
@property
def normalized(self) -> pd.DataFrame:
df = read_csv(path=self.normalized_path, s3=self.s3, s3_args={"bucket": self.bucket})
return df
@property
def trimmed(self) -> pd.DataFrame:
df = read_csv(path=self.trimmed_path, s3=self.s3, s3_args={"bucket": self.bucket})
return df
@property
def train(self) -> pd.DataFrame:
df = read_csv(path=self.train_path, s3=self.s3, s3_args={"bucket": self.bucket})
return df
@property
def val(self) -> pd.DataFrame:
df = read_csv(path=self.val_path, s3=self.s3, s3_args={"bucket": self.bucket})
return df
@property
def test(self) -> pd.DataFrame:
df = read_csv(path=self.test_path, s3=self.s3, s3_args={"bucket": self.bucket})
return df
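    # aggregates duplicate rows per group_cols: sums impr/spend, recomputes ctr from the grouped totals, keeps one row per group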
    def normalize_untrimmed(self, group_cols: list[str] = ["text", "target", "shop_id"]) -> pd.DataFrame:
        df = self.untrimmed
        grouped = df.groupby(group_cols)
        filters_df = grouped.agg({"impr": "sum", "spend": "sum"}).reset_index()
        ctr = grouped.apply(lambda g: g.link_clicks.sum() / g.impr.sum())
        ctr_df = pd.DataFrame(ctr, columns=["ctr"]).reset_index()
        normalised = filters_df.merge(ctr_df, on=group_cols)
        merged = df.merge(normalised, on=group_cols, validate="m:1", suffixes=["___", None])
        merged.drop(columns=[col for col in merged.columns if "___" in col], inplace=True)
        final = merged.drop_duplicates(group_cols)
        save_csv(
            df=final,
            path=self.normalized_path,
            s3=self.s3,
            s3_args={"bucket": self.bucket},
        )
        return final
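    # adds count/length columns per text type, derives a single text and a concatenated text per ad, and keeps English rows with a known ctr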
def expand_untrimmed(self, update_existing_columns: bool = False) -> pd.DataFrame:
df = self.untrimmed
# normalise target by adset
# df["ctr_norm"] = (
# df.groupby(["shop_id", "adset_id"])
# .ctr.transform(lambda x: (x - x.mean()) / x.std())
# .count()
# )
new_col_func_dict = self.col_func_dict
if not update_existing_columns:
new_col_func_dict = {
col: fun for col, fun in new_col_func_dict.items() if "primary_" + col not in df.columns
}
# get extra columns
for col, func in new_col_func_dict.items():
logger.debug(col)
for text_type in self.text_types:
df[f"{text_type}_{col}"] = df[text_type].apply(func)
df["has_text"] = df.apply(
lambda df: bool(df.primary_number + df.title_number + df.description_number),
axis=1,
)
# text columns
df = df.apply(_get_text, axis=1)
        df = df.apply(_get_concatenated_text, axis=1)
df["language"] = df.text.apply(detect_language)
df = df[df.language == "en"]
df = df[df.ctr.notna()]
save_csv(df=df, path=self.untrimmed_path, s3=self.s3, s3_args={"bucket": self.bucket})
return df
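    # keeps acquisition ads with enough impressions and spend, a known AOV, and non-empty text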
def trim(self, min_impr: int = 900, min_spend: float = 90) -> pd.DataFrame:
df = self.normalized
df = df[(df.impr >= min_impr) & (df.spend >= min_spend)]
df = df[df.target == "acquisition"]
df = df[df.aov.notna()]
        df = df[df.has_text]
save_csv(df=df, path=self.trimmed_path, s3=self.s3, s3_args={"bucket": self.bucket})
return df
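    # cleans the text columns, adds sentiment, and optionally attaches BERT and concatenated feature embeddings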
def expand_trimmed(
self, bert: BertModel = None, tokenizer: AutoTokenizer = None, add_bert_embeddings_bool: bool = False
) -> pd.DataFrame:
df = self.trimmed
# clean text
for col in ["text", "concat_text"]:
df[f"{col}_clean"] = df[col].apply(clean_text)
df["text_clean_sentiment"] = df.text_clean.apply(get_sentiment)
if add_bert_embeddings_bool:
if tokenizer is None or bert is None:
raise ValueError("tokenizer or bert is None")
layer_dict = {"bert": bert, "tokenizer": tokenizer}
df = add_bert_embeddings(df=df, save_path=self.trimmed_path, layer_dict=layer_dict)
df = df.apply(add_concat_embeddings, axis=1)
save_csv(df=df, path=self.trimmed_path, s3=self.s3, s3_args={"bucket": self.bucket})
return df
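    # fixed-seed splits: 10% test, then 15% of the remaining train portion as validation (~76.5/13.5/10 overall)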
def split_into_train_and_test(
self,
) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
df = self.trimmed
train, test = train_test_split(df, train_size=0.9, random_state=42)
train, val = train_test_split(train, train_size=0.85, random_state=42)
save_csv(df=train, path=self.train_path, s3=self.s3, s3_args={"bucket": self.bucket})
save_csv(df=val, path=self.val_path, s3=self.s3, s3_args={"bucket": self.bucket})
save_csv(df=test, path=self.test_path, s3=self.s3, s3_args={"bucket": self.bucket})
return train, val, test
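    # end-to-end pipeline: expand untrimmed -> normalize -> trim -> expand trimmed -> train/val/test split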
def expand_normalise_trim_split(
self,
update_existing_columns: bool = False,
group_cols=["text", "target", "shop_id"],
) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
self.expand_untrimmed(update_existing_columns=update_existing_columns)
self.normalize_untrimmed(group_cols=group_cols)
self.trim()
self.expand_trimmed()
train, val, test = self.split_into_train_and_test()
return train, val, test
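# picks one representative text per ad, preferring primary, then description, then title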
def _get_text(ad: pd.Series) -> pd.Series:
if ad.primary_number > 0:
ad["text"] = ad.primary[0]
elif ad.description_number > 0:
ad["text"] = ad.description[0]
elif ad.title_number > 0:
ad["text"] = ad.title[0]
else:
ad["text"] = None
return ad
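# joins the first primary, description and title texts (in that order) into a single string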
def _get_concatenated_text(ad: pd.Series) -> pd.Series:
concat_text = ""
if ad.primary_number > 0:
concat_text = concat_text + ad.primary[0]
if ad.description_number > 0:
concat_text = concat_text + ad.description[0]
if ad.title_number > 0:
concat_text = concat_text + ad.title[0]
ad["concat_text"] = concat_text
return ad
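# convenience instances: one backed by local disk, one backed by S3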
regression_dataset = RegressionDataset()
regression_dataset_s3 = RegressionDataset(s3=True)
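# on GPU everything is embedded in one pass; on CPU only missing embeddings are filled in,
# row by row, with throttling and periodic checkpoints written to save_path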
def add_bert_embeddings(df: pd.DataFrame, save_path: str, layer_dict: dict = {}, device=DEVICE) -> pd.DataFrame:
if device == torch.device("cuda"):
df["my_bert_cls_embedding"] = df.text_clean.apply(
lambda text: get_bert_embedding(text=text, cls=True, layer_dict=layer_dict)
)
df["my_bert_mean_embedding"] = df.text_clean.apply(
lambda text: get_bert_embedding(text=text, cls=False, layer_dict=layer_dict)
)
return df
if "my_bert_cls_embedding" not in df.columns:
df["my_bert_cls_embedding"] = None
if "my_bert_mean_embedding" not in df.columns:
df["my_bert_mean_embedding"] = None
counter = 0
df["my_bert_cls_embedding"] = df["my_bert_cls_embedding"].astype(object)
df["my_bert_mean_embedding"] = df["my_bert_mean_embedding"].astype(object)
    for i in tqdm(range(len(df))):
        if df.at[i, "my_bert_cls_embedding"] is None:
            df.at[i, "my_bert_cls_embedding"] = get_bert_embedding(
                text=df.at[i, "text_clean"], cls=True, layer_dict=layer_dict
            )
            counter = counter + 1
            sleep(0.5)
        if df.at[i, "my_bert_mean_embedding"] is None:
            df.at[i, "my_bert_mean_embedding"] = get_bert_embedding(
                text=df.at[i, "text_clean"], cls=False, layer_dict=layer_dict
            )
            counter = counter + 1
            sleep(0.5)
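        # checkpoint roughly every 25 rows (counter advances twice per row)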
if counter % 50 in [0, 1]:
df.to_csv(save_path, index=False)
df.to_csv(save_path, index=False)
return df
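# combines AOV and sentiment features with each BERT embedding into a single feature vector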
def add_concat_embeddings(series: pd.Series) -> pd.Series:
    other_features = {"aov": series["aov"]} | series["text_clean_sentiment"]
    for emb_type in ["cls", "mean"]:
        bert_embedding = series[f"my_bert_{emb_type}_embedding"]
        series[f"my_full_{emb_type}_embedding"] = get_concat_embedding(
bert_embedding=bert_embedding, other_features=other_features
)
return series