Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use table_name if different from tap_stream_id #25

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 13 additions & 12 deletions tap_dynamodb/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,47 +22,48 @@ def clear_state_on_replication_change(stream, state):
return state

def sync_stream(config, state, stream):
table_name = stream['tap_stream_id']
stream_name = stream['tap_stream_id']
table_name = stream.get('table_name', stream_name)

md_map = metadata.to_map(stream['metadata'])
replication_method = metadata.get(md_map, (), 'replication-method')
key_properties = metadata.get(md_map, (), 'table-key-properties')

# write state message with currently_syncing bookmark
state = clear_state_on_replication_change(stream, state)
state = singer.set_currently_syncing(state, table_name)
state = singer.set_currently_syncing(state, stream_name)
singer.write_state(state)

singer.write_message(singer.SchemaMessage(
stream=table_name,
stream=stream_name,
schema=stream['schema'],
key_properties=key_properties))

rows_saved = 0
if replication_method == 'FULL_TABLE':
LOGGER.info("Syncing full table for stream: %s", table_name)
LOGGER.info("Syncing full table for stream: %s", stream_name)
rows_saved += full_table.sync(config, state, stream)
elif replication_method == 'LOG_BASED':
LOGGER.info("Syncing log based for stream: %s", table_name)
LOGGER.info("Syncing log based for stream: %s", stream_name)

if log_based.has_stream_aged_out(state, table_name):
if log_based.has_stream_aged_out(state, stream_name):
LOGGER.info("Clearing state because stream has aged out")
state.get('bookmarks', {}).pop(table_name)
state.get('bookmarks', {}).pop(stream_name)

if not singer.get_bookmark(state, table_name, 'initial_full_table_complete'):
if not singer.get_bookmark(state, stream_name, 'initial_full_table_complete'):
msg = 'Must complete full table sync before replicating from dynamodb streams for %s'
LOGGER.info(msg, table_name)
LOGGER.info(msg, stream_name)

state = log_based.get_initial_bookmarks(config, state, table_name)
state = log_based.get_initial_bookmarks(config, state, stream_name, table_name)
singer.write_state(state)

rows_saved += full_table.sync(config, state, stream)

rows_saved += log_based.sync(config, state, stream)
else:
LOGGER.info('Unknown replication method: %s for stream: %s', replication_method, table_name)
LOGGER.info('Unknown replication method: %s for stream: %s', replication_method, stream_name)

state = singer.write_bookmark(state, table_name, 'success_timestamp', singer.utils.strftime(singer.utils.now()))
state = singer.write_bookmark(state, stream_name, 'success_timestamp', singer.utils.strftime(singer.utils.now()))
singer.write_state(state)

return rows_saved
25 changes: 13 additions & 12 deletions tap_dynamodb/sync_strategies/full_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,35 +37,36 @@ def scan_table(table_name, projection, last_evaluated_key, config):


def sync(config, state, stream):
table_name = stream['tap_stream_id']
stream_name = stream['tap_stream_id']
table_name = stream.get('table_name', stream_name)

# before writing the table version to state, check if we had one to begin with
first_run = singer.get_bookmark(state, table_name, 'version') is None
first_run = singer.get_bookmark(state, stream_name, 'version') is None

# last run was interrupted if there is a last_evaluated_key bookmark
was_interrupted = singer.get_bookmark(state,
table_name,
stream_name,
'last_evaluated_key') is not None

# pick a new table version if last run wasn't interrupted
if was_interrupted:
stream_version = singer.get_bookmark(state, table_name, 'version')
stream_version = singer.get_bookmark(state, stream_name, 'version')
else:
stream_version = int(time.time() * 1000)

state = singer.write_bookmark(state,
table_name,
stream_name,
'version',
stream_version)
singer.write_state(state)

# For the initial replication, emit an ACTIVATE_VERSION message
# at the beginning so the records show up right away.
if first_run:
singer.write_version(table_name, stream_version)
singer.write_version(stream_name, stream_version)

last_evaluated_key = singer.get_bookmark(state,
table_name,
stream_name,
'last_evaluated_key')

