Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tracing(aws-sdk): improve sqs dsm tracing experience #4425

Merged
merged 7 commits into from
Jun 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion packages/datadog-instrumentations/src/aws-sdk.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ function wrapRequest (send) {

return innerAr.runInAsyncScope(() => {
this.on('complete', innerAr.bind(response => {
channel(`apm:aws:request:complete:${channelSuffix}`).publish({ response })
const cbExists = typeof cb === 'function'
channel(`apm:aws:request:complete:${channelSuffix}`).publish({ response, cbExists })
}))

startCh.publish({
Expand Down
8 changes: 7 additions & 1 deletion packages/datadog-plugin-aws-sdk/src/base.js
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,17 @@ class BaseAwsSdkPlugin extends ClientPlugin {
span.setTag('region', region)
})

this.addSub(`apm:aws:request:complete:${this.serviceIdentifier}`, ({ response }) => {
this.addSub(`apm:aws:request:complete:${this.serviceIdentifier}`, ({ response, cbExists = false }) => {
const store = storage.getStore()
if (!store) return
const { span } = store
if (!span) return
// try to extract DSM context from response if no callback exists as extraction normally happens in CB
if (!cbExists && this.serviceIdentifier === 'sqs') {
const params = response.request.params
const operation = response.request.operation
this.responseExtractDSMContext(operation, params, response.data, span)
}
this.addResponseTags(span, response)
this.finish(span, response, response.error)
})
Expand Down
5 changes: 3 additions & 2 deletions packages/datadog-plugin-aws-sdk/src/services/kinesis.js
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class Kinesis extends BaseAwsSdkPlugin {

// extract DSM context after as we might not have a parent-child but may have a DSM context
this.responseExtractDSMContext(
request.operation, response, span || null, streamName
request.operation, request.params, response, span || null, { streamName }
)
}
})
Expand Down Expand Up @@ -100,7 +100,8 @@ class Kinesis extends BaseAwsSdkPlugin {
}
}

responseExtractDSMContext (operation, response, span, streamName) {
responseExtractDSMContext (operation, params, response, span, kwargs = {}) {
const { streamName } = kwargs
if (!this.config.dsmEnabled) return
if (operation !== 'getRecords') return
if (!response || !response.Records || !response.Records[0]) return
Expand Down
21 changes: 17 additions & 4 deletions packages/datadog-plugin-aws-sdk/src/services/sqs.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class Sqs extends BaseAwsSdkPlugin {
const plugin = this
const contextExtraction = this.responseExtract(request.params, request.operation, response)
let span
let parsedMessageAttributes
let parsedMessageAttributes = null
if (contextExtraction && contextExtraction.datadogContext) {
obj.needsFinish = true
const options = {
Expand All @@ -39,8 +39,9 @@ class Sqs extends BaseAwsSdkPlugin {
this.enter(span, store)
}
// extract DSM context after as we might not have a parent-child but may have a DSM context

this.responseExtractDSMContext(
request.operation, request.params, response, span || null, parsedMessageAttributes || null
request.operation, request.params, response, span || null, { parsedMessageAttributes }
)
})

Expand Down Expand Up @@ -165,7 +166,8 @@ class Sqs extends BaseAwsSdkPlugin {
}
}

responseExtractDSMContext (operation, params, response, span, parsedAttributes) {
responseExtractDSMContext (operation, params, response, span, kwargs = {}) {
let { parsedAttributes } = kwargs
if (!this.config.dsmEnabled) return
if (operation !== 'receiveMessage') return
if (!response || !response.Messages || !response.Messages[0]) return
Expand All @@ -188,7 +190,7 @@ class Sqs extends BaseAwsSdkPlugin {
// SQS to SQS
}
}
if (message.MessageAttributes && message.MessageAttributes._datadog) {
if (!parsedAttributes && message.MessageAttributes && message.MessageAttributes._datadog) {
parsedAttributes = this.parseDatadogAttributes(message.MessageAttributes._datadog)
}
}
Expand Down Expand Up @@ -219,6 +221,17 @@ class Sqs extends BaseAwsSdkPlugin {
this.injectToMessage(span, params.Entries[i], params.QueueUrl, i === 0)
}
break
case 'receiveMessage':
if (!params.MessageAttributeNames) {
params.MessageAttributeNames = ['_datadog']
} else if (
!params.MessageAttributeNames.includes('_datadog') &&
!params.MessageAttributeNames.includes('.*') &&
!params.MessageAttributeNames.includes('All')
) {
params.MessageAttributeNames.push('_datadog')
}
break
}
}

Expand Down
29 changes: 29 additions & 0 deletions packages/datadog-plugin-aws-sdk/test/sqs.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
const sinon = require('sinon')
const agent = require('../../dd-trace/test/plugins/agent')
const { setup } = require('./spec_helpers')
const semver = require('semver')
const { rawExpectedSchema } = require('./sqs-naming')

const queueName = 'SQS_QUEUE_NAME'
Expand Down Expand Up @@ -408,6 +409,34 @@ describe('Plugin', () => {
})
})

if (sqsClientName === 'aws-sdk' && semver.intersects(version, '>=2.3')) {
it('Should set pathway hash tag on a span when consuming and promise() was used over a callback',
async () => {
await sqs.sendMessage({ MessageBody: 'test DSM', QueueUrl: QueueUrlDsm })
await sqs.receiveMessage({ QueueUrl: QueueUrlDsm }).promise()

let consumeSpanMeta = {}
return new Promise((resolve, reject) => {
agent.use(traces => {
const span = traces[0][0]

if (span.name === 'aws.request' && span.meta['aws.operation'] === 'receiveMessage') {
consumeSpanMeta = span.meta
}

try {
expect(consumeSpanMeta).to.include({
'pathway.hash': expectedConsumerHash
})
resolve()
} catch (error) {
reject(error)
}
})
})
})
}

it('Should emit DSM stats to the agent when sending a message', done => {
agent.expectPipelineStats(dsmStats => {
let statsPointsReceived = 0
Expand Down
Loading