[source-postgres] Airbyte Commits the Wrong LSN When Taking the Initial Snapshot #49803

Open
hagar-yasser1 opened this issue Dec 15, 2024 · 1 comment
Labels
area/connectors, community, connectors/source/postgres, team/db-dw-sources, type/bug


hagar-yasser1 commented Dec 15, 2024

Connector Name

source-postgres

Connector Version

>= 3.6.18

At what step did the error happen?

None

Relevant information

Summary

After the first successful initial sync, pipelines repeatedly fail or repeatedly run full refreshes (depending on the value of the Invalid CDC position behavior property) when the following conditions hold:

  1. The database is receiving traffic, so pg_current_wal_lsn() keeps advancing.
  2. Tables in the publication do not receive high traffic.
  3. Tables outside the publication receive high traffic.

We traced the issue to source-postgres's PostgresCdcCtidInitializer committing an LSN that is greater than the saved offset written to the state.
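To confirm that a connection is in this state, one can compare the slot's confirmed_flush_lsn (for example from SELECT confirmed_flush_lsn FROM pg_replication_slots WHERE slot_name = '<slot name>') with the numeric lsn stored in the Debezium state, such as lsn=2676916224 in the log below. PostgreSQL prints an LSN as two hexadecimal 32-bit halves (X/Y), while the state stores one 64-bit number, so a conversion is needed before comparing. The following is a minimal sketch; LsnUtil and its methods are hypothetical helpers written for illustration, not part of the connector.

```java
import java.util.Locale;

// Hypothetical helper (not connector code): converts between PostgreSQL's textual
// pg_lsn form "X/Y" (high and low 32 bits, both in hex) and the 64-bit number
// that the Debezium offset stores as "lsn".
final class LsnUtil {
    private LsnUtil() {}

    // "0/9F8E8000" -> 2676916224
    static long parseLsn(String text) {
        int slash = text.indexOf('/');
        if (slash <= 0 || slash == text.length() - 1) {
            throw new IllegalArgumentException("Not a pg_lsn value: " + text);
        }
        long high = Long.parseLong(text.substring(0, slash), 16);
        long low = Long.parseLong(text.substring(slash + 1), 16);
        return (high << 32) | low;
    }

    // 2676916224 -> "0/9F8E8000"
    static String formatLsn(long lsn) {
        return String.format(Locale.ROOT, "%X/%X", lsn >>> 32, lsn & 0xFFFFFFFFL);
    }

    // True when the slot has already been advanced past the offset stored in the
    // Airbyte state, i.e. the savedOffset < confirmed_flush_lsn situation.
    static boolean isSavedOffsetBehindSlot(long savedOffsetLsn, String confirmedFlushLsn) {
        return savedOffsetLsn < parseLsn(confirmedFlushLsn);
    }

    public static void main(String[] args) {
        long saved = 2676916224L; // "lsn" from the debezium state in the log output
        System.out.println(formatLsn(saved));                             // prints 0/9F8E8000
        System.out.println(isSavedOffsetBehindSlot(saved, "0/9F8E9000")); // prints true
    }
}
```

With the values from the log, the saved offset 2676916224 corresponds to 0/9F8E8000; any confirmed_flush_lsn above that (for example 0/9F8E9000) means the next run sees savedOffset < confirmed_flush_lsn.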

Details

Observed behavior in the first refresh run:

  1. Call pg_current_wal_lsn() (lsn1) to compute the decision for isSavedOffsetAfterReplicaSlot:
    final CdcState defaultCdcState = getDefaultCdcState(postgresDebeziumStateUtil, database);
  2. isSavedOffsetAfterReplicaSlot should always be true in a refresh, since pg_current_wal_lsn() is always >= confirmed_flush_lsn.
  3. Call pg_current_wal_lsn() a second time (lsn2) and use the result as a pseudo saved offset for the rest of the flow:
    final CdcState defaultCdcState = getDefaultCdcState(postgresDebeziumStateUtil, database);
  4. Call pg_current_wal_lsn() a third time (lsn3) and commit that LSN to the replication slot before taking the snapshot:
    final JsonNode initialDebeziumState = postgresDebeziumStateUtil.constructInitialDebeziumState(database,
  5. Call pg_current_wal_lsn() a fourth time (lsn4) and set it as the target LSN.
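The order of the four reads above can be sketched as a small simulation. It is a model under stated assumptions, not connector code: WalClock is a hypothetical stand-in for pg_current_wal_lsn() that advances by a fixed number of bytes on every call (representing steady writes to tables outside the publication), and firstRun only reproduces the sequence of steps 1 to 5.

```java
// Hypothetical simulation of the first-run sequence (not connector code).
final class FirstRunSimulation {
    private FirstRunSimulation() {}

    // Stand-in for pg_current_wal_lsn(): each call returns the current position and
    // then advances it, modelling writes to tables outside the publication.
    static final class WalClock {
        private long lsn;
        private final long bytesPerCall;

        WalClock(long startLsn, long bytesPerCall) {
            this.lsn = startLsn;
            this.bytesPerCall = bytesPerCall;
        }

        long currentWalLsn() {
            long current = lsn;
            lsn += bytesPerCall;
            return current;
        }
    }

    static final class Outcome {
        final boolean savedOffsetAfterReplicaSlot; // step 2 decision, based on lsn1
        final long savedOffset;                    // lsn2: carried by the final state
        final long committedLsn;                   // lsn3: committed to the slot before the snapshot
        final long targetLsn;                      // lsn4: target LSN for the incremental phase

        Outcome(boolean savedOffsetAfterReplicaSlot, long savedOffset, long committedLsn, long targetLsn) {
            this.savedOffsetAfterReplicaSlot = savedOffsetAfterReplicaSlot;
            this.savedOffset = savedOffset;
            this.committedLsn = committedLsn;
            this.targetLsn = targetLsn;
        }
    }

    static Outcome firstRun(WalClock wal, long confirmedFlushLsn) {
        long lsn1 = wal.currentWalLsn();               // step 1
        boolean afterSlot = lsn1 >= confirmedFlushLsn; // step 2: true in a refresh
        long lsn2 = wal.currentWalLsn();               // step 3: pseudo saved offset
        long lsn3 = wal.currentWalLsn();               // step 4: committed to the slot
        long lsn4 = wal.currentWalLsn();               // step 5: target LSN
        // If no published table changes, the incremental phase emits nothing and the
        // final state keeps lsn2, although the slot was already advanced to lsn3.
        return new Outcome(afterSlot, lsn2, lsn3, lsn4);
    }

    public static void main(String[] args) {
        Outcome o = firstRun(new WalClock(2676916224L, 4096L), 2676916224L);
        System.out.println(o.savedOffset < o.committedLsn); // prints true
    }
}
```

Because every read returns a later LSN, the value committed in step 4 is always ahead of the value that ends up in the state whenever the incremental phase emits nothing.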

The problem occurs when, after the initial snapshot is taken, no incremental changes for the given publication are found in the replication slot:

  • The incremental iterators finish without emitting any change.
  • The final state is emitted with the pseudo saved offset (lsn2).
  • However, lsn3 was already committed to the replication slot, and because traffic on other tables keeps advancing the WAL, lsn3 > lsn2.
  • The next job therefore starts from a state where savedOffset < confirmed_flush_lsn.
  • Depending on the Invalid CDC position behavior setting, either a new full refresh is triggered or the job fails.
  • Relevant logs from the incremental iterators are included below.
  • The loop does not end until, after an initial snapshot finishes, a change can be consumed from the replication slot.
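The decision the next job makes can be modelled as follows. This is a hypothetical sketch, not the connector's implementation: NextRunCheck, the Action values, and the InvalidCdcPositionBehavior enum (a stand-in for the Invalid CDC position behavior setting) are names introduced only for illustration, and the comparison follows the savedOffset < confirmed_flush_lsn condition described above.

```java
// Hypothetical model of the check at the start of the next sync (not connector code).
final class NextRunCheck {
    private NextRunCheck() {}

    // Stand-in for the "Invalid CDC position behavior" setting.
    enum InvalidCdcPositionBehavior { FAIL_SYNC, RE_SYNC }

    enum Action { RESUME_INCREMENTAL, FAIL_JOB, FULL_REFRESH }

    static Action decide(long savedOffset, long confirmedFlushLsn, InvalidCdcPositionBehavior behavior) {
        // A logical slot does not stream changes committed before confirmed_flush_lsn,
        // so resuming from an offset behind it could leave a gap.
        if (savedOffset >= confirmedFlushLsn) {
            return Action.RESUME_INCREMENTAL;
        }
        return behavior == InvalidCdcPositionBehavior.FAIL_SYNC ? Action.FAIL_JOB : Action.FULL_REFRESH;
    }

    public static void main(String[] args) {
        // Example values: the state holds lsn2 while the slot was already advanced to lsn3.
        long lsn2 = 2676920320L;
        long lsn3 = 2676924416L;
        System.out.println(decide(lsn2, lsn3, InvalidCdcPositionBehavior.FAIL_SYNC)); // prints FAIL_JOB
        System.out.println(decide(lsn2, lsn3, InvalidCdcPositionBehavior.RE_SYNC));   // prints FULL_REFRESH
    }
}
```

Since a full refresh repeats the first-run sequence, a FULL_REFRESH outcome leads back into the same state, which is the loop described above.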

Proposed Solutions

  • Commit only the LSN that is going to be used as the pseudo saved offset, so that the committed LSN and the saved offset are the same value.
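A minimal sketch of the proposed change, assuming the LSN is read once and that single value is both committed to the slot and used as the pseudo saved offset. WalSource, SlotCommitter and prepareInitialOffset are hypothetical names for illustration; they do not correspond to existing connector or Debezium APIs.

```java
// Hypothetical sketch of the proposed fix (not connector code).
final class ProposedFix {
    private ProposedFix() {}

    @FunctionalInterface
    interface WalSource {
        long currentWalLsn(); // stands in for SELECT pg_current_wal_lsn()
    }

    @FunctionalInterface
    interface SlotCommitter {
        void commitLsn(long lsn); // stands in for advancing the replication slot
    }

    // Reads the WAL position once; the slot is committed to exactly the value that
    // is returned and later written to the state as the pseudo saved offset.
    static long prepareInitialOffset(WalSource wal, SlotCommitter slot) {
        long pseudoSavedOffset = wal.currentWalLsn();
        slot.commitLsn(pseudoSavedOffset);
        return pseudoSavedOffset;
    }

    public static void main(String[] args) {
        long[] wal = {2676916224L};
        long[] committed = new long[1];
        long saved = prepareInitialOffset(
                () -> { long current = wal[0]; wal[0] += 4096L; return current; }, // WAL keeps advancing
                lsn -> committed[0] = lsn);
        System.out.println(saved == committed[0]); // prints true
    }
}
```

Even though the WAL keeps advancing after the read, the state and the slot agree, so this step alone would no longer produce savedOffset < confirmed_flush_lsn on the next job.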

Relevant log output

2024-12-03 09:22:23 source > INFO pool-2-thread-1 i.d.c.c.BaseSourceTask(logStatistics):323 1 records sent during previous 00:00:10.827, last recorded offset of {server=airbyte_source_db1} partition is {lsn=2676916224, txId=798, ts_usec=1733217732116949}
2024-12-03 09:22:23 source > INFO pool-2-thread-1 i.a.c.i.d.AirbyteDebeziumHandler$CapacityReportingBlockingQueue(reportQueueUtilization):48 CDC events queue stats: size=0, cap=10000, puts=1, polls=0
2024-12-03 09:22:23 source > INFO main i.a.c.i.d.i.DebeziumRecordIterator(computeNext):87 CDC events queue poll(): blocked for PT10.940167461S in its first call.
2024-12-03 09:22:23 source > INFO main i.a.c.i.d.i.DebeziumRecordIterator(computeNext):140 CDC events queue poll(): returned a heartbeat event: progressing to 2676916224.
2024-12-03 09:22:33 source > INFO pool-2-thread-1 i.d.c.c.BaseSourceTask(logStatistics):323 1 records sent during previous 00:00:10.028, last recorded offset of {server=airbyte_source_db1} partition is {lsn=2676916224, txId=798, ts_usec=1733217732116949}
2024-12-03 09:22:33 source > INFO pool-2-thread-1 i.a.c.i.d.AirbyteDebeziumHandler$CapacityReportingBlockingQueue(reportQueueUtilization):48 CDC events queue stats: size=0, cap=10000, puts=2, polls=0
2024-12-03 09:22:33 source > INFO main i.a.c.i.d.i.DebeziumRecordIterator(computeNext):87 CDC events queue poll(): blocked for PT10.022176567S after its previous call which was also logged.
2024-12-03 09:22:33 source > INFO main i.a.c.i.d.i.DebeziumRecordIterator(computeNext):140 CDC events queue poll(): returned a heartbeat event: no progress since last heartbeat.
2024-12-03 09:22:43 source > INFO pool-2-thread-1 i.a.c.i.d.AirbyteDebeziumHandler$CapacityReportingBlockingQueue(reportQueueUtilization):48 CDC events queue stats: size=0, cap=10000, puts=3, polls=0
2024-12-03 09:22:43 source > INFO main i.a.c.i.d.i.DebeziumRecordIterator(computeNext):87 CDC events queue poll(): blocked for PT10.02126126S after its previous call which was also logged.
2024-12-03 09:22:43 source > INFO main i.a.c.i.d.i.DebeziumRecordIterator(computeNext):140 CDC events queue poll(): returned a heartbeat event: no progress since last heartbeat.
2024-12-03 09:22:53 source > INFO pool-2-thread-1 i.d.c.c.BaseSourceTask(logStatistics):323 2 records sent during previous 00:00:20.545, last recorded offset of {server=airbyte_source_db1} partition is {lsn=2676916224, txId=798, ts_usec=1733217732116949}
2024-12-03 09:22:53 source > INFO pool-2-thread-1 i.a.c.i.d.AirbyteDebeziumHandler$CapacityReportingBlockingQueue(reportQueueUtilization):48 CDC events queue stats: size=0, cap=10000, puts=4, polls=0
2024-12-03 09:22:53 source > INFO main i.a.c.i.d.i.DebeziumRecordIterator(computeNext):87 CDC events queue poll(): blocked for PT10.522634062S after its previous call which was also logged.
2024-12-03 09:22:53 source > INFO main i.a.c.i.d.i.DebeziumRecordIterator(computeNext):140 CDC events queue poll(): returned a heartbeat event: no progress since last heartbeat.
2024-12-03 09:23:04 source > INFO pool-2-thread-1 i.a.c.i.d.AirbyteDebeziumHandler$CapacityReportingBlockingQueue(reportQueueUtilization):48 CDC events queue stats: size=0, cap=10000, puts=5, polls=0
2024-12-03 09:23:04 source > INFO main i.a.c.i.d.i.DebeziumRecordIterator(computeNext):87 CDC events queue poll(): blocked for PT10.020020753S after its previous call which was also logged.
2024-12-03 09:23:04 source > INFO main i.a.c.i.d.i.DebeziumRecordIterator(computeNext):140 CDC events queue poll(): returned a heartbeat event: no progress since last heartbeat.
2024-12-03 09:23:14 source > INFO pool-2-thread-1 i.a.c.i.d.AirbyteDebeziumHandler$CapacityReportingBlockingQueue(reportQueueUtilization):48 CDC events queue stats: size=0, cap=10000, puts=6, polls=0
2024-12-03 09:23:14 source > INFO main i.a.c.i.d.i.DebeziumRecordIterator(computeNext):87 CDC events queue poll(): blocked for PT10.02051872S after its previous call which was also logged.
2024-12-03 09:23:14 source > INFO main i.a.c.i.d.i.DebeziumRecordIterator(computeNext):140 CDC events queue poll(): returned a heartbeat event: no progress since last heartbeat.
2024-12-03 09:23:21 destination > INFO pool-3-thread-1 i.a.c.i.d.a.b.BufferManager(printQueueInfo):94 [ASYNC QUEUE INFO] Global: max: 742.41 MB, allocated: 10 MB (10.0 MB), %% used: 0.013469714189502041 | State Manager memory usage: Allocated: 10 MB, Used: 0 bytes, percentage Used 0.0
2024-12-03 09:23:21 destination > INFO pool-6-thread-1 i.a.c.i.d.a.FlushWorkers(printWorkerInfo):127 [ASYNC WORKER INFO] Pool queue size: 0, Active threads: 0
2024-12-03 09:23:24 source > INFO pool-2-thread-1 i.a.c.i.d.AirbyteDebeziumHandler$CapacityReportingBlockingQueue(reportQueueUtilization):48 CDC events queue stats: size=0, cap=10000, puts=7, polls=0
2024-12-03 09:23:24 source > INFO main i.a.c.i.d.i.DebeziumRecordIterator(computeNext):87 CDC events queue poll(): blocked for PT10.020763797S after its previous call which was also logged.
2024-12-03 09:23:24 source > INFO main i.a.c.i.d.i.DebeziumRecordIterator(computeNext):140 CDC events queue poll(): returned a heartbeat event: no progress since last heartbeat.
2024-12-03 09:23:34 source > INFO pool-2-thread-1 i.d.c.c.BaseSourceTask(logStatistics):323 4 records sent during previous 00:00:40.085, last recorded offset of {server=airbyte_source_db1} partition is {lsn=2676916224, txId=798, ts_usec=1733217732116949}
2024-12-03 09:23:34 source > INFO pool-2-thread-1 i.a.c.i.d.AirbyteDebeziumHandler$CapacityReportingBlockingQueue(reportQueueUtilization):48 CDC events queue stats: size=0, cap=10000, puts=8, polls=0
2024-12-03 09:23:34 source > INFO main i.a.c.i.d.i.DebeziumRecordIterator(computeNext):87 CDC events queue poll(): blocked for PT10.021969193S after its previous call which was also logged.
2024-12-03 09:23:34 source > INFO main i.a.c.i.d.i.DebeziumRecordIterator(computeNext):140 CDC events queue poll(): returned a heartbeat event: no progress since last heartbeat.
2024-12-03 09:23:44 source > INFO pool-2-thread-1 i.a.c.i.d.AirbyteDebeziumHandler$CapacityReportingBlockingQueue(reportQueueUtilization):48 CDC events queue stats: size=0, cap=10000, puts=9, polls=0
2024-12-03 09:23:44 source > INFO main i.a.c.i.d.i.DebeziumRecordIterator(computeNext):87 CDC events queue poll(): blocked for PT10.521665265S after its previous call which was also logged.
2024-12-03 09:23:44 source > INFO main i.a.c.i.d.i.DebeziumRecordIterator(computeNext):140 CDC events queue poll(): returned a heartbeat event: no progress since last heartbeat.
2024-12-03 09:23:54 source > INFO pool-2-thread-1 i.a.c.i.d.AirbyteDebeziumHandler$CapacityReportingBlockingQueue(reportQueueUtilization):48 CDC events queue stats: size=0, cap=10000, puts=10, polls=0
2024-12-03 09:23:54 source > INFO main i.a.c.i.d.i.DebeziumRecordIterator(computeNext):87 CDC events queue poll(): blocked for PT10.020799778S after its previous call which was also logged.
2024-12-03 09:23:54 source > INFO main i.a.c.i.d.i.DebeziumRecordIterator(computeNext):140 CDC events queue poll(): returned a heartbeat event: no progress since last heartbeat.
2024-12-03 09:24:04 source > INFO pool-2-thread-1 i.a.c.i.d.AirbyteDebeziumHandler$CapacityReportingBlockingQueue(reportQueueUtilization):48 CDC events queue stats: size=0, cap=10000, puts=11, polls=0
2024-12-03 09:24:04 source > INFO main i.a.c.i.d.i.DebeziumRecordIterator(computeNext):87 CDC events queue poll(): blocked for PT10.020568545S after its previous call which was also logged.
2024-12-03 09:24:04 source > INFO main i.a.c.i.d.i.DebeziumRecordIterator(computeNext):140 CDC events queue poll(): returned a heartbeat event: no progress since last heartbeat.
2024-12-03 09:24:14 source > INFO pool-2-thread-1 i.a.c.i.d.AirbyteDebeziumHandler$CapacityReportingBlockingQueue(reportQueueUtilization):48 CDC events queue stats: size=0, cap=10000, puts=12, polls=0
2024-12-03 09:24:14 source > INFO main i.a.c.i.d.i.DebeziumRecordIterator(computeNext):87 CDC events queue poll(): blocked for PT10.020844521S after its previous call which was also logged.
2024-12-03 09:24:14 source > INFO main i.a.c.i.d.i.DebeziumRecordIterator(computeNext):140 CDC events queue poll(): returned a heartbeat event: no progress since last heartbeat.
2024-12-03 09:24:21 destination > INFO pool-3-thread-1 i.a.c.i.d.a.b.BufferManager(printQueueInfo):94 [ASYNC QUEUE INFO] Global: max: 742.41 MB, allocated: 10 MB (10.0 MB), %% used: 0.013469714189502041 | State Manager memory usage: Allocated: 10 MB, Used: 0 bytes, percentage Used 0.0
2024-12-03 09:24:21 destination > INFO pool-6-thread-1 i.a.c.i.d.a.FlushWorkers(printWorkerInfo):127 [ASYNC WORKER INFO] Pool queue size: 0, Active threads: 0
2024-12-03 09:24:24 source > INFO pool-2-thread-1 i.a.c.i.d.AirbyteDebeziumHandler$CapacityReportingBlockingQueue(reportQueueUtilization):48 CDC events queue stats: size=0, cap=10000, puts=13, polls=0
2024-12-03 09:24:24 source > INFO main i.a.c.i.d.i.DebeziumRecordIterator(computeNext):87 CDC events queue poll(): blocked for PT10.020634246S after its previous call which was also logged.
2024-12-03 09:24:24 source > INFO main i.a.c.i.d.i.DebeziumRecordIterator(computeNext):140 CDC events queue poll(): returned a heartbeat event: no progress since last heartbeat.
2024-12-03 09:24:34 source > INFO pool-2-thread-1 i.a.c.i.d.AirbyteDebeziumHandler$CapacityReportingBlockingQueue(reportQueueUtilization):48 CDC events queue stats: size=0, cap=10000, puts=14, polls=0
2024-12-03 09:24:34 source > INFO main i.a.c.i.d.i.DebeziumRecordIterator(computeNext):87 CDC events queue poll(): blocked for PT10.020611686S after its previous call which was also logged.
2024-12-03 09:24:34 source > INFO main i.a.c.i.d.i.DebeziumRecordIterator(computeNext):140 CDC events queue poll(): returned a heartbeat event: no progress since last heartbeat.
2024-12-03 09:24:45 source > INFO pool-2-thread-1 i.a.c.i.d.AirbyteDebeziumHandler$CapacityReportingBlockingQueue(reportQueueUtilization):48 CDC events queue stats: size=0, cap=10000, puts=15, polls=0
2024-12-03 09:24:45 source > INFO main i.a.c.i.d.i.DebeziumRecordIterator(computeNext):87 CDC events queue poll(): blocked for PT10.521634728S after its previous call which was also logged.
2024-12-03 09:24:45 source > INFO main i.a.c.i.d.i.DebeziumRecordIterator(computeNext):140 CDC events queue poll(): returned a heartbeat event: no progress since last heartbeat.
2024-12-03 09:24:55 source > INFO pool-2-thread-1 i.d.c.c.BaseSourceTask(logStatistics):323 8 records sent during previous 00:01:21.171, last recorded offset of {server=airbyte_source_db1} partition is {lsn=2676916224, txId=798, ts_usec=1733217732116949}
2024-12-03 09:24:55 source > INFO pool-2-thread-1 i.a.c.i.d.AirbyteDebeziumHandler$CapacityReportingBlockingQueue(reportQueueUtilization):48 CDC events queue stats: size=0, cap=10000, puts=16, polls=0
2024-12-03 09:24:55 source > INFO main i.a.c.i.d.i.DebeziumRecordIterator(computeNext):87 CDC events queue poll(): blocked for PT10.020966794S after its previous call which was also logged.
2024-12-03 09:24:55 source > INFO main i.a.c.i.d.i.DebeziumRecordIterator(computeNext):140 CDC events queue poll(): returned a heartbeat event: no progress since last heartbeat.
2024-12-03 09:25:05 source > INFO pool-2-thread-1 i.a.c.i.d.AirbyteDebeziumHandler$CapacityReportingBlockingQueue(reportQueueUtilization):48 CDC events queue stats: size=0, cap=10000, puts=17, polls=0
2024-12-03 09:25:05 source > INFO main i.a.c.i.d.i.DebeziumRecordIterator(computeNext):87 CDC events queue poll(): blocked for PT10.02077295S after its previous call which was also logged.
2024-12-03 09:25:05 source > INFO main i.a.c.i.d.i.DebeziumRecordIterator(computeNext):140 CDC events queue poll(): returned a heartbeat event: no progress since last heartbeat.
2024-12-03 09:25:15 source > INFO pool-2-thread-1 i.a.c.i.d.AirbyteDebeziumHandler$CapacityReportingBlockingQueue(reportQueueUtilization):48 CDC events queue stats: size=0, cap=10000, puts=18, polls=0
2024-12-03 09:25:15 source > INFO main i.a.c.i.d.i.DebeziumRecordIterator(computeNext):87 CDC events queue poll(): blocked for PT10.020546589S after its previous call which was also logged.
2024-12-03 09:25:15 source > INFO main i.a.c.i.d.i.DebeziumRecordIterator(computeNext):140 CDC events queue poll(): returned a heartbeat event: no progress since last heartbeat.
2024-12-03 09:25:21 destination > INFO pool-3-thread-1 i.a.c.i.d.a.b.BufferManager(printQueueInfo):94 [ASYNC QUEUE INFO] Global: max: 742.41 MB, allocated: 10 MB (10.0 MB), %% used: 0.013469714189502041 | State Manager memory usage: Allocated: 10 MB, Used: 0 bytes, percentage Used 0.0
2024-12-03 09:25:21 destination > INFO pool-6-thread-1 i.a.c.i.d.a.FlushWorkers(printWorkerInfo):127 [ASYNC WORKER INFO] Pool queue size: 0, Active threads: 0
2024-12-03 09:25:25 source > INFO pool-2-thread-1 i.a.c.i.d.AirbyteDebeziumHandler$CapacityReportingBlockingQueue(reportQueueUtilization):48 CDC events queue stats: size=0, cap=10000, puts=19, polls=0
2024-12-03 09:25:25 source > INFO main i.a.c.i.d.i.DebeziumRecordIterator(computeNext):87 CDC events queue poll(): blocked for PT10.020643155S after its previous call which was also logged.
2024-12-03 09:25:25 source > INFO main i.a.c.i.d.i.DebeziumRecordIterator(computeNext):140 CDC events queue poll(): returned a heartbeat event: no progress since last heartbeat.
2024-12-03 09:25:25 source > INFO main i.a.c.i.d.i.DebeziumRecordIterator(requestClose):276 Closing: Heartbeat indicates sync is not progressing
2024-12-03 09:25:25 platform > SOURCE analytics [airbyte/source-postgres:3.6.18] | Type: db-sources-debezium-close-reason | Value: HEARTBEAT_NOT_PROGRESSING
2024-12-03 09:25:25 source > INFO main i.d.e.EmbeddedEngine(stop):957 Stopping the embedded engine
2024-12-03 09:25:25 source > INFO main i.d.e.EmbeddedEngine(stop):964 Waiting for PT5M for connector to stop
2024-12-03 09:25:25 source > INFO pool-2-thread-1 i.d.e.EmbeddedEngine(stopTaskAndCommitOffset):765 Stopping the task and engine
2024-12-03 09:25:25 source > INFO pool-2-thread-1 i.d.c.c.BaseSourceTask(stop):406 Stopping down connector
2024-12-03 09:25:26 source > INFO pool-6-thread-1 i.d.j.JdbcConnection(lambda$doClose$4):952 Connection gracefully closed
2024-12-03 09:25:26 source > INFO pool-7-thread-1 i.d.j.JdbcConnection(lambda$doClose$4):952 Connection gracefully closed
2024-12-03 09:25:26 source > INFO debezium-postgresconnector-airbyte_source_db1-change-event-source-coordinator i.d.p.ChangeEventSourceCoordinator(streamEvents):281 Finished streaming
2024-12-03 09:25:26 source > INFO debezium-postgresconnector-airbyte_source_db1-change-event-source-coordinator i.d.p.ChangeEventSourceCoordinator(streamingConnected):433 Connected metrics set to 'false'
2024-12-03 09:25:26 source > INFO pool-2-thread-1 i.d.p.s.SignalProcessor(stop):127 SignalProcessor stopped
2024-12-03 09:25:26 source > INFO pool-2-thread-1 i.d.s.DefaultServiceRegistry(close):105 Debezium ServiceRegistry stopped.
2024-12-03 09:25:26 source > INFO pool-8-thread-1 i.d.j.JdbcConnection(lambda$doClose$4):952 Connection gracefully closed
2024-12-03 09:25:26 source > INFO pool-2-thread-1 i.a.c.i.d.i.DebeziumRecordPublisher$start$3(taskStopped):91 DebeziumEngine notify: task stopped
2024-12-03 09:25:26 source > INFO pool-2-thread-1 o.a.k.c.s.FileOffsetBackingStore(stop):71 Stopped FileOffsetBackingStore
2024-12-03 09:25:26 source > INFO pool-2-thread-1 i.a.c.i.d.i.DebeziumRecordPublisher$start$3(connectorStopped):83 DebeziumEngine notify: connector stopped
2024-12-03 09:25:26 source > INFO pool-2-thread-1 i.a.c.i.d.i.DebeziumRecordPublisher(start$lambda$1):60 Debezium engine shutdown. Engine terminated successfully : true
2024-12-03 09:25:26 source > INFO pool-2-thread-1 i.a.c.i.d.i.DebeziumRecordPublisher(start$lambda$1):63 Connector 'io.debezium.connector.postgresql.PostgresConnector' completed normally.
2024-12-03 09:25:26 source > INFO main i.a.i.s.p.c.PostgresCdcStateHandler(saveState):38 debezium state: {"[\"airbyte_source_db1\",{\"server\":\"airbyte_source_db1\"}]":"{\"transaction_id\":null,\"lsn\":2676916224,\"txId\":798,\"ts_usec\":1733217732116949}"}
2024-12-03 09:25:26 source > INFO main i.a.c.i.s.r.s.SourceStateIterator(computeNext):84 sending final state message, with count per stream: {} 
2024-12-03 09:25:26 platform > Stream status TRACE received of status: COMPLETE for stream public:source_table1

Contribute

  • Yes, I want to contribute
marcosmarxm (Member) commented

Thanks for reporting the issue, @hagar-yasser1. I asked the team to take a look and get back to you with updates.
