diff --git a/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/decorator/HttpClientDecorator.java b/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/decorator/HttpClientDecorator.java index a5578202bb0..c57a7fc8e76 100644 --- a/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/decorator/HttpClientDecorator.java +++ b/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/decorator/HttpClientDecorator.java @@ -11,6 +11,8 @@ import datadog.trace.api.InstrumenterConfig; import datadog.trace.api.ProductActivation; import datadog.trace.api.appsec.HttpClientRequest; +import datadog.trace.api.datastreams.DataStreamsTransactionExtractor; +import datadog.trace.api.datastreams.DataStreamsTransactionTracker; import datadog.trace.api.gateway.BlockResponseFunction; import datadog.trace.api.gateway.Flow; import datadog.trace.api.gateway.RequestContext; @@ -68,8 +70,19 @@ protected boolean shouldSetResourceName() { return true; } + private final DataStreamsTransactionTracker.TransactionSourceReader + DSM_TRANSACTION_SOURCE_READER = + (source, headerName) -> getRequestHeader((REQUEST) source, headerName); + public AgentSpan onRequest(final AgentSpan span, final REQUEST request) { if (request != null) { + AgentTracer.get() + .getDataStreamsMonitoring() + .trackTransaction( + span, + DataStreamsTransactionExtractor.Type.HTTP_OUT_HEADERS, + request, + DSM_TRANSACTION_SOURCE_READER); String method = method(request); span.setTag(Tags.HTTP_METHOD, method); diff --git a/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/decorator/HttpServerDecorator.java b/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/decorator/HttpServerDecorator.java index efd179ce8c9..6073da7ea2a 100644 --- a/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/decorator/HttpServerDecorator.java +++ b/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/decorator/HttpServerDecorator.java @@ -13,6 +13,8 @@ import datadog.context.propagation.Propagators; import datadog.trace.api.Config; import datadog.trace.api.DDTags; +import datadog.trace.api.datastreams.DataStreamsTransactionExtractor; +import datadog.trace.api.datastreams.DataStreamsTransactionTracker; import datadog.trace.api.function.TriConsumer; import datadog.trace.api.function.TriFunction; import datadog.trace.api.gateway.BlockResponseFunction; @@ -95,6 +97,14 @@ public abstract class HttpServerDecorator getRequestHeader((REQUEST) source, headerName); + public AgentSpan onRequest( final AgentSpan span, final CONNECTION connection, @@ -326,6 +340,13 @@ public AgentSpan onRequest( span.setRequestBlockingAction((RequestBlockingAction) flow.getAction()); } + AgentTracer.get() + .getDataStreamsMonitoring() + .trackTransaction( + span, + DataStreamsTransactionExtractor.Type.HTTP_IN_HEADERS, + request, + DSM_TRANSACTION_SOURCE_READER); return span; } diff --git a/dd-java-agent/agent-bootstrap/src/main/resources/META-INF/native-image/com.datadoghq/dd-java-agent/reflect-config.json b/dd-java-agent/agent-bootstrap/src/main/resources/META-INF/native-image/com.datadoghq/dd-java-agent/reflect-config.json index adf96f3c8fe..e45e9e9c3eb 100644 --- a/dd-java-agent/agent-bootstrap/src/main/resources/META-INF/native-image/com.datadoghq/dd-java-agent/reflect-config.json +++ b/dd-java-agent/agent-bootstrap/src/main/resources/META-INF/native-image/com.datadoghq/dd-java-agent/reflect-config.json @@ -70,6 +70,27 @@ {"name": "toJson"} ] }, + { + "name" : "datadog.trace.agent.core.datastreams.DataStreamsTransactionExtractors$DataStreamsTransactionExtractorAdapter", + "methods": [ + {"name": "fromJson"}, + {"name": "toJson"} + ] + }, + { + "name" : "datadog.trace.agent.core.datastreams.DataStreamsTransactionExtractors$DataStreamsTransactionExtractorImpl", + "allDeclaredConstructors" : true, + "allPublicConstructors" : true, + "allDeclaredFields" : true, + "allPublicFields" : true + }, + { + "name" : "datadog.trace.agent.core.datastreams.DataStreamsTransactionExtractors$DataStreamsTransactionExtractorsAdapter", + "methods": [ + {"name": "fromJson"}, + {"name": "toJson"} + ] + }, { "name" : "datadog.trace.agent.core.DDSpanLink$SpanLinkAdapter", "methods": [ diff --git a/dd-java-agent/agent-bootstrap/src/test/groovy/datadog/trace/bootstrap/instrumentation/decorator/HttpServerDecoratorTest.groovy b/dd-java-agent/agent-bootstrap/src/test/groovy/datadog/trace/bootstrap/instrumentation/decorator/HttpServerDecoratorTest.groovy index 7c3efb1d48c..c56efc5f014 100644 --- a/dd-java-agent/agent-bootstrap/src/test/groovy/datadog/trace/bootstrap/instrumentation/decorator/HttpServerDecoratorTest.groovy +++ b/dd-java-agent/agent-bootstrap/src/test/groovy/datadog/trace/bootstrap/instrumentation/decorator/HttpServerDecoratorTest.groovy @@ -468,6 +468,11 @@ class HttpServerDecoratorTest extends ServerDecoratorTest { protected int status(Map m) { return m.status == null ? 0 : m.status } + + @Override + protected String getRequestHeader(Map map, String key) { + return map.getOrDefault(key, null) + } } } diff --git a/dd-java-agent/instrumentation-testing/src/main/groovy/datadog/trace/agent/test/InstrumentationSpecification.groovy b/dd-java-agent/instrumentation-testing/src/main/groovy/datadog/trace/agent/test/InstrumentationSpecification.groovy index 47e67ffbafc..713c6bf6dfc 100644 --- a/dd-java-agent/instrumentation-testing/src/main/groovy/datadog/trace/agent/test/InstrumentationSpecification.groovy +++ b/dd-java-agent/instrumentation-testing/src/main/groovy/datadog/trace/agent/test/InstrumentationSpecification.groovy @@ -1,5 +1,15 @@ package datadog.trace.agent.test +import static datadog.communication.http.OkHttpUtils.buildHttpClient +import static datadog.trace.api.ConfigDefaults.DEFAULT_AGENT_HOST +import static datadog.trace.api.ConfigDefaults.DEFAULT_AGENT_TIMEOUT +import static datadog.trace.api.ConfigDefaults.DEFAULT_TRACE_AGENT_PORT +import static datadog.trace.api.config.DebuggerConfig.DYNAMIC_INSTRUMENTATION_SNAPSHOT_URL +import static datadog.trace.api.config.DebuggerConfig.DYNAMIC_INSTRUMENTATION_VERIFY_BYTECODE +import static datadog.trace.api.config.TraceInstrumentationConfig.CODE_ORIGIN_FOR_SPANS_ENABLED +import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.closePrevious +import static datadog.trace.util.AgentThreadFactory.AgentThread.TASK_SCHEDULER + import ch.qos.logback.classic.Level import ch.qos.logback.classic.util.ContextInitializer import com.datadog.debugger.agent.ClassesToRetransformFinder @@ -33,6 +43,7 @@ import datadog.trace.api.TraceConfig import datadog.trace.api.config.GeneralConfig import datadog.trace.api.config.TracerConfig import datadog.trace.api.datastreams.AgentDataStreamsMonitoring +import datadog.trace.api.datastreams.DataStreamsTransactionExtractor import datadog.trace.api.sampling.SamplingRule import datadog.trace.api.time.SystemTimeSource import datadog.trace.bootstrap.ActiveSubsystems @@ -56,6 +67,13 @@ import de.thetaphi.forbiddenapis.SuppressForbidden import edu.umd.cs.findbugs.annotations.SuppressFBWarnings import groovy.transform.stc.ClosureParams import groovy.transform.stc.SimpleType +import java.lang.instrument.ClassFileTransformer +import java.lang.instrument.Instrumentation +import java.nio.ByteBuffer +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.TimeUnit +import java.util.concurrent.TimeoutException +import java.util.concurrent.atomic.AtomicInteger import net.bytebuddy.agent.ByteBuddyAgent import net.bytebuddy.agent.builder.AgentBuilder import net.bytebuddy.description.type.TypeDescription @@ -69,24 +87,6 @@ import org.slf4j.LoggerFactory import org.spockframework.mock.MockUtil import spock.lang.Shared -import java.lang.instrument.ClassFileTransformer -import java.lang.instrument.Instrumentation -import java.nio.ByteBuffer -import java.util.concurrent.ConcurrentHashMap -import java.util.concurrent.TimeUnit -import java.util.concurrent.TimeoutException -import java.util.concurrent.atomic.AtomicInteger - -import static datadog.communication.http.OkHttpUtils.buildHttpClient -import static datadog.trace.api.ConfigDefaults.DEFAULT_AGENT_HOST -import static datadog.trace.api.ConfigDefaults.DEFAULT_AGENT_TIMEOUT -import static datadog.trace.api.ConfigDefaults.DEFAULT_TRACE_AGENT_PORT -import static datadog.trace.api.config.DebuggerConfig.DYNAMIC_INSTRUMENTATION_SNAPSHOT_URL -import static datadog.trace.api.config.DebuggerConfig.DYNAMIC_INSTRUMENTATION_VERIFY_BYTECODE -import static datadog.trace.api.config.TraceInstrumentationConfig.CODE_ORIGIN_FOR_SPANS_ENABLED -import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.closePrevious -import static datadog.trace.util.AgentThreadFactory.AgentThread.TASK_SCHEDULER - /** * A specification that automatically applies instrumentation and exposes a global trace * writer. @@ -250,6 +250,10 @@ abstract class InstrumentationSpecification extends DDSpecification implements A List getTraceSamplingRules() { return null } + + List getDataStreamsTransactionExtractors() { + return null + } } @SuppressFBWarnings(value = "AT_STALE_THREAD_WRITE_OF_PRIMITIVE", justification = "The variable is accessed only by the test thread in setup and cleanup.") diff --git a/dd-java-agent/instrumentation/akka/akka-http/akka-http-10.0/src/main/java/datadog/trace/instrumentation/akkahttp/AkkaHttpServerDecorator.java b/dd-java-agent/instrumentation/akka/akka-http/akka-http-10.0/src/main/java/datadog/trace/instrumentation/akkahttp/AkkaHttpServerDecorator.java index efa73e608cd..c4ffd1d83b0 100644 --- a/dd-java-agent/instrumentation/akka/akka-http/akka-http-10.0/src/main/java/datadog/trace/instrumentation/akkahttp/AkkaHttpServerDecorator.java +++ b/dd-java-agent/instrumentation/akka/akka-http/akka-http-10.0/src/main/java/datadog/trace/instrumentation/akkahttp/AkkaHttpServerDecorator.java @@ -88,6 +88,12 @@ protected int status(final HttpResponse httpResponse) { return httpResponse.status().intValue(); } + @Override + protected String getRequestHeader(HttpRequest httpRequest, String key) { + Optional header = httpRequest.getHeader(key); + return header.map(HttpHeader::value).orElse(null); + } + @Override protected boolean isAppSecOnResponseSeparate() { return true; diff --git a/dd-java-agent/instrumentation/jetty/jetty-server/jetty-server-10.0/src/main/java11/datadog/trace/instrumentation/jetty10/JettyDecorator.java b/dd-java-agent/instrumentation/jetty/jetty-server/jetty-server-10.0/src/main/java11/datadog/trace/instrumentation/jetty10/JettyDecorator.java index ed42375dd12..8a009440cb5 100644 --- a/dd-java-agent/instrumentation/jetty/jetty-server/jetty-server-10.0/src/main/java11/datadog/trace/instrumentation/jetty10/JettyDecorator.java +++ b/dd-java-agent/instrumentation/jetty/jetty-server/jetty-server-10.0/src/main/java11/datadog/trace/instrumentation/jetty10/JettyDecorator.java @@ -78,6 +78,11 @@ protected boolean isAppSecOnResponseSeparate() { return true; } + @Override + protected String getRequestHeader(final Request request, String key) { + return request.getHeader(key); + } + @Override protected BlockResponseFunction createBlockResponseFunction(Request request, Request connection) { return new JettyBlockResponseFunction(request); diff --git a/dd-java-agent/instrumentation/jetty/jetty-server/jetty-server-11.0/src/main/java11/datadog/trace/instrumentation/jetty11/JettyDecorator.java b/dd-java-agent/instrumentation/jetty/jetty-server/jetty-server-11.0/src/main/java11/datadog/trace/instrumentation/jetty11/JettyDecorator.java index 9b74d1f418e..014d32bb26a 100644 --- a/dd-java-agent/instrumentation/jetty/jetty-server/jetty-server-11.0/src/main/java11/datadog/trace/instrumentation/jetty11/JettyDecorator.java +++ b/dd-java-agent/instrumentation/jetty/jetty-server/jetty-server-11.0/src/main/java11/datadog/trace/instrumentation/jetty11/JettyDecorator.java @@ -78,6 +78,11 @@ protected boolean isAppSecOnResponseSeparate() { return true; } + @Override + protected String getRequestHeader(final Request request, String key) { + return request.getHeader(key); + } + public AgentSpan onResponse(AgentSpan span, HttpChannel channel) { Request request = channel.getRequest(); Response response = channel.getResponse(); diff --git a/dd-java-agent/instrumentation/jetty/jetty-server/jetty-server-12.0/src/main/java17/datadog/trace/instrumentation/jetty12/JettyDecorator.java b/dd-java-agent/instrumentation/jetty/jetty-server/jetty-server-12.0/src/main/java17/datadog/trace/instrumentation/jetty12/JettyDecorator.java index 979cb5d62a5..0450fa2b44e 100644 --- a/dd-java-agent/instrumentation/jetty/jetty-server/jetty-server-12.0/src/main/java17/datadog/trace/instrumentation/jetty12/JettyDecorator.java +++ b/dd-java-agent/instrumentation/jetty/jetty-server/jetty-server-12.0/src/main/java17/datadog/trace/instrumentation/jetty12/JettyDecorator.java @@ -83,6 +83,11 @@ protected int status(final Response response) { return response.getStatus(); } + @Override + protected String getRequestHeader(final Request request, String key) { + return request.getHeaders().get(key); + } + public AgentSpan onResponse(AgentSpan span, HttpChannelState channel) { Request request = channel.getRequest(); Response response = channel.getResponse(); diff --git a/dd-java-agent/instrumentation/jetty/jetty-server/jetty-server-7.0/src/main/java/datadog/trace/instrumentation/jetty70/JettyDecorator.java b/dd-java-agent/instrumentation/jetty/jetty-server/jetty-server-7.0/src/main/java/datadog/trace/instrumentation/jetty70/JettyDecorator.java index 4822293650c..7aa3f5a4c13 100644 --- a/dd-java-agent/instrumentation/jetty/jetty-server/jetty-server-7.0/src/main/java/datadog/trace/instrumentation/jetty70/JettyDecorator.java +++ b/dd-java-agent/instrumentation/jetty/jetty-server/jetty-server-7.0/src/main/java/datadog/trace/instrumentation/jetty70/JettyDecorator.java @@ -77,6 +77,11 @@ protected boolean isAppSecOnResponseSeparate() { return true; } + @Override + protected String getRequestHeader(final Request request, String key) { + return request.getHeader(key); + } + public AgentSpan onResponse(AgentSpan span, HttpConnection channel) { Request request = channel.getRequest(); Response response = channel.getResponse(); diff --git a/dd-java-agent/instrumentation/jetty/jetty-server/jetty-server-7.6/src/main/java/datadog/trace/instrumentation/jetty76/JettyDecorator.java b/dd-java-agent/instrumentation/jetty/jetty-server/jetty-server-7.6/src/main/java/datadog/trace/instrumentation/jetty76/JettyDecorator.java index f158ee1c418..b86623b8061 100644 --- a/dd-java-agent/instrumentation/jetty/jetty-server/jetty-server-7.6/src/main/java/datadog/trace/instrumentation/jetty76/JettyDecorator.java +++ b/dd-java-agent/instrumentation/jetty/jetty-server/jetty-server-7.6/src/main/java/datadog/trace/instrumentation/jetty76/JettyDecorator.java @@ -77,6 +77,11 @@ protected boolean isAppSecOnResponseSeparate() { return true; } + @Override + protected String getRequestHeader(final Request request, String key) { + return request.getHeader(key); + } + public AgentSpan onResponse(AgentSpan span, AbstractHttpConnection connection) { Request request = connection.getRequest(); Response response = connection.getResponse(); diff --git a/dd-java-agent/instrumentation/jetty/jetty-server/jetty-server-9.0/src/main/java/datadog/trace/instrumentation/jetty9/JettyDecorator.java b/dd-java-agent/instrumentation/jetty/jetty-server/jetty-server-9.0/src/main/java/datadog/trace/instrumentation/jetty9/JettyDecorator.java index 87b50fea628..6234b00dd5d 100644 --- a/dd-java-agent/instrumentation/jetty/jetty-server/jetty-server-9.0/src/main/java/datadog/trace/instrumentation/jetty9/JettyDecorator.java +++ b/dd-java-agent/instrumentation/jetty/jetty-server/jetty-server-9.0/src/main/java/datadog/trace/instrumentation/jetty9/JettyDecorator.java @@ -77,6 +77,11 @@ protected boolean isAppSecOnResponseSeparate() { return true; } + @Override + protected String getRequestHeader(final Request request, String key) { + return request.getHeader(key); + } + public AgentSpan onResponse(AgentSpan span, HttpChannel channel) { Request request = channel.getRequest(); Response response = channel.getResponse(); diff --git a/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java b/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java index 341b4f654da..c889983e338 100644 --- a/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java +++ b/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java @@ -15,6 +15,7 @@ import static datadog.trace.instrumentation.kafka_clients.KafkaDecorator.PRODUCER_DECORATE; import static datadog.trace.instrumentation.kafka_clients.KafkaDecorator.TIME_IN_QUEUE_ENABLED; import static datadog.trace.instrumentation.kafka_common.StreamingContext.STREAMING_CONTEXT; +import static datadog.trace.instrumentation.kafka_common.Utils.DSM_TRANSACTION_SOURCE_READER; import static java.util.Collections.singletonMap; import static net.bytebuddy.matcher.ElementMatchers.isMethod; import static net.bytebuddy.matcher.ElementMatchers.isPrivate; @@ -30,6 +31,7 @@ import datadog.trace.api.Config; import datadog.trace.api.datastreams.DataStreamsContext; import datadog.trace.api.datastreams.DataStreamsTags; +import datadog.trace.api.datastreams.DataStreamsTransactionExtractor; import datadog.trace.api.datastreams.StatsPoint; import datadog.trace.bootstrap.InstrumentationContext; import datadog.trace.bootstrap.instrumentation.api.AgentScope; @@ -83,6 +85,7 @@ public String[] helperClassNames() { packageName + ".KafkaProducerCallback", "datadog.trace.instrumentation.kafka_common.StreamingContext", "datadog.trace.instrumentation.kafka_common.ClusterIdHolder", + "datadog.trace.instrumentation.kafka_common.Utils", packageName + ".AvroSchemaExtractor", }; } @@ -202,6 +205,14 @@ record = if (TIME_IN_QUEUE_ENABLED) { setter.injectTimeInQueue(record.headers()); } + + AgentTracer.get() + .getDataStreamsMonitoring() + .trackTransaction( + span, + DataStreamsTransactionExtractor.Type.KAFKA_PRODUCE_HEADERS, + record, + DSM_TRANSACTION_SOURCE_READER); return activateSpan(span); } diff --git a/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/TracingIterator.java b/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/TracingIterator.java index f634706ca5e..e0076ce5aed 100644 --- a/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/TracingIterator.java +++ b/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/TracingIterator.java @@ -15,6 +15,7 @@ import static datadog.trace.instrumentation.kafka_clients.TextMapExtractAdapter.GETTER; import static datadog.trace.instrumentation.kafka_clients.TextMapInjectAdapter.SETTER; import static datadog.trace.instrumentation.kafka_common.StreamingContext.STREAMING_CONTEXT; +import static datadog.trace.instrumentation.kafka_common.Utils.DSM_TRANSACTION_SOURCE_READER; import static datadog.trace.instrumentation.kafka_common.Utils.computePayloadSizeBytes; import static java.util.concurrent.TimeUnit.MILLISECONDS; @@ -23,6 +24,7 @@ import datadog.trace.api.Config; import datadog.trace.api.datastreams.DataStreamsContext; import datadog.trace.api.datastreams.DataStreamsTags; +import datadog.trace.api.datastreams.DataStreamsTransactionExtractor; import datadog.trace.bootstrap.instrumentation.api.AgentSpan; import datadog.trace.bootstrap.instrumentation.api.AgentSpanContext; import datadog.trace.bootstrap.instrumentation.api.AgentTracer; @@ -128,6 +130,14 @@ protected void startNewRecordSpan(ConsumerRecord val) { if (null != queueSpan) { queueSpan.finish(); } + + AgentTracer.get() + .getDataStreamsMonitoring() + .trackTransaction( + span, + DataStreamsTransactionExtractor.Type.KAFKA_CONSUME_HEADERS, + val.headers(), + DSM_TRANSACTION_SOURCE_READER); } } catch (final Exception e) { log.debug("Error starting new record span", e); diff --git a/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java/datadog/trace/instrumentation/kafka_clients38/KafkaProducerInstrumentation.java b/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java/datadog/trace/instrumentation/kafka_clients38/KafkaProducerInstrumentation.java index 2e40f6b67e3..89fa295bbe2 100644 --- a/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java/datadog/trace/instrumentation/kafka_clients38/KafkaProducerInstrumentation.java +++ b/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java/datadog/trace/instrumentation/kafka_clients38/KafkaProducerInstrumentation.java @@ -43,6 +43,7 @@ public String[] helperClassNames() { packageName + ".KafkaProducerCallback", "datadog.trace.instrumentation.kafka_common.StreamingContext", "datadog.trace.instrumentation.kafka_common.ClusterIdHolder", + "datadog.trace.instrumentation.kafka_common.Utils", packageName + ".AvroSchemaExtractor", }; } diff --git a/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/TracingIterator.java b/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/TracingIterator.java index 1d5e93031e6..ceba52acac8 100644 --- a/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/TracingIterator.java +++ b/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/TracingIterator.java @@ -18,6 +18,7 @@ import datadog.trace.api.Config; import datadog.trace.api.datastreams.DataStreamsContext; import datadog.trace.api.datastreams.DataStreamsTags; +import datadog.trace.api.datastreams.DataStreamsTransactionExtractor; import datadog.trace.bootstrap.instrumentation.api.AgentSpan; import datadog.trace.bootstrap.instrumentation.api.AgentSpanContext; import datadog.trace.bootstrap.instrumentation.api.AgentTracer; @@ -128,6 +129,14 @@ protected void startNewRecordSpan(ConsumerRecord val) { if (null != queueSpan) { queueSpan.finish(); } + + AgentTracer.get() + .getDataStreamsMonitoring() + .trackTransaction( + span, + DataStreamsTransactionExtractor.Type.KAFKA_CONSUME_HEADERS, + val.headers(), + Utils.DSM_TRANSACTION_SOURCE_READER); } } catch (final Exception e) { log.debug("Error starting new record span", e); diff --git a/dd-java-agent/instrumentation/kafka/kafka-common/src/main/java/datadog/trace/instrumentation/kafka_common/Utils.java b/dd-java-agent/instrumentation/kafka/kafka-common/src/main/java/datadog/trace/instrumentation/kafka_common/Utils.java index 935692fee4c..cb035ce2c50 100644 --- a/dd-java-agent/instrumentation/kafka/kafka-common/src/main/java/datadog/trace/instrumentation/kafka_common/Utils.java +++ b/dd-java-agent/instrumentation/kafka/kafka-common/src/main/java/datadog/trace/instrumentation/kafka_common/Utils.java @@ -1,5 +1,6 @@ package datadog.trace.instrumentation.kafka_common; +import datadog.trace.api.datastreams.DataStreamsTransactionTracker; import java.nio.charset.StandardCharsets; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.header.Header; @@ -8,6 +9,10 @@ public final class Utils { private Utils() {} // prevent instantiation + public static DataStreamsTransactionTracker.TransactionSourceReader + DSM_TRANSACTION_SOURCE_READER = + (source, headerName) -> new String(((Headers) source).lastHeader(headerName).value()); + // this method is used in kafka-clients and kafka-streams instrumentations public static long computePayloadSizeBytes(ConsumerRecord val) { long headersSize = 0; diff --git a/dd-java-agent/instrumentation/restlet-2.2/src/main/java/datadog/trace/instrumentation/restlet/RestletDecorator.java b/dd-java-agent/instrumentation/restlet-2.2/src/main/java/datadog/trace/instrumentation/restlet/RestletDecorator.java index 58c37ffe5c2..627def26a91 100644 --- a/dd-java-agent/instrumentation/restlet-2.2/src/main/java/datadog/trace/instrumentation/restlet/RestletDecorator.java +++ b/dd-java-agent/instrumentation/restlet-2.2/src/main/java/datadog/trace/instrumentation/restlet/RestletDecorator.java @@ -67,4 +67,9 @@ protected int peerPort(final HttpExchange exchange) { protected int status(final HttpExchange exchange) { return exchange.getResponseCode(); } + + @Override + protected String getRequestHeader(final HttpExchange exchange, String key) { + return exchange.getRequestHeaders().getFirst(key); + } } diff --git a/dd-java-agent/instrumentation/servlet/javax-servlet/javax-servlet-2.2/src/main/java/datadog/trace/instrumentation/servlet2/Servlet2Decorator.java b/dd-java-agent/instrumentation/servlet/javax-servlet/javax-servlet-2.2/src/main/java/datadog/trace/instrumentation/servlet2/Servlet2Decorator.java index aab83420d51..b93877f73ed 100644 --- a/dd-java-agent/instrumentation/servlet/javax-servlet/javax-servlet-2.2/src/main/java/datadog/trace/instrumentation/servlet2/Servlet2Decorator.java +++ b/dd-java-agent/instrumentation/servlet/javax-servlet/javax-servlet-2.2/src/main/java/datadog/trace/instrumentation/servlet2/Servlet2Decorator.java @@ -69,6 +69,11 @@ protected int peerPort(final HttpServletRequest httpServletRequest) { return 0; } + @Override + protected String getRequestHeader(final HttpServletRequest request, String key) { + return request.getHeader(key); + } + @Override protected int status(final Integer status) { return status; diff --git a/dd-java-agent/instrumentation/servlet/javax-servlet/javax-servlet-3.0/src/main/java/datadog/trace/instrumentation/servlet3/Servlet3Decorator.java b/dd-java-agent/instrumentation/servlet/javax-servlet/javax-servlet-3.0/src/main/java/datadog/trace/instrumentation/servlet3/Servlet3Decorator.java index c153e2c747e..61a8d22a2b8 100644 --- a/dd-java-agent/instrumentation/servlet/javax-servlet/javax-servlet-3.0/src/main/java/datadog/trace/instrumentation/servlet3/Servlet3Decorator.java +++ b/dd-java-agent/instrumentation/servlet/javax-servlet/javax-servlet-3.0/src/main/java/datadog/trace/instrumentation/servlet3/Servlet3Decorator.java @@ -77,6 +77,11 @@ protected String requestedSessionId(final HttpServletRequest request) { return request.getRequestedSessionId(); } + @Override + protected String getRequestHeader(final HttpServletRequest request, String key) { + return request.getHeader(key); + } + @Override public AgentSpan onRequest( final AgentSpan span, diff --git a/dd-java-agent/instrumentation/spring/spring-webmvc/spring-webmvc-3.1/src/main/java/datadog/trace/instrumentation/springweb/SpringWebHttpServerDecorator.java b/dd-java-agent/instrumentation/spring/spring-webmvc/spring-webmvc-3.1/src/main/java/datadog/trace/instrumentation/springweb/SpringWebHttpServerDecorator.java index e4e759e2cc2..52ce8855048 100644 --- a/dd-java-agent/instrumentation/spring/spring-webmvc/spring-webmvc-3.1/src/main/java/datadog/trace/instrumentation/springweb/SpringWebHttpServerDecorator.java +++ b/dd-java-agent/instrumentation/spring/spring-webmvc/spring-webmvc-3.1/src/main/java/datadog/trace/instrumentation/springweb/SpringWebHttpServerDecorator.java @@ -92,6 +92,11 @@ protected int status(final HttpServletResponse httpServletResponse) { return httpServletResponse.getStatus(); } + @Override + protected String getRequestHeader(final HttpServletRequest request, String key) { + return request.getHeader(key); + } + @Override public AgentSpan onRequest( final AgentSpan span, diff --git a/dd-java-agent/instrumentation/spring/spring-webmvc/spring-webmvc-6.0/src/main/java17/datadog/trace/instrumentation/springweb6/SpringWebHttpServerDecorator.java b/dd-java-agent/instrumentation/spring/spring-webmvc/spring-webmvc-6.0/src/main/java17/datadog/trace/instrumentation/springweb6/SpringWebHttpServerDecorator.java index 9811ab6c662..79b89c1aad5 100644 --- a/dd-java-agent/instrumentation/spring/spring-webmvc/spring-webmvc-6.0/src/main/java17/datadog/trace/instrumentation/springweb6/SpringWebHttpServerDecorator.java +++ b/dd-java-agent/instrumentation/spring/spring-webmvc/spring-webmvc-6.0/src/main/java17/datadog/trace/instrumentation/springweb6/SpringWebHttpServerDecorator.java @@ -86,6 +86,11 @@ protected String peerHostIP(final HttpServletRequest httpServletRequest) { return httpServletRequest.getRemoteAddr(); } + @Override + protected String getRequestHeader(final HttpServletRequest request, String key) { + return request.getHeader(key); + } + @Override protected int peerPort(final HttpServletRequest httpServletRequest) { return httpServletRequest.getRemotePort(); diff --git a/dd-java-agent/instrumentation/tomcat/tomcat-common/src/main/java/datadog/trace/instrumentation/tomcat/TomcatDecorator.java b/dd-java-agent/instrumentation/tomcat/tomcat-common/src/main/java/datadog/trace/instrumentation/tomcat/TomcatDecorator.java index 7d20ed65e15..0303e7cdcd4 100644 --- a/dd-java-agent/instrumentation/tomcat/tomcat-common/src/main/java/datadog/trace/instrumentation/tomcat/TomcatDecorator.java +++ b/dd-java-agent/instrumentation/tomcat/tomcat-common/src/main/java/datadog/trace/instrumentation/tomcat/TomcatDecorator.java @@ -72,6 +72,11 @@ protected int peerPort(final Request request) { return request.getRemotePort(); } + @Override + protected String getRequestHeader(final Request request, String key) { + return request.getHeader(key); + } + @Override protected int status(final Response response) { int status = response.getStatus(); diff --git a/dd-trace-api/src/main/java/datadog/trace/api/config/GeneralConfig.java b/dd-trace-api/src/main/java/datadog/trace/api/config/GeneralConfig.java index 3d58a5d21ed..8b6ed02380c 100644 --- a/dd-trace-api/src/main/java/datadog/trace/api/config/GeneralConfig.java +++ b/dd-trace-api/src/main/java/datadog/trace/api/config/GeneralConfig.java @@ -88,6 +88,8 @@ public final class GeneralConfig { public static final String DATA_STREAMS_ENABLED = "data.streams.enabled"; public static final String DATA_STREAMS_BUCKET_DURATION_SECONDS = "data.streams.bucket_duration.seconds"; + public static final String DATA_STREAMS_TRANSACTION_EXTRACTORS = + "data.streams.transaction_extractors"; public static final String TELEMETRY_ENABLED = "instrumentation.telemetry.enabled"; public static final String TELEMETRY_HEARTBEAT_INTERVAL = "telemetry.heartbeat.interval"; diff --git a/dd-trace-api/src/main/java/datadog/trace/api/experimental/DataStreamsCheckpointer.java b/dd-trace-api/src/main/java/datadog/trace/api/experimental/DataStreamsCheckpointer.java index 574b9fdb2a0..6ee9365f336 100644 --- a/dd-trace-api/src/main/java/datadog/trace/api/experimental/DataStreamsCheckpointer.java +++ b/dd-trace-api/src/main/java/datadog/trace/api/experimental/DataStreamsCheckpointer.java @@ -24,6 +24,12 @@ static DataStreamsCheckpointer get() { */ void setConsumeCheckpoint(String type, String source, DataStreamsContextCarrier carrier); + /** + * @param transactionId Transaction ID to track. + * @param checkpointName Unique checkpoint name. + */ + void trackTransaction(String transactionId, String checkpointName); + /** * @param type The type of the checkpoint, usually the streaming technology being used. Examples: * kafka, kinesis, sns etc. @@ -45,5 +51,8 @@ public void setConsumeCheckpoint( @Override public void setProduceCheckpoint( String type, String target, DataStreamsContextCarrier carrier) {} + + @Override + public void trackTransaction(String transactionId, String checkpointName) {} } } diff --git a/dd-trace-core/src/main/java/datadog/trace/core/CoreTracer.java b/dd-trace-core/src/main/java/datadog/trace/core/CoreTracer.java index f434fe820b9..8916e8d757b 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/CoreTracer.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/CoreTracer.java @@ -86,6 +86,7 @@ import datadog.trace.context.TraceScope; import datadog.trace.core.baggage.BaggagePropagator; import datadog.trace.core.datastreams.DataStreamsMonitoring; +import datadog.trace.core.datastreams.DataStreamsTransactionExtractors; import datadog.trace.core.datastreams.DefaultDataStreamsMonitoring; import datadog.trace.core.histogram.Histograms; import datadog.trace.core.monitor.HealthMetrics; @@ -682,6 +683,16 @@ private CoreTracer( } else { traceSamplingRules = TraceSamplingRules.deserialize(traceSamplingRulesJson); } + + DataStreamsTransactionExtractors dataStreamsTransactionExtractors; + String dataStreamsTransactionExtractorsJson = config.getDataStreamsTransactionExtractors(); + if (dataStreamsTransactionExtractorsJson == null) { + dataStreamsTransactionExtractors = DataStreamsTransactionExtractors.EMPTY; + } else { + dataStreamsTransactionExtractors = + DataStreamsTransactionExtractors.deserialize(dataStreamsTransactionExtractorsJson); + } + // Get initial Span Sampling Rules from config String spanSamplingRulesJson = config.getSpanSamplingRules(); String spanSamplingRulesFile = config.getSpanSamplingRulesFile(); @@ -711,6 +722,7 @@ private CoreTracer( .setSpanSamplingRules(spanSamplingRules.getRules()) .setTraceSamplingRules(traceSamplingRules.getRules(), traceSamplingRulesJson) .setTracingTags(config.getMergedSpanTags()) + .setDataStreamsTransactionExtractors(dataStreamsTransactionExtractors.getExtractors()) .apply(); this.logs128bTraceIdEnabled = Config.get().isLogs128bitTraceIdEnabled(); diff --git a/dd-trace-core/src/main/java/datadog/trace/core/StatusLogger.java b/dd-trace-core/src/main/java/datadog/trace/core/StatusLogger.java index 160081c706d..78b88d786c9 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/StatusLogger.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/StatusLogger.java @@ -156,6 +156,9 @@ public void toJson(JsonWriter writer, Config config) throws IOException { } writer.name("data_streams_enabled"); writer.value(config.isDataStreamsEnabled()); + writer.name("data_streams_transaction_extractors"); + writer.value(config.getDataStreamsTransactionExtractors()); + writer.name("app_logs_collection_enabled"); writer.value(config.isAppLogsCollectionEnabled()); writer.endObject(); diff --git a/dd-trace-core/src/main/java/datadog/trace/core/TracingConfigPoller.java b/dd-trace-core/src/main/java/datadog/trace/core/TracingConfigPoller.java index ac88784ac79..6bbc13151e3 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/TracingConfigPoller.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/TracingConfigPoller.java @@ -31,6 +31,7 @@ import datadog.trace.api.debugger.DebuggerConfigBridge; import datadog.trace.api.debugger.DebuggerConfigUpdate; import datadog.trace.api.sampling.SamplingRule; +import datadog.trace.core.datastreams.DataStreamsTransactionExtractors; import datadog.trace.logging.GlobalLogLevelSwitcher; import datadog.trace.logging.LogLevel; import java.io.ByteArrayInputStream; @@ -95,7 +96,12 @@ final class Updater implements ProductListener { private final JsonAdapter TRACE_SAMPLING_RULE; { - Moshi MOSHI = new Moshi.Builder().add(new TracingSamplingRulesAdapter()).build(); + Moshi MOSHI = + new Moshi.Builder() + .add(new TracingSamplingRulesAdapter()) + .add(new DataStreamsTransactionExtractors.DataStreamsTransactionExtractorsAdapter()) + .add(new DataStreamsTransactionExtractors.DataStreamsTransactionExtractorAdapter()) + .build(); CONFIG_OVERRIDES_ADAPTER = MOSHI.adapter(ConfigOverrides.class); LIB_CONFIG_ADAPTER = MOSHI.adapter(LibConfig.class); TRACE_SAMPLING_RULE = MOSHI.adapter(TracingSamplingRule.class); @@ -238,6 +244,10 @@ void applyConfigOverrides(LibConfig libConfig) { maybeOverride(builder::setServiceMapping, libConfig.serviceMapping); maybeOverride(builder::setHeaderTags, libConfig.headerTags); + if (null != libConfig.dataStreamsTransactionExtractors) { + builder.setDataStreamsTransactionExtractors( + libConfig.dataStreamsTransactionExtractors.getExtractors()); + } if (null != libConfig.tracingSamplingRules) { builder.setTraceSamplingRules( @@ -406,6 +416,9 @@ static final class LibConfig { @Json(name = "live_debugging_enabled") public Boolean liveDebuggingEnabled; + @Json(name = "data_streams_transaction_extractors") + public DataStreamsTransactionExtractors dataStreamsTransactionExtractors; + /** * Merges a list of LibConfig objects by taking the first non-null value for each field. * @@ -454,6 +467,9 @@ public static LibConfig mergeLibConfigs(List configs) { if (merged.tracingSamplingRules == null) { merged.tracingSamplingRules = config.tracingSamplingRules; } + if (merged.dataStreamsTransactionExtractors == null) { + merged.dataStreamsTransactionExtractors = config.dataStreamsTransactionExtractors; + } if (merged.dynamicInstrumentationEnabled == null) { merged.dynamicInstrumentationEnabled = config.dynamicInstrumentationEnabled; } diff --git a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DataStreamsTransactionExtractors.java b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DataStreamsTransactionExtractors.java new file mode 100644 index 00000000000..52088e97888 --- /dev/null +++ b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DataStreamsTransactionExtractors.java @@ -0,0 +1,149 @@ +package datadog.trace.core.datastreams; + +import com.squareup.moshi.FromJson; +import com.squareup.moshi.JsonAdapter; +import com.squareup.moshi.JsonReader; +import com.squareup.moshi.Moshi; +import com.squareup.moshi.ToJson; +import com.squareup.moshi.Types; +import datadog.trace.api.datastreams.DataStreamsTransactionExtractor; +import java.io.IOException; +import java.lang.reflect.ParameterizedType; +import java.util.Collections; +import java.util.List; +import okio.BufferedSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DataStreamsTransactionExtractors { + public static final DataStreamsTransactionExtractors EMPTY = + new DataStreamsTransactionExtractors("[]", Collections.emptyList()); + private static final Logger LOG = LoggerFactory.getLogger(DataStreamsTransactionExtractors.class); + private static final Moshi MOSHI = + new Moshi.Builder() + .add(new DataStreamsTransactionExtractors.DataStreamsTransactionExtractorAdapter()) + .build(); + private static final ParameterizedType LIST_OF_RULES = + Types.newParameterizedType(List.class, DataStreamsTransactionExtractorImpl.class); + public static final JsonAdapter> LIST_OF_RULES_ADAPTER = + MOSHI.adapter(LIST_OF_RULES); + + private final List extractors; + private final String json; + + public DataStreamsTransactionExtractors( + String json, List extractors) { + this.extractors = Collections.unmodifiableList(extractors); + this.json = json; + } + + public static DataStreamsTransactionExtractors deserialize(String json) { + try { + return new DataStreamsTransactionExtractors(json, LIST_OF_RULES_ADAPTER.fromJson(json)); + } catch (Throwable ex) { + LOG.debug("Couldn't parse Data Streams Extractors from JSON: {}", json, ex); + } + + return EMPTY; + } + + public List getExtractors() { + return extractors; + } + + private static final class JsonDataStreamsTransactionExtractor { + private static final JsonAdapter jsonAdapter = + MOSHI.adapter(JsonDataStreamsTransactionExtractor.class); + String name; + String type; + String value; + + @Override + public String toString() { + return jsonAdapter.toJson(this); + } + } + + public static final class DataStreamsTransactionExtractorsAdapter { + @FromJson + DataStreamsTransactionExtractors fromJson( + JsonReader reader, JsonAdapter> parser) + throws IOException { + if (reader.peek() == JsonReader.Token.NULL) { + return reader.nextNull(); + } + try (BufferedSource source = reader.nextSource()) { + String json = source.readUtf8(); + return new DataStreamsTransactionExtractors(json, parser.fromJson(json)); + } + } + + @ToJson + String toJson(DataStreamsTransactionExtractors extractors) { + return extractors.json; + } + } + + public static final class DataStreamsTransactionExtractorAdapter { + private static DataStreamsTransactionExtractor create( + JsonDataStreamsTransactionExtractor jsonExtractor) { + + DataStreamsTransactionExtractor.Type type; + try { + type = DataStreamsTransactionExtractor.Type.valueOf(jsonExtractor.type); + } catch (Throwable ex) { + type = DataStreamsTransactionExtractor.Type.UNKNOWN; + } + + return new DataStreamsTransactionExtractorImpl(jsonExtractor.name, type, jsonExtractor.value); + } + + @FromJson + DataStreamsTransactionExtractor fromJson(JsonDataStreamsTransactionExtractor jsonExtractor) { + return create(jsonExtractor); + } + + @ToJson + JsonDataStreamsTransactionExtractor toJson(DataStreamsTransactionExtractor extractor) { + throw new UnsupportedOperationException(); + } + } + + public static final class DataStreamsTransactionExtractorImpl + implements DataStreamsTransactionExtractor { + private final String name; + private final DataStreamsTransactionExtractor.Type type; + private final String value; + + public DataStreamsTransactionExtractorImpl( + final String name, final DataStreamsTransactionExtractor.Type type, final String value) { + this.name = name; + this.type = type; + this.value = value; + } + + public String getName() { + return name; + } + + public DataStreamsTransactionExtractor.Type getType() { + return type; + } + + public String getValue() { + return value; + } + + @Override + public String toString() { + return "DataStreamsTransactionExtractorImpl{" + + "name='" + + name + + "', type='" + + type.name() + + "', value='" + + value + + "'}"; + } + } +} diff --git a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java index 931d3583afe..fc7c328213b 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java @@ -6,6 +6,7 @@ import static datadog.trace.api.datastreams.DataStreamsTags.Direction.OUTBOUND; import static datadog.trace.api.datastreams.DataStreamsTags.create; import static datadog.trace.api.datastreams.DataStreamsTags.createManual; +import static datadog.trace.api.datastreams.DataStreamsTransactionExtractor.MAX_NUM_EXTRACTORS; import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeSpan; import static datadog.trace.util.AgentThreadFactory.AgentThread.DATA_STREAMS_MONITORING; import static datadog.trace.util.AgentThreadFactory.THREAD_JOIN_TIMOUT_MS; @@ -16,20 +17,31 @@ import datadog.context.propagation.Propagator; import datadog.trace.api.Config; import datadog.trace.api.TraceConfig; -import datadog.trace.api.datastreams.*; +import datadog.trace.api.datastreams.Backlog; +import datadog.trace.api.datastreams.DataStreamsContext; +import datadog.trace.api.datastreams.DataStreamsTags; +import datadog.trace.api.datastreams.DataStreamsTransactionExtractor; +import datadog.trace.api.datastreams.InboxItem; +import datadog.trace.api.datastreams.NoopPathwayContext; +import datadog.trace.api.datastreams.PathwayContext; import datadog.trace.api.datastreams.SchemaRegistryUsage; +import datadog.trace.api.datastreams.StatsPoint; +import datadog.trace.api.datastreams.TransactionInfo; import datadog.trace.api.experimental.DataStreamsContextCarrier; import datadog.trace.api.time.TimeSource; import datadog.trace.bootstrap.instrumentation.api.AgentSpan; import datadog.trace.bootstrap.instrumentation.api.Schema; import datadog.trace.bootstrap.instrumentation.api.SchemaIterator; +import datadog.trace.bootstrap.instrumentation.api.Tags; import datadog.trace.common.metrics.EventListener; import datadog.trace.common.metrics.OkHttpSink; import datadog.trace.common.metrics.Sink; import datadog.trace.core.DDSpan; import datadog.trace.core.DDTraceCoreInfo; import datadog.trace.util.AgentTaskScheduler; +import java.util.ArrayList; import java.util.Collections; +import java.util.EnumMap; import java.util.HashMap; import java.util.Iterator; import java.util.LinkedList; @@ -46,6 +58,8 @@ public class DefaultDataStreamsMonitoring implements DataStreamsMonitoring, Even private static final Logger log = LoggerFactory.getLogger(DefaultDataStreamsMonitoring.class); static final long FEATURE_CHECK_INTERVAL_NANOS = TimeUnit.MINUTES.toNanos(5); + static final long MAX_TRANSACTION_CONTAINER_SIZE = 1024 * 512; + static final List NO_EXTRACTORS = Collections.emptyList(); private static final StatsPoint REPORT = new StatsPoint(DataStreamsTags.EMPTY, 0, 0, 0, 0, 0, 0, 0, null); @@ -69,6 +83,12 @@ public class DefaultDataStreamsMonitoring implements DataStreamsMonitoring, Even private final ConcurrentHashMap schemaSamplers; private static final ThreadLocal serviceNameOverride = new ThreadLocal<>(); + // contains a list of active extractors by type. It is not thread safe, but it's populated only + // once on background thread start. + private static final Map< + DataStreamsTransactionExtractor.Type, List> + extractorsByType = new EnumMap<>(DataStreamsTransactionExtractor.Type.class); + public DefaultDataStreamsMonitoring( Config config, SharedCommunicationObjects sharedCommunicationObjects, @@ -183,6 +203,38 @@ public void clearThreadServiceName() { serviceNameOverride.remove(); } + @Override + public void trackTransaction(String transactionId, String checkpointName) { + inbox.offer( + new TransactionInfo(transactionId, timeSource.getCurrentTimeNanos(), checkpointName)); + } + + @Override + public void trackTransaction( + AgentSpan span, + DataStreamsTransactionExtractor.Type extractorType, + Object source, + TransactionSourceReader sourceReader) { + if (!supportsDataStreams || source == null) { + return; + } + + List extractorList = extractorsByType.get(extractorType); + if (extractorList == null) { + return; + } + + for (DataStreamsTransactionExtractor extractor : extractorList) { + String transactionId = sourceReader.readHeader(source, extractor.getValue()); + if (transactionId != null && !transactionId.isEmpty()) { + trackTransaction(transactionId, extractor.getName()); + + span.setTag(Tags.DSM_TRANSACTION_ID, transactionId); + span.setTag(Tags.DSM_TRANSACTION_CHECKPOINT, extractor.getName()); + } + } + } + private static String getThreadServiceName() { return serviceNameOverride.get(); } @@ -214,6 +266,7 @@ public void mergePathwayContextIntoSpan(AgentSpan span, DataStreamsContextCarrie } } + @Override public void trackBacklog(DataStreamsTags tags, long value) { inbox.offer(new Backlog(tags, value, timeSource.getCurrentTimeNanos(), getThreadServiceName())); } @@ -336,6 +389,8 @@ public void run() { agentSupportsDataStreams = features.supportsDataStreams(); checkDynamicConfig(); + // only load extractors on startup + updateExtractorsFromConfig(); if (!configSupportsDataStreams) { log.debug("Data streams is disabled"); @@ -379,6 +434,17 @@ public void run() { StatsBucket statsBucket = getStatsBucket(backlog.getTimestampNanos(), backlog.getServiceNameOverride()); statsBucket.addBacklog(backlog); + } else if (payload instanceof TransactionInfo) { + TransactionInfo transactionInfo = (TransactionInfo) payload; + StatsBucket statsBucket = getStatsBucket(transactionInfo.getTimestamp(), ""); + statsBucket.addTransaction(transactionInfo); + // we want to force flush when the transaction payload gets too big + // with 512kb and approx 20 bytes per transaction we're looking at ~26k + // transaction/sec + // this should be enough for 99.9% of the users + if (statsBucket.getTransactions().getSize() >= MAX_TRANSACTION_CONTAINER_SIZE) { + inbox.offer(REPORT); + } } else if (payload instanceof SchemaRegistryUsage) { SchemaRegistryUsage usage = (SchemaRegistryUsage) payload; StatsBucket statsBucket = @@ -456,6 +522,27 @@ public void onEvent(EventType eventType, String message) { } } + /* updateExtractorsFromConfig is called only once during startup */ + private void updateExtractorsFromConfig() { + if (!supportsDataStreams) { + return; + } + + List extractors = + traceConfigSupplier.get().getDataStreamsTransactionExtractors(); + if (extractors == null) { + return; + } + // we support up to MAX_NUM_EXTRACTORS + for (int i = 0; i < Math.min(extractors.size(), MAX_NUM_EXTRACTORS); i++) { + DataStreamsTransactionExtractor extractor = extractors.get(i); + List list = + extractorsByType.computeIfAbsent(extractor.getType(), k -> new ArrayList<>()); + list.add(extractor); + } + log.debug("Added {} data streams transaction extractors", extractors.size()); + } + private void checkDynamicConfig() { configSupportsDataStreams = traceConfigSupplier.get().isDataStreamsEnabled(); supportsDataStreams = agentSupportsDataStreams && configSupportsDataStreams; diff --git a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/MsgPackDatastreamsPayloadWriter.java b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/MsgPackDatastreamsPayloadWriter.java index 277820af4e9..2f7ad81f486 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/MsgPackDatastreamsPayloadWriter.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/MsgPackDatastreamsPayloadWriter.java @@ -10,6 +10,7 @@ import datadog.trace.api.ProcessTags; import datadog.trace.api.WellKnownTags; import datadog.trace.api.datastreams.DataStreamsTags; +import datadog.trace.api.datastreams.TransactionInfo; import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString; import datadog.trace.common.metrics.Sink; import java.util.Collection; @@ -45,6 +46,9 @@ public class MsgPackDatastreamsPayloadWriter implements DatastreamsPayloadWriter private static final byte[] OPERATION = "Operation".getBytes(ISO_8859_1); private static final byte[] PRODUCTS_MASK = "ProductMask".getBytes(ISO_8859_1); private static final byte[] PROCESS_TAGS = "ProcessTags".getBytes(ISO_8859_1); + private static final byte[] TRANSACTIONS = "Transactions".getBytes(ISO_8859_1); + private static final byte[] TRANSACTION_CHECKPOINT_IDS = + "TransactionCheckpointIds".getBytes(ISO_8859_1); private static final int INITIAL_CAPACITY = 512 * 1024; @@ -101,7 +105,7 @@ public void writePayload(Collection data, String serviceNameOverrid /* 2 */ writer.writeUTF8(SERVICE); - if (serviceNameOverride != null) { + if (serviceNameOverride != null && !serviceNameOverride.isEmpty()) { writer.writeUTF8(serviceNameOverride.getBytes(ISO_8859_1)); } else { writer.writeUTF8(wellKnownTags.getService()); @@ -129,15 +133,14 @@ public void writePayload(Collection data, String serviceNameOverrid for (StatsBucket bucket : data) { boolean hasBacklogs = !bucket.getBacklogs().isEmpty(); + boolean hasTransactions = !bucket.getTransactions().isEmpty(); + boolean hasSchemaRegistryUsages = !bucket.getSchemaRegistryUsages().isEmpty(); - int mapSize = 3; - if (hasBacklogs) { - mapSize++; - } - if (hasSchemaRegistryUsages) { - mapSize++; - } - writer.startMap(mapSize); + writer.startMap( + 3 + + (hasBacklogs ? 1 : 0) + + (hasTransactions ? 2 : 0) + + (hasSchemaRegistryUsages ? 1 : 0)); /* 1 */ writer.writeUTF8(START); @@ -160,6 +163,15 @@ public void writePayload(Collection data, String serviceNameOverrid /* 5 */ writeSchemaRegistryUsages(bucket.getSchemaRegistryUsages(), writer); } + + if (hasTransactions) { + /* 6 */ + writer.writeUTF8(TRANSACTIONS); + writer.writeBinary(bucket.getTransactions().getData()); + /* 7 */ + writer.writeUTF8(TRANSACTION_CHECKPOINT_IDS); + writer.writeBinary(TransactionInfo.getCheckpointIdCacheBytes()); + } } /* 8 */ diff --git a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/StatsBucket.java b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/StatsBucket.java index 1ddbffd94a4..22c9a8756f0 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/StatsBucket.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/StatsBucket.java @@ -4,6 +4,7 @@ import datadog.trace.api.datastreams.DataStreamsTags; import datadog.trace.api.datastreams.SchemaRegistryUsage; import datadog.trace.api.datastreams.StatsPoint; +import datadog.trace.api.datastreams.TransactionInfo; import java.util.Collection; import java.util.HashMap; import java.util.Map; @@ -13,6 +14,7 @@ public class StatsBucket { private final long bucketDurationNanos; private final Map hashToGroup = new HashMap<>(); private final Map backlogs = new HashMap<>(); + private final TransactionContainer transactions = new TransactionContainer(1024); private final Map schemaRegistryUsages = new HashMap<>(); public StatsBucket(long startTimeNanos, long bucketDurationNanos) { @@ -54,6 +56,10 @@ public void addSchemaRegistryUsage(SchemaRegistryUsage usage) { schemaRegistryUsages.merge(key, 1L, Long::sum); } + public void addTransaction(TransactionInfo transaction) { + transactions.add(transaction); + } + public long getStartTimeNanos() { return startTimeNanos; } @@ -70,6 +76,10 @@ public Collection> getBacklogs() { return backlogs.entrySet(); } + public TransactionContainer getTransactions() { + return transactions; + } + public Collection> getSchemaRegistryUsages() { return schemaRegistryUsages.entrySet(); } diff --git a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/TransactionContainer.java b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/TransactionContainer.java new file mode 100644 index 00000000000..834064293cc --- /dev/null +++ b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/TransactionContainer.java @@ -0,0 +1,44 @@ +package datadog.trace.core.datastreams; + +import datadog.trace.api.datastreams.TransactionInfo; +import java.util.Arrays; + +public class TransactionContainer { + // we store data as an array of bytes, since the number of object can be significant + private byte[] data; + private int size; + + public TransactionContainer(Integer initialSizeBytes) { + this.data = new byte[initialSizeBytes]; + } + + public void add(TransactionInfo transactionInfo) { + // check if we need to resize + byte[] transactionBytes = transactionInfo.getBytes(); + + // resize buffer if needed + if (data.length - size < transactionBytes.length) { + data = Arrays.copyOf(data, data.length << 1); + } + + // add data + System.arraycopy(transactionBytes, 0, data, size, transactionBytes.length); + size += transactionBytes.length; + } + + public boolean isEmpty() { + return size == 0; + } + + public void clear() { + size = 0; + } + + public int getSize() { + return size; + } + + public byte[] getData() { + return Arrays.copyOf(data, size); + } +} diff --git a/dd-trace-core/src/test/groovy/datadog/trace/core/TracingConfigPollerTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/core/TracingConfigPollerTest.groovy index ff5b0ea4e25..c8a67779585 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/core/TracingConfigPollerTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/core/TracingConfigPollerTest.groovy @@ -7,13 +7,13 @@ import datadog.remoteconfig.ConfigurationPoller import datadog.remoteconfig.Product import datadog.remoteconfig.state.ParsedConfigKey import datadog.remoteconfig.state.ProductListener +import datadog.trace.api.datastreams.DataStreamsTransactionExtractor import datadog.trace.core.test.DDCoreSpecification +import java.nio.charset.StandardCharsets import okhttp3.HttpUrl import okhttp3.OkHttpClient import spock.lang.Timeout -import java.nio.charset.StandardCharsets - @Timeout(10) class TracingConfigPollerTest extends DDCoreSpecification { @@ -169,7 +169,14 @@ class TracingConfigPollerTest extends DDCoreSpecification { "tag_name": "custom.header" } ], - "tracing_sampling_rate": 1.3 + "tracing_sampling_rate": 1.3, + "data_streams_transaction_extractors": [ + { + "name": "test", + "type": "unknown", + "value": "value" + } + ] } } """.getBytes(StandardCharsets.UTF_8), null) @@ -183,6 +190,10 @@ class TracingConfigPollerTest extends DDCoreSpecification { tracer.captureTraceConfig().traceSampleRate == 1.0 // should be clamped to 1.0 tracer.captureTraceConfig().requestHeaderTags == ["x-custom-header": "custom.header"] tracer.captureTraceConfig().responseHeaderTags == ["x-custom-header": "custom.header"] + tracer.captureTraceConfig().getDataStreamsTransactionExtractors().size() == 1 + tracer.captureTraceConfig().getDataStreamsTransactionExtractors()[0].name == "test" + tracer.captureTraceConfig().getDataStreamsTransactionExtractors()[0].type == DataStreamsTransactionExtractor.Type.UNKNOWN + tracer.captureTraceConfig().getDataStreamsTransactionExtractors()[0].value == "value" when: // Remove service level config diff --git a/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DataStreamsTransactionExtractorsTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DataStreamsTransactionExtractorsTest.groovy new file mode 100644 index 00000000000..be3bdc9d003 --- /dev/null +++ b/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DataStreamsTransactionExtractorsTest.groovy @@ -0,0 +1,23 @@ +package datadog.trace.core.datastreams + +import datadog.trace.api.datastreams.DataStreamsTransactionExtractor +import datadog.trace.core.test.DDCoreSpecification + +class DataStreamsTransactionExtractorsTest extends DDCoreSpecification { + def "Deserialize from json"() { + when: + def list = DataStreamsTransactionExtractors.deserialize("""[ + {"name": "extractor", "type": "HTTP_OUT_HEADERS", "value": "transaction_id"}, + {"name": "second_extractor", "type": "HTTP_IN_HEADERS", "value": "transaction_id"} + ]""") + def extractors = list.getExtractors() + then: + extractors.size() == 2 + extractors[0].getName() == "extractor" + extractors[0].getType() == DataStreamsTransactionExtractor.Type.HTTP_OUT_HEADERS + extractors[0].getValue() == "transaction_id" + extractors[1].getName() == "second_extractor" + extractors[1].getType() == DataStreamsTransactionExtractor.Type.HTTP_IN_HEADERS + extractors[1].getValue() == "transaction_id" + } +} diff --git a/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/TransactionContainerTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/TransactionContainerTest.groovy new file mode 100644 index 00000000000..a7593acb19e --- /dev/null +++ b/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/TransactionContainerTest.groovy @@ -0,0 +1,49 @@ +package datadog.trace.core.datastreams + +import datadog.trace.api.datastreams.TransactionInfo +import datadog.trace.core.test.DDCoreSpecification + +class TransactionContainerTest extends DDCoreSpecification { + def "test with no resize"() { + given: + TransactionInfo.resetCache() + def container = new TransactionContainer(1024) + container.add(new TransactionInfo("1", 1, "1")) + container.add(new TransactionInfo("2", 2, "2")) + def data = container.getData() + + expect: + data.size() == 22 + data == new byte[] { + 1, 0, 0, 0, 0, 0, 0, 0, 1, 1, 49, 2, 0, 0, 0, 0, 0, 0, 0, 2, 1, 50 + } + } + + def "test with with resize"() { + given: + TransactionInfo.resetCache() + def container = new TransactionContainer(10) + container.add(new TransactionInfo("1", 1, "1")) + container.add(new TransactionInfo("2", 2, "2")) + def data = container.getData() + + expect: + data.size() == 22 + data == new byte[] { + 1, 0, 0, 0, 0, 0, 0, 0, 1, 1, 49, 2, 0, 0, 0, 0, 0, 0, 0, 2, 1, 50 + } + } + + def "test checkpoint map"() { + given: + TransactionInfo.resetCache() + new TransactionInfo("1", 1, "1") + new TransactionInfo("2", 2, "2") + def data = TransactionInfo.getCheckpointIdCacheBytes() + expect: + data.size() == 6 + data == new byte[] { + 1, 1, 49, 2, 1, 50 + } + } +} diff --git a/internal-api/src/main/java/datadog/trace/api/Config.java b/internal-api/src/main/java/datadog/trace/api/Config.java index 9b080e6bff4..07699f7e6b2 100644 --- a/internal-api/src/main/java/datadog/trace/api/Config.java +++ b/internal-api/src/main/java/datadog/trace/api/Config.java @@ -357,6 +357,7 @@ import static datadog.trace.api.config.GeneralConfig.DATA_JOBS_PARSE_SPARK_PLAN_ENABLED; import static datadog.trace.api.config.GeneralConfig.DATA_STREAMS_BUCKET_DURATION_SECONDS; import static datadog.trace.api.config.GeneralConfig.DATA_STREAMS_ENABLED; +import static datadog.trace.api.config.GeneralConfig.DATA_STREAMS_TRANSACTION_EXTRACTORS; import static datadog.trace.api.config.GeneralConfig.DOGSTATSD_ARGS; import static datadog.trace.api.config.GeneralConfig.DOGSTATSD_HOST; import static datadog.trace.api.config.GeneralConfig.DOGSTATSD_NAMED_PIPE; @@ -1228,6 +1229,7 @@ public static String getHostName() { private final boolean dataStreamsEnabled; private final float dataStreamsBucketDurationSeconds; + private final String dataStreamsTransactionExtractors; private final boolean serviceDiscoveryEnabled; @@ -2751,6 +2753,8 @@ PROFILING_DATADOG_PROFILER_ENABLED, isDatadogProfilerSafeInCurrentEnvironment()) dataStreamsBucketDurationSeconds = configProvider.getFloat( DATA_STREAMS_BUCKET_DURATION_SECONDS, DEFAULT_DATA_STREAMS_BUCKET_DURATION); + dataStreamsTransactionExtractors = + configProvider.getString(DATA_STREAMS_TRANSACTION_EXTRACTORS); azureAppServices = configProvider.getBoolean(AZURE_APP_SERVICES, false); traceAgentPath = configProvider.getString(TRACE_AGENT_PATH); @@ -4599,6 +4603,10 @@ public float getDataStreamsBucketDurationSeconds() { return dataStreamsBucketDurationSeconds; } + public String getDataStreamsTransactionExtractors() { + return dataStreamsTransactionExtractors; + } + public long getDataStreamsBucketDurationNanoseconds() { // Rounds to the nearest millisecond before converting to nanos int milliseconds = Math.round(dataStreamsBucketDurationSeconds * 1000); diff --git a/internal-api/src/main/java/datadog/trace/api/DynamicConfig.java b/internal-api/src/main/java/datadog/trace/api/DynamicConfig.java index fb14e8370aa..4a33193b2ae 100644 --- a/internal-api/src/main/java/datadog/trace/api/DynamicConfig.java +++ b/internal-api/src/main/java/datadog/trace/api/DynamicConfig.java @@ -14,6 +14,7 @@ import static datadog.trace.util.ConfigStrings.normalizedHeaderTag; import static datadog.trace.util.ConfigStrings.trim; +import datadog.trace.api.datastreams.DataStreamsTransactionExtractor; import datadog.trace.api.sampling.SamplingRule.SpanSamplingRule; import datadog.trace.api.sampling.SamplingRule.TraceSamplingRule; import java.util.Collection; @@ -111,6 +112,7 @@ public final class Builder { Double traceSampleRate; String preferredServiceName; + List dataStreamsTransactionExtractors; Builder() {} @@ -134,6 +136,7 @@ public final class Builder { this.tracingTags = snapshot.tracingTags; this.preferredServiceName = snapshot.preferredServiceName; + this.dataStreamsTransactionExtractors = snapshot.dataStreamsTransactionExtractors; } public Builder setRuntimeMetricsEnabled(boolean runtimeMetricsEnabled) { @@ -151,6 +154,12 @@ public Builder setDataStreamsEnabled(boolean dataStreamsEnabled) { return this; } + public Builder setDataStreamsTransactionExtractors( + List dataStreamsTransactionExtractors) { + this.dataStreamsTransactionExtractors = dataStreamsTransactionExtractors; + return this; + } + public Builder setServiceMapping(Map serviceMapping) { return setServiceMapping(serviceMapping.entrySet()); } @@ -324,6 +333,7 @@ public static class Snapshot implements TraceConfig { final Map tracingTags; final String preferredServiceName; + final List dataStreamsTransactionExtractors; protected Snapshot(DynamicConfig.Builder builder, Snapshot oldSnapshot) { @@ -345,6 +355,7 @@ protected Snapshot(DynamicConfig.Builder builder, Snapshot oldSnapshot) { this.tracingTags = nullToEmpty(builder.tracingTags); this.preferredServiceName = builder.preferredServiceName; + this.dataStreamsTransactionExtractors = builder.dataStreamsTransactionExtractors; } private static Map nullToEmpty(Map mapping) { @@ -415,6 +426,11 @@ public List getTraceSamplingRules() { return traceSamplingRules; } + @Override + public List getDataStreamsTransactionExtractors() { + return dataStreamsTransactionExtractors; + } + @Override public Map getTracingTags() { return tracingTags; diff --git a/internal-api/src/main/java/datadog/trace/api/TraceConfig.java b/internal-api/src/main/java/datadog/trace/api/TraceConfig.java index fa7b814fbea..470e662adea 100644 --- a/internal-api/src/main/java/datadog/trace/api/TraceConfig.java +++ b/internal-api/src/main/java/datadog/trace/api/TraceConfig.java @@ -1,5 +1,6 @@ package datadog.trace.api; +import datadog.trace.api.datastreams.DataStreamsTransactionExtractor; import datadog.trace.api.sampling.SamplingRule.SpanSamplingRule; import datadog.trace.api.sampling.SamplingRule.TraceSamplingRule; import java.util.List; @@ -47,4 +48,11 @@ public interface TraceConfig { * @return The tracer sampler Trace Sampling Rules, or an empty collection if no rule is defined. */ List getTraceSamplingRules(); + + /** + * Get DSM transaction extractors. + * + * @return List of Data Streams Transactions extractors. + */ + List getDataStreamsTransactionExtractors(); } diff --git a/internal-api/src/main/java/datadog/trace/api/WellKnownTags.java b/internal-api/src/main/java/datadog/trace/api/WellKnownTags.java index f01e8d154ef..fa00fea11cb 100644 --- a/internal-api/src/main/java/datadog/trace/api/WellKnownTags.java +++ b/internal-api/src/main/java/datadog/trace/api/WellKnownTags.java @@ -49,4 +49,21 @@ public UTF8BytesString getVersion() { public UTF8BytesString getLanguage() { return language; } + + public String toString() { + return "WellKnownTags{" + + "runtimeId=" + + runtimeId + + ", hostname=" + + hostname + + ", env=" + + env + + ", service=" + + service + + ", version=" + + version + + ", language=" + + language + + "}"; + } } diff --git a/internal-api/src/main/java/datadog/trace/api/datastreams/AgentDataStreamsMonitoring.java b/internal-api/src/main/java/datadog/trace/api/datastreams/AgentDataStreamsMonitoring.java index c11fab3fc28..62bab0d9872 100644 --- a/internal-api/src/main/java/datadog/trace/api/datastreams/AgentDataStreamsMonitoring.java +++ b/internal-api/src/main/java/datadog/trace/api/datastreams/AgentDataStreamsMonitoring.java @@ -5,7 +5,8 @@ import datadog.trace.bootstrap.instrumentation.api.Schema; import datadog.trace.bootstrap.instrumentation.api.SchemaIterator; -public interface AgentDataStreamsMonitoring extends DataStreamsCheckpointer { +public interface AgentDataStreamsMonitoring + extends DataStreamsCheckpointer, DataStreamsTransactionTracker { void trackBacklog(DataStreamsTags tags, long value); /** diff --git a/internal-api/src/main/java/datadog/trace/api/datastreams/DataStreamsTransactionExtractor.java b/internal-api/src/main/java/datadog/trace/api/datastreams/DataStreamsTransactionExtractor.java new file mode 100644 index 00000000000..a8b314ff48f --- /dev/null +++ b/internal-api/src/main/java/datadog/trace/api/datastreams/DataStreamsTransactionExtractor.java @@ -0,0 +1,26 @@ +package datadog.trace.api.datastreams; + +public interface DataStreamsTransactionExtractor { + int MAX_NUM_EXTRACTORS = 64; + + enum Type { + UNKNOWN, + /** HTTP_OUT_HEADERS targets outgoing HTTP requests */ + HTTP_OUT_HEADERS, + /** HTTP_IN_HEADERS targets incoming HTTP requests */ + HTTP_IN_HEADERS, + /** KAFKA_CONSUME_HEADERS targets headers from consumed messages (after consume) */ + KAFKA_CONSUME_HEADERS, + /** KAFKA_CONSUME_HEADERS targets headers from produced messages (before produce) */ + KAFKA_PRODUCE_HEADERS + } + + /** getName returns transaction extractor name */ + String getName(); + + /** getType returns transaction extractor type */ + Type getType(); + + /** getValue returns transaction extractor value */ + String getValue(); +} diff --git a/internal-api/src/main/java/datadog/trace/api/datastreams/DataStreamsTransactionTracker.java b/internal-api/src/main/java/datadog/trace/api/datastreams/DataStreamsTransactionTracker.java new file mode 100644 index 00000000000..4875c13f365 --- /dev/null +++ b/internal-api/src/main/java/datadog/trace/api/datastreams/DataStreamsTransactionTracker.java @@ -0,0 +1,22 @@ +package datadog.trace.api.datastreams; + +import datadog.trace.bootstrap.instrumentation.api.AgentSpan; + +public interface DataStreamsTransactionTracker { + interface TransactionSourceReader { + String readHeader(Object source, String headerName); + } + + /** trackTransaction used to emit "seen" event for transactions */ + void trackTransaction(String transactionId, String checkpointName); + + /** + * trackTransaction which tries to extract / track transactions info using extractors of + * extractorType from the provided source using source reader + */ + void trackTransaction( + AgentSpan span, + DataStreamsTransactionExtractor.Type extractorType, + Object source, + TransactionSourceReader sourceReader); +} diff --git a/internal-api/src/main/java/datadog/trace/api/datastreams/NoopDataStreamsMonitoring.java b/internal-api/src/main/java/datadog/trace/api/datastreams/NoopDataStreamsMonitoring.java index bd8e19fe6dc..fc51634c098 100644 --- a/internal-api/src/main/java/datadog/trace/api/datastreams/NoopDataStreamsMonitoring.java +++ b/internal-api/src/main/java/datadog/trace/api/datastreams/NoopDataStreamsMonitoring.java @@ -55,6 +55,16 @@ public void setThreadServiceName(String serviceName) {} @Override public void clearThreadServiceName() {} + @Override + public void trackTransaction(String transactionId, String checkpointName) {} + + @Override + public void trackTransaction( + AgentSpan span, + DataStreamsTransactionExtractor.Type extractorType, + Object source, + TransactionSourceReader sourceReader) {} + @Override public void setConsumeCheckpoint(String type, String source, DataStreamsContextCarrier carrier) {} diff --git a/internal-api/src/main/java/datadog/trace/api/datastreams/TransactionInfo.java b/internal-api/src/main/java/datadog/trace/api/datastreams/TransactionInfo.java new file mode 100644 index 00000000000..cb17253783e --- /dev/null +++ b/internal-api/src/main/java/datadog/trace/api/datastreams/TransactionInfo.java @@ -0,0 +1,76 @@ +package datadog.trace.api.datastreams; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.Arrays; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +public final class TransactionInfo implements InboxItem { + private static final int MAX_ID_SIZE = 256; + private static final Map CACHE = new ConcurrentHashMap<>(); + private static byte[] CACHE_BYTES = new byte[0]; + + private final String id; + private final long timestamp; + private final int checkpointId; + + public TransactionInfo(String id, long timestamp, String checkpoint) { + this.id = id; + this.timestamp = timestamp; + this.checkpointId = CACHE.computeIfAbsent(checkpoint, k -> generateCheckpointId(checkpoint)); + } + + public String getId() { + return id; + } + + public Long getTimestamp() { + return timestamp; + } + + public Integer getCheckpointId() { + return checkpointId; + } + + private int generateCheckpointId(String checkpoint) { + int id = CACHE.size() + 1; + + // update cache bytes + byte[] checkpointBytes = checkpoint.getBytes(); + int idx = CACHE_BYTES.length; + CACHE_BYTES = Arrays.copyOf(CACHE_BYTES, idx + 2 + checkpointBytes.length); + CACHE_BYTES[idx] = (byte) id; + CACHE_BYTES[idx + 1] = (byte) checkpointBytes.length; + System.arraycopy(checkpointBytes, 0, CACHE_BYTES, idx + 2, checkpointBytes.length); + + return id; + } + + public byte[] getBytes() { + byte[] idBytes = id.getBytes(); + + // long ids will be truncated + int idLen = Math.min(idBytes.length, MAX_ID_SIZE); + ByteBuffer buffer = ByteBuffer.allocate(1 + Long.BYTES + 1 + idLen).order(ByteOrder.BIG_ENDIAN); + + buffer.put((byte) checkpointId); + buffer.putLong(timestamp); + buffer.put((byte) idLen); + buffer.put(idBytes, 0, idLen); + + return buffer.array(); + } + + // @VisibleForTesting + static void resetCache() { + synchronized (CACHE) { + CACHE.clear(); + CACHE_BYTES = new byte[0]; + } + } + + public static byte[] getCheckpointIdCacheBytes() { + return CACHE_BYTES.clone(); + } +} diff --git a/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/AgentTracer.java b/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/AgentTracer.java index a6758d048a5..fb3eb9f853b 100644 --- a/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/AgentTracer.java +++ b/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/AgentTracer.java @@ -6,6 +6,7 @@ import datadog.trace.api.EndpointTracker; import datadog.trace.api.TraceConfig; import datadog.trace.api.datastreams.AgentDataStreamsMonitoring; +import datadog.trace.api.datastreams.DataStreamsTransactionExtractor; import datadog.trace.api.datastreams.NoopDataStreamsMonitoring; import datadog.trace.api.experimental.DataStreamsCheckpointer; import datadog.trace.api.gateway.CallbackProvider; @@ -811,5 +812,10 @@ public List getSpanSamplingRules() { public List getTraceSamplingRules() { return Collections.emptyList(); } + + @Override + public List getDataStreamsTransactionExtractors() { + return null; + } } } diff --git a/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/Tags.java b/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/Tags.java index 3eaa1e292cc..20629daf051 100644 --- a/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/Tags.java +++ b/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/Tags.java @@ -174,4 +174,6 @@ public class Tags { public static final String LLMOBS_TOOL_SPAN_KIND = "tool"; public static final String LLMOBS_EMBEDDING_SPAN_KIND = "embedding"; public static final String LLMOBS_RETRIEVAL_SPAN_KIND = "retrieval"; + public static final String DSM_TRANSACTION_ID = "dsm.transaction.id"; + public static final String DSM_TRANSACTION_CHECKPOINT = "dsm.transaction.checkpoint"; } diff --git a/internal-api/src/test/groovy/datadog/trace/api/datastreams/TransactionInfoTest.groovy b/internal-api/src/test/groovy/datadog/trace/api/datastreams/TransactionInfoTest.groovy new file mode 100644 index 00000000000..452f7167bd8 --- /dev/null +++ b/internal-api/src/test/groovy/datadog/trace/api/datastreams/TransactionInfoTest.groovy @@ -0,0 +1,65 @@ +package datadog.trace.api.datastreams + +import datadog.trace.api.Pair +import spock.lang.Specification + +class TransactionInfoTest extends Specification { + + def "test checkpoint id cache serialization multiple"() { + given: + TransactionInfo.resetCache() + def controlSize = 10 + // generate multiple transaction ids to trigger cache updates + for (int i = 0; i < controlSize; i++) { + new TransactionInfo("id " + i, i, "checkpoint " + i) + } + + def items = new LinkedList>() + // get cache data + def data = TransactionInfo.getCheckpointIdCacheBytes() + def i = 0 + while (i < data.size()) { + def id = data[i] + i++ + + def size = data[i] + i++ + + def str = new String(data, i, size) + i += size + + items.add(Pair.of(str, id) as Pair) + } + + expect: + items.size() == controlSize + + for (def item in items) { + item.left == "Checkpoint " + item.right + } + } + + def "test checkpoint id cache serialization"() { + given: + TransactionInfo.resetCache() + new TransactionInfo("id", 1, "checkpoint") + def bytes = TransactionInfo.getCheckpointIdCacheBytes() + expect: + bytes.size() == 12 + bytes == new byte[] { + 1, 10, 99, 104, 101, 99, 107, 112, 111, 105, 110, 116 + } + } + + def "test transaction id serialization"() { + given: + TransactionInfo.resetCache() + def test = new TransactionInfo("id", 1, "checkpoint") + def bytes = test.getBytes() + expect: + bytes.size() == 12 + bytes == new byte[] { + 1, 0, 0, 0, 0, 0, 0, 0, 1, 2, 105, 100 + } + } +} diff --git a/metadata/supported-configurations.json b/metadata/supported-configurations.json index c270af1c7d3..bdba0282da7 100644 --- a/metadata/supported-configurations.json +++ b/metadata/supported-configurations.json @@ -1089,6 +1089,14 @@ "aliases": [] } ], + "DD_DATA_STREAMS_TRANSACTION_EXTRACTORS": [ + { + "version": "A", + "type": "string", + "default": null, + "aliases": [] + } + ], "DD_DBM_INJECT_SQL_BASEHASH": [ { "version": "A",