diff --git a/braintrust-sdk/src/main/java/dev/braintrust/config/BraintrustConfig.java b/braintrust-sdk/src/main/java/dev/braintrust/config/BraintrustConfig.java index e084b628..0fa42c29 100644 --- a/braintrust-sdk/src/main/java/dev/braintrust/config/BraintrustConfig.java +++ b/braintrust-sdk/src/main/java/dev/braintrust/config/BraintrustConfig.java @@ -43,6 +43,15 @@ public final class BraintrustConfig extends BaseConfig { /** compress otel data before exporting to braintrust */ private final Boolean compressOtelPayload = getConfig("BRAINTRUST_COMPRESS_OTEL_PAYLOAD", true); + /** + * When {@code true} (the default), the Braintrust span processor automatically extracts base64 + * data-URI attachments from LLM conversation spans, uploads them to object storage, and + * replaces the inline data with a lightweight reference. This reduces the size of spans + * exported to Braintrust. + */ + private final Boolean autoConvertAIAttachments = + getConfig("BRAINTRUST_AUTO_CONVERT_AI_ATTACHMENTS", true); + /** Custom SSL context for OTLP exporter. Builder-only field, not backed by envars. */ private final SSLContext sslContext; @@ -198,6 +207,11 @@ public Builder compressOtelPayload(boolean value) { return this; } + public Builder autoConvertAIAttachments(boolean value) { + envOverrides.put("BRAINTRUST_AUTO_CONVERT_AI_ATTACHMENTS", String.valueOf(value)); + return this; + } + public Builder sslContext(SSLContext value) { this.sslContext = value; return this; diff --git a/braintrust-sdk/src/main/java/dev/braintrust/trace/AttachmentProcessor.java b/braintrust-sdk/src/main/java/dev/braintrust/trace/AttachmentProcessor.java new file mode 100644 index 00000000..ed25a46d --- /dev/null +++ b/braintrust-sdk/src/main/java/dev/braintrust/trace/AttachmentProcessor.java @@ -0,0 +1,177 @@ +package dev.braintrust.trace; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.fasterxml.jackson.databind.node.TextNode; +import dev.braintrust.config.BraintrustConfig; +import dev.braintrust.json.BraintrustJsonMapper; +import java.time.Duration; +import java.util.Base64; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; + +/** + * Scans JSON content for base64 data URI attachments and replaces them with attachment references + * after uploading to S3. + * + *

Package-private; not exposed in the public API. + */ +@Slf4j +class AttachmentProcessor { + /** + * quick heuristic to determine if the json payload contains a base64 encoded file + * + *

This is used for performance reasons as a fail-fast to avoid doing a json parse. + */ + static final Pattern BASE64_DATA_URI_PATTERN = + Pattern.compile("data:([\\w/\\-.+]+);base64,([A-Za-z0-9+/=]{20,})"); + + private final BraintrustConfig config; + private final AttachmentUploader uploader; + + AttachmentProcessor(BraintrustConfig config, AttachmentUploader uploader) { + this.config = config; + this.uploader = uploader; + } + + /** + * Scans a JSON string for base64 data URIs, uploads them, and returns the modified JSON with + * attachment references. + * + * @param json the JSON string to scan + * @return the modified JSON with base64 data replaced by attachment references, or the original + * JSON if no base64 data was found + */ + String processAndUpload(String json) { + if ((!config.autoConvertAIAttachments()) + || json == null + || uploader.isShutdown() + || !BASE64_DATA_URI_PATTERN.matcher(json).find()) { + return json; + } + + try { + JsonNode root = BraintrustJsonMapper.get().readTree(json); + AtomicBoolean modified = new AtomicBoolean(false); + JsonNode result = replaceBase64Attachments(root, modified); + return modified.get() ? BraintrustJsonMapper.get().writeValueAsString(result) : json; + } catch (UploaderRejectionException e) { + log.debug( + "attachment uploader rejected job. Proceeding without attachment" + + " replacements."); + return json; + } catch (Exception | StackOverflowError e) { + log.info("uploader optimization failed, falling back to span uploads", e); + uploader.shutdown(Duration.ofSeconds(0)); // don't block + return json; + } + } + + // NOTE: not concerned with recursion blowing the stack because we're mutating AI vendor + // messages which are not deep enough for this to be an issue. + private JsonNode replaceBase64Attachments(JsonNode node, AtomicBoolean modified) { + if (node.isTextual()) { + return replaceInText((TextNode) node, modified); + } else if (node.isObject()) { + ObjectNode objectNode = (ObjectNode) node; + ObjectNode result = BraintrustJsonMapper.get().createObjectNode(); + var fieldNames = objectNode.fieldNames(); + while (fieldNames.hasNext()) { + String fieldName = fieldNames.next(); + JsonNode child = objectNode.get(fieldName); + result.set(fieldName, replaceBase64Attachments(child, modified)); + } + return result; + } else if (node.isArray()) { + ArrayNode arrayNode = (ArrayNode) node; + ArrayNode result = BraintrustJsonMapper.get().createArrayNode(); + for (int i = 0; i < arrayNode.size(); i++) { + result.add(replaceBase64Attachments(arrayNode.get(i), modified)); + } + return result; + } + return node; + } + + @SneakyThrows + private JsonNode replaceInText(TextNode textNode, AtomicBoolean modified) { + String value = textNode.asText(); + Matcher matcher = BASE64_DATA_URI_PATTERN.matcher(value); + if (!matcher.find()) { + return textNode; + } + if (!isEntirelyDataUri(value)) { + log.debug("found base64 string but text contained extra content {}", value); + return textNode; + } + + matcher.reset(); + StringBuilder sb = new StringBuilder(); + while (matcher.find()) { + String contentType = matcher.group(1); + String base64Data = matcher.group(2); + byte[] data = Base64.getDecoder().decode(base64Data); + + String extension = contentTypeToExtension(contentType); + String filename = "attachment" + extension; + AttachmentReference ref = AttachmentReference.create(filename, contentType); + + if (!uploader.enqueue(ref, data)) { + throw new UploaderRejectionException("uploader rejected attachment upload"); + } + + matcher.appendReplacement(sb, Matcher.quoteReplacement(ref.toJson())); + } + matcher.appendTail(sb); + + modified.set(true); + + return BraintrustJsonMapper.get().readTree(sb.toString()); + } + + static boolean isEntirelyDataUri(String value) { + String trimmed = value.trim(); + return trimmed.startsWith("data:") + && !trimmed.contains("\"") + && !trimmed.contains("\\") + && !trimmed.contains(" "); + } + + private static String contentTypeToExtension(String contentType) { + switch (contentType.toLowerCase()) { + case "image/png": + return ".png"; + case "image/jpeg": + case "image/jpg": + return ".jpg"; + case "image/gif": + return ".gif"; + case "image/webp": + return ".webp"; + case "image/svg+xml": + return ".svg"; + case "application/pdf": + return ".pdf"; + case "text/plain": + return ".txt"; + case "application/json": + return ".json"; + default: + String[] parts = contentType.split("/"); + if (parts.length == 2) { + return "." + parts[1].split("[;\\-]")[0]; + } + return ""; + } + } + + private static class UploaderRejectionException extends RuntimeException { + public UploaderRejectionException(String message) { + super(message); + } + } +} diff --git a/braintrust-sdk/src/main/java/dev/braintrust/trace/AttachmentReference.java b/braintrust-sdk/src/main/java/dev/braintrust/trace/AttachmentReference.java new file mode 100644 index 00000000..f7e43b42 --- /dev/null +++ b/braintrust-sdk/src/main/java/dev/braintrust/trace/AttachmentReference.java @@ -0,0 +1,43 @@ +package dev.braintrust.trace; + +import java.util.Objects; +import java.util.UUID; +import javax.annotation.Nonnull; + +/** + * Represents an attachment reference stored on a span in place of uploaded attachment data. + * + *

Its shape intentionally matches the cross-SDK Braintrust attachment reference format. + */ +record AttachmentReference( + @Nonnull String type, + @Nonnull String filename, + @Nonnull String contentType, + @Nonnull String key) { + + private static final String DEFAULT_TYPE = "braintrust_attachment"; + + /** + * Creates an attachment reference with a generated UUID key. + * + * @param filename the display filename for the attachment + * @param contentType the MIME type of the attachment content + * @return a new AttachmentReference with a unique key + */ + static AttachmentReference create(@Nonnull String filename, @Nonnull String contentType) { + Objects.requireNonNull(filename, "filename cannot be null"); + Objects.requireNonNull(contentType, "contentType cannot be null"); + return new AttachmentReference( + DEFAULT_TYPE, filename, contentType, UUID.randomUUID().toString()); + } + + public String toJson() { + return "{\"type\":\"braintrust_attachment\",\"content_type\":\"" + + contentType() + + "\",\"filename\":\"" + + filename() + + "\",\"key\":\"" + + key() + + "\"}"; + } +} diff --git a/braintrust-sdk/src/main/java/dev/braintrust/trace/AttachmentUploader.java b/braintrust-sdk/src/main/java/dev/braintrust/trace/AttachmentUploader.java new file mode 100644 index 00000000..9c7aa7d3 --- /dev/null +++ b/braintrust-sdk/src/main/java/dev/braintrust/trace/AttachmentUploader.java @@ -0,0 +1,614 @@ +package dev.braintrust.trace; + +import com.fasterxml.jackson.annotation.JsonProperty; +import dev.braintrust.api.BraintrustOpenApiClient; +import dev.braintrust.json.BraintrustJsonMapper; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.time.Duration; +import java.util.Map; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicReference; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; + +/** + * Uploads Braintrust attachments in the background. + * + *

Implementations accept attachment data via {@link #enqueue}, process them asynchronously, and + * support graceful shutdown with {@link #shutdown}. + */ +interface AttachmentUploader { + /** + * Enqueues an attachment for upload. + * + *

NOTE: if the upload queue is full, this method will block until space becomes available + * + * @param reference the attachment reference metadata + * @param data the attachment data to upload + * @return true if the attachment was successfully enqueued for upload. False if the uploader + * declined to enqueue the message + */ + boolean enqueue(@Nonnull AttachmentReference reference, @Nonnull byte[] data); + + /** runs force flush with a default timeout */ + default void forceFlush() { + forceFlush(Duration.ofSeconds(30)); + } + + /** + * Waits for all currently enqueued uploads to complete with a timeout. + * + *

Concurrency note: Items enqueued concurrently with or after this call are not + * guaranteed to be included. This is safe because callers that need ordering should enqueue + * first, then flush. + * + * @param timeout the maximum time to wait + * @return true if all uploads completed, false if timed out + */ + boolean forceFlush(@Nonnull Duration timeout); + + /** runs shutdown with a default timeout */ + default void shutdown() { + // dropping s3 uploads is a bad user experience so shut down with a gracious timeout + shutdown(Duration.ofSeconds(120)); + } + + /** + * Shuts down the uploader with a custom timeout. + * + * @param timeout the maximum time to wait for pending uploads + */ + void shutdown(@Nonnull Duration timeout); + + boolean isShutdown(); + + /** + * Background uploader for Braintrust attachments that uploads to S3 via signed URLs. + * + *

Uploads are enqueued and processed by a single-threaded worker that: + * + *

    + *
  1. Requests a signed upload URL from the Braintrust API + *
  2. Uploads the data to the signed URL + *
  3. Reports the upload status (done/error) to the Braintrust API + *
+ * + *

The uploader starts lazily on first enqueue and can be shut down gracefully. + */ + @Slf4j + class S3AttachmentUploader implements AttachmentUploader { + private static final int QUEUE_SIZE = 1024; + + /** Default per-request timeout for HTTP calls. */ + private static final Duration DEFAULT_REQUEST_TIMEOUT = Duration.ofSeconds(60); + + /** Default maximum number of retry attempts for transient failures. */ + private static final int DEFAULT_MAX_RETRIES = 8; + + /** Default initial backoff delay between retries. Doubles on each subsequent attempt. */ + private static final Duration DEFAULT_INITIAL_RETRY_DELAY = Duration.ofMillis(500); + + private final BraintrustOpenApiClient apiClient; + private final Duration requestTimeout; + private final int maxRetries; + private final Duration initialRetryDelay; + + private final LinkedBlockingQueue queue; + private final AtomicReference worker = new AtomicReference<>(); + private final AtomicReference orgId = new AtomicReference<>(); + + // non thread safe fields must be checked and read under the lock + private final Object lock = new Object(); + private boolean rejectNewJobs = false; + private boolean workerDone = false; + private CountDownLatch currentBatch = new CountDownLatch(1); + + /** + * Creates a new attachment uploader with default retry settings. + * + * @param apiClient the Braintrust API client (provides auth, base URL, and HTTP transport) + */ + S3AttachmentUploader(@Nonnull BraintrustOpenApiClient apiClient) { + this( + apiClient, + DEFAULT_REQUEST_TIMEOUT, + DEFAULT_MAX_RETRIES, + DEFAULT_INITIAL_RETRY_DELAY); + } + + /** + * Creates a new attachment uploader with custom retry settings. + * + * @param apiClient the Braintrust API client (provides auth, base URL, and HTTP transport) + * @param requestTimeout the per-request timeout for HTTP calls + * @param maxRetries the maximum number of retry attempts for transient failures + * @param initialRetryDelay the initial backoff delay between retries (doubles on each + * attempt) + */ + S3AttachmentUploader( + @Nonnull BraintrustOpenApiClient apiClient, + @Nonnull Duration requestTimeout, + int maxRetries, + @Nonnull Duration initialRetryDelay) { + if (requestTimeout.toMillis() < 0) { + throw new IllegalArgumentException("requestTimeout must be >= 0"); + } + if (maxRetries <= 0) { + throw new IllegalArgumentException("maxRetries must be > 0"); + } + if (initialRetryDelay.toMillis() < 0) { + throw new IllegalArgumentException("initialRetryDelay must be >= 0"); + } + this.apiClient = apiClient; + this.requestTimeout = requestTimeout; + this.maxRetries = maxRetries; + this.initialRetryDelay = initialRetryDelay; + this.queue = new LinkedBlockingQueue<>(QUEUE_SIZE); + BraintrustShutdownHook.addShutdownHook( + BraintrustShutdownHook.ShutdownOrder.ATTACHMENT_UPLOADER, this::shutdown); + } + + @Override + public boolean enqueue(@Nonnull AttachmentReference reference, @Nonnull byte[] data) { + if (checkRejectNewJobsThreadSafe()) { + return false; + } + try { + ensureWorkerStarted(); + UploadJob job = new UploadJob(reference, data); + return queue.offer(job, 0, TimeUnit.MILLISECONDS); + } catch (Exception e) { + log.error("failed to enqueue attachment", e); + shutdown(); + return false; + } + } + + @Override + @SneakyThrows + public boolean forceFlush(@Nonnull Duration timeout) { + return awaitCurrentBatch(timeout.toMillis(), TimeUnit.MILLISECONDS); + } + + @Override + @SneakyThrows + public void shutdown(@Nonnull Duration timeout) { + synchronized (lock) { + rejectNewJobs = true; + if (workerDone) { + return; + } + } + ExecutorService executor = worker.getAndSet(null); + if (executor == null) { + return; + } + executor.shutdown(); + if (!executor.awaitTermination(timeout.toMillis(), TimeUnit.MILLISECONDS)) { + log.warn("failed to gracefully shut down s3 upload worker"); + executor.shutdownNow(); + } + } + + @Override + public boolean isShutdown() { + return checkRejectNewJobsThreadSafe(); + } + + // ── Worker lifecycle ────────────────────────────────────────────── + + /** + * start worker thread or do nothing if already started + * + *

calling this method does not require the lock + */ + private void ensureWorkerStarted() { + if (worker.get() == null) { + var newWorker = + Executors.newSingleThreadExecutor( + r -> { + Thread t = new Thread(r, "braintrust-attachment-uploader"); + t.setDaemon(true); + return t; + }); + if (worker.compareAndSet(null, newWorker)) { + // NOTE: if shutdown is called concurrently job submission may throw an + // exception. This is fine. + newWorker.submit(this::workerLoop); + } else { + // tried to start the worker concurrently. This is fine, we'll just shut down + // and dereference the redundant worker + newWorker.shutdown(); + } + } + } + + private void workerLoop() { + log.debug("Attachment uploader worker started"); + while ((!checkRejectNewJobsThreadSafe()) || queue.peek() != null) { + UploadJob job = null; + try { + job = queue.poll(100, TimeUnit.MILLISECONDS); + if (job == null) { + finishCurrentBatch(); + } else { + upload(job); + } + } catch (InterruptedException e) { + // worker thread shutdownNow was invoked + if (!queue.isEmpty()) { + log.warn( + "s3 uploader force shutdown was reached. Dropping {} uploads", + queue.size(), + e); + } + break; + } catch (Exception e) { + // this only user of this util is our span processor so we'll just fall back to + // sending attachments in span data if an error occurs + synchronized (lock) { + rejectNewJobs = true; + } + if (job == null) { + log.warn("Failed to upload attachment", e); + } else { + log.warn("Failed to upload attachment key={}", job.reference().key(), e); + reportStatus(job.reference().key(), "error", e.getMessage()); + } + // NOTE: we'll continue the loop attempting uploads of the remaining jobs until + // the queue is drained + } + } + synchronized (lock) { + workerDone = true; + finishCurrentBatch(); + log.debug("Attachment uploader worker stopped"); + } + } + + private boolean checkRejectNewJobsThreadSafe() { + synchronized (lock) { + return rejectNewJobs; + } + } + + // ── Upload orchestration ────────────────────────────────────────── + + private void upload(@Nonnull UploadJob job) throws IOException, InterruptedException { + UploadUrlResponse urlResponse = + requestUploadUrl( + getOrgId(), + job.reference().key(), + job.reference().filename(), + job.reference().contentType()); + + uploadToSignedUrl( + urlResponse.signedUrl(), + urlResponse.headers(), + job.reference().contentType(), + job.data()); + + reportStatus(job.reference().key(), "done", null); + } + + private String getOrgId() { + if (orgId.get() != null) { + return orgId.get(); + } + return orgId.updateAndGet(curr -> curr != null ? curr : resolveOrgId()); + } + + private String resolveOrgId() { + try { + var loginResponse = apiClient.login(); + if (loginResponse.orgInfo() != null && !loginResponse.orgInfo().isEmpty()) { + return loginResponse.orgInfo().get(0).id(); + } else { + throw new IllegalStateException("No org info returned from login"); + } + } catch (Exception e) { + throw new RuntimeException("Failed to resolve org ID for attachment upload", e); + } + } + + private void reportStatus( + @Nonnull String key, @Nonnull String status, @Nullable String errorMessage) { + try { + var statusMap = new java.util.HashMap(); + statusMap.put("upload_status", status); + if (errorMessage != null) { + statusMap.put("error_message", errorMessage); + } + updateUploadStatus(getOrgId(), key, statusMap); + } catch (Exception e) { + log.warn("Failed to report attachment status key={} status={}", key, status, e); + } + } + + // ── S3 HTTP operations ──────────────────────────────────────────── + + /** + * Requests a signed upload URL from the Braintrust API. + * + * @param orgId the organization ID + * @param key the attachment key + * @param filename the filename for the attachment + * @param contentType the MIME type of the attachment + * @return the signed URL response with upload URL and required headers + * @throws IOException if the request fails + * @throws InterruptedException if the request is interrupted + */ + UploadUrlResponse requestUploadUrl( + @Nonnull String orgId, + @Nonnull String key, + @Nonnull String filename, + @Nonnull String contentType) + throws IOException, InterruptedException { + + var requestBody = + BraintrustJsonMapper.get() + .writeValueAsString( + new UploadUrlRequest(key, filename, contentType, orgId)); + + var requestBuilder = + HttpRequest.newBuilder() + .uri(toUri(apiClient.getBaseUri() + "/attachment")) + .timeout(requestTimeout) + .header("Content-Type", "application/json") + .header("Accept", "application/json") + .POST(HttpRequest.BodyPublishers.ofString(requestBody)); + + if (apiClient.getRequestInterceptor() != null) { + apiClient.getRequestInterceptor().accept(requestBuilder); + } + + HttpResponse response = + sendWithRetry(requestBuilder.build(), HttpResponse.BodyHandlers.ofString()); + + if (!isSuccessStatus(response.statusCode())) { + throw new IOException( + "Failed to request upload URL: HTTP " + + response.statusCode() + + " - " + + response.body()); + } + + UploadUrlResponse result = + BraintrustJsonMapper.get().readValue(response.body(), UploadUrlResponse.class); + + if (result.signedUrl() == null || result.signedUrl().isEmpty()) { + throw new IOException("Signed URL response missing signedUrl"); + } + + return result; + } + + /** + * Uploads data to a signed URL with the specified headers. + * + *

Automatically detects Azure Blob Storage URLs and adds the required {@code + * x-ms-blob-type: BlockBlob} header. + * + * @param signedUrl the signed upload URL + * @param headers additional headers to include in the upload request + * @param contentType the MIME type of the data being uploaded + * @param data the data to upload + * @throws IOException if the upload fails + * @throws InterruptedException if the upload is interrupted + */ + void uploadToSignedUrl( + @Nonnull String signedUrl, + @Nonnull Map headers, + @Nonnull String contentType, + @Nonnull byte[] data) + throws IOException, InterruptedException { + + var requestBuilder = + HttpRequest.newBuilder() + .uri(toUri(signedUrl)) + .timeout(requestTimeout) + .header("Content-Type", contentType) + .PUT(HttpRequest.BodyPublishers.ofByteArray(data)); + + for (var entry : headers.entrySet()) { + requestBuilder.header(entry.getKey(), entry.getValue()); + } + + addAzureBlobHeaders(signedUrl, requestBuilder); + + HttpResponse response = + sendWithRetry(requestBuilder.build(), HttpResponse.BodyHandlers.ofString()); + + if (!isSuccessStatus(response.statusCode())) { + throw new IOException( + "Failed to upload to object store: HTTP " + + response.statusCode() + + " - " + + response.body()); + } + } + + /** + * Updates the upload status for an attachment. + * + * @param orgId the organization ID + * @param key the attachment key + * @param status the status map (e.g., {"upload_status": "done"} or {"upload_status": + * "error", "error_message": "..."}) + * @throws IOException if the request fails + * @throws InterruptedException if the request is interrupted + */ + void updateUploadStatus( + @Nonnull String orgId, @Nonnull String key, @Nonnull Map status) + throws IOException, InterruptedException { + + var requestBody = + BraintrustJsonMapper.get() + .writeValueAsString(new StatusRequest(key, orgId, status)); + + var requestBuilder = + HttpRequest.newBuilder() + .uri(toUri(apiClient.getBaseUri() + "/attachment/status")) + .timeout(requestTimeout) + .header("Content-Type", "application/json") + .POST(HttpRequest.BodyPublishers.ofString(requestBody)); + + if (apiClient.getRequestInterceptor() != null) { + apiClient.getRequestInterceptor().accept(requestBuilder); + } + + HttpResponse response = + sendWithRetry(requestBuilder.build(), HttpResponse.BodyHandlers.ofString()); + + if (!isSuccessStatus(response.statusCode())) { + throw new IOException( + "Failed to update upload status: HTTP " + + response.statusCode() + + " - " + + response.body()); + } + } + + // ── HTTP helpers ────────────────────────────────────────────────── + + /** Returns {@code true} if the HTTP status code indicates success (2xx). */ + private static boolean isSuccessStatus(int statusCode) { + return statusCode >= 200 && statusCode < 300; + } + + /** + * Parses a URI string. + * + * @param uriString the string to parse + * @return the parsed URI + */ + @SneakyThrows + private static URI toUri(@Nonnull String uriString) { + return new URI(uriString); + } + + /** + * Sends an HTTP request with retry logic for transient failures. + * + *

Retries on 5xx server errors and {@link IOException} (network errors) up to {@link + * #maxRetries} times with exponential backoff starting at {@link #initialRetryDelay}. + * + * @param request the request to send + * @param bodyHandler the response body handler + * @param the response body type + * @return the HTTP response + * @throws IOException if all retries are exhausted + * @throws InterruptedException if the thread is interrupted during retry backoff + */ + private HttpResponse sendWithRetry( + @Nonnull HttpRequest request, @Nonnull HttpResponse.BodyHandler bodyHandler) + throws IOException, InterruptedException { + + IOException lastException = null; + long backoffMs = initialRetryDelay.toMillis(); + + for (int attempt = 0; attempt <= maxRetries; attempt++) { + if (attempt > 0) { + log.debug( + "Retrying request {} (attempt {}/{})", + request.uri(), + attempt, + maxRetries); + Thread.sleep(backoffMs); + backoffMs *= 2; + } + + HttpResponse response; + try { + response = apiClient.getHttpClient().send(request, bodyHandler); + } catch (IOException e) { + lastException = e; + log.debug("Request to {} failed with IOException", request.uri(), e); + continue; + } + + // Don't retry client errors (4xx) or successes + if (response.statusCode() < 500) { + return response; + } + + // Server error (5xx) — retry + lastException = new IOException("Server error: HTTP " + response.statusCode()); + log.debug( + "Request to {} returned status {}, will retry", + request.uri(), + response.statusCode()); + } + + throw new IOException( + "Request to " + request.uri() + " failed after " + maxRetries + " retries", + lastException); + } + + /** Adds Azure Blob Storage specific headers when the signed URL points to Azure. */ + private static void addAzureBlobHeaders( + @Nonnull String signedUrl, HttpRequest.Builder requestBuilder) { + try { + var uri = new URI(signedUrl); + String host = uri.getHost(); + if (host != null && host.endsWith(".blob.core.windows.net")) { + requestBuilder.header("x-ms-blob-type", "BlockBlob"); + } + } catch (URISyntaxException e) { + log.warn("Failed to parse signed URL for Azure detection: {}", signedUrl, e); + } + } + + // ── Batch coordination ──────────────────────────────────────────── + + private void finishCurrentBatch() { + synchronized (lock) { + currentBatch.countDown(); + currentBatch = new CountDownLatch(1); + } + } + + private boolean awaitCurrentBatch(long timeout, TimeUnit timeUnit) + throws InterruptedException { + CountDownLatch latch; + synchronized (lock) { + latch = currentBatch; + } + return latch.await(timeout, timeUnit); + } + + // ── DTOs ────────────────────────────────────────────────────────── + + private record UploadJob(AttachmentReference reference, byte[] data) {} + + /** Response from requesting a signed upload URL. */ + record UploadUrlResponse( + @JsonProperty("signedUrl") @Nonnull String signedUrl, + @JsonProperty("headers") @Nonnull Map headers) { + + /** Compact constructor that enforces non-null headers with an empty map default. */ + UploadUrlResponse { + if (headers == null) { + headers = Map.of(); + } + } + } + + private record UploadUrlRequest( + @Nonnull String key, + @Nonnull String filename, + @JsonProperty("content_type") @Nonnull String contentType, + @JsonProperty("org_id") @Nonnull String orgId) {} + + private record StatusRequest( + @Nonnull String key, + @JsonProperty("org_id") @Nonnull String orgId, + @Nonnull Map status) {} + } +} diff --git a/braintrust-sdk/src/main/java/dev/braintrust/trace/BraintrustShutdownHook.java b/braintrust-sdk/src/main/java/dev/braintrust/trace/BraintrustShutdownHook.java index a6f19910..824f2003 100644 --- a/braintrust-sdk/src/main/java/dev/braintrust/trace/BraintrustShutdownHook.java +++ b/braintrust-sdk/src/main/java/dev/braintrust/trace/BraintrustShutdownHook.java @@ -6,6 +6,14 @@ import java.util.concurrent.CopyOnWriteArrayList; class BraintrustShutdownHook { + + /** Shutdown ordering. Lower ordinal values run first. */ + enum ShutdownOrder { + SPAN_PROCESSOR, + ATTACHMENT_UPLOADER, + TEST_HARNESS; + } + private record OrderedTarget(int order, Runnable target) {} private static final List shutdownTargets = new CopyOnWriteArrayList<>(); @@ -22,10 +30,21 @@ private record OrderedTarget(int order, Runnable target) {} } public static void addShutdownHook(Runnable target) { - addShutdownHook(0, target); + addShutdownHook(ShutdownOrder.SPAN_PROCESSOR, target); + } + + public static void addShutdownHook(ShutdownOrder order, Runnable target) { + addShutdownHook(order.ordinal(), target); } - public static void addShutdownHook(int order, Runnable target) { + /** + * Add a jvm shutdown hook. + * + * @param order lower numbers run first. targets with the same order number can run in any + * order. Span processor/exporter flush runs at level 0 + * @param target the shutdown code to run + */ + private static void addShutdownHook(int order, Runnable target) { shutdownTargets.add(new OrderedTarget(order, target)); } } diff --git a/braintrust-sdk/src/main/java/dev/braintrust/trace/BraintrustSpanProcessor.java b/braintrust-sdk/src/main/java/dev/braintrust/trace/BraintrustSpanProcessor.java index 532193bd..813a12c8 100644 --- a/braintrust-sdk/src/main/java/dev/braintrust/trace/BraintrustSpanProcessor.java +++ b/braintrust-sdk/src/main/java/dev/braintrust/trace/BraintrustSpanProcessor.java @@ -1,13 +1,19 @@ package dev.braintrust.trace; +import dev.braintrust.api.BraintrustOpenApiClient; import dev.braintrust.config.BraintrustConfig; import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.common.Attributes; import io.opentelemetry.context.Context; import io.opentelemetry.sdk.common.CompletableResultCode; +import io.opentelemetry.sdk.common.InstrumentationLibraryInfo; import io.opentelemetry.sdk.trace.ReadWriteSpan; import io.opentelemetry.sdk.trace.ReadableSpan; import io.opentelemetry.sdk.trace.SpanProcessor; +import io.opentelemetry.sdk.trace.data.DelegatingSpanData; +import io.opentelemetry.sdk.trace.data.SpanData; import java.util.List; +import java.util.Objects; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -21,7 +27,10 @@ */ @Slf4j public class BraintrustSpanProcessor implements SpanProcessor { - // Braintrust-specific attributes + static final AttributeKey INPUT_JSON = AttributeKey.stringKey("braintrust.input_json"); + static final AttributeKey OUTPUT_JSON = + AttributeKey.stringKey("braintrust.output_json"); + public static final AttributeKey PARENT = AttributeKey.stringKey(BraintrustTracing.PARENT_KEY); @@ -29,11 +38,17 @@ public class BraintrustSpanProcessor implements SpanProcessor { private final SpanProcessor delegate; private final List samplers; private final ConcurrentMap parentContexts = new ConcurrentHashMap<>(); + private final AttachmentProcessor attachmentProcessor; BraintrustSpanProcessor(BraintrustConfig config, SpanProcessor delegate) { this.config = config; this.delegate = delegate; this.samplers = buildSamplers(config); + this.attachmentProcessor = + new AttachmentProcessor( + config, + new AttachmentUploader.S3AttachmentUploader( + BraintrustOpenApiClient.of(config))); } private static List buildSamplers(BraintrustConfig config) { @@ -113,12 +128,25 @@ public void onEnd(ReadableSpan span) { return; } } - delegate.onEnd(span); + + var spanData = span.toSpanData(); + @Nullable String inputJson = spanData.getAttributes().get(INPUT_JSON); + @Nullable String outputJson = spanData.getAttributes().get(OUTPUT_JSON); + + @Nullable String newInputJson = attachmentProcessor.processAndUpload(inputJson); + @Nullable String newOutputJson = attachmentProcessor.processAndUpload(outputJson); + + if (!Objects.equals(newInputJson, inputJson) + || !Objects.equals(newOutputJson, outputJson)) { + delegate.onEnd(new TransformedReadableSpan(span, newInputJson, newOutputJson)); + } else { + delegate.onEnd(span); + } } @Override public boolean isEndRequired() { - return !samplers.isEmpty() || delegate.isEndRequired(); + return config.autoConvertAIAttachments() || !samplers.isEmpty() || delegate.isEndRequired(); } @Override @@ -170,4 +198,93 @@ public static ParentContext experiment(String experimentId) { return new ParentContext(null, experimentId, ParentType.EXPERIMENT); } } + + /** + * otel java does not implement onEnding, so this is the most idiomatic way to mutate a span + * once it ends + */ + private static class TransformedReadableSpan implements ReadableSpan { + private final ReadableSpan delegate; + private final Attributes attributes; + + TransformedReadableSpan(ReadableSpan delegate, String inputJson, String outputJson) { + this.delegate = delegate; + var builder = delegate.getAttributes().toBuilder(); + builder.put(INPUT_JSON, inputJson); + builder.put(OUTPUT_JSON, outputJson); + attributes = builder.build(); + } + + @Override + public Attributes getAttributes() { + return attributes; + } + + @Override + @SuppressWarnings("unchecked") + public T getAttribute(AttributeKey key) { + if (key.equals(INPUT_JSON)) { + return (T) attributes.get(INPUT_JSON); + } + if (key.equals(OUTPUT_JSON)) { + return (T) attributes.get(OUTPUT_JSON); + } + return delegate.getAttribute(key); + } + + @Override + public SpanData toSpanData() { + return new DelegatingSpanData(delegate.toSpanData()) { + @Override + public io.opentelemetry.api.common.Attributes getAttributes() { + return TransformedReadableSpan.this.getAttributes(); + } + + @Override + public int getTotalAttributeCount() { + return getAttributes().size(); + } + }; + } + + @Override + public String getName() { + return delegate.getName(); + } + + @Override + public io.opentelemetry.api.trace.SpanContext getSpanContext() { + return delegate.getSpanContext(); + } + + @Override + public boolean hasEnded() { + return delegate.hasEnded(); + } + + @Override + public io.opentelemetry.sdk.common.InstrumentationScopeInfo getInstrumentationScopeInfo() { + return delegate.getInstrumentationScopeInfo(); + } + + @Override + public InstrumentationLibraryInfo getInstrumentationLibraryInfo() { + return delegate.getInstrumentationLibraryInfo(); + } + + @Override + public long getLatencyNanos() { + return delegate.getLatencyNanos(); + } + + @Override + public io.opentelemetry.api.trace.SpanContext getParentSpanContext() { + return delegate.getParentSpanContext(); + } + + @Override + public io.opentelemetry.api.trace.SpanKind getKind() { + return delegate.getKind(); + } + } } diff --git a/braintrust-sdk/src/main/java/dev/braintrust/trace/BraintrustTracing.java b/braintrust-sdk/src/main/java/dev/braintrust/trace/BraintrustTracing.java index f38bd3da..c3574038 100644 --- a/braintrust-sdk/src/main/java/dev/braintrust/trace/BraintrustTracing.java +++ b/braintrust-sdk/src/main/java/dev/braintrust/trace/BraintrustTracing.java @@ -19,9 +19,12 @@ import io.opentelemetry.sdk.resources.Resource; import io.opentelemetry.sdk.trace.SdkTracerProvider; import io.opentelemetry.sdk.trace.SdkTracerProviderBuilder; +import io.opentelemetry.sdk.trace.SpanProcessor; import io.opentelemetry.sdk.trace.export.BatchSpanProcessor; import io.opentelemetry.semconv.ServiceAttributes; import java.time.Duration; +import java.util.ArrayList; +import java.util.List; import java.util.Properties; import java.util.concurrent.TimeUnit; import java.util.stream.Stream; @@ -96,6 +99,31 @@ public static void enable( @Nonnull SdkTracerProviderBuilder tracerProviderBuilder, @Nonnull SdkLoggerProviderBuilder loggerProviderBuilder, @Nonnull SdkMeterProviderBuilder meterProviderBuilder) { + enable( + config, + tracerProviderBuilder, + List.of(), + loggerProviderBuilder, + meterProviderBuilder); + } + + /** + * Add braintrust to existing open telemetry builders, with additional span processors that will + * receive spans after Braintrust processing (attachment upload, parent assignment, + * etc.). + * + *

The additional processors are composited into the {@link BraintrustSpanProcessor}'s + * delegate, so they see the transformed span data rather than the raw original. + * + * @param additionalDelegates extra span processors that receive post-processed spans alongside + * the Braintrust exporter. Pass {@code List.of()} if none are needed. + */ + static void enable( + @Nonnull BraintrustConfig config, + @Nonnull SdkTracerProviderBuilder tracerProviderBuilder, + @Nonnull List additionalDelegates, + @Nonnull SdkLoggerProviderBuilder loggerProviderBuilder, + @Nonnull SdkMeterProviderBuilder meterProviderBuilder) { final Duration exportInterval = Duration.ofSeconds(5); final int maxQueueSize = 2048; final int maxExportBatchSize = 512; @@ -109,14 +137,24 @@ public static void enable( var resource = resourceBuilder.build(); // spans - var spanProcessor = - new BraintrustSpanProcessor( - config, - BatchSpanProcessor.builder(new BraintrustSpanExporter(config)) - .setScheduleDelay(exportInterval.toMillis(), TimeUnit.MILLISECONDS) - .setMaxQueueSize(maxQueueSize) - .setMaxExportBatchSize(maxExportBatchSize) - .build()); + var braintrustExporter = + BatchSpanProcessor.builder(new BraintrustSpanExporter(config)) + .setScheduleDelay(exportInterval.toMillis(), TimeUnit.MILLISECONDS) + .setMaxQueueSize(maxQueueSize) + .setMaxExportBatchSize(maxExportBatchSize) + .build(); + + SpanProcessor delegate; + if (additionalDelegates.isEmpty()) { + delegate = braintrustExporter; + } else { + var all = new ArrayList(); + all.add(braintrustExporter); + all.addAll(additionalDelegates); + delegate = SpanProcessor.composite(all); + } + + var spanProcessor = new BraintrustSpanProcessor(config, delegate); tracerProviderBuilder.addResource(resource).addSpanProcessor(spanProcessor); // logs var logProcessor = diff --git a/braintrust-sdk/src/test/java/dev/braintrust/trace/AttachmentProcessorTest.java b/braintrust-sdk/src/test/java/dev/braintrust/trace/AttachmentProcessorTest.java new file mode 100644 index 00000000..e19d7770 --- /dev/null +++ b/braintrust-sdk/src/test/java/dev/braintrust/trace/AttachmentProcessorTest.java @@ -0,0 +1,132 @@ +package dev.braintrust.trace; + +import static org.junit.jupiter.api.Assertions.*; + +import com.fasterxml.jackson.databind.JsonNode; +import dev.braintrust.TestHarness; +import dev.braintrust.json.BraintrustJsonMapper; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.trace.Tracer; +import lombok.SneakyThrows; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +/** + * Integration test for the attachment processing pipeline. + * + *

Uses the {@link TestHarness} which wires the {@link dev.braintrust.UnitTestSpanExporter} as an + * additional delegate inside the {@link BraintrustSpanProcessor}. This means spans retrieved via + * {@code awaitExportedSpans} reflect post-processing (base64 data URIs replaced with attachment + * references). The VCR layer stubs the S3 upload flow (login, signed URL, PUT, status). + */ +public class AttachmentProcessorTest { + + /** A small valid 1x1 PNG encoded as base64. */ + private static final String BASE64_PNG = + "iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVR42mP8z8DwHwAFBQIAX8jx0gAAAABJRU5ErkJggg=="; + + private static final AttributeKey INPUT_JSON = + AttributeKey.stringKey("braintrust.input_json"); + + private static TestHarness testHarness; + private static Tracer tracer; + + @BeforeAll + static void initHarness() { + testHarness = TestHarness.setup(); + tracer = testHarness.openTelemetry().getTracer("attachment-processor-test"); + } + + @Test + @SneakyThrows + void base64DataUriInImageUrlIsReplacedWithAttachmentReference() { + String inputJson = + "[{\"role\":\"user\",\"content\":[{\"type\":\"text\",\"text\":\"What is in this" + + " image?\"}," + + "{\"type\":\"image_url\",\"image_url\":{\"url\":\"data:image/png;base64," + + BASE64_PNG + + "\"}}]}]"; + + var span = tracer.spanBuilder("attachment-test-image-url").startSpan(); + span.setAttribute("braintrust.input_json", inputJson); + span.setAttribute("braintrust.parent", "project_name:" + TestHarness.defaultProjectName()); + span.end(); + + var spans = testHarness.awaitExportedSpans(1); + var exported = + spans.stream() + .filter(s -> s.getName().equals("attachment-test-image-url")) + .findFirst() + .orElseThrow(() -> new AssertionError("span not found")); + + String exportedInputJson = exported.getAttributes().get(INPUT_JSON); + assertNotNull(exportedInputJson, "exported span should have braintrust.input_json"); + assertNotEquals(inputJson, exportedInputJson, "base64 data should have been replaced"); + + // Parse and verify the attachment reference + JsonNode root = BraintrustJsonMapper.get().readTree(exportedInputJson); + JsonNode urlNode = root.get(0).get("content").get(1).get("image_url").get("url"); + + assertTrue( + urlNode.isObject(), "url should be an object (attachment reference), not a string"); + assertEquals("braintrust_attachment", urlNode.get("type").asText()); + assertEquals("image/png", urlNode.get("content_type").asText()); + assertEquals("attachment.png", urlNode.get("filename").asText()); + assertNotNull(urlNode.get("key"), "attachment key must be present"); + assertFalse(urlNode.get("key").asText().isEmpty(), "attachment key must not be empty"); + + // Verify the rest of the message structure is intact + assertEquals("user", root.get(0).get("role").asText()); + assertEquals("text", root.get(0).get("content").get(0).get("type").asText()); + assertEquals( + "What is in this image?", root.get(0).get("content").get(0).get("text").asText()); + assertEquals("image_url", root.get(0).get("content").get(1).get("type").asText()); + } + + @Test + void nonDataUriInputIsUnchanged() { + String inputJson = "[{\"role\":\"user\",\"content\":\"Hello, how are you?\"}]"; + + var span = tracer.spanBuilder("attachment-test-no-data-uri").startSpan(); + span.setAttribute("braintrust.input_json", inputJson); + span.setAttribute("braintrust.parent", "project_name:" + TestHarness.defaultProjectName()); + span.end(); + + var spans = testHarness.awaitExportedSpans(1); + var exported = + spans.stream() + .filter(s -> s.getName().equals("attachment-test-no-data-uri")) + .findFirst() + .orElseThrow(() -> new AssertionError("span not found")); + + String exportedInputJson = exported.getAttributes().get(INPUT_JSON); + assertEquals(inputJson, exportedInputJson, "input without base64 data should be unchanged"); + } + + @Test + void partialDataUriInTextIsNotReplaced() { + // A data URI embedded in surrounding text should NOT be replaced (isEntirelyDataUri check) + String inputJson = + "[{\"role\":\"user\",\"content\":\"Check this: data:image/png;base64," + + BASE64_PNG + + " please\"}]"; + + var span = tracer.spanBuilder("attachment-test-partial-data-uri").startSpan(); + span.setAttribute("braintrust.input_json", inputJson); + span.setAttribute("braintrust.parent", "project_name:" + TestHarness.defaultProjectName()); + span.end(); + + var spans = testHarness.awaitExportedSpans(1); + var exported = + spans.stream() + .filter(s -> s.getName().equals("attachment-test-partial-data-uri")) + .findFirst() + .orElseThrow(() -> new AssertionError("span not found")); + + String exportedInputJson = exported.getAttributes().get(INPUT_JSON); + assertEquals( + inputJson, + exportedInputJson, + "partial data URI embedded in text should not be replaced"); + } +} diff --git a/braintrust-sdk/src/test/java/dev/braintrust/trace/AttachmentUploaderTest.java b/braintrust-sdk/src/test/java/dev/braintrust/trace/AttachmentUploaderTest.java new file mode 100644 index 00000000..5863d39f --- /dev/null +++ b/braintrust-sdk/src/test/java/dev/braintrust/trace/AttachmentUploaderTest.java @@ -0,0 +1,354 @@ +package dev.braintrust.trace; + +import static com.github.tomakehurst.wiremock.client.WireMock.*; +import static org.junit.jupiter.api.Assertions.*; + +import com.github.tomakehurst.wiremock.junit5.WireMockRuntimeInfo; +import com.github.tomakehurst.wiremock.junit5.WireMockTest; +import dev.braintrust.api.BraintrustOpenApiClient; +import dev.braintrust.config.BraintrustConfig; +import java.time.Duration; +import java.util.Map; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; + +@WireMockTest +public class AttachmentUploaderTest { + private AttachmentUploader.S3AttachmentUploader uploader; + private String baseUrl; + + @BeforeEach + void setUp(WireMockRuntimeInfo wmRuntimeInfo) { + baseUrl = wmRuntimeInfo.getHttpBaseUrl(); + var config = BraintrustConfig.builder().apiKey("test-api-key").apiUrl(baseUrl).build(); + var apiClient = BraintrustOpenApiClient.of(config); + uploader = + new AttachmentUploader.S3AttachmentUploader( + apiClient, Duration.ofMillis(10_000), 1, Duration.ofMillis(50)); + } + + @AfterEach + void tearDown() { + uploader.shutdown(Duration.ofSeconds(0)); + } + + private void stubLoginAndUploadFlow() { + stubFor( + post(urlEqualTo("/api/apikey/login")) + .willReturn( + aResponse() + .withStatus(200) + .withHeader("Content-Type", "application/json") + .withBody( + "{\"org_info\":[{\"id\":\"org-123\",\"name\":\"test-org\"}]}"))); + + stubFor( + post(urlEqualTo("/attachment")) + .willReturn( + aResponse() + .withStatus(200) + .withHeader("Content-Type", "application/json") + .withBody( + "{\"signedUrl\":\"" + + baseUrl + + "/upload\",\"headers\":{}}"))); + + stubFor(put(urlEqualTo("/upload")).willReturn(aResponse().withStatus(200))); + + stubFor(post(urlEqualTo("/attachment/status")).willReturn(aResponse().withStatus(200))); + } + + // ── Worker / queue integration tests ────────────────────────────── + + @Test + void enqueueUploadsSuccessfully() throws Exception { + stubLoginAndUploadFlow(); + + var ref = AttachmentReference.create("test.json", "application/json"); + uploader.enqueue(ref, "{\"key\":\"value\"}".getBytes()); + uploader.forceFlush(); + + verify( + postRequestedFor(urlEqualTo("/attachment")) + .withRequestBody(containing("\"key\":\"" + ref.key() + "\""))); + verify(putRequestedFor(urlEqualTo("/upload"))); + verify( + postRequestedFor(urlEqualTo("/attachment/status")) + .withRequestBody(containing("\"upload_status\":\"done\""))); + } + + @Test + void enqueueRejectsAfterShutdown() { + assertDoesNotThrow(() -> uploader.shutdown()); + + var ref = AttachmentReference.create("test.json", "application/json"); + assertFalse(uploader.enqueue(ref, "data".getBytes())); + } + + @Test + void uploadFailureShutsDownWorker() throws Exception { + stubFor( + post(urlEqualTo("/api/apikey/login")) + .willReturn( + aResponse() + .withStatus(200) + .withHeader("Content-Type", "application/json") + .withBody( + "{\"org_info\":[{\"id\":\"org-123\",\"name\":\"test-org\"}]}"))); + + stubFor( + post(urlEqualTo("/attachment")) + .willReturn( + aResponse() + .withStatus(200) + .withHeader("Content-Type", "application/json") + .withBody( + "{\"signedUrl\":\"" + + baseUrl + + "/upload\",\"headers\":{}}"))); + + stubFor( + put(urlEqualTo("/upload")) + .willReturn(aResponse().withStatus(500).withBody("Failed"))); + + stubFor(post(urlEqualTo("/attachment/status")).willReturn(aResponse().withStatus(200))); + + var ref = AttachmentReference.create("test.json", "application/json"); + assertTrue(uploader.enqueue(ref, "data".getBytes())); + // even errors should notify completion + uploader.forceFlush(Duration.ofSeconds(5)); + assertFalse(uploader.enqueue(ref, "data".getBytes())); + + verify( + postRequestedFor(urlEqualTo("/attachment/status")) + .withRequestBody(containing("\"upload_status\":\"error\"")) + .withRequestBody(containing("\"error_message\""))); + } + + // ── S3 HTTP-level tests ─────────────────────────────────────────── + + @Nested + class S3HttpOperations { + + @Test + void requestUploadUrlSendsCorrectRequest() throws Exception { + stubFor( + post(urlEqualTo("/attachment")) + .withHeader("Authorization", equalTo("Bearer test-api-key")) + .withHeader("Content-Type", equalTo("application/json")) + .willReturn( + aResponse() + .withStatus(200) + .withHeader("Content-Type", "application/json") + .withBody( + "{\"signedUrl\":\"" + + baseUrl + + "/upload\",\"headers\":{\"X-Custom\":\"value\"}}"))); + + var response = + uploader.requestUploadUrl("org-123", "key-1", "test.json", "application/json"); + + assertEquals(baseUrl + "/upload", response.signedUrl()); + assertEquals("value", response.headers().get("X-Custom")); + + var requests = findAll(postRequestedFor(urlEqualTo("/attachment"))); + assertEquals(1, requests.size()); + } + + @Test + void requestUploadUrlThrowsOnMissingSignedUrl() { + stubFor( + post(urlEqualTo("/attachment")) + .willReturn(aResponse().withStatus(200).withBody("{\"headers\":{}}"))); + + assertThrows( + java.io.IOException.class, + () -> + uploader.requestUploadUrl( + "org-123", "key-1", "test.json", "application/json")); + } + + @Test + void requestUploadUrlThrowsOnApiError() { + stubFor( + post(urlEqualTo("/attachment")) + .willReturn(aResponse().withStatus(500).withBody("Internal error"))); + + assertThrows( + java.io.IOException.class, + () -> + uploader.requestUploadUrl( + "org-123", "key-1", "test.json", "application/json")); + } + + @Test + void uploadToSignedUrlIncludesHeaders() throws Exception { + byte[] testData = "test data".getBytes(); + + stubFor( + put(urlEqualTo("/upload")) + .withHeader("X-Custom-Header", equalTo("custom-value")) + .willReturn(aResponse().withStatus(200))); + + uploader.uploadToSignedUrl( + baseUrl + "/upload", + Map.of("X-Custom-Header", "custom-value"), + "application/octet-stream", + testData); + } + + @Test + void uploadToSignedUrlThrowsOnUploadError() { + byte[] testData = "test data".getBytes(); + + stubFor( + put(urlEqualTo("/upload")) + .willReturn(aResponse().withStatus(500).withBody("Upload failed"))); + + assertThrows( + java.io.IOException.class, + () -> + uploader.uploadToSignedUrl( + baseUrl + "/upload", + Map.of(), + "application/octet-stream", + testData)); + } + + @Test + void updateUploadStatusSendsCorrectRequest() throws Exception { + stubFor( + post(urlEqualTo("/attachment/status")) + .withHeader("Authorization", equalTo("Bearer test-api-key")) + .withHeader("Content-Type", equalTo("application/json")) + .willReturn(aResponse().withStatus(200))); + + uploader.updateUploadStatus("org-123", "key-1", Map.of("upload_status", "done")); + + verify( + postRequestedFor(urlEqualTo("/attachment/status")) + .withRequestBody(containing("\"key\":\"key-1\"")) + .withRequestBody(containing("\"org_id\":\"org-123\"")) + .withRequestBody(containing("\"upload_status\":\"done\""))); + } + + @Test + void updateUploadStatusIncludesErrorMessage() throws Exception { + stubFor(post(urlEqualTo("/attachment/status")).willReturn(aResponse().withStatus(200))); + + uploader.updateUploadStatus( + "org-123", + "key-1", + Map.of("upload_status", "error", "error_message", "something went wrong")); + + verify( + postRequestedFor(urlEqualTo("/attachment/status")) + .withRequestBody( + containing("\"error_message\":\"something went wrong\""))); + } + + @Test + void updateUploadStatusThrowsOnApiError() { + stubFor( + post(urlEqualTo("/attachment/status")) + .willReturn(aResponse().withStatus(500).withBody("Error"))); + + assertThrows( + java.io.IOException.class, + () -> + uploader.updateUploadStatus( + "org-123", "key-1", Map.of("upload_status", "done"))); + } + + @Test + void uploadToNonAzureUrlDoesNotAddBlobHeader() throws Exception { + byte[] testData = "azure test".getBytes(); + + stubFor(put(urlEqualTo("/non-azure-upload")).willReturn(aResponse().withStatus(200))); + + uploader.uploadToSignedUrl( + baseUrl + "/non-azure-upload", Map.of(), "application/octet-stream", testData); + + // Non-Azure URL should NOT have the x-ms-blob-type header + verify( + putRequestedFor(urlEqualTo("/non-azure-upload")) + .withoutHeader("x-ms-blob-type")); + } + + @Test + void uploadUrlResponseDefaultsNullHeadersToEmptyMap() { + var response = + new AttachmentUploader.S3AttachmentUploader.UploadUrlResponse( + "https://example.com/upload", null); + + assertNotNull(response.headers()); + assertTrue(response.headers().isEmpty()); + } + + @Test + void requestUploadUrlDefaultsNullHeadersToEmptyMap() throws Exception { + stubFor( + post(urlEqualTo("/attachment")) + .willReturn( + aResponse() + .withStatus(200) + .withHeader("Content-Type", "application/json") + .withBody( + "{\"signedUrl\":\"" + baseUrl + "/upload\"}"))); + + var response = + uploader.requestUploadUrl("org-123", "key-1", "test.json", "application/json"); + + assertNotNull(response.headers()); + assertTrue(response.headers().isEmpty()); + } + + @Test + void retryOnServerError() throws Exception { + var config = BraintrustConfig.builder().apiKey("test-api-key").apiUrl(baseUrl).build(); + var apiClient = BraintrustOpenApiClient.of(config); + var retryUploader = + new AttachmentUploader.S3AttachmentUploader( + apiClient, Duration.ofSeconds(30), 2, Duration.ofMillis(100)); + + // First two requests fail with 500, third succeeds + stubFor( + post(urlEqualTo("/attachment")) + .inScenario("retry-test") + .whenScenarioStateIs("Started") + .willReturn(aResponse().withStatus(500).withBody("Error")) + .willSetStateTo("retry-1")); + + stubFor( + post(urlEqualTo("/attachment")) + .inScenario("retry-test") + .whenScenarioStateIs("retry-1") + .willReturn(aResponse().withStatus(500).withBody("Error")) + .willSetStateTo("retry-2")); + + stubFor( + post(urlEqualTo("/attachment")) + .inScenario("retry-test") + .whenScenarioStateIs("retry-2") + .willReturn( + aResponse() + .withStatus(200) + .withHeader("Content-Type", "application/json") + .withBody( + "{\"signedUrl\":\"" + + baseUrl + + "/upload\",\"headers\":{}}"))); + + var response = + retryUploader.requestUploadUrl( + "org-123", "key-1", "test.json", "application/json"); + + assertEquals(baseUrl + "/upload", response.signedUrl()); + verify(3, postRequestedFor(urlEqualTo("/attachment"))); + + retryUploader.shutdown(Duration.ofSeconds(0)); + } + } +} diff --git a/test-harness/src/testFixtures/java/dev/braintrust/TestHarness.java b/test-harness/src/testFixtures/java/dev/braintrust/TestHarness.java index 534045c4..16523b24 100644 --- a/test-harness/src/testFixtures/java/dev/braintrust/TestHarness.java +++ b/test-harness/src/testFixtures/java/dev/braintrust/TestHarness.java @@ -35,7 +35,7 @@ import dev.braintrust.openapi.model.PromptParserNullish; import dev.braintrust.openapi.model.SystemContent; import dev.braintrust.openapi.model.UserContent; -import dev.braintrust.trace.UnitTestShutdownHook; +import dev.braintrust.trace.HarnessShim; import io.opentelemetry.api.GlobalOpenTelemetry; import io.opentelemetry.api.baggage.propagation.W3CBaggagePropagator; import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator; @@ -110,7 +110,7 @@ public class TestHarness { "bedrock"), apiKeysToNeverRecord); vcr.start(); - UnitTestShutdownHook.addShutdownHook(1, vcr::stop); + HarnessShim.addShutdownHook(vcr::stop); { // set up default project if needed var harness = setup(); var client = harness.braintrust.openApiClient(); @@ -176,9 +176,15 @@ private TestHarness(@Nonnull Braintrust braintrust) { this.spanExporter = new UnitTestSpanExporter(); var loggerBuilder = SdkLoggerProvider.builder(); var meterBuilder = SdkMeterProvider.builder(); - braintrust.openTelemetryEnable(tracerBuilder, loggerBuilder, meterBuilder); - // Add the in-memory span exporter for testing - tracerBuilder.addSpanProcessor(SimpleSpanProcessor.create(this.spanExporter)); + // Wire the in-memory span exporter as an additional delegate inside the + // BraintrustSpanProcessor so it sees post-processed spans (attachment references + // instead of raw base64 data URIs, etc.). + dev.braintrust.trace.HarnessShim.enableTracing( + braintrust.config(), + tracerBuilder, + List.of(SimpleSpanProcessor.create(this.spanExporter)), + loggerBuilder, + meterBuilder); var contextPropagator = ContextPropagators.create( TextMapPropagator.composite( diff --git a/test-harness/src/testFixtures/java/dev/braintrust/trace/HarnessShim.java b/test-harness/src/testFixtures/java/dev/braintrust/trace/HarnessShim.java new file mode 100644 index 00000000..1d273c18 --- /dev/null +++ b/test-harness/src/testFixtures/java/dev/braintrust/trace/HarnessShim.java @@ -0,0 +1,33 @@ +package dev.braintrust.trace; + +import dev.braintrust.config.BraintrustConfig; +import io.opentelemetry.sdk.logs.SdkLoggerProviderBuilder; +import io.opentelemetry.sdk.metrics.SdkMeterProviderBuilder; +import io.opentelemetry.sdk.trace.SdkTracerProviderBuilder; +import io.opentelemetry.sdk.trace.SpanProcessor; +import java.util.List; + +public class HarnessShim { + public static void addShutdownHook(Runnable target) { + BraintrustShutdownHook.addShutdownHook( + BraintrustShutdownHook.ShutdownOrder.TEST_HARNESS, target); + } + + /** + * Enable Braintrust tracing with additional span processors composited into the {@link + * BraintrustSpanProcessor}'s delegate chain, so they see post-processed spans. + */ + public static void enableTracing( + BraintrustConfig config, + SdkTracerProviderBuilder tracerProviderBuilder, + List additionalDelegates, + SdkLoggerProviderBuilder loggerProviderBuilder, + SdkMeterProviderBuilder meterProviderBuilder) { + BraintrustTracing.enable( + config, + tracerProviderBuilder, + additionalDelegates, + loggerProviderBuilder, + meterProviderBuilder); + } +} diff --git a/test-harness/src/testFixtures/java/dev/braintrust/trace/UnitTestShutdownHook.java b/test-harness/src/testFixtures/java/dev/braintrust/trace/UnitTestShutdownHook.java deleted file mode 100644 index 79f59502..00000000 --- a/test-harness/src/testFixtures/java/dev/braintrust/trace/UnitTestShutdownHook.java +++ /dev/null @@ -1,11 +0,0 @@ -package dev.braintrust.trace; - -public class UnitTestShutdownHook { - public static void addShutdownHook(Runnable target) { - addShutdownHook(0, target); - } - - public static void addShutdownHook(int order, Runnable target) { - BraintrustShutdownHook.addShutdownHook(order, target); - } -}