diff --git a/docker-compose.yml b/docker-compose.yml
index a16fef8893d..81bdd3c2032 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -129,7 +129,7 @@ services:
       - KAFKA_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
       - KAFKA_CONTROLLER_QUORUM_VOTERS=1@127.0.0.1:9093
       - KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER
-      - KAFKA_CLUSTER_ID=r4zt_wrqTRuT7W2NJsB_GA
+      - CLUSTER_ID=5L6g3nShT-eMCtK--X86sw
       - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092
       - KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT
       - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
diff --git a/packages/datadog-instrumentations/src/kafkajs.js b/packages/datadog-instrumentations/src/kafkajs.js
index 395c69de057..e75c03e7e64 100644
--- a/packages/datadog-instrumentations/src/kafkajs.js
+++ b/packages/datadog-instrumentations/src/kafkajs.js
@@ -52,45 +52,59 @@ addHook({ name: 'kafkajs', file: 'src/index.js', versions: ['>=1.4'] }, (BaseKaf
     const send = producer.send
     const bootstrapServers = this._brokers
 
-    producer.send = function () {
-      const innerAsyncResource = new AsyncResource('bound-anonymous-fn')
+    const kafkaClusterIdPromise = getKafkaClusterId(this)
 
-      return innerAsyncResource.runInAsyncScope(() => {
-        if (!producerStartCh.hasSubscribers) {
-          return send.apply(this, arguments)
-        }
+    producer.send = function () {
+      const wrappedSend = (clusterId) => {
+        const innerAsyncResource = new AsyncResource('bound-anonymous-fn')
 
-        try {
-          const { topic, messages = [] } = arguments[0]
-          for (const message of messages) {
-            if (message !== null && typeof message === 'object') {
-              message.headers = message.headers || {}
-            }
+        return innerAsyncResource.runInAsyncScope(() => {
+          if (!producerStartCh.hasSubscribers) {
+            return send.apply(this, arguments)
           }
-          producerStartCh.publish({ topic, messages, bootstrapServers })
-
-          const result = send.apply(this, arguments)
-
-          result.then(
-            innerAsyncResource.bind(res => {
-              producerFinishCh.publish(undefined)
-              producerCommitCh.publish(res)
-            }),
-            innerAsyncResource.bind(err => {
-              if (err) {
-                producerErrorCh.publish(err)
+
+          try {
+            const { topic, messages = [] } = arguments[0]
+            for (const message of messages) {
+              if (message !== null && typeof message === 'object') {
+                message.headers = message.headers || {}
               }
-              producerFinishCh.publish(undefined)
-            })
-          )
+            }
+            producerStartCh.publish({ topic, messages, bootstrapServers, clusterId })
 
-          return result
-        } catch (e) {
-          producerErrorCh.publish(e)
-          producerFinishCh.publish(undefined)
-          throw e
-        }
-      })
+            const result = send.apply(this, arguments)
+
+            result.then(
+              innerAsyncResource.bind(res => {
+                producerFinishCh.publish(undefined)
+                producerCommitCh.publish(res)
+              }),
+              innerAsyncResource.bind(err => {
+                if (err) {
+                  producerErrorCh.publish(err)
+                }
+                producerFinishCh.publish(undefined)
+              })
+            )
+
+            return result
+          } catch (e) {
+            producerErrorCh.publish(e)
+            producerFinishCh.publish(undefined)
+            throw e
+          }
+        })
+      }
+
+      if (!isPromise(kafkaClusterIdPromise)) {
+        // promise is already resolved
+        return wrappedSend(kafkaClusterIdPromise)
+      } else {
+        // promise is not resolved
+        return kafkaClusterIdPromise.then((clusterId) => {
+          return wrappedSend(clusterId)
+        })
+      }
     }
 
     return producer
   })
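Note on the shape of this hunk: `getKafkaClusterId` (added at the bottom of this file, see the last kafkajs.js hunk) returns either a cached string, `null`, or a promise, and `producer.send` keeps its synchronous fast path when the value is already cached. A minimal sketch of that pattern, with illustrative names (`withClusterId` is not part of the tracer):

```js
// Sketch only: run `fn` synchronously when the cluster id is already cached,
// and defer through the promise on the first call.
function isPromise (obj) {
  return !!obj && (typeof obj === 'object' || typeof obj === 'function') && typeof obj.then === 'function'
}

function withClusterId (clusterIdOrPromise, fn) {
  if (!isPromise(clusterIdOrPromise)) {
    return fn(clusterIdOrPromise) // cached value (or null): no extra tick added to send()
  }
  return clusterIdOrPromise.then(fn) // first call: wait for the admin lookup
}
```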
@@ -100,15 +114,17 @@ addHook({ name: 'kafkajs', file: 'src/index.js', versions: ['>=1.4'] }, (BaseKaf
       return createConsumer.apply(this, arguments)
     }
 
-    const eachMessageExtractor = (args) => {
+    const kafkaClusterIdPromise = getKafkaClusterId(this)
+
+    const eachMessageExtractor = (args, clusterId) => {
       const { topic, partition, message } = args[0]
-      return { topic, partition, message, groupId }
+      return { topic, partition, message, groupId, clusterId }
     }
 
-    const eachBatchExtractor = (args) => {
+    const eachBatchExtractor = (args, clusterId) => {
       const { batch } = args[0]
       const { topic, partition, messages } = batch
-      return { topic, partition, messages, groupId }
+      return { topic, partition, messages, groupId, clusterId }
     }
 
     const consumer = createConsumer.apply(this, arguments)
@@ -116,43 +132,53 @@ addHook({ name: 'kafkajs', file: 'src/index.js', versions: ['>=1.4'] }, (BaseKaf
     consumer.on(consumer.events.COMMIT_OFFSETS, commitsFromEvent)
 
     const run = consumer.run
     const groupId = arguments[0].groupId
+
     consumer.run = function ({ eachMessage, eachBatch, ...runArgs }) {
-      eachMessage = wrapFunction(
-        eachMessage,
-        consumerStartCh,
-        consumerFinishCh,
-        consumerErrorCh,
-        eachMessageExtractor
-      )
-
-      eachBatch = wrapFunction(
-        eachBatch,
-        batchConsumerStartCh,
-        batchConsumerFinishCh,
-        batchConsumerErrorCh,
-        eachBatchExtractor
-      )
-
-      return run({
-        eachMessage,
-        eachBatch,
-        ...runArgs
-      })
+      const wrapConsume = (clusterId) => {
+        return run({
+          eachMessage: wrappedCallback(
+            eachMessage,
+            consumerStartCh,
+            consumerFinishCh,
+            consumerErrorCh,
+            eachMessageExtractor,
+            clusterId
+          ),
+          eachBatch: wrappedCallback(
+            eachBatch,
+            batchConsumerStartCh,
+            batchConsumerFinishCh,
+            batchConsumerErrorCh,
+            eachBatchExtractor,
+            clusterId
+          ),
+          ...runArgs
+        })
+      }
+
+      if (!isPromise(kafkaClusterIdPromise)) {
+        // promise is already resolved
+        return wrapConsume(kafkaClusterIdPromise)
+      } else {
+        // promise is not resolved
+        return kafkaClusterIdPromise.then((clusterId) => {
+          return wrapConsume(clusterId)
+        })
+      }
     }
-
     return consumer
   })
 
   return Kafka
 })
 
-const wrapFunction = (fn, startCh, finishCh, errorCh, extractArgs) => {
+const wrappedCallback = (fn, startCh, finishCh, errorCh, extractArgs, clusterId) => {
   return typeof fn === 'function' ?
     function (...args) {
       const innerAsyncResource = new AsyncResource('bound-anonymous-fn')
       return innerAsyncResource.runInAsyncScope(() => {
-        const extractedArgs = extractArgs(args)
+        const extractedArgs = extractArgs(args, clusterId)
+
        startCh.publish(extractedArgs)
        try {
          const result = fn.apply(this, args)
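The consumer path mirrors the producer: `consumer.run` resolves the cluster id once, then threads it through the channel payloads via the extractors. A quick way to observe the new `clusterId` field, assuming the `apm:kafkajs:consume:start` channel name used elsewhere in this instrumentation file (the name is not visible in these hunks):

```js
// Hypothetical observer (not part of the PR): log the clusterId field that
// eachMessageExtractor now publishes on the consume-start channel.
const dc = require('node:diagnostics_channel')

dc.subscribe('apm:kafkajs:consume:start', ({ topic, partition, message, groupId, clusterId }) => {
  console.log(`consume ${topic}[${partition}] offset=${message.offset} group=${groupId} cluster=${clusterId}`)
})
```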
@@ -179,3 +205,37 @@ const wrapFunction = (fn, startCh, finishCh, errorCh, extractArgs) => {
       }
     : fn
 }
+
+const getKafkaClusterId = (kafka) => {
+  if (kafka._ddKafkaClusterId) {
+    return kafka._ddKafkaClusterId
+  }
+
+  if (!kafka.admin) {
+    return null
+  }
+
+  const admin = kafka.admin()
+
+  if (!admin.describeCluster) {
+    return null
+  }
+
+  return admin.connect()
+    .then(() => {
+      return admin.describeCluster()
+    })
+    .then((clusterInfo) => {
+      const clusterId = clusterInfo?.clusterId
+      kafka._ddKafkaClusterId = clusterId
+      admin.disconnect()
+      return clusterId
+    })
+    .catch((error) => {
+      throw error
+    })
+}
+
+function isPromise (obj) {
+  return !!obj && (typeof obj === 'object' || typeof obj === 'function') && typeof obj.then === 'function'
+}
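`getKafkaClusterId` is a cached wrapper around the kafkajs admin API. A standalone sketch of the underlying lookup, assuming a broker on `localhost:9092` (`admin.describeCluster()` exists in kafkajs 1.13+, which is why the tests below gate on `'>=1.13'`):

```js
const { Kafka } = require('kafkajs')

async function fetchClusterId () {
  const kafka = new Kafka({ clientId: 'cluster-id-probe', brokers: ['localhost:9092'] })
  const admin = kafka.admin()
  await admin.connect()
  try {
    // describeCluster() resolves to { brokers, controller, clusterId }
    const { clusterId } = await admin.describeCluster()
    return clusterId // e.g. '5L6g3nShT-eMCtK--X86sw' for the compose file above
  } finally {
    // unlike the promise chain in the hunk above, this disconnects
    // even when the lookup rejects
    await admin.disconnect()
  }
}
```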
diff --git a/packages/datadog-plugin-kafkajs/src/batch-consumer.js b/packages/datadog-plugin-kafkajs/src/batch-consumer.js
index 8415b037644..e0228a018c2 100644
--- a/packages/datadog-plugin-kafkajs/src/batch-consumer.js
+++ b/packages/datadog-plugin-kafkajs/src/batch-consumer.js
@@ -5,14 +5,17 @@ class KafkajsBatchConsumerPlugin extends ConsumerPlugin {
   static get id () { return 'kafkajs' }
   static get operation () { return 'consume-batch' }
 
-  start ({ topic, partition, messages, groupId }) {
+  start ({ topic, partition, messages, groupId, clusterId }) {
     if (!this.config.dsmEnabled) return
     for (const message of messages) {
       if (!message || !message.headers) continue
       const payloadSize = getMessageSize(message)
       this.tracer.decodeDataStreamsContext(message.headers)
-      this.tracer
-        .setCheckpoint(['direction:in', `group:${groupId}`, `topic:${topic}`, 'type:kafka'], null, payloadSize)
+      const edgeTags = ['direction:in', `group:${groupId}`, `topic:${topic}`, 'type:kafka']
+      if (clusterId) {
+        edgeTags.push(`kafka_cluster_id:${clusterId}`)
+      }
+      this.tracer.setCheckpoint(edgeTags, null, payloadSize)
     }
   }
 }
diff --git a/packages/datadog-plugin-kafkajs/src/consumer.js b/packages/datadog-plugin-kafkajs/src/consumer.js
index 84b6a02fdda..ee04c5eb60c 100644
--- a/packages/datadog-plugin-kafkajs/src/consumer.js
+++ b/packages/datadog-plugin-kafkajs/src/consumer.js
@@ -62,7 +62,7 @@ class KafkajsConsumerPlugin extends ConsumerPlugin {
     }
   }
 
-  start ({ topic, partition, message, groupId }) {
+  start ({ topic, partition, message, groupId, clusterId }) {
     const childOf = extract(this.tracer, message.headers)
     const span = this.startSpan({
       childOf,
@@ -71,7 +71,8 @@ class KafkajsConsumerPlugin extends ConsumerPlugin {
       meta: {
         component: 'kafkajs',
         'kafka.topic': topic,
-        'kafka.message.offset': message.offset
+        'kafka.message.offset': message.offset,
+        'kafka.cluster_id': clusterId
       },
       metrics: {
         'kafka.partition': partition
@@ -80,8 +81,11 @@ class KafkajsConsumerPlugin extends ConsumerPlugin {
     if (this.config.dsmEnabled && message?.headers) {
       const payloadSize = getMessageSize(message)
       this.tracer.decodeDataStreamsContext(message.headers)
-      this.tracer
-        .setCheckpoint(['direction:in', `group:${groupId}`, `topic:${topic}`, 'type:kafka'], span, payloadSize)
+      const edgeTags = ['direction:in', `group:${groupId}`, `topic:${topic}`, 'type:kafka']
+      if (clusterId) {
+        edgeTags.push(`kafka_cluster_id:${clusterId}`)
+      }
+      this.tracer.setCheckpoint(edgeTags, span, payloadSize)
     }
 
     if (afterStartCh.hasSubscribers) {
diff --git a/packages/datadog-plugin-kafkajs/src/producer.js b/packages/datadog-plugin-kafkajs/src/producer.js
index 7b9aff95310..aa12357b4cf 100644
--- a/packages/datadog-plugin-kafkajs/src/producer.js
+++ b/packages/datadog-plugin-kafkajs/src/producer.js
@@ -66,12 +66,13 @@ class KafkajsProducerPlugin extends ProducerPlugin {
     }
   }
 
-  start ({ topic, messages, bootstrapServers }) {
+  start ({ topic, messages, bootstrapServers, clusterId }) {
     const span = this.startSpan({
       resource: topic,
       meta: {
         component: 'kafkajs',
-        'kafka.topic': topic
+        'kafka.topic': topic,
+        'kafka.cluster_id': clusterId
       },
       metrics: {
         'kafka.batch_size': messages.length
@@ -85,8 +86,13 @@ class KafkajsProducerPlugin extends ProducerPlugin {
         this.tracer.inject(span, 'text_map', message.headers)
         if (this.config.dsmEnabled) {
           const payloadSize = getMessageSize(message)
-          const dataStreamsContext = this.tracer
-            .setCheckpoint(['direction:out', `topic:${topic}`, 'type:kafka'], span, payloadSize)
+          const edgeTags = ['direction:out', `topic:${topic}`, 'type:kafka']
+
+          if (clusterId) {
+            edgeTags.push(`kafka_cluster_id:${clusterId}`)
+          }
+
+          const dataStreamsContext = this.tracer.setCheckpoint(edgeTags, span, payloadSize)
           DsmPathwayCodec.encode(dataStreamsContext, message.headers)
         }
       }
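All three plugins now build their DSM edge tags with the same conditional push. A hypothetical shared helper makes the pattern explicit (`kafkaEdgeTags` is an illustrative name, not part of the PR):

```js
// Sketch: append the cluster id tag only when the lookup succeeded, so older
// kafkajs versions (no describeCluster) keep their previous tag sets.
function kafkaEdgeTags (baseTags, clusterId) {
  const edgeTags = [...baseTags]
  if (clusterId) {
    edgeTags.push(`kafka_cluster_id:${clusterId}`)
  }
  return edgeTags
}

// kafkaEdgeTags(['direction:out', 'topic:test-topic', 'type:kafka'], null)
// => base tags unchanged; with an id, adds 'kafka_cluster_id:<id>'
```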
diff --git a/packages/datadog-plugin-kafkajs/test/index.spec.js b/packages/datadog-plugin-kafkajs/test/index.spec.js
index 3df303a95cf..f67279bdd9f 100644
--- a/packages/datadog-plugin-kafkajs/test/index.spec.js
+++ b/packages/datadog-plugin-kafkajs/test/index.spec.js
@@ -13,18 +13,22 @@ const { computePathwayHash } = require('../../dd-trace/src/datastreams/pathway')
 const { ENTRY_PARENT_HASH, DataStreamsProcessor } = require('../../dd-trace/src/datastreams/processor')
 
 const testTopic = 'test-topic'
-const expectedProducerHash = computePathwayHash(
-  'test',
-  'tester',
-  ['direction:out', 'topic:' + testTopic, 'type:kafka'],
-  ENTRY_PARENT_HASH
-)
-const expectedConsumerHash = computePathwayHash(
-  'test',
-  'tester',
-  ['direction:in', 'group:test-group', 'topic:' + testTopic, 'type:kafka'],
-  expectedProducerHash
-)
+const testKafkaClusterId = '5L6g3nShT-eMCtK--X86sw'
+
+const getDsmPathwayHash = (clusterIdAvailable, isProducer, parentHash) => {
+  let edgeTags
+  if (isProducer) {
+    edgeTags = ['direction:out', 'topic:' + testTopic, 'type:kafka']
+  } else {
+    edgeTags = ['direction:in', 'group:test-group', 'topic:' + testTopic, 'type:kafka']
+  }
+
+  if (clusterIdAvailable) {
+    edgeTags.push(`kafka_cluster_id:${testKafkaClusterId}`)
+  }
+  edgeTags.sort()
+  return computePathwayHash('test', 'tester', edgeTags, parentHash)
+}
 
 describe('Plugin', () => {
   describe('kafkajs', function () {
@@ -38,6 +42,16 @@ describe('Plugin', () => {
     let kafka
     let tracer
     let Kafka
+    let clusterIdAvailable
+    let expectedProducerHash
+    let expectedConsumerHash
+
+    before(() => {
+      clusterIdAvailable = semver.intersects(version, '>=1.13')
+      expectedProducerHash = getDsmPathwayHash(clusterIdAvailable, true, ENTRY_PARENT_HASH)
+      expectedConsumerHash = getDsmPathwayHash(clusterIdAvailable, false, expectedProducerHash)
+    })
+
     describe('without configuration', () => {
       const messages = [{ key: 'key1', value: 'test2' }]
@@ -56,14 +70,17 @@ describe('Plugin', () => {
 
       describe('producer', () => {
         it('should be instrumented', async () => {
+          const meta = {
+            'span.kind': 'producer',
+            component: 'kafkajs',
+            'pathway.hash': expectedProducerHash.readBigUInt64BE(0).toString()
+          }
+          if (clusterIdAvailable) meta['kafka.cluster_id'] = testKafkaClusterId
+
           const expectedSpanPromise = expectSpanWithDefaults({
             name: expectedSchema.send.opName,
             service: expectedSchema.send.serviceName,
-            meta: {
-              'span.kind': 'producer',
-              component: 'kafkajs',
-              'pathway.hash': expectedProducerHash.readBigUInt64BE(0).toString()
-            },
+            meta,
             metrics: {
               'kafka.batch_size': messages.length
             },
@@ -353,6 +370,12 @@ describe('Plugin', () => {
           await consumer.subscribe({ topic: testTopic })
         })
 
+        before(() => {
+          clusterIdAvailable = semver.intersects(version, '>=1.13')
+          expectedProducerHash = getDsmPathwayHash(clusterIdAvailable, true, ENTRY_PARENT_HASH)
+          expectedConsumerHash = getDsmPathwayHash(clusterIdAvailable, false, expectedProducerHash)
+        })
+
         afterEach(async () => {
           await consumer.disconnect()
         })
@@ -368,19 +391,6 @@ describe('Plugin', () => {
           setDataStreamsContextSpy.restore()
         })
 
-        const expectedProducerHash = computePathwayHash(
-          'test',
-          'tester',
-          ['direction:out', 'topic:' + testTopic, 'type:kafka'],
-          ENTRY_PARENT_HASH
-        )
-        const expectedConsumerHash = computePathwayHash(
-          'test',
-          'tester',
-          ['direction:in', 'group:test-group', 'topic:' + testTopic, 'type:kafka'],
-          expectedProducerHash
-        )
-
         it('Should set a checkpoint on produce', async () => {
           const messages = [{ key: 'consumerDSM1', value: 'test2' }]
           await sendMessages(kafka, testTopic, messages)
@@ -476,9 +486,9 @@ describe('Plugin', () => {
           }
 
           /**
-          * No choice but to reinitialize everything, because the only way to flush eachMessage
-          * calls is to disconnect.
-          */
+           * No choice but to reinitialize everything, because the only way to flush eachMessage
+           * calls is to disconnect.
+           */
           consumer.connect()
           await sendMessages(kafka, testTopic, messages)
           await consumer.run({ eachMessage: async () => {}, autoCommit: false })
diff --git a/packages/dd-trace/src/datastreams/pathway.js b/packages/dd-trace/src/datastreams/pathway.js
index 066af789e64..ed2f6cc85f8 100644
--- a/packages/dd-trace/src/datastreams/pathway.js
+++ b/packages/dd-trace/src/datastreams/pathway.js
@@ -21,6 +21,7 @@ function shaHash (checkpointString) {
 }
 
 function computeHash (service, env, edgeTags, parentHash) {
+  edgeTags.sort()
   const hashableEdgeTags = edgeTags.filter(item => item !== 'manual_checkpoint:true')
 
   const key = `${service}${env}` + hashableEdgeTags.join('') + parentHash.toString()
diff --git a/packages/dd-trace/test/datastreams/data_streams_checkpointer.spec.js b/packages/dd-trace/test/datastreams/data_streams_checkpointer.spec.js
index ba33d4c8bdf..db29f96b575 100644
--- a/packages/dd-trace/test/datastreams/data_streams_checkpointer.spec.js
+++ b/packages/dd-trace/test/datastreams/data_streams_checkpointer.spec.js
@@ -2,8 +2,8 @@ require('../setup/tap')
 
 const agent = require('../plugins/agent')
 
-const expectedProducerHash = '13182885521735152072'
-const expectedConsumerHash = '5980058680018671020'
+const expectedProducerHash = '11369286567396183453'
+const expectedConsumerHash = '11204511019589278729'
 
 const DSM_CONTEXT_HEADER = 'dd-pathway-ctx-base64'
 
 describe('data streams checkpointer manual api', () => {
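The updated hash constants follow from the `edgeTags.sort()` added to `computeHash`: tag order no longer affects a pathway hash, so a checkpoint that pushes `kafka_cluster_id` onto the end of the array hashes the same as one listing it in sorted position. Note that `sort()` mutates the caller's array in place, which is also why the test helper sorts before calling `computePathwayHash`. A minimal illustration of the normalization:

```js
const a = ['type:kafka', 'direction:out', 'topic:test-topic']
const b = ['direction:out', 'topic:test-topic', 'type:kafka']

// computeHash concatenates the tags into the hashed key, so ordering matters
// unless it is normalized first.
console.log([...a].sort().join('') === [...b].sort().join('')) // true
```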