From fa8362ab3acc2c648a92b60f85812c554781ed96 Mon Sep 17 00:00:00 2001 From: Monis Khan Date: Thu, 1 Oct 2020 18:01:18 -0400 Subject: [PATCH] Explicitly confirm informers have synced with timeout Signed-off-by: Monis Khan --- internal/apiserver/apiserver.go | 5 +- .../controllermanager/prepare_controllers.go | 64 ++++++++++++++++--- internal/server/server.go | 2 +- 3 files changed, 59 insertions(+), 12 deletions(-) diff --git a/internal/apiserver/apiserver.go b/internal/apiserver/apiserver.go index 98d2c7bd..a393b442 100644 --- a/internal/apiserver/apiserver.go +++ b/internal/apiserver/apiserver.go @@ -55,7 +55,7 @@ type Config struct { type ExtraConfig struct { Authenticator credentialrequest.TokenCredentialRequestAuthenticator Issuer credentialrequest.CertIssuer - StartControllersPostStartHook func(ctx context.Context) + StartControllersPostStartHook func(ctx context.Context) error } type PinnipedServer struct { @@ -118,9 +118,8 @@ func (c completedConfig) New() (*PinnipedServer, error) { <-postStartContext.StopCh cancel() }() - c.ExtraConfig.StartControllersPostStartHook(ctx) - return nil + return c.ExtraConfig.StartControllersPostStartHook(ctx) }, ) diff --git a/internal/controllermanager/prepare_controllers.go b/internal/controllermanager/prepare_controllers.go index 88296f3d..f9605c92 100644 --- a/internal/controllermanager/prepare_controllers.go +++ b/internal/controllermanager/prepare_controllers.go @@ -8,10 +8,13 @@ package controllermanager import ( "context" "fmt" + "reflect" + "sort" "time" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/clock" + "k8s.io/apimachinery/pkg/util/errors" k8sinformers "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" @@ -76,7 +79,7 @@ type Config struct { // Prepare the controllers and their informers and return a function that will start them when called. //nolint:funlen // Eh, fair, it is a really long function...but it is wiring the world...so... -func PrepareControllers(c *Config) (func(ctx context.Context), error) { +func PrepareControllers(c *Config) (func(ctx context.Context) error, error) { // Create k8s clients. kubeConfig, err := createConfig() if err != nil { @@ -240,9 +243,14 @@ func PrepareControllers(c *Config) (func(ctx context.Context), error) { ) // Return a function which starts the informers and controllers. - return func(ctx context.Context) { - informers.startAndWaitForSync(ctx) + return func(ctx context.Context) error { + if err := informers.startAndWaitForSync(ctx); err != nil { + return err + } + go controllerManager.Start(ctx) + + return nil }, nil } @@ -328,16 +336,56 @@ func createInformers( } } -func (i *informers) startAndWaitForSync(ctx context.Context) { +func (i *informers) startAndWaitForSync(ctx context.Context) error { i.kubePublicNamespaceK8s.Start(ctx.Done()) i.kubeSystemNamespaceK8s.Start(ctx.Done()) i.installationNamespaceK8s.Start(ctx.Done()) i.installationNamespacePinniped.Start(ctx.Done()) - i.kubePublicNamespaceK8s.WaitForCacheSync(ctx.Done()) - i.kubeSystemNamespaceK8s.WaitForCacheSync(ctx.Done()) - i.installationNamespaceK8s.WaitForCacheSync(ctx.Done()) - i.installationNamespacePinniped.WaitForCacheSync(ctx.Done()) + return waitForSync( + i.kubePublicNamespaceK8s, + i.kubeSystemNamespaceK8s, + i.installationNamespaceK8s, + i.installationNamespacePinniped, + ) +} + +type cacheSyncWaiter interface { + WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool +} + +func waitForSync(cacheSyncWaiters ...cacheSyncWaiter) error { + // prevent us from blocking forever due to a broken informer + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute) + defer cancel() + + var errs []error + + for _, waiter := range cacheSyncWaiters { + informerTypeStarted := waiter.WaitForCacheSync(ctx.Done()) + + if len(informerTypeStarted) == 0 { + //nolint: goerr113 + errs = append(errs, fmt.Errorf("no informers synced for %T", waiter)) + continue + } + + var notStarted []string + for informerType, started := range informerTypeStarted { + if started { + continue + } + notStarted = append(notStarted, informerType.String()) + } + + if len(notStarted) > 0 { + sort.Strings(notStarted) + //nolint: goerr113 + errs = append(errs, fmt.Errorf("%d informers from %T failed to sync: %v", len(notStarted), waiter, notStarted)) + } + } + + return errors.NewAggregate(errs) } // Returns a copy of the input config with the ContentConfig set to use protobuf. diff --git a/internal/server/server.go b/internal/server/server.go index 63456a79..0ed6132d 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -163,7 +163,7 @@ func getAggregatedAPIServerConfig( dynamicCertProvider dynamiccert.Provider, authenticator credentialrequest.TokenCredentialRequestAuthenticator, issuer credentialrequest.CertIssuer, - startControllersPostStartHook func(context.Context), + startControllersPostStartHook func(context.Context) error, ) (*apiserver.Config, error) { recommendedOptions := genericoptions.NewRecommendedOptions( defaultEtcdPathPrefix,