diff --git a/src/main/java/land/oras/ContainerRef.java b/src/main/java/land/oras/ContainerRef.java index a4a2e062..2b929ec6 100644 --- a/src/main/java/land/oras/ContainerRef.java +++ b/src/main/java/land/oras/ContainerRef.java @@ -325,6 +325,15 @@ public String getBlobsUploadDigestPath(Registry registry) { return "%s/blobs/uploads/?digest=%s".formatted(getApiPrefix(registry), digest); } + /** + * Return the blobs upload URL for POST upload to get the upload location + * @param registry The registry + * @return The blobs upload URL + */ + public String getBlobsUploadPath(Registry registry) { + return "%s/blobs/uploads/".formatted(getApiPrefix(registry)); + } + /** * Return the blobs URL * @param registry The registry diff --git a/src/main/java/land/oras/CopyUtils.java b/src/main/java/land/oras/CopyUtils.java index da18b32d..9e39f293 100644 --- a/src/main/java/land/oras/CopyUtils.java +++ b/src/main/java/land/oras/CopyUtils.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.io.InputStream; +import java.util.Objects; import land.oras.exception.OrasException; import org.jspecify.annotations.NonNull; import org.slf4j.Logger; @@ -73,10 +74,14 @@ void copy( // Write all layer for (Layer layer : source.collectLayers(sourceRef, contentType, true)) { - try (InputStream is = source.fetchBlob(sourceRef.withDigest(layer.getDigest()))) { - target.pushBlob(targetRef.withDigest(layer.getDigest()), is); - LOG.debug("Copied layer {}", layer.getDigest()); - } + Objects.requireNonNull(layer.getDigest(), "Layer digest is required for streaming copy"); + Objects.requireNonNull(layer.getSize(), "Layer size is required for streaming copy"); + target.pushBlob( + targetRef.withDigest(layer.getDigest()), + layer.getSize(), + () -> source.fetchBlob(sourceRef.withDigest(layer.getDigest())), + layer.getAnnotations()); + LOG.debug("Copied layer {}", layer.getDigest()); } // Single manifest diff --git a/src/main/java/land/oras/OCI.java b/src/main/java/land/oras/OCI.java index 5494dc04..96901f16 100644 --- a/src/main/java/land/oras/OCI.java +++ b/src/main/java/land/oras/OCI.java @@ -31,6 +31,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.function.Supplier; import land.oras.exception.OrasException; import land.oras.utils.ArchiveUtils; import land.oras.utils.Const; @@ -375,6 +376,16 @@ public abstract Manifest pushArtifact( */ public abstract Layer pushBlob(T ref, Path blob, Map annotations); + /** + * Push a blob from input stream with known digest and size + * @param ref The container ref with digest + * @param size The size of the blob + * @param stream The input stream of the blob + * @param annotations The annotations + * @return The layer + */ + public abstract Layer pushBlob(T ref, long size, Supplier stream, Map annotations); + /** * Push the blob for the given layer * @param ref The container ref diff --git a/src/main/java/land/oras/OCILayout.java b/src/main/java/land/oras/OCILayout.java index 81785041..847ce708 100644 --- a/src/main/java/land/oras/OCILayout.java +++ b/src/main/java/land/oras/OCILayout.java @@ -29,6 +29,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.function.Supplier; import land.oras.exception.OrasException; import land.oras.utils.Const; import land.oras.utils.JsonUtils; @@ -280,6 +281,33 @@ public Layer pushBlob(LayoutRef ref, Path blob, Map annotations) } } + @Override + public Layer pushBlob(LayoutRef ref, long size, Supplier stream, Map annotations) { + String digest = ref.getTag(); + if (digest == null) { + throw new OrasException("Digest is required to push blob to layout"); + } + boolean isDigest = SupportedAlgorithm.isSupported(digest); + if (!isDigest) { + throw new OrasException("Unsupported digest: %s".formatted(digest)); + } + ensureAlgorithmPath(digest); + try { + Path blobPath = getBlobPath(ref); + if (Files.exists(blobPath)) { + LOG.info("Blob already exists: {}", digest); + return Layer.fromFile(blobPath, ref.getAlgorithm()).withAnnotations(annotations); + } + try (InputStream is = stream.get()) { + Files.copy(is, blobPath); + } + ensureDigest(ref, blobPath); + return Layer.fromFile(blobPath, ref.getAlgorithm()).withAnnotations(annotations); + } catch (IOException e) { + throw new OrasException("Failed to push blob", e); + } + } + @Override public Layer pushBlob(LayoutRef ref, byte[] data) { try { diff --git a/src/main/java/land/oras/Registry.java b/src/main/java/land/oras/Registry.java index 87d1acaf..9a89fa1d 100644 --- a/src/main/java/land/oras/Registry.java +++ b/src/main/java/land/oras/Registry.java @@ -31,6 +31,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.function.Supplier; import land.oras.auth.AuthProvider; import land.oras.auth.AuthStoreAuthenticationProvider; import land.oras.auth.HttpClient; @@ -467,6 +468,58 @@ public Layer pushBlob(ContainerRef containerRef, Path blob, Map return Layer.fromFile(blob, containerRef.getAlgorithm()).withAnnotations(annotations); } + @Override + public Layer pushBlob(ContainerRef ref, long size, Supplier stream, Map annotations) { + String digest = ref.getDigest(); + if (digest == null) { + throw new OrasException("Digest is required to push blob with stream"); + } + ContainerRef containerRef = ref.forRegistry(this).checkBlocked(this); + if (containerRef.isInsecure(this) && !this.isInsecure()) { + return asInsecure().pushBlob(ref, size, stream, annotations); + } + if (hasBlob(containerRef)) { + LOG.info("Blob already exists: {}", digest); + return Layer.fromDigest(digest, size).withAnnotations(annotations); + } + // Empty post without digest + URI uri = URI.create("%s://%s".formatted(getScheme(), containerRef.getBlobsUploadPath(this))); + HttpClient.ResponseWrapper response = client.post( + uri, + new byte[0], + Map.of(Const.CONTENT_TYPE_HEADER, Const.APPLICATION_OCTET_STREAM_HEADER_VALUE), + Scopes.of(this, containerRef), + authProvider); + logResponse(response); + if (response.statusCode() != 202) { + throw new OrasException("Failed to initiate blob upload: %s".formatted(response.response())); + } + String location = response.headers().get(Const.LOCATION_HEADER.toLowerCase()); + // Ensure location is absolute URI + if (!location.startsWith("http") && !location.startsWith("https")) { + location = "%s://%s/%s".formatted(getScheme(), ref.getApiRegistry(this), location.replaceFirst("^/", "")); + } + LOG.debug("Location header: {}", location); + + URI uploadURI = createLocationWithDigest(location, digest); + + response = client.upload( + uploadURI, + size, + Map.of(Const.CONTENT_TYPE_HEADER, Const.APPLICATION_OCTET_STREAM_HEADER_VALUE), + stream, + Scopes.of(this, containerRef), + authProvider); + logResponse(response); + if (response.statusCode() == 201) { + LOG.debug("Successful push: {}", response.response()); + } else { + throw new OrasException("Failed to push layer: %s".formatted(response.response())); + } + handleError(response); + return Layer.fromDigest(digest, size).withAnnotations(annotations); + } + @Override public Layer pushBlob(ContainerRef containerRef, byte[] data) { String digest = containerRef.getAlgorithm().digest(data); diff --git a/src/main/java/land/oras/auth/HttpClient.java b/src/main/java/land/oras/auth/HttpClient.java index 1a231b8b..8938576f 100644 --- a/src/main/java/land/oras/auth/HttpClient.java +++ b/src/main/java/land/oras/auth/HttpClient.java @@ -35,6 +35,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.function.Supplier; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -234,6 +235,35 @@ public ResponseWrapper upload( } } + /** + * Upload from an input stream. + * @param uri The URI + * @param size The size of the input stream + * @param headers The headers + * @param stream The input stream + * @param scopes The scopes + * @param authProvider The authentication provider + * @return The response + */ + public ResponseWrapper upload( + URI uri, + long size, + Map headers, + Supplier stream, + Scopes scopes, + AuthProvider authProvider) { + return executeRequest( + "PUT", + uri, + true, + headers, + new byte[0], + HttpResponse.BodyHandlers.ofString(), + HttpRequest.BodyPublishers.fromPublisher(HttpRequest.BodyPublishers.ofInputStream(stream), size), + scopes, + authProvider); + } + /** * Perform a HEAD request * @param uri The URI @@ -296,7 +326,7 @@ public ResponseWrapper post( headers, body, HttpResponse.BodyHandlers.ofString(), - HttpRequest.BodyPublishers.ofByteArray(body), + body.length == 0 ? HttpRequest.BodyPublishers.noBody() : HttpRequest.BodyPublishers.ofByteArray(body), scopes, authProvider); } diff --git a/src/test/java/land/oras/ContainerRefTest.java b/src/test/java/land/oras/ContainerRefTest.java index 1b8cc41d..5cd0879c 100644 --- a/src/test/java/land/oras/ContainerRefTest.java +++ b/src/test/java/land/oras/ContainerRefTest.java @@ -37,6 +37,7 @@ class ContainerRefTest { @Test + @Execution(ExecutionMode.SAME_THREAD) void shouldReadRegistriesConfig(@TempDir Path homeDir) throws Exception { // language=toml String config = @@ -77,6 +78,7 @@ void shouldReadRegistriesConfig(@TempDir Path homeDir) throws Exception { } @Test + @Execution(ExecutionMode.SAME_THREAD) void shouldDetermineFromAlias(@TempDir Path homeDir) throws Exception { // language=toml @@ -99,6 +101,7 @@ void shouldDetermineFromAlias(@TempDir Path homeDir) throws Exception { } @Test + @Execution(ExecutionMode.SAME_THREAD) void shouldRewriteAllSubdomainToLocalProxy(@TempDir Path homeDir) throws Exception { // language=toml @@ -145,6 +148,7 @@ void shouldRewriteAllSubdomainToLocalProxy(@TempDir Path homeDir) throws Excepti } @Test + @Execution(ExecutionMode.SAME_THREAD) void shouldDetermineEffectiveRegistry(@TempDir Path homeDir) throws Exception { // Use from container ref diff --git a/src/test/java/land/oras/GitHubContainerRegistryITCase.java b/src/test/java/land/oras/GitHubContainerRegistryITCase.java index f493262d..b6361597 100644 --- a/src/test/java/land/oras/GitHubContainerRegistryITCase.java +++ b/src/test/java/land/oras/GitHubContainerRegistryITCase.java @@ -36,9 +36,6 @@ class GitHubContainerRegistryITCase { @TempDir Path tempDir; - @TempDir - private static Path homeDir; - @Test void shouldPullIndex() { Registry registry = Registry.builder().build(); @@ -48,7 +45,8 @@ void shouldPullIndex() { } @Test - void shouldPullIndexWithAlias() throws Exception { + @Execution(ExecutionMode.SAME_THREAD) + void shouldPullIndexWithAlias(@TempDir Path homeDir) throws Exception { // language=toml String config = """ @@ -57,7 +55,7 @@ void shouldPullIndexWithAlias() throws Exception { """; // Setup - TestUtils.createRegistriesConfFile(tempDir, config); + TestUtils.createRegistriesConfFile(homeDir, config); TestUtils.withHome(homeDir, () -> { Registry registry = Registry.builder().defaults().build(); diff --git a/src/test/java/land/oras/OCILayoutTest.java b/src/test/java/land/oras/OCILayoutTest.java index b52eb45d..09347326 100644 --- a/src/test/java/land/oras/OCILayoutTest.java +++ b/src/test/java/land/oras/OCILayoutTest.java @@ -618,6 +618,30 @@ void shouldPushBlob() throws IOException { assertBlobContent(path, digest, "hi"); } + @Test + void shouldFailToPushBlobViaStreamWithoutDigest() { + Path path = layoutPath.resolve("shouldFailToPushBlobViaStreamWithoutDigest"); + byte[] content = "hi".getBytes(StandardCharsets.UTF_8); + OCILayout ociLayout = OCILayout.Builder.builder().defaults(path).build(); + LayoutRef layoutRef = LayoutRef.parse("test"); + OrasException e = assertThrows(OrasException.class, () -> { + ociLayout.pushBlob(layoutRef, content.length, () -> InputStream.nullInputStream(), Map.of()); + }); + assertEquals("Digest is required to push blob to layout", e.getMessage()); + } + + @Test + void shouldFailToPushBlobViaStreamWithInvalidDigest() { + Path path = layoutPath.resolve("shouldFailToPushBlobViaStreamWithInvalidDigest"); + byte[] content = "hi".getBytes(StandardCharsets.UTF_8); + OCILayout ociLayout = OCILayout.Builder.builder().defaults(path).build(); + LayoutRef layoutRef = LayoutRef.parse("test:1234"); + OrasException e = assertThrows(OrasException.class, () -> { + ociLayout.pushBlob(layoutRef, content.length, () -> InputStream.nullInputStream(), Map.of()); + }); + assertEquals("Unsupported digest: 1234", e.getMessage()); + } + @Test void cannotPushBlobWithoutTagOrDigest() throws IOException { diff --git a/src/test/java/land/oras/PublicECRITCase.java b/src/test/java/land/oras/PublicECRITCase.java index a478c90f..107cb20d 100644 --- a/src/test/java/land/oras/PublicECRITCase.java +++ b/src/test/java/land/oras/PublicECRITCase.java @@ -31,7 +31,7 @@ import org.junit.jupiter.api.parallel.ExecutionMode; import uk.org.webcompere.systemstubs.environment.EnvironmentVariables; -@Execution(ExecutionMode.SAME_THREAD) // Avoid 429 Too Many Requests for unauthenticated requests to public ECR +@Execution(ExecutionMode.SAME_THREAD) class PublicECRITCase { @TempDir diff --git a/src/test/java/land/oras/RegistryTest.java b/src/test/java/land/oras/RegistryTest.java index d780c94f..4ed2d9d8 100644 --- a/src/test/java/land/oras/RegistryTest.java +++ b/src/test/java/land/oras/RegistryTest.java @@ -22,6 +22,7 @@ import static org.junit.jupiter.api.Assertions.*; +import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; import java.nio.charset.StandardCharsets; @@ -71,6 +72,7 @@ void before() { } @Test + @Execution(ExecutionMode.SAME_THREAD) void shouldThrowIfUnableToFindOnAnyUnQualifiedSearchRegistry(@TempDir Path homeDir) throws Exception { // language=toml @@ -103,6 +105,7 @@ void shouldListRepositories() { } @Test + @Execution(ExecutionMode.SAME_THREAD) void shouldListRepositoriesInsecure(@TempDir Path homeDir) throws Exception { // language=toml @@ -131,12 +134,42 @@ void shouldFailToPushBlobForInvalidDigest() { .build(); ContainerRef containerRef1 = ContainerRef.parse( "library/artifact-text@sha256:2cf24dba5fb0a30e26e83b2ac5b9e29e1b161e5c1fa7425e73043362938b9824"); - // Ensure the blob is deleted assertThrows(OrasException.class, () -> { registry.pushBlob(containerRef1, "invalid".getBytes()); }); } + @Test + void shouldFailToPushBlobWithMissingDigestViaStream() { + Registry registry = Registry.builder() + .insecure(this.registry.getRegistry(), "myuser", "mypass") + .build(); + ContainerRef containerRef = ContainerRef.parse("library/artifact-text:latest"); + OrasException e = assertThrows(OrasException.class, () -> { + registry.pushBlob( + containerRef, + 10L, + () -> new ByteArrayInputStream("foo".getBytes(StandardCharsets.UTF_8)), + Map.of()); + }); + assertEquals("Digest is required to push blob with stream", e.getMessage()); + } + + @Test + void shouldPushBlobWithDigestViaStream() { + Registry registry = Registry.builder() + .insecure(this.registry.getRegistry(), "myuser", "mypass") + .build(); + byte[] content = "foo".getBytes(StandardCharsets.UTF_8); + String digest = SupportedAlgorithm.SHA512.digest(content); + long size = content.length; + InputStream stream = new ByteArrayInputStream(content); + ContainerRef containerRef = + ContainerRef.parse("library/artifact-blob-stream").withDigest(digest); + registry.pushBlob(containerRef, size, () -> stream, Map.of()); + registry.pushBlob(containerRef, size, () -> stream, Map.of()); + } + @Test void shouldPushAndGetBlobThenDeleteWithSha256() { Registry registry = Registry.Builder.builder() @@ -211,6 +244,7 @@ void shouldPushUnsecure() { } @Test + @Execution(ExecutionMode.SAME_THREAD) void shouldPushPullManifestsAndBlobsByUsingConfig(@TempDir Path homeDir) throws Exception { // language=toml @@ -408,6 +442,7 @@ void shouldPushManifest() { } @Test + @Execution(ExecutionMode.SAME_THREAD) void shouldPushManifestWithRegistryConfig(@TempDir Path homeDir) throws Exception { // language=toml @@ -447,6 +482,7 @@ void shouldPushManifestWithRegistryConfig(@TempDir Path homeDir) throws Exceptio } @Test + @Execution(ExecutionMode.SAME_THREAD) void shouldDetermineRegistryFromAlias(@TempDir Path homeDir) throws Exception { // language=toml @@ -604,6 +640,7 @@ void shouldPushComplexArtifactWithConfigArtifactType() throws IOException { } @Test + @Execution(ExecutionMode.SAME_THREAD) void shouldListReferrers(@TempDir Path homeDir) throws Exception { Registry registry = Registry.Builder.builder() .defaults("myuser", "mypass") diff --git a/src/test/java/land/oras/TestUtils.java b/src/test/java/land/oras/TestUtils.java index 620047ff..9ed005dc 100644 --- a/src/test/java/land/oras/TestUtils.java +++ b/src/test/java/land/oras/TestUtils.java @@ -51,7 +51,7 @@ public static void createRegistriesConfFile(Path homeDir, String content) { * @param action the action to execute with the HOME environment variable set * @throws Exception if any exception occurs during the execution of the action */ - public static void withHome(Path homeDir, Runnable action) throws Exception, IOException { + public static void withHome(Path homeDir, Runnable action) throws Exception { new EnvironmentVariables() .set("HOME", homeDir.toAbsolutePath().toString()) .execute(() -> { diff --git a/src/test/java/land/oras/auth/RegistriesConfTest.java b/src/test/java/land/oras/auth/RegistriesConfTest.java index af2d7fb1..35c90a1b 100644 --- a/src/test/java/land/oras/auth/RegistriesConfTest.java +++ b/src/test/java/land/oras/auth/RegistriesConfTest.java @@ -33,7 +33,7 @@ /** * Test class of {@link RegistriesConf}. */ -@Execution(ExecutionMode.CONCURRENT) +@Execution(ExecutionMode.SAME_THREAD) class RegistriesConfTest { @TempDir