document_redaction / cdk /check_resources.py
seanpedrickcase's picture
Sync: Merge pull request #108 from seanpedrick-case/dev
546be9b
import json
import os
from typing import Any, Dict, List
from cdk_config import ( # Import necessary config
ALB_NAME,
AWS_REGION,
CDK_CONFIG_PATH,
CDK_FOLDER,
CODEBUILD_PROJECT_NAME,
CODEBUILD_ROLE_NAME,
COGNITO_USER_POOL_CLIENT_NAME,
COGNITO_USER_POOL_CLIENT_SECRET_NAME,
COGNITO_USER_POOL_NAME,
CONTEXT_FILE,
ECR_CDK_REPO_NAME,
ECS_TASK_EXECUTION_ROLE_NAME,
ECS_TASK_ROLE_NAME,
PRIVATE_SUBNET_AVAILABILITY_ZONES,
PRIVATE_SUBNET_CIDR_BLOCKS,
PRIVATE_SUBNETS_TO_USE,
PUBLIC_SUBNET_AVAILABILITY_ZONES,
PUBLIC_SUBNET_CIDR_BLOCKS,
PUBLIC_SUBNETS_TO_USE,
S3_LOG_CONFIG_BUCKET_NAME,
S3_OUTPUT_BUCKET_NAME,
VPC_NAME,
WEB_ACL_NAME,
)
from cdk_functions import ( # Import your check functions (assuming they use Boto3)
_get_existing_subnets_in_vpc,
check_alb_exists,
check_codebuild_project_exists,
check_ecr_repo_exists,
check_for_existing_role,
check_for_existing_user_pool,
check_for_existing_user_pool_client,
check_for_secret,
check_s3_bucket_exists,
check_subnet_exists_by_name,
check_web_acl_exists,
get_vpc_id_by_name,
validate_subnet_creation_parameters,
# Add other check functions as needed
)
# Root folder of the CDK project (<FULL_PATH_TO_CDK_FOLDER_HERE>).
cdk_folder = CDK_FOLDER

# The config file is located through its full path, so the configured
# config path is appended directly to the CDK folder.
os.environ["CDK_CONFIG_PATH"] = "".join((cdk_folder, CDK_CONFIG_PATH))
# --- Helper to parse environment variables into lists ---
def _get_env_list(env_var_name: str) -> List[str]:
"""Parses a comma-separated environment variable into a list of strings."""
value = env_var_name[1:-1].strip().replace('"', "").replace("'", "")
if not value:
return []
# Split by comma and filter out any empty strings that might result from extra commas
return [s.strip() for s in value.split(",") if s.strip()]
def _coerce_to_list(setting):
    """Return *setting* parsed by ``_get_env_list`` unless it is empty or already a list."""
    if not setting or isinstance(setting, list):
        return setting
    return _get_env_list(setting)


# Normalize every subnet-related setting that is still a raw string into a
# list; empty values and values that are already lists are left untouched.
PUBLIC_SUBNETS_TO_USE = _coerce_to_list(PUBLIC_SUBNETS_TO_USE)
PRIVATE_SUBNETS_TO_USE = _coerce_to_list(PRIVATE_SUBNETS_TO_USE)
PUBLIC_SUBNET_CIDR_BLOCKS = _coerce_to_list(PUBLIC_SUBNET_CIDR_BLOCKS)
PUBLIC_SUBNET_AVAILABILITY_ZONES = _coerce_to_list(PUBLIC_SUBNET_AVAILABILITY_ZONES)
PRIVATE_SUBNET_CIDR_BLOCKS = _coerce_to_list(PRIVATE_SUBNET_CIDR_BLOCKS)
PRIVATE_SUBNET_AVAILABILITY_ZONES = _coerce_to_list(PRIVATE_SUBNET_AVAILABILITY_ZONES)
# Check which of the configured resources already exist in the AWS environment, so that new ones are only created when necessary
def _has_full_subnet_details(names, cidr_blocks, availability_zones) -> bool:
    """Return True when at least one subnet is named and the CIDR and AZ lists match the names in length."""
    return len(names) > 0 and len(cidr_blocks) == len(names) == len(availability_zones)


def _build_subnet_specs(names, cidr_blocks, availability_zones) -> List[Dict[str, str]]:
    """Pair every subnet name with the CIDR block and AZ at the same position."""
    return [
        {"name": name, "cidr": cidr, "az": az}
        for name, cidr, az in zip(names, cidr_blocks, availability_zones)
    ]


def _check_subnets_by_name(subnet_names, existing_aws_subnets) -> Dict[str, Dict[str, Any]]:
    """Look up each named subnet among the already-fetched subnets of the VPC."""
    checked: Dict[str, Dict[str, Any]] = {}
    for subnet_name in subnet_names or ():
        print("subnet_name:", subnet_name)
        exists, subnet_id = check_subnet_exists_by_name(
            subnet_name, existing_aws_subnets
        )
        checked[subnet_name] = {"exists": exists, "id": subnet_id}
    return checked


