Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,83 @@ default <R> Optional<R> getSecondaryResource(Class<R> expectedType) {
return getSecondaryResource(expectedType, null);
}

<R> Set<R> getSecondaryResources(Class<R> expectedType);
/**
* Retrieves a {@link Set} of the secondary resources of the specified type, which are associated
* with the primary resource being processed, possibly making sure that only the latest version of
* each resource is retrieved.
*
* <p>Note: While this method returns a {@link Set}, it is possible to get several copies of a
* given resource albeit all with different {@code resourceVersion}. If you want to avoid this
* situation, call {@link #getSecondaryResources(Class, boolean)} with the {@code deduplicate}
* parameter set to {@code true}.
*
* @param expectedType a class representing the type of secondary resources to retrieve
* @param <R> the type of secondary resources to retrieve
* @return a {@link Stream} of secondary resources of the specified type, possibly deduplicated
*/
default <R> Set<R> getSecondaryResources(Class<R> expectedType) {
return getSecondaryResources(expectedType, false);
}

/**
* Retrieves a {@link Set} of the secondary resources of the specified type, which are associated
* with the primary resource being processed, possibly making sure that only the latest version of
* each resource is retrieved.
*
* <p>Note: While this method returns a {@link Set}, it is possible to get several copies of a
* given resource albeit all with different {@code resourceVersion}. If you want to avoid this
* situation, ask for the deduplicated version by setting the {@code deduplicate} parameter to
* {@code true}.
*
* @param expectedType a class representing the type of secondary resources to retrieve
* @param deduplicate {@code true} if only the latest version of each resource should be kept,
* {@code false} otherwise
* @param <R> the type of secondary resources to retrieve
* @return a {@link Set} of secondary resources of the specified type, possibly deduplicated
* @throws IllegalArgumentException if the secondary resource type cannot be deduplicated because
* it's not extending {@link HasMetadata}, which is required to access the resource version
* @since 5.3.0
*/
<R> Set<R> getSecondaryResources(Class<R> expectedType, boolean deduplicate);

/**
* Retrieves a {@link Stream} of the secondary resources of the specified type, which are
* associated with the primary resource being processed, possibly making sure that only the latest
* version of each resource is retrieved.
*
* <p>Note: It is possible to get several copies of a given resource albeit all with different
* {@code resourceVersion}. If you want to avoid this situation, call {@link
* #getSecondaryResourcesAsStream(Class, boolean)} with the {@code deduplicate} parameter set to
* {@code true}.
*
* @param expectedType a class representing the type of secondary resources to retrieve
* @param <R> the type of secondary resources to retrieve
* @return a {@link Stream} of secondary resources of the specified type, possibly deduplicated
*/
default <R> Stream<R> getSecondaryResourcesAsStream(Class<R> expectedType) {
return getSecondaryResources(expectedType).stream();
return getSecondaryResourcesAsStream(expectedType, false);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here we could use the former implementation and call this in case in the new method the deduplication is false.

}

/**
* Retrieves a {@link Stream} of the secondary resources of the specified type, which are
* associated with the primary resource being processed, possibly making sure that only the latest
* version of each resource is retrieved.
*
* <p>Note: It is possible to get several copies of a given resource albeit all with different
* {@code resourceVersion}. If you want to avoid this situation, ask for the deduplicated version
* by setting the {@code deduplicate} parameter to {@code true}.
*
* @param expectedType a class representing the type of secondary resources to retrieve
* @param deduplicate {@code true} if only the latest version of each resource should be kept,
* {@code false} otherwise
* @param <R> the type of secondary resources to retrieve
* @return a {@link Stream} of secondary resources of the specified type, possibly deduplicated
* @throws IllegalArgumentException if the secondary resource type cannot be deduplicated because
* it's not extending {@link HasMetadata}, which is required to access the resource version
* @since 5.3.0
*/
<R> Stream<R> getSecondaryResourcesAsStream(Class<R> expectedType, boolean deduplicate);

<R> Optional<R> getSecondaryResource(Class<R> expectedType, String eventSourceName);

ControllerConfiguration<P> getControllerConfiguration();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package io.javaoperatorsdk.operator.api.reconciler;

import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
Expand All @@ -26,6 +27,7 @@

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.javaoperatorsdk.operator.ReconcilerUtilsInternal;
import io.javaoperatorsdk.operator.api.config.ControllerConfiguration;
import io.javaoperatorsdk.operator.api.reconciler.dependent.DependentResource;
import io.javaoperatorsdk.operator.api.reconciler.dependent.managed.DefaultManagedWorkflowAndDependentResourceContext;
Expand All @@ -36,7 +38,6 @@
import io.javaoperatorsdk.operator.processing.event.ResourceID;

