import os
from typing import List, Type, Union

import boto3
import pandas as pd

from tools.config import (
    AWS_REGION,
    DOCUMENT_REDACTION_BUCKET,
    RUN_AWS_FUNCTIONS,
    S3_OUTPUTS_BUCKET,
    SAVE_LOGS_TO_CSV,
)
from tools.secure_path_utils import secure_join

PandasDataFrame = Type[pd.DataFrame]


def get_assumed_role_info():
    """
    Return the caller's identity ARN and role name from AWS STS.
    """
    sts_endpoint = f"https://sts.{AWS_REGION}.amazonaws.com"
    sts = boto3.client("sts", region_name=AWS_REGION, endpoint_url=sts_endpoint)
    response = sts.get_caller_identity()

    assumed_role_arn = response["Arn"]
    assumed_role_name = assumed_role_arn.split("/")[-1]

    return assumed_role_arn, assumed_role_name


# Establish a session and verify credentials at import time
if RUN_AWS_FUNCTIONS:
    try:
        session = boto3.Session(region_name=AWS_REGION)
    except Exception as e:
        print("Could not start boto3 session:", e)

    try:
        assumed_role_arn, assumed_role_name = get_assumed_role_info()
        print("Successfully retrieved assumed role from STS")
    except Exception as e:
        print("Could not get assumed role from STS:", e)


def download_file_from_s3(
    bucket_name: str,
    key: str,
    local_file_path_and_name: str,
    RUN_AWS_FUNCTIONS: bool = RUN_AWS_FUNCTIONS,
):
    """
    Download a single object from S3 to a local file path.
    """
    if RUN_AWS_FUNCTIONS:
        try:
            # Create the destination directory if the path includes one
            local_dir = os.path.dirname(local_file_path_and_name)
            if local_dir:
                os.makedirs(local_dir, exist_ok=True)

            s3 = boto3.client("s3", region_name=AWS_REGION)
            s3.download_file(bucket_name, key, local_file_path_and_name)
            print(
                f"File downloaded from s3://{bucket_name}/{key} to {local_file_path_and_name}"
            )
        except Exception as e:
            print("Could not download file:", key, "from S3 due to:", e)
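

# A minimal usage sketch for the single-file download helper; the bucket, key,
# and local path below are hypothetical placeholders, not values from this app:
#
# download_file_from_s3(
#     bucket_name="example-bucket",
#     key="outputs/example_report.csv",
#     local_file_path_and_name="output/example_report.csv",
# )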


def download_folder_from_s3(
    bucket_name: str,
    s3_folder: str,
    local_folder: str,
    RUN_AWS_FUNCTIONS: bool = RUN_AWS_FUNCTIONS,
):
    """
    Download all files from an S3 folder to a local folder.
    """
    if RUN_AWS_FUNCTIONS:
        if bucket_name and s3_folder and local_folder:
            s3 = boto3.client("s3", region_name=AWS_REGION)

            # Note: list_objects_v2 returns at most 1,000 keys per call, so very
            # large folders would need paginator support
            response = s3.list_objects_v2(Bucket=bucket_name, Prefix=s3_folder)

            for obj in response.get("Contents", []):
                object_key = obj["Key"]

                # Skip "folder" placeholder keys, which cannot be downloaded as files
                if object_key.endswith("/"):
                    continue

                local_file_path = secure_join(
                    local_folder, os.path.relpath(object_key, s3_folder)
                )
                os.makedirs(os.path.dirname(local_file_path), exist_ok=True)

                try:
                    s3.download_file(bucket_name, object_key, local_file_path)
                    print(
                        f"Downloaded 's3://{bucket_name}/{object_key}' to '{local_file_path}'"
                    )
                except Exception as e:
                    print(f"Error downloading 's3://{bucket_name}/{object_key}':", e)
        else:
            print(
                "One or more required variables are empty, could not download from S3"
            )
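

# A sketch of mirroring a whole prefix locally; the bucket and folder names are
# hypothetical:
#
# download_folder_from_s3(
#     bucket_name="example-bucket",
#     s3_folder="outputs/session_123/",
#     local_folder="output/session_123",
# )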


def download_files_from_s3(
    bucket_name: str,
    s3_folder: str,
    local_folder: str,
    filenames: Union[List[str], str],
    RUN_AWS_FUNCTIONS: bool = RUN_AWS_FUNCTIONS,
):
    """
    Download specific files from an S3 folder to a local folder.
    Pass "*" as filenames to download every file under the prefix.
    """
    if RUN_AWS_FUNCTIONS:
        if bucket_name and s3_folder and local_folder and filenames:
            s3 = boto3.client("s3", region_name=AWS_REGION)

            print("Trying to download files:", filenames)

            if filenames == "*":
                print("Trying to download all files in AWS folder:", s3_folder)
                response = s3.list_objects_v2(Bucket=bucket_name, Prefix=s3_folder)

                print("Found files in AWS folder:", response.get("Contents", []))

                filenames = [
                    obj["Key"].split("/")[-1] for obj in response.get("Contents", [])
                ]

                print("Found filenames in AWS folder:", filenames)

            for filename in filenames:
                object_key = secure_join(s3_folder, filename)
                local_file_path = secure_join(local_folder, filename)

                os.makedirs(os.path.dirname(local_file_path), exist_ok=True)

                try:
                    s3.download_file(bucket_name, object_key, local_file_path)
                    print(
                        f"Downloaded 's3://{bucket_name}/{object_key}' to '{local_file_path}'"
                    )
                except Exception as e:
                    print(f"Error downloading 's3://{bucket_name}/{object_key}':", e)
        else:
            print(
                "One or more required variables are empty, could not download from S3"
            )
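

# Hedged examples of the two supported call shapes; bucket, folder, and file
# names are hypothetical:
#
# download_files_from_s3("example-bucket", "configs", "output", ["allow_list.csv"])
# download_files_from_s3("example-bucket", "configs", "output", "*")  # all files under the prefix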


def upload_file_to_s3(
    local_file_paths: Union[List[str], str],
    s3_key: str,
    s3_bucket: str = DOCUMENT_REDACTION_BUCKET,
    RUN_AWS_FUNCTIONS: bool = RUN_AWS_FUNCTIONS,
):
    """
    Uploads one or more files from the local machine to Amazon S3.

    Args:
    - local_file_paths: Local file path(s) of the file(s) to upload.
    - s3_key: Key prefix (path) in the S3 bucket; the file name is appended to it.
    - s3_bucket: Name of the S3 bucket.

    Returns:
    - Message as variable/printed to console
    """
    final_out_message = list()
    final_out_message_str = ""

    if RUN_AWS_FUNCTIONS:
        try:
            if s3_bucket and s3_key and local_file_paths:
                s3_client = boto3.client("s3", region_name=AWS_REGION)

                if isinstance(local_file_paths, str):
                    local_file_paths = [local_file_paths]

                for file in local_file_paths:
                    try:
                        file_name = os.path.basename(file)
                        s3_key_full = s3_key + file_name

                        s3_client.upload_file(file, s3_bucket, s3_key_full)
                        out_message = "File " + file_name + " uploaded successfully!"
                    except Exception as e:
                        out_message = f"Error uploading file(s): {e}"
                        print(out_message)

                    final_out_message.append(out_message)
                    final_out_message_str = "\n".join(final_out_message)
            else:
                final_out_message_str = (
                    "At least one essential variable is empty, could not upload to S3"
                )
        except Exception as e:
            final_out_message_str = "Could not upload files to S3 due to: " + str(e)
            print(final_out_message_str)
    else:
        final_out_message_str = "App not set to run AWS functions"

    return final_out_message_str
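

# A usage sketch; note that s3_key acts as a prefix with the file name appended
# directly, so it should normally end with "/". The paths below are hypothetical:
#
# message = upload_file_to_s3(
#     local_file_paths=["output/example_report.csv"],
#     s3_key="uploads/session_123/",
# )
# print(message)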


def upload_log_file_to_s3(
    local_file_paths: Union[List[str], str],
    s3_key: str,
    s3_bucket: str = DOCUMENT_REDACTION_BUCKET,
    RUN_AWS_FUNCTIONS: bool = RUN_AWS_FUNCTIONS,
    SAVE_LOGS_TO_CSV: bool = SAVE_LOGS_TO_CSV,
):
    """
    Uploads a log file from the local machine to Amazon S3. Files are only
    uploaded when both RUN_AWS_FUNCTIONS and SAVE_LOGS_TO_CSV are enabled.

    Args:
    - local_file_paths: Local file path(s) of the file(s) to upload.
    - s3_key: Key prefix (path) in the S3 bucket; the file name is appended to it.
    - s3_bucket: Name of the S3 bucket.

    Returns:
    - Message as variable/printed to console
    """
    final_out_message = list()
    final_out_message_str = ""

    if RUN_AWS_FUNCTIONS and SAVE_LOGS_TO_CSV:
        try:
            if s3_bucket and s3_key and local_file_paths:
                s3_client = boto3.client("s3", region_name=AWS_REGION)

                if isinstance(local_file_paths, str):
                    local_file_paths = [local_file_paths]

                for file in local_file_paths:
                    try:
                        file_name = os.path.basename(file)
                        s3_key_full = s3_key + file_name

                        s3_client.upload_file(file, s3_bucket, s3_key_full)
                        out_message = "File " + file_name + " uploaded successfully!"
                    except Exception as e:
                        out_message = f"Error uploading file(s): {e}"
                        print(out_message)

                    final_out_message.append(out_message)
                    final_out_message_str = "\n".join(final_out_message)
            else:
                final_out_message_str = (
                    "At least one essential variable is empty, could not upload to S3"
                )
        except Exception as e:
            final_out_message_str = "Could not upload files to S3 due to: " + str(e)
            print(final_out_message_str)
    else:
        final_out_message_str = "App not set to run AWS functions or save logs to CSV"

    return final_out_message_str


def export_outputs_to_s3(
    file_list_state,
    s3_output_folder_state_value: str,
    save_outputs_to_s3_flag: bool,
    base_file_state=None,
    s3_bucket: str = S3_OUTPUTS_BUCKET,
):
    """
    Upload a list of local output files to the configured S3 outputs folder.

    - file_list_state: Gradio dropdown state that holds a list of file paths or a
      single path/string. If blank/empty, no action is taken.
    - s3_output_folder_state_value: Final S3 key prefix (including any session hash)
      to use as the destination folder for uploads.
    - save_outputs_to_s3_flag: If False, the function returns without uploading.
    - base_file_state: Optional file (or list of files) whose name stem is used for
      the per-file subfolder in S3; defaults to each output file's own stem.
    - s3_bucket: Name of the S3 bucket.
    """
    try:
        if not save_outputs_to_s3_flag:
            return

        if not s3_output_folder_state_value:
            return

        file_paths = file_list_state
        if not file_paths:
            return

        if isinstance(file_paths, str):
            file_paths = [file_paths]

        # Drop any empty entries
        file_paths = [p for p in file_paths if p]
        if not file_paths:
            return

        # Derive a folder stem from the base file, if one was provided
        base_stem = None
        if base_file_state:
            base_path = None

            if isinstance(base_file_state, str):
                base_path = base_file_state
            elif isinstance(base_file_state, list) and base_file_state:
                first_item = base_file_state[0]
                base_path = getattr(first_item, "name", None) or str(first_item)
            else:
                base_path = getattr(base_file_state, "name", None) or str(
                    base_file_state
                )

            if base_path:
                base_name = os.path.basename(base_path)
                base_stem, _ = os.path.splitext(base_name)

        base_prefix = s3_output_folder_state_value
        if not base_prefix.endswith("/"):
            base_prefix = base_prefix + "/"

        had_errors = False
        for file in file_paths:
            file_name = os.path.basename(file)

            if base_stem:
                folder_stem = base_stem
            else:
                folder_stem, _ = os.path.splitext(file_name)

            per_file_prefix = base_prefix + folder_stem + "/"

            out_message = upload_file_to_s3(
                local_file_paths=[file],
                s3_key=per_file_prefix,
                s3_bucket=s3_bucket,
            )

            if (
                "Error uploading file" in out_message
                or "could not upload" in out_message.lower()
            ):
                had_errors = True
                print("export_outputs_to_s3 encountered issues:", out_message)

        if not had_errors:
            print("Successfully uploaded outputs to S3")

    except Exception as e:
        print(f"export_outputs_to_s3 failed with error: {e}")

    return
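

# A hedged usage sketch for the Gradio-facing export helper; the file list and
# prefix below are hypothetical stand-ins for component state values:
#
# export_outputs_to_s3(
#     file_list_state=["output/example_redacted.pdf"],
#     s3_output_folder_state_value="outputs/session_123/",
#     save_outputs_to_s3_flag=True,
# )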