def _add_network_context(context_data: Dict[str, Any]) -> None:
    """Record NAT gateway, VPC and subnet findings for ``VPC_NAME`` in *context_data*.

    Raises:
        RuntimeError: If no VPC ID is found for ``VPC_NAME``.
        SystemExit: If the existing subnets cannot be fetched or the proposed
            subnet parameters fail validation.
    """
    print("VPC_NAME:", VPC_NAME)
    vpc_id, nat_gateways = get_vpc_id_by_name(VPC_NAME)

    # For simplicity this only checks whether *any* NAT gateway exists in the
    # VPC and stores the first one found. A more robust check would match by
    # subnet, AZ, or a specific tag.
    if nat_gateways:
        context_data["exists:NatGateway"] = True
        context_data["id:NatGateway"] = nat_gateways[0]["NatGatewayId"]
    else:
        context_data["exists:NatGateway"] = False
        context_data["id:NatGateway"] = None

    if not vpc_id:
        # Without a VPC the subnet checks below cannot run.
        raise RuntimeError(
            f"Required VPC '{VPC_NAME}' not found. Cannot proceed with subnet checks."
        )
    context_data["vpc_id"] = vpc_id

    # --- Subnet checks ---
    # Full validation is possible for a subnet type when CIDR blocks and AZs
    # are provided and their counts match the list of subnet names.
    public_ready = _has_full_subnet_details(
        PUBLIC_SUBNETS_TO_USE,
        PUBLIC_SUBNET_CIDR_BLOCKS,
        PUBLIC_SUBNET_AVAILABILITY_ZONES,
    )
    private_ready = _has_full_subnet_details(
        PRIVATE_SUBNETS_TO_USE,
        PRIVATE_SUBNET_CIDR_BLOCKS,
        PRIVATE_SUBNET_AVAILABILITY_ZONES,
    )

    # Full validation is activated as soon as *either* subnet type has its
    # full details; a partially configured setup only produces a warning.
    full_validation_mode = public_ready or private_ready
    if public_ready and not private_ready and PRIVATE_SUBNETS_TO_USE:
        print(
            "Warning: Public subnets have CIDRs/AZs, but private subnets do not. Only public will be fully validated/created with CIDRs."
        )
    if private_ready and not public_ready and PUBLIC_SUBNETS_TO_USE:
        print(
            "Warning: Private subnets have CIDRs/AZs, but public subnets do not. Only private will be fully validated/created with CIDRs."
        )

    public_specs = (
        _build_subnet_specs(
            PUBLIC_SUBNETS_TO_USE,
            PUBLIC_SUBNET_CIDR_BLOCKS,
            PUBLIC_SUBNET_AVAILABILITY_ZONES,
        )
        if public_ready
        else []
    )
    private_specs = (
        _build_subnet_specs(
            PRIVATE_SUBNETS_TO_USE,
            PRIVATE_SUBNET_CIDR_BLOCKS,
            PRIVATE_SUBNET_AVAILABILITY_ZONES,
        )
        if private_ready
        else []
    )
    all_proposed_subnets_data: List[Dict[str, str]] = public_specs + private_specs

    print(f"Target VPC ID for Boto3 lookup: {vpc_id}")

    # Fetch all existing subnets in the target VPC once to avoid repeated API calls.
    try:
        existing_aws_subnets = _get_existing_subnets_in_vpc(vpc_id)
    except Exception as e:
        print(f"Failed to fetch existing VPC subnets. Aborting. Error: {e}")
        # Nothing below can work without this baseline data.
        raise SystemExit(1) from e

    print("\n--- Running Name-Only Subnet Existence Check Mode ---")
    checked_public_subnets = _check_subnets_by_name(
        PUBLIC_SUBNETS_TO_USE, existing_aws_subnets
    )
    context_data["checked_public_subnets"] = checked_public_subnets
    checked_private_subnets = _check_subnets_by_name(
        PRIVATE_SUBNETS_TO_USE, existing_aws_subnets
    )
    context_data["checked_private_subnets"] = checked_private_subnets

    # Subnets that already exist are removed from the list handed to validation.
    already_existing = {
        subnet_name
        for checked in (checked_public_subnets, checked_private_subnets)
        for subnet_name, result in checked.items()
        if result["exists"] is True
    }
    all_proposed_subnets_data = [
        subnet
        for subnet in all_proposed_subnets_data
        if subnet["name"] not in already_existing
    ]
    print("\nName-only existence subnet check complete.\n")

    if not full_validation_mode:
        return

    print("\n--- Running in Full Subnet Validation Mode (CIDR/AZs provided) ---")
    try:
        validate_subnet_creation_parameters(
            vpc_id, all_proposed_subnets_data, existing_aws_subnets
        )
    except Exception as e:
        print(f"\nFATAL ERROR: Subnet parameter validation failed: {e}\n")
        raise SystemExit(1) from e
    print("\nPre-synth validation successful. Proceeding with CDK synth.\n")

    # Populate the context for downstream CDK construct creation. These lists
    # hold every fully specified subnet, including ones that already exist.
    context_data["public_subnets_to_create"] = [
        {**spec, "is_public": True} for spec in public_specs
    ]
    context_data["private_subnets_to_create"] = [
        {**spec, "is_public": False} for spec in private_specs
    ]


def _add_resource_context(context_data: Dict[str, Any]) -> None:
    """Record existence (and ARN/ID where returned) of the non-network resources."""
    # IAM roles: the second element of the check result is stored as the ARN.
    for role_name in (
        CODEBUILD_ROLE_NAME,
        ECS_TASK_ROLE_NAME,
        ECS_TASK_EXECUTION_ROLE_NAME,
    ):
        exists, role_arn, _ = check_for_existing_role(role_name)
        context_data[f"exists:{role_name}"] = exists
        if exists:
            context_data[f"arn:{role_name}"] = role_arn

    # S3 buckets: only existence is stored (the ARN may not be needed when
    # importing by bucket name).
    for bucket_name in (S3_LOG_CONFIG_BUCKET_NAME, S3_OUTPUT_BUCKET_NAME):
        exists, _ = check_s3_bucket_exists(bucket_name)
        context_data[f"exists:{bucket_name}"] = exists

    # ECR repository: only existence is stored.
    repo_name = ECR_CDK_REPO_NAME
    exists, _ = check_ecr_repo_exists(repo_name)
    context_data[f"exists:{repo_name}"] = exists

    # CodeBuild project (assumes the second returned value is the ARN).
    project_name = CODEBUILD_PROJECT_NAME
    exists, project_arn = check_codebuild_project_exists(project_name)
    context_data[f"exists:{project_name}"] = exists
    if exists:
        context_data[f"arn:{project_name}"] = project_arn

    # ALB, looked up by name (assumes the check returns the ALB description).
    alb_name = ALB_NAME
    exists, alb_object = check_alb_exists(alb_name, region_name=AWS_REGION)
    context_data[f"exists:{alb_name}"] = exists
    if exists:
        print("alb_object:", alb_object)
        context_data[f"arn:{alb_name}"] = alb_object["LoadBalancerArn"]

    # Cognito user pool, looked up by name.
    user_pool_name = COGNITO_USER_POOL_NAME
    exists, user_pool_id, _ = check_for_existing_user_pool(user_pool_name)
    context_data[f"exists:{user_pool_name}"] = exists
    if exists:
        context_data[f"id:{user_pool_name}"] = user_pool_id

    # Cognito user pool client: can only be looked up once a pool ID is known.
    if user_pool_id:
        user_pool_client_name = COGNITO_USER_POOL_CLIENT_NAME
        exists, client_id, _ = check_for_existing_user_pool_client(
            user_pool_client_name, user_pool_id
        )
        context_data[f"exists:{user_pool_client_name}"] = exists
        if exists:
            context_data[f"id:{user_pool_client_name}"] = client_id

    # Secrets Manager secret, by name: only existence is stored.
    secret_name = COGNITO_USER_POOL_CLIENT_SECRET_NAME
    exists, _ = check_for_secret(secret_name)
    context_data[f"exists:{secret_name}"] = exists

    # WAF Web ACL, by name and scope.
    # NOTE(review): `.attr_arn` is attribute access, whereas the ALB result
    # above is indexed like a dict - confirm what check_web_acl_exists returns.
    web_acl_name = WEB_ACL_NAME
    exists, existing_web_acl = check_web_acl_exists(web_acl_name, scope="CLOUDFRONT")
    context_data[f"exists:{web_acl_name}"] = exists
    if exists:
        context_data[f"arn:{web_acl_name}"] = existing_web_acl.attr_arn


def check_and_set_context():
    """Check which configured AWS resources exist and write the findings to ``CONTEXT_FILE``.

    When ``VPC_NAME`` is set, NAT gateway, VPC ID and subnet information is
    gathered first; afterwards IAM roles, S3 buckets, the ECR repository, the
    CodeBuild project, the ALB, Cognito resources, the client secret and the
    WAF Web ACL are checked. All results are stored in one dictionary that is
    dumped as JSON.

    Raises:
        RuntimeError: If ``VPC_NAME`` is set but no matching VPC ID is found.
        SystemExit: If existing subnets cannot be fetched or the proposed
            subnet parameters fail validation.
    """
    # One dictionary is shared by every step so that entries stored early
    # (NAT gateway, VPC ID) are still present when the file is written.
    context_data: Dict[str, Any] = {}

    if VPC_NAME:
        _add_network_context(context_data)

    _add_resource_context(context_data)

    # Write the context data to the file.
    with open(CONTEXT_FILE, "w", encoding="utf-8") as f:
        json.dump(context_data, f, indent=2)
    print(f"Context data written to {CONTEXT_FILE}")