KinesisStream
caution
This is the SST v0.x Constructs doc. SST v1 is now released. If you are using v1, see the v1 Constructs doc. If you are looking to upgrade to v1, check out the migration steps.
The KinesisStream construct is a higher-level CDK construct that makes it easy to create a Kinesis Data Stream. You can define a stream and a list of consumers for it, and the construct internally connects the consumers to the stream.
Initializer
new KinesisStream(scope: Construct, id: string, props: KinesisStreamProps)
Parameters
- scope
Construct
- id
string
- props
KinesisStreamProps
Examples
Using the minimal config
import { KinesisStream } from "@serverless-stack/resources";
new KinesisStream(this, "Stream", {
  consumers: {
    consumer1: "src/consumer1.main",
    consumer2: "src/consumer2.main",
  },
});
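The consumer paths above, such as src/consumer1.main, point at ordinary Lambda handlers. As a rough sketch of what such a handler might look like, the block below decodes the base64-encoded payload of each Kinesis record. The local KinesisEvent types and the decodeBase64 and decodeRecords helpers are assumptions made for illustration and are not part of SST; only the Records[].kinesis.data shape comes from the standard Kinesis event format.

```typescript
// Minimal local types covering only the event fields used here.
// (Hypothetical names; the @types/aws-lambda package ships fuller types.)
interface KinesisRecord {
  kinesis: {
    partitionKey: string;
    sequenceNumber: string;
    data: string; // base64-encoded payload
  };
}

interface KinesisEvent {
  Records: KinesisRecord[];
}

// Decode a base64 string into UTF-8 text.
// In a Node.js Lambda, Buffer.from(data, "base64").toString("utf8")
// is a common alternative.
function decodeBase64(data: string): string {
  const bytes = Uint8Array.from(atob(data), (ch) => ch.charCodeAt(0));
  return new TextDecoder().decode(bytes);
}

// Illustrative helper: turn every record in the batch into its text payload.
function decodeRecords(event: KinesisEvent): string[] {
  return event.Records.map((record) => decodeBase64(record.kinesis.data));
}

// Hypothetical body for "src/consumer1.main": log each decoded record.
export async function main(event: KinesisEvent): Promise<void> {
  for (const payload of decodeRecords(event)) {
    console.log("Received record:", payload);
  }
}
```

Keeping the decoding in a separate helper makes it possible to test the parsing logic without invoking the handler itself.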
Adding consumers
Add consumers after the stream has been created.
const stream = new KinesisStream(this, "Stream", {
  consumers: {
    consumer1: "src/consumer1.main",
    consumer2: "src/consumer2.main",
  },
});
stream.addConsumers(this, {
  consumer3: "src/consumer3.main",
});
Lazily adding consumers
Create an empty stream and lazily add the consumers.
const stream = new KinesisStream(this, "Stream");
stream.addConsumers(this, {
  consumer1: "src/consumer1.main",
  consumer2: "src/consumer2.main",
});
Specifying function props for all the consumers
You can extend the minimal config to set function props that apply to all the consumers.
new KinesisStream(this, "Stream", {
  defaultFunctionProps: {
    timeout: 20,
    environment: { tableName: table.tableName },
    permissions: [table],
  },
  consumers: {
    consumer1: "src/consumer1.main",
    consumer2: "src/consumer2.main",
  },
});
Using the full config
If you want to configure each Lambda function separately, you can pass in the KinesisStreamConsumerProps.
new KinesisStream(this, "Stream", {
  consumers: {
    consumer1: {
      function: {
        handler: "src/consumer1.main",
        timeout: 10,
        environment: { tableName: table.tableName },
        permissions: [table],
      },
    },
  },
});
Note that you can set defaultFunctionProps while also using function per consumer. The function props override the defaultFunctionProps, except for the environment, layers, and permissions properties, which are merged.
new KinesisStream(this, "Stream", {
  defaultFunctionProps: {
    timeout: 20,
    environment: { tableName: table.tableName },
    permissions: [table],
  },
  consumers: {
    consumer1: {
      function: {
        handler: "src/consumer1.main",
        timeout: 10,
        environment: { bucketName: bucket.bucketName },
        permissions: [bucket],
      },
    },
    consumer2: "src/consumer2.main",
  },
});
In the above example, the consumer1 function doesn't use the timeout set in defaultFunctionProps. It instead uses the one defined in the function definition (10 seconds). The function will also have both the tableName and the bucketName environment variables set, as well as permissions to both the table and the bucket.
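The override-and-merge rule described above can be modeled with a small standalone function. This is only a sketch of the described behavior, not SST's actual implementation: SketchFunctionProps and mergeProps are hypothetical names, and permissions and layers are reduced to plain string arrays for illustration.

```typescript
// Simplified stand-in for the function props used in the example above.
interface SketchFunctionProps {
  handler?: string;
  timeout?: number;
  environment?: Record<string, string>;
  permissions?: string[];
  layers?: string[];
}

// Model of the documented rule: most props are overridden by the
// per-consumer value, while environment, layers, and permissions are merged.
function mergeProps(
  defaults: SketchFunctionProps,
  overrides: SketchFunctionProps
): SketchFunctionProps {
  return {
    ...defaults,
    ...overrides,
    environment: { ...(defaults.environment ?? {}), ...(overrides.environment ?? {}) },
    layers: [...(defaults.layers ?? []), ...(overrides.layers ?? [])],
    permissions: [...(defaults.permissions ?? []), ...(overrides.permissions ?? [])],
  };
}

// Mirrors the consumer1 example, with placeholder strings for the resources.
const merged = mergeProps(
  { timeout: 20, environment: { tableName: "my-table" }, permissions: ["table"] },
  {
    handler: "src/consumer1.main",
    timeout: 10,
    environment: { bucketName: "my-bucket" },
    permissions: ["bucket"],
  }
);
console.log(merged);
```

In this model the merged result keeps the per-consumer timeout of 10 while carrying both environment variables and both permissions.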
Giving the consumers some permissions
Allow the consumer functions to access S3.
const stream = new KinesisStream(this, "Stream", {
  consumers: {
    consumer1: "src/consumer1.main",
    consumer2: "src/consumer2.main",
  },
});
stream.attachPermissions(["s3"]);
Giving a specific consumer some permissions
Allow a specific consumer function to access S3.
const stream = new KinesisStream(this, "Stream", {
  consumers: {
    consumer1: "src/consumer1.main",
    consumer2: "src/consumer2.main",
  },
});
stream.attachPermissionsToConsumer("consumer1", ["s3"]);
Configuring the Kinesis stream
Configure the internally created CDK Stream instance.
new KinesisStream(this, "Stream", {
  consumers: {
    consumer1: "src/consumer1.main",
    consumer2: "src/consumer2.main",
  },
  kinesisStream: {
    shardCount: 3,
  },
});
Configuring a consumer
Configure the internally created CDK Event Source.
import { StartingPosition } from "aws-cdk-lib/aws-lambda";
new KinesisStream(this, "Stream", {
  consumers: {
    consumer1: {
      function: "src/consumer1.main",
      consumerProps: {
        startingPosition: StartingPosition.LATEST,
      },
    },
  },
});
Importing an existing stream
Override the internally created CDK Stream instance.
import { Stream } from "aws-cdk-lib/aws-kinesis";
new KinesisStream(this, "Stream", {
  kinesisStream: Stream.fromStreamArn(this, "ImportedStream", streamArn),
});
Properties
An instance of KinesisStream contains the following properties.
streamArn
Type: string
The ARN of the internally created CDK Stream instance.
streamName
Type: string
The name of the internally created CDK Stream instance.
kinesisStream
Type: cdk.aws-kinesis.Stream
The internally created CDK Stream instance.
Methods
An instance of KinesisStream contains the following methods.
getFunction
getFunction(consumerName: string): Function
Parameters
- consumerName
string
Returns
- Function
Gets the instance of the internally created Function for a given consumer, where consumerName is the name used to define the consumer.
addConsumers
addConsumers(scope: cdk.Construct, consumers: { [consumerName: string]: FunctionDefinition | KinesisStreamConsumerProps })
Parameters
- scope
cdk.Construct
- consumers
{ [consumerName: string]: FunctionDefinition | KinesisStreamConsumerProps }
An associative array where each key is a consumer name (a string) and each value is either a FunctionDefinition or a KinesisStreamConsumerProps.
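Because each consumer value can be either a short function definition or the full props object, it can help to picture both forms being normalized into one shape. The sketch below models that idea with simplified local types. The names SketchConsumerProps and normalizeConsumer are hypothetical, and layering user-supplied consumerProps on top of a LATEST default is an assumption based on the documented default, not a statement about SST's internals.

```typescript
type SketchStartingPosition = "LATEST" | "TRIM_HORIZON";

// Simplified stand-ins; the real FunctionDefinition and
// KinesisEventSourceProps types accept more than this.
type SketchFunctionDefinition = string | { handler: string; timeout?: number };

interface SketchEventSourceProps {
  startingPosition?: SketchStartingPosition;
  batchSize?: number;
}

interface SketchConsumerProps {
  function: SketchFunctionDefinition;
  consumerProps?: SketchEventSourceProps;
}

type SketchConsumerEntry = SketchFunctionDefinition | SketchConsumerProps;

// Documented default: starting position LATEST.
const DEFAULT_POSITION: SketchStartingPosition = "LATEST";

// The full form is recognized by its `function` key.
function isConsumerProps(entry: SketchConsumerEntry): entry is SketchConsumerProps {
  return typeof entry === "object" && "function" in entry;
}

// Normalize either form into { function, consumerProps }.
function normalizeConsumer(entry: SketchConsumerEntry): Required<SketchConsumerProps> {
  if (isConsumerProps(entry)) {
    return {
      function: entry.function,
      // Assumption: user-supplied props override the default.
      consumerProps: { startingPosition: DEFAULT_POSITION, ...entry.consumerProps },
    };
  }
  return { function: entry, consumerProps: { startingPosition: DEFAULT_POSITION } };
}

const shortForm = normalizeConsumer("src/consumer1.main");
const fullForm = normalizeConsumer({
  function: "src/consumer2.main",
  consumerProps: { startingPosition: "TRIM_HORIZON" },
});
console.log(shortForm, fullForm);
```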
attachPermissions
attachPermissions(permissions: Permissions)
Parameters
- permissions
Permissions
Attaches the given list of permissions to all the consumer functions. This allows the consumers to access other AWS resources.
Internally calls Function.attachPermissions.
attachPermissionsToConsumer
attachPermissionsToConsumer(consumerName: string, permissions: Permissions)
Parameters
- consumerName
string
- permissions
Permissions
Attaches the given list of permissions to a specific consumer. This allows that function to access other AWS resources.
Internally calls Function.attachPermissions.
KinesisStreamProps
consumers?
Type: { [consumerName: string]: FunctionDefinition | KinesisStreamConsumerProps }, defaults to {}
The consumers for this stream. Takes an associative array where each key is a consumer name (a string) and each value is either a FunctionDefinition or a KinesisStreamConsumerProps.
caution
You should not change the name of a consumer.
If the consumerName is changed, CloudFormation will remove the existing consumer and create a new one. If the starting position is set to TRIM_HORIZON, all the historical records available in the stream will be resent to the new consumer.
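To make the effect of a rename concrete, the sketch below models which records a newly created consumer would see under each starting position. It is a toy model with hypothetical names (recordsForNewConsumer, retained, incoming) and plain string arrays in place of a real stream, so it illustrates the caution above rather than any actual Kinesis API.

```typescript
type SketchStartingPosition = "LATEST" | "TRIM_HORIZON";

// Returns the records a newly created consumer would receive in this toy model.
// `retained`: records already in the stream when the consumer is created.
// `incoming`: records written after the consumer is created.
function recordsForNewConsumer(
  position: SketchStartingPosition,
  retained: string[],
  incoming: string[]
): string[] {
  // TRIM_HORIZON starts from the oldest retained record, so history is replayed.
  // LATEST only sees records written after the consumer exists.
  return position === "TRIM_HORIZON" ? [...retained, ...incoming] : [...incoming];
}

const retained = ["r1", "r2", "r3"]; // already processed by the old consumer
const incoming = ["r4"];

// After renaming a consumer, the replacement starts fresh:
const replayed = recordsForNewConsumer("TRIM_HORIZON", retained, incoming);
const freshOnly = recordsForNewConsumer("LATEST", retained, incoming);
console.log(replayed, freshOnly);
```

In the TRIM_HORIZON case the renamed consumer reprocesses r1 through r3, so handlers that are not idempotent could apply the same records twice.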
kinesisStream?
Type: cdk.aws-kinesis.Stream | cdk.aws-kinesis.StreamProps, defaults to undefined
Optionally pass in a CDK cdk.aws-kinesis.StreamProps object or a cdk.aws-kinesis.Stream instance. This allows you to override the default settings this construct uses internally to create the stream.
defaultFunctionProps?
Type: FunctionProps, defaults to {}
The default function props to be applied to all the Lambda functions in the Stream. If the function is specified for a consumer, these default values are overridden, except for the environment, layers, and permissions properties, which are merged.
KinesisStreamConsumerProps
function
Type: FunctionDefinition
A FunctionDefinition object that'll be used to create the consumer function for the stream.
consumerProps?
Type: cdk.aws-lambda-event-sources.lambdaEventSources.KinesisEventSourceProps, defaults to KinesisEventSourceProps with the starting position set to LATEST.
Optionally pass in a CDK KinesisEventSourceProps. This allows you to override the default settings this construct uses internally to create the consumer.