Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package io.kubernetes.client.e2e.informer;

import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

import io.kubernetes.client.informer.ResourceEventHandler;
import io.kubernetes.client.informer.SharedIndexInformer;
import io.kubernetes.client.informer.SharedInformerFactory;
import io.kubernetes.client.informer.cache.Lister;
import io.kubernetes.client.openapi.ApiClient;
import io.kubernetes.client.openapi.models.V1Namespace;
import io.kubernetes.client.openapi.models.V1NamespaceList;
import io.kubernetes.client.util.ClientBuilder;
import io.kubernetes.client.util.generic.GenericKubernetesApi;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import org.junit.jupiter.api.Test;

/**
* E2E test verifying that initial list items delivered via the informer's onAdd callback contain
* the expected kind and apiVersion fields (not null), consistent with watch events.
*/
class InformerListTypeMetaTest {

@Test
void initialListItemsShouldHaveKindAndApiVersionPopulated() throws Exception {
ApiClient client = ClientBuilder.defaultClient();
SharedInformerFactory informerFactory = new SharedInformerFactory(client);

GenericKubernetesApi<V1Namespace, V1NamespaceList> api =
new GenericKubernetesApi<>(
V1Namespace.class,
V1NamespaceList.class,
"",
"v1",
"namespaces",
client);

SharedIndexInformer<V1Namespace> nsInformer =
informerFactory.sharedIndexInformerFor(api, V1Namespace.class, 0);
Lister<V1Namespace> nsLister = new Lister<>(nsInformer.getIndexer());

// Collect items received via onAdd during the initial list phase
List<V1Namespace> addedItems = new ArrayList<>();
nsInformer.addEventHandler(
new ResourceEventHandler<V1Namespace>() {
@Override
public void onAdd(V1Namespace obj) {
addedItems.add(obj);
}

@Override
public void onUpdate(V1Namespace oldObj, V1Namespace newObj) {}

@Override
public void onDelete(V1Namespace obj, boolean deletedFinalStateUnknown) {}
});

try {
informerFactory.startAllRegisteredInformers();

await()
.atMost(Duration.ofSeconds(30))
.untilAsserted(
() -> {
assertThat(nsInformer.hasSynced()).isTrue();
assertThat(nsLister.list()).isNotEmpty();
});

// All items received via onAdd during the initial list must have kind and apiVersion set
assertThat(addedItems).isNotEmpty();
for (V1Namespace ns : addedItems) {
assertThat(ns.getKind())
.as("kind should be populated for namespace %s", ns.getMetadata().getName())
.isEqualTo("Namespace");
assertThat(ns.getApiVersion())
.as("apiVersion should be populated for namespace %s", ns.getMetadata().getName())
.isEqualTo("v1");
}
} finally {
informerFactory.stopAllRegisteredInformers(true);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
*/
package io.kubernetes.client.informer.cache;

import io.kubernetes.client.apimachinery.GroupVersionKind;
import io.kubernetes.client.common.KubernetesListObject;
import io.kubernetes.client.common.KubernetesObject;
import io.kubernetes.client.informer.EventType;
Expand All @@ -21,9 +22,11 @@
import io.kubernetes.client.openapi.models.V1ListMeta;
import io.kubernetes.client.openapi.models.V1ObjectMeta;
import io.kubernetes.client.util.CallGeneratorParams;
import io.kubernetes.client.util.ModelMapper;
import io.kubernetes.client.util.Strings;
import io.kubernetes.client.util.Watchable;
import java.io.IOException;
import java.lang.reflect.Method;
import java.net.ConnectException;
import java.net.HttpURLConnection;
import java.time.Duration;
Expand Down Expand Up @@ -60,6 +63,9 @@ public class ReflectorRunnable<

/* visible for testing */ final BiConsumer<Class<ApiType>, Throwable> exceptionHandler;

private Method setKindMethod;
private Method setApiVersionMethod;

public ReflectorRunnable(
Class<ApiType> apiTypeClass,
ListerWatcher<ApiType, ApiListType> listerWatcher,
Expand All @@ -77,6 +83,12 @@ public ReflectorRunnable(
this.apiTypeClass = apiTypeClass;
this.exceptionHandler =
exceptionHandler == null ? ReflectorRunnable::defaultWatchErrorHandler : exceptionHandler;
try {
this.setKindMethod = apiTypeClass.getMethod("setKind", String.class);
this.setApiVersionMethod = apiTypeClass.getMethod("setApiVersion", String.class);
} catch (NoSuchMethodException e) {
log.warn("{}#setKind or setApiVersion method not found, type metadata will not be set on list items", apiTypeClass);
}
}

/**
Expand All @@ -94,6 +106,7 @@ public void run() {
V1ListMeta listMeta = list.getMetadata();
String resourceVersion = listMeta.getResourceVersion();
List<? extends KubernetesObject> items = list.getItems();
populateTypeMeta(items, list.getApiVersion(), list.getKind());

if (log.isDebugEnabled()) {
log.debug("{}#Extract resourceVersion {} list meta", apiTypeClass, resourceVersion);
Expand Down Expand Up @@ -229,6 +242,64 @@ private String getRelistResourceVersion() {
return lastSyncResourceVersion;
}

/* visible for testing */ void populateTypeMeta(
List<? extends KubernetesObject> items, String listApiVersion, String listKind) {
if (items == null || items.isEmpty()) {
return;
}

// Determine kind and apiVersion to use for items
String kind = null;
String apiVersion = null;

// First, derive from the list's own kind/apiVersion (strip "List" suffix)
if (!Strings.isNullOrEmpty(listApiVersion)
&& !Strings.isNullOrEmpty(listKind)
&& listKind.endsWith("List")) {
kind = listKind.substring(0, listKind.length() - 4);
apiVersion = listApiVersion;
}

// Fall back to ModelMapper if list metadata is absent
if (kind == null) {
GroupVersionKind gvk = ModelMapper.getGroupVersionKindByClass(apiTypeClass);
if (gvk == null) {
Optional<GroupVersionKind> preBuiltGvk =
ModelMapper.preBuiltGetGroupVersionKindByClass(apiTypeClass);
gvk = preBuiltGvk.orElse(null);
}
if (gvk != null) {
kind = gvk.getKind();
String group = gvk.getGroup();
String version = gvk.getVersion();
apiVersion = Strings.isNullOrEmpty(group) ? version : group + "/" + version;
}
}

if (kind == null || apiVersion == null) {
return;
}

if (setKindMethod == null || setApiVersionMethod == null) {
return;
}

final String resolvedKind = kind;
final String resolvedApiVersion = apiVersion;
try {
for (KubernetesObject item : items) {
if (Strings.isNullOrEmpty(item.getKind())) {
setKindMethod.invoke(item, resolvedKind);
}
if (Strings.isNullOrEmpty(item.getApiVersion())) {
setApiVersionMethod.invoke(item, resolvedApiVersion);
}
}
} catch (ReflectiveOperationException e) {
log.warn("{}#Failed to set kind/apiVersion on list items", apiTypeClass, e);
}
}

private void watchHandler(Watchable<ApiType> watch) {
while (watch.hasNext()) {
io.kubernetes.client.util.Watch.Response<ApiType> item = watch.next();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.kubernetes.client.informer.ListerWatcher;
import io.kubernetes.client.openapi.ApiException;
import io.kubernetes.client.openapi.models.V1ListMeta;
import io.kubernetes.client.openapi.models.V1ObjectMeta;
import io.kubernetes.client.openapi.models.V1Pod;
import io.kubernetes.client.openapi.models.V1PodList;
import io.kubernetes.client.openapi.models.V1Status;
Expand All @@ -32,6 +33,8 @@
import io.kubernetes.client.util.Watchable;
import java.net.HttpURLConnection;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -373,4 +376,91 @@ void givemExceptionHandlerSet() {

assertThat(reflector.exceptionHandler).isSameAs(exceptionHandler);
}

@Test
void populateTypeMetaSetsKindAndApiVersionFromListMetadata() {
ReflectorRunnable<V1Pod, V1PodList> reflector =
new ReflectorRunnable<>(V1Pod.class, listerWatcher, deltaFIFO);

V1Pod pod = new V1Pod().metadata(new V1ObjectMeta().name("test-pod"));
List<V1Pod> items = Arrays.asList(pod);

reflector.populateTypeMeta(items, "v1", "PodList");

assertThat(pod.getKind()).isEqualTo("Pod");
assertThat(pod.getApiVersion()).isEqualTo("v1");
}

@Test
void populateTypeMetaDoesNotOverwriteExistingKindAndApiVersion() {
ReflectorRunnable<V1Pod, V1PodList> reflector =
new ReflectorRunnable<>(V1Pod.class, listerWatcher, deltaFIFO);

V1Pod pod =
new V1Pod()
.metadata(new V1ObjectMeta().name("test-pod"))
.kind("AlreadySet")
.apiVersion("already/set");
List<V1Pod> items = Arrays.asList(pod);

reflector.populateTypeMeta(items, "v1", "PodList");

assertThat(pod.getKind()).isEqualTo("AlreadySet");
assertThat(pod.getApiVersion()).isEqualTo("already/set");
}

@Test
void populateTypeMetaFallsBackToModelMapperWhenListMetadataAbsent() {
ReflectorRunnable<V1Pod, V1PodList> reflector =
new ReflectorRunnable<>(V1Pod.class, listerWatcher, deltaFIFO);

V1Pod pod = new V1Pod().metadata(new V1ObjectMeta().name("test-pod"));
List<V1Pod> items = Arrays.asList(pod);

// Pass null list kind/apiVersion to trigger ModelMapper fallback
reflector.populateTypeMeta(items, null, null);

assertThat(pod.getKind()).isEqualTo("Pod");
assertThat(pod.getApiVersion()).isEqualTo("v1");
}

@Test
void initialListPopulatesTypeMetaOnItems() throws ApiException, InterruptedException {
V1Pod pod = new V1Pod().metadata(new V1ObjectMeta().name("test-pod").resourceVersion("1"));
when(listerWatcher.list(any()))
.thenReturn(
new V1PodList()
.apiVersion("v1")
.kind("PodList")
.metadata(new V1ListMeta().resourceVersion("1000"))
.addItemsItem(pod));
CountDownLatch watchCalledLatch = new CountDownLatch(1);
CountDownLatch watchCanReturnLatch = new CountDownLatch(1);
when(listerWatcher.watch(any()))
.then(
(v) -> {
watchCalledLatch.countDown();
try {
watchCanReturnLatch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return new MockWatch<>();
});
ReflectorRunnable<V1Pod, V1PodList> reflectorRunnable =
new ReflectorRunnable<>(V1Pod.class, listerWatcher, deltaFIFO);

try {
Thread thread = new Thread(reflectorRunnable::run);
thread.setDaemon(true);
thread.start();
assertThat(watchCalledLatch.await(1, TimeUnit.SECONDS)).isTrue();
} finally {
reflectorRunnable.stop();
watchCanReturnLatch.countDown();
}

assertThat(pod.getKind()).isEqualTo("Pod");
assertThat(pod.getApiVersion()).isEqualTo("v1");
}
}
Loading