diff --git a/packages/serverless-offline-kinesis/package.json b/packages/serverless-offline-kinesis/package.json index 60b3903e..e648af43 100644 --- a/packages/serverless-offline-kinesis/package.json +++ b/packages/serverless-offline-kinesis/package.json @@ -32,5 +32,11 @@ "kinesis", "serverless", "lambda" - ] + ], + "devDependencies": { + "jest": "^24.9.0" + }, + "scripts": { + "test": "jest" + } } diff --git a/packages/serverless-offline-kinesis/src/index.js b/packages/serverless-offline-kinesis/src/index.js index feeae14b..1cb16356 100644 --- a/packages/serverless-offline-kinesis/src/index.js +++ b/packages/serverless-offline-kinesis/src/index.js @@ -1,4 +1,4 @@ -const {join} = require('path'); +const path = require('path'); const {Writable} = require('stream'); const figures = require('figures'); const Kinesis = require('aws-sdk/clients/kinesis'); @@ -19,6 +19,8 @@ const { matchesProperty, omitBy, isString, + isObject, + isArray, pipe, startsWith } = require('lodash/fp'); @@ -36,6 +38,22 @@ const extractStreamNameFromARN = arn => { return StreamNames.join('/'); }; +const extractStreamNameFromGetAtt = getAtt => { + if (isArray(getAtt)) return getAtt[0]; + if (isString(getAtt) && getAtt.endsWith('.Arn')) return getAtt.replace(/\.Arn$/, ''); + throw new Error('Unable to parse Fn::GetAtt for stream cross-reference'); +}; + +const extractStreamNameFromJoin = ([delimiter, parts]) => { + const resolvedParts = parts.map(part => { + if (isString(part)) return part; + // TODO maybe handle getAtt in Join? + if (isObject(part)) return ''; // empty string as placeholder + return ''; + }); + return extractStreamNameFromARN(resolvedParts.join(delimiter)); +}; + class ServerlessOfflineKinesis { constructor(serverless, options) { this.serverless = serverless; @@ -87,7 +105,7 @@ class ServerlessOfflineKinesis { process.env = functionEnv; const serviceRuntime = this.service.provider.runtime; - const servicePath = join(this.serverless.config.servicePath, location); + const servicePath = path.join(this.serverless.config.servicePath, location); const funOptions = functionHelper.getFunctionOptions( __function, functionName, @@ -134,23 +152,59 @@ class ServerlessOfflineKinesis { if (isString(streamEvent.arn)) return extractStreamNameFromARN(streamEvent.arn); if (isString(streamEvent.streamName)) return streamEvent.streamName; - if (streamEvent.arn['Fn::GetAtt']) { - const [ResourceName] = streamEvent.arn['Fn::GetAtt']; + const {'Fn::GetAtt': getAtt, 'Fn::Join': join} = streamEvent.arn; + if (getAtt) { + const [ResourceName] = streamEvent.arn[getAtt]; + // const logicalResourceName = extractStreamNameFromGetAtt(getAtt); + // const physicalResourceName = get(['service', 'resources', 'Resources', logicalResourceName, 'Properties', 'Name'])(this); const name = get(`resources.Resources.${ResourceName}.Properties.Name`, this.service); if (isString(name)) return name; } + if (join) { + const physicalResourceName = extractStreamNameFromJoin(join); // Fixme name + if (isString(physicalResourceName)) return physicalResourceName; + } throw new Error( `StreamName not found. See https://github.com/CoorpAcademy/serverless-plugins/tree/master/packages/serverless-offline-kinesis#functions` ); } + // FIXME: to really incorporate [to be done after conflict resolving] + pollStreamUntilActive(streamName, timeout) { + const client = this.getClient(); + const lastTime = Date.now() + timeout; + return new Promise((resolve, reject) => { + const poll = async () => { + const { + StreamDescription: {StreamStatus} + } = await client.describeStream({StreamName: streamName}).promise(); + if (StreamStatus === 'ACTIVE') { + resolve(); + } else if (Date.now() > lastTime) { + reject( + new Error( + `Stream ${streamName} did not become active within timeout of ${Math.floor( + timeout / 1000 + )}s` + ) + ); + } else { + setTimeout(poll, 1000); + } + }; + poll(); + }); + } + async createKinesisReadable(functionName, streamEvent, retry = false) { const client = this.getClient(); const streamName = this.getStreamName(streamEvent); - this.serverless.cli.log(`${streamName}`); + this.serverless.cli.log(`Waiting for ${streamName} to become active`); + + await this.pollStreamUntilActive(streamName, this.getConfig().waitForActiveTimeout || 30000); // FIXME const kinesisStream = await client .describeStream({ @@ -175,6 +229,7 @@ class ServerlessOfflineKinesis { const { StreamDescription: {Shards: shards} } = kinesisStream; + this.serverless.cli.log(`${streamName} - creating listeners for ${shards.length} shards`); forEach(({ShardId: shardId}) => { const readable = KinesisReadable( @@ -244,3 +299,4 @@ class ServerlessOfflineKinesis { } module.exports = ServerlessOfflineKinesis; +module.exports.extractStreamNameFromGetAtt = extractStreamNameFromGetAtt; diff --git a/packages/serverless-offline-kinesis/src/index.test.js b/packages/serverless-offline-kinesis/src/index.test.js new file mode 100644 index 00000000..fc8a0752 --- /dev/null +++ b/packages/serverless-offline-kinesis/src/index.test.js @@ -0,0 +1,76 @@ +const ServerlessOfflineKinesis = require('./index'); +const { extractStreamNameFromGetAtt } = ServerlessOfflineKinesis; + +test('extractStreamNameFromGetAtt handles array Fn::GetAtt', () => { + expect(extractStreamNameFromGetAtt(['MyResource', 'Arn'])).toEqual('MyResource'); +}); +test('extractStreamNameFromGetAtt handles string Fn::GetAtt', () => { + expect(extractStreamNameFromGetAtt('MyResource.Arn')).toEqual('MyResource'); +}); +test('extractStreamNameFromGetAtt throws on other cases', () => { + expect(() => extractStreamNameFromGetAtt({ MyResource: 'Arn' })).toThrow(); +}); + +const baseServerless = { + service: { + resources: { + Resources: { + TestStream: { + Type: 'AWS::Kinesis::Stream', + Properties: { + Name: 'test-stream-dev' + }, + }, + }, + }, + }, +}; + +test('getStreamName handles a directly specified ARN', () => { + const plugin = new ServerlessOfflineKinesis(baseServerless); + expect(plugin.getStreamName('arn:aws:kinesis:us-east-1:123456789012:stream/TestStream')).toEqual('TestStream'); +}) + +test('getStreamName handles an object with a arn string property', () => { + const plugin = new ServerlessOfflineKinesis(baseServerless); + expect(plugin.getStreamName({ + arn: 'arn:aws:kinesis:us-east-1:123456789012:stream/TestStream' + })).toEqual('TestStream'); +}); + +test('getStreamName handles an object with a streamName property', () => { + const plugin = new ServerlessOfflineKinesis(baseServerless); + expect(plugin.getStreamName({ + streamName: 'TestStream', + })).toEqual('TestStream'); +}); + +test('getStreamName handles an object with an arn Fn::GetAtt lookup (array form)', () => { + const plugin = new ServerlessOfflineKinesis(baseServerless); + expect(plugin.getStreamName({ + arn: { + 'Fn::GetAtt': ['TestStream', 'Arn'], + } + })).toEqual('test-stream-dev'); +}); + +test('getStreamName handles an object with an arn Fn::GetAtt lookup (string form)', () => { + const plugin = new ServerlessOfflineKinesis(baseServerless); + expect(plugin.getStreamName({ + arn: { + 'Fn::GetAtt': 'TestStream.Arn', + } + })).toEqual('test-stream-dev'); +}); +test('getStreamName makes a naïve attempt to parse an arn Fn::Join lookup', () => { + const plugin = new ServerlessOfflineKinesis(baseServerless); + expect(plugin.getStreamName({ + arn: { + 'Fn::Join': [ + ':', + ['arn', 'aws', 'kinesis', { Ref: 'AWS::Region'}, { Ref: 'AWS::AccountId' }, 'stream/my-stream-dev'] + ] + } + })).toEqual('my-stream-dev'); +}); +