Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,11 @@ default <R> Optional<R> getSecondaryResource(Class<R> expectedType) {
<R> Set<R> getSecondaryResources(Class<R> expectedType);

default <R> Stream<R> getSecondaryResourcesAsStream(Class<R> expectedType) {
return getSecondaryResources(expectedType).stream();
return getSecondaryResourcesAsStream(expectedType, false);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here we could use the former implementation and call this in case in the new method the deduplication is false.

}

<R> Stream<R> getSecondaryResourcesAsStream(Class<R> expectedType, boolean deduplicate);

<R> Optional<R> getSecondaryResource(Class<R> expectedType, String eventSourceName);

ControllerConfiguration<P> getControllerConfiguration();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package io.javaoperatorsdk.operator.api.reconciler;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
Expand All @@ -24,8 +25,12 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.javaoperatorsdk.operator.ReconcilerUtilsInternal;
import io.javaoperatorsdk.operator.api.config.ControllerConfiguration;
import io.javaoperatorsdk.operator.api.reconciler.dependent.DependentResource;
import io.javaoperatorsdk.operator.api.reconciler.dependent.managed.DefaultManagedWorkflowAndDependentResourceContext;
Expand All @@ -36,6 +41,7 @@
import io.javaoperatorsdk.operator.processing.event.ResourceID;

public class DefaultContext<P extends HasMetadata> implements Context<P> {
private static final Logger log = LoggerFactory.getLogger(DefaultContext.class);

private RetryInfo retryInfo;
private final Controller<P> controller;
Expand Down Expand Up @@ -73,11 +79,56 @@ public <T> Set<T> getSecondaryResources(Class<T> expectedType) {
return getSecondaryResourcesAsStream(expectedType).collect(Collectors.toSet());
}

@Override
public <R> Stream<R> getSecondaryResourcesAsStream(Class<R> expectedType) {
return controller.getEventSourceManager().getEventSourcesFor(expectedType).stream()
.map(es -> es.getSecondaryResources(primaryResource))
.flatMap(Set::stream);
public <R> Stream<R> getSecondaryResourcesAsStream(Class<R> expectedType, boolean deduplicate) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add comprehensive javadoc

if (deduplicate && !HasMetadata.class.isAssignableFrom(expectedType)) {
throw new IllegalArgumentException("Can only de-duplicate HasMetadata descendants");
}

final var idToLatest = deduplicate ? new HashMap<ResourceID, String>() : null;
final var stream =
controller.getEventSourceManager().getEventSourcesFor(expectedType).stream()
.<R>mapMulti(
(es, consumer) ->
es.getSecondaryResources(primaryResource)
.forEach(
r -> {
var reject = false;
if (deduplicate) {
final boolean[] rejectAr = new boolean[1];
final var hm = (HasMetadata) r;
final var resourceVersion = hm.getMetadata().getResourceVersion();
idToLatest.merge(
ResourceID.fromResource(hm),
resourceVersion,
(existing, replacement) -> {
final var comparison =
ReconcilerUtilsInternal.compareResourceVersions(
existing, replacement);
rejectAr[0] =
comparison == 0; // rejecting resource if version is equal
return comparison >= 0 ? existing : replacement;
});
reject = rejectAr[0];
}
// only keep resources that don't have the same id and resource
// version
if (!reject) {
consumer.accept(r);
}
}));
if (deduplicate) {
//noinspection unchecked
return stream
.map(HasMetadata.class::cast)
.filter(
hm -> {
final var resourceVersion = hm.getMetadata().getResourceVersion();
return resourceVersion.equals(idToLatest.get(ResourceID.fromResource(hm)));
})
.map(hasMetadata -> (R) hasMetadata);
} else {
return stream;
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,17 @@
package io.javaoperatorsdk.operator.api.reconciler;

import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;
import java.util.stream.Collector;
import java.util.stream.Collectors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -32,6 +41,7 @@
import io.javaoperatorsdk.operator.processing.event.ResourceID;
import io.javaoperatorsdk.operator.processing.event.source.informer.ManagedInformerEventSource;

import static io.javaoperatorsdk.operator.ReconcilerUtilsInternal.compareResourceVersions;
import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getUID;
import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getVersion;

Expand Down Expand Up @@ -364,13 +374,13 @@ public static <R extends HasMetadata> R resourcePatch(
if (esList.isEmpty()) {
throw new IllegalStateException("No event source found for type: " + resource.getClass());
}
var es = esList.get(0);
if (esList.size() > 1) {
throw new IllegalStateException(
"Multiple event sources found for: "
+ resource.getClass()
+ " please provide the target event source");
log.warn(
"Multiple event sources found for type: {}, selecting first with name {}",
resource.getClass(),
es.name());
}
var es = esList.get(0);
if (es instanceof ManagedInformerEventSource mes) {
return resourcePatch(resource, updateOperation, mes);
} else {
Expand Down Expand Up @@ -595,4 +605,55 @@ public static <P extends HasMetadata> P addFinalizerWithSSA(
e);
}
}

/**
* Returns a collector that deduplicates Kubernetes objects by keeping only the one with the
* latest metadata.resourceVersion for each unique name and namespace combination. The intended
* use case is for the rather rare setup when there are overlapping {@link
* io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource}s for a
* resource type.
*
* @param <T> the type of HasMetadata objects
* @return a collector that produces a collection of deduplicated Kubernetes objects
*/
public static <T extends HasMetadata> Collector<T, ?, Collection<T>> latestDistinct() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wasn't this moved to ReconcileUtilsInternal

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we were to keep this implementation, this actually would be removed

return Collectors.collectingAndThen(latestDistinctToMap(), Map::values);
}

/**
* Returns a collector that deduplicates Kubernetes objects by keeping only the one with the
* latest metadata.resourceVersion for each unique name and namespace combination. The intended
* use case is for the rather rare setup when there are overlapping {@link
* io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource}s for a
* resource type.
*
* @param <T> the type of HasMetadata objects
* @return a collector that produces a List of deduplicated Kubernetes objects
*/
public static <T extends HasMetadata> Collector<T, ?, List<T>> latestDistinctList() {
return Collectors.collectingAndThen(
latestDistinctToMap(), map -> new ArrayList<>(map.values()));
}

/**
* Returns a collector that deduplicates Kubernetes objects by keeping only the one with the
* latest metadata.resourceVersion for each unique name and namespace combination. The intended
* use case is for the rather rare setup when there are overlapping {@link
* io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource}s for a
* resource type.
*
* @param <T> the type of HasMetadata objects
* @return a collector that produces a Set of deduplicated Kubernetes objects
*/
public static <T extends HasMetadata> Collector<T, ?, Set<T>> latestDistinctSet() {
return Collectors.collectingAndThen(latestDistinctToMap(), map -> new HashSet<>(map.values()));
}

private static <T extends HasMetadata> Collector<T, ?, Map<ResourceID, T>> latestDistinctToMap() {
return Collectors.toMap(
ResourceID::fromResource,
Function.identity(),
(existing, replacement) ->
compareResourceVersions(existing, replacement) >= 0 ? existing : replacement);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,8 @@ public Set<R> getSecondaryResources(P primary) {
}
return secondaryIDs.stream()
.map(this::get)
.flatMap(Optional::stream)
.filter(Optional::isPresent)
.map(Optional::get)
.collect(Collectors.toSet());
}

Expand Down
Loading