Use table_name if different from tap_stream_id #25

Open
wants to merge 3 commits into
base: master

Conversation

@aaronsteers aaronsteers commented Sep 4, 2020

Description of change

According to the Singer spec here, I believe the tap is supposed to use the stream's table_name field, if provided, to specify the source table. This is important so that a user who is customizing the stream catalog can redirect a given table to another downstream stream ID or table name.

(In our case, this is necessary due to special characters ('-') in the DynamoDB table name, which we do not want to have replicated to the downstream database's table name.)

Essentially, we just replace table_name = stream['tap_stream_id'] with table_name = stream.get('table_name', stream['tap_stream_id']). This has no effect when using the default/discovered catalog, but if the user overrides these values in the catalog, it separates the upstream table name from the downstream stream ID / table name.
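As a minimal sketch of the fallback described above (the helper name resolve_table_name and the sample catalog entries are illustrative assumptions, not taken from the tap's source):

```python
def resolve_table_name(stream: dict) -> str:
    """Return the DynamoDB table to read for a catalog stream entry.

    Prefers the optional 'table_name' field and falls back to
    'tap_stream_id' when it is absent. Hypothetical helper name.
    """
    return stream.get('table_name', stream['tap_stream_id'])


# Discovered-style entry (hypothetical values): both fields agree.
discovered = {'tap_stream_id': 'my-table', 'table_name': 'my-table'}

# Customized entry: the stream ID is renamed, the source table is not.
customized = {'tap_stream_id': 'my_table', 'table_name': 'my-table'}

# Entry without table_name: behaves exactly as before the change.
legacy = {'tap_stream_id': 'my-table'}

for entry in (discovered, customized, legacy):
    print(entry['tap_stream_id'], '->', resolve_table_name(entry))
```

Note that the default argument is evaluated eagerly, so tap_stream_id must still be present in every entry, and a table_name that is present but null would be returned as None rather than triggering the fallback.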

Manual QA steps

  • pipx install git+https://github.com/aaronsteers/tap-dynamodb@feature/configurable-table-name
  • In an existing catalog, override stream and tap_stream_id properties and leave table_name as is.
  • Rerun the tap sync and confirm that (1) records are flowing through and (2) the downstream table name matches stream, which is now distinct from table_name.
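The catalog override in the second step could be scripted roughly as follows. This is only a sketch: the helper rename_stream and the table name orders-table are hypothetical, and it assumes the catalog follows the usual Singer layout with a top-level streams list.

```python
import copy


def rename_stream(catalog: dict, old_id: str, new_id: str) -> dict:
    """Return a copy of the catalog with one stream renamed downstream.

    Only 'stream' and 'tap_stream_id' change; 'table_name' keeps
    pointing at the upstream table. Hypothetical helper.
    """
    updated = copy.deepcopy(catalog)
    for entry in updated['streams']:
        if entry['tap_stream_id'] == old_id:
            # Assumption: if the entry has no table_name yet, pin it to
            # the old ID so the source table can still be found.
            entry.setdefault('table_name', old_id)
            entry['stream'] = new_id
            entry['tap_stream_id'] = new_id
    return updated


# Hypothetical catalog with a hyphenated upstream table name.
catalog = {
    'streams': [
        {
            'stream': 'orders-table',
            'tap_stream_id': 'orders-table',
            'table_name': 'orders-table',
            'schema': {'type': 'object'},
        }
    ]
}

renamed = rename_stream(catalog, 'orders-table', 'orders')
print(renamed['streams'][0])
```

Working on a deep copy leaves the discovered catalog untouched, which makes it easy to compare the two files before rerunning the sync.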

Risks

  • Minimal, unless others have overridden table_name without expecting this behavior.

Rollback steps

  • revert this branch

@aaronsteers
Author

Manual tests, as described above, have passed. This is ready for review.

@aroder

aroder commented Sep 4, 2020

So this will use table_name if available, then fall back to tap_stream_id?

Makes sense to me, and if the names come from DynamoDB with hyphens and the target is a database like Postgres, this change would be necessary.

@aaronsteers
Author

Indeed! Yes, if table_name is available, that's what dynamo will connect to; otherwise, it will use the tap_stream_id value as it does today.

@aaronsteers
Author

aaronsteers commented Sep 5, 2020

UPDATE: I found that the tap was still emitting SCHEMA and RECORD messages with the upstream table_name, which led me to an additional set of updates (here: fbe88a4) which declare both table_name and stream_name, and then update other functions to ensure we are sending stream_name downstream.

Log:

2020-09-04 19:08:34,241 - INFO - Beginning running command: tap-dynamodb --config /mnt/c/Files/Source/slalom-data-platform-core/data/taps/.secrets/tmp/tap-me-slalom-config.json --catalog ./.output/taps/me-slalom-catalog/me-slalom-employeeAssessment-catalog.json --state /tmp/tmp_x2fk149/me-slalom-employeeAssessment-state.json | target-snowflake --config /mnt/c/Files/Source/slalom-data-platform-core/data/taps/.secrets/tmp/target-snowflake-config-employeeAssessment.json > /tmp/tmp_x2fk149/me-slalom-employeeAssessment-state-new.json...
INFO Found credentials in shared credentials file: /mnt/c/Files/Source/slalom-data-platform-core/infra/dev/.secrets/aws-credentials
INFO Attempting to assume_role on RoleArn: arn:aws:iam::489003720472:role/TEST-AJ-DynamoDB-SingerExtracts-Role
INFO Starting sync.
INFO employeeAssessment: Starting sync
INFO Syncing full table for stream: employeeAssessment
INFO Scanning table dev_mes-employeeAssessment-table with params:
INFO    TableName = dev_mes-employeeAssessment-table
INFO    Limit = 1000
time=2020-09-04 19:08:35 name=target_snowflake level=INFO message=Getting catalog objects from table cache...
INFO employeeAssessment: Completed sync (17 rows)
INFO
+Sync Summary--------+--------------------+---------------+--------------------+
| table name         | replication method | total records | write speed        |
+--------------------+--------------------+---------------+--------------------+
| employeeAssessment | FULL_TABLE         | 17 records    | 7.5 records/second |
+--------------------+--------------------+---------------+--------------------+
INFO Done syncing.
time=2020-09-04 19:08:39 name=target_snowflake level=INFO message=Table 'RAW_MES."EMPLOYEEASSESSMENT"' exists
time=2020-09-04 19:08:39 name=target_snowflake level=INFO message=Table 'RAW_MES."EMPLOYEEASSESSMENT"' exists
time=2020-09-04 19:08:39 name=target_snowflake level=INFO message=Uploading 17 rows to external snowflake stage on S3
time=2020-09-04 19:08:39 name=target_snowflake level=INFO message=Target S3 bucket: dataplatformtest01-data-44635, local file: /tmp/records_zrcb4xpo.csv.gz, S3 key: data/raw/me-slalom/employeeAssessment/v1/pipelinewise_employeeAssessment_20200904-190839-624640.csv.gz
time=2020-09-04 19:08:41 name=target_snowflake level=INFO message=Loading 17 rows into 'RAW_MES."EMPLOYEEASSESSMENT"'
time=2020-09-04 19:08:44 name=target_snowflake level=INFO message=Loading into RAW_MES."EMPLOYEEASSESSMENT": {"inserts": 17, "updates": 0, "size_bytes": 119}
time=2020-09-04 19:08:44 name=target_snowflake level=INFO message=Emitting state {"bookmarks": {"employeeAssessment": {"last_replication_method": "FULL_TABLE", "version": 1599271714573, "initial_full_table_complete": true, "success_timestamp": "2020-09-05T02:08:36.852091Z"}, "dev_mes-employeeAssessment-table": {"version": 1599260777218, "initial_full_table_complete": true, "success_timestamp": "2020-09-04T23:06:18.099907Z"}}, "currently_syncing": "employeeAssessment"}
2020-09-04 19:08:44,961 - INFO - Completed running command: tap-dynamodb --config /mnt/c/Files/Source/slalom-data-platform-core/data/taps/.secrets/tmp/tap-me-slalom-config.json --catalog ./.output/taps/me-slalom-catalog/me-slalom-employeeAssessment-catalog.json --state /tmp/tmp_x2fk149/me-slalom-employeeAssessment-state.json | target-snowflake --config /mnt/c/Files/Source/slalom-data-platform-core/data/taps/.secrets/tmp/target-snowflake-config-employeeAssessment.json > /tmp/tmp_x2fk149/me-slalom-employeeAssessment-state-new.json (10s elapsed) (return_code=0)

For the above example, the catalog has been modified such that table_name='dev_mes-employeeAssessment-table' and tap_stream_id='employeeAssessment'.
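To illustrate the separation using the values from this example, here is a rough sketch of how the two names might be resolved and how only the stream name ends up in the emitted messages. It builds plain Singer-style SCHEMA and RECORD dictionaries with the standard json module rather than calling the tap's real code; the helper names, the sample row, and the choice to take stream_name from tap_stream_id are all assumptions.

```python
import json


def resolve_names(stream: dict) -> tuple:
    """Return (table_name, stream_name) for a catalog entry.

    Assumption: stream_name comes from tap_stream_id, while table_name
    falls back to tap_stream_id only when it is not set.
    """
    stream_name = stream['tap_stream_id']
    table_name = stream.get('table_name', stream_name)
    return table_name, stream_name


def build_messages(stream_name: str, schema: dict, rows: list) -> list:
    """Build one SCHEMA message followed by a RECORD message per row."""
    messages = [{
        'type': 'SCHEMA',
        'stream': stream_name,
        'schema': schema,
        'key_properties': [],
    }]
    for row in rows:
        messages.append({'type': 'RECORD', 'stream': stream_name, 'record': row})
    return messages


# Catalog entry mirroring the values quoted above.
entry = {
    'tap_stream_id': 'employeeAssessment',
    'table_name': 'dev_mes-employeeAssessment-table',
    'schema': {'type': 'object'},
}

# Hypothetical row standing in for a scanned DynamoDB item.
rows = [{'id': '1'}]

table_name, stream_name = resolve_names(entry)
print(f'would scan table: {table_name}')
messages = build_messages(stream_name, entry['schema'], rows)
for message in messages:
    print(json.dumps(message))
```

In this sketch the upstream table name is used only to decide what to scan, so a target never sees the hyphenated name.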

@LucasZielke

I'd love to see this pushed up, because in my case my targets are having issues, seemingly caused by a DynamoDB table name being extremely long.


3 participants