From f0495a6082381478b86a80706fa7bce1efa6dd7f Mon Sep 17 00:00:00 2001
From: ismail simsek <6005685+ismailsimsek@users.noreply.github.com>
Date: Sun, 16 Jun 2024 12:11:20 +0200
Subject: [PATCH] Document new features and improve documentation (#357)

* Document new feature and update documentation

* Document new feature and update documentation

* Document new feature and update documentation
---
 .../server/iceberg/IcebergChangeEvent.java |   5 +-
 docs/CAVEATS.md                            |   3 +-
 docs/DOCS.md                               | 125 +++++++++++-------
 3 files changed, 83 insertions(+), 50 deletions(-)

diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeEvent.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeEvent.java
index cb3014d2..f1568d73 100644
--- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeEvent.java
+++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeEvent.java
@@ -333,8 +333,9 @@ private Schema icebergSchema(boolean createIdentifierFields) {
        // due to inconsistency in the after and before fields.
        // For insert events, only the `before` field is NULL, while for delete events after field is NULL.
        // This inconsistency prevents using either field as a reliable key.
-       throw new DebeziumException("Events are unnested, Identifier fields are not supported for unnested events! " +
-           "Pleas make sure you are using event flattening SMT! or disable identifier field creation!");
+       throw new DebeziumException("Debezium events are unnested, Identifier fields are not supported for unnested events! 
" + + "Pleas enable event flattening SMT see: https://debezium.io/documentation/reference/stable/transformations/event-flattening.html " + + " Or disable identifier field creation `debezium.sink.iceberg.create-identifier-fields=false`"); } } else { icebergSchemaFields(valueSchema, keySchema, schemaData); diff --git a/docs/CAVEATS.md b/docs/CAVEATS.md index c09ca5da..9dd3ef61 100644 --- a/docs/CAVEATS.md +++ b/docs/CAVEATS.md @@ -7,7 +7,8 @@ instead of rewrite data files) ## No automatic schema evolution -Full schema evaluation is not supported. But sema expansion like field addition is supported, +Full schema evaluation is not supported, like converting incompatible types. But sema expansion like field addition is +supported, see `debezium.sink.iceberg.allow-field-addition` setting. ## Specific tables replication diff --git a/docs/DOCS.md b/docs/DOCS.md index ef13e0b1..5cca8a72 100644 --- a/docs/DOCS.md +++ b/docs/DOCS.md @@ -15,28 +15,29 @@ tables created automatically with the first start. #### Configuration properties -| Config | Default | Description | -|------------------------------------------------------------------------------------|---------------------------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| `debezium.sink.iceberg.warehouse` | | Root path of the Iceberg data warehouse | -| `debezium.sink.iceberg.catalog-name` | `default` | User-specified Iceberg catalog name. | -| `debezium.sink.iceberg.table-namespace` | `default` | A namespace in the catalog. ex: `SELECT * FROM prod.db.table -- catalog: prod, namespace: db, table: table` | -| `debezium.sink.iceberg.table-prefix` | `` | Iceberg table name prefix, Added to destination iceberg table names. 
|
-| `debezium.sink.iceberg.write.format.default` | `parquet` | Default file format for the table; `parquet`, `avro`, or `orc` |
-| `debezium.sink.iceberg.allow-field-addition` | `true` | Allow field addition to target tables |
-| `debezium.sink.iceberg.upsert` | `true` | Running upsert mode overwriting updated rows. explained below. |
-| `debezium.sink.iceberg.upsert-keep-deletes` | `true` | When running with upsert mode, keeps deleted rows in target table. |
-| `debezium.sink.iceberg.upsert-dedup-column` | `__source_ts_ms` | With upsert mode used to deduplicate data. row with highest `__source_ts_ms` kept. _dont change!_ |
-| `debezium.sink.iceberg.upsert-op-column` | `__op` | Used with upsert mode. _dont change!_ |
-| `debezium.sink.iceberg.destination-regexp` | `` | Regexp to modify destination table. With this its possible to map `table_ptt1`,`table_ptt2` to `table_combined`. |
-| `debezium.sink.iceberg.destination-regexp-replace` | `` | Regexp Replace part to modify destination table |
-| `debezium.sink.batch.batch-size-wait` | `NoBatchSizeWait` | Batch size wait strategy to optimize data files and upload interval. explained below. |
-| `debezium.sink.iceberg.{iceberg.prop.name}` | | [Iceberg config](https://iceberg.apache.org/docs/latest/configuration/) passed to Iceberg |
-| `debezium.source.offset.storage` | `io.debezium.server.iceberg.offset.IcebergOffsetBackingStore` | The name of the Java class that is responsible for persistence of connector offsets. see [debezium doc](https://debezium.io/documentation/reference/stable/development/engine.html#advanced-consuming) |
-| `debezium.source.offset.storage.iceberg.table-name` | `debezium_offset_storage` | Destination table name to store connector offsets. |
-| `debezium.source.schema.history.internal` | `io.debezium.server.iceberg.history.IcebergSchemaHistory` | The name of the Java class that is responsible for persistence of the database schema history. 
see [debezium doc](https://debezium.io/documentation/reference/stable/development/engine.html#database-history-properties) |
-| `debezium.source.schema.history.internal.iceberg.table-name` | `debezium_schema_history_storage` | Destination table name to store database schema history. |
-
-### Upsert
+| Config | Default | Description |
+|--------------------------------------------------------------|---------------------------------------------------------------|----------------------------------------------------------------------|
+| `debezium.sink.iceberg.warehouse` | | Root path of the Iceberg data warehouse |
+| `debezium.sink.iceberg.catalog-name` | `default` | User-specified Iceberg catalog name. |
+| `debezium.sink.iceberg.table-namespace` | `default` | A namespace in the catalog. ex: `SELECT * FROM prod.db.table -- catalog: prod, namespace: db, table: table` |
+| `debezium.sink.iceberg.table-prefix` | `` | Iceberg table name prefix, added to Iceberg table names. |
+| `debezium.sink.iceberg.write.format.default` | `parquet` | Default file format for the table; `parquet`, `avro`, or `orc` |
+| `debezium.sink.iceberg.allow-field-addition` | `true` | Allow field addition to target tables. Enables automatic schema expansion. |
+| `debezium.sink.iceberg.upsert` | `true` | Runs the consumer in upsert mode, overwriting updated rows. Explained below. |
+| `debezium.sink.iceberg.upsert-keep-deletes` | `true` | When running with upsert mode, keeps deleted rows in target table (soft delete). |
+| `debezium.sink.iceberg.upsert-dedup-column` | `__source_ts_ms` | Used with upsert mode to deduplicate data; the row with the highest `__source_ts_ms` is kept (last change event). _Don't change!_ |
+| `debezium.sink.iceberg.upsert-op-column` | `__op` | Used with upsert mode to determine event priority (order). 
_Don't change!_ |
+| `debezium.sink.iceberg.create-identifier-fields` | `true` | When set to false the consumer will create tables without identifier fields. Useful when the user wants to consume nested events in append-only mode. |
+| `debezium.sink.iceberg.destination-regexp` | `` | Regexp to modify the destination Iceberg table name. For example, with this setting it's possible to combine tables `table_ptt1`,`table_ptt2` into one `table_combined`. |
+| `debezium.sink.iceberg.destination-regexp-replace` | `` | Regexp replace part to modify the destination Iceberg table name |
+| `debezium.sink.batch.batch-size-wait` | `NoBatchSizeWait` | Batch size wait strategy, used to optimize data file size and upload interval. Explained below. |
+| `debezium.sink.iceberg.{iceberg.prop.name}` | | [Iceberg config](https://iceberg.apache.org/docs/latest/configuration/) these settings are passed to Iceberg (without the prefix) |
+| `debezium.source.offset.storage` | `io.debezium.server.iceberg.offset.IcebergOffsetBackingStore` | The name of the Java class that is responsible for persistence of connector offsets. see [debezium doc](https://debezium.io/documentation/reference/stable/development/engine.html#advanced-consuming) |
+| `debezium.source.offset.storage.iceberg.table-name` | `debezium_offset_storage` | Destination table name to store connector offsets. |
+| `debezium.source.schema.history.internal` | `io.debezium.server.iceberg.history.IcebergSchemaHistory` | The name of the Java class that is responsible for persistence of the database schema history. see [debezium doc](https://debezium.io/documentation/reference/stable/development/engine.html#database-history-properties) |
+| `debezium.source.schema.history.internal.iceberg.table-name` | `debezium_schema_history_storage` | Destination table name to store database schema history. |
+
+### Upsert Mode

By default, Iceberg consumer is running with upsert mode `debezium.sink.iceberg.upsert=true`. 
Upsert mode uses source Primary Key and does upsert on target table(delete followed by insert). For the tables without
@@ -53,23 +54,23 @@ Operation type priorities are `{"c":1, "r":2, "u":3, "d":4}`. When two records w
values received then the record with higher `__op` priority is kept and added to destination table and duplicate record
is dropped from the batch.

-### Append
-
-Setting `debezium.sink.iceberg.upsert=false` will set the operation mode to append. With append mode data deduplication
-is not done and all received records are appended to destination table.
-Note: For the tables without primary key operation mode falls back to append even upsert mode is used.
-
-#### Keeping Deleted Records
+#### Upsert Mode, Keeping Deleted Records

By default `debezium.sink.iceberg.upsert-keep-deletes=true` keeps deletes in the Iceberg table, setting it to false
will remove deleted records from the destination Iceberg table too. With this config it's possible to keep last version
of a deleted record in the destination Iceberg table(doing soft delete for this records `__deleted` is set to `true`).

+### Append Mode
+
+Setting `debezium.sink.iceberg.upsert=false` will set the operation mode to append. With append mode, data deduplication
+is not done and all received records are appended to the destination table.
+Note: For tables without a primary key, the operation mode falls back to append even if upsert mode is used.
+
### Optimizing batch size (or commit interval)

Debezium extracts database events in real time and this could cause too frequent commits and too many small files. Which
-is not optimal for batch processing especially when near realtime data feed is sufficient.
+is not optimal for performance, especially when a near-realtime data feed is sufficient.
To avoid this problem following batch-size-wait classes are available to adjust batch size and interval.

Batch size wait adds delay between consumer calls to increase total number of events consumed per call. 
Meanwhile,
@@ -84,7 +85,7 @@ This is default configuration by default consumer will not use any wait. All the

#### MaxBatchSizeWait

MaxBatchSizeWait uses debezium metrics to optimize batch size.
-MaxBatchSizeWait periodically reads streaming queue current size and waits until it reaches to `max.batch.size`.
+MaxBatchSizeWait periodically checks the streaming queue size and waits until it reaches `max.batch.size`.
Maximum wait and check intervals are controlled by `debezium.sink.batch.batch-size-wait.max-wait-ms`,
`debezium.sink.batch.batch-size-wait.wait-interval-ms` properties.

@@ -114,7 +115,7 @@ debezium.sink.iceberg.table-prefix=cdc_

With above config database table = `inventory.customers` is replicated to `default.testc_cdc_inventory_customers`

-## IcebergOffsetBackingStore Offset Storage
+## Debezium Offset Storage

This implementation saves CDC offset to an iceberg table. Debezium keeps source offset to track binlog position.

@@ -123,7 +124,7 @@ debezium.source.offset.storage=io.debezium.server.iceberg.offset.IcebergOffsetBa
debezium.source.offset.storage.iceberg.table-name=debezium_offset_storage_table
```

-## IcebergSchemaHistory Database History Storage
+## Debezium Database History Storage

This implementation saves database history to an iceberg table.

@@ -134,7 +135,11 @@ debezium.source.database.history.iceberg.table-name=debezium_database_history_st

## Debezium Event Flattening

-Iceberg consumer requires event flattening. This is required/mandatory configuration. For further details on `Message transformations` please see [debezium doc](https://debezium.io/documentation/reference/stable/development/engine.html#engine-message-transformations)
+For the best experience it's recommended to run the consumer with event flattening. 
For further details
+on `Message transformations` please
+see [debezium doc](https://debezium.io/documentation/reference/stable/development/engine.html#engine-message-transformations)
+
+Example Event flattening setting:

```properties
debezium.transforms=unwrap
@@ -144,6 +149,16 @@ debezium.transforms.unwrap.add.headers=db
debezium.transforms.unwrap.delete.handling.mode=rewrite
```

+Without event flattening the Iceberg consumer can only run in append mode. Without event flattening, upsert mode and
+creation of identifier fields are not supported.
+
+Settings for running the consumer without event flattening:
+
+```
+debezium.sink.iceberg.upsert=false
+debezium.sink.iceberg.create-identifier-fields=false
+```
+
### Configuring iceberg

All the properties starting with `debezium.sink.iceberg.__ICEBERG_CONFIG__` are passed to Iceberg, and to hadoopConf
@@ -159,34 +174,50 @@ Read [application.properties.example](..%2Fdebezium-server-iceberg-dist%2Fsrc%2F

## Schema Change Behaviour

It is possible to get out of sync schemas between source and target tables. For Example when the source database change
-its schema, adds or drops field. Below possible schema changes and current behavior of the Iceberg consumer id
-documented.
+its schema, adds or drops a field. Possible schema changes and the current behavior are documented below.

**NOTE**: Full schema evaluation is not supported. But sema expansion like field addition is supported,
see `debezium.sink.iceberg.allow-field-addition` setting.

#### Adding new column to source (A column missing in destination iceberg table)

-When `debezium.sink.iceberg.allow-field-addition` is `false` data of the new column is ignored till the column added to
+###### When `debezium.sink.iceberg.allow-field-addition` is `false`
+
+Data of the new column is ignored until the column is manually added to the
destination iceberg table.
-For example: if a column not found in iceberg table its data ignored and not copied to target! 
-once iceberg table adds same column then data for this column recognized and populated for the new data.
+For example: if a column is not found in the Iceberg table its data is ignored and not copied to the target! After the column is added to the
+table, data for this column is recognized and populated for new events.
+
+###### When `debezium.sink.iceberg.allow-field-addition` is `true`
+
+The consumer will add the new columns to the destination table and start populating data for the new columns. This is
+done automatically, no action is necessary.

#### Removing column from source (An extra column in iceberg table)

-These column values are populated with null value for the new data.
+These column values are populated with null values for the new data. No change is applied to the destination table.

#### Renaming column in source

-This is combination of above two cases : old column will be populated with null values and new column will not be
-recognized and populated till it's added to iceberg table
+This is a combination of the above two cases: the old column will be populated with null values and the new column will be
+populated when added to the Iceberg table (added automatically by the consumer or manually by the user).

#### Different Data Types

-This is the scenario when source field type and target Iceberg field type are different. In this case consumer converts
-source field value to destination type value. Conversion is done by jackson If representation cannot be converted to
-destination type then default value is returned!
+This is the scenario when the source field type changes.
+
+###### When `debezium.sink.iceberg.allow-field-addition` is `true`:
+
+In this case the consumer will adapt the destination table type automatically.
+For incompatible changes the consumer will throw an exception.
+For example, float to integer conversion is not supported but int to double conversion is supported. 
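+
+The automatic adaptation above is controlled by a single setting, already listed in the configuration table. A minimal sketch (the value shown is the documented default):
+
+```properties
+# Enables automatic schema expansion: new columns are added to the destination
+# table and compatible type widening (e.g. int to double) is applied.
+debezium.sink.iceberg.allow-field-addition=true
+```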
+###### When `debezium.sink.iceberg.allow-field-addition` is `false`:
+
+In this case the consumer will convert the source field value to the destination type. Conversion is done by Jackson; if the
+representation cannot be converted to the
+destination type then the default value is returned by Jackson.

for example this is conversion rule for Long type:

@@ -196,7 +227,7 @@ If representation cannot be converted to a long (including structured types like

## `icebergevents` Consumer

-This consumer appends CDC events to single Iceberg table as json string.
+This consumer appends all CDC events to a single Iceberg table as a JSON string.
This table partitioned by `event_destination,event_sink_timestamptz`

````properties
debezium.sink.type=icebergevents
debezium.sink.iceberg.catalog-name=default
````

-Destination table definition:
+Iceberg table definition:

```java
static final String TABLE_NAME = "debezium_events";