File size: 15,738 Bytes
d864d45
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
import json
import os
from typing import Any, Dict, List

from cdk_config import (  # Import necessary config
    ALB_NAME,
    AWS_REGION,
    CDK_CONFIG_PATH,
    CDK_FOLDER,
    CODEBUILD_PROJECT_NAME,
    CODEBUILD_ROLE_NAME,
    COGNITO_USER_POOL_CLIENT_NAME,
    COGNITO_USER_POOL_CLIENT_SECRET_NAME,
    COGNITO_USER_POOL_NAME,
    CONTEXT_FILE,
    ECR_CDK_REPO_NAME,
    ECS_TASK_EXECUTION_ROLE_NAME,
    ECS_TASK_ROLE_NAME,
    PRIVATE_SUBNET_AVAILABILITY_ZONES,
    PRIVATE_SUBNET_CIDR_BLOCKS,
    PRIVATE_SUBNETS_TO_USE,
    PUBLIC_SUBNET_AVAILABILITY_ZONES,
    PUBLIC_SUBNET_CIDR_BLOCKS,
    PUBLIC_SUBNETS_TO_USE,
    S3_LOG_CONFIG_BUCKET_NAME,
    S3_OUTPUT_BUCKET_NAME,
    VPC_NAME,
    WEB_ACL_NAME,
)
from cdk_functions import (  # Import your check functions (assuming they use Boto3)
    _get_existing_subnets_in_vpc,
    check_alb_exists,
    check_codebuild_project_exists,
    check_ecr_repo_exists,
    check_for_existing_role,
    check_for_existing_user_pool,
    check_for_existing_user_pool_client,
    check_for_secret,
    check_s3_bucket_exists,
    check_subnet_exists_by_name,
    check_web_acl_exists,
    get_vpc_id_by_name,
    validate_subnet_creation_parameters,
    # Add other check functions as needed
)

cdk_folder = CDK_FOLDER  # <FULL_PATH_TO_CDK_FOLDER_HERE>

# Full path needed to find config file
os.environ["CDK_CONFIG_PATH"] = cdk_folder + CDK_CONFIG_PATH


# --- Helper to parse environment variables into lists ---
def _get_env_list(env_var_name: str) -> List[str]:
    """Parses a comma-separated environment variable into a list of strings."""
    value = env_var_name[1:-1].strip().replace('"', "").replace("'", "")
    if not value:
        return []
    # Split by comma and filter out any empty strings that might result from extra commas
    return [s.strip() for s in value.split(",") if s.strip()]


if PUBLIC_SUBNETS_TO_USE and not isinstance(PUBLIC_SUBNETS_TO_USE, list):
    PUBLIC_SUBNETS_TO_USE = _get_env_list(PUBLIC_SUBNETS_TO_USE)
if PRIVATE_SUBNETS_TO_USE and not isinstance(PRIVATE_SUBNETS_TO_USE, list):
    PRIVATE_SUBNETS_TO_USE = _get_env_list(PRIVATE_SUBNETS_TO_USE)
if PUBLIC_SUBNET_CIDR_BLOCKS and not isinstance(PUBLIC_SUBNET_CIDR_BLOCKS, list):
    PUBLIC_SUBNET_CIDR_BLOCKS = _get_env_list(PUBLIC_SUBNET_CIDR_BLOCKS)
if PUBLIC_SUBNET_AVAILABILITY_ZONES and not isinstance(
    PUBLIC_SUBNET_AVAILABILITY_ZONES, list
):
    PUBLIC_SUBNET_AVAILABILITY_ZONES = _get_env_list(PUBLIC_SUBNET_AVAILABILITY_ZONES)
if PRIVATE_SUBNET_CIDR_BLOCKS and not isinstance(PRIVATE_SUBNET_CIDR_BLOCKS, list):
    PRIVATE_SUBNET_CIDR_BLOCKS = _get_env_list(PRIVATE_SUBNET_CIDR_BLOCKS)
if PRIVATE_SUBNET_AVAILABILITY_ZONES and not isinstance(
    PRIVATE_SUBNET_AVAILABILITY_ZONES, list
):
    PRIVATE_SUBNET_AVAILABILITY_ZONES = _get_env_list(PRIVATE_SUBNET_AVAILABILITY_ZONES)

