-
Notifications
You must be signed in to change notification settings - Fork 4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(kinesis): support stream consumers #32087
base: main
Are you sure you want to change the base?
Conversation
- introduce `StreamConsumer` construct to model `AWS::Kinesis::StreamConsumer` - introduce `addToResourcePolicy` to enable creating/configuring a resource policy for the consumer - introduce `grant` and `grantRead` for granting permissions - leverage `iam.Grant.addToPrincipalOrResource` in `grant` to be able to use `grant` methods cross environments to update the grantee's iam policy and the consumer's resource policy as needed - update `ResourcePolicy` to support both `Stream` and `StreamConsumer` - update `Stream`'s `grant` to leverage `iam.Grant.addToPrincipalOrResource` for cross-environment support closes aws#32050
Codecov ReportAll modified and coverable lines are covered by tests ✅
Additional details and impacted files@@ Coverage Diff @@
## main #32087 +/- ##
=======================================
Coverage 81.52% 81.52%
=======================================
Files 222 222
Lines 13715 13715
Branches 2417 2417
=======================================
Hits 11181 11181
Misses 2254 2254
Partials 280 280
Flags with carried forward coverage won't be shown. Click here to find out more.
|
…aws-cdk into kinesis-stream-consumer
One other thing I noticed, is that in lambda event sources, the KinesisEventSource assumes an
A potential implementation
import * as constructs from 'constructs';
import { StreamEventSource, StreamEventSourceProps } from './stream';
import * as iam from '../../aws-iam';
import * as kinesis from '../../aws-kinesis';
import * as lambda from '../../aws-lambda';
import * as cdk from '../../core';
export interface KinesisEventSourceProps extends StreamEventSourceProps {
/**
* The time from which to start reading, in Unix time seconds.
*
* @default - no timestamp
*/
readonly startingPositionTimestamp?: number;
}
interface KinesisSource {
readonly node: constructs.Node;
readonly sourceArn: string;
readonly eventSourceName: string;
grantRead(grantee: iam.IGrantable): iam.Grant;
}
abstract class KinesisEventSourceBase extends StreamEventSource {
private _eventSourceMappingId?: string = undefined;
private _eventSourceMappingArn?: string = undefined;
private startingPositionTimestamp?: number;
constructor(readonly source: KinesisSource, props: KinesisEventSourceProps) {
super(props);
this.startingPositionTimestamp = props.startingPositionTimestamp;
this.props.batchSize !== undefined && cdk.withResolved(this.props.batchSize, batchSize => {
if (batchSize < 1 || batchSize > 10000) {
throw new Error(`Maximum batch size must be between 1 and 10000 inclusive (given ${this.props.batchSize})`);
}
});
}
public bind(target: lambda.IFunction) {
const eventSourceMapping = target.addEventSourceMapping(`${this.source.eventSourceName}:${cdk.Names.nodeUniqueId(this.source.node)}`,
this.enrichMappingOptions({
eventSourceArn: this.source.sourceArn,
startingPositionTimestamp: this.startingPositionTimestamp,
metricsConfig: this.props.metricsConfig,
}),
);
this._eventSourceMappingId = eventSourceMapping.eventSourceMappingId;
this._eventSourceMappingArn = eventSourceMapping.eventSourceMappingArn;
this.source.grantRead(target);
}
/**
* The identifier for this EventSourceMapping
*/
public get eventSourceMappingId(): string {
if (!this._eventSourceMappingId) {
throw new Error(`${this.source.eventSourceName} is not yet bound to an event source mapping`);
}
return this._eventSourceMappingId;
}
/**
* The ARN for this EventSourceMapping
*/
public get eventSourceMappingArn(): string {
if (!this._eventSourceMappingArn) {
throw new Error(`${this.source.eventSourceName} is not yet bound to an event source mapping`);
}
return this._eventSourceMappingArn;
}
}
/**
* Use an Amazon Kinesis stream as an event source for AWS Lambda.
*/
export class KinesisEventSource extends KinesisEventSourceBase {
constructor(stream: kinesis.IStream, props: KinesisEventSourceProps) {
super({ ...stream, eventSourceName: 'KinesisEventSource', sourceArn: stream.streamArn }, props);
}
}
/**
* Use an Amazon Kinesis stream consumer as an event source for AWS Lambda.
*/
export class KinesisConsumerEventSource extends KinesisEventSourceBase {
constructor(streamConsumer: kinesis.IStreamConsumer, props: KinesisEventSourceProps) {
super({ ...streamConsumer, eventSourceName: 'KinesisConsumerEventSource', sourceArn: streamConsumer.streamConsumerArn }, props);
}
} |
…aws-cdk into kinesis-stream-consumer
// and `SubscribeToShard` APIs. | ||
// The Lambda::EventSourceMapping resource validates against the `DescribeStream` permission. So we add it explicitly. | ||
// FIXME This permission can be removed when the event source mapping resource drops it from validation. | ||
this.stream.grant(target, 'kinesis:DescribeStream'); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This permission is now already part of grantRead
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great job putting this together, we could finally replace our custom constructs for stream and consumer if this gets done. It has a fatal flaw though :)
return iam.Grant.addToPrincipalOrResource({ | ||
grantee, | ||
actions, | ||
resourceArns: [this.streamArn], | ||
scope: this, | ||
resource: this, | ||
}); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
KDS resource policy allows only a limited subset of actions. If the resource policy contains actions that aren't supported, Kinesis will return an InvalidRequest
error, which means you need to do some filtering here.
The trouble is you can't know if the actions end up in the identity or resource policy. The way we solved it in our custom construct was:
- override
grantRead
,grantWrite
,grantReadWrite
separately - lift
addToPrincipalOrResource
implementation and repurpose it to detect if it leads to identity or resource policy - apply correct set of limited actions for resource policies
I'm unaware of any other possible solution, but maybe you will have some idea.
For reference, the list of allowed actions (https://docs.aws.amazon.com/streams/latest/dev/controlling-access.html#kinesis-stream-sharing-iam-examples):
- Stream
DescribeStreamSummary
GetRecords
GetShardIterator
ListShards
PutRecord
PutRecords
DescribeStream
Funnily enough, the official list doesn't containDescribeStream
but in reality, it is supported as it's required for event source mapping (we raised the doc inconsistency with AWS, but they haven't updated it)
- Consumer
DescribeStreamConsumer
SubscribeToShard
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for this. This is interesting I didn't know about this limitation.
I've checked the link, just to confirm, are u referring to the table of Kinesis Data Streams actions that can be shared ?
I'm trying to think about what issues can arise?
- The list of hardcoded permissions in
stream.ts
(READ_OPERATIONS
,WRITE_OPERATIONS
) andstream-consumers.ts
(READ_OPERATIONS
) might not match those lists u provided? - Anyone using
grant
directly might not know about the limitation, and would just be surprised when cloudformation deployment fail?
Re 1, I think that if so, then it definitely needs addressing
Re 2, depending on how 1 might be addressed, can be automatically solved, or if 1 is not an issue, then it might be ok to leave it on the user maybe to
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
note to self: integ test for grant methods in a cross-account fashion... examples of cross-account tests https://github.com/search?q=repo%3Aaws/aws-cdk%20CDK_INTEG_CROSS_ACCOUNT&type=code
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I meant the table. Those allowed actions are subset of the hardcoded permissions.
The point of grantX
methods is that they're seamless and solve the complex situations for you - user don't have a way how to influence their behavior, so they can't make the CFN deployment fail.
AWS CodeBuild CI Report
Powered by github-codebuild-logs, available on the AWS Serverless Application Repository |
Issue # (if applicable)
Closes #32050
Reason for this change
Support Enhanced fan-out consumers via
AWS::Kinesis::StreamConsumer
and facilitate cross-account stream consumption via LambdaDescription of changes
StreamConsumer
construct to modelAWS::Kinesis::StreamConsumer
addToResourcePolicy
to enable creating/configuring a resource policy for the consumergrant
andgrantRead
for granting permissionsiam.Grant.addToPrincipalOrResource
ingrant
to be able to usegrant
methods cross environments to update the grantee's iam policy and the consumer's resource policy as neededResourcePolicy
to support bothStream
andStreamConsumer
Stream
'sgrant
to leverageiam.Grant.addToPrincipalOrResource
for cross-environment supportKinesisConsumerEventSource
tolambda-event-sources
for use with the newly introducedStreamConsumer
Useful links
Description of how you validated changes
unit and integration tests
Checklist
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache-2.0 license