Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 111 additions & 11 deletions pkg/monitortests/network/disruptionpodnetwork/monitortest.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@ import (
"context"
"embed"
_ "embed"
"errors"
"fmt"
"io"
"net"
"strings"
"time"

"github.com/google/uuid"
Expand Down Expand Up @@ -82,6 +86,80 @@ type podNetworkAvalibility struct {
kubeClient kubernetes.Interface
}

// retryBackoff defines the backoff parameters for transient API server errors
// during preparation. The preparation phase can race against heavy cluster load
// (e.g. CNV + FRR deployment on metal BGP-virt jobs), so we retry on errors
// that indicate the API server is temporarily overloaded.
var retryBackoff = wait.Backoff{
Duration: 2 * time.Second,
Factor: 2.0,
Jitter: 0.1,
Steps: 5,
Cap: 30 * time.Second,
}

// isTransientAPIError returns true for errors that indicate the API server is
// temporarily unable to handle the request and the operation should be retried.
func isTransientAPIError(err error) bool {
if err == nil {
return false
}
// Standard API server overload / timeout responses
if apierrors.IsServerTimeout(err) || apierrors.IsTimeout(err) ||
apierrors.IsTooManyRequests(err) || apierrors.IsServiceUnavailable(err) ||
apierrors.IsInternalError(err) {
return true
}
// Network-level errors: unexpected EOF, connection reset
if errors.Is(err, io.ErrUnexpectedEOF) || errors.Is(err, io.EOF) {
return true
}
var netErr *net.OpError
if errors.As(err, &netErr) {
return true
}
// Catch remaining transient error strings from the Go HTTP/2 client and etcd
errMsg := err.Error()
for _, substr := range []string{
"http2: client connection lost",
"connection reset by peer",
"etcdserver: request timed out",
"unexpected EOF",
} {
if strings.Contains(errMsg, substr) {
return true
}
}
return false
}

// createWithRetry wraps a Kubernetes resource creation call with exponential
// backoff retry on transient API server errors. This prevents the preparation
// phase from failing when the API server is under heavy load (e.g. right after
// CNV or FRR deployment).
func createWithRetry[T any](fn func() (T, error)) (T, error) {
var result T
var lastErr error
err := wait.ExponentialBackoff(retryBackoff, func() (bool, error) {
var createErr error
result, createErr = fn()
if createErr == nil {
return true, nil
Comment on lines +143 to +147

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@copilot, is this even relevant given this is running only on temporary clusters created for CI purposes?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@copilot you there?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The context.Background() usage isn't a regression — the original code already used context.Background() for all these Create() calls before this PR. This change only wraps them in retry logic without altering the context behavior.

That said, the broader point about context propagation: this code runs exclusively in CI on ephemeral clusters. If a job is aborted, the entire cluster gets destroyed — there's nothing to "gracefully cancel" into. The worst case is ~60s of retries against an API server that's about to be torn down anyway.

I could thread ctx through createWithRetry and switch to ExponentialBackoffWithContext, but it adds signature complexity for zero practical benefit in this context. Happy to do it if reviewers feel strongly, but I'd rather keep the helper simple.


🤖 This comment was posted by OpenClaw on behalf of @mkowalski.

}
if isTransientAPIError(createErr) {
klog.Warningf("Transient API error during preparation, retrying: %v", createErr)
lastErr = createErr
return false, nil
}
// Non-retryable error, stop immediately
return false, createErr
})
if wait.Interrupted(err) {
return result, fmt.Errorf("timed out retrying after transient errors, last error: %w", lastErr)
}
return result, err
}

func NewPodNetworkAvalibilityInvariant(info monitortestframework.MonitorTestInitializationInfo) monitortestframework.MonitorTest {
return &podNetworkAvalibility{
payloadImagePullSpec: info.UpgradeTargetPayloadImagePullSpec,
Expand Down Expand Up @@ -109,13 +187,17 @@ func (pna *podNetworkAvalibility) PrepareCollection(ctx context.Context, adminRE
return err
}

actualNamespace, err := pna.kubeClient.CoreV1().Namespaces().Create(context.Background(), namespace, metav1.CreateOptions{})
actualNamespace, err := createWithRetry(func() (*corev1.Namespace, error) {
return pna.kubeClient.CoreV1().Namespaces().Create(context.Background(), namespace, metav1.CreateOptions{})
})
if err != nil {
return err
}
pna.namespaceName = actualNamespace.Name

if _, err = pna.kubeClient.RbacV1().RoleBindings(pna.namespaceName).Create(context.Background(), pollerRoleBinding, metav1.CreateOptions{}); err != nil {
if _, err = createWithRetry(func() (*rbacv1.RoleBinding, error) {
return pna.kubeClient.RbacV1().RoleBindings(pna.namespaceName).Create(context.Background(), pollerRoleBinding, metav1.CreateOptions{})
}); err != nil {
return err
}

Expand All @@ -130,31 +212,39 @@ func (pna *podNetworkAvalibility) PrepareCollection(ctx context.Context, adminRE
podNetworkToPodNetworkPollerDeployment.Spec.Replicas = &numNodes
podNetworkToPodNetworkPollerDeployment.Spec.Template.Spec.Containers[0].Image = openshiftTestsImagePullSpec
podNetworkToPodNetworkPollerDeployment = disruptionlibrary.UpdateDeploymentENVs(podNetworkToPodNetworkPollerDeployment, deploymentID, "")
if _, err = pna.kubeClient.AppsV1().Deployments(pna.namespaceName).Create(context.Background(), podNetworkToPodNetworkPollerDeployment, metav1.CreateOptions{}); err != nil {
if _, err = createWithRetry(func() (*appsv1.Deployment, error) {
return pna.kubeClient.AppsV1().Deployments(pna.namespaceName).Create(context.Background(), podNetworkToPodNetworkPollerDeployment, metav1.CreateOptions{})
}); err != nil {
return err
}
time.Sleep(2 * time.Second)
klog.Infof("Starting deployment: %s", podNetworkToHostNetworkPollerDeployment.Name)
podNetworkToHostNetworkPollerDeployment.Spec.Replicas = &numNodes
podNetworkToHostNetworkPollerDeployment.Spec.Template.Spec.Containers[0].Image = openshiftTestsImagePullSpec
podNetworkToHostNetworkPollerDeployment = disruptionlibrary.UpdateDeploymentENVs(podNetworkToHostNetworkPollerDeployment, deploymentID, "")
if _, err = pna.kubeClient.AppsV1().Deployments(pna.namespaceName).Create(context.Background(), podNetworkToHostNetworkPollerDeployment, metav1.CreateOptions{}); err != nil {
if _, err = createWithRetry(func() (*appsv1.Deployment, error) {
return pna.kubeClient.AppsV1().Deployments(pna.namespaceName).Create(context.Background(), podNetworkToHostNetworkPollerDeployment, metav1.CreateOptions{})
}); err != nil {
return err
}
time.Sleep(2 * time.Second)
klog.Infof("Starting deployment: %s", hostNetworkToPodNetworkPollerDeployment.Name)
hostNetworkToPodNetworkPollerDeployment.Spec.Replicas = &numNodes
hostNetworkToPodNetworkPollerDeployment.Spec.Template.Spec.Containers[0].Image = openshiftTestsImagePullSpec
hostNetworkToPodNetworkPollerDeployment = disruptionlibrary.UpdateDeploymentENVs(hostNetworkToPodNetworkPollerDeployment, deploymentID, "")
if _, err = pna.kubeClient.AppsV1().Deployments(pna.namespaceName).Create(context.Background(), hostNetworkToPodNetworkPollerDeployment, metav1.CreateOptions{}); err != nil {
if _, err = createWithRetry(func() (*appsv1.Deployment, error) {
return pna.kubeClient.AppsV1().Deployments(pna.namespaceName).Create(context.Background(), hostNetworkToPodNetworkPollerDeployment, metav1.CreateOptions{})
}); err != nil {
return err
}
time.Sleep(2 * time.Second)
klog.Infof("Starting deployment: %s", hostNetworkToHostNetworkPollerDeployment.Name)
hostNetworkToHostNetworkPollerDeployment.Spec.Replicas = &numNodes
hostNetworkToHostNetworkPollerDeployment.Spec.Template.Spec.Containers[0].Image = openshiftTestsImagePullSpec
hostNetworkToHostNetworkPollerDeployment = disruptionlibrary.UpdateDeploymentENVs(hostNetworkToHostNetworkPollerDeployment, deploymentID, "")
if _, err = pna.kubeClient.AppsV1().Deployments(pna.namespaceName).Create(context.Background(), hostNetworkToHostNetworkPollerDeployment, metav1.CreateOptions{}); err != nil {
if _, err = createWithRetry(func() (*appsv1.Deployment, error) {
return pna.kubeClient.AppsV1().Deployments(pna.namespaceName).Create(context.Background(), hostNetworkToHostNetworkPollerDeployment, metav1.CreateOptions{})
}); err != nil {
return err
}
time.Sleep(2 * time.Second)
Expand All @@ -163,10 +253,14 @@ func (pna *podNetworkAvalibility) PrepareCollection(ctx context.Context, adminRE
originalAgnhost := k8simage.GetOriginalImageConfigs()[k8simage.Agnhost]
podNetworkTargetDeployment.Spec.Replicas = &numNodes
podNetworkTargetDeployment.Spec.Template.Spec.Containers[0].Image = image.LocationFor(originalAgnhost.GetE2EImage())
if _, err := pna.kubeClient.AppsV1().Deployments(pna.namespaceName).Create(context.Background(), podNetworkTargetDeployment, metav1.CreateOptions{}); err != nil {
if _, err := createWithRetry(func() (*appsv1.Deployment, error) {
return pna.kubeClient.AppsV1().Deployments(pna.namespaceName).Create(context.Background(), podNetworkTargetDeployment, metav1.CreateOptions{})
}); err != nil {
return err
}
service, err := pna.kubeClient.CoreV1().Services(pna.namespaceName).Create(context.Background(), podNetworkTargetService, metav1.CreateOptions{})
service, err := createWithRetry(func() (*corev1.Service, error) {
return pna.kubeClient.CoreV1().Services(pna.namespaceName).Create(context.Background(), podNetworkTargetService, metav1.CreateOptions{})
})
if err != nil {
return err
}
Expand All @@ -175,10 +269,14 @@ func (pna *podNetworkAvalibility) PrepareCollection(ctx context.Context, adminRE
klog.Infof("Starting deployment: %s", hostNetworkTargetDeployment.Name)
hostNetworkTargetDeployment.Spec.Replicas = &numNodes
hostNetworkTargetDeployment.Spec.Template.Spec.Containers[0].Image = openshiftTestsImagePullSpec
if _, err := pna.kubeClient.AppsV1().Deployments(pna.namespaceName).Create(context.Background(), hostNetworkTargetDeployment, metav1.CreateOptions{}); err != nil {
if _, err := createWithRetry(func() (*appsv1.Deployment, error) {
return pna.kubeClient.AppsV1().Deployments(pna.namespaceName).Create(context.Background(), hostNetworkTargetDeployment, metav1.CreateOptions{})
}); err != nil {
return err
}
if _, err := pna.kubeClient.CoreV1().Services(pna.namespaceName).Create(context.Background(), hostNetworkTargetService, metav1.CreateOptions{}); err != nil {
if _, err := createWithRetry(func() (*corev1.Service, error) {
return pna.kubeClient.CoreV1().Services(pna.namespaceName).Create(context.Background(), hostNetworkTargetService, metav1.CreateOptions{})
}); err != nil {
return err
}

Expand All @@ -194,7 +292,9 @@ func (pna *podNetworkAvalibility) PrepareCollection(ctx context.Context, adminRE
deployment.Spec.Replicas = &numNodes
deployment.Spec.Template.Spec.Containers[0].Image = openshiftTestsImagePullSpec
deployment = disruptionlibrary.UpdateDeploymentENVs(deployment, deploymentID, service.Spec.ClusterIP)
if _, err = pna.kubeClient.AppsV1().Deployments(pna.namespaceName).Create(context.Background(), deployment, metav1.CreateOptions{}); err != nil {
if _, err = createWithRetry(func() (*appsv1.Deployment, error) {
return pna.kubeClient.AppsV1().Deployments(pna.namespaceName).Create(context.Background(), deployment, metav1.CreateOptions{})
}); err != nil {
return err
}
}
Expand Down