Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 29 additions & 54 deletions test/e2e/steps/steps.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"encoding/json"
"fmt"
"io"
"net"
"net/http"
"os"
"os/exec"
Expand Down Expand Up @@ -1394,36 +1393,24 @@ func httpGet(url string, token string) (*http.Response, error) {
return resp, nil
}

func randomAvailablePort() (int, error) {
l, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
return 0, err
}
defer l.Close()
return l.Addr().(*net.TCPAddr).Port, nil
}

// SendMetricsRequest sets up port-forwarding to the controller's service pods and waits for the metrics endpoint
// to return a successful response. Stores the response body per pod in the scenario context. Polls with timeout.
func SendMetricsRequest(ctx context.Context, serviceAccount string, endpoint string, controllerName string) error {
sc := scenarioCtx(ctx)
serviceNs, err := k8sClient("get", "service", "-A", "-o", fmt.Sprintf(`jsonpath={.items[?(@.metadata.name=="%s-service")].metadata.namespace}`, controllerName))
svc, err := getMetricsService(controllerName)
if err != nil {
return err
}
v, err := k8sClient("get", "service", "-n", serviceNs, fmt.Sprintf("%s-service", controllerName), "-o", "json")
mPort, err := metricsPort(svc)
if err != nil {
return err
}
var service corev1.Service
if err := json.Unmarshal([]byte(v), &service); err != nil {
return err
}
podNameCmd := []string{"get", "pod", "-n", olmNamespace, "-o", "jsonpath={.items}"}
for k, v := range service.Spec.Selector {

podNameCmd := []string{"get", "pod", "-n", svc.Namespace, "-o", "jsonpath={.items}"}
for k, v := range svc.Spec.Selector {
podNameCmd = append(podNameCmd, fmt.Sprintf("--selector=%s=%s", k, v))
}
v, err = k8sClient(podNameCmd...)
v, err := k8sClient(podNameCmd...)
if err != nil {
return err
}
Expand All @@ -1436,51 +1423,39 @@ func SendMetricsRequest(ctx context.Context, serviceAccount string, endpoint str
if err != nil {
return err
}
var metricsPort int32
for _, p := range service.Spec.Ports {
if p.Name == "metrics" {
metricsPort = p.Port
break
}
}

sc.metricsResponse = make(map[string]string)
for _, p := range pods {
port, err := randomAvailablePort()
if err != nil {
return err
}
portForwardCmd := exec.Command(k8sCli, "port-forward", "-n", p.Namespace, fmt.Sprintf("pod/%s", p.Name), fmt.Sprintf("%d:%d", port, metricsPort)) //nolint:gosec // perfectly safe to start port-forwarder for provided controller name
logger.V(1).Info("starting port-forward", "command", strings.Join(portForwardCmd.Args, " "))
if err := portForwardCmd.Start(); err != nil {
logger.Error(err, fmt.Sprintf("failed to start port-forward for pod %s", p.Name))
return err
}
waitFor(ctx, func() bool {
resp, err := httpGet(fmt.Sprintf("https://localhost:%d%s", port, endpoint), token)
if err := func() error {
addr, cleanup, err := portForward(p.Namespace, fmt.Sprintf("pod/%s", p.Name), mPort)
if err != nil {
return false
return err
}
defer resp.Body.Close()
defer cleanup()
waitFor(ctx, func() bool {
resp, err := httpGet(fmt.Sprintf("https://%s%s", addr, endpoint), token)
if err != nil {
return false
}
defer resp.Body.Close()

if resp.StatusCode == http.StatusOK {
if resp.StatusCode == http.StatusOK {
b, err := io.ReadAll(resp.Body)
if err != nil {
return false
}
sc.metricsResponse[p.Name] = string(b)
return true
}
b, err := io.ReadAll(resp.Body)
if err != nil {
return false
}
sc.metricsResponse[p.Name] = string(b)
return true
}
b, err := io.ReadAll(resp.Body)
if err != nil {
logger.V(1).Info("failed to get metrics", "pod", p.Name, "response", string(b))
return false
}
logger.V(1).Info("failed to get metrics", "pod", p.Name, "response", string(b))
return false
})
if err := portForwardCmd.Process.Kill(); err != nil {
return err
}
if _, err := portForwardCmd.Process.Wait(); err != nil {
})
return nil
}(); err != nil {
return err
}
}
Expand Down
72 changes: 51 additions & 21 deletions test/e2e/steps/tls_steps.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,65 +53,95 @@ var curveIDByName = map[string]tls.CurveID{
"secp521r1": tls.CurveP521,
}

// getMetricsServiceEndpoint returns the namespace and metrics port for the named component service.
func getMetricsServiceEndpoint(component string) (string, int32, error) {
// getMetricsService returns the full Service object for the named component's metrics service.
// The namespace is available via svc.Namespace.
func getMetricsService(component string) (*corev1.Service, error) {
serviceName := fmt.Sprintf("%s-service", component)
serviceNs, err := k8sClient("get", "service", "-A", "-o",
fmt.Sprintf(`jsonpath={.items[?(@.metadata.name=="%s")].metadata.namespace}`, serviceName))
if err != nil {
return "", 0, fmt.Errorf("failed to find namespace for service %s: %w", serviceName, err)
return nil, fmt.Errorf("failed to find namespace for service %s: %w", serviceName, err)
}
serviceNs = strings.TrimSpace(serviceNs)
if serviceNs == "" {
return "", 0, fmt.Errorf("service %s not found in any namespace", serviceName)
return nil, fmt.Errorf("service %s not found in any namespace", serviceName)
}

raw, err := k8sClient("get", "service", "-n", serviceNs, serviceName, "-o", "json")
if err != nil {
return "", 0, fmt.Errorf("failed to get service %s: %w", serviceName, err)
return nil, fmt.Errorf("failed to get service %s: %w", serviceName, err)
}
var svc corev1.Service
if err := json.Unmarshal([]byte(raw), &svc); err != nil {
return "", 0, fmt.Errorf("failed to unmarshal service %s: %w", serviceName, err)
return nil, fmt.Errorf("failed to unmarshal service %s: %w", serviceName, err)
}
return &svc, nil
}

// metricsPort returns the port number of the port named "metrics" on the given service.
func metricsPort(svc *corev1.Service) (int32, error) {
for _, p := range svc.Spec.Ports {
if p.Name == "metrics" {
return serviceNs, p.Port, nil
return p.Port, nil
}
}
return "", 0, fmt.Errorf("no port named 'metrics' found on service %s", serviceName)
return 0, fmt.Errorf("no port named 'metrics' found on service %s", svc.Name)
}

// withMetricsPortForward starts a kubectl port-forward to the component's metrics service,
// waits until a basic TLS connection succeeds (confirming the port-forward is ready),
// then calls fn with the local address. The port-forward is torn down when fn returns.
func withMetricsPortForward(ctx context.Context, component string, fn func(addr string) error) error {
ns, metricsPort, err := getMetricsServiceEndpoint(component)
func randomAvailablePort() (int, error) {
l, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
return err
return 0, err
}
defer l.Close()
return l.Addr().(*net.TCPAddr).Port, nil
}

// portForward starts a kubectl port-forward to target (e.g. "service/foo" or "pod/bar")
// in the given namespace, mapping a random local port to remotePort. It returns the
// local address and a cleanup function. The caller is responsible for calling cleanup.
func portForward(ns, target string, remotePort int32) (string, func(), error) {
localPort, err := randomAvailablePort()
if err != nil {
return fmt.Errorf("failed to find a free local port: %w", err)
return "", nil, fmt.Errorf("failed to find a free local port: %w", err)
}

serviceName := fmt.Sprintf("%s-service", component)
pfCmd := exec.Command(k8sCli, "port-forward", "-n", ns, //nolint:gosec
fmt.Sprintf("service/%s", serviceName),
fmt.Sprintf("%d:%d", localPort, metricsPort))
target, fmt.Sprintf("%d:%d", localPort, remotePort))
pfCmd.Env = append(os.Environ(), fmt.Sprintf("KUBECONFIG=%s", kubeconfigPath))
if err := pfCmd.Start(); err != nil {
return fmt.Errorf("failed to start port-forward to %s: %w", serviceName, err)
return "", nil, fmt.Errorf("failed to start port-forward to %s: %w", target, err)
}
defer func() {

cleanup := func() {
if p := pfCmd.Process; p != nil {
_ = p.Kill()
_ = pfCmd.Wait()
}
}()
}

addr := fmt.Sprintf("127.0.0.1:%d", localPort)
return addr, cleanup, nil
}

// withMetricsPortForward starts a kubectl port-forward to the component's metrics service,
// waits until a basic TLS connection succeeds (confirming the port-forward is ready),
// then calls fn with the local address. The port-forward is torn down when fn returns.
func withMetricsPortForward(ctx context.Context, component string, fn func(addr string) error) error {
svc, err := getMetricsService(component)
if err != nil {
return err
}
port, err := metricsPort(svc)
if err != nil {
return err
}

addr, cleanup, err := portForward(svc.Namespace, fmt.Sprintf("service/%s", svc.Name), port)
if err != nil {
return err
}
defer cleanup()

// Wait until the port-forward is accepting connections. A plain TLS dial (no version
// restrictions) serves as the readiness probe; any successful TLS handshake confirms
Expand Down