KinesisStream
caution
This is the SST v1.x Constructs doc. SST v2 is now released. If you are using v2, see the v2 Constructs doc. If you are looking to upgrade to v2, check out the upgrade steps.
The KinesisStream construct is a higher level CDK construct that makes it easy to create a Kinesis Data Stream. You can define a stream and a list of consumers for it, and the construct connects the consumers to the stream internally.
Examples
Using the minimal config
import { KinesisStream } from "@serverless-stack/resources";
new KinesisStream(stack, "Stream", {
consumers: {
myConsumer: "src/lambda.main",
}
});
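The minimal config above points at a handler in src/lambda.main but does not show it. Here is a minimal sketch of what such a consumer might look like, assuming the records carry JSON payloads. Kinesis delivers each record's data base64-encoded. The KinesisRecord and KinesisEvent types and the decodePayload and decodeRecords helpers are defined locally for illustration; a real project would typically use the event types from the aws-lambda type definitions and Buffer.from(data, "base64") for decoding.

```typescript
// Local, simplified event types (assumption: a real project would usually
// import KinesisStreamEvent from the "aws-lambda" type definitions instead).
interface KinesisRecord {
  kinesis: {
    partitionKey: string;
    sequenceNumber: string;
    data: string; // base64-encoded payload
  };
}

interface KinesisEvent {
  Records: KinesisRecord[];
}

// Decode one base64 payload into a UTF-8 string.
function decodePayload(data: string): string {
  const bytes = Uint8Array.from(atob(data), (ch) => ch.charCodeAt(0));
  return new TextDecoder().decode(bytes);
}

// Decode every record in the batch, assuming each payload is JSON.
function decodeRecords(event: KinesisEvent): unknown[] {
  return event.Records.map((record) =>
    JSON.parse(decodePayload(record.kinesis.data))
  );
}

// Exported as `main` to match the "src/lambda.main" handler path.
export async function main(event: KinesisEvent): Promise<void> {
  for (const payload of decodeRecords(event)) {
    console.log("Received record:", payload);
  }
}
```

Keeping the decoding in a separate synchronous helper makes it easy to unit test without invoking the handler itself.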
Configuring consumers
Lazily adding consumers
Add consumers after the stream has been created.
const stream = new KinesisStream(stack, "Stream", {
consumers: {
consumer1: "src/consumer1.main",
consumer2: "src/consumer2.main",
}
});
stream.addConsumers(stack, {
consumer3: "src/consumer3.main",
});
Specifying function props for all the consumers
You can extend the minimal config to set some function props and have them apply to all the consumers.
new KinesisStream(stack, "Stream", {
defaults: {
function: {
timeout: 20,
environment: { tableName: table.tableName },
permissions: [table],
},
},
consumers: {
consumer1: "src/consumer1.main",
consumer2: "src/consumer2.main",
}
});
Configuring an individual consumer
Configure each Lambda function separately.
new KinesisStream(stack, "Stream", {
consumers: {
consumer1: {
function: {
handler: "src/consumer1.main",
timeout: 10,
environment: { tableName: table.tableName },
permissions: [table],
},
}
},
});
Note that you can set defaults.function while also using function per consumer. The consumer's function overrides defaults.function, except for the environment, layers, and permissions properties, which are merged.
new KinesisStream(stack, "Stream", {
defaults: {
function: {
timeout: 20,
environment: { tableName: table.tableName },
permissions: [table],
},
},
consumers: {
consumer1: {
function: {
handler: "src/consumer1.main",
timeout: 10,
environment: { bucketName: bucket.bucketName },
permissions: [bucket],
},
},
consumer2: "src/consumer2.main",
},
});
So in the above example, the consumer1 function doesn't use the timeout that is set in defaults.function. It instead uses the one defined in its own function definition (10 seconds). The function also has both the tableName and the bucketName environment variables set, as well as permissions to both the table and the bucket.
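The override and merge rules described above can be sketched in plain TypeScript. This is an illustration of the documented behavior only, not SST's actual implementation: SketchFunctionProps and mergeFunctionProps are hypothetical names, and permissions and layers are modeled as plain strings rather than constructs.

```typescript
// Simplified stand-in for SST's FunctionProps (assumption: only the fields
// used in the example above; permissions and layers are plain strings here).
interface SketchFunctionProps {
  handler?: string;
  timeout?: number;
  environment?: Record<string, string>;
  permissions?: string[];
  layers?: string[];
}

function mergeFunctionProps(
  defaults: SketchFunctionProps,
  consumer: SketchFunctionProps
): SketchFunctionProps {
  return {
    // Scalar props such as timeout: the consumer's value wins when present.
    ...defaults,
    ...consumer,
    // Merged props: both the defaults and the consumer contribute.
    environment: { ...defaults.environment, ...consumer.environment },
    permissions: [
      ...(defaults.permissions ?? []),
      ...(consumer.permissions ?? []),
    ],
    layers: [...(defaults.layers ?? []), ...(consumer.layers ?? [])],
  };
}

const merged = mergeFunctionProps(
  {
    timeout: 20,
    environment: { tableName: "my-table" },
    permissions: ["table"],
  },
  {
    handler: "src/consumer1.main",
    timeout: 10,
    environment: { bucketName: "my-bucket" },
    permissions: ["bucket"],
  }
);

console.log(merged.timeout); // 10
console.log(Object.keys(merged.environment ?? {})); // [ 'tableName', 'bucketName' ]
```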
Configuring consumer event source
Configure the internally created CDK Event Source.
import { StartingPosition } from "aws-cdk-lib/aws-lambda";
new KinesisStream(stack, "Stream", {
consumers: {
consumer1: {
function: "src/consumer1.main",
cdk: {
eventSource: {
startingPosition: StartingPosition.LATEST,
},
},
},
}
});
Giving the consumers some permissions
Allow the consumer functions to access S3.
const stream = new KinesisStream(stack, "Stream", {
consumers: {
consumer1: "src/consumer1.main",
consumer2: "src/consumer2.main",
}
});
stream.attachPermissions(["s3"]);
Giving a specific consumer some permissions
Allow a specific consumer function to access S3.
const stream = new KinesisStream(stack, "Stream", {
consumers: {
consumer1: "src/consumer1.main",
consumer2: "src/consumer2.main",
}
});
stream.attachPermissionsToConsumer("consumer1", ["s3"]);
Advanced examples
Configuring the Kinesis stream
Configure the internally created CDK Stream instance.
new KinesisStream(stack, "Stream", {
consumers: {
consumer1: "src/consumer1.main",
consumer2: "src/consumer2.main",
},
cdk: {
stream: {
shardCount: 3,
}
},
});
Importing an existing stream
Override the internally created CDK Stream instance.
import { Stream } from "aws-cdk-lib/aws-kinesis";
new KinesisStream(stack, "Stream", {
cdk: {
stream: Stream.fromStreamArn(stack, "ImportedStream", streamArn),
},
});
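The snippet above uses a streamArn value without defining it. Kinesis stream ARNs follow the form arn:aws:kinesis:<region>:<account-id>:stream/<stream-name>. As a rough sketch, one could assemble the value like this; kinesisStreamArn is a hypothetical helper, and the region, account ID, and stream name are placeholder values.

```typescript
// Build a Kinesis stream ARN. Assumes the standard "aws" partition; other
// partitions such as "aws-cn" use a different prefix.
function kinesisStreamArn(
  region: string,
  accountId: string,
  streamName: string
): string {
  return `arn:aws:kinesis:${region}:${accountId}:stream/${streamName}`;
}

// Hypothetical values for illustration only.
const streamArn = kinesisStreamArn("us-east-1", "123456789012", "my-stream");
console.log(streamArn); // arn:aws:kinesis:us-east-1:123456789012:stream/my-stream
```

In practice the ARN is usually copied from the AWS console or read from another stack's output rather than assembled by hand.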
Constructor
new KinesisStream(scope, id, props)
Parameters
- scope Construct
- id string
- props KinesisStreamProps
KinesisStreamProps
consumers?
Type : Record<string, string | Function | KinesisStreamConsumerProps>
Define the function consumers for this stream
new KinesisStream(stack, "Stream", {
consumers: {
consumer1: "src/consumer1.main",
consumer2: {
function: {
handler: "src/consumer2.handler",
timeout: 30
}
}
}
});
defaults.function?
Type : FunctionProps
The default function props to be applied to all the consumer functions of the stream. The environment, permissions, and layers properties will be merged with the per consumer definitions if they are defined.
new KinesisStream(stack, "Stream", {
defaults: {
function: {
timeout: 20,
}
}
});
cdk.id?
Type : string
Allows you to override the default id for this construct.
cdk.stream?
Type : IStream | StreamProps
Override the internally created Kinesis Stream
new KinesisStream(stack, "Stream", {
cdk: {
stream: {
streamName: "my-stream",
}
}
});
Properties
An instance of KinesisStream has the following properties.
id
Type : string
streamArn
Type : string
The ARN of the internally created Kinesis Stream
streamName
Type : string
The name of the internally created Kinesis Stream
cdk.stream
Type : IStream
The internally created Kinesis Stream
Methods
An instance of KinesisStream has the following methods.
addConsumers
addConsumers(scope, consumers)
Parameters
- scope Construct
- consumers Record<string, string | Function | KinesisStreamConsumerProps>
Add consumers to a stream after creating it
stream.addConsumers(stack, {
consumer1: "src/function.handler"
})
attachPermissions
attachPermissions(permissions)
Parameters
- permissions Permissions
Attaches the given list of permissions to all the consumers. This allows the functions to access other AWS resources.
stream.attachPermissions(["s3"]);
attachPermissionsToConsumer
attachPermissionsToConsumer(consumerName, permissions)
Parameters
- consumerName string
- permissions Permissions
Attaches the given list of permissions to a specific consumer. This allows that function to access other AWS resources.
stream.attachPermissionsToConsumer("consumer1", ["s3"]);
bind
bind(constructs)
Parameters
- constructs Array<SSTConstruct>
Binds the given list of resources to all the consumers.
stream.bind([STRIPE_KEY, bucket]);
bindToConsumer
bindToConsumer(consumerName, constructs)
Parameters
- consumerName string
- constructs Array<SSTConstruct>
Binds the given list of resources to a specific consumer.
stream.bindToConsumer("consumer1", [STRIPE_KEY, bucket]);
getFunction
getFunction(consumerName)
Parameters
- consumerName string
Get the function for a specific consumer
stream.getFunction("consumer1");
KinesisStreamConsumerProps
Used to define the function consumer for the stream
function
Type : string | Function | FunctionProps
The function definition
new KinesisStream(stack, "Stream", {
consumers: {
consumer1: {
function: {
handler: "src/consumer1.handler",
timeout: 30
}
}
}
});
cdk.eventSource?
Type : KinesisEventSourceProps
Override the internally created event source
new KinesisStream(stack, "Stream", {
consumers: {
fun: {
function: "src/consumer1.main",
cdk: {
eventSource: {
enabled: false
}
}
}
}
});