diff --git a/tap_dynamodb/sync.py b/tap_dynamodb/sync.py index bfe26fd..060d0d3 100644 --- a/tap_dynamodb/sync.py +++ b/tap_dynamodb/sync.py @@ -22,7 +22,8 @@ def clear_state_on_replication_change(stream, state): return state def sync_stream(config, state, stream): - table_name = stream['tap_stream_id'] + stream_name = stream['tap_stream_id'] + table_name = stream.get('table_name', stream_name) md_map = metadata.to_map(stream['metadata']) replication_method = metadata.get(md_map, (), 'replication-method') @@ -30,39 +31,39 @@ def sync_stream(config, state, stream): # write state message with currently_syncing bookmark state = clear_state_on_replication_change(stream, state) - state = singer.set_currently_syncing(state, table_name) + state = singer.set_currently_syncing(state, stream_name) singer.write_state(state) singer.write_message(singer.SchemaMessage( - stream=table_name, + stream=stream_name, schema=stream['schema'], key_properties=key_properties)) rows_saved = 0 if replication_method == 'FULL_TABLE': - LOGGER.info("Syncing full table for stream: %s", table_name) + LOGGER.info("Syncing full table for stream: %s", stream_name) rows_saved += full_table.sync(config, state, stream) elif replication_method == 'LOG_BASED': - LOGGER.info("Syncing log based for stream: %s", table_name) + LOGGER.info("Syncing log based for stream: %s", stream_name) - if log_based.has_stream_aged_out(state, table_name): + if log_based.has_stream_aged_out(state, stream_name): LOGGER.info("Clearing state because stream has aged out") - state.get('bookmarks', {}).pop(table_name) + state.get('bookmarks', {}).pop(stream_name) - if not singer.get_bookmark(state, table_name, 'initial_full_table_complete'): + if not singer.get_bookmark(state, stream_name, 'initial_full_table_complete'): msg = 'Must complete full table sync before replicating from dynamodb streams for %s' - LOGGER.info(msg, table_name) + LOGGER.info(msg, stream_name) - state = log_based.get_initial_bookmarks(config, state, table_name) + state = log_based.get_initial_bookmarks(config, state, stream_name, table_name) singer.write_state(state) rows_saved += full_table.sync(config, state, stream) rows_saved += log_based.sync(config, state, stream) else: - LOGGER.info('Unknown replication method: %s for stream: %s', replication_method, table_name) + LOGGER.info('Unknown replication method: %s for stream: %s', replication_method, stream_name) - state = singer.write_bookmark(state, table_name, 'success_timestamp', singer.utils.strftime(singer.utils.now())) + state = singer.write_bookmark(state, stream_name, 'success_timestamp', singer.utils.strftime(singer.utils.now())) singer.write_state(state) return rows_saved diff --git a/tap_dynamodb/sync_strategies/full_table.py b/tap_dynamodb/sync_strategies/full_table.py index dc6b30a..7d0a337 100644 --- a/tap_dynamodb/sync_strategies/full_table.py +++ b/tap_dynamodb/sync_strategies/full_table.py @@ -37,24 +37,25 @@ def scan_table(table_name, projection, last_evaluated_key, config): def sync(config, state, stream): - table_name = stream['tap_stream_id'] + stream_name = stream['tap_stream_id'] + table_name = stream.get('table_name', stream_name) # before writing the table version to state, check if we had one to begin with - first_run = singer.get_bookmark(state, table_name, 'version') is None + first_run = singer.get_bookmark(state, stream_name, 'version') is None # last run was interrupted if there is a last_id_fetched bookmark was_interrupted = singer.get_bookmark(state, - table_name, + stream_name, 'last_evaluated_key') is not None # pick a new table version if last run wasn't interrupted if was_interrupted: - stream_version = singer.get_bookmark(state, table_name, 'version') + stream_version = singer.get_bookmark(state, stream_name, 'version') else: stream_version = int(time.time() * 1000) state = singer.write_bookmark(state, - table_name, + stream_name, 'version', stream_version) singer.write_state(state) @@ -62,10 +63,10 @@ def sync(config, state, stream): # For the initial replication, emit an ACTIVATE_VERSION message # at the beginning so the records show up right away. if first_run: - singer.write_version(table_name, stream_version) + singer.write_version(stream_name, stream_version) last_evaluated_key = singer.get_bookmark(state, - table_name, + stream_name, 'last_evaluated_key') md_map = metadata.to_map(stream['metadata']) @@ -80,24 +81,24 @@ def sync(config, state, stream): # TODO: Do we actually have to put the item we retreive from # dynamo into a map before we can deserialize? record = deserializer.deserialize_item(item) - record_message = singer.RecordMessage(stream=table_name, + record_message = singer.RecordMessage(stream=stream_name, record=record, version=stream_version) singer.write_message(record_message) if result.get('LastEvaluatedKey'): - state = singer.write_bookmark(state, table_name, 'last_evaluated_key', result.get('LastEvaluatedKey')) + state = singer.write_bookmark(state, stream_name, 'last_evaluated_key', result.get('LastEvaluatedKey')) singer.write_state(state) - state = singer.clear_bookmark(state, table_name, 'last_evaluated_key') + state = singer.clear_bookmark(state, stream_name, 'last_evaluated_key') state = singer.write_bookmark(state, - table_name, + stream_name, 'initial_full_table_complete', True) singer.write_state(state) - singer.write_version(table_name, stream_version) + singer.write_version(stream_name, stream_version) return rows_saved diff --git a/tap_dynamodb/sync_strategies/log_based.py b/tap_dynamodb/sync_strategies/log_based.py index 014d0af..7a971e7 100644 --- a/tap_dynamodb/sync_strategies/log_based.py +++ b/tap_dynamodb/sync_strategies/log_based.py @@ -77,7 +77,7 @@ def get_shard_records(streams_client, stream_arn, shard, sequence_number): shard_iterator = records.get('NextShardIterator') -def sync_shard(shard, seq_number_bookmarks, streams_client, stream_arn, projection, deserializer, table_name, stream_version, state): +def sync_shard(shard, seq_number_bookmarks, streams_client, stream_arn, projection, deserializer, stream_name, stream_version, state): seq_number = seq_number_bookmarks.get(shard['ShardId']) rows_synced = 0 @@ -98,7 +98,7 @@ def sync_shard(shard, seq_number_bookmarks, streams_client, stream_arn, projecti LOGGER.fatal("Projection failed to apply: %s", projection) raise RuntimeError('Projection failed to apply: {}'.format(projection)) - record_message = singer.RecordMessage(stream=table_name, + record_message = singer.RecordMessage(stream=stream_name, record=record_message, version=stream_version) singer.write_message(record_message) @@ -106,7 +106,7 @@ def sync_shard(shard, seq_number_bookmarks, streams_client, stream_arn, projecti rows_synced += 1 seq_number_bookmarks[shard['ShardId']] = record['dynamodb']['SequenceNumber'] - state = singer.write_bookmark(state, table_name, 'shard_seq_numbers', seq_number_bookmarks) + state = singer.write_bookmark(state, stream_name, 'shard_seq_numbers', seq_number_bookmarks) # Every 100 rows write the state if rows_synced % 100 == 0: @@ -117,7 +117,8 @@ def sync_shard(shard, seq_number_bookmarks, streams_client, stream_arn, projecti def sync(config, state, stream): - table_name = stream['tap_stream_id'] + stream_name = stream['tap_stream_id'] + table_name = stream.get('table_name', stream_name) client = dynamodb.get_client(config) streams_client = dynamodb.get_stream_client(config) @@ -128,8 +129,8 @@ def sync(config, state, stream): projection = [x.strip().split('.') for x in projection.split(',')] # Write activate version message - stream_version = singer.get_bookmark(state, table_name, 'version') - singer.write_version(table_name, stream_version) + stream_version = singer.get_bookmark(state, stream_name, 'version') + singer.write_version(stream_name, stream_version) table = client.describe_table(TableName=table_name)['Table'] stream_arn = table['LatestStreamArn'] @@ -137,7 +138,7 @@ def sync(config, state, stream): # Stores a dictionary of shardId : sequence_number for a shard. Should # only store sequence numbers for closed shards that have not been # fully synced - seq_number_bookmarks = singer.get_bookmark(state, table_name, 'shard_seq_numbers') + seq_number_bookmarks = singer.get_bookmark(state, stream_name, 'shard_seq_numbers') if not seq_number_bookmarks: seq_number_bookmarks = dict() @@ -145,7 +146,7 @@ def sync(config, state, stream): # are removed after performing a sync and not seeing the shardId # returned by get_shards() because at that point the shard has been # killed by DynamoDB and will not be returned anymore - finished_shard_bookmarks = singer.get_bookmark(state, table_name, 'finished_shards') + finished_shard_bookmarks = singer.get_bookmark(state, stream_name, 'finished_shards') if not finished_shard_bookmarks: finished_shard_bookmarks = list() @@ -163,16 +164,16 @@ def sync(config, state, stream): if shard['ShardId'] not in finished_shard_bookmarks: rows_synced += sync_shard(shard, seq_number_bookmarks, streams_client, stream_arn, projection, deserializer, - table_name, stream_version, state) + stream_name, stream_version, state) # Now that we have fully synced the shard, move it from the # shard_seq_numbers to finished_shards. finished_shard_bookmarks.append(shard['ShardId']) - state = singer.write_bookmark(state, table_name, 'finished_shards', finished_shard_bookmarks) + state = singer.write_bookmark(state, stream_name, 'finished_shards', finished_shard_bookmarks) if seq_number_bookmarks.get(shard['ShardId']): seq_number_bookmarks.pop(shard['ShardId']) - state = singer.write_bookmark(state, table_name, 'shard_seq_numbers', seq_number_bookmarks) + state = singer.write_bookmark(state, stream_name, 'shard_seq_numbers', seq_number_bookmarks) singer.write_state(state) @@ -180,14 +181,14 @@ def sync(config, state, stream): if shardId not in found_shards: # Remove this shard because its no longer appearing when we query for get_shards finished_shard_bookmarks.remove(shardId) - state = singer.write_bookmark(state, table_name, 'finished_shards', finished_shard_bookmarks) + state = singer.write_bookmark(state, stream_name, 'finished_shards', finished_shard_bookmarks) singer.write_state(state) return rows_synced -def has_stream_aged_out(state, table_name): +def has_stream_aged_out(state, stream_name): ''' Uses the success_timestamp on the stream to determine if we have successfully synced the stream in the last 19 hours 30 minutes. @@ -200,7 +201,7 @@ def has_stream_aged_out(state, table_name): ''' current_time = singer.utils.now() - success_timestamp = singer.get_bookmark(state, table_name, 'success_timestamp') + success_timestamp = singer.get_bookmark(state, stream_name, 'success_timestamp') # If we have no success_timestamp then we have aged out if not success_timestamp: @@ -213,7 +214,7 @@ def has_stream_aged_out(state, table_name): return time_span > datetime.timedelta(hours=19, minutes=30) -def get_initial_bookmarks(config, state, table_name): +def get_initial_bookmarks(config, state, stream_name, table_name): ''' Returns the state including all bookmarks necessary for the initial full table sync @@ -229,6 +230,6 @@ def get_initial_bookmarks(config, state, table_name): stream_arn = table['LatestStreamArn'] finished_shard_bookmarks = [shard['ShardId'] for shard in get_shards(streams_client, stream_arn)] - state = singer.write_bookmark(state, table_name, 'finished_shards', finished_shard_bookmarks) + state = singer.write_bookmark(state, stream_name, 'finished_shards', finished_shard_bookmarks) return state