Explicitly confirm informers have synced with timeout

Signed-off-by: Monis Khan <mok@vmware.com>
This commit is contained in:
Monis Khan 2020-10-01 18:01:18 -04:00
parent b69eb5e850
commit fa8362ab3a
No known key found for this signature in database
GPG Key ID: 52C90ADA01B269B8
3 changed files with 59 additions and 12 deletions

View File

@ -55,7 +55,7 @@ type Config struct {
type ExtraConfig struct { type ExtraConfig struct {
Authenticator credentialrequest.TokenCredentialRequestAuthenticator Authenticator credentialrequest.TokenCredentialRequestAuthenticator
Issuer credentialrequest.CertIssuer Issuer credentialrequest.CertIssuer
StartControllersPostStartHook func(ctx context.Context) StartControllersPostStartHook func(ctx context.Context) error
} }
type PinnipedServer struct { type PinnipedServer struct {
@ -118,9 +118,8 @@ func (c completedConfig) New() (*PinnipedServer, error) {
<-postStartContext.StopCh <-postStartContext.StopCh
cancel() cancel()
}() }()
c.ExtraConfig.StartControllersPostStartHook(ctx)
return nil return c.ExtraConfig.StartControllersPostStartHook(ctx)
}, },
) )

View File

@ -8,10 +8,13 @@ package controllermanager
import ( import (
"context" "context"
"fmt" "fmt"
"reflect"
"sort"
"time" "time"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/clock" "k8s.io/apimachinery/pkg/util/clock"
"k8s.io/apimachinery/pkg/util/errors"
k8sinformers "k8s.io/client-go/informers" k8sinformers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest" "k8s.io/client-go/rest"
@ -76,7 +79,7 @@ type Config struct {
// Prepare the controllers and their informers and return a function that will start them when called. // Prepare the controllers and their informers and return a function that will start them when called.
//nolint:funlen // Eh, fair, it is a really long function...but it is wiring the world...so... //nolint:funlen // Eh, fair, it is a really long function...but it is wiring the world...so...
func PrepareControllers(c *Config) (func(ctx context.Context), error) { func PrepareControllers(c *Config) (func(ctx context.Context) error, error) {
// Create k8s clients. // Create k8s clients.
kubeConfig, err := createConfig() kubeConfig, err := createConfig()
if err != nil { if err != nil {
@ -240,9 +243,14 @@ func PrepareControllers(c *Config) (func(ctx context.Context), error) {
) )
// Return a function which starts the informers and controllers. // Return a function which starts the informers and controllers.
return func(ctx context.Context) { return func(ctx context.Context) error {
informers.startAndWaitForSync(ctx) if err := informers.startAndWaitForSync(ctx); err != nil {
return err
}
go controllerManager.Start(ctx) go controllerManager.Start(ctx)
return nil
}, nil }, nil
} }
@ -328,16 +336,56 @@ func createInformers(
} }
} }
func (i *informers) startAndWaitForSync(ctx context.Context) { func (i *informers) startAndWaitForSync(ctx context.Context) error {
i.kubePublicNamespaceK8s.Start(ctx.Done()) i.kubePublicNamespaceK8s.Start(ctx.Done())
i.kubeSystemNamespaceK8s.Start(ctx.Done()) i.kubeSystemNamespaceK8s.Start(ctx.Done())
i.installationNamespaceK8s.Start(ctx.Done()) i.installationNamespaceK8s.Start(ctx.Done())
i.installationNamespacePinniped.Start(ctx.Done()) i.installationNamespacePinniped.Start(ctx.Done())
i.kubePublicNamespaceK8s.WaitForCacheSync(ctx.Done()) return waitForSync(
i.kubeSystemNamespaceK8s.WaitForCacheSync(ctx.Done()) i.kubePublicNamespaceK8s,
i.installationNamespaceK8s.WaitForCacheSync(ctx.Done()) i.kubeSystemNamespaceK8s,
i.installationNamespacePinniped.WaitForCacheSync(ctx.Done()) i.installationNamespaceK8s,
i.installationNamespacePinniped,
)
}
type cacheSyncWaiter interface {
WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool
}
func waitForSync(cacheSyncWaiters ...cacheSyncWaiter) error {
// prevent us from blocking forever due to a broken informer
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
defer cancel()
var errs []error
for _, waiter := range cacheSyncWaiters {
informerTypeStarted := waiter.WaitForCacheSync(ctx.Done())
if len(informerTypeStarted) == 0 {
//nolint: goerr113
errs = append(errs, fmt.Errorf("no informers synced for %T", waiter))
continue
}
var notStarted []string
for informerType, started := range informerTypeStarted {
if started {
continue
}
notStarted = append(notStarted, informerType.String())
}
if len(notStarted) > 0 {
sort.Strings(notStarted)
//nolint: goerr113
errs = append(errs, fmt.Errorf("%d informers from %T failed to sync: %v", len(notStarted), waiter, notStarted))
}
}
return errors.NewAggregate(errs)
} }
// Returns a copy of the input config with the ContentConfig set to use protobuf. // Returns a copy of the input config with the ContentConfig set to use protobuf.

View File

@ -163,7 +163,7 @@ func getAggregatedAPIServerConfig(
dynamicCertProvider dynamiccert.Provider, dynamicCertProvider dynamiccert.Provider,
authenticator credentialrequest.TokenCredentialRequestAuthenticator, authenticator credentialrequest.TokenCredentialRequestAuthenticator,
issuer credentialrequest.CertIssuer, issuer credentialrequest.CertIssuer,
startControllersPostStartHook func(context.Context), startControllersPostStartHook func(context.Context) error,
) (*apiserver.Config, error) { ) (*apiserver.Config, error) {
recommendedOptions := genericoptions.NewRecommendedOptions( recommendedOptions := genericoptions.NewRecommendedOptions(
defaultEtcdPathPrefix, defaultEtcdPathPrefix,