public class DefaultContext<P extends HasMetadata> implements Context<P> {

private RetryInfo retryInfo;
private final Controller<P> controller;
private final P primaryResource;
Expand Down Expand Up @@ -71,15 +72,44 @@ public Optional<RetryInfo> getRetryInfo() {
}

@Override
public <T> Set<T> getSecondaryResources(Class<T> expectedType) {
public <T> Set<T> getSecondaryResources(Class<T> expectedType, boolean deduplicate) {
if (deduplicate) {
final var deduplicatedMap = deduplicatedMap(getSecondaryResourcesAsStream(expectedType));
return new HashSet<>(deduplicatedMap.values());
}
return getSecondaryResourcesAsStream(expectedType).collect(Collectors.toSet());
}

@Override
public <R> Stream<R> getSecondaryResourcesAsStream(Class<R> expectedType) {
return controller.getEventSourceManager().getEventSourcesFor(expectedType).stream()
.map(es -> es.getSecondaryResources(primaryResource))
.flatMap(Set::stream);
public <R> Stream<R> getSecondaryResourcesAsStream(Class<R> expectedType, boolean deduplicate) {
final var stream =
controller.getEventSourceManager().getEventSourcesFor(expectedType).stream()
.<R>mapMulti(
(es, consumer) -> es.getSecondaryResources(primaryResource).forEach(consumer));
if (deduplicate) {
if (!HasMetadata.class.isAssignableFrom(expectedType)) {
throw new IllegalArgumentException("Can only de-duplicate HasMetadata descendants");
}
return deduplicatedMap(stream).values().stream();
} else {
return stream;
}
}

private <R> Map<ResourceID, R> deduplicatedMap(Stream<R> stream) {
return stream.collect(
Collectors.toUnmodifiableMap(
DefaultContext::resourceID,
Function.identity(),
(existing, replacement) ->
compareResourceVersions(existing, replacement) >= 0 ? existing : replacement));
}

private static ResourceID resourceID(Object hasMetadata) {
return ResourceID.fromResource((HasMetadata) hasMetadata);
}

private static int compareResourceVersions(Object v1, Object v2) {
return ReconcilerUtilsInternal.compareResourceVersions((HasMetadata) v1, (HasMetadata) v2);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -535,13 +535,13 @@ public <R extends HasMetadata> R resourcePatch(R resource, UnaryOperator<R> upda
if (esList.isEmpty()) {
throw new IllegalStateException("No event source found for type: " + resource.getClass());
}
var es = esList.get(0);
if (esList.size() > 1) {
throw new IllegalStateException(
"Multiple event sources found for: "
+ resource.getClass()
+ " please provide the target event source");
log.warn(
"Multiple event sources found for type: {}, selecting first with name {}",
resource.getClass(),
es.name());
}
var es = esList.get(0);
if (es instanceof ManagedInformerEventSource mes) {
return resourcePatch(resource, updateOperation, (ManagedInformerEventSource<R, P, ?>) mes);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,26 @@ public boolean equals(Object o) {
}

public boolean isSameResource(HasMetadata hasMetadata) {
if (hasMetadata == null) {
return false;
}
final var metadata = hasMetadata.getMetadata();
return getName().equals(metadata.getName())
&& getNamespace().map(ns -> ns.equals(metadata.getNamespace())).orElse(true);
return isSameResource(metadata.getName(), metadata.getNamespace());
}

/**
* Whether this ResourceID points to the same resource as the one identified by the specified name
* and namespace. Note that this doesn't take API version or Kind into account so this should only
* be used when checking resources that are reasonably expected to be of the same type.
*
* @param name the name of the resource we want to check
* @param namespace the possibly {@code null} namespace of the resource we want to check
* @return {@code true} if this resource points to the same resource as the one pointed to by the
* specified name and namespace, {@code false} otherwise
* @since 5.3.0
*/
public boolean isSameResource(String name, String namespace) {
return Objects.equals(this.name, name) && Objects.equals(this.namespace, namespace);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,8 @@ public Set<R> getSecondaryResources(P primary) {
}
return secondaryIDs.stream()
.map(this::get)
.flatMap(Optional::stream)
.filter(Optional::isPresent)
.map(Optional::get)
.collect(Collectors.toSet());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,23 @@
*/
package io.javaoperatorsdk.operator.api.reconciler;

import java.util.List;
import java.util.Set;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import io.fabric8.kubernetes.api.model.ConfigMap;
import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodBuilder;
import io.fabric8.kubernetes.api.model.Secret;
import io.javaoperatorsdk.operator.processing.Controller;
import io.javaoperatorsdk.operator.processing.event.EventSourceManager;
import io.javaoperatorsdk.operator.processing.event.NoEventSourceForClassException;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
import io.javaoperatorsdk.operator.processing.event.source.EventSource;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
Expand All @@ -30,17 +40,21 @@

class DefaultContextTest {

private final Secret primary = new Secret();
private final Controller<Secret> mockController = mock();
private DefaultContext<?> context;
private Controller<HasMetadata> mockController;
private EventSourceManager<HasMetadata> mockManager;

private final DefaultContext<?> context =
new DefaultContext<>(null, mockController, primary, false, false);
@BeforeEach
void setUp() {
mockController = mock();
mockManager = mock();
when(mockController.getEventSourceManager()).thenReturn(mockManager);

context = new DefaultContext<>(null, mockController, new Secret(), false, false);
}

@Test
@SuppressWarnings("unchecked")
void getSecondaryResourceReturnsEmptyOptionalOnNonActivatedDRType() {
var mockManager = mock(EventSourceManager.class);
when(mockController.getEventSourceManager()).thenReturn(mockManager);
when(mockController.workflowContainsDependentForType(ConfigMap.class)).thenReturn(true);
when(mockManager.getEventSourceFor(any(), any()))
.thenThrow(new NoEventSourceForClassException(ConfigMap.class));
Expand All @@ -56,4 +70,101 @@ void setRetryInfo() {
assertThat(newContext).isSameAs(context);
assertThat(newContext.getRetryInfo()).hasValue(retryInfo);
}

@Test
void latestDistinctKeepsOnlyLatestResourceVersion() {
// Create multiple resources with same name and namespace but different versions
var pod1v1 = podWithNameAndVersion("pod1", "100");
var pod1v2 = podWithNameAndVersion("pod1", "200");
var pod1v3 = podWithNameAndVersion("pod1", "150");

// Create a resource with different name
var pod2v1 = podWithNameAndVersion("pod2", "100");

// Create a resource with same name but different namespace
var pod1OtherNsv1 = podWithNameAndVersion("pod1", "50", "other");

setUpEventSourceWith(pod1v1, pod1v2, pod1v3, pod1OtherNsv1, pod2v1);

var result = context.getSecondaryResourcesAsStream(Pod.class, true).toList();

// Should have 3 resources: pod1 in default (latest version 200), pod2 in default, and pod1 in
// other
assertThat(result).hasSize(3);

// Find pod1 in default namespace - should have version 200
final var pod1InDefault =
result.stream()
.filter(r -> ResourceID.fromResource(r).isSameResource("pod1", "default"))
.findFirst()
.orElseThrow();
assertThat(pod1InDefault.getMetadata().getResourceVersion()).isEqualTo("200");

// Find pod2 in default namespace - should exist
HasMetadata pod2InDefault =
result.stream()
.filter(r -> ResourceID.fromResource(r).isSameResource("pod2", "default"))
.findFirst()
.orElseThrow();
assertThat(pod2InDefault.getMetadata().getResourceVersion()).isEqualTo("100");

// Find pod1 in other namespace - should exist
HasMetadata pod1InOther =
result.stream()
.filter(r -> ResourceID.fromResource(r).isSameResource("pod1", "other"))
.findFirst()
.orElseThrow();
assertThat(pod1InOther.getMetadata().getResourceVersion()).isEqualTo("50");
}

private void setUpEventSourceWith(Pod... pods) {
EventSource<Pod, HasMetadata> mockEventSource = mock();
when(mockEventSource.getSecondaryResources(any())).thenReturn(Set.of(pods));
when(mockManager.getEventSourcesFor(Pod.class)).thenReturn(List.of(mockEventSource));
}

private static Pod podWithNameAndVersion(
String name, String resourceVersion, String... namespace) {
final var ns = namespace != null && namespace.length > 0 ? namespace[0] : "default";
return new PodBuilder()
.withMetadata(
new ObjectMetaBuilder()
.withName(name)
.withNamespace(ns)
.withResourceVersion(resourceVersion)
.build())
.build();
}

@Test
void latestDistinctHandlesEmptyStream() {
var result = context.getSecondaryResourcesAsStream(Pod.class, true).toList();

assertThat(result).isEmpty();
}

@Test
void latestDistinctHandlesSingleResource() {
final var pod = podWithNameAndVersion("pod1", "100");
setUpEventSourceWith(pod);

var result = context.getSecondaryResourcesAsStream(Pod.class, true).toList();

assertThat(result).hasSize(1);
assertThat(result).contains(pod);
}

@Test
void latestDistinctComparesNumericVersionsCorrectly() {
// Test that version 1000 is greater than version 999 (not lexicographic)
final var podV999 = podWithNameAndVersion("pod1", "999");
final var podV1000 = podWithNameAndVersion("pod1", "1000");
setUpEventSourceWith(podV999, podV1000);

var result = context.getSecondaryResourcesAsStream(Pod.class, true).toList();

assertThat(result).hasSize(1);
HasMetadata resultPod = result.iterator().next();
assertThat(resultPod.getMetadata().getResourceVersion()).isEqualTo("1000");
}
}
Loading
Loading