# Check for the existence of elements in your AWS environment to see if it's necessary to create new versions of the same


def check_and_set_context():
    context_data = {}

    # --- Find the VPC ID first ---
    if VPC_NAME:
        print("VPC_NAME:", VPC_NAME)
        vpc_id, nat_gateways = get_vpc_id_by_name(VPC_NAME)

        # If you expect only one, or one per AZ and you're creating one per AZ in CDK:
        if nat_gateways:
            # For simplicity, let's just check if *any* NAT exists in the VPC
            # A more robust check would match by subnet, AZ, or a specific tag.
            context_data["exists:NatGateway"] = True
            context_data["id:NatGateway"] = nat_gateways[0][
                "NatGatewayId"
            ]  # Store the ID of the first one found
        else:
            context_data["exists:NatGateway"] = False
            context_data["id:NatGateway"] = None

        if not vpc_id:
            # If the VPC doesn't exist, you might not be able to check/create subnets.
            # Decide how to handle this: raise an error, set a flag, etc.
            raise RuntimeError(
                f"Required VPC '{VPC_NAME}' not found. Cannot proceed with subnet checks."
            )

        context_data["vpc_id"] = vpc_id  # Store VPC ID in context

        # SUBNET CHECKS
        context_data: Dict[str, Any] = {}
        all_proposed_subnets_data: List[Dict[str, str]] = []

        # Flag to indicate if full validation mode (with CIDR/AZs) is active
        full_validation_mode = False

        # Determine if full validation mode is possible/desired
        # It's 'desired' if CIDR/AZs are provided, and their lengths match the name lists.
        public_ready_for_full_validation = (
            len(PUBLIC_SUBNETS_TO_USE) > 0
            and len(PUBLIC_SUBNET_CIDR_BLOCKS) == len(PUBLIC_SUBNETS_TO_USE)
            and len(PUBLIC_SUBNET_AVAILABILITY_ZONES) == len(PUBLIC_SUBNETS_TO_USE)
        )
        private_ready_for_full_validation = (
            len(PRIVATE_SUBNETS_TO_USE) > 0
            and len(PRIVATE_SUBNET_CIDR_BLOCKS) == len(PRIVATE_SUBNETS_TO_USE)
            and len(PRIVATE_SUBNET_AVAILABILITY_ZONES) == len(PRIVATE_SUBNETS_TO_USE)
        )

        # Activate full validation if *any* type of subnet (public or private) has its full details provided.
        # You might adjust this logic if you require ALL subnet types to have CIDRs, or NONE.
        if public_ready_for_full_validation or private_ready_for_full_validation:
            full_validation_mode = True

            # If some are ready but others aren't, print a warning or raise an error based on your strictness
            if (
                public_ready_for_full_validation
                and not private_ready_for_full_validation
                and PRIVATE_SUBNETS_TO_USE
            ):
                print(
                    "Warning: Public subnets have CIDRs/AZs, but private subnets do not. Only public will be fully validated/created with CIDRs."
                )
            if (
                private_ready_for_full_validation
                and not public_ready_for_full_validation
                and PUBLIC_SUBNETS_TO_USE
            ):
                print(
                    "Warning: Private subnets have CIDRs/AZs, but public subnets do not. Only private will be fully validated/created with CIDRs."
                )

            # Prepare data for validate_subnet_creation_parameters for all subnets that have full details
            if public_ready_for_full_validation:
                for i, name in enumerate(PUBLIC_SUBNETS_TO_USE):
                    all_proposed_subnets_data.append(
                        {
                            "name": name,
                            "cidr": PUBLIC_SUBNET_CIDR_BLOCKS[i],
                            "az": PUBLIC_SUBNET_AVAILABILITY_ZONES[i],
                        }
                    )
            if private_ready_for_full_validation:
                for i, name in enumerate(PRIVATE_SUBNETS_TO_USE):
                    all_proposed_subnets_data.append(
                        {
                            "name": name,
                            "cidr": PRIVATE_SUBNET_CIDR_BLOCKS[i],
                            "az": PRIVATE_SUBNET_AVAILABILITY_ZONES[i],
                        }
                    )

        print(f"Target VPC ID for Boto3 lookup: {vpc_id}")

        # Fetch all existing subnets in the target VPC once to avoid repeated API calls
        try:
            existing_aws_subnets = _get_existing_subnets_in_vpc(vpc_id)
        except Exception as e:
            print(f"Failed to fetch existing VPC subnets. Aborting. Error: {e}")
            raise SystemExit(1)  # Exit immediately if we can't get baseline data

        print("\n--- Running Name-Only Subnet Existence Check Mode ---")
        # Fallback: check only by name using the existing data
        checked_public_subnets = {}
        if PUBLIC_SUBNETS_TO_USE:
            for subnet_name in PUBLIC_SUBNETS_TO_USE:
                print("subnet_name:", subnet_name)
                exists, subnet_id = check_subnet_exists_by_name(
                    subnet_name, existing_aws_subnets
                )
                checked_public_subnets[subnet_name] = {
                    "exists": exists,
                    "id": subnet_id,
                }

                # If the subnet exists, remove it from the proposed subnets list
                if checked_public_subnets[subnet_name]["exists"] is True:
                    all_proposed_subnets_data = [
                        subnet
                        for subnet in all_proposed_subnets_data
                        if subnet["name"] != subnet_name
                    ]

        context_data["checked_public_subnets"] = checked_public_subnets

        checked_private_subnets = {}
        if PRIVATE_SUBNETS_TO_USE:
            for subnet_name in PRIVATE_SUBNETS_TO_USE:
                print("subnet_name:", subnet_name)
                exists, subnet_id = check_subnet_exists_by_name(
                    subnet_name, existing_aws_subnets
                )
                checked_private_subnets[subnet_name] = {
                    "exists": exists,
                    "id": subnet_id,
                }

                # If the subnet exists, remove it from the proposed subnets list
                if checked_private_subnets[subnet_name]["exists"] is True:
                    all_proposed_subnets_data = [
                        subnet
                        for subnet in all_proposed_subnets_data
                        if subnet["name"] != subnet_name
                    ]

        context_data["checked_private_subnets"] = checked_private_subnets

        print("\nName-only existence subnet check complete.\n")

        if full_validation_mode:
            print(
                "\n--- Running in Full Subnet Validation Mode (CIDR/AZs provided) ---"
            )
            try:
                validate_subnet_creation_parameters(
                    vpc_id, all_proposed_subnets_data, existing_aws_subnets
                )
                print("\nPre-synth validation successful. Proceeding with CDK synth.\n")

                # Populate context_data for downstream CDK construct creation
                context_data["public_subnets_to_create"] = []
                if public_ready_for_full_validation:
                    for i, name in enumerate(PUBLIC_SUBNETS_TO_USE):
                        context_data["public_subnets_to_create"].append(
                            {
                                "name": name,
                                "cidr": PUBLIC_SUBNET_CIDR_BLOCKS[i],
                                "az": PUBLIC_SUBNET_AVAILABILITY_ZONES[i],
                                "is_public": True,
                            }
                        )
                context_data["private_subnets_to_create"] = []
                if private_ready_for_full_validation:
                    for i, name in enumerate(PRIVATE_SUBNETS_TO_USE):
                        context_data["private_subnets_to_create"].append(
                            {
                                "name": name,
                                "cidr": PRIVATE_SUBNET_CIDR_BLOCKS[i],
                                "az": PRIVATE_SUBNET_AVAILABILITY_ZONES[i],
                                "is_public": False,
                            }
                        )

            except (ValueError, Exception) as e:
                print(f"\nFATAL ERROR: Subnet parameter validation failed: {e}\n")
                raise SystemExit(1)  # Exit if validation fails

    # Example checks and setting context values
    # IAM Roles
    role_name = CODEBUILD_ROLE_NAME
    exists, _, _ = check_for_existing_role(role_name)
    context_data[f"exists:{role_name}"] = exists  # Use boolean
    if exists:
        _, role_arn, _ = check_for_existing_role(role_name)  # Get ARN if needed
        context_data[f"arn:{role_name}"] = role_arn

    role_name = ECS_TASK_ROLE_NAME
    exists, _, _ = check_for_existing_role(role_name)
    context_data[f"exists:{role_name}"] = exists
    if exists:
        _, role_arn, _ = check_for_existing_role(role_name)
        context_data[f"arn:{role_name}"] = role_arn

    role_name = ECS_TASK_EXECUTION_ROLE_NAME
    exists, _, _ = check_for_existing_role(role_name)
    context_data[f"exists:{role_name}"] = exists
    if exists:
        _, role_arn, _ = check_for_existing_role(role_name)
        context_data[f"arn:{role_name}"] = role_arn

    # S3 Buckets
    bucket_name = S3_LOG_CONFIG_BUCKET_NAME
    exists, _ = check_s3_bucket_exists(bucket_name)
    context_data[f"exists:{bucket_name}"] = exists
    if exists:
        # You might not need the ARN if using from_bucket_name
        pass

    output_bucket_name = S3_OUTPUT_BUCKET_NAME
    exists, _ = check_s3_bucket_exists(output_bucket_name)
    context_data[f"exists:{output_bucket_name}"] = exists
    if exists:
        pass

    # ECR Repository
    repo_name = ECR_CDK_REPO_NAME
    exists, _ = check_ecr_repo_exists(repo_name)
    context_data[f"exists:{repo_name}"] = exists
    if exists:
        pass  # from_repository_name is sufficient

    # CodeBuild Project
    project_name = CODEBUILD_PROJECT_NAME
    exists, _ = check_codebuild_project_exists(project_name)
    context_data[f"exists:{project_name}"] = exists
    if exists:
        # Need a way to get the ARN from the check function
        _, project_arn = check_codebuild_project_exists(
            project_name
        )  # Assuming it returns ARN
        context_data[f"arn:{project_name}"] = project_arn

    # ALB (by name lookup)
    alb_name = ALB_NAME
    exists, _ = check_alb_exists(alb_name, region_name=AWS_REGION)
    context_data[f"exists:{alb_name}"] = exists
    if exists:
        _, alb_object = check_alb_exists(
            alb_name, region_name=AWS_REGION
        )  # Assuming check returns object
        print("alb_object:", alb_object)
        context_data[f"arn:{alb_name}"] = alb_object["LoadBalancerArn"]

    # Cognito User Pool (by name)
    user_pool_name = COGNITO_USER_POOL_NAME
    exists, user_pool_id, _ = check_for_existing_user_pool(user_pool_name)
    context_data[f"exists:{user_pool_name}"] = exists
    if exists:
        context_data[f"id:{user_pool_name}"] = user_pool_id

    # Cognito User Pool Client (by name and pool ID) - requires User Pool ID from check
    if user_pool_id:
        user_pool_id_for_client_check = user_pool_id  # context_data.get(f"id:{user_pool_name}") # Use ID from context
        user_pool_client_name = COGNITO_USER_POOL_CLIENT_NAME
        if user_pool_id_for_client_check:
            exists, client_id, _ = check_for_existing_user_pool_client(
                user_pool_client_name, user_pool_id_for_client_check
            )
            context_data[f"exists:{user_pool_client_name}"] = exists
            if exists:
                context_data[f"id:{user_pool_client_name}"] = client_id

    # Secrets Manager Secret (by name)
    secret_name = COGNITO_USER_POOL_CLIENT_SECRET_NAME
    exists, _ = check_for_secret(secret_name)
    context_data[f"exists:{secret_name}"] = exists
    # You might not need the ARN if using from_secret_name_v2

    # WAF Web ACL (by name and scope)
    web_acl_name = WEB_ACL_NAME
    exists, _ = check_web_acl_exists(
        web_acl_name, scope="CLOUDFRONT"
    )  # Assuming check returns object
    context_data[f"exists:{web_acl_name}"] = exists
    if exists:
        _, existing_web_acl = check_web_acl_exists(web_acl_name, scope="CLOUDFRONT")
        context_data[f"arn:{web_acl_name}"] = existing_web_acl.attr_arn

    # Write the context data to the file
    with open(CONTEXT_FILE, "w") as f:
        json.dump(context_data, f, indent=2)

    print(f"Context data written to {CONTEXT_FILE}")