Skip to content

Commit

Permalink
Universal profiling integration: Added serialization of stacktrace ID…
Browse files Browse the repository at this point in the history
…s as profiler_stack_trace_ids otel attributes (#3607)
  • Loading branch information
JonasKunz authored May 2, 2024
1 parent b14f658 commit 1f1857b
Show file tree
Hide file tree
Showing 7 changed files with 199 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,11 @@
import co.elastic.apm.agent.impl.metadata.MetaDataFuture;
import co.elastic.apm.agent.impl.metadata.NameAndIdField;
import co.elastic.apm.agent.impl.metadata.ServiceFactory;
import co.elastic.apm.agent.impl.transaction.Id;
import co.elastic.apm.agent.sdk.internal.util.LoggerUtils;
import co.elastic.apm.agent.tracer.metrics.DoubleSupplier;
import co.elastic.apm.agent.tracer.metrics.Labels;
import co.elastic.apm.agent.tracer.pooling.Allocator;
import co.elastic.apm.agent.tracer.service.Service;
import co.elastic.apm.agent.tracer.service.ServiceInfo;
import co.elastic.apm.agent.configuration.SpanConfiguration;
Expand Down Expand Up @@ -127,6 +129,7 @@ public class ElasticApmTracer implements Tracer {
private final ObjectPool<Span> spanPool;
private final ObjectPool<ErrorCapture> errorPool;
private final ObjectPool<TraceContext> spanLinkPool;
private final ObjectPool<Id> profilingCorrelationStackTraceIdPool;
private final Reporter reporter;
private final ObjectPoolFactory objectPoolFactory;

Expand Down Expand Up @@ -245,6 +248,13 @@ public void onChange(ConfigurationOption<?> configurationOption, Boolean oldValu
// span links pool allows for 10X the maximum allowed span links per span
spanLinkPool = poolFactory.createSpanLinkPool(AbstractSpan.MAX_ALLOWED_SPAN_LINKS * 10, this);

profilingCorrelationStackTraceIdPool = poolFactory.createRecyclableObjectPool(maxPooledElements, new Allocator<Id>() {
@Override
public Id createInstance() {
return Id.new128BitId();
}
});

sampler = ProbabilitySampler.of(coreConfiguration.getSampleRate().get());
coreConfiguration.getSampleRate().addChangeListener(new ConfigurationOption.ChangeListener<Double>() {
@Override
Expand Down Expand Up @@ -604,6 +614,10 @@ public TraceContext createSpanLink() {
return spanLinkPool.createInstance();
}

public Id createProfilingCorrelationStackTraceId() {
return profilingCorrelationStackTraceIdPool.createInstance();
}

public void recycle(Transaction transaction) {
transactionPool.recycle(transaction);
}
Expand All @@ -620,6 +634,10 @@ public void recycle(TraceContext traceContext) {
spanLinkPool.recycle(traceContext);
}

public void recycleProfilingCorrelationStackTraceId(Id id) {
profilingCorrelationStackTraceIdPool.recycle(id);
}

public synchronized void stop() {
if (tracerState == TracerState.STOPPED) {
// may happen if explicitly stopped in a unit test and executed again within a shutdown hook
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package co.elastic.apm.agent.impl.transaction;

import co.elastic.apm.agent.report.serialize.Base64SerializationUtils;
import co.elastic.apm.agent.report.serialize.HexSerializationUtils;
import co.elastic.apm.agent.tracer.pooling.Recyclable;
import co.elastic.apm.agent.tracer.util.HexUtils;
Expand Down Expand Up @@ -175,6 +176,10 @@ public void writeAsHex(JsonWriter jw) {
HexSerializationUtils.writeBytesAsHex(data, jw);
}

public void writeAsBase64UrlSafe(JsonWriter jw) {
Base64SerializationUtils.writeBytesAsBase64UrlSafe(data, jw);
}

public void writeAsHex(StringBuilder sb) {
HexUtils.writeBytesAsHex(data, sb);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.HdrHistogram.WriterReaderPhaser;

import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

Expand Down Expand Up @@ -107,6 +108,8 @@ public class Transaction extends AbstractSpan<Transaction> implements co.elastic
@Nullable
private Throwable pendingException;

private final ArrayList<Id> profilingCorrelationStackTraceIds = new ArrayList<>();

/**
* Faas
* <p>
Expand Down Expand Up @@ -341,9 +344,23 @@ public void resetState() {
faas.resetState();
wasActivated.set(false);
pendingException = null;
recycleProfilingCorrelationStackTraceIds();
// don't clear timerBySpanTypeAndSubtype map (see field-level javadoc)
}

private void recycleProfilingCorrelationStackTraceIds() {
for (Id toRecycle : profilingCorrelationStackTraceIds) {
tracer.recycleProfilingCorrelationStackTraceId(toRecycle);
}
if (profilingCorrelationStackTraceIds.size() > 100) {
profilingCorrelationStackTraceIds.clear();
//trim overly big lists
profilingCorrelationStackTraceIds.trimToSize();
} else {
profilingCorrelationStackTraceIds.clear();
}
}

@Override
public boolean isNoop() {
return noop;
Expand Down Expand Up @@ -552,4 +569,19 @@ public Throwable getPendingTransactionException() {
return this.pendingException;
}

public void addProfilerCorrelationStackTrace(Id idToCopy) {
Id id = tracer.createProfilingCorrelationStackTraceId();
id.copyFrom(idToCopy);
synchronized (profilingCorrelationStackTraceIds) {
this.profilingCorrelationStackTraceIds.add(id);
}
}

/**
* Returns the list of stacktrace-IDs from the profiler associated with this transaction
* To protect agains concurrent modifications, consumers must synchronize on the returned list.
*/
public List<Id> getProfilingCorrelationStackTraceIds() {
return profilingCorrelationStackTraceIds;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package co.elastic.apm.agent.report.serialize;

import com.dslplatform.json.JsonWriter;

public class Base64SerializationUtils {

private static final byte[] BASE64_URL_CHARS = new byte[]{
'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H',
'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P',
'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X',
'Y', 'Z', 'a', 'b', 'c', 'd', 'e', 'f',
'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n',
'o', 'p', 'q', 'r', 's', 't', 'u', 'v',
'w', 'x', 'y', 'z', '0', '1', '2', '3',
'4', '5', '6', '7', '8', '9', '-', '_',
};


public static void writeBytesAsBase64UrlSafe(byte[] data, JsonWriter jw) {
int i = 0;
for (; i + 2 < data.length; i += 3) {
int b0 = ((int) data[i]) & 0xFF;
int b1 = ((int) data[i + 1]) & 0xFF;
int b2 = ((int) data[i + 2]) & 0xFF;
jw.writeByte(BASE64_URL_CHARS[b0 >> 2]);
jw.writeByte(BASE64_URL_CHARS[((b0 << 4) & 63) | (b1 >> 4)]);
jw.writeByte(BASE64_URL_CHARS[((b1 << 2) & 63) | (b2 >> 6)]);
jw.writeByte(BASE64_URL_CHARS[b2 & 63]);
}
int leftOver = data.length - i;
if (leftOver == 1) {
int b0 = ((int) data[i]) & 0xFF;
jw.writeByte(BASE64_URL_CHARS[b0 >> 2]);
jw.writeByte(BASE64_URL_CHARS[(b0 << 4) & 63]);
} else if (leftOver == 2) {
int b0 = ((int) data[i]) & 0xFF;
int b1 = ((int) data[i + 1]) & 0xFF;
jw.writeByte(BASE64_URL_CHARS[b0 >> 2]);
jw.writeByte(BASE64_URL_CHARS[((b0 << 4) & 63) | (b1 >> 4)]);
jw.writeByte(BASE64_URL_CHARS[(b1 << 2) & 63]);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -1050,10 +1051,22 @@ private void serializeSpanLinks(List<TraceContext> spanLinks) {
}
}

private void serializeOTel(AbstractSpan<?> span) {
private void serializeOTel(Span span) {
serializeOtel(span, Collections.<Id>emptyList());
}

private void serializeOTel(Transaction transaction) {
List<Id> profilingCorrelationStackTraceIds = transaction.getProfilingCorrelationStackTraceIds();
synchronized (profilingCorrelationStackTraceIds) {
serializeOtel(transaction, profilingCorrelationStackTraceIds);
}
}

private void serializeOtel(AbstractSpan<?> span, List<Id> profilingStackTraceIds) {
OTelSpanKind kind = span.getOtelKind();
Map<String, Object> attributes = span.getOtelAttributes();
boolean hasAttributes = !attributes.isEmpty();

boolean hasAttributes = !attributes.isEmpty() || !profilingStackTraceIds.isEmpty();
boolean hasKind = kind != null;
if (hasKind || hasAttributes) {
writeFieldName("otel");
Expand All @@ -1070,11 +1083,13 @@ private void serializeOTel(AbstractSpan<?> span) {
}
writeFieldName("attributes");
jw.writeByte(OBJECT_START);
int index = 0;
boolean isFirstAttrib = true;
for (Map.Entry<String, Object> entry : attributes.entrySet()) {
if (index++ > 0) {
if (!isFirstAttrib) {
jw.writeByte(COMMA);
}
isFirstAttrib = false;

writeFieldName(entry.getKey());
Object o = entry.getValue();
if (o instanceof Number) {
Expand All @@ -1085,6 +1100,22 @@ private void serializeOTel(AbstractSpan<?> span) {
BoolConverter.serialize((Boolean) o, jw);
}
}
if (!profilingStackTraceIds.isEmpty()) {
if (!isFirstAttrib) {
jw.writeByte(COMMA);
}
writeFieldName("elastic.profiler_stack_trace_ids");
jw.writeByte(ARRAY_START);
for (int i = 0; i < profilingStackTraceIds.size(); i++) {
if (i != 0) {
jw.writeByte(COMMA);
}
jw.writeByte(QUOTE);
profilingStackTraceIds.get(i).writeAsBase64UrlSafe(jw);
jw.writeByte(QUOTE);
}
jw.writeByte(ARRAY_END);
}
jw.writeByte(OBJECT_END);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package co.elastic.apm.agent.report.serialize;

import com.dslplatform.json.DslJson;
import com.dslplatform.json.JsonWriter;
import org.junit.jupiter.api.Test;

import java.util.Base64;
import java.util.Random;

import static org.assertj.core.api.AssertionsForClassTypes.assertThat;

public class Base64SerializationUtilTest {

@Test
public void empty() {
JsonWriter jw = new DslJson<>(new DslJson.Settings<>()).newWriter();
Base64SerializationUtils.writeBytesAsBase64UrlSafe(new byte[0], jw);
assertThat(jw.size()).isEqualTo(0);
}

@Test
public void randomInputs() {
DslJson<Object> dslJson = new DslJson<>(new DslJson.Settings<>());

Base64.Encoder reference = Base64.getUrlEncoder().withoutPadding();

Random rnd = new Random(42);
for (int i = 0; i < 100_000; i++) {
int len = rnd.nextInt(31) + 1;
byte[] data = new byte[len];
rnd.nextBytes(data);

String expectedResult = reference.encodeToString(data);

JsonWriter jw = dslJson.newWriter();
Base64SerializationUtils.writeBytesAsBase64UrlSafe(data, jw);

assertThat(jw.toString()).isEqualTo(expectedResult);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1584,6 +1584,31 @@ void testSpanLinksSerialization() {
assertThat(parent2link.get("span_id").textValue()).isEqualTo(parent2.getTraceContext().getId().toString());
}

private static Id create128BitId(String id) {
Id idObj = Id.new128BitId();
idObj.fromHexString(id, 0);
return idObj;
}

@Test
void testProfilingStackTraceIdSerialization() {
Transaction transaction = tracer.startRootTransaction(null);

transaction.addProfilerCorrelationStackTrace(create128BitId("a1a2a3a4a5a6a7a8b1b2b3b4b5b6b7b8"));
transaction.addProfilerCorrelationStackTrace(create128BitId("c1c2c3c4c5c6c7c8d1d2d3d4d5d6d7d8"));

JsonNode transactionJson = readJsonString(writer.toJsonString(transaction));
JsonNode otel = transactionJson.get("otel");
assertThat(otel).isNotNull();
JsonNode attributes = otel.get("attributes");
assertThat(attributes).isNotNull();
JsonNode ids = attributes.get("elastic.profiler_stack_trace_ids");
assertThat(ids.isArray()).isTrue();
assertThat(ids.size()).isEqualTo(2);
assertThat(ids.get(0).asText()).isEqualTo("oaKjpKWmp6ixsrO0tba3uA");
assertThat(ids.get(1).asText()).isEqualTo("wcLDxMXGx8jR0tPU1dbX2A");
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
void testSerializeLog(boolean asString) {
Expand Down

0 comments on commit 1f1857b

Please sign in to comment.