AWS CDK Kinesis Analytics Flink Alpha
This is an experimental (alpha) AWS CDK Construct Library for defining Kinesis Data Analytics Flink applications. It simplifies the setup of Flink applications, including code location, runtime versions, and IAM roles. As part of the AWS CDK, it receives frequent updates, typically aligning with the CDK's bi-weekly release cycle. The current version is 2.250.0a0.
Common errors
-
ModuleNotFoundError: No module named 'aws_cdk.aws_kinesisanalytics_flink_alpha'
cause The Python package `aws-cdk-aws-kinesisanalytics-flink-alpha` is not installed in your current environment.
fix Run `pip install aws-cdk-aws-kinesisanalytics-flink-alpha` to install the construct library. You may also need `aws-cdk-lib` and `constructs`.
-
jsii.errors.JavaScriptError: The 'serviceExecutionRole' property is required for Flink Application. (or similar error indicating a missing required property)
cause A required property such as `code` or `runtime` was omitted or passed an invalid value when instantiating `flink.Application`.
fix Ensure all required properties are provided with valid CDK constructs or values. For `code`, use `flink.ApplicationCode.from_bucket(...)`. The execution role is passed through the `role` property of `flink.Application`; to supply your own, create an `iam.Role` with `kinesisanalytics.amazonaws.com` as its trusted entity.
-
The Kinesis Data Analytics application 'MyCDKFlinkApp' failed to deploy. The provided application code bucket or key does not exist or is inaccessible.
cause The S3 bucket or object key specified in `ApplicationCode.from_bucket` does not exist, or the application's execution role lacks `s3:GetObject` permission for that location.
fix Check both of the following:
1. Verify your Flink application JAR/ZIP is uploaded to the exact S3 bucket and key specified in your CDK code.
2. Ensure the execution role has `s3:GetObject` permission on the specific bucket and key (or the entire bucket if appropriate).
-
jsii.errors.JavaScriptError: Flink runtime version must be one of: FLINK_1_11, FLINK_1_13, FLINK_1_15. (or similar for invalid runtime)
cause An invalid or unsupported Flink runtime version was specified for the `runtime` property.
fix Use a valid `flink.Runtime` member, such as `flink.Runtime.FLINK_1_15`. Check the latest CDK documentation for supported versions if you encounter issues.
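The "bucket or key does not exist" failure above can often be caught before `cdk deploy` with a quick existence check. The following is a minimal sketch, not part of the construct library: `code_object_exists` is a hypothetical helper, and it assumes you pass in a boto3 S3 client (its `head_object` call is what the helper relies on). It checks with your own credentials, so it confirms the object exists but says nothing about whether the execution role can read it.

```python
def code_object_exists(s3_client, bucket: str, key: str) -> bool:
    """Return True if s3://bucket/key exists, False on a 404 from HeadObject.

    `s3_client` is assumed to be a boto3 S3 client (or anything exposing a
    compatible `head_object(Bucket=..., Key=...)` method).
    """
    try:
        s3_client.head_object(Bucket=bucket, Key=key)
    except Exception as err:
        # botocore's ClientError carries the service error in `err.response`.
        code = getattr(err, "response", {}).get("Error", {}).get("Code")
        if code in ("404", "NoSuchKey", "NotFound"):
            return False
        raise  # e.g. a 403: surface it instead of guessing
    return True


# Usage (assumes boto3 is installed and AWS credentials are configured;
# the bucket name below is a placeholder):
#   import boto3
#   code_object_exists(boto3.client("s3"), "my-code-bucket", "my-flink-app.jar")
```

Passing the client in as an argument keeps the helper free of a hard boto3 import and easy to exercise with a stub.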
Warnings
- breaking This is an ALPHA construct. Its API is experimental and subject to breaking changes without prior notice. Use in production environments is not recommended without thorough testing.
- gotcha AWS CDK applications require your AWS environment to be bootstrapped before deployment. Deploying without bootstrapping will result in errors such as 'No cdk bootstrap stack found'.
- gotcha The Flink application code (JAR or ZIP file) referenced by `ApplicationCode.from_bucket` must be uploaded to the specified S3 location *before* the CDK deployment; with `from_bucket`, CDK does not upload the bundle for you. `ApplicationCode.from_asset(...)` instead takes a local path and uploads it as a CDK asset during deployment.
- gotcha The Kinesis Data Analytics service execution role requires specific IAM permissions to access input/output sources (e.g., Kinesis streams, S3 buckets), the application code bucket, and CloudWatch Logs. Insufficient permissions will lead to deployment failures or runtime errors within KDA.
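To make the permission requirement above concrete, here is a minimal sketch of the IAM statement that lets an execution role read one application bundle. The helper names (`code_read_statement`, `policy_document`) and the bucket/key values are hypothetical, and the ARN assumes the standard `aws` partition. In a CDK stack, `bucket.grant_read(role)` (as used in the Quickstart) generates a broader equivalent for you; this only shows what the underlying policy looks like.

```python
import json


def code_read_statement(bucket_name: str, key: str) -> dict:
    """IAM statement allowing s3:GetObject on a single object.

    Assumes the standard `aws` partition; adjust the ARN prefix for
    other partitions.
    """
    return {
        "Effect": "Allow",
        "Action": ["s3:GetObject"],
        "Resource": [f"arn:aws:s3:::{bucket_name}/{key}"],
    }


def policy_document(*statements: dict) -> dict:
    """Wrap statements in an IAM policy document."""
    return {"Version": "2012-10-17", "Statement": list(statements)}


if __name__ == "__main__":
    # Placeholder bucket and key, for illustration only.
    doc = policy_document(code_read_statement("my-code-bucket", "my-flink-app.jar"))
    print(json.dumps(doc, indent=2))
```

Access to input/output streams and CloudWatch Logs needs further statements of the same shape with the relevant actions and resource ARNs.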
Install
-
pip install aws-cdk-aws-kinesisanalytics-flink-alpha aws-cdk-lib constructs
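After installing, a quick way to confirm the packages landed in the interpreter you will actually run CDK with is to look up their import names. This is a small sketch using only the standard library; `missing_modules` is a hypothetical helper, and the module names are the import names used elsewhere on this page.

```python
import importlib.util

# Import names used in the Imports and Quickstart sections of this page.
REQUIRED_MODULES = (
    "aws_cdk",
    "constructs",
    "aws_cdk.aws_kinesisanalytics_flink_alpha",
)


def missing_modules(names=REQUIRED_MODULES):
    """Return the subset of `names` that cannot be found in this environment."""
    missing = []
    for name in names:
        try:
            spec = importlib.util.find_spec(name)
        except ModuleNotFoundError:
            # Raised when a parent package (e.g. `aws_cdk`) is itself absent.
            spec = None
        if spec is None:
            missing.append(name)
    return missing


if __name__ == "__main__":
    absent = missing_modules()
    print("missing:", ", ".join(absent) if absent else "none")
```

If the alpha module is reported missing while `aws_cdk` is present, the `pip install` most likely ran against a different virtual environment.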
Imports
- Application
from aws_cdk.aws_kinesisanalytics_flink_alpha import Application
from aws_cdk import aws_kinesisanalytics_flink_alpha as flink # then flink.Application(...)
- Runtime
from aws_cdk import aws_kinesisanalytics_flink_alpha as flink # then flink.Runtime.FLINK_1_15
- ApplicationCode
from aws_cdk import aws_kinesisanalytics_flink_alpha as flink # then flink.ApplicationCode.from_bucket(...) or flink.ApplicationCode.from_asset(...)
Quickstart
import os
from aws_cdk import (
    Stack,
    aws_kinesisanalytics_flink_alpha as flink,
    aws_s3 as s3,
    aws_iam as iam,
)
from constructs import Construct


class FlinkApplicationStack(Stack):
    def __init__(self, scope: Construct, id: str, **kwargs):
        super().__init__(scope, id, **kwargs)

        # 1. Create an S3 bucket for your Flink application JAR/ZIP
        #    (CDK will deploy this bucket for you)
        #    Note: a bucket created by this same stack is empty on the first deploy.
        #    For a pre-uploaded bundle, reference an existing bucket instead,
        #    e.g. s3.Bucket.from_bucket_name(self, "FlinkAppCodeBucket", "<bucket-name>").
        app_code_bucket = s3.Bucket(self, "FlinkAppCodeBucket")

        # 2. Define the path to your Flink application code in S3
        #    IMPORTANT: Your Flink application JAR/ZIP must be uploaded to this path BEFORE cdk deploy.
        flink_app_jar_key = os.environ.get("FLINK_APP_JAR_KEY", "my-flink-app.jar")  # e.g., 'path/to/my-app.jar'

        # 3. Create an IAM role for Kinesis Data Analytics service execution
        #    This role grants KDA permissions to access resources like S3, Kinesis streams, etc.
        flink_service_role = iam.Role(
            self, "FlinkServiceRole",
            assumed_by=iam.ServicePrincipal("kinesisanalytics.amazonaws.com"),
            description="IAM role for Kinesis Data Analytics Flink application",
        )

        # Grant read access to the application code bucket
        app_code_bucket.grant_read(flink_service_role)

        # 4. Define the Kinesis Data Analytics Flink Application
        flink_application = flink.Application(
            self, "MyFlinkApplication",
            application_name="MyCDKFlinkApp",
            code=flink.ApplicationCode.from_bucket(app_code_bucket, flink_app_jar_key),
            runtime=flink.Runtime.FLINK_1_15,  # Choose your Flink runtime version, e.g. FLINK_1_11, FLINK_1_13, FLINK_1_15
            role=flink_service_role,  # the execution role is passed as `role`
            # Optional: add application properties, monitoring, logging, etc.
            # property_groups={
            #     "kda.flink.run.options": {"parallelism.default": "2"},
            # },
        )
# To deploy this, typically you'd put it in a file like 'app.py'
# from aws_cdk import App
# app = App()
# FlinkApplicationStack(app, "MyFlinkApplicationStack",
#     env={'region': os.environ.get('CDK_DEFAULT_REGION'), 'account': os.environ.get('CDK_DEFAULT_ACCOUNT')}
# )
# app.synth()
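The Quickstart reads the bundle's S3 key from the `FLINK_APP_JAR_KEY` environment variable and falls back to `my-flink-app.jar`. A typo in that variable only surfaces at deploy time, so it can help to validate it during synthesis. The sketch below is one possible check, not something the library provides: `resolve_code_key` is a hypothetical helper, and the `.jar`/`.zip` suffix rule is an assumption based on the bundle types mentioned on this page.

```python
import os

# Assumption: the bundle is a JAR or ZIP, as described in the Warnings section.
ALLOWED_SUFFIXES = (".jar", ".zip")


def resolve_code_key(environ=os.environ, default="my-flink-app.jar") -> str:
    """Return the S3 key for the application bundle, or raise ValueError."""
    key = environ.get("FLINK_APP_JAR_KEY", default).strip()
    if not key:
        raise ValueError("FLINK_APP_JAR_KEY is set but empty")
    if not key.lower().endswith(ALLOWED_SUFFIXES):
        raise ValueError(f"expected a .jar or .zip key, got {key!r}")
    return key


if __name__ == "__main__":
    print(resolve_code_key())
```

In the stack, the result would replace the bare `os.environ.get(...)` call, so a bad value fails `cdk synth` with a readable message.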