|
31 | 31 | import java.util.HashMap; |
32 | 32 | import java.util.List; |
33 | 33 | import java.util.Map; |
| 34 | +import java.util.function.Supplier; |
34 | 35 | import land.oras.auth.AuthProvider; |
35 | 36 | import land.oras.auth.AuthStoreAuthenticationProvider; |
36 | 37 | import land.oras.auth.HttpClient; |
@@ -467,6 +468,58 @@ public Layer pushBlob(ContainerRef containerRef, Path blob, Map<String, String> |
467 | 468 | return Layer.fromFile(blob, containerRef.getAlgorithm()).withAnnotations(annotations); |
468 | 469 | } |
469 | 470 |
|
| 471 | + @Override |
| 472 | + public Layer pushBlob(ContainerRef ref, long size, Supplier<InputStream> stream, Map<String, String> annotations) { |
| 473 | + String digest = ref.getDigest(); |
| 474 | + if (digest == null) { |
| 475 | + throw new OrasException("Digest is required to push blob with stream"); |
| 476 | + } |
| 477 | + ContainerRef containerRef = ref.forRegistry(this).checkBlocked(this); |
| 478 | + if (containerRef.isInsecure(this) && !this.isInsecure()) { |
| 479 | + return asInsecure().pushBlob(ref, size, stream, annotations); |
| 480 | + } |
| 481 | + if (hasBlob(containerRef)) { |
| 482 | + LOG.info("Blob already exists: {}", digest); |
| 483 | + return Layer.fromDigest(digest, size).withAnnotations(annotations); |
| 484 | + } |
| 485 | + // Empty post without digest |
| 486 | + URI uri = URI.create("%s://%s".formatted(getScheme(), containerRef.getBlobsUploadPath(this))); |
| 487 | + HttpClient.ResponseWrapper<String> response = client.post( |
| 488 | + uri, |
| 489 | + new byte[0], |
| 490 | + Map.of(Const.CONTENT_TYPE_HEADER, Const.APPLICATION_OCTET_STREAM_HEADER_VALUE), |
| 491 | + Scopes.of(this, containerRef), |
| 492 | + authProvider); |
| 493 | + logResponse(response); |
| 494 | + if (response.statusCode() != 202) { |
| 495 | + throw new OrasException("Failed to initiate blob upload: %s".formatted(response.response())); |
| 496 | + } |
| 497 | + String location = response.headers().get(Const.LOCATION_HEADER.toLowerCase()); |
| 498 | + // Ensure location is absolute URI |
| 499 | + if (!location.startsWith("http") && !location.startsWith("https")) { |
| 500 | + location = "%s://%s/%s".formatted(getScheme(), ref.getApiRegistry(this), location.replaceFirst("^/", "")); |
| 501 | + } |
| 502 | + LOG.debug("Location header: {}", location); |
| 503 | + |
| 504 | + URI uploadURI = createLocationWithDigest(location, digest); |
| 505 | + |
| 506 | + response = client.upload( |
| 507 | + uploadURI, |
| 508 | + size, |
| 509 | + Map.of(Const.CONTENT_TYPE_HEADER, Const.APPLICATION_OCTET_STREAM_HEADER_VALUE), |
| 510 | + stream, |
| 511 | + Scopes.of(this, containerRef), |
| 512 | + authProvider); |
| 513 | + logResponse(response); |
| 514 | + if (response.statusCode() == 201) { |
| 515 | + LOG.debug("Successful push: {}", response.response()); |
| 516 | + } else { |
| 517 | + throw new OrasException("Failed to push layer: %s".formatted(response.response())); |
| 518 | + } |
| 519 | + handleError(response); |
| 520 | + return Layer.fromDigest(digest, size).withAnnotations(annotations); |
| 521 | + } |
| 522 | + |
470 | 523 | @Override |
471 | 524 | public Layer pushBlob(ContainerRef containerRef, byte[] data) { |
472 | 525 | String digest = containerRef.getAlgorithm().digest(data); |
|
0 commit comments