KinesisStream
The KinesisStream
construct is a higher level CDK construct that makes it easy to create a Kinesis Data Stream. You can create a stream and add a list of consumers to it.
This construct makes it easy to define a stream and its consumers. It also internally connects the consumers and the stream together.
Examples
Using the minimal config
import { KinesisStream } from "sst/constructs";
new KinesisStream(stack, "Stream", {
consumers: {
myConsumer: "src/lambda.main",
},
});
Configuring consumers
Lazily adding consumers
Add consumers after the stream has been created.
const stream = new KinesisStream(stack, "Stream", {
consumers: {
consumer1: "src/consumer1.main",
consumer2: "src/consumer2.main",
},
});
stream.addConsumers(stack, {
consumer3: "src/consumer3.main",
});
Specifying function props for all the consumers
You can extend the minimal config, to set some function props and have them apply to all the consumers.
new KinesisStream(stack, "Stream", {
defaults: {
function: {
timeout: 20,
environment: { tableName: table.tableName },
permissions: [table],
},
},
consumers: {
consumer1: "src/consumer1.main",
consumer2: "src/consumer2.main",
},
});
Configuring an individual consumer
Configure each Lambda function separately.
new KinesisStream(stack, "Stream", {
consumers: {
consumer1: {
function: {
handler: "src/consumer1.main",
timeout: 10,
environment: { tableName: table.tableName },
permissions: [table],
},
},
},
});
Note that, you can set the defaults.function
while using the function
per consumer. The function
will just override the defaults.function
. Except for the environment
, the layers
, and the permissions
properties, that will be merged.
new KinesisStream(stack, "Stream", {
defaults: {
function: {
timeout: 20,
environment: { tableName: table.tableName },
permissions: [table],
},
},
consumers: {
consumer1: {
function: {
handler: "src/consumer1.main",
timeout: 10,
environment: { bucketName: bucket.bucketName },
permissions: [bucket],
},
},
consumer2: "src/consumer2.main",
},
});
So in the above example, the consumer1
function doesn't use the timeout
that is set in the defaults.function
. It'll instead use the one that is defined in the function definition (10 seconds
). And the function will have both the tableName
and the bucketName
environment variables set; as well as permissions to both the table
and the bucket
.
Configuring consumer event source
Configure the internally created CDK Event Source.
import { StartingPosition } from "aws-cdk-lib/aws-lambda";
new KinesisStream(stack, "Stream", {
consumers: {
consumer1: {
function: "src/consumer1.main",
cdk: {
eventSource: {
startingPosition: StartingPosition.LATEST,
},
},
},
},
});
Giving the consumers some permissions
Allow the consumer functions to access S3.
const stream = new KinesisStream(stack, "Stream", {
consumers: {
consumer1: "src/consumer1.main",
consumer2: "src/consumer2.main",
},
});
stream.attachPermissions(["s3"]);
Giving a specific consumers some permissions
Allow a specific consumer function to access S3.
const stream = new KinesisStream(stack, "Stream", {
consumers: {
consumer1: "src/consumer1.main",
consumer2: "src/consumer2.main",
},
});
stream.attachPermissionsToConsumer("consumer1", ["s3"]);
Advanced examples
Configuring the Kinesis stream
Configure the internally created CDK Stream
instance.
new KinesisStream(stack, "Stream", {
consumers: {
consumer1: "src/consumer1.main",
consumer2: "src/consumer2.main",
}
cdk: {
stream: {
shardCount: 3,
}
},
});
Importing an existing stream
Override the internally created CDK Stream
instance.
import { Stream } from "aws-cdk-lib/aws-kinesis";
new KinesisStream(stack, "Stream", {
cdk: {
stream: Stream.fromStreamArn(stack, "ImportedStream", streamArn),
},
});
Constructor
new KinesisStream(scope, id, props)
Parameters
- scope Construct
- id string
- props KinesisStreamProps
KinesisStreamProps
consumers?
Type : Record<string, string | Function | KinesisStreamConsumerProps>
Define the function consumers for this stream
new KinesisStream(stack, "Stream", {
consumers: {
consumer1: "src/consumer1.main",
consumer2: {
function: {
handler: "src/consumer2.handler",
timeout: 30
}
}
}
});
defaults?
Type :
defaults.function?
Type : FunctionProps
The default function props to be applied to all the Lambda functions in the API. The
environment
,
permissions
and
layers
properties will be merged with per route definitions if they are defined.
new KinesisStream(stack, "Stream", {
defaults: {
function: {
timeout: 20,
}
}
});
cdk?
Type :
cdk.id?
Type : string
Allows you to override default id for this construct.
cdk.stream?
Type : IStream | StreamProps
Override the internally created Kinesis Stream
new KinesisStream(stack, "Stream", {
cdk: {
stream: {
streamName: "my-stream",
}
}
});
Properties
An instance of KinesisStream
has the following properties.
id
Type : string
streamArn
Type : string
The ARN of the internally created Kinesis Stream
streamName
Type : string
The name of the internally created Kinesis Stream
cdk
Type :
cdk.stream
Type : IStream
Return internally created Kinesis Stream
Methods
An instance of KinesisStream
has the following methods.
addConsumers
addConsumers(scope, consumers)
Parameters
- scope Construct
- consumers
Add consumers to a stream after creating it
stream.addConsumers(stack, {
consumer1: "src/function.handler"
})
attachPermissions
attachPermissions(permissions)
Parameters
- permissions Permissions
Attaches the given list of permissions to all the consumers. This allows the functions to access other AWS resources.
stream.attachPermissions(["s3"]);
attachPermissionsToConsumer
attachPermissionsToConsumer(consumerName, permissions)
Parameters
- consumerName string
- permissions Permissions
Attaches the given list of permissions to a specific consumer. This allows that function to access other AWS resources.
stream.attachPermissionsToConsumer("consumer1", ["s3"]);
bind
bind(constructs)
Parameters
- constructs Array<BindingResource>
Binds the given list of resources to all the consumers.
stream.bind([STRIPE_KEY, bucket]]);
bindToConsumer
bindToConsumer(consumerName, constructs)
Parameters
- consumerName string
- constructs Array<BindingResource>
Binds the given list of resources to a specific consumer.
stream.bindToConsumer("consumer1", [STRIPE_KEY, bucket]);
getFunction
getFunction(consumerName)
Parameters
- consumerName string
Get the function for a specific consumer
stream.getFunction("consumer1");
KinesisStreamConsumerProps
Used to define the function consumer for the stream
function
Type : string | Function | FunctionProps
The function definition
new KinesisStream(stack, "Stream", {
consumers: {
consumer1: {
function: {
handler: "src/consumer1.handler",
timeout: 30
}
}
}
});
cdk?
Type :
cdk.eventSource?
Type : KinesisEventSourceProps
Override the interally created event source
new KinesisStream(stack, "Stream", {
consumers: {
fun: {
cdk: {
eventSource: {
enabled: false
}
}
}
}
});