From 664087508868d463dcf51abc1daf8aa44a187ac4 Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Tue, 16 Jan 2024 08:39:22 +0800 Subject: [PATCH] [feat][Proposal-2] Support including the topic name to the metadata(#836) (#862) ### Motivation Currently, we don't include the topic name to the metadata. And there is not intuitive workaround for it. This proposal introduces a new configuration `includeTopicName` to the Cloud Storage sink connector. When activated(`true`), the connector will include the topic name to the metadata in the sink file. ### Modifications Introduce a new configuration `includeTopicToMetadata` to support including the Pulsar topic name into the metadata. The new data format of the cloud storage format would be like: ```json { "key": "value", "__message_metadata__": { "messageId": "CAgQADAA", "topic": "persistent://public/default/test-s3", "properties": {} } } ``` A new key, `topic`, would be added to the metadata, containing the Pulsar topic name. (cherry picked from commit b2b28ddc60c83b69421bd8e03fe9524c61deb2b4) --- .../io/jcloud/BlobStoreAbstractConfig.java | 1 + .../pulsar/io/jcloud/format/AvroFormat.java | 6 ++-- .../pulsar/io/jcloud/format/JsonFormat.java | 5 ++- .../io/jcloud/format/ParquetFormat.java | 8 +++-- .../pulsar/io/jcloud/util/MetadataUtil.java | 33 ++++++++++++++----- .../pulsar/io/jcloud/MetadataUtilTest.java | 9 +++-- 6 files changed, 45 insertions(+), 17 deletions(-) diff --git a/src/main/java/org/apache/pulsar/io/jcloud/BlobStoreAbstractConfig.java b/src/main/java/org/apache/pulsar/io/jcloud/BlobStoreAbstractConfig.java index 2fbda4e5..80d5c64b 100644 --- a/src/main/java/org/apache/pulsar/io/jcloud/BlobStoreAbstractConfig.java +++ b/src/main/java/org/apache/pulsar/io/jcloud/BlobStoreAbstractConfig.java @@ -114,6 +114,7 @@ public class BlobStoreAbstractConfig implements Serializable { private boolean withMetadata; private boolean useHumanReadableMessageId; private boolean useHumanReadableSchemaVersion; + private boolean includeTopicToMetadata; private boolean withTopicPartitionNumber = true; private String bytesFormatTypeSeparator = "0x10"; private boolean skipFailedMessages = false; diff --git a/src/main/java/org/apache/pulsar/io/jcloud/format/AvroFormat.java b/src/main/java/org/apache/pulsar/io/jcloud/format/AvroFormat.java index 3e4a615d..74bb7301 100644 --- a/src/main/java/org/apache/pulsar/io/jcloud/format/AvroFormat.java +++ b/src/main/java/org/apache/pulsar/io/jcloud/format/AvroFormat.java @@ -53,6 +53,7 @@ public class AvroFormat implements Format , InitConfiguration schema rootAvroSchema = AvroRecordUtil.convertToAvroSchema(schema); if (useMetadata){ rootAvroSchema = MetadataUtil.setMetadataSchema(rootAvroSchema, - useHumanReadableMessageId, useHumanReadableSchemaVersion); + useHumanReadableMessageId, useHumanReadableSchemaVersion, includeTopicToMetadata); } LOGGER.debug("Using avro schema: {}", rootAvroSchema); @@ -124,7 +126,7 @@ public ByteBuffer recordWriterBuf(Iterator> records) throw if (useMetadata) { org.apache.avro.generic.GenericRecord metadataRecord = MetadataUtil.extractedMetadataRecord(next, - useHumanReadableMessageId, useHumanReadableSchemaVersion); + useHumanReadableMessageId, useHumanReadableSchemaVersion, includeTopicToMetadata); writeRecord.put(MetadataUtil.MESSAGE_METADATA_KEY, metadataRecord); } fileWriter.append(writeRecord); diff --git a/src/main/java/org/apache/pulsar/io/jcloud/format/JsonFormat.java b/src/main/java/org/apache/pulsar/io/jcloud/format/JsonFormat.java index 4e7adce6..e310028d 100644 --- a/src/main/java/org/apache/pulsar/io/jcloud/format/JsonFormat.java +++ b/src/main/java/org/apache/pulsar/io/jcloud/format/JsonFormat.java @@ -68,6 +68,7 @@ public class JsonFormat implements Format, InitConfiguration> record) throws Map writeValue = convertRecordToObject(next.getValue(), schema); if (useMetadata) { writeValue.put(MetadataUtil.MESSAGE_METADATA_KEY, - MetadataUtil.extractedMetadata(next, useHumanReadableMessageId, useHumanReadableSchemaVersion)); + MetadataUtil.extractedMetadata(next, useHumanReadableMessageId, useHumanReadableSchemaVersion, + includeTopicToMetadata)); } String recordAsString = JSON_MAPPER.get().writeValueAsString(writeValue); stringBuilder.append(recordAsString).append("\n"); diff --git a/src/main/java/org/apache/pulsar/io/jcloud/format/ParquetFormat.java b/src/main/java/org/apache/pulsar/io/jcloud/format/ParquetFormat.java index a33806fd..93efd12e 100644 --- a/src/main/java/org/apache/pulsar/io/jcloud/format/ParquetFormat.java +++ b/src/main/java/org/apache/pulsar/io/jcloud/format/ParquetFormat.java @@ -63,6 +63,7 @@ public class ParquetFormat implements Format, InitConfiguration schema rootAvroSchema = AvroRecordUtil.convertToAvroSchema(schema); if (useMetadata) { rootAvroSchema = MetadataUtil.setMetadataSchema(rootAvroSchema, - useHumanReadableMessageId, useHumanReadableSchemaVersion); + useHumanReadableMessageId, useHumanReadableSchemaVersion, includeTopicToMetadata); } log.info("Using avro schema: {}", rootAvroSchema); } @@ -268,7 +270,9 @@ public ByteBuffer recordWriterBuf(Iterator> records) throw if (useMetadata) { org.apache.avro.generic.GenericRecord metadataRecord = MetadataUtil.extractedMetadataRecord(next, - useHumanReadableMessageId, useHumanReadableSchemaVersion); + useHumanReadableMessageId, + useHumanReadableSchemaVersion, + includeTopicToMetadata); writeRecord.put(MESSAGE_METADATA_KEY, metadataRecord); } if (parquetWriter != null) { diff --git a/src/main/java/org/apache/pulsar/io/jcloud/util/MetadataUtil.java b/src/main/java/org/apache/pulsar/io/jcloud/util/MetadataUtil.java index b92f0834..a46b9a89 100644 --- a/src/main/java/org/apache/pulsar/io/jcloud/util/MetadataUtil.java +++ b/src/main/java/org/apache/pulsar/io/jcloud/util/MetadataUtil.java @@ -41,6 +41,7 @@ public class MetadataUtil { public static final String METADATA_PROPERTIES_KEY = "properties"; public static final String METADATA_SCHEMA_VERSION_KEY = "schemaVersion"; public static final String METADATA_MESSAGE_ID_KEY = "messageId"; + public static final String METADATA_TOPIC = "topic"; public static final String MESSAGE_METADATA_KEY = "__message_metadata__"; public static final String MESSAGE_METADATA_NAME = "messageMetadata"; public static final Schema MESSAGE_METADATA = buildMetadataSchema(); @@ -49,11 +50,12 @@ public class MetadataUtil { public static org.apache.avro.generic.GenericRecord extractedMetadataRecord(Record next, boolean useHumanReadableMessageId, - boolean useHumanReadableSchemaVersion) { + boolean useHumanReadableSchemaVersion, + boolean includeTopicToMetadata) { final Message message = next.getMessage().get(); GenericData.Record record = new GenericData.Record(buildMetadataSchema( - useHumanReadableMessageId, useHumanReadableSchemaVersion)); + useHumanReadableMessageId, useHumanReadableSchemaVersion, includeTopicToMetadata)); record.put(METADATA_PROPERTIES_KEY, message.getProperties()); if (useHumanReadableSchemaVersion) { record.put(METADATA_SCHEMA_VERSION_KEY, @@ -66,20 +68,24 @@ public static org.apache.avro.generic.GenericRecord extractedMetadataRecord(Reco } else { record.put(METADATA_MESSAGE_ID_KEY, ByteBuffer.wrap(message.getMessageId().toByteArray())); } + if (includeTopicToMetadata) { + record.put(METADATA_TOPIC, message.getTopicName()); + } return record; } public static org.apache.avro.generic.GenericRecord extractedMetadataRecord(Record next) { - return extractedMetadataRecord(next, false, false); + return extractedMetadataRecord(next, false, false, false); } public static Map extractedMetadata(Record next) { - return extractedMetadata(next, false, false); + return extractedMetadata(next, false, false, false); } public static Map extractedMetadata(Record next, boolean useHumanReadableMessageId, - boolean useHumanReadableSchemaVersion) { + boolean useHumanReadableSchemaVersion, + boolean includeTopicToMetadata) { Map metadata = new HashMap<>(); final Message message = next.getMessage().get(); metadata.put(METADATA_PROPERTIES_KEY, message.getProperties()); @@ -94,6 +100,9 @@ public static Map extractedMetadata(Record next, } else { metadata.put(METADATA_MESSAGE_ID_KEY, ByteBuffer.wrap(message.getMessageId().toByteArray())); } + if (includeTopicToMetadata) { + metadata.put(METADATA_TOPIC, message.getTopicName()); + } return metadata; } @@ -114,14 +123,15 @@ public static Schema setMetadataSchema(Schema schema) { public static Schema setMetadataSchema(Schema schema, boolean useHumanReadableMessageId, - boolean useHumanReadableSchemaVersion) { + boolean useHumanReadableSchemaVersion, + boolean includeTopicToMetadata) { final List fieldWithMetadata = schemaFieldThreadLocal.get(); fieldWithMetadata.clear(); schema.getFields().forEach(f -> { fieldWithMetadata.add(new Schema.Field(f, f.schema())); }); fieldWithMetadata.add(new Schema.Field(MESSAGE_METADATA_KEY, buildMetadataSchema( - useHumanReadableMessageId, useHumanReadableSchemaVersion))); + useHumanReadableMessageId, useHumanReadableSchemaVersion, includeTopicToMetadata))); return Schema.createRecord(schema.getName(), schema.getDoc(), schema.getNamespace(), @@ -131,11 +141,12 @@ public static Schema setMetadataSchema(Schema schema, } private static Schema buildMetadataSchema(){ - return buildMetadataSchema(false, false); + return buildMetadataSchema(false, false, false); } private static Schema buildMetadataSchema(boolean useHumanReadableMessageId, - boolean useHumanReadableSchemaVersion) { + boolean useHumanReadableSchemaVersion, + boolean includeTopicToMetadata) { List fields = new ArrayList<>(); fields.add(new Schema.Field(METADATA_PROPERTIES_KEY, Schema.createUnion( @@ -157,6 +168,10 @@ private static Schema buildMetadataSchema(boolean useHumanReadableMessageId, fields.add(new Schema.Field(METADATA_MESSAGE_ID_KEY, Schema.createUnion( Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.BYTES)))); } + if (includeTopicToMetadata) { + fields.add(new Schema.Field(METADATA_TOPIC, Schema.createUnion( + Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.STRING)))); + } return Schema.createRecord(MESSAGE_METADATA_NAME, null, null, diff --git a/src/test/java/org/apache/pulsar/io/jcloud/MetadataUtilTest.java b/src/test/java/org/apache/pulsar/io/jcloud/MetadataUtilTest.java index 464f5906..0c4f114a 100644 --- a/src/test/java/org/apache/pulsar/io/jcloud/MetadataUtilTest.java +++ b/src/test/java/org/apache/pulsar/io/jcloud/MetadataUtilTest.java @@ -44,6 +44,7 @@ public void testExtractedMetadata() throws IOException { String messageIdString = "1:2:3:4"; byte[] schemaVersionBytes = new byte[]{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0A}; byte[] messageIdBytes = new byte[]{0x00, 0x01, 0x02, 0x03}; + String topicName = "test-topic"; Record mockRecord = mock(Record.class); Message mockMessage = mock(Message.class); MessageId mockMessageId = mock(MessageId.class); @@ -53,23 +54,25 @@ public void testExtractedMetadata() throws IOException { when(mockMessage.getSchemaVersion()).thenReturn(schemaVersionBytes); when(mockMessageId.toString()).thenReturn(messageIdString); when(mockMessageId.toByteArray()).thenReturn(messageIdBytes); + when(mockMessage.getTopicName()).thenReturn(topicName); Map metadataWithHumanReadableMetadata = - MetadataUtil.extractedMetadata(mockRecord, true, true); + MetadataUtil.extractedMetadata(mockRecord, true, true, true); Assert.assertEquals(metadataWithHumanReadableMetadata.get(METADATA_MESSAGE_ID_KEY), messageIdString); Assert.assertNotEquals(metadataWithHumanReadableMetadata.get(METADATA_MESSAGE_ID_KEY), ByteBuffer.wrap(messageIdBytes)); Assert.assertEquals(metadataWithHumanReadableMetadata.get(METADATA_SCHEMA_VERSION_KEY), MetadataUtil.parseSchemaVersionFromBytes(schemaVersionBytes)); + Assert.assertEquals(metadataWithHumanReadableMetadata.get(MetadataUtil.METADATA_TOPIC), topicName); Map metadataWithHumanReadableMessageId = - MetadataUtil.extractedMetadata(mockRecord, true, false); + MetadataUtil.extractedMetadata(mockRecord, true, false, false); Assert.assertEquals(metadataWithHumanReadableMessageId.get(METADATA_MESSAGE_ID_KEY), messageIdString); Assert.assertNotEquals(metadataWithHumanReadableMessageId.get(METADATA_MESSAGE_ID_KEY), ByteBuffer.wrap(messageIdBytes)); - Map metadata = MetadataUtil.extractedMetadata(mockRecord, false, false); + Map metadata = MetadataUtil.extractedMetadata(mockRecord, false, false, false); Assert.assertEquals(metadata.get(METADATA_MESSAGE_ID_KEY), ByteBuffer.wrap(messageIdBytes)); Assert.assertEquals(metadata.get(METADATA_SCHEMA_VERSION_KEY), schemaVersionBytes); Assert.assertNotEquals(metadata.get(METADATA_MESSAGE_ID_KEY), messageIdString);