md_map = metadata.to_map(stream['metadata'])
Expand All @@ -80,24 +81,24 @@ def sync(config, state, stream):
# TODO: Do we actually have to put the item we retrieve from
# dynamo into a map before we can deserialize?
record = deserializer.deserialize_item(item)
record_message = singer.RecordMessage(stream=table_name,
record_message = singer.RecordMessage(stream=stream_name,
record=record,
version=stream_version)

singer.write_message(record_message)
if result.get('LastEvaluatedKey'):
state = singer.write_bookmark(state, table_name, 'last_evaluated_key', result.get('LastEvaluatedKey'))
state = singer.write_bookmark(state, stream_name, 'last_evaluated_key', result.get('LastEvaluatedKey'))
singer.write_state(state)

state = singer.clear_bookmark(state, table_name, 'last_evaluated_key')
state = singer.clear_bookmark(state, stream_name, 'last_evaluated_key')

state = singer.write_bookmark(state,
table_name,
stream_name,
'initial_full_table_complete',
True)

singer.write_state(state)

singer.write_version(table_name, stream_version)
singer.write_version(stream_name, stream_version)

return rows_saved
33 changes: 17 additions & 16 deletions tap_dynamodb/sync_strategies/log_based.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ def get_shard_records(streams_client, stream_arn, shard, sequence_number):
shard_iterator = records.get('NextShardIterator')


def sync_shard(shard, seq_number_bookmarks, streams_client, stream_arn, projection, deserializer, table_name, stream_version, state):
def sync_shard(shard, seq_number_bookmarks, streams_client, stream_arn, projection, deserializer, stream_name, stream_version, state):
seq_number = seq_number_bookmarks.get(shard['ShardId'])

rows_synced = 0
Expand All @@ -98,15 +98,15 @@ def sync_shard(shard, seq_number_bookmarks, streams_client, stream_arn, projecti
LOGGER.fatal("Projection failed to apply: %s", projection)
raise RuntimeError('Projection failed to apply: {}'.format(projection))

record_message = singer.RecordMessage(stream=table_name,
record_message = singer.RecordMessage(stream=stream_name,
record=record_message,
version=stream_version)
singer.write_message(record_message)

rows_synced += 1

seq_number_bookmarks[shard['ShardId']] = record['dynamodb']['SequenceNumber']
state = singer.write_bookmark(state, table_name, 'shard_seq_numbers', seq_number_bookmarks)
state = singer.write_bookmark(state, stream_name, 'shard_seq_numbers', seq_number_bookmarks)

# Every 100 rows write the state
if rows_synced % 100 == 0:
Expand All @@ -117,7 +117,8 @@ def sync_shard(shard, seq_number_bookmarks, streams_client, stream_arn, projecti


def sync(config, state, stream):
table_name = stream['tap_stream_id']
stream_name = stream['tap_stream_id']
table_name = stream.get('table_name', stream_name)

client = dynamodb.get_client(config)
streams_client = dynamodb.get_stream_client(config)
Expand All @@ -128,24 +129,24 @@ def sync(config, state, stream):
projection = [x.strip().split('.') for x in projection.split(',')]

# Write activate version message
stream_version = singer.get_bookmark(state, table_name, 'version')
singer.write_version(table_name, stream_version)
stream_version = singer.get_bookmark(state, stream_name, 'version')
singer.write_version(stream_name, stream_version)

table = client.describe_table(TableName=table_name)['Table']
stream_arn = table['LatestStreamArn']

# Stores a dictionary of shardId : sequence_number for a shard. Should
# only store sequence numbers for closed shards that have not been
# fully synced
seq_number_bookmarks = singer.get_bookmark(state, table_name, 'shard_seq_numbers')
seq_number_bookmarks = singer.get_bookmark(state, stream_name, 'shard_seq_numbers')
if not seq_number_bookmarks:
seq_number_bookmarks = dict()

# Get the list of closed shards which we have fully synced. These
# are removed after performing a sync and not seeing the shardId
# returned by get_shards() because at that point the shard has been
# killed by DynamoDB and will not be returned anymore
finished_shard_bookmarks = singer.get_bookmark(state, table_name, 'finished_shards')
finished_shard_bookmarks = singer.get_bookmark(state, stream_name, 'finished_shards')
if not finished_shard_bookmarks:
finished_shard_bookmarks = list()

Expand All @@ -163,31 +164,31 @@ def sync(config, state, stream):
if shard['ShardId'] not in finished_shard_bookmarks:
rows_synced += sync_shard(shard, seq_number_bookmarks,
streams_client, stream_arn, projection, deserializer,
table_name, stream_version, state)
stream_name, stream_version, state)

# Now that we have fully synced the shard, move it from the
# shard_seq_numbers to finished_shards.
finished_shard_bookmarks.append(shard['ShardId'])
state = singer.write_bookmark(state, table_name, 'finished_shards', finished_shard_bookmarks)
state = singer.write_bookmark(state, stream_name, 'finished_shards', finished_shard_bookmarks)

if seq_number_bookmarks.get(shard['ShardId']):
seq_number_bookmarks.pop(shard['ShardId'])
state = singer.write_bookmark(state, table_name, 'shard_seq_numbers', seq_number_bookmarks)
state = singer.write_bookmark(state, stream_name, 'shard_seq_numbers', seq_number_bookmarks)

singer.write_state(state)

for shardId in finished_shard_bookmarks:
if shardId not in found_shards:
# Remove this shard because it's no longer returned when we call get_shards
finished_shard_bookmarks.remove(shardId)
state = singer.write_bookmark(state, table_name, 'finished_shards', finished_shard_bookmarks)
state = singer.write_bookmark(state, stream_name, 'finished_shards', finished_shard_bookmarks)

singer.write_state(state)

return rows_synced


def has_stream_aged_out(state, table_name):
def has_stream_aged_out(state, stream_name):
'''
Uses the success_timestamp on the stream to determine if we have
successfully synced the stream in the last 19 hours 30 minutes.
Expand All @@ -200,7 +201,7 @@ def has_stream_aged_out(state, table_name):
'''
current_time = singer.utils.now()

success_timestamp = singer.get_bookmark(state, table_name, 'success_timestamp')
success_timestamp = singer.get_bookmark(state, stream_name, 'success_timestamp')

# If we have no success_timestamp then we have aged out
if not success_timestamp:
Expand All @@ -213,7 +214,7 @@ def has_stream_aged_out(state, table_name):
return time_span > datetime.timedelta(hours=19, minutes=30)


def get_initial_bookmarks(config, state, table_name):
def get_initial_bookmarks(config, state, stream_name, table_name):
'''
Returns the state including all bookmarks necessary for the initial
full table sync
Expand All @@ -229,6 +230,6 @@ def get_initial_bookmarks(config, state, table_name):
stream_arn = table['LatestStreamArn']

finished_shard_bookmarks = [shard['ShardId'] for shard in get_shards(streams_client, stream_arn)]
state = singer.write_bookmark(state, table_name, 'finished_shards', finished_shard_bookmarks)
state = singer.write_bookmark(state, stream_name, 'finished_shards', finished_shard_bookmarks)

return state