diff --git a/braintrust-sdk/src/main/java/dev/braintrust/config/BraintrustConfig.java b/braintrust-sdk/src/main/java/dev/braintrust/config/BraintrustConfig.java
index e084b628..0fa42c29 100644
--- a/braintrust-sdk/src/main/java/dev/braintrust/config/BraintrustConfig.java
+++ b/braintrust-sdk/src/main/java/dev/braintrust/config/BraintrustConfig.java
@@ -43,6 +43,15 @@ public final class BraintrustConfig extends BaseConfig {
/** compress otel data before exporting to braintrust */
private final Boolean compressOtelPayload = getConfig("BRAINTRUST_COMPRESS_OTEL_PAYLOAD", true);
+ /**
+ * When {@code true} (the default), the Braintrust span processor automatically extracts base64
+ * data-URI attachments from LLM conversation spans, uploads them to object storage, and
+ * replaces the inline data with a lightweight reference. This reduces the size of spans
+ * exported to Braintrust.
+ */
+ private final Boolean autoConvertAIAttachments =
+ getConfig("BRAINTRUST_AUTO_CONVERT_AI_ATTACHMENTS", true);
+
/** Custom SSL context for OTLP exporter. Builder-only field, not backed by envars. */
private final SSLContext sslContext;
@@ -198,6 +207,11 @@ public Builder compressOtelPayload(boolean value) {
return this;
}
+ public Builder autoConvertAIAttachments(boolean value) {
+ envOverrides.put("BRAINTRUST_AUTO_CONVERT_AI_ATTACHMENTS", String.valueOf(value));
+ return this;
+ }
+
public Builder sslContext(SSLContext value) {
this.sslContext = value;
return this;
diff --git a/braintrust-sdk/src/main/java/dev/braintrust/trace/AttachmentProcessor.java b/braintrust-sdk/src/main/java/dev/braintrust/trace/AttachmentProcessor.java
new file mode 100644
index 00000000..ed25a46d
--- /dev/null
+++ b/braintrust-sdk/src/main/java/dev/braintrust/trace/AttachmentProcessor.java
@@ -0,0 +1,177 @@
+package dev.braintrust.trace;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.databind.node.TextNode;
+import dev.braintrust.config.BraintrustConfig;
+import dev.braintrust.json.BraintrustJsonMapper;
+import java.time.Duration;
+import java.util.Base64;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Scans JSON content for base64 data URI attachments and replaces them with attachment references
+ * after uploading to S3.
+ *
+ *
Package-private; not exposed in the public API.
+ */
+@Slf4j
+class AttachmentProcessor {
+ /**
+ * quick heuristic to determine if the json payload contains a base64 encoded file
+ *
+ *
This is used for performance reasons as a fail-fast to avoid doing a json parse.
+ */
+ static final Pattern BASE64_DATA_URI_PATTERN =
+ Pattern.compile("data:([\\w/\\-.+]+);base64,([A-Za-z0-9+/=]{20,})");
+
+ private final BraintrustConfig config;
+ private final AttachmentUploader uploader;
+
+ AttachmentProcessor(BraintrustConfig config, AttachmentUploader uploader) {
+ this.config = config;
+ this.uploader = uploader;
+ }
+
+ /**
+ * Scans a JSON string for base64 data URIs, uploads them, and returns the modified JSON with
+ * attachment references.
+ *
+ * @param json the JSON string to scan
+ * @return the modified JSON with base64 data replaced by attachment references, or the original
+ * JSON if no base64 data was found
+ */
+ String processAndUpload(String json) {
+ if ((!config.autoConvertAIAttachments())
+ || json == null
+ || uploader.isShutdown()
+ || !BASE64_DATA_URI_PATTERN.matcher(json).find()) {
+ return json;
+ }
+
+ try {
+ JsonNode root = BraintrustJsonMapper.get().readTree(json);
+ AtomicBoolean modified = new AtomicBoolean(false);
+ JsonNode result = replaceBase64Attachments(root, modified);
+ return modified.get() ? BraintrustJsonMapper.get().writeValueAsString(result) : json;
+ } catch (UploaderRejectionException e) {
+ log.debug(
+ "attachment uploader rejected job. Proceeding without attachment"
+ + " replacements.");
+ return json;
+ } catch (Exception | StackOverflowError e) {
+ log.info("uploader optimization failed, falling back to span uploads", e);
+ uploader.shutdown(Duration.ofSeconds(0)); // don't block
+ return json;
+ }
+ }
+
+ // NOTE: not concerned with recursion blowing the stack because we're mutating AI vendor
+ // messages which are not deep enough for this to be an issue.
+ private JsonNode replaceBase64Attachments(JsonNode node, AtomicBoolean modified) {
+ if (node.isTextual()) {
+ return replaceInText((TextNode) node, modified);
+ } else if (node.isObject()) {
+ ObjectNode objectNode = (ObjectNode) node;
+ ObjectNode result = BraintrustJsonMapper.get().createObjectNode();
+ var fieldNames = objectNode.fieldNames();
+ while (fieldNames.hasNext()) {
+ String fieldName = fieldNames.next();
+ JsonNode child = objectNode.get(fieldName);
+ result.set(fieldName, replaceBase64Attachments(child, modified));
+ }
+ return result;
+ } else if (node.isArray()) {
+ ArrayNode arrayNode = (ArrayNode) node;
+ ArrayNode result = BraintrustJsonMapper.get().createArrayNode();
+ for (int i = 0; i < arrayNode.size(); i++) {
+ result.add(replaceBase64Attachments(arrayNode.get(i), modified));
+ }
+ return result;
+ }
+ return node;
+ }
+
+ @SneakyThrows
+ private JsonNode replaceInText(TextNode textNode, AtomicBoolean modified) {
+ String value = textNode.asText();
+ Matcher matcher = BASE64_DATA_URI_PATTERN.matcher(value);
+ if (!matcher.find()) {
+ return textNode;
+ }
+ if (!isEntirelyDataUri(value)) {
+ log.debug("found base64 string but text contained extra content {}", value);
+ return textNode;
+ }
+
+ matcher.reset();
+ StringBuilder sb = new StringBuilder();
+ while (matcher.find()) {
+ String contentType = matcher.group(1);
+ String base64Data = matcher.group(2);
+ byte[] data = Base64.getDecoder().decode(base64Data);
+
+ String extension = contentTypeToExtension(contentType);
+ String filename = "attachment" + extension;
+ AttachmentReference ref = AttachmentReference.create(filename, contentType);
+
+ if (!uploader.enqueue(ref, data)) {
+ throw new UploaderRejectionException("uploader rejected attachment upload");
+ }
+
+ matcher.appendReplacement(sb, Matcher.quoteReplacement(ref.toJson()));
+ }
+ matcher.appendTail(sb);
+
+ modified.set(true);
+
+ return BraintrustJsonMapper.get().readTree(sb.toString());
+ }
+
+ static boolean isEntirelyDataUri(String value) {
+ String trimmed = value.trim();
+ return trimmed.startsWith("data:")
+ && !trimmed.contains("\"")
+ && !trimmed.contains("\\")
+ && !trimmed.contains(" ");
+ }
+
+ private static String contentTypeToExtension(String contentType) {
+ switch (contentType.toLowerCase()) {
+ case "image/png":
+ return ".png";
+ case "image/jpeg":
+ case "image/jpg":
+ return ".jpg";
+ case "image/gif":
+ return ".gif";
+ case "image/webp":
+ return ".webp";
+ case "image/svg+xml":
+ return ".svg";
+ case "application/pdf":
+ return ".pdf";
+ case "text/plain":
+ return ".txt";
+ case "application/json":
+ return ".json";
+ default:
+ String[] parts = contentType.split("/");
+ if (parts.length == 2) {
+ return "." + parts[1].split("[;\\-]")[0];
+ }
+ return "";
+ }
+ }
+
+ private static class UploaderRejectionException extends RuntimeException {
+ public UploaderRejectionException(String message) {
+ super(message);
+ }
+ }
+}
diff --git a/braintrust-sdk/src/main/java/dev/braintrust/trace/AttachmentReference.java b/braintrust-sdk/src/main/java/dev/braintrust/trace/AttachmentReference.java
new file mode 100644
index 00000000..f7e43b42
--- /dev/null
+++ b/braintrust-sdk/src/main/java/dev/braintrust/trace/AttachmentReference.java
@@ -0,0 +1,43 @@
+package dev.braintrust.trace;
+
+import java.util.Objects;
+import java.util.UUID;
+import javax.annotation.Nonnull;
+
+/**
+ * Represents an attachment reference stored on a span in place of uploaded attachment data.
+ *
+ *
Its shape intentionally matches the cross-SDK Braintrust attachment reference format.
+ */
+record AttachmentReference(
+ @Nonnull String type,
+ @Nonnull String filename,
+ @Nonnull String contentType,
+ @Nonnull String key) {
+
+ private static final String DEFAULT_TYPE = "braintrust_attachment";
+
+ /**
+ * Creates an attachment reference with a generated UUID key.
+ *
+ * @param filename the display filename for the attachment
+ * @param contentType the MIME type of the attachment content
+ * @return a new AttachmentReference with a unique key
+ */
+ static AttachmentReference create(@Nonnull String filename, @Nonnull String contentType) {
+ Objects.requireNonNull(filename, "filename cannot be null");
+ Objects.requireNonNull(contentType, "contentType cannot be null");
+ return new AttachmentReference(
+ DEFAULT_TYPE, filename, contentType, UUID.randomUUID().toString());
+ }
+
+ public String toJson() {
+ return "{\"type\":\"braintrust_attachment\",\"content_type\":\""
+ + contentType()
+ + "\",\"filename\":\""
+ + filename()
+ + "\",\"key\":\""
+ + key()
+ + "\"}";
+ }
+}
diff --git a/braintrust-sdk/src/main/java/dev/braintrust/trace/AttachmentUploader.java b/braintrust-sdk/src/main/java/dev/braintrust/trace/AttachmentUploader.java
new file mode 100644
index 00000000..9c7aa7d3
--- /dev/null
+++ b/braintrust-sdk/src/main/java/dev/braintrust/trace/AttachmentUploader.java
@@ -0,0 +1,614 @@
+package dev.braintrust.trace;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import dev.braintrust.api.BraintrustOpenApiClient;
+import dev.braintrust.json.BraintrustJsonMapper;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.time.Duration;
+import java.util.Map;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicReference;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Uploads Braintrust attachments in the background.
+ *
+ *
Implementations accept attachment data via {@link #enqueue}, process them asynchronously, and
+ * support graceful shutdown with {@link #shutdown}.
+ */
+interface AttachmentUploader {
+ /**
+ * Enqueues an attachment for upload.
+ *
+ *
NOTE: if the upload queue is full, this method will block until space becomes available
+ *
+ * @param reference the attachment reference metadata
+ * @param data the attachment data to upload
+ * @return true if the attachment was successfully enqueued for upload. False if the uploader
+ * declined to enqueue the message
+ */
+ boolean enqueue(@Nonnull AttachmentReference reference, @Nonnull byte[] data);
+
+ /** runs force flush with a default timeout */
+ default void forceFlush() {
+ forceFlush(Duration.ofSeconds(30));
+ }
+
+ /**
+ * Waits for all currently enqueued uploads to complete with a timeout.
+ *
+ *
Concurrency note: Items enqueued concurrently with or after this call are not
+ * guaranteed to be included. This is safe because callers that need ordering should enqueue
+ * first, then flush.
+ *
+ * @param timeout the maximum time to wait
+ * @return true if all uploads completed, false if timed out
+ */
+ boolean forceFlush(@Nonnull Duration timeout);
+
+ /** runs shutdown with a default timeout */
+ default void shutdown() {
+ // dropping s3 uploads is a bad user experience so shut down with a gracious timeout
+ shutdown(Duration.ofSeconds(120));
+ }
+
+ /**
+ * Shuts down the uploader with a custom timeout.
+ *
+ * @param timeout the maximum time to wait for pending uploads
+ */
+ void shutdown(@Nonnull Duration timeout);
+
+ boolean isShutdown();
+
+ /**
+ * Background uploader for Braintrust attachments that uploads to S3 via signed URLs.
+ *
+ *
Uploads are enqueued and processed by a single-threaded worker that:
+ *
+ *
+ * - Requests a signed upload URL from the Braintrust API
+ *
- Uploads the data to the signed URL
+ *
- Reports the upload status (done/error) to the Braintrust API
+ *
+ *
+ * The uploader starts lazily on first enqueue and can be shut down gracefully.
+ */
+ @Slf4j
+ class S3AttachmentUploader implements AttachmentUploader {
+ private static final int QUEUE_SIZE = 1024;
+
+ /** Default per-request timeout for HTTP calls. */
+ private static final Duration DEFAULT_REQUEST_TIMEOUT = Duration.ofSeconds(60);
+
+ /** Default maximum number of retry attempts for transient failures. */
+ private static final int DEFAULT_MAX_RETRIES = 8;
+
+ /** Default initial backoff delay between retries. Doubles on each subsequent attempt. */
+ private static final Duration DEFAULT_INITIAL_RETRY_DELAY = Duration.ofMillis(500);
+
+ private final BraintrustOpenApiClient apiClient;
+ private final Duration requestTimeout;
+ private final int maxRetries;
+ private final Duration initialRetryDelay;
+
+ private final LinkedBlockingQueue queue;
+ private final AtomicReference worker = new AtomicReference<>();
+ private final AtomicReference orgId = new AtomicReference<>();
+
+ // non thread safe fields must be checked and read under the lock
+ private final Object lock = new Object();
+ private boolean rejectNewJobs = false;
+ private boolean workerDone = false;
+ private CountDownLatch currentBatch = new CountDownLatch(1);
+
+ /**
+ * Creates a new attachment uploader with default retry settings.
+ *
+ * @param apiClient the Braintrust API client (provides auth, base URL, and HTTP transport)
+ */
+ S3AttachmentUploader(@Nonnull BraintrustOpenApiClient apiClient) {
+ this(
+ apiClient,
+ DEFAULT_REQUEST_TIMEOUT,
+ DEFAULT_MAX_RETRIES,
+ DEFAULT_INITIAL_RETRY_DELAY);
+ }
+
+ /**
+ * Creates a new attachment uploader with custom retry settings.
+ *
+ * @param apiClient the Braintrust API client (provides auth, base URL, and HTTP transport)
+ * @param requestTimeout the per-request timeout for HTTP calls
+ * @param maxRetries the maximum number of retry attempts for transient failures
+ * @param initialRetryDelay the initial backoff delay between retries (doubles on each
+ * attempt)
+ */
+ S3AttachmentUploader(
+ @Nonnull BraintrustOpenApiClient apiClient,
+ @Nonnull Duration requestTimeout,
+ int maxRetries,
+ @Nonnull Duration initialRetryDelay) {
+ if (requestTimeout.toMillis() < 0) {
+ throw new IllegalArgumentException("requestTimeout must be >= 0");
+ }
+ if (maxRetries <= 0) {
+ throw new IllegalArgumentException("maxRetries must be > 0");
+ }
+ if (initialRetryDelay.toMillis() < 0) {
+ throw new IllegalArgumentException("initialRetryDelay must be >= 0");
+ }
+ this.apiClient = apiClient;
+ this.requestTimeout = requestTimeout;
+ this.maxRetries = maxRetries;
+ this.initialRetryDelay = initialRetryDelay;
+ this.queue = new LinkedBlockingQueue<>(QUEUE_SIZE);
+ BraintrustShutdownHook.addShutdownHook(
+ BraintrustShutdownHook.ShutdownOrder.ATTACHMENT_UPLOADER, this::shutdown);
+ }
+
+ @Override
+ public boolean enqueue(@Nonnull AttachmentReference reference, @Nonnull byte[] data) {
+ if (checkRejectNewJobsThreadSafe()) {
+ return false;
+ }
+ try {
+ ensureWorkerStarted();
+ UploadJob job = new UploadJob(reference, data);
+ return queue.offer(job, 0, TimeUnit.MILLISECONDS);
+ } catch (Exception e) {
+ log.error("failed to enqueue attachment", e);
+ shutdown();
+ return false;
+ }
+ }
+
+ @Override
+ @SneakyThrows
+ public boolean forceFlush(@Nonnull Duration timeout) {
+ return awaitCurrentBatch(timeout.toMillis(), TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ @SneakyThrows
+ public void shutdown(@Nonnull Duration timeout) {
+ synchronized (lock) {
+ rejectNewJobs = true;
+ if (workerDone) {
+ return;
+ }
+ }
+ ExecutorService executor = worker.getAndSet(null);
+ if (executor == null) {
+ return;
+ }
+ executor.shutdown();
+ if (!executor.awaitTermination(timeout.toMillis(), TimeUnit.MILLISECONDS)) {
+ log.warn("failed to gracefully shut down s3 upload worker");
+ executor.shutdownNow();
+ }
+ }
+
+ @Override
+ public boolean isShutdown() {
+ return checkRejectNewJobsThreadSafe();
+ }
+
+ // ── Worker lifecycle ──────────────────────────────────────────────
+
+ /**
+ * start worker thread or do nothing if already started
+ *
+ * calling this method does not require the lock
+ */
+ private void ensureWorkerStarted() {
+ if (worker.get() == null) {
+ var newWorker =
+ Executors.newSingleThreadExecutor(
+ r -> {
+ Thread t = new Thread(r, "braintrust-attachment-uploader");
+ t.setDaemon(true);
+ return t;
+ });
+ if (worker.compareAndSet(null, newWorker)) {
+ // NOTE: if shutdown is called concurrently job submission may throw an
+ // exception. This is fine.
+ newWorker.submit(this::workerLoop);
+ } else {
+ // tried to start the worker concurrently. This is fine, we'll just shut down
+ // and dereference the redundant worker
+ newWorker.shutdown();
+ }
+ }
+ }
+
+ private void workerLoop() {
+ log.debug("Attachment uploader worker started");
+ while ((!checkRejectNewJobsThreadSafe()) || queue.peek() != null) {
+ UploadJob job = null;
+ try {
+ job = queue.poll(100, TimeUnit.MILLISECONDS);
+ if (job == null) {
+ finishCurrentBatch();
+ } else {
+ upload(job);
+ }
+ } catch (InterruptedException e) {
+ // worker thread shutdownNow was invoked
+ if (!queue.isEmpty()) {
+ log.warn(
+ "s3 uploader force shutdown was reached. Dropping {} uploads",
+ queue.size(),
+ e);
+ }
+ break;
+ } catch (Exception e) {
+ // this only user of this util is our span processor so we'll just fall back to
+ // sending attachments in span data if an error occurs
+ synchronized (lock) {
+ rejectNewJobs = true;
+ }
+ if (job == null) {
+ log.warn("Failed to upload attachment", e);
+ } else {
+ log.warn("Failed to upload attachment key={}", job.reference().key(), e);
+ reportStatus(job.reference().key(), "error", e.getMessage());
+ }
+ // NOTE: we'll continue the loop attempting uploads of the remaining jobs until
+ // the queue is drained
+ }
+ }
+ synchronized (lock) {
+ workerDone = true;
+ finishCurrentBatch();
+ log.debug("Attachment uploader worker stopped");
+ }
+ }
+
+ private boolean checkRejectNewJobsThreadSafe() {
+ synchronized (lock) {
+ return rejectNewJobs;
+ }
+ }
+
+ // ── Upload orchestration ──────────────────────────────────────────
+
+ private void upload(@Nonnull UploadJob job) throws IOException, InterruptedException {
+ UploadUrlResponse urlResponse =
+ requestUploadUrl(
+ getOrgId(),
+ job.reference().key(),
+ job.reference().filename(),
+ job.reference().contentType());
+
+ uploadToSignedUrl(
+ urlResponse.signedUrl(),
+ urlResponse.headers(),
+ job.reference().contentType(),
+ job.data());
+
+ reportStatus(job.reference().key(), "done", null);
+ }
+
+ private String getOrgId() {
+ if (orgId.get() != null) {
+ return orgId.get();
+ }
+ return orgId.updateAndGet(curr -> curr != null ? curr : resolveOrgId());
+ }
+
+ private String resolveOrgId() {
+ try {
+ var loginResponse = apiClient.login();
+ if (loginResponse.orgInfo() != null && !loginResponse.orgInfo().isEmpty()) {
+ return loginResponse.orgInfo().get(0).id();
+ } else {
+ throw new IllegalStateException("No org info returned from login");
+ }
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to resolve org ID for attachment upload", e);
+ }
+ }
+
+ private void reportStatus(
+ @Nonnull String key, @Nonnull String status, @Nullable String errorMessage) {
+ try {
+ var statusMap = new java.util.HashMap();
+ statusMap.put("upload_status", status);
+ if (errorMessage != null) {
+ statusMap.put("error_message", errorMessage);
+ }
+ updateUploadStatus(getOrgId(), key, statusMap);
+ } catch (Exception e) {
+ log.warn("Failed to report attachment status key={} status={}", key, status, e);
+ }
+ }
+
+ // ── S3 HTTP operations ────────────────────────────────────────────
+
+ /**
+ * Requests a signed upload URL from the Braintrust API.
+ *
+ * @param orgId the organization ID
+ * @param key the attachment key
+ * @param filename the filename for the attachment
+ * @param contentType the MIME type of the attachment
+ * @return the signed URL response with upload URL and required headers
+ * @throws IOException if the request fails
+ * @throws InterruptedException if the request is interrupted
+ */
+ UploadUrlResponse requestUploadUrl(
+ @Nonnull String orgId,
+ @Nonnull String key,
+ @Nonnull String filename,
+ @Nonnull String contentType)
+ throws IOException, InterruptedException {
+
+ var requestBody =
+ BraintrustJsonMapper.get()
+ .writeValueAsString(
+ new UploadUrlRequest(key, filename, contentType, orgId));
+
+ var requestBuilder =
+ HttpRequest.newBuilder()
+ .uri(toUri(apiClient.getBaseUri() + "/attachment"))
+ .timeout(requestTimeout)
+ .header("Content-Type", "application/json")
+ .header("Accept", "application/json")
+ .POST(HttpRequest.BodyPublishers.ofString(requestBody));
+
+ if (apiClient.getRequestInterceptor() != null) {
+ apiClient.getRequestInterceptor().accept(requestBuilder);
+ }
+
+ HttpResponse response =
+ sendWithRetry(requestBuilder.build(), HttpResponse.BodyHandlers.ofString());
+
+ if (!isSuccessStatus(response.statusCode())) {
+ throw new IOException(
+ "Failed to request upload URL: HTTP "
+ + response.statusCode()
+ + " - "
+ + response.body());
+ }
+
+ UploadUrlResponse result =
+ BraintrustJsonMapper.get().readValue(response.body(), UploadUrlResponse.class);
+
+ if (result.signedUrl() == null || result.signedUrl().isEmpty()) {
+ throw new IOException("Signed URL response missing signedUrl");
+ }
+
+ return result;
+ }
+
+ /**
+ * Uploads data to a signed URL with the specified headers.
+ *
+ * Automatically detects Azure Blob Storage URLs and adds the required {@code
+ * x-ms-blob-type: BlockBlob} header.
+ *
+ * @param signedUrl the signed upload URL
+ * @param headers additional headers to include in the upload request
+ * @param contentType the MIME type of the data being uploaded
+ * @param data the data to upload
+ * @throws IOException if the upload fails
+ * @throws InterruptedException if the upload is interrupted
+ */
+ void uploadToSignedUrl(
+ @Nonnull String signedUrl,
+ @Nonnull Map headers,
+ @Nonnull String contentType,
+ @Nonnull byte[] data)
+ throws IOException, InterruptedException {
+
+ var requestBuilder =
+ HttpRequest.newBuilder()
+ .uri(toUri(signedUrl))
+ .timeout(requestTimeout)
+ .header("Content-Type", contentType)
+ .PUT(HttpRequest.BodyPublishers.ofByteArray(data));
+
+ for (var entry : headers.entrySet()) {
+ requestBuilder.header(entry.getKey(), entry.getValue());
+ }
+
+ addAzureBlobHeaders(signedUrl, requestBuilder);
+
+ HttpResponse response =
+ sendWithRetry(requestBuilder.build(), HttpResponse.BodyHandlers.ofString());
+
+ if (!isSuccessStatus(response.statusCode())) {
+ throw new IOException(
+ "Failed to upload to object store: HTTP "
+ + response.statusCode()
+ + " - "
+ + response.body());
+ }
+ }
+
+ /**
+ * Updates the upload status for an attachment.
+ *
+ * @param orgId the organization ID
+ * @param key the attachment key
+ * @param status the status map (e.g., {"upload_status": "done"} or {"upload_status":
+ * "error", "error_message": "..."})
+ * @throws IOException if the request fails
+ * @throws InterruptedException if the request is interrupted
+ */
+ void updateUploadStatus(
+ @Nonnull String orgId, @Nonnull String key, @Nonnull Map status)
+ throws IOException, InterruptedException {
+
+ var requestBody =
+ BraintrustJsonMapper.get()
+ .writeValueAsString(new StatusRequest(key, orgId, status));
+
+ var requestBuilder =
+ HttpRequest.newBuilder()
+ .uri(toUri(apiClient.getBaseUri() + "/attachment/status"))
+ .timeout(requestTimeout)
+ .header("Content-Type", "application/json")
+ .POST(HttpRequest.BodyPublishers.ofString(requestBody));
+
+ if (apiClient.getRequestInterceptor() != null) {
+ apiClient.getRequestInterceptor().accept(requestBuilder);
+ }
+
+ HttpResponse response =
+ sendWithRetry(requestBuilder.build(), HttpResponse.BodyHandlers.ofString());
+
+ if (!isSuccessStatus(response.statusCode())) {
+ throw new IOException(
+ "Failed to update upload status: HTTP "
+ + response.statusCode()
+ + " - "
+ + response.body());
+ }
+ }
+
+ // ── HTTP helpers ──────────────────────────────────────────────────
+
+ /** Returns {@code true} if the HTTP status code indicates success (2xx). */
+ private static boolean isSuccessStatus(int statusCode) {
+ return statusCode >= 200 && statusCode < 300;
+ }
+
+ /**
+ * Parses a URI string.
+ *
+ * @param uriString the string to parse
+ * @return the parsed URI
+ */
+ @SneakyThrows
+ private static URI toUri(@Nonnull String uriString) {
+ return new URI(uriString);
+ }
+
+ /**
+ * Sends an HTTP request with retry logic for transient failures.
+ *
+ * Retries on 5xx server errors and {@link IOException} (network errors) up to {@link
+ * #maxRetries} times with exponential backoff starting at {@link #initialRetryDelay}.
+ *
+ * @param request the request to send
+ * @param bodyHandler the response body handler
+ * @param the response body type
+ * @return the HTTP response
+ * @throws IOException if all retries are exhausted
+ * @throws InterruptedException if the thread is interrupted during retry backoff
+ */
+ private HttpResponse sendWithRetry(
+ @Nonnull HttpRequest request, @Nonnull HttpResponse.BodyHandler bodyHandler)
+ throws IOException, InterruptedException {
+
+ IOException lastException = null;
+ long backoffMs = initialRetryDelay.toMillis();
+
+ for (int attempt = 0; attempt <= maxRetries; attempt++) {
+ if (attempt > 0) {
+ log.debug(
+ "Retrying request {} (attempt {}/{})",
+ request.uri(),
+ attempt,
+ maxRetries);
+ Thread.sleep(backoffMs);
+ backoffMs *= 2;
+ }
+
+ HttpResponse response;
+ try {
+ response = apiClient.getHttpClient().send(request, bodyHandler);
+ } catch (IOException e) {
+ lastException = e;
+ log.debug("Request to {} failed with IOException", request.uri(), e);
+ continue;
+ }
+
+ // Don't retry client errors (4xx) or successes
+ if (response.statusCode() < 500) {
+ return response;
+ }
+
+ // Server error (5xx) — retry
+ lastException = new IOException("Server error: HTTP " + response.statusCode());
+ log.debug(
+ "Request to {} returned status {}, will retry",
+ request.uri(),
+ response.statusCode());
+ }
+
+ throw new IOException(
+ "Request to " + request.uri() + " failed after " + maxRetries + " retries",
+ lastException);
+ }
+
+ /** Adds Azure Blob Storage specific headers when the signed URL points to Azure. */
+ private static void addAzureBlobHeaders(
+ @Nonnull String signedUrl, HttpRequest.Builder requestBuilder) {
+ try {
+ var uri = new URI(signedUrl);
+ String host = uri.getHost();
+ if (host != null && host.endsWith(".blob.core.windows.net")) {
+ requestBuilder.header("x-ms-blob-type", "BlockBlob");
+ }
+ } catch (URISyntaxException e) {
+ log.warn("Failed to parse signed URL for Azure detection: {}", signedUrl, e);
+ }
+ }
+
+ // ── Batch coordination ────────────────────────────────────────────
+
+ private void finishCurrentBatch() {
+ synchronized (lock) {
+ currentBatch.countDown();
+ currentBatch = new CountDownLatch(1);
+ }
+ }
+
+ private boolean awaitCurrentBatch(long timeout, TimeUnit timeUnit)
+ throws InterruptedException {
+ CountDownLatch latch;
+ synchronized (lock) {
+ latch = currentBatch;
+ }
+ return latch.await(timeout, timeUnit);
+ }
+
+ // ── DTOs ──────────────────────────────────────────────────────────
+
+ private record UploadJob(AttachmentReference reference, byte[] data) {}
+
+ /** Response from requesting a signed upload URL. */
+ record UploadUrlResponse(
+ @JsonProperty("signedUrl") @Nonnull String signedUrl,
+ @JsonProperty("headers") @Nonnull Map headers) {
+
+ /** Compact constructor that enforces non-null headers with an empty map default. */
+ UploadUrlResponse {
+ if (headers == null) {
+ headers = Map.of();
+ }
+ }
+ }
+
+ private record UploadUrlRequest(
+ @Nonnull String key,
+ @Nonnull String filename,
+ @JsonProperty("content_type") @Nonnull String contentType,
+ @JsonProperty("org_id") @Nonnull String orgId) {}
+
+ private record StatusRequest(
+ @Nonnull String key,
+ @JsonProperty("org_id") @Nonnull String orgId,
+ @Nonnull Map status) {}
+ }
+}
diff --git a/braintrust-sdk/src/main/java/dev/braintrust/trace/BraintrustShutdownHook.java b/braintrust-sdk/src/main/java/dev/braintrust/trace/BraintrustShutdownHook.java
index a6f19910..824f2003 100644
--- a/braintrust-sdk/src/main/java/dev/braintrust/trace/BraintrustShutdownHook.java
+++ b/braintrust-sdk/src/main/java/dev/braintrust/trace/BraintrustShutdownHook.java
@@ -6,6 +6,14 @@
import java.util.concurrent.CopyOnWriteArrayList;
class BraintrustShutdownHook {
+
+ /** Shutdown ordering. Lower ordinal values run first. */
+ enum ShutdownOrder {
+ SPAN_PROCESSOR,
+ ATTACHMENT_UPLOADER,
+ TEST_HARNESS;
+ }
+
private record OrderedTarget(int order, Runnable target) {}
private static final List shutdownTargets = new CopyOnWriteArrayList<>();
@@ -22,10 +30,21 @@ private record OrderedTarget(int order, Runnable target) {}
}
public static void addShutdownHook(Runnable target) {
- addShutdownHook(0, target);
+ addShutdownHook(ShutdownOrder.SPAN_PROCESSOR, target);
+ }
+
+ public static void addShutdownHook(ShutdownOrder order, Runnable target) {
+ addShutdownHook(order.ordinal(), target);
}
- public static void addShutdownHook(int order, Runnable target) {
+ /**
+ * Add a jvm shutdown hook.
+ *
+ * @param order lower numbers run first. targets with the same order number can run in any
+ * order. Span processor/exporter flush runs at level 0
+ * @param target the shutdown code to run
+ */
+ private static void addShutdownHook(int order, Runnable target) {
shutdownTargets.add(new OrderedTarget(order, target));
}
}
diff --git a/braintrust-sdk/src/main/java/dev/braintrust/trace/BraintrustSpanProcessor.java b/braintrust-sdk/src/main/java/dev/braintrust/trace/BraintrustSpanProcessor.java
index 532193bd..813a12c8 100644
--- a/braintrust-sdk/src/main/java/dev/braintrust/trace/BraintrustSpanProcessor.java
+++ b/braintrust-sdk/src/main/java/dev/braintrust/trace/BraintrustSpanProcessor.java
@@ -1,13 +1,19 @@
package dev.braintrust.trace;
+import dev.braintrust.api.BraintrustOpenApiClient;
import dev.braintrust.config.BraintrustConfig;
import io.opentelemetry.api.common.AttributeKey;
+import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.context.Context;
import io.opentelemetry.sdk.common.CompletableResultCode;
+import io.opentelemetry.sdk.common.InstrumentationLibraryInfo;
import io.opentelemetry.sdk.trace.ReadWriteSpan;
import io.opentelemetry.sdk.trace.ReadableSpan;
import io.opentelemetry.sdk.trace.SpanProcessor;
+import io.opentelemetry.sdk.trace.data.DelegatingSpanData;
+import io.opentelemetry.sdk.trace.data.SpanData;
import java.util.List;
+import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -21,7 +27,10 @@
*/
@Slf4j
public class BraintrustSpanProcessor implements SpanProcessor {
- // Braintrust-specific attributes
+ static final AttributeKey INPUT_JSON = AttributeKey.stringKey("braintrust.input_json");
+ static final AttributeKey OUTPUT_JSON =
+ AttributeKey.stringKey("braintrust.output_json");
+
public static final AttributeKey PARENT =
AttributeKey.stringKey(BraintrustTracing.PARENT_KEY);
@@ -29,11 +38,17 @@ public class BraintrustSpanProcessor implements SpanProcessor {
private final SpanProcessor delegate;
private final List samplers;
private final ConcurrentMap parentContexts = new ConcurrentHashMap<>();
+ private final AttachmentProcessor attachmentProcessor;
BraintrustSpanProcessor(BraintrustConfig config, SpanProcessor delegate) {
this.config = config;
this.delegate = delegate;
this.samplers = buildSamplers(config);
+ this.attachmentProcessor =
+ new AttachmentProcessor(
+ config,
+ new AttachmentUploader.S3AttachmentUploader(
+ BraintrustOpenApiClient.of(config)));
}
private static List buildSamplers(BraintrustConfig config) {
@@ -113,12 +128,25 @@ public void onEnd(ReadableSpan span) {
return;
}
}
- delegate.onEnd(span);
+
+ var spanData = span.toSpanData();
+ @Nullable String inputJson = spanData.getAttributes().get(INPUT_JSON);
+ @Nullable String outputJson = spanData.getAttributes().get(OUTPUT_JSON);
+
+ @Nullable String newInputJson = attachmentProcessor.processAndUpload(inputJson);
+ @Nullable String newOutputJson = attachmentProcessor.processAndUpload(outputJson);
+
+ if (!Objects.equals(newInputJson, inputJson)
+ || !Objects.equals(newOutputJson, outputJson)) {
+ delegate.onEnd(new TransformedReadableSpan(span, newInputJson, newOutputJson));
+ } else {
+ delegate.onEnd(span);
+ }
}
@Override
public boolean isEndRequired() {
- return !samplers.isEmpty() || delegate.isEndRequired();
+ return config.autoConvertAIAttachments() || !samplers.isEmpty() || delegate.isEndRequired();
}
@Override
@@ -170,4 +198,93 @@ public static ParentContext experiment(String experimentId) {
return new ParentContext(null, experimentId, ParentType.EXPERIMENT);
}
}
+
+ /**
+ * otel java does not implement onEnding, so this is the most idiomatic way to mutate a span
+ * once it ends
+ */
+ private static class TransformedReadableSpan implements ReadableSpan {
+ private final ReadableSpan delegate;
+ private final Attributes attributes;
+
+ TransformedReadableSpan(ReadableSpan delegate, String inputJson, String outputJson) {
+ this.delegate = delegate;
+ var builder = delegate.getAttributes().toBuilder();
+ builder.put(INPUT_JSON, inputJson);
+ builder.put(OUTPUT_JSON, outputJson);
+ attributes = builder.build();
+ }
+
+ @Override
+ public Attributes getAttributes() {
+ return attributes;
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public T getAttribute(AttributeKey key) {
+ if (key.equals(INPUT_JSON)) {
+ return (T) attributes.get(INPUT_JSON);
+ }
+ if (key.equals(OUTPUT_JSON)) {
+ return (T) attributes.get(OUTPUT_JSON);
+ }
+ return delegate.getAttribute(key);
+ }
+
+ @Override
+ public SpanData toSpanData() {
+ return new DelegatingSpanData(delegate.toSpanData()) {
+ @Override
+ public io.opentelemetry.api.common.Attributes getAttributes() {
+ return TransformedReadableSpan.this.getAttributes();
+ }
+
+ @Override
+ public int getTotalAttributeCount() {
+ return getAttributes().size();
+ }
+ };
+ }
+
+ @Override
+ public String getName() {
+ return delegate.getName();
+ }
+
+ @Override
+ public io.opentelemetry.api.trace.SpanContext getSpanContext() {
+ return delegate.getSpanContext();
+ }
+
+ @Override
+ public boolean hasEnded() {
+ return delegate.hasEnded();
+ }
+
+ @Override
+ public io.opentelemetry.sdk.common.InstrumentationScopeInfo getInstrumentationScopeInfo() {
+ return delegate.getInstrumentationScopeInfo();
+ }
+
+ @Override
+ public InstrumentationLibraryInfo getInstrumentationLibraryInfo() {
+ return delegate.getInstrumentationLibraryInfo();
+ }
+
+ @Override
+ public long getLatencyNanos() {
+ return delegate.getLatencyNanos();
+ }
+
+ @Override
+ public io.opentelemetry.api.trace.SpanContext getParentSpanContext() {
+ return delegate.getParentSpanContext();
+ }
+
+ @Override
+ public io.opentelemetry.api.trace.SpanKind getKind() {
+ return delegate.getKind();
+ }
+ }
}
diff --git a/braintrust-sdk/src/main/java/dev/braintrust/trace/BraintrustTracing.java b/braintrust-sdk/src/main/java/dev/braintrust/trace/BraintrustTracing.java
index f38bd3da..c3574038 100644
--- a/braintrust-sdk/src/main/java/dev/braintrust/trace/BraintrustTracing.java
+++ b/braintrust-sdk/src/main/java/dev/braintrust/trace/BraintrustTracing.java
@@ -19,9 +19,12 @@
import io.opentelemetry.sdk.resources.Resource;
import io.opentelemetry.sdk.trace.SdkTracerProvider;
import io.opentelemetry.sdk.trace.SdkTracerProviderBuilder;
+import io.opentelemetry.sdk.trace.SpanProcessor;
import io.opentelemetry.sdk.trace.export.BatchSpanProcessor;
import io.opentelemetry.semconv.ServiceAttributes;
import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
@@ -96,6 +99,31 @@ public static void enable(
@Nonnull SdkTracerProviderBuilder tracerProviderBuilder,
@Nonnull SdkLoggerProviderBuilder loggerProviderBuilder,
@Nonnull SdkMeterProviderBuilder meterProviderBuilder) {
+ enable(
+ config,
+ tracerProviderBuilder,
+ List.of(),
+ loggerProviderBuilder,
+ meterProviderBuilder);
+ }
+
+ /**
+ * Add braintrust to existing open telemetry builders, with additional span processors that will
+ * receive spans after Braintrust processing (attachment upload, parent assignment,
+ * etc.).
+ *
+ * The additional processors are composited into the {@link BraintrustSpanProcessor}'s
+ * delegate, so they see the transformed span data rather than the raw original.
+ *
+ * @param additionalDelegates extra span processors that receive post-processed spans alongside
+ * the Braintrust exporter. Pass {@code List.of()} if none are needed.
+ */
+ static void enable(
+ @Nonnull BraintrustConfig config,
+ @Nonnull SdkTracerProviderBuilder tracerProviderBuilder,
+ @Nonnull List additionalDelegates,
+ @Nonnull SdkLoggerProviderBuilder loggerProviderBuilder,
+ @Nonnull SdkMeterProviderBuilder meterProviderBuilder) {
final Duration exportInterval = Duration.ofSeconds(5);
final int maxQueueSize = 2048;
final int maxExportBatchSize = 512;
@@ -109,14 +137,24 @@ public static void enable(
var resource = resourceBuilder.build();
// spans
- var spanProcessor =
- new BraintrustSpanProcessor(
- config,
- BatchSpanProcessor.builder(new BraintrustSpanExporter(config))
- .setScheduleDelay(exportInterval.toMillis(), TimeUnit.MILLISECONDS)
- .setMaxQueueSize(maxQueueSize)
- .setMaxExportBatchSize(maxExportBatchSize)
- .build());
+ var braintrustExporter =
+ BatchSpanProcessor.builder(new BraintrustSpanExporter(config))
+ .setScheduleDelay(exportInterval.toMillis(), TimeUnit.MILLISECONDS)
+ .setMaxQueueSize(maxQueueSize)
+ .setMaxExportBatchSize(maxExportBatchSize)
+ .build();
+
+ SpanProcessor delegate;
+ if (additionalDelegates.isEmpty()) {
+ delegate = braintrustExporter;
+ } else {
+ var all = new ArrayList();
+ all.add(braintrustExporter);
+ all.addAll(additionalDelegates);
+ delegate = SpanProcessor.composite(all);
+ }
+
+ var spanProcessor = new BraintrustSpanProcessor(config, delegate);
tracerProviderBuilder.addResource(resource).addSpanProcessor(spanProcessor);
// logs
var logProcessor =
diff --git a/braintrust-sdk/src/test/java/dev/braintrust/trace/AttachmentProcessorTest.java b/braintrust-sdk/src/test/java/dev/braintrust/trace/AttachmentProcessorTest.java
new file mode 100644
index 00000000..e19d7770
--- /dev/null
+++ b/braintrust-sdk/src/test/java/dev/braintrust/trace/AttachmentProcessorTest.java
@@ -0,0 +1,132 @@
+package dev.braintrust.trace;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import dev.braintrust.TestHarness;
+import dev.braintrust.json.BraintrustJsonMapper;
+import io.opentelemetry.api.common.AttributeKey;
+import io.opentelemetry.api.trace.Tracer;
+import lombok.SneakyThrows;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Integration test for the attachment processing pipeline.
+ *
+ * Uses the {@link TestHarness} which wires the {@link dev.braintrust.UnitTestSpanExporter} as an
+ * additional delegate inside the {@link BraintrustSpanProcessor}. This means spans retrieved via
+ * {@code awaitExportedSpans} reflect post-processing (base64 data URIs replaced with attachment
+ * references). The VCR layer stubs the S3 upload flow (login, signed URL, PUT, status).
+ */
+public class AttachmentProcessorTest {
+
+ /** A small valid 1x1 PNG encoded as base64. */
+ private static final String BASE64_PNG =
+ "iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVR42mP8z8DwHwAFBQIAX8jx0gAAAABJRU5ErkJggg==";
+
+ private static final AttributeKey INPUT_JSON =
+ AttributeKey.stringKey("braintrust.input_json");
+
+ private static TestHarness testHarness;
+ private static Tracer tracer;
+
+ @BeforeAll
+ static void initHarness() {
+ testHarness = TestHarness.setup();
+ tracer = testHarness.openTelemetry().getTracer("attachment-processor-test");
+ }
+
+ @Test
+ @SneakyThrows
+ void base64DataUriInImageUrlIsReplacedWithAttachmentReference() {
+ String inputJson =
+ "[{\"role\":\"user\",\"content\":[{\"type\":\"text\",\"text\":\"What is in this"
+ + " image?\"},"
+ + "{\"type\":\"image_url\",\"image_url\":{\"url\":\"data:image/png;base64,"
+ + BASE64_PNG
+ + "\"}}]}]";
+
+ var span = tracer.spanBuilder("attachment-test-image-url").startSpan();
+ span.setAttribute("braintrust.input_json", inputJson);
+ span.setAttribute("braintrust.parent", "project_name:" + TestHarness.defaultProjectName());
+ span.end();
+
+ var spans = testHarness.awaitExportedSpans(1);
+ var exported =
+ spans.stream()
+ .filter(s -> s.getName().equals("attachment-test-image-url"))
+ .findFirst()
+ .orElseThrow(() -> new AssertionError("span not found"));
+
+ String exportedInputJson = exported.getAttributes().get(INPUT_JSON);
+ assertNotNull(exportedInputJson, "exported span should have braintrust.input_json");
+ assertNotEquals(inputJson, exportedInputJson, "base64 data should have been replaced");
+
+ // Parse and verify the attachment reference
+ JsonNode root = BraintrustJsonMapper.get().readTree(exportedInputJson);
+ JsonNode urlNode = root.get(0).get("content").get(1).get("image_url").get("url");
+
+ assertTrue(
+ urlNode.isObject(), "url should be an object (attachment reference), not a string");
+ assertEquals("braintrust_attachment", urlNode.get("type").asText());
+ assertEquals("image/png", urlNode.get("content_type").asText());
+ assertEquals("attachment.png", urlNode.get("filename").asText());
+ assertNotNull(urlNode.get("key"), "attachment key must be present");
+ assertFalse(urlNode.get("key").asText().isEmpty(), "attachment key must not be empty");
+
+ // Verify the rest of the message structure is intact
+ assertEquals("user", root.get(0).get("role").asText());
+ assertEquals("text", root.get(0).get("content").get(0).get("type").asText());
+ assertEquals(
+ "What is in this image?", root.get(0).get("content").get(0).get("text").asText());
+ assertEquals("image_url", root.get(0).get("content").get(1).get("type").asText());
+ }
+
+ @Test
+ void nonDataUriInputIsUnchanged() {
+ String inputJson = "[{\"role\":\"user\",\"content\":\"Hello, how are you?\"}]";
+
+ var span = tracer.spanBuilder("attachment-test-no-data-uri").startSpan();
+ span.setAttribute("braintrust.input_json", inputJson);
+ span.setAttribute("braintrust.parent", "project_name:" + TestHarness.defaultProjectName());
+ span.end();
+
+ var spans = testHarness.awaitExportedSpans(1);
+ var exported =
+ spans.stream()
+ .filter(s -> s.getName().equals("attachment-test-no-data-uri"))
+ .findFirst()
+ .orElseThrow(() -> new AssertionError("span not found"));
+
+ String exportedInputJson = exported.getAttributes().get(INPUT_JSON);
+ assertEquals(inputJson, exportedInputJson, "input without base64 data should be unchanged");
+ }
+
+ @Test
+ void partialDataUriInTextIsNotReplaced() {
+ // A data URI embedded in surrounding text should NOT be replaced (isEntirelyDataUri check)
+ String inputJson =
+ "[{\"role\":\"user\",\"content\":\"Check this: data:image/png;base64,"
+ + BASE64_PNG
+ + " please\"}]";
+
+ var span = tracer.spanBuilder("attachment-test-partial-data-uri").startSpan();
+ span.setAttribute("braintrust.input_json", inputJson);
+ span.setAttribute("braintrust.parent", "project_name:" + TestHarness.defaultProjectName());
+ span.end();
+
+ var spans = testHarness.awaitExportedSpans(1);
+ var exported =
+ spans.stream()
+ .filter(s -> s.getName().equals("attachment-test-partial-data-uri"))
+ .findFirst()
+ .orElseThrow(() -> new AssertionError("span not found"));
+
+ String exportedInputJson = exported.getAttributes().get(INPUT_JSON);
+ assertEquals(
+ inputJson,
+ exportedInputJson,
+ "partial data URI embedded in text should not be replaced");
+ }
+}
diff --git a/braintrust-sdk/src/test/java/dev/braintrust/trace/AttachmentUploaderTest.java b/braintrust-sdk/src/test/java/dev/braintrust/trace/AttachmentUploaderTest.java
new file mode 100644
index 00000000..5863d39f
--- /dev/null
+++ b/braintrust-sdk/src/test/java/dev/braintrust/trace/AttachmentUploaderTest.java
@@ -0,0 +1,354 @@
+package dev.braintrust.trace;
+
+import static com.github.tomakehurst.wiremock.client.WireMock.*;
+import static org.junit.jupiter.api.Assertions.*;
+
+import com.github.tomakehurst.wiremock.junit5.WireMockRuntimeInfo;
+import com.github.tomakehurst.wiremock.junit5.WireMockTest;
+import dev.braintrust.api.BraintrustOpenApiClient;
+import dev.braintrust.config.BraintrustConfig;
+import java.time.Duration;
+import java.util.Map;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.Test;
+
+@WireMockTest
+public class AttachmentUploaderTest {
+ private AttachmentUploader.S3AttachmentUploader uploader;
+ private String baseUrl;
+
+ @BeforeEach
+ void setUp(WireMockRuntimeInfo wmRuntimeInfo) {
+ baseUrl = wmRuntimeInfo.getHttpBaseUrl();
+ var config = BraintrustConfig.builder().apiKey("test-api-key").apiUrl(baseUrl).build();
+ var apiClient = BraintrustOpenApiClient.of(config);
+ uploader =
+ new AttachmentUploader.S3AttachmentUploader(
+ apiClient, Duration.ofMillis(10_000), 1, Duration.ofMillis(50));
+ }
+
+ @AfterEach
+ void tearDown() {
+ uploader.shutdown(Duration.ofSeconds(0));
+ }
+
+ private void stubLoginAndUploadFlow() {
+ stubFor(
+ post(urlEqualTo("/api/apikey/login"))
+ .willReturn(
+ aResponse()
+ .withStatus(200)
+ .withHeader("Content-Type", "application/json")
+ .withBody(
+ "{\"org_info\":[{\"id\":\"org-123\",\"name\":\"test-org\"}]}")));
+
+ stubFor(
+ post(urlEqualTo("/attachment"))
+ .willReturn(
+ aResponse()
+ .withStatus(200)
+ .withHeader("Content-Type", "application/json")
+ .withBody(
+ "{\"signedUrl\":\""
+ + baseUrl
+ + "/upload\",\"headers\":{}}")));
+
+ stubFor(put(urlEqualTo("/upload")).willReturn(aResponse().withStatus(200)));
+
+ stubFor(post(urlEqualTo("/attachment/status")).willReturn(aResponse().withStatus(200)));
+ }
+
+ // ── Worker / queue integration tests ──────────────────────────────
+
+ @Test
+ void enqueueUploadsSuccessfully() throws Exception {
+ stubLoginAndUploadFlow();
+
+ var ref = AttachmentReference.create("test.json", "application/json");
+ uploader.enqueue(ref, "{\"key\":\"value\"}".getBytes());
+ uploader.forceFlush();
+
+ verify(
+ postRequestedFor(urlEqualTo("/attachment"))
+ .withRequestBody(containing("\"key\":\"" + ref.key() + "\"")));
+ verify(putRequestedFor(urlEqualTo("/upload")));
+ verify(
+ postRequestedFor(urlEqualTo("/attachment/status"))
+ .withRequestBody(containing("\"upload_status\":\"done\"")));
+ }
+
+ @Test
+ void enqueueRejectsAfterShutdown() {
+ assertDoesNotThrow(() -> uploader.shutdown());
+
+ var ref = AttachmentReference.create("test.json", "application/json");
+ assertFalse(uploader.enqueue(ref, "data".getBytes()));
+ }
+
+ @Test
+ void uploadFailureShutsDownWorker() throws Exception {
+ stubFor(
+ post(urlEqualTo("/api/apikey/login"))
+ .willReturn(
+ aResponse()
+ .withStatus(200)
+ .withHeader("Content-Type", "application/json")
+ .withBody(
+ "{\"org_info\":[{\"id\":\"org-123\",\"name\":\"test-org\"}]}")));
+
+ stubFor(
+ post(urlEqualTo("/attachment"))
+ .willReturn(
+ aResponse()
+ .withStatus(200)
+ .withHeader("Content-Type", "application/json")
+ .withBody(
+ "{\"signedUrl\":\""
+ + baseUrl
+ + "/upload\",\"headers\":{}}")));
+
+ stubFor(
+ put(urlEqualTo("/upload"))
+ .willReturn(aResponse().withStatus(500).withBody("Failed")));
+
+ stubFor(post(urlEqualTo("/attachment/status")).willReturn(aResponse().withStatus(200)));
+
+ var ref = AttachmentReference.create("test.json", "application/json");
+ assertTrue(uploader.enqueue(ref, "data".getBytes()));
+ // even errors should notify completion
+ uploader.forceFlush(Duration.ofSeconds(5));
+ assertFalse(uploader.enqueue(ref, "data".getBytes()));
+
+ verify(
+ postRequestedFor(urlEqualTo("/attachment/status"))
+ .withRequestBody(containing("\"upload_status\":\"error\""))
+ .withRequestBody(containing("\"error_message\"")));
+ }
+
+ // ── S3 HTTP-level tests ───────────────────────────────────────────
+
+ @Nested
+ class S3HttpOperations {
+
+ @Test
+ void requestUploadUrlSendsCorrectRequest() throws Exception {
+ stubFor(
+ post(urlEqualTo("/attachment"))
+ .withHeader("Authorization", equalTo("Bearer test-api-key"))
+ .withHeader("Content-Type", equalTo("application/json"))
+ .willReturn(
+ aResponse()
+ .withStatus(200)
+ .withHeader("Content-Type", "application/json")
+ .withBody(
+ "{\"signedUrl\":\""
+ + baseUrl
+ + "/upload\",\"headers\":{\"X-Custom\":\"value\"}}")));
+
+ var response =
+ uploader.requestUploadUrl("org-123", "key-1", "test.json", "application/json");
+
+ assertEquals(baseUrl + "/upload", response.signedUrl());
+ assertEquals("value", response.headers().get("X-Custom"));
+
+ var requests = findAll(postRequestedFor(urlEqualTo("/attachment")));
+ assertEquals(1, requests.size());
+ }
+
+ @Test
+ void requestUploadUrlThrowsOnMissingSignedUrl() {
+ stubFor(
+ post(urlEqualTo("/attachment"))
+ .willReturn(aResponse().withStatus(200).withBody("{\"headers\":{}}")));
+
+ assertThrows(
+ java.io.IOException.class,
+ () ->
+ uploader.requestUploadUrl(
+ "org-123", "key-1", "test.json", "application/json"));
+ }
+
+ @Test
+ void requestUploadUrlThrowsOnApiError() {
+ stubFor(
+ post(urlEqualTo("/attachment"))
+ .willReturn(aResponse().withStatus(500).withBody("Internal error")));
+
+ assertThrows(
+ java.io.IOException.class,
+ () ->
+ uploader.requestUploadUrl(
+ "org-123", "key-1", "test.json", "application/json"));
+ }
+
+ @Test
+ void uploadToSignedUrlIncludesHeaders() throws Exception {
+ byte[] testData = "test data".getBytes();
+
+ stubFor(
+ put(urlEqualTo("/upload"))
+ .withHeader("X-Custom-Header", equalTo("custom-value"))
+ .willReturn(aResponse().withStatus(200)));
+
+ uploader.uploadToSignedUrl(
+ baseUrl + "/upload",
+ Map.of("X-Custom-Header", "custom-value"),
+ "application/octet-stream",
+ testData);
+ }
+
+ @Test
+ void uploadToSignedUrlThrowsOnUploadError() {
+ byte[] testData = "test data".getBytes();
+
+ stubFor(
+ put(urlEqualTo("/upload"))
+ .willReturn(aResponse().withStatus(500).withBody("Upload failed")));
+
+ assertThrows(
+ java.io.IOException.class,
+ () ->
+ uploader.uploadToSignedUrl(
+ baseUrl + "/upload",
+ Map.of(),
+ "application/octet-stream",
+ testData));
+ }
+
+ @Test
+ void updateUploadStatusSendsCorrectRequest() throws Exception {
+ stubFor(
+ post(urlEqualTo("/attachment/status"))
+ .withHeader("Authorization", equalTo("Bearer test-api-key"))
+ .withHeader("Content-Type", equalTo("application/json"))
+ .willReturn(aResponse().withStatus(200)));
+
+ uploader.updateUploadStatus("org-123", "key-1", Map.of("upload_status", "done"));
+
+ verify(
+ postRequestedFor(urlEqualTo("/attachment/status"))
+ .withRequestBody(containing("\"key\":\"key-1\""))
+ .withRequestBody(containing("\"org_id\":\"org-123\""))
+ .withRequestBody(containing("\"upload_status\":\"done\"")));
+ }
+
+ @Test
+ void updateUploadStatusIncludesErrorMessage() throws Exception {
+ stubFor(post(urlEqualTo("/attachment/status")).willReturn(aResponse().withStatus(200)));
+
+ uploader.updateUploadStatus(
+ "org-123",
+ "key-1",
+ Map.of("upload_status", "error", "error_message", "something went wrong"));
+
+ verify(
+ postRequestedFor(urlEqualTo("/attachment/status"))
+ .withRequestBody(
+ containing("\"error_message\":\"something went wrong\"")));
+ }
+
+ @Test
+ void updateUploadStatusThrowsOnApiError() {
+ stubFor(
+ post(urlEqualTo("/attachment/status"))
+ .willReturn(aResponse().withStatus(500).withBody("Error")));
+
+ assertThrows(
+ java.io.IOException.class,
+ () ->
+ uploader.updateUploadStatus(
+ "org-123", "key-1", Map.of("upload_status", "done")));
+ }
+
+ @Test
+ void uploadToNonAzureUrlDoesNotAddBlobHeader() throws Exception {
+ byte[] testData = "azure test".getBytes();
+
+ stubFor(put(urlEqualTo("/non-azure-upload")).willReturn(aResponse().withStatus(200)));
+
+ uploader.uploadToSignedUrl(
+ baseUrl + "/non-azure-upload", Map.of(), "application/octet-stream", testData);
+
+ // Non-Azure URL should NOT have the x-ms-blob-type header
+ verify(
+ putRequestedFor(urlEqualTo("/non-azure-upload"))
+ .withoutHeader("x-ms-blob-type"));
+ }
+
+ @Test
+ void uploadUrlResponseDefaultsNullHeadersToEmptyMap() {
+ var response =
+ new AttachmentUploader.S3AttachmentUploader.UploadUrlResponse(
+ "https://example.com/upload", null);
+
+ assertNotNull(response.headers());
+ assertTrue(response.headers().isEmpty());
+ }
+
+ @Test
+ void requestUploadUrlDefaultsNullHeadersToEmptyMap() throws Exception {
+ stubFor(
+ post(urlEqualTo("/attachment"))
+ .willReturn(
+ aResponse()
+ .withStatus(200)
+ .withHeader("Content-Type", "application/json")
+ .withBody(
+ "{\"signedUrl\":\"" + baseUrl + "/upload\"}")));
+
+ var response =
+ uploader.requestUploadUrl("org-123", "key-1", "test.json", "application/json");
+
+ assertNotNull(response.headers());
+ assertTrue(response.headers().isEmpty());
+ }
+
+ @Test
+ void retryOnServerError() throws Exception {
+ var config = BraintrustConfig.builder().apiKey("test-api-key").apiUrl(baseUrl).build();
+ var apiClient = BraintrustOpenApiClient.of(config);
+ var retryUploader =
+ new AttachmentUploader.S3AttachmentUploader(
+ apiClient, Duration.ofSeconds(30), 2, Duration.ofMillis(100));
+
+ // First two requests fail with 500, third succeeds
+ stubFor(
+ post(urlEqualTo("/attachment"))
+ .inScenario("retry-test")
+ .whenScenarioStateIs("Started")
+ .willReturn(aResponse().withStatus(500).withBody("Error"))
+ .willSetStateTo("retry-1"));
+
+ stubFor(
+ post(urlEqualTo("/attachment"))
+ .inScenario("retry-test")
+ .whenScenarioStateIs("retry-1")
+ .willReturn(aResponse().withStatus(500).withBody("Error"))
+ .willSetStateTo("retry-2"));
+
+ stubFor(
+ post(urlEqualTo("/attachment"))
+ .inScenario("retry-test")
+ .whenScenarioStateIs("retry-2")
+ .willReturn(
+ aResponse()
+ .withStatus(200)
+ .withHeader("Content-Type", "application/json")
+ .withBody(
+ "{\"signedUrl\":\""
+ + baseUrl
+ + "/upload\",\"headers\":{}}")));
+
+ var response =
+ retryUploader.requestUploadUrl(
+ "org-123", "key-1", "test.json", "application/json");
+
+ assertEquals(baseUrl + "/upload", response.signedUrl());
+ verify(3, postRequestedFor(urlEqualTo("/attachment")));
+
+ retryUploader.shutdown(Duration.ofSeconds(0));
+ }
+ }
+}
diff --git a/test-harness/src/testFixtures/java/dev/braintrust/TestHarness.java b/test-harness/src/testFixtures/java/dev/braintrust/TestHarness.java
index 534045c4..16523b24 100644
--- a/test-harness/src/testFixtures/java/dev/braintrust/TestHarness.java
+++ b/test-harness/src/testFixtures/java/dev/braintrust/TestHarness.java
@@ -35,7 +35,7 @@
import dev.braintrust.openapi.model.PromptParserNullish;
import dev.braintrust.openapi.model.SystemContent;
import dev.braintrust.openapi.model.UserContent;
-import dev.braintrust.trace.UnitTestShutdownHook;
+import dev.braintrust.trace.HarnessShim;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.baggage.propagation.W3CBaggagePropagator;
import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator;
@@ -110,7 +110,7 @@ public class TestHarness {
"bedrock"),
apiKeysToNeverRecord);
vcr.start();
- UnitTestShutdownHook.addShutdownHook(1, vcr::stop);
+ HarnessShim.addShutdownHook(vcr::stop);
{ // set up default project if needed
var harness = setup();
var client = harness.braintrust.openApiClient();
@@ -176,9 +176,15 @@ private TestHarness(@Nonnull Braintrust braintrust) {
this.spanExporter = new UnitTestSpanExporter();
var loggerBuilder = SdkLoggerProvider.builder();
var meterBuilder = SdkMeterProvider.builder();
- braintrust.openTelemetryEnable(tracerBuilder, loggerBuilder, meterBuilder);
- // Add the in-memory span exporter for testing
- tracerBuilder.addSpanProcessor(SimpleSpanProcessor.create(this.spanExporter));
+ // Wire the in-memory span exporter as an additional delegate inside the
+ // BraintrustSpanProcessor so it sees post-processed spans (attachment references
+ // instead of raw base64 data URIs, etc.).
+ dev.braintrust.trace.HarnessShim.enableTracing(
+ braintrust.config(),
+ tracerBuilder,
+ List.of(SimpleSpanProcessor.create(this.spanExporter)),
+ loggerBuilder,
+ meterBuilder);
var contextPropagator =
ContextPropagators.create(
TextMapPropagator.composite(
diff --git a/test-harness/src/testFixtures/java/dev/braintrust/trace/HarnessShim.java b/test-harness/src/testFixtures/java/dev/braintrust/trace/HarnessShim.java
new file mode 100644
index 00000000..1d273c18
--- /dev/null
+++ b/test-harness/src/testFixtures/java/dev/braintrust/trace/HarnessShim.java
@@ -0,0 +1,33 @@
+package dev.braintrust.trace;
+
+import dev.braintrust.config.BraintrustConfig;
+import io.opentelemetry.sdk.logs.SdkLoggerProviderBuilder;
+import io.opentelemetry.sdk.metrics.SdkMeterProviderBuilder;
+import io.opentelemetry.sdk.trace.SdkTracerProviderBuilder;
+import io.opentelemetry.sdk.trace.SpanProcessor;
+import java.util.List;
+
+public class HarnessShim {
+ public static void addShutdownHook(Runnable target) {
+ BraintrustShutdownHook.addShutdownHook(
+ BraintrustShutdownHook.ShutdownOrder.TEST_HARNESS, target);
+ }
+
+ /**
+ * Enable Braintrust tracing with additional span processors composited into the {@link
+ * BraintrustSpanProcessor}'s delegate chain, so they see post-processed spans.
+ */
+ public static void enableTracing(
+ BraintrustConfig config,
+ SdkTracerProviderBuilder tracerProviderBuilder,
+ List additionalDelegates,
+ SdkLoggerProviderBuilder loggerProviderBuilder,
+ SdkMeterProviderBuilder meterProviderBuilder) {
+ BraintrustTracing.enable(
+ config,
+ tracerProviderBuilder,
+ additionalDelegates,
+ loggerProviderBuilder,
+ meterProviderBuilder);
+ }
+}
diff --git a/test-harness/src/testFixtures/java/dev/braintrust/trace/UnitTestShutdownHook.java b/test-harness/src/testFixtures/java/dev/braintrust/trace/UnitTestShutdownHook.java
deleted file mode 100644
index 79f59502..00000000
--- a/test-harness/src/testFixtures/java/dev/braintrust/trace/UnitTestShutdownHook.java
+++ /dev/null
@@ -1,11 +0,0 @@
-package dev.braintrust.trace;
-
-public class UnitTestShutdownHook {
- public static void addShutdownHook(Runnable target) {
- addShutdownHook(0, target);
- }
-
- public static void addShutdownHook(int order, Runnable target) {
- BraintrustShutdownHook.addShutdownHook(order, target);
- }
-}