Spaces:
Running
on
Zero
Running
on
Zero
| import json | |
| import os | |
| from typing import Any, Dict, List | |
| from cdk_config import ( # Import necessary config | |
| ALB_NAME, | |
| AWS_REGION, | |
| CDK_CONFIG_PATH, | |
| CDK_FOLDER, | |
| CODEBUILD_PROJECT_NAME, | |
| CODEBUILD_ROLE_NAME, | |
| COGNITO_USER_POOL_CLIENT_NAME, | |
| COGNITO_USER_POOL_CLIENT_SECRET_NAME, | |
| COGNITO_USER_POOL_NAME, | |
| CONTEXT_FILE, | |
| ECR_CDK_REPO_NAME, | |
| ECS_TASK_EXECUTION_ROLE_NAME, | |
| ECS_TASK_ROLE_NAME, | |
| PRIVATE_SUBNET_AVAILABILITY_ZONES, | |
| PRIVATE_SUBNET_CIDR_BLOCKS, | |
| PRIVATE_SUBNETS_TO_USE, | |
| PUBLIC_SUBNET_AVAILABILITY_ZONES, | |
| PUBLIC_SUBNET_CIDR_BLOCKS, | |
| PUBLIC_SUBNETS_TO_USE, | |
| S3_LOG_CONFIG_BUCKET_NAME, | |
| S3_OUTPUT_BUCKET_NAME, | |
| VPC_NAME, | |
| WEB_ACL_NAME, | |
| ) | |
| from cdk_functions import ( # Import your check functions (assuming they use Boto3) | |
| _get_existing_subnets_in_vpc, | |
| check_alb_exists, | |
| check_codebuild_project_exists, | |
| check_ecr_repo_exists, | |
| check_for_existing_role, | |
| check_for_existing_user_pool, | |
| check_for_existing_user_pool_client, | |
| check_for_secret, | |
| check_s3_bucket_exists, | |
| check_subnet_exists_by_name, | |
| check_web_acl_exists, | |
| get_vpc_id_by_name, | |
| validate_subnet_creation_parameters, | |
| # Add other check functions as needed | |
| ) | |
| cdk_folder = CDK_FOLDER # <FULL_PATH_TO_CDK_FOLDER_HERE> | |
| # Full path needed to find config file | |
| os.environ["CDK_CONFIG_PATH"] = cdk_folder + CDK_CONFIG_PATH | |
| # --- Helper to parse environment variables into lists --- | |
| def _get_env_list(env_var_name: str) -> List[str]: | |
| """Parses a comma-separated environment variable into a list of strings.""" | |
| value = env_var_name[1:-1].strip().replace('"', "").replace("'", "") | |
| if not value: | |
| return [] | |
| # Split by comma and filter out any empty strings that might result from extra commas | |
| return [s.strip() for s in value.split(",") if s.strip()] | |
| if PUBLIC_SUBNETS_TO_USE and not isinstance(PUBLIC_SUBNETS_TO_USE, list): | |
| PUBLIC_SUBNETS_TO_USE = _get_env_list(PUBLIC_SUBNETS_TO_USE) | |
| if PRIVATE_SUBNETS_TO_USE and not isinstance(PRIVATE_SUBNETS_TO_USE, list): | |
| PRIVATE_SUBNETS_TO_USE = _get_env_list(PRIVATE_SUBNETS_TO_USE) | |
| if PUBLIC_SUBNET_CIDR_BLOCKS and not isinstance(PUBLIC_SUBNET_CIDR_BLOCKS, list): | |
| PUBLIC_SUBNET_CIDR_BLOCKS = _get_env_list(PUBLIC_SUBNET_CIDR_BLOCKS) | |
| if PUBLIC_SUBNET_AVAILABILITY_ZONES and not isinstance( | |
| PUBLIC_SUBNET_AVAILABILITY_ZONES, list | |
| ): | |
| PUBLIC_SUBNET_AVAILABILITY_ZONES = _get_env_list(PUBLIC_SUBNET_AVAILABILITY_ZONES) | |
| if PRIVATE_SUBNET_CIDR_BLOCKS and not isinstance(PRIVATE_SUBNET_CIDR_BLOCKS, list): | |
| PRIVATE_SUBNET_CIDR_BLOCKS = _get_env_list(PRIVATE_SUBNET_CIDR_BLOCKS) | |
| if PRIVATE_SUBNET_AVAILABILITY_ZONES and not isinstance( | |
| PRIVATE_SUBNET_AVAILABILITY_ZONES, list | |
| ): | |
| PRIVATE_SUBNET_AVAILABILITY_ZONES = _get_env_list(PRIVATE_SUBNET_AVAILABILITY_ZONES) | |
| # Check for the existence of elements in your AWS environment to see if it's necessary to create new versions of the same | |
| def check_and_set_context(): | |
| context_data = {} | |
| # --- Find the VPC ID first --- | |
| if VPC_NAME: | |
| print("VPC_NAME:", VPC_NAME) | |
| vpc_id, nat_gateways = get_vpc_id_by_name(VPC_NAME) | |
| # If you expect only one, or one per AZ and you're creating one per AZ in CDK: | |
| if nat_gateways: | |
| # For simplicity, let's just check if *any* NAT exists in the VPC | |
| # A more robust check would match by subnet, AZ, or a specific tag. | |
| context_data["exists:NatGateway"] = True | |
| context_data["id:NatGateway"] = nat_gateways[0][ | |
| "NatGatewayId" | |
| ] # Store the ID of the first one found | |
| else: | |
| context_data["exists:NatGateway"] = False | |
| context_data["id:NatGateway"] = None | |
| if not vpc_id: | |
| # If the VPC doesn't exist, you might not be able to check/create subnets. | |
| # Decide how to handle this: raise an error, set a flag, etc. | |
| raise RuntimeError( | |
| f"Required VPC '{VPC_NAME}' not found. Cannot proceed with subnet checks." | |
| ) | |
| context_data["vpc_id"] = vpc_id # Store VPC ID in context | |
| # SUBNET CHECKS | |
| context_data: Dict[str, Any] = {} | |
| all_proposed_subnets_data: List[Dict[str, str]] = [] | |
| # Flag to indicate if full validation mode (with CIDR/AZs) is active | |
| full_validation_mode = False | |
| # Determine if full validation mode is possible/desired | |
| # It's 'desired' if CIDR/AZs are provided, and their lengths match the name lists. | |
| public_ready_for_full_validation = ( | |
| len(PUBLIC_SUBNETS_TO_USE) > 0 | |
| and len(PUBLIC_SUBNET_CIDR_BLOCKS) == len(PUBLIC_SUBNETS_TO_USE) | |
| and len(PUBLIC_SUBNET_AVAILABILITY_ZONES) == len(PUBLIC_SUBNETS_TO_USE) | |
| ) | |
| private_ready_for_full_validation = ( | |
| len(PRIVATE_SUBNETS_TO_USE) > 0 | |
| and len(PRIVATE_SUBNET_CIDR_BLOCKS) == len(PRIVATE_SUBNETS_TO_USE) | |
| and len(PRIVATE_SUBNET_AVAILABILITY_ZONES) == len(PRIVATE_SUBNETS_TO_USE) | |
| ) | |
| # Activate full validation if *any* type of subnet (public or private) has its full details provided. | |
| # You might adjust this logic if you require ALL subnet types to have CIDRs, or NONE. | |
| if public_ready_for_full_validation or private_ready_for_full_validation: | |
| full_validation_mode = True | |
| # If some are ready but others aren't, print a warning or raise an error based on your strictness | |
| if ( | |
| public_ready_for_full_validation | |
| and not private_ready_for_full_validation | |
| and PRIVATE_SUBNETS_TO_USE | |
| ): | |
| print( | |
| "Warning: Public subnets have CIDRs/AZs, but private subnets do not. Only public will be fully validated/created with CIDRs." | |
| ) | |
| if ( | |
| private_ready_for_full_validation | |
| and not public_ready_for_full_validation | |
| and PUBLIC_SUBNETS_TO_USE | |
| ): | |
| print( | |
| "Warning: Private subnets have CIDRs/AZs, but public subnets do not. Only private will be fully validated/created with CIDRs." | |
| ) | |
| # Prepare data for validate_subnet_creation_parameters for all subnets that have full details | |
| if public_ready_for_full_validation: | |
| for i, name in enumerate(PUBLIC_SUBNETS_TO_USE): | |
| all_proposed_subnets_data.append( | |
| { | |
| "name": name, | |
| "cidr": PUBLIC_SUBNET_CIDR_BLOCKS[i], | |
| "az": PUBLIC_SUBNET_AVAILABILITY_ZONES[i], | |
| } | |
| ) | |
| if private_ready_for_full_validation: | |
| for i, name in enumerate(PRIVATE_SUBNETS_TO_USE): | |
| all_proposed_subnets_data.append( | |
| { | |
| "name": name, | |
| "cidr": PRIVATE_SUBNET_CIDR_BLOCKS[i], | |
| "az": PRIVATE_SUBNET_AVAILABILITY_ZONES[i], | |
| } | |
| ) | |
| print(f"Target VPC ID for Boto3 lookup: {vpc_id}") | |
| # Fetch all existing subnets in the target VPC once to avoid repeated API calls | |
| try: | |
| existing_aws_subnets = _get_existing_subnets_in_vpc(vpc_id) | |
| except Exception as e: | |
| print(f"Failed to fetch existing VPC subnets. Aborting. Error: {e}") | |
| raise SystemExit(1) # Exit immediately if we can't get baseline data | |
| print("\n--- Running Name-Only Subnet Existence Check Mode ---") | |
| # Fallback: check only by name using the existing data | |
| checked_public_subnets = {} | |
| if PUBLIC_SUBNETS_TO_USE: | |
| for subnet_name in PUBLIC_SUBNETS_TO_USE: | |
| print("subnet_name:", subnet_name) | |
| exists, subnet_id = check_subnet_exists_by_name( | |
| subnet_name, existing_aws_subnets | |
| ) | |
| checked_public_subnets[subnet_name] = { | |
| "exists": exists, | |
| "id": subnet_id, | |
| } | |
| # If the subnet exists, remove it from the proposed subnets list | |
| if checked_public_subnets[subnet_name]["exists"] is True: | |
| all_proposed_subnets_data = [ | |
| subnet | |
| for subnet in all_proposed_subnets_data | |
| if subnet["name"] != subnet_name | |
| ] | |
| context_data["checked_public_subnets"] = checked_public_subnets | |
| checked_private_subnets = {} | |
| if PRIVATE_SUBNETS_TO_USE: | |
| for subnet_name in PRIVATE_SUBNETS_TO_USE: | |
| print("subnet_name:", subnet_name) | |
| exists, subnet_id = check_subnet_exists_by_name( | |
| subnet_name, existing_aws_subnets | |
| ) | |
| checked_private_subnets[subnet_name] = { | |
| "exists": exists, | |
| "id": subnet_id, | |
| } | |
| # If the subnet exists, remove it from the proposed subnets list | |
| if checked_private_subnets[subnet_name]["exists"] is True: | |
| all_proposed_subnets_data = [ | |
| subnet | |
| for subnet in all_proposed_subnets_data | |
| if subnet["name"] != subnet_name | |
| ] | |
| context_data["checked_private_subnets"] = checked_private_subnets | |
| print("\nName-only existence subnet check complete.\n") | |
| if full_validation_mode: | |
| print( | |
| "\n--- Running in Full Subnet Validation Mode (CIDR/AZs provided) ---" | |
| ) | |
| try: | |
| validate_subnet_creation_parameters( | |
| vpc_id, all_proposed_subnets_data, existing_aws_subnets | |
| ) | |
| print("\nPre-synth validation successful. Proceeding with CDK synth.\n") | |
| # Populate context_data for downstream CDK construct creation | |
| context_data["public_subnets_to_create"] = [] | |
| if public_ready_for_full_validation: | |
| for i, name in enumerate(PUBLIC_SUBNETS_TO_USE): | |
| context_data["public_subnets_to_create"].append( | |
| { | |
| "name": name, | |
| "cidr": PUBLIC_SUBNET_CIDR_BLOCKS[i], | |
| "az": PUBLIC_SUBNET_AVAILABILITY_ZONES[i], | |
| "is_public": True, | |
| } | |
| ) | |
| context_data["private_subnets_to_create"] = [] | |
| if private_ready_for_full_validation: | |
| for i, name in enumerate(PRIVATE_SUBNETS_TO_USE): | |
| context_data["private_subnets_to_create"].append( | |
| { | |
| "name": name, | |
| "cidr": PRIVATE_SUBNET_CIDR_BLOCKS[i], | |
| "az": PRIVATE_SUBNET_AVAILABILITY_ZONES[i], | |
| "is_public": False, | |
| } | |
| ) | |
| except (ValueError, Exception) as e: | |
| print(f"\nFATAL ERROR: Subnet parameter validation failed: {e}\n") | |
| raise SystemExit(1) # Exit if validation fails | |
| # Example checks and setting context values | |
| # IAM Roles | |
| role_name = CODEBUILD_ROLE_NAME | |
| exists, _, _ = check_for_existing_role(role_name) | |
| context_data[f"exists:{role_name}"] = exists # Use boolean | |
| if exists: | |
| _, role_arn, _ = check_for_existing_role(role_name) # Get ARN if needed | |
| context_data[f"arn:{role_name}"] = role_arn | |
| role_name = ECS_TASK_ROLE_NAME | |
| exists, _, _ = check_for_existing_role(role_name) | |
| context_data[f"exists:{role_name}"] = exists | |
| if exists: | |
| _, role_arn, _ = check_for_existing_role(role_name) | |
| context_data[f"arn:{role_name}"] = role_arn | |
| role_name = ECS_TASK_EXECUTION_ROLE_NAME | |
| exists, _, _ = check_for_existing_role(role_name) | |
| context_data[f"exists:{role_name}"] = exists | |
| if exists: | |
| _, role_arn, _ = check_for_existing_role(role_name) | |
| context_data[f"arn:{role_name}"] = role_arn | |
| # S3 Buckets | |
| bucket_name = S3_LOG_CONFIG_BUCKET_NAME | |
| exists, _ = check_s3_bucket_exists(bucket_name) | |
| context_data[f"exists:{bucket_name}"] = exists | |
| if exists: | |
| # You might not need the ARN if using from_bucket_name | |
| pass | |
| output_bucket_name = S3_OUTPUT_BUCKET_NAME | |
| exists, _ = check_s3_bucket_exists(output_bucket_name) | |
| context_data[f"exists:{output_bucket_name}"] = exists | |
| if exists: | |
| pass | |
| # ECR Repository | |
| repo_name = ECR_CDK_REPO_NAME | |
| exists, _ = check_ecr_repo_exists(repo_name) | |
| context_data[f"exists:{repo_name}"] = exists | |
| if exists: | |
| pass # from_repository_name is sufficient | |
| # CodeBuild Project | |
| project_name = CODEBUILD_PROJECT_NAME | |
| exists, _ = check_codebuild_project_exists(project_name) | |
| context_data[f"exists:{project_name}"] = exists | |
| if exists: | |
| # Need a way to get the ARN from the check function | |
| _, project_arn = check_codebuild_project_exists( | |
| project_name | |
| ) # Assuming it returns ARN | |
| context_data[f"arn:{project_name}"] = project_arn | |
| # ALB (by name lookup) | |
| alb_name = ALB_NAME | |
| exists, _ = check_alb_exists(alb_name, region_name=AWS_REGION) | |
| context_data[f"exists:{alb_name}"] = exists | |
| if exists: | |
| _, alb_object = check_alb_exists( | |
| alb_name, region_name=AWS_REGION | |
| ) # Assuming check returns object | |
| print("alb_object:", alb_object) | |
| context_data[f"arn:{alb_name}"] = alb_object["LoadBalancerArn"] | |
| # Cognito User Pool (by name) | |
| user_pool_name = COGNITO_USER_POOL_NAME | |
| exists, user_pool_id, _ = check_for_existing_user_pool(user_pool_name) | |
| context_data[f"exists:{user_pool_name}"] = exists | |
| if exists: | |
| context_data[f"id:{user_pool_name}"] = user_pool_id | |
| # Cognito User Pool Client (by name and pool ID) - requires User Pool ID from check | |
| if user_pool_id: | |
| user_pool_id_for_client_check = user_pool_id # context_data.get(f"id:{user_pool_name}") # Use ID from context | |
| user_pool_client_name = COGNITO_USER_POOL_CLIENT_NAME | |
| if user_pool_id_for_client_check: | |
| exists, client_id, _ = check_for_existing_user_pool_client( | |
| user_pool_client_name, user_pool_id_for_client_check | |
| ) | |
| context_data[f"exists:{user_pool_client_name}"] = exists | |
| if exists: | |
| context_data[f"id:{user_pool_client_name}"] = client_id | |
| # Secrets Manager Secret (by name) | |
| secret_name = COGNITO_USER_POOL_CLIENT_SECRET_NAME | |
| exists, _ = check_for_secret(secret_name) | |
| context_data[f"exists:{secret_name}"] = exists | |
| # You might not need the ARN if using from_secret_name_v2 | |
| # WAF Web ACL (by name and scope) | |
| web_acl_name = WEB_ACL_NAME | |
| exists, _ = check_web_acl_exists( | |
| web_acl_name, scope="CLOUDFRONT" | |
| ) # Assuming check returns object | |
| context_data[f"exists:{web_acl_name}"] = exists | |
| if exists: | |
| _, existing_web_acl = check_web_acl_exists(web_acl_name, scope="CLOUDFRONT") | |
| context_data[f"arn:{web_acl_name}"] = existing_web_acl.attr_arn | |
| # Write the context data to the file | |
| with open(CONTEXT_FILE, "w") as f: | |
| json.dump(context_data, f, indent=2) | |
| print(f"Context data written to {CONTEXT_FILE}") | |