Spaces:
Running
on
Zero
Running
on
Zero
File size: 3,058 Bytes
d864d45 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 |
import csv
import datetime
import os
from decimal import Decimal

import boto3

from tools.config import (
    AWS_REGION,
    OUTPUT_FOLDER,
    USAGE_LOG_DYNAMODB_TABLE_NAME,
)
# Export settings. The defaults come from tools.config; replace them with your
# actual table name and region if needed.
TABLE_NAME = USAGE_LOG_DYNAMODB_TABLE_NAME  # Choose as appropriate
REGION = AWS_REGION
# os.path.join only inserts a separator when OUTPUT_FOLDER does not already end
# with one, so the CSV always lands inside the folder instead of next to it
# under a fused name such as "outputdynamodb_logs_export.csv".
CSV_OUTPUT = os.path.join(OUTPUT_FOLDER, "dynamodb_logs_export.csv")

# Create the DynamoDB resource and the handle of the table to export
dynamodb = boto3.resource("dynamodb", region_name=REGION)
table = dynamodb.Table(TABLE_NAME)
def _decimals_to_numbers(value):
    """Return ``value`` with every Decimal replaced by an int or a float.

    Lists, sets and dicts are rebuilt recursively so Decimals nested at any
    depth are converted as well. Any other value is returned unchanged.
    """
    if isinstance(value, Decimal):
        # Integral Decimals become int; everything else becomes float.
        return int(value) if value % 1 == 0 else float(value)
    if isinstance(value, dict):
        return {k: _decimals_to_numbers(v) for k, v in value.items()}
    if isinstance(value, list):
        return [_decimals_to_numbers(v) for v in value]
    if isinstance(value, set):
        return {_decimals_to_numbers(v) for v in value}
    return value


# Helper function to convert Decimal to float or int
def convert_types(item):
    """Return a copy of ``item`` with values made CSV-friendly.

    * Decimal values become ``int`` (when integral) or ``float``. This also
      applies to Decimals nested inside lists, sets and dicts, which would
      otherwise be written out as ``Decimal('...')`` reprs.
    * Top-level strings that parse as ISO 8601 timestamps are reformatted as
      ``YYYY-MM-DD HH:MM:SS.mmm`` (millisecond precision, no UTC offset in
      the output). Strings that do not parse are kept as they are.
    * All other values are passed through unchanged.

    Args:
        item: Mapping of attribute name to value for a single record.

    Returns:
        A new dict with the same keys and the converted values.
    """
    new_item = {}
    for key, value in item.items():
        if isinstance(value, str):
            try:
                # fromisoformat() on older Python versions rejects the "Z"
                # suffix, so it is rewritten as an explicit UTC offset.
                dt_obj = datetime.datetime.fromisoformat(value.replace("Z", "+00:00"))
                # %f yields microseconds; dropping the last three digits
                # leaves milliseconds.
                new_item[key] = dt_obj.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
            except (ValueError, TypeError):
                # If it fails to parse, it's just a regular string
                new_item[key] = value
        else:
            new_item[key] = _decimals_to_numbers(value)
    return new_item
# Paginated scan
def scan_table():
    """Return every item of the module-level ``table`` as one list.

    A scan response that contains ``LastEvaluatedKey`` is incomplete, so the
    scan is repeated from that key until a response arrives without it.
    """
    collected = []
    scan_kwargs = {}
    while True:
        page = table.scan(**scan_kwargs)
        collected.extend(page["Items"])
        if "LastEvaluatedKey" not in page:
            return collected
        # Resume the next request where this page stopped.
        scan_kwargs = {"ExclusiveStartKey": page["LastEvaluatedKey"]}
# Export to CSV
def export_to_csv(items, output_path, fields_to_drop: list = None):
    """Write ``items`` to a CSV file at ``output_path``, one row per item.

    The header is the sorted union of the keys of all items, minus the names
    in ``fields_to_drop``. Every item is passed through ``convert_types``
    before it is written. When ``items`` is empty a message is printed and
    no file is created.

    Args:
        items: Sequence of dict-like records to export.
        output_path: Path of the CSV file to create or overwrite.
        fields_to_drop: Optional column names to leave out of the file.
    """
    if not items:
        print("No items found.")
        return

    excluded = set(fields_to_drop) if fields_to_drop else set()

    # Records need not share the same keys, so the header is collected from
    # all of them rather than from the first one only.
    seen_keys = {name for record in items for name in record.keys()}
    fieldnames = sorted(seen_keys - excluded)
    print("Final CSV columns will be:", fieldnames)

    with open(output_path, "w", newline="", encoding="utf-8-sig") as csvfile:
        # extrasaction="ignore" silently skips the dropped fields, and
        # restval="" fills in columns that a record does not have.
        writer = csv.DictWriter(
            csvfile,
            fieldnames=fieldnames,
            extrasaction="ignore",
            restval="",
        )
        writer.writeheader()
        writer.writerows(convert_types(record) for record in items)

    print(f"Exported {len(items)} items to {output_path}")
# Script body: read the whole table, then write it out as CSV with no
# columns dropped.
items = scan_table()
export_to_csv(items, output_path=CSV_OUTPUT, fields_to_drop=[])
|