45 commits
90b21ff
Initial dsm extractors configuration
kr-igor Nov 3, 2025
2295d15
Added ability to configure extractors
kr-igor Nov 5, 2025
b140389
Log extractors
kr-igor Nov 5, 2025
d57ec63
Fixed json parsing
kr-igor Nov 5, 2025
515f991
Added url param to DSM endpoint
kr-igor Nov 6, 2025
eb4ebb8
Refactoring
kr-igor Nov 10, 2025
d27db19
Transaction extractors implementation for http and kafka
kr-igor Nov 10, 2025
1123717
Added some logs
kr-igor Nov 10, 2025
135d9de
Added value to supported configurations
kr-igor Nov 10, 2025
9d4b3c3
Added some logs
kr-igor Nov 11, 2025
6ebc3d3
Added public API + toString implementation
kr-igor Nov 11, 2025
d624be7
Fixed get extractors
kr-igor Nov 11, 2025
576c9a9
Added serialization tests, added TRANSACTION_CHECKPOINT_IDS to DSM ch…
kr-igor Nov 11, 2025
184a07f
Added extractors lock
kr-igor Nov 11, 2025
76167c9
Cleanup + test fixes
kr-igor Nov 12, 2025
8e40194
Use constant for no extractors case
kr-igor Nov 12, 2025
08c9fa9
Avoid read lock if DSM is not enabled
kr-igor Nov 12, 2025
57b1db5
Removed one more System.out
kr-igor Nov 12, 2025
d5627fd
Fixed more tests
kr-igor Nov 12, 2025
092afea
Removed RC for now
kr-igor Nov 12, 2025
d3fa445
Mark traces with transaction id
kr-igor Nov 12, 2025
7ea2ba5
Merge branch 'master' into kr-igor/dsm-transaction-extractors
kr-igor Nov 12, 2025
d9349f3
Some debug logs
kr-igor Nov 19, 2025
413efac
Added toString to wellKnownTags
kr-igor Nov 19, 2025
05d730b
Merge branch 'master' into kr-igor/dsm-transaction-extractors
kr-igor Jan 12, 2026
d0eb9ee
Fixed conflicts
kr-igor Jan 12, 2026
0fee4f2
Added force flush based on the payload size
kr-igor Jan 12, 2026
fd4e92e
Fixed a few tests
kr-igor Jan 12, 2026
8f0855f
Cleanup
kr-igor Jan 12, 2026
c6f2e64
Cleanup
kr-igor Jan 12, 2026
6e626a7
Merge branch 'master' into kr-igor/dsm-transaction-extractors
kr-igor Jan 13, 2026
7e43295
Minor cleanup
kr-igor Jan 13, 2026
3b6e91a
Merge branch 'master' into kr-igor/dsm-transaction-extractors
kr-igor Jan 13, 2026
155788c
Merge branch 'master' into kr-igor/dsm-transaction-extractors
kr-igor Jan 14, 2026
2c0fa9f
Limit the number of active extractors
kr-igor Jan 14, 2026
9a2e80f
Initial round of fixes
kr-igor Jan 15, 2026
eedd097
More fixes
kr-igor Jan 15, 2026
d331c1f
Simplified logic to avoid duplicates
kr-igor Jan 15, 2026
add9a5e
Merge branch 'master' into kr-igor/dsm-transaction-extractors
kr-igor Jan 15, 2026
ea690bd
Fixed helper classes
kr-igor Jan 15, 2026
6e55bf8
Fixed tests
kr-igor Jan 15, 2026
1a0d460
Merge branch 'master' into kr-igor/dsm-transaction-extractors
kr-igor Jan 16, 2026
030b792
Added missing reflect config
kr-igor Jan 16, 2026
3ac8b1c
More updates to reflect-config.json
kr-igor Jan 16, 2026
6b95123
Merge branch 'master' into kr-igor/dsm-transaction-extractors
kr-igor Jan 16, 2026
@@ -11,6 +11,8 @@
import datadog.trace.api.InstrumenterConfig;
import datadog.trace.api.ProductActivation;
import datadog.trace.api.appsec.HttpClientRequest;
import datadog.trace.api.datastreams.DataStreamsTransactionExtractor;
import datadog.trace.api.datastreams.DataStreamsTransactionTracker;
import datadog.trace.api.gateway.BlockResponseFunction;
import datadog.trace.api.gateway.Flow;
import datadog.trace.api.gateway.RequestContext;
@@ -68,8 +70,19 @@ protected boolean shouldSetResourceName() {
return true;
}

private final DataStreamsTransactionTracker.TransactionSourceReader
DSM_TRANSACTION_SOURCE_READER =
(source, headerName) -> getRequestHeader((REQUEST) source, headerName);

public AgentSpan onRequest(final AgentSpan span, final REQUEST request) {
if (request != null) {
AgentTracer.get()
.getDataStreamsMonitoring()
.trackTransaction(
span,
DataStreamsTransactionExtractor.Type.HTTP_OUT_HEADERS,
request,
DSM_TRANSACTION_SOURCE_READER);

String method = method(request);
span.setTag(Tags.HTTP_METHOD, method);
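The wiring pattern above repeats in every instrumented client and server: a TransactionSourceReader lambda adapts the concrete request type to a (source, headerName) -> value lookup, and onRequest hands the request to trackTransaction. As an illustration only, a decorator whose request carrier were a plain Map could be wired the same way (the Map carrier and reader name are stand-ins, not part of this PR):

  // Hypothetical reader over a Map<String, String> carrier.
  private final DataStreamsTransactionTracker.TransactionSourceReader MAP_SOURCE_READER =
      (source, headerName) -> ((Map<String, String>) source).get(headerName);

  public AgentSpan onRequest(final AgentSpan span, final Map<String, String> request) {
    if (request != null) {
      AgentTracer.get()
          .getDataStreamsMonitoring()
          .trackTransaction(
              span,
              DataStreamsTransactionExtractor.Type.HTTP_OUT_HEADERS,
              request,
              MAP_SOURCE_READER);
    }
    return span;
  }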
@@ -13,6 +13,8 @@
import datadog.context.propagation.Propagators;
import datadog.trace.api.Config;
import datadog.trace.api.DDTags;
import datadog.trace.api.datastreams.DataStreamsTransactionExtractor;
import datadog.trace.api.datastreams.DataStreamsTransactionTracker;
import datadog.trace.api.function.TriConsumer;
import datadog.trace.api.function.TriFunction;
import datadog.trace.api.gateway.BlockResponseFunction;
@@ -95,6 +97,14 @@ public abstract class HttpServerDecorator<REQUEST, CONNECTION, RESPONSE, REQUEST

protected abstract int status(RESPONSE response);

protected String getRequestHeader(REQUEST request, String key) {
// This method was not marked as abstract in order to avoid changing all server
// instrumentations at once. Instead, only the ones required (by DSM specifically)
// implement it. This can change in the future.
return null;
}

Contributor: If it's supposed to be implemented by all the server instrumentations, I would rather declare the method as abstract; otherwise it will be forgotten in the future by new instrumentations.

Contributor (author): I did that and then realized that every instrumentation would need to implement the method. Instead I went with this default plus overrides for the servers we support in transaction tracking. I prefer to start with this and then revise or extend as needed.

Contributor: This is error-prone for the next contributors who will use the API to get header values: they will get null as if there were no header, when it is actually due to a missing implementation.

Contributor: Would be nice to have a comment explaining why null is returned.

Contributor: 💭 thought: Too bad the client decorator did not follow the same conventions for HTTP headers as url and method... no get prefix. I might rework it later.
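A minimal sketch of the alternatives discussed in this thread (both hypothetical; the PR keeps the plain null-returning default):

  // Hypothetical: declare the accessor abstract so every new server
  // instrumentation is forced to implement it.
  protected abstract String getRequestHeader(REQUEST request, String key);

  // Hypothetical variant avoiding the ambiguous null: Optional.empty() means
  // "not implemented by this decorator", while supporting decorators return
  // Optional.ofNullable(headerValue).
  protected Optional<String> findRequestHeader(REQUEST request, String key) {
    return Optional.empty();
  }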

protected String requestedSessionId(REQUEST request) {
return null;
}
@@ -174,6 +184,10 @@ protected AgentSpanContext startInferredProxySpan(Context context, AgentSpanCont
return span.start(extracted);
}

private final DataStreamsTransactionTracker.TransactionSourceReader
DSM_TRANSACTION_SOURCE_READER =
(source, headerName) -> getRequestHeader((REQUEST) source, headerName);

public AgentSpan onRequest(
final AgentSpan span,
final CONNECTION connection,
@@ -326,6 +340,13 @@ public AgentSpan onRequest(
span.setRequestBlockingAction((RequestBlockingAction) flow.getAction());
}

AgentTracer.get()
.getDataStreamsMonitoring()
.trackTransaction(
span,
DataStreamsTransactionExtractor.Type.HTTP_IN_HEADERS,
request,
DSM_TRANSACTION_SOURCE_READER);
return span;
}

@@ -70,6 +70,20 @@
{"name": "toJson"}
]
},
{
"name" : "datadog.trace.agent.core.datastreams.DataStreamsTransactionExtractors$DataStreamsTransactionExtractorAdapter",
"methods": [
{"name": "fromJson"},
{"name": "toJson"}
]
},
{
"name" : "datadog.trace.agent.core.datastreams.DataStreamsTransactionExtractors$DataStreamsTransactionExtractorsAdapter",
"methods": [
{"name": "fromJson"},
{"name": "toJson"}
]
},
{
"name" : "datadog.trace.agent.core.DDSpanLink$SpanLinkAdapter",
"methods": [
@@ -468,6 +468,11 @@ class HttpServerDecoratorTest extends ServerDecoratorTest {
protected int status(Map m) {
return m.status == null ? 0 : m.status
}

@Override
protected String getRequestHeader(Map map, String key) {
return map.getOrDefault(key, null)
}
}
}

@@ -1,5 +1,15 @@
package datadog.trace.agent.test

import static datadog.communication.http.OkHttpUtils.buildHttpClient
import static datadog.trace.api.ConfigDefaults.DEFAULT_AGENT_HOST
import static datadog.trace.api.ConfigDefaults.DEFAULT_AGENT_TIMEOUT
import static datadog.trace.api.ConfigDefaults.DEFAULT_TRACE_AGENT_PORT
import static datadog.trace.api.config.DebuggerConfig.DYNAMIC_INSTRUMENTATION_SNAPSHOT_URL
import static datadog.trace.api.config.DebuggerConfig.DYNAMIC_INSTRUMENTATION_VERIFY_BYTECODE
import static datadog.trace.api.config.TraceInstrumentationConfig.CODE_ORIGIN_FOR_SPANS_ENABLED
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.closePrevious
import static datadog.trace.util.AgentThreadFactory.AgentThread.TASK_SCHEDULER

import ch.qos.logback.classic.Level
import ch.qos.logback.classic.util.ContextInitializer
import com.datadog.debugger.agent.ClassesToRetransformFinder
@@ -33,6 +43,7 @@ import datadog.trace.api.TraceConfig
import datadog.trace.api.config.GeneralConfig
import datadog.trace.api.config.TracerConfig
import datadog.trace.api.datastreams.AgentDataStreamsMonitoring
import datadog.trace.api.datastreams.DataStreamsTransactionExtractor
import datadog.trace.api.sampling.SamplingRule
import datadog.trace.api.time.SystemTimeSource
import datadog.trace.bootstrap.ActiveSubsystems
@@ -56,6 +67,13 @@ import de.thetaphi.forbiddenapis.SuppressForbidden
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings
import groovy.transform.stc.ClosureParams
import groovy.transform.stc.SimpleType
import java.lang.instrument.ClassFileTransformer
import java.lang.instrument.Instrumentation
import java.nio.ByteBuffer
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException
import java.util.concurrent.atomic.AtomicInteger
import net.bytebuddy.agent.ByteBuddyAgent
import net.bytebuddy.agent.builder.AgentBuilder
import net.bytebuddy.description.type.TypeDescription
@@ -69,24 +87,6 @@ import org.slf4j.LoggerFactory
import org.spockframework.mock.MockUtil
import spock.lang.Shared

import java.lang.instrument.ClassFileTransformer
import java.lang.instrument.Instrumentation
import java.nio.ByteBuffer
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException
import java.util.concurrent.atomic.AtomicInteger

import static datadog.communication.http.OkHttpUtils.buildHttpClient
import static datadog.trace.api.ConfigDefaults.DEFAULT_AGENT_HOST
import static datadog.trace.api.ConfigDefaults.DEFAULT_AGENT_TIMEOUT
import static datadog.trace.api.ConfigDefaults.DEFAULT_TRACE_AGENT_PORT
import static datadog.trace.api.config.DebuggerConfig.DYNAMIC_INSTRUMENTATION_SNAPSHOT_URL
import static datadog.trace.api.config.DebuggerConfig.DYNAMIC_INSTRUMENTATION_VERIFY_BYTECODE
import static datadog.trace.api.config.TraceInstrumentationConfig.CODE_ORIGIN_FOR_SPANS_ENABLED
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.closePrevious
import static datadog.trace.util.AgentThreadFactory.AgentThread.TASK_SCHEDULER

/**
* A specification that automatically applies instrumentation and exposes a global trace
* writer.
@@ -250,6 +250,10 @@ abstract class InstrumentationSpecification extends DDSpecification implements A
List<? extends SamplingRule.TraceSamplingRule> getTraceSamplingRules() {
return null
}

List<DataStreamsTransactionExtractor> getDataStreamsTransactionExtractors() {
return null
}
}

@SuppressFBWarnings(value = "AT_STALE_THREAD_WRITE_OF_PRIMITIVE", justification = "The variable is accessed only by the test thread in setup and cleanup.")
@@ -88,6 +88,12 @@ protected int status(final HttpResponse httpResponse) {
return httpResponse.status().intValue();
}

@Override
protected String getRequestHeader(HttpRequest httpRequest, String key) {
Optional<akka.http.javadsl.model.HttpHeader> header = httpRequest.getHeader(key);
return header.map(HttpHeader::value).orElse(null);
}

@Override
protected boolean isAppSecOnResponseSeparate() {
return true;
@@ -78,6 +78,11 @@ protected boolean isAppSecOnResponseSeparate() {
return true;
}

@Override
protected String getRequestHeader(final Request request, String key) {
return request.getHeader(key);
}

@Override
protected BlockResponseFunction createBlockResponseFunction(Request request, Request connection) {
return new JettyBlockResponseFunction(request);
@@ -78,6 +78,11 @@ protected boolean isAppSecOnResponseSeparate() {
return true;
}

@Override
protected String getRequestHeader(final Request request, String key) {
return request.getHeader(key);
}

public AgentSpan onResponse(AgentSpan span, HttpChannel channel) {
Request request = channel.getRequest();
Response response = channel.getResponse();
@@ -83,6 +83,11 @@ protected int status(final Response response) {
return response.getStatus();
}

@Override
protected String getRequestHeader(final Request request, String key) {
return request.getHeaders().get(key);
}

public AgentSpan onResponse(AgentSpan span, HttpChannelState channel) {
Request request = channel.getRequest();
Response response = channel.getResponse();
@@ -77,6 +77,11 @@ protected boolean isAppSecOnResponseSeparate() {
return true;
}

@Override
protected String getRequestHeader(final Request request, String key) {
return request.getHeader(key);
}

public AgentSpan onResponse(AgentSpan span, HttpConnection channel) {
Request request = channel.getRequest();
Response response = channel.getResponse();
@@ -77,6 +77,11 @@ protected boolean isAppSecOnResponseSeparate() {
return true;
}

@Override
protected String getRequestHeader(final Request request, String key) {
return request.getHeader(key);
}

public AgentSpan onResponse(AgentSpan span, AbstractHttpConnection connection) {
Request request = connection.getRequest();
Response response = connection.getResponse();
@@ -77,6 +77,11 @@ protected boolean isAppSecOnResponseSeparate() {
return true;
}

@Override
protected String getRequestHeader(final Request request, String key) {
return request.getHeader(key);
}

public AgentSpan onResponse(AgentSpan span, HttpChannel<?> channel) {
Request request = channel.getRequest();
Response response = channel.getResponse();
@@ -15,6 +15,7 @@
import static datadog.trace.instrumentation.kafka_clients.KafkaDecorator.PRODUCER_DECORATE;
import static datadog.trace.instrumentation.kafka_clients.KafkaDecorator.TIME_IN_QUEUE_ENABLED;
import static datadog.trace.instrumentation.kafka_common.StreamingContext.STREAMING_CONTEXT;
import static datadog.trace.instrumentation.kafka_common.Utils.DSM_TRANSACTION_SOURCE_READER;
import static java.util.Collections.singletonMap;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.isPrivate;
@@ -30,6 +31,7 @@
import datadog.trace.api.Config;
import datadog.trace.api.datastreams.DataStreamsContext;
import datadog.trace.api.datastreams.DataStreamsTags;
import datadog.trace.api.datastreams.DataStreamsTransactionExtractor;
import datadog.trace.api.datastreams.StatsPoint;
import datadog.trace.bootstrap.InstrumentationContext;
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
@@ -83,6 +85,7 @@ public String[] helperClassNames() {
packageName + ".KafkaProducerCallback",
"datadog.trace.instrumentation.kafka_common.StreamingContext",
"datadog.trace.instrumentation.kafka_common.ClusterIdHolder",
"datadog.trace.instrumentation.kafka_common.Utils",
packageName + ".AvroSchemaExtractor",
};
}
@@ -202,6 +205,14 @@ record =
if (TIME_IN_QUEUE_ENABLED) {
setter.injectTimeInQueue(record.headers());
}

AgentTracer.get()
.getDataStreamsMonitoring()
.trackTransaction(
span,
DataStreamsTransactionExtractor.Type.KAFKA_PRODUCE_HEADERS,
record,
DSM_TRANSACTION_SOURCE_READER);
return activateSpan(span);
}

@@ -15,6 +15,7 @@
import static datadog.trace.instrumentation.kafka_clients.TextMapExtractAdapter.GETTER;
import static datadog.trace.instrumentation.kafka_clients.TextMapInjectAdapter.SETTER;
import static datadog.trace.instrumentation.kafka_common.StreamingContext.STREAMING_CONTEXT;
import static datadog.trace.instrumentation.kafka_common.Utils.DSM_TRANSACTION_SOURCE_READER;
import static datadog.trace.instrumentation.kafka_common.Utils.computePayloadSizeBytes;
import static java.util.concurrent.TimeUnit.MILLISECONDS;

@@ -23,6 +24,7 @@
import datadog.trace.api.Config;
import datadog.trace.api.datastreams.DataStreamsContext;
import datadog.trace.api.datastreams.DataStreamsTags;
import datadog.trace.api.datastreams.DataStreamsTransactionExtractor;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import datadog.trace.bootstrap.instrumentation.api.AgentSpanContext;
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
@@ -128,6 +130,14 @@ protected void startNewRecordSpan(ConsumerRecord<?, ?> val) {
if (null != queueSpan) {
queueSpan.finish();
}

AgentTracer.get()
.getDataStreamsMonitoring()
.trackTransaction(
span,
DataStreamsTransactionExtractor.Type.KAFKA_CONSUME_HEADERS,
val.headers(),
DSM_TRANSACTION_SOURCE_READER);
}
} catch (final Exception e) {
log.debug("Error starting new record span", e);
@@ -43,6 +43,7 @@ public String[] helperClassNames() {
packageName + ".KafkaProducerCallback",
"datadog.trace.instrumentation.kafka_common.StreamingContext",
"datadog.trace.instrumentation.kafka_common.ClusterIdHolder",
"datadog.trace.instrumentation.kafka_common.Utils",
packageName + ".AvroSchemaExtractor",
};
}
@@ -18,6 +18,7 @@
import datadog.trace.api.Config;
import datadog.trace.api.datastreams.DataStreamsContext;
import datadog.trace.api.datastreams.DataStreamsTags;
import datadog.trace.api.datastreams.DataStreamsTransactionExtractor;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import datadog.trace.bootstrap.instrumentation.api.AgentSpanContext;
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
@@ -128,6 +129,14 @@ protected void startNewRecordSpan(ConsumerRecord<?, ?> val) {
if (null != queueSpan) {
queueSpan.finish();
}

AgentTracer.get()
.getDataStreamsMonitoring()
.trackTransaction(
span,
DataStreamsTransactionExtractor.Type.KAFKA_CONSUME_HEADERS,
val.headers(),
Utils.DSM_TRANSACTION_SOURCE_READER);
}
} catch (final Exception e) {
log.debug("Error starting new record span", e);
@@ -1,5 +1,6 @@
package datadog.trace.instrumentation.kafka_common;

import datadog.trace.api.datastreams.DataStreamsTransactionTracker;
import java.nio.charset.StandardCharsets;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
@@ -8,6 +9,10 @@
public final class Utils {
private Utils() {} // prevent instantiation

public static final DataStreamsTransactionTracker.TransactionSourceReader
    DSM_TRANSACTION_SOURCE_READER =
        (source, headerName) -> {
          // lastHeader returns null when the header is absent; guard against NPE
          // and decode with an explicit charset.
          Header header = ((Headers) source).lastHeader(headerName);
          return header == null ? null : new String(header.value(), StandardCharsets.UTF_8);
        };

// this method is used in kafka-clients and kafka-streams instrumentations
public static long computePayloadSizeBytes(ConsumerRecord<?, ?> val) {
long headersSize = 0;
@@ -67,4 +67,9 @@ protected int peerPort(final HttpExchange exchange) {
protected int status(final HttpExchange exchange) {
return exchange.getResponseCode();
}

@Override
protected String getRequestHeader(final HttpExchange exchange, String key) {
return exchange.getRequestHeaders().getFirst(key);
}
}

Contributor: ❔ question: Don't you fear we will miss header values by only taking the first one?

Contributor (author): As of now we expect customers to set one value per header, and I honestly can't imagine people associating multiple transactions with a single HTTP call.

Contributor: Sure, but if you add an API at the decorator level, it should be able to serve all purposes, not only the DSM one. Here it could drop many values, as in the Set-Cookie case (one cookie per header declaration, according to the RFC).
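If multi-valued headers ever need to be supported here, the accessor could combine values instead of taking only the first. A hypothetical sketch against the JDK HttpExchange API (not part of this PR):

  @Override
  protected String getRequestHeader(final HttpExchange exchange, String key) {
    // Headers is a Map<String, List<String>>; join all values with a comma
    // (RFC 9110 field-line combination) rather than dropping trailing ones.
    // Note Set-Cookie is the one field that cannot be safely comma-joined.
    List<String> values = exchange.getRequestHeaders().get(key);
    return values == null ? null : String.join(",", values);
  }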