Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feat][Proposal-2] Support including the topic name to the metadata(#836) #862

Merged
merged 1 commit into from
Jan 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ public class BlobStoreAbstractConfig implements Serializable {
private boolean withMetadata;
private boolean useHumanReadableMessageId;
private boolean useHumanReadableSchemaVersion;
private boolean includeTopicToMetadata;
private boolean withTopicPartitionNumber = true;
private String bytesFormatTypeSeparator = "0x10";
private boolean skipFailedMessages = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public class AvroFormat implements Format<GenericRecord> , InitConfiguration<Blo
private boolean useMetadata;
private boolean useHumanReadableMessageId;
private boolean useHumanReadableSchemaVersion;
private boolean includeTopicToMetadata;
private CodecFactory codecFactory;

@Override
Expand All @@ -65,6 +66,7 @@ public void configure(BlobStoreAbstractConfig configuration) {
this.useMetadata = configuration.isWithMetadata();
this.useHumanReadableMessageId = configuration.isUseHumanReadableMessageId();
this.useHumanReadableSchemaVersion = configuration.isUseHumanReadableSchemaVersion();
this.includeTopicToMetadata = configuration.isIncludeTopicToMetadata();
String codecName = configuration.getAvroCodec();
if (codecName == null) {
this.codecFactory = CodecFactory.nullCodec();
Expand All @@ -86,7 +88,7 @@ public void initSchema(org.apache.pulsar.client.api.Schema<GenericRecord> schema
rootAvroSchema = AvroRecordUtil.convertToAvroSchema(schema);
if (useMetadata){
rootAvroSchema = MetadataUtil.setMetadataSchema(rootAvroSchema,
useHumanReadableMessageId, useHumanReadableSchemaVersion);
useHumanReadableMessageId, useHumanReadableSchemaVersion, includeTopicToMetadata);
}

LOGGER.debug("Using avro schema: {}", rootAvroSchema);
Expand Down Expand Up @@ -124,7 +126,7 @@ public ByteBuffer recordWriterBuf(Iterator<Record<GenericRecord>> records) throw
if (useMetadata) {
org.apache.avro.generic.GenericRecord metadataRecord =
MetadataUtil.extractedMetadataRecord(next,
useHumanReadableMessageId, useHumanReadableSchemaVersion);
useHumanReadableMessageId, useHumanReadableSchemaVersion, includeTopicToMetadata);
writeRecord.put(MetadataUtil.MESSAGE_METADATA_KEY, metadataRecord);
}
fileWriter.append(writeRecord);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ public class JsonFormat implements Format<GenericRecord>, InitConfiguration<Blob
private boolean useMetadata;
private boolean useHumanReadableMessageId;
private boolean useHumanReadableSchemaVersion;
private boolean includeTopicToMetadata;

@Override
public String getExtension() {
Expand All @@ -79,6 +80,7 @@ public void configure(BlobStoreAbstractConfig configuration) {
this.useMetadata = configuration.isWithMetadata();
this.useHumanReadableMessageId = configuration.isUseHumanReadableMessageId();
this.useHumanReadableSchemaVersion = configuration.isUseHumanReadableSchemaVersion();
this.includeTopicToMetadata = configuration.isIncludeTopicToMetadata();

if (configuration.isJsonAllowNaN()) {
JSON_MAPPER.get().enable(ALLOW_NON_NUMERIC_NUMBERS.mappedFeature());
Expand Down Expand Up @@ -117,7 +119,8 @@ public ByteBuffer recordWriterBuf(Iterator<Record<GenericRecord>> record) throws
Map<String, Object> writeValue = convertRecordToObject(next.getValue(), schema);
if (useMetadata) {
writeValue.put(MetadataUtil.MESSAGE_METADATA_KEY,
MetadataUtil.extractedMetadata(next, useHumanReadableMessageId, useHumanReadableSchemaVersion));
MetadataUtil.extractedMetadata(next, useHumanReadableMessageId, useHumanReadableSchemaVersion,
includeTopicToMetadata));
}
String recordAsString = JSON_MAPPER.get().writeValueAsString(writeValue);
stringBuilder.append(recordAsString).append("\n");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ public class ParquetFormat implements Format<GenericRecord>, InitConfiguration<B
private boolean useMetadata;
private boolean useHumanReadableMessageId;
private boolean useHumanReadableSchemaVersion;
private boolean includeTopicToMetadata;

private CompressionCodecName compressionCodecName = CompressionCodecName.GZIP;

Expand All @@ -76,6 +77,7 @@ public void configure(BlobStoreAbstractConfig configuration) {
this.useMetadata = configuration.isWithMetadata();
this.useHumanReadableMessageId = configuration.isUseHumanReadableMessageId();
this.useHumanReadableSchemaVersion = configuration.isUseHumanReadableSchemaVersion();
this.includeTopicToMetadata = configuration.isIncludeTopicToMetadata();
this.compressionCodecName = CompressionCodecName.fromConf(configuration.getParquetCodec());
}

Expand Down Expand Up @@ -185,7 +187,7 @@ public void initSchema(org.apache.pulsar.client.api.Schema<GenericRecord> schema
rootAvroSchema = AvroRecordUtil.convertToAvroSchema(schema);
if (useMetadata) {
rootAvroSchema = MetadataUtil.setMetadataSchema(rootAvroSchema,
useHumanReadableMessageId, useHumanReadableSchemaVersion);
useHumanReadableMessageId, useHumanReadableSchemaVersion, includeTopicToMetadata);
}
log.info("Using avro schema: {}", rootAvroSchema);
}
Expand Down Expand Up @@ -268,7 +270,9 @@ public ByteBuffer recordWriterBuf(Iterator<Record<GenericRecord>> records) throw
if (useMetadata) {
org.apache.avro.generic.GenericRecord metadataRecord =
MetadataUtil.extractedMetadataRecord(next,
useHumanReadableMessageId, useHumanReadableSchemaVersion);
useHumanReadableMessageId,
useHumanReadableSchemaVersion,
includeTopicToMetadata);
writeRecord.put(MESSAGE_METADATA_KEY, metadataRecord);
}
if (parquetWriter != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public class MetadataUtil {
public static final String METADATA_PROPERTIES_KEY = "properties";
public static final String METADATA_SCHEMA_VERSION_KEY = "schemaVersion";
public static final String METADATA_MESSAGE_ID_KEY = "messageId";
public static final String METADATA_TOPIC = "topic";
public static final String MESSAGE_METADATA_KEY = "__message_metadata__";
public static final String MESSAGE_METADATA_NAME = "messageMetadata";
public static final Schema MESSAGE_METADATA = buildMetadataSchema();
Expand All @@ -49,11 +50,12 @@ public class MetadataUtil {

public static org.apache.avro.generic.GenericRecord extractedMetadataRecord(Record<GenericRecord> next,
boolean useHumanReadableMessageId,
boolean useHumanReadableSchemaVersion) {
boolean useHumanReadableSchemaVersion,
boolean includeTopicToMetadata) {
final Message<GenericRecord> message = next.getMessage().get();

GenericData.Record record = new GenericData.Record(buildMetadataSchema(
useHumanReadableMessageId, useHumanReadableSchemaVersion));
useHumanReadableMessageId, useHumanReadableSchemaVersion, includeTopicToMetadata));
record.put(METADATA_PROPERTIES_KEY, message.getProperties());
if (useHumanReadableSchemaVersion) {
record.put(METADATA_SCHEMA_VERSION_KEY,
Expand All @@ -66,20 +68,24 @@ public static org.apache.avro.generic.GenericRecord extractedMetadataRecord(Reco
} else {
record.put(METADATA_MESSAGE_ID_KEY, ByteBuffer.wrap(message.getMessageId().toByteArray()));
}
if (includeTopicToMetadata) {
record.put(METADATA_TOPIC, message.getTopicName());
}
return record;
}

public static org.apache.avro.generic.GenericRecord extractedMetadataRecord(Record<GenericRecord> next) {
return extractedMetadataRecord(next, false, false);
return extractedMetadataRecord(next, false, false, false);
}

public static Map<String, Object> extractedMetadata(Record<GenericRecord> next) {
return extractedMetadata(next, false, false);
return extractedMetadata(next, false, false, false);
}

public static Map<String, Object> extractedMetadata(Record<GenericRecord> next,
boolean useHumanReadableMessageId,
boolean useHumanReadableSchemaVersion) {
boolean useHumanReadableSchemaVersion,
boolean includeTopicToMetadata) {
Map<String, Object> metadata = new HashMap<>();
final Message<GenericRecord> message = next.getMessage().get();
metadata.put(METADATA_PROPERTIES_KEY, message.getProperties());
Expand All @@ -94,6 +100,9 @@ public static Map<String, Object> extractedMetadata(Record<GenericRecord> next,
} else {
metadata.put(METADATA_MESSAGE_ID_KEY, ByteBuffer.wrap(message.getMessageId().toByteArray()));
}
if (includeTopicToMetadata) {
metadata.put(METADATA_TOPIC, message.getTopicName());
}
return metadata;
}

Expand All @@ -114,14 +123,15 @@ public static Schema setMetadataSchema(Schema schema) {

public static Schema setMetadataSchema(Schema schema,
boolean useHumanReadableMessageId,
boolean useHumanReadableSchemaVersion) {
boolean useHumanReadableSchemaVersion,
boolean includeTopicToMetadata) {
final List<Schema.Field> fieldWithMetadata = schemaFieldThreadLocal.get();
fieldWithMetadata.clear();
schema.getFields().forEach(f -> {
fieldWithMetadata.add(new Schema.Field(f, f.schema()));
});
fieldWithMetadata.add(new Schema.Field(MESSAGE_METADATA_KEY, buildMetadataSchema(
useHumanReadableMessageId, useHumanReadableSchemaVersion)));
useHumanReadableMessageId, useHumanReadableSchemaVersion, includeTopicToMetadata)));
return Schema.createRecord(schema.getName(),
schema.getDoc(),
schema.getNamespace(),
Expand All @@ -131,11 +141,12 @@ public static Schema setMetadataSchema(Schema schema,
}

private static Schema buildMetadataSchema(){
return buildMetadataSchema(false, false);
return buildMetadataSchema(false, false, false);
}

private static Schema buildMetadataSchema(boolean useHumanReadableMessageId,
boolean useHumanReadableSchemaVersion) {
boolean useHumanReadableSchemaVersion,
boolean includeTopicToMetadata) {
List<Schema.Field> fields = new ArrayList<>();
fields.add(new Schema.Field(METADATA_PROPERTIES_KEY,
Schema.createUnion(
Expand All @@ -157,6 +168,10 @@ private static Schema buildMetadataSchema(boolean useHumanReadableMessageId,
fields.add(new Schema.Field(METADATA_MESSAGE_ID_KEY, Schema.createUnion(
Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.BYTES))));
}
if (includeTopicToMetadata) {
fields.add(new Schema.Field(METADATA_TOPIC, Schema.createUnion(
Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.STRING))));
}
return Schema.createRecord(MESSAGE_METADATA_NAME,
null,
null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public void testExtractedMetadata() throws IOException {
String messageIdString = "1:2:3:4";
byte[] schemaVersionBytes = new byte[]{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0A};
byte[] messageIdBytes = new byte[]{0x00, 0x01, 0x02, 0x03};
String topicName = "test-topic";
Record<GenericRecord> mockRecord = mock(Record.class);
Message<GenericRecord> mockMessage = mock(Message.class);
MessageId mockMessageId = mock(MessageId.class);
Expand All @@ -53,23 +54,25 @@ public void testExtractedMetadata() throws IOException {
when(mockMessage.getSchemaVersion()).thenReturn(schemaVersionBytes);
when(mockMessageId.toString()).thenReturn(messageIdString);
when(mockMessageId.toByteArray()).thenReturn(messageIdBytes);
when(mockMessage.getTopicName()).thenReturn(topicName);

Map<String, Object> metadataWithHumanReadableMetadata =
MetadataUtil.extractedMetadata(mockRecord, true, true);
MetadataUtil.extractedMetadata(mockRecord, true, true, true);
Assert.assertEquals(metadataWithHumanReadableMetadata.get(METADATA_MESSAGE_ID_KEY), messageIdString);
Assert.assertNotEquals(metadataWithHumanReadableMetadata.get(METADATA_MESSAGE_ID_KEY),
ByteBuffer.wrap(messageIdBytes));
Assert.assertEquals(metadataWithHumanReadableMetadata.get(METADATA_SCHEMA_VERSION_KEY),
MetadataUtil.parseSchemaVersionFromBytes(schemaVersionBytes));
Assert.assertEquals(metadataWithHumanReadableMetadata.get(MetadataUtil.METADATA_TOPIC), topicName);

Map<String, Object> metadataWithHumanReadableMessageId =
MetadataUtil.extractedMetadata(mockRecord, true, false);
MetadataUtil.extractedMetadata(mockRecord, true, false, false);
Assert.assertEquals(metadataWithHumanReadableMessageId.get(METADATA_MESSAGE_ID_KEY), messageIdString);
Assert.assertNotEquals(metadataWithHumanReadableMessageId.get(METADATA_MESSAGE_ID_KEY),
ByteBuffer.wrap(messageIdBytes));


Map<String, Object> metadata = MetadataUtil.extractedMetadata(mockRecord, false, false);
Map<String, Object> metadata = MetadataUtil.extractedMetadata(mockRecord, false, false, false);
Assert.assertEquals(metadata.get(METADATA_MESSAGE_ID_KEY), ByteBuffer.wrap(messageIdBytes));
Assert.assertEquals(metadata.get(METADATA_SCHEMA_VERSION_KEY), schemaVersionBytes);
Assert.assertNotEquals(metadata.get(METADATA_MESSAGE_ID_KEY), messageIdString);
Expand Down
Loading