|
|
import json |
|
|
import os |
|
|
from typing import Any, Dict, List |
|
|
|
|
|
from cdk_config import ( |
|
|
ALB_NAME, |
|
|
AWS_REGION, |
|
|
CDK_CONFIG_PATH, |
|
|
CDK_FOLDER, |
|
|
CODEBUILD_PROJECT_NAME, |
|
|
CODEBUILD_ROLE_NAME, |
|
|
COGNITO_USER_POOL_CLIENT_NAME, |
|
|
COGNITO_USER_POOL_CLIENT_SECRET_NAME, |
|
|
COGNITO_USER_POOL_NAME, |
|
|
CONTEXT_FILE, |
|
|
ECR_CDK_REPO_NAME, |
|
|
ECS_TASK_EXECUTION_ROLE_NAME, |
|
|
ECS_TASK_ROLE_NAME, |
|
|
PRIVATE_SUBNET_AVAILABILITY_ZONES, |
|
|
PRIVATE_SUBNET_CIDR_BLOCKS, |
|
|
PRIVATE_SUBNETS_TO_USE, |
|
|
PUBLIC_SUBNET_AVAILABILITY_ZONES, |
|
|
PUBLIC_SUBNET_CIDR_BLOCKS, |
|
|
PUBLIC_SUBNETS_TO_USE, |
|
|
S3_LOG_CONFIG_BUCKET_NAME, |
|
|
S3_OUTPUT_BUCKET_NAME, |
|
|
VPC_NAME, |
|
|
WEB_ACL_NAME, |
|
|
) |
|
|
from cdk_functions import ( |
|
|
_get_existing_subnets_in_vpc, |
|
|
check_alb_exists, |
|
|
check_codebuild_project_exists, |
|
|
check_ecr_repo_exists, |
|
|
check_for_existing_role, |
|
|
check_for_existing_user_pool, |
|
|
check_for_existing_user_pool_client, |
|
|
check_for_secret, |
|
|
check_s3_bucket_exists, |
|
|
check_subnet_exists_by_name, |
|
|
check_web_acl_exists, |
|
|
get_vpc_id_by_name, |
|
|
validate_subnet_creation_parameters, |
|
|
|
|
|
) |
|
|
|
|
|
# Folder of the CDK project, as configured in cdk_config.
cdk_folder = CDK_FOLDER

# Export the config file location through the environment. The value is the
# CDK folder immediately followed by the configured path (plain string
# concatenation, no path separator is inserted).
os.environ["CDK_CONFIG_PATH"] = "".join((cdk_folder, CDK_CONFIG_PATH))
|
|
|
|
|
|
|
|
|
|
|
def _get_env_list(env_var_name: str) -> List[str]:
    """Parse a list-like string into a list of stripped, non-empty strings.

    Despite the parameter name, ``env_var_name`` is the raw *value* to parse,
    not the name of an environment variable; nothing is looked up here.

    Accepted shapes include ``'["a", "b"]'``, ``"['a', 'b']"`` and a plain
    ``"a, b"``. Single and double quote characters are removed everywhere,
    items are split on commas, and empty items (for example from a trailing
    comma) are dropped.

    Args:
        env_var_name: The string value to parse.

    Returns:
        The parsed items in their original order; an empty list when the
        value contains no items.
    """
    value = env_var_name.strip()

    # Remove an enclosing "[" / "]" only when it is actually present. Slicing
    # [1:-1] unconditionally would eat the first and last characters of a
    # plain comma-separated value such as "a,b".
    if value.startswith("["):
        value = value[1:]
    if value.endswith("]"):
        value = value[:-1]

    value = value.replace('"', "").replace("'", "")

    # Split on commas and discard blanks left behind by stray commas.
    return [item.strip() for item in value.split(",") if item.strip()]
|
|
|
|
|
|
|
|
# Normalise the subnet settings imported from cdk_config: any value that is
# truthy but not already a list is parsed into a list with _get_env_list.
# Falsy values and existing lists are kept exactly as they are.
PUBLIC_SUBNETS_TO_USE = (
    _get_env_list(PUBLIC_SUBNETS_TO_USE)
    if PUBLIC_SUBNETS_TO_USE and not isinstance(PUBLIC_SUBNETS_TO_USE, list)
    else PUBLIC_SUBNETS_TO_USE
)
PRIVATE_SUBNETS_TO_USE = (
    _get_env_list(PRIVATE_SUBNETS_TO_USE)
    if PRIVATE_SUBNETS_TO_USE and not isinstance(PRIVATE_SUBNETS_TO_USE, list)
    else PRIVATE_SUBNETS_TO_USE
)
PUBLIC_SUBNET_CIDR_BLOCKS = (
    _get_env_list(PUBLIC_SUBNET_CIDR_BLOCKS)
    if PUBLIC_SUBNET_CIDR_BLOCKS
    and not isinstance(PUBLIC_SUBNET_CIDR_BLOCKS, list)
    else PUBLIC_SUBNET_CIDR_BLOCKS
)
PUBLIC_SUBNET_AVAILABILITY_ZONES = (
    _get_env_list(PUBLIC_SUBNET_AVAILABILITY_ZONES)
    if PUBLIC_SUBNET_AVAILABILITY_ZONES
    and not isinstance(PUBLIC_SUBNET_AVAILABILITY_ZONES, list)
    else PUBLIC_SUBNET_AVAILABILITY_ZONES
)
PRIVATE_SUBNET_CIDR_BLOCKS = (
    _get_env_list(PRIVATE_SUBNET_CIDR_BLOCKS)
    if PRIVATE_SUBNET_CIDR_BLOCKS
    and not isinstance(PRIVATE_SUBNET_CIDR_BLOCKS, list)
    else PRIVATE_SUBNET_CIDR_BLOCKS
)
PRIVATE_SUBNET_AVAILABILITY_ZONES = (
    _get_env_list(PRIVATE_SUBNET_AVAILABILITY_ZONES)
    if PRIVATE_SUBNET_AVAILABILITY_ZONES
    and not isinstance(PRIVATE_SUBNET_AVAILABILITY_ZONES, list)
    else PRIVATE_SUBNET_AVAILABILITY_ZONES
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _subnet_specs(
    names: List[str], cidrs: List[str], azs: List[str]
) -> List[Dict[str, str]]:
    """Combine parallel name/CIDR/AZ lists into one dict per subnet."""
    return [
        {"name": name, "cidr": cidr, "az": az}
        for name, cidr, az in zip(names, cidrs, azs)
    ]


def _check_subnets_by_name(
    subnet_names: List[str], existing_aws_subnets: Any
) -> Dict[str, Dict[str, Any]]:
    """Look up each requested subnet name among the already-fetched subnets."""
    checked: Dict[str, Dict[str, Any]] = {}
    for subnet_name in subnet_names:
        print("subnet_name:", subnet_name)
        exists, subnet_id = check_subnet_exists_by_name(
            subnet_name, existing_aws_subnets
        )
        checked[subnet_name] = {"exists": exists, "id": subnet_id}
    return checked


def check_and_set_context() -> None:
    """Check which configured AWS resources exist and write the result to CONTEXT_FILE.

    The function builds a flat dictionary of ``exists:<name>`` flags (plus
    ``arn:<name>`` / ``id:<name>`` entries where a lookup returns one) and
    dumps it as JSON to ``CONTEXT_FILE``.

    When ``VPC_NAME`` is set, the VPC is resolved first and the following is
    recorded as well: the VPC id, whether a NAT gateway was returned for it,
    the by-name existence of every configured public/private subnet and, if
    CIDR blocks and availability zones were supplied for every subnet of at
    least one kind, the validated ``public_subnets_to_create`` /
    ``private_subnets_to_create`` lists.

    Raises:
        RuntimeError: ``VPC_NAME`` is set but no VPC id was found for it.
        SystemExit: Existing subnets could not be fetched, or subnet
            parameter validation failed (exit code 1 in both cases).
    """
    # Initialised exactly once: re-creating this dict further down would
    # silently discard the NAT gateway and VPC entries stored below.
    context_data: Dict[str, Any] = {}

    if VPC_NAME:
        print("VPC_NAME:", VPC_NAME)
        vpc_id, nat_gateways = get_vpc_id_by_name(VPC_NAME)

        if not vpc_id:
            raise RuntimeError(
                f"Required VPC '{VPC_NAME}' not found. Cannot proceed with subnet checks."
            )

        # Only the first NAT gateway returned for the VPC is recorded.
        if nat_gateways:
            context_data["exists:NatGateway"] = True
            context_data["id:NatGateway"] = nat_gateways[0]["NatGatewayId"]
        else:
            context_data["exists:NatGateway"] = False
            context_data["id:NatGateway"] = None

        context_data["vpc_id"] = vpc_id

        # Treat unset (None / empty) settings as empty lists so the length
        # comparisons below cannot fail with a TypeError.
        public_names = PUBLIC_SUBNETS_TO_USE or []
        public_cidrs = PUBLIC_SUBNET_CIDR_BLOCKS or []
        public_azs = PUBLIC_SUBNET_AVAILABILITY_ZONES or []
        private_names = PRIVATE_SUBNETS_TO_USE or []
        private_cidrs = PRIVATE_SUBNET_CIDR_BLOCKS or []
        private_azs = PRIVATE_SUBNET_AVAILABILITY_ZONES or []

        # A subnet kind is ready for full validation when every name has a
        # matching CIDR block and availability zone.
        public_ready_for_full_validation = (
            bool(public_names)
            and len(public_cidrs) == len(public_names)
            and len(public_azs) == len(public_names)
        )
        private_ready_for_full_validation = (
            bool(private_names)
            and len(private_cidrs) == len(private_names)
            and len(private_azs) == len(private_names)
        )

        # Full validation runs as soon as either kind is fully specified.
        full_validation_mode = (
            public_ready_for_full_validation or private_ready_for_full_validation
        )

        all_proposed_subnets_data: List[Dict[str, str]] = []
        if full_validation_mode:
            if (
                public_ready_for_full_validation
                and not private_ready_for_full_validation
                and private_names
            ):
                print(
                    "Warning: Public subnets have CIDRs/AZs, but private subnets do not. Only public will be fully validated/created with CIDRs."
                )
            if (
                private_ready_for_full_validation
                and not public_ready_for_full_validation
                and public_names
            ):
                print(
                    "Warning: Private subnets have CIDRs/AZs, but public subnets do not. Only private will be fully validated/created with CIDRs."
                )

            if public_ready_for_full_validation:
                all_proposed_subnets_data.extend(
                    _subnet_specs(public_names, public_cidrs, public_azs)
                )
            if private_ready_for_full_validation:
                all_proposed_subnets_data.extend(
                    _subnet_specs(private_names, private_cidrs, private_azs)
                )

        print(f"Target VPC ID for Boto3 lookup: {vpc_id}")

        # Fetch the VPC's subnets once and reuse them for every check below.
        try:
            existing_aws_subnets = _get_existing_subnets_in_vpc(vpc_id)
        except Exception as e:
            print(f"Failed to fetch existing VPC subnets. Aborting. Error: {e}")
            raise SystemExit(1) from e

        print("\n--- Running Name-Only Subnet Existence Check Mode ---")

        checked_public_subnets = _check_subnets_by_name(
            public_names, existing_aws_subnets
        )
        context_data["checked_public_subnets"] = checked_public_subnets

        checked_private_subnets = _check_subnets_by_name(
            private_names, existing_aws_subnets
        )
        context_data["checked_private_subnets"] = checked_private_subnets

        # Subnets that already exist are left out of the creation validation.
        already_present = {
            name
            for checked in (checked_public_subnets, checked_private_subnets)
            for name, result in checked.items()
            if result["exists"] is True
        }
        all_proposed_subnets_data = [
            subnet
            for subnet in all_proposed_subnets_data
            if subnet["name"] not in already_present
        ]

        print("\nName-only existence subnet check complete.\n")

        if full_validation_mode:
            print(
                "\n--- Running in Full Subnet Validation Mode (CIDR/AZs provided) ---"
            )
            try:
                validate_subnet_creation_parameters(
                    vpc_id, all_proposed_subnets_data, existing_aws_subnets
                )
            except Exception as e:
                print(f"\nFATAL ERROR: Subnet parameter validation failed: {e}\n")
                raise SystemExit(1) from e

            print("\nPre-synth validation successful. Proceeding with CDK synth.\n")

            context_data["public_subnets_to_create"] = (
                [
                    {**spec, "is_public": True}
                    for spec in _subnet_specs(public_names, public_cidrs, public_azs)
                ]
                if public_ready_for_full_validation
                else []
            )
            context_data["private_subnets_to_create"] = (
                [
                    {**spec, "is_public": False}
                    for spec in _subnet_specs(
                        private_names, private_cidrs, private_azs
                    )
                ]
                if private_ready_for_full_validation
                else []
            )

    # IAM roles: one lookup per role yields both the flag and the ARN.
    for role_name in (
        CODEBUILD_ROLE_NAME,
        ECS_TASK_ROLE_NAME,
        ECS_TASK_EXECUTION_ROLE_NAME,
    ):
        exists, role_arn, _ = check_for_existing_role(role_name)
        context_data[f"exists:{role_name}"] = exists
        if exists:
            context_data[f"arn:{role_name}"] = role_arn

    # S3 buckets: only existence is recorded.
    for bucket_name in (S3_LOG_CONFIG_BUCKET_NAME, S3_OUTPUT_BUCKET_NAME):
        exists, _ = check_s3_bucket_exists(bucket_name)
        context_data[f"exists:{bucket_name}"] = exists

    # ECR repository: only existence is recorded.
    repo_name = ECR_CDK_REPO_NAME
    exists, _ = check_ecr_repo_exists(repo_name)
    context_data[f"exists:{repo_name}"] = exists

    # CodeBuild project.
    project_name = CODEBUILD_PROJECT_NAME
    exists, project_arn = check_codebuild_project_exists(project_name)
    context_data[f"exists:{project_name}"] = exists
    if exists:
        context_data[f"arn:{project_name}"] = project_arn

    # Application load balancer.
    alb_name = ALB_NAME
    exists, alb_object = check_alb_exists(alb_name, region_name=AWS_REGION)
    context_data[f"exists:{alb_name}"] = exists
    if exists:
        print("alb_object:", alb_object)
        context_data[f"arn:{alb_name}"] = alb_object["LoadBalancerArn"]

    # Cognito user pool.
    user_pool_name = COGNITO_USER_POOL_NAME
    exists, user_pool_id, _ = check_for_existing_user_pool(user_pool_name)
    context_data[f"exists:{user_pool_name}"] = exists
    if exists:
        context_data[f"id:{user_pool_name}"] = user_pool_id

    # The user pool client can only be looked up once a pool id is known.
    if user_pool_id:
        user_pool_client_name = COGNITO_USER_POOL_CLIENT_NAME
        exists, client_id, _ = check_for_existing_user_pool_client(
            user_pool_client_name, user_pool_id
        )
        context_data[f"exists:{user_pool_client_name}"] = exists
        if exists:
            context_data[f"id:{user_pool_client_name}"] = client_id

    # Secret holding the user pool client secret: only existence is recorded.
    secret_name = COGNITO_USER_POOL_CLIENT_SECRET_NAME
    exists, _ = check_for_secret(secret_name)
    context_data[f"exists:{secret_name}"] = exists

    # Web ACL, looked up with CLOUDFRONT scope.
    web_acl_name = WEB_ACL_NAME
    exists, existing_web_acl = check_web_acl_exists(web_acl_name, scope="CLOUDFRONT")
    context_data[f"exists:{web_acl_name}"] = exists
    if exists:
        # NOTE(review): assumes the returned object exposes `attr_arn` --
        # confirm against check_web_acl_exists.
        context_data[f"arn:{web_acl_name}"] = existing_web_acl.attr_arn

    with open(CONTEXT_FILE, "w", encoding="utf-8") as f:
        json.dump(context_data, f, indent=2)

    print(f"Context data written to {CONTEXT_FILE}")
|
|
|