diff --git a/internal/concierge/apiserver/apiserver.go b/internal/concierge/apiserver/apiserver.go index f1bce95b..bc08ad68 100644 --- a/internal/concierge/apiserver/apiserver.go +++ b/internal/concierge/apiserver/apiserver.go @@ -6,6 +6,7 @@ package apiserver import ( "context" "fmt" + "sync" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -15,6 +16,7 @@ import ( genericapiserver "k8s.io/apiserver/pkg/server" "k8s.io/client-go/pkg/version" + "go.pinniped.dev/internal/controllerinit" "go.pinniped.dev/internal/issuer" "go.pinniped.dev/internal/plog" "go.pinniped.dev/internal/registry/credentialrequest" @@ -29,7 +31,7 @@ type Config struct { type ExtraConfig struct { Authenticator credentialrequest.TokenCredentialRequestAuthenticator Issuer issuer.ClientCertIssuer - StartControllersPostStartHook func(ctx context.Context) + BuildControllersPostStartHook controllerinit.RunnerBuilder Scheme *runtime.Scheme NegotiatedSerializer runtime.NegotiatedSerializer LoginConciergeGroupVersion schema.GroupVersion @@ -105,16 +107,39 @@ func (c completedConfig) New() (*PinnipedServer, error) { return nil, fmt.Errorf("could not install API groups: %w", err) } + shutdown := &sync.WaitGroup{} s.GenericAPIServer.AddPostStartHookOrDie("start-controllers", func(postStartContext genericapiserver.PostStartHookContext) error { plog.Debug("start-controllers post start hook starting") ctx, cancel := context.WithCancel(context.Background()) go func() { + defer cancel() + <-postStartContext.StopCh - cancel() }() - c.ExtraConfig.StartControllersPostStartHook(ctx) + + runControllers, err := c.ExtraConfig.BuildControllersPostStartHook(ctx) + if err != nil { + return fmt.Errorf("cannot create run controller func: %w", err) + } + + shutdown.Add(1) + go func() { + defer shutdown.Done() + + runControllers(ctx) + }() + + return nil + }, + ) + s.GenericAPIServer.AddPreShutdownHookOrDie("stop-controllers", + func() error { + plog.Debug("stop-controllers pre shutdown hook starting") + defer plog.Debug("stop-controllers pre shutdown hook completed") + + shutdown.Wait() return nil }, diff --git a/internal/concierge/server/server.go b/internal/concierge/server/server.go index b0ff941c..71e583cc 100644 --- a/internal/concierge/server/server.go +++ b/internal/concierge/server/server.go @@ -27,6 +27,7 @@ import ( conciergescheme "go.pinniped.dev/internal/concierge/scheme" "go.pinniped.dev/internal/config/concierge" "go.pinniped.dev/internal/controller/authenticator/authncache" + "go.pinniped.dev/internal/controllerinit" "go.pinniped.dev/internal/controllermanager" "go.pinniped.dev/internal/downward" "go.pinniped.dev/internal/dynamiccert" @@ -135,7 +136,7 @@ func (a *App) runServer(ctx context.Context) error { // Prepare to start the controllers, but defer actually starting them until the // post start hook of the aggregated API server. 
- startControllersFunc, err := controllermanager.PrepareControllers( + buildControllers, err := controllermanager.PrepareControllers( &controllermanager.Config{ ServerInstallationInfo: podInfo, APIGroupSuffix: *cfg.APIGroupSuffix, @@ -165,7 +166,7 @@ func (a *App) runServer(ctx context.Context) error { dynamicServingCertProvider, authenticators, certIssuer, - startControllersFunc, + buildControllers, *cfg.APIGroupSuffix, scheme, loginGV, @@ -190,7 +191,7 @@ func getAggregatedAPIServerConfig( dynamicCertProvider dynamiccert.Private, authenticator credentialrequest.TokenCredentialRequestAuthenticator, issuer issuer.ClientCertIssuer, - startControllersPostStartHook func(context.Context), + buildControllers controllerinit.RunnerBuilder, apiGroupSuffix string, scheme *runtime.Scheme, loginConciergeGroupVersion, identityConciergeGroupVersion schema.GroupVersion, @@ -227,7 +228,7 @@ func getAggregatedAPIServerConfig( ExtraConfig: apiserver.ExtraConfig{ Authenticator: authenticator, Issuer: issuer, - StartControllersPostStartHook: startControllersPostStartHook, + BuildControllersPostStartHook: buildControllers, Scheme: scheme, NegotiatedSerializer: codecs, LoginConciergeGroupVersion: loginConciergeGroupVersion, @@ -237,7 +238,7 @@ func getAggregatedAPIServerConfig( return apiServerConfig, nil } -func Main() { +func main() error { // return an error instead of klog.Fatal to allow defer statements to run logs.InitLogs() defer logs.FlushLogs() @@ -250,7 +251,11 @@ func Main() { ctx := genericapiserver.SetupSignalContext() - if err := New(ctx, os.Args[1:], os.Stdout, os.Stderr).Run(); err != nil { + return New(ctx, os.Args[1:], os.Stdout, os.Stderr).Run() +} + +func Main() { + if err := main(); err != nil { klog.Fatal(err) } } diff --git a/internal/controllerinit/controllerinit.go b/internal/controllerinit/controllerinit.go new file mode 100644 index 00000000..6663c1af --- /dev/null +++ b/internal/controllerinit/controllerinit.go @@ -0,0 +1,87 @@ +// Copyright 2021 the Pinniped contributors. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +package controllerinit + +import ( + "context" + "fmt" + "reflect" + "sort" + "time" +) + +// Runner is something that can be run such as a series of controllers. Blocks until context is canceled. +type Runner func(context.Context) + +// RunnerWrapper takes a Runner and wraps its execution with other logic. Blocks until context is canceled. +// RunnerWrapper is responsible for the lifetime of the passed in Runner. +type RunnerWrapper func(context.Context, Runner) + +// RunnerBuilder is a function that can be used to construct a Runner. +// It is expected to be called in the main go routine since the construction can fail. +type RunnerBuilder func(context.Context) (Runner, error) + +// informer is the subset of SharedInformerFactory needed for starting an informer cache and waiting for it to sync. +type informer interface { + Start(stopCh <-chan struct{}) + WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool +} + +// Prepare returns RunnerBuilder that, when called: +// 1. Starts all provided informers and waits for them sync (and fails if they hang) +// 2. 
Returns a Runner that combines the Runner and RunnerWrapper passed into Prepare +func Prepare(controllers Runner, controllersWrapper RunnerWrapper, informers ...informer) RunnerBuilder { + return func(ctx context.Context) (Runner, error) { + for _, informer := range informers { + informer := informer + + informer.Start(ctx.Done()) + + // prevent us from blocking forever due to a broken informer + waitCtx, waitCancel := context.WithTimeout(ctx, time.Minute) + defer waitCancel() + + // wait until the caches are synced before returning + status := informer.WaitForCacheSync(waitCtx.Done()) + + if unsynced := unsyncedInformers(status); len(unsynced) > 0 { + return nil, fmt.Errorf("failed to sync informers of %s: %v", anyToFullname(informer), unsynced) + } + } + + return func(controllerCtx context.Context) { + controllersWrapper(controllerCtx, controllers) + }, nil + } +} + +func unsyncedInformers(status map[reflect.Type]bool) []string { + if len(status) == 0 { + return []string{"all:empty"} + } + + var names []string + + for typ, synced := range status { + if !synced { + names = append(names, typeToFullname(typ)) + } + } + + sort.Strings(names) + + return names +} + +func anyToFullname(any interface{}) string { + typ := reflect.TypeOf(any) + return typeToFullname(typ) +} + +func typeToFullname(typ reflect.Type) string { + if typ.Kind() == reflect.Ptr { + typ = typ.Elem() + } + return typ.PkgPath() + "." + typ.Name() +} diff --git a/internal/controllermanager/prepare_controllers.go b/internal/controllermanager/prepare_controllers.go index 8c9e9808..1ba9cd7c 100644 --- a/internal/controllermanager/prepare_controllers.go +++ b/internal/controllermanager/prepare_controllers.go @@ -6,7 +6,6 @@ package controllermanager import ( - "context" "fmt" "time" @@ -27,6 +26,7 @@ import ( "go.pinniped.dev/internal/controller/authenticator/webhookcachefiller" "go.pinniped.dev/internal/controller/impersonatorconfig" "go.pinniped.dev/internal/controller/kubecertagent" + "go.pinniped.dev/internal/controllerinit" "go.pinniped.dev/internal/controllerlib" "go.pinniped.dev/internal/deploymentref" "go.pinniped.dev/internal/downward" @@ -95,7 +95,7 @@ type Config struct { // Prepare the controllers and their informers and return a function that will start them when called. //nolint:funlen // Eh, fair, it is a really long function...but it is wiring the world...so... -func PrepareControllers(c *Config) (func(ctx context.Context), error) { +func PrepareControllers(c *Config) (controllerinit.RunnerBuilder, error) { loginConciergeGroupData, identityConciergeGroupData := groupsuffix.ConciergeAggregatedGroups(c.APIGroupSuffix) dref, deployment, err := deploymentref.New(c.ServerInstallationInfo) @@ -303,11 +303,12 @@ func PrepareControllers(c *Config) (func(ctx context.Context), error) { singletonWorker, ) - // Return a function which starts the informers and controllers. 
- return func(ctx context.Context) { - informers.startAndWaitForSync(ctx) - go leaderElector(ctx, controllerManager.Start) - }, nil + return controllerinit.Prepare(controllerManager.Start, leaderElector, + informers.kubePublicNamespaceK8s, + informers.kubeSystemNamespaceK8s, + informers.installationNamespaceK8s, + informers.pinniped, + ), nil } type informers struct { @@ -345,15 +346,3 @@ func createInformers( ), } } - -func (i *informers) startAndWaitForSync(ctx context.Context) { - i.kubePublicNamespaceK8s.Start(ctx.Done()) - i.kubeSystemNamespaceK8s.Start(ctx.Done()) - i.installationNamespaceK8s.Start(ctx.Done()) - i.pinniped.Start(ctx.Done()) - - i.kubePublicNamespaceK8s.WaitForCacheSync(ctx.Done()) - i.kubeSystemNamespaceK8s.WaitForCacheSync(ctx.Done()) - i.installationNamespaceK8s.WaitForCacheSync(ctx.Done()) - i.pinniped.WaitForCacheSync(ctx.Done()) -} diff --git a/internal/leaderelection/leaderelection.go b/internal/leaderelection/leaderelection.go index babd5450..1d083065 100644 --- a/internal/leaderelection/leaderelection.go +++ b/internal/leaderelection/leaderelection.go @@ -16,6 +16,7 @@ import ( "k8s.io/client-go/tools/leaderelection/resourcelock" "go.pinniped.dev/internal/constable" + "go.pinniped.dev/internal/controllerinit" "go.pinniped.dev/internal/downward" "go.pinniped.dev/internal/kubeclient" "go.pinniped.dev/internal/plog" @@ -36,7 +37,7 @@ const ErrNotLeader constable.Error = "write attempt rejected as client is not le // logic and will coordinate lease release with the input controller starter function. func New(podInfo *downward.PodInfo, deployment *appsv1.Deployment, opts ...kubeclient.Option) ( *kubeclient.Client, - func(context.Context, func(context.Context)), + controllerinit.RunnerWrapper, error, ) { internalClient, err := kubeclient.New(opts...) 
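The change above replaces the old `func(ctx context.Context)` start hook with three small function types from the new `controllerinit` package: a `RunnerBuilder` does everything that can fail (starting informers and waiting for their caches to sync), the returned `Runner` simply blocks until its context is canceled, and a `RunnerWrapper` (the leader elector) owns the Runner's lifetime. A minimal, self-contained sketch of how the pieces compose follows; the types are redeclared locally and the wrapper is a trivial stand-in rather than the real leader elector.

```go
// Sketch of the Runner / RunnerWrapper / RunnerBuilder contract introduced above.
// The types are redeclared locally so the example is self-contained; in the real code
// they live in go.pinniped.dev/internal/controllerinit.
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

type Runner func(context.Context)
type RunnerWrapper func(context.Context, Runner)
type RunnerBuilder func(context.Context) (Runner, error)

// prepare mirrors the shape of controllerinit.Prepare: do all work that can fail
// (e.g. informer cache sync) inside the builder, and return a Runner that only
// blocks until its context is canceled.
func prepare(controllers Runner, wrapper RunnerWrapper) RunnerBuilder {
	return func(ctx context.Context) (Runner, error) {
		// pretend this is where informers are started and synced; fail fast if that breaks
		if ctx.Err() != nil {
			return nil, fmt.Errorf("context canceled before controllers could be prepared: %w", ctx.Err())
		}
		return func(controllerCtx context.Context) {
			wrapper(controllerCtx, controllers)
		}, nil
	}
}

func main() {
	controllers := Runner(func(ctx context.Context) {
		fmt.Println("controllers running")
		<-ctx.Done() // a Runner blocks until its context is canceled
		fmt.Println("controllers stopped")
	})

	// a stand-in for the leader election wrapper: it owns the lifetime of the inner Runner
	wrapper := RunnerWrapper(func(ctx context.Context, r Runner) {
		fmt.Println("wrapper acquired work")
		defer fmt.Println("wrapper released work")
		r(ctx)
	})

	build := prepare(controllers, wrapper)

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	// building happens in the foreground so errors can fail startup...
	run, err := build(ctx)
	if err != nil {
		panic(err)
	}

	// ...while running happens in a goroutine tracked by a WaitGroup, mirroring the
	// concierge post-start hook and the supervisor's startControllers helper.
	shutdown := &sync.WaitGroup{}
	shutdown.Add(1)
	go func() {
		defer shutdown.Done()
		run(ctx)
	}()

	shutdown.Wait() // mirrors the pre-shutdown hook / end of runSupervisor
}
```

The value of the split is that construction failures surface as ordinary errors in the caller (the concierge post-start hook or the supervisor startup path) instead of being lost inside a goroutine.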
@@ -89,7 +90,10 @@ func New(podInfo *downward.PodInfo, deployment *appsv1.Deployment, opts ...kubec return nil, nil, fmt.Errorf("could not create leader election client: %w", err) } - controllersWithLeaderElector := func(ctx context.Context, controllers func(context.Context)) { + controllersWithLeaderElector := func(ctx context.Context, controllers controllerinit.Runner) { + plog.Debug("leader election loop start", "identity", identity) + defer plog.Debug("leader election loop shutdown", "identity", identity) + leaderElectorCtx, leaderElectorCancel := context.WithCancel(context.Background()) // purposefully detached context go func() { diff --git a/internal/supervisor/server/server.go b/internal/supervisor/server/server.go index 329dfb83..805c4d3e 100644 --- a/internal/supervisor/server/server.go +++ b/internal/supervisor/server/server.go @@ -14,6 +14,7 @@ import ( "os" "os/signal" "strings" + "sync" "syscall" "time" @@ -38,6 +39,7 @@ import ( "go.pinniped.dev/internal/controller/supervisorconfig/ldapupstreamwatcher" "go.pinniped.dev/internal/controller/supervisorconfig/oidcupstreamwatcher" "go.pinniped.dev/internal/controller/supervisorstorage" + "go.pinniped.dev/internal/controllerinit" "go.pinniped.dev/internal/controllerlib" "go.pinniped.dev/internal/deploymentref" "go.pinniped.dev/internal/downward" @@ -56,36 +58,51 @@ const ( defaultResyncInterval = 3 * time.Minute ) -func start(ctx context.Context, l net.Listener, handler http.Handler) { +func startServer(ctx context.Context, shutdown *sync.WaitGroup, l net.Listener, handler http.Handler) { server := http.Server{Handler: handler} - errCh := make(chan error) + shutdown.Add(1) go func() { - errCh <- server.Serve(l) + defer shutdown.Done() + + err := server.Serve(l) + plog.Debug("server exited", "err", err) }() + shutdown.Add(1) go func() { - select { - case err := <-errCh: - plog.Debug("server exited", "err", err) - case <-ctx.Done(): - plog.Debug("server context cancelled", "err", ctx.Err()) - if err := server.Shutdown(context.Background()); err != nil { - plog.Debug("server shutdown failed", "err", err) - } + defer shutdown.Done() + + <-ctx.Done() + plog.Debug("server context cancelled", "err", ctx.Err()) + + // allow up to a minute grace period for active connections to return to idle + connectionsCtx, connectionsCancel := context.WithTimeout(context.Background(), time.Minute) + defer connectionsCancel() + + if err := server.Shutdown(connectionsCtx); err != nil { + plog.Debug("server shutdown failed", "err", err) } }() } -func waitForSignal() os.Signal { +func signalCtx() context.Context { signalCh := make(chan os.Signal, 1) signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM) - return <-signalCh + + ctx, cancel := context.WithCancel(context.Background()) + go func() { + defer cancel() + + s := <-signalCh + plog.Debug("saw signal", "signal", s) + }() + + return ctx } //nolint:funlen -func startControllers( - ctx context.Context, +func prepareControllers( cfg *supervisor.Config, issuerManager *manager.Manager, dynamicJWKSProvider jwks.DynamicJWKSProvider, @@ -97,8 +114,8 @@ func startControllers( pinnipedClient pinnipedclientset.Interface, kubeInformers kubeinformers.SharedInformerFactory, pinnipedInformers pinnipedinformers.SharedInformerFactory, - leaderElector func(context.Context, func(context.Context)), -) { + leaderElector controllerinit.RunnerWrapper, +) controllerinit.RunnerBuilder { federationDomainInformer := pinnipedInformers.Config().V1alpha1().FederationDomains() secretInformer := kubeInformers.Core().V1().Secrets() 
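`startServer` (formerly `start`) now registers both of its goroutines on the shared `shutdown` WaitGroup and replaces the old `errCh` select with an explicit `server.Shutdown` bounded by a grace period. Below is a standalone sketch of that pattern; the handler, the ephemeral port, and the two-second timeout context (standing in for `signalCtx()`) are placeholders, not the supervisor's real values.

```go
// Standalone sketch of the startServer pattern above: every goroutine is tracked by a
// shared WaitGroup, and shutdown gives in-flight connections a bounded grace period.
package main

import (
	"context"
	"log"
	"net"
	"net/http"
	"sync"
	"time"
)

func startServer(ctx context.Context, shutdown *sync.WaitGroup, l net.Listener, handler http.Handler) {
	server := http.Server{Handler: handler}

	shutdown.Add(1)
	go func() {
		defer shutdown.Done()
		// Serve returns once the listener is closed or Shutdown is called
		log.Printf("server exited: %v", server.Serve(l))
	}()

	shutdown.Add(1)
	go func() {
		defer shutdown.Done()

		<-ctx.Done() // wait for the signal-derived context to be canceled

		// allow a short grace period for active connections to finish
		graceCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()

		if err := server.Shutdown(graceCtx); err != nil {
			log.Printf("server shutdown failed: %v", err)
		}
	}()
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) // stands in for signalCtx()
	defer cancel()

	l, err := net.Listen("tcp", "127.0.0.1:0") // placeholder ephemeral port
	if err != nil {
		log.Fatal(err)
	}

	shutdown := &sync.WaitGroup{}
	startServer(ctx, shutdown, l, http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
		_, _ = w.Write([]byte("ok")) // placeholder handler
	}))

	shutdown.Wait() // block until both goroutines have finished, i.e. a clean exit
}
```

Because both goroutines sit on the same WaitGroup that `runSupervisor` waits on, the process cannot exit while a listener is still draining connections.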
@@ -267,21 +284,27 @@ func startControllers( ), singletonWorker) - kubeInformers.Start(ctx.Done()) - pinnipedInformers.Start(ctx.Done()) - - // Wait until the caches are synced before returning. - kubeInformers.WaitForCacheSync(ctx.Done()) - pinnipedInformers.WaitForCacheSync(ctx.Done()) - - go leaderElector(ctx, controllerManager.Start) + return controllerinit.Prepare(controllerManager.Start, leaderElector, kubeInformers, pinnipedInformers) } -func run(podInfo *downward.PodInfo, cfg *supervisor.Config) error { - serverInstallationNamespace := podInfo.Namespace +func startControllers(ctx context.Context, shutdown *sync.WaitGroup, buildControllers controllerinit.RunnerBuilder) error { + runControllers, err := buildControllers(ctx) + if err != nil { + return fmt.Errorf("cannot create run controller func: %w", err) + } - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() + shutdown.Add(1) + go func() { + defer shutdown.Done() + + runControllers(ctx) + }() + + return nil +} + +func runSupervisor(podInfo *downward.PodInfo, cfg *supervisor.Config) error { + serverInstallationNamespace := podInfo.Namespace dref, supervisorDeployment, err := deploymentref.New(podInfo) if err != nil { @@ -339,8 +362,7 @@ func run(podInfo *downward.PodInfo, cfg *supervisor.Config) error { clientWithoutLeaderElection.Kubernetes.CoreV1().Secrets(serverInstallationNamespace), // writes to kube storage are allowed for non-leaders ) - startControllers( - ctx, + buildControllersFunc := prepareControllers( cfg, oidProvidersManager, dynamicJWKSProvider, @@ -355,13 +377,20 @@ func run(podInfo *downward.PodInfo, cfg *supervisor.Config) error { leaderElector, ) + ctx := signalCtx() + shutdown := &sync.WaitGroup{} + + if err := startControllers(ctx, shutdown, buildControllersFunc); err != nil { + return err + } + //nolint: gosec // Intentionally binding to all network interfaces. httpListener, err := net.Listen("tcp", ":8080") if err != nil { return fmt.Errorf("cannot create listener: %w", err) } defer func() { _ = httpListener.Close() }() - start(ctx, httpListener, oidProvidersManager) + startServer(ctx, shutdown, httpListener, oidProvidersManager) //nolint: gosec // Intentionally binding to all network interfaces. httpsListener, err := tls.Listen("tcp", ":8443", &tls.Config{ @@ -384,20 +413,20 @@ func run(podInfo *downward.PodInfo, cfg *supervisor.Config) error { return fmt.Errorf("cannot create listener: %w", err) } defer func() { _ = httpsListener.Close() }() - start(ctx, httpsListener, oidProvidersManager) + startServer(ctx, shutdown, httpsListener, oidProvidersManager) plog.Debug("supervisor is ready", "httpAddress", httpListener.Addr().String(), "httpsAddress", httpsListener.Addr().String(), ) + defer plog.Debug("supervisor exiting") - gotSignal := waitForSignal() - plog.Debug("supervisor exiting", "signal", gotSignal) + shutdown.Wait() return nil } -func Main() { +func main() error { // return an error instead of klog.Fatal to allow defer statements to run logs.InitLogs() defer logs.FlushLogs() plog.RemoveKlogGlobalFlags() // move this whenever the below code gets refactored to use cobra @@ -408,16 +437,20 @@ func Main() { // Discover in which namespace we are installed. podInfo, err := downward.Load(os.Args[1]) if err != nil { - klog.Fatal(fmt.Errorf("could not read pod metadata: %w", err)) + return fmt.Errorf("could not read pod metadata: %w", err) } // Read the server config file. 
cfg, err := supervisor.FromPath(os.Args[2]) if err != nil { - klog.Fatal(fmt.Errorf("could not load config: %w", err)) + return fmt.Errorf("could not load config: %w", err) } - if err := run(podInfo, cfg); err != nil { + return runSupervisor(podInfo, cfg) +} + +func Main() { + if err := main(); err != nil { klog.Fatal(err) } } diff --git a/test/integration/controllerinit_test.go b/test/integration/controllerinit_test.go new file mode 100644 index 00000000..22e8ea40 --- /dev/null +++ b/test/integration/controllerinit_test.go @@ -0,0 +1,81 @@ +// Copyright 2021 the Pinniped contributors. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +package integration + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/require" + kubeinformers "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + + "go.pinniped.dev/internal/controllerinit" + "go.pinniped.dev/test/testlib" +) + +// this just makes some slow read requests which are safe to run in parallel with serial tests, see main_test.go. +func TestControllerInitPrepare_Parallel(t *testing.T) { + _ = testlib.IntegrationEnv(t) + + t.Run("with parent context that is never canceled", func(t *testing.T) { + t.Parallel() + + // the nil params should never be used in this case + buildControllers := controllerinit.Prepare(nil, nil, buildBrokenInformer(t)) + + start := time.Now() + runControllers, err := buildControllers(context.Background()) // we expect this to not block forever even with a context.Background() + delta := time.Since(start) + + require.EqualError(t, err, + "failed to sync informers of k8s.io/client-go/informers.sharedInformerFactory: "+ + "[k8s.io/api/core/v1.Namespace k8s.io/api/core/v1.Node]") + require.Nil(t, runControllers) + + require.InDelta(t, time.Minute, delta, float64(30*time.Second)) + }) + + t.Run("with parent context that is canceled early", func(t *testing.T) { + t.Parallel() + + // the nil params should never be used in this case + buildControllers := controllerinit.Prepare(nil, nil, buildBrokenInformer(t)) + + // we expect this to exit sooner because the parent context is shorter + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + t.Cleanup(cancel) + + start := time.Now() + runControllers, err := buildControllers(ctx) + delta := time.Since(start) + + require.EqualError(t, err, + "failed to sync informers of k8s.io/client-go/informers.sharedInformerFactory: "+ + "[k8s.io/api/core/v1.Namespace k8s.io/api/core/v1.Node]") + require.Nil(t, runControllers) + + require.InDelta(t, 10*time.Second, delta, float64(15*time.Second)) + }) +} + +func buildBrokenInformer(t *testing.T) kubeinformers.SharedInformerFactory { + t.Helper() + + config := testlib.NewClientConfig(t) + config = rest.CopyConfig(config) + config.Impersonate.UserName = "user-with-no-rbac" // so we can test that we correctly detect a cache sync failure + + client := kubernetes.NewForConfigOrDie(config) + + informers := kubeinformers.NewSharedInformerFactoryWithOptions(client, 0) + + // make sure some informers gets lazily loaded + _ = informers.Core().V1().Nodes().Informer() + _ = informers.Core().V1().Namespaces().Informer() + + return informers +}
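Both `server.go` files also convert `Main()` into a thin wrapper around an error-returning `main()`. The reason is noted inline in the diff: `klog.Fatal` exits the process immediately, so deferred cleanup such as `logs.FlushLogs()`, listener `Close`, and `shutdown.Wait()` would never run on the error path. A minimal sketch of the pattern, using the standard library's `log.Fatal` in place of klog:

```go
// Sketch of the Main/main split used in both servers above: the inner function returns
// an error so that deferred cleanup actually runs, and only the outermost wrapper turns
// that error into a fatal exit.
package main

import (
	"errors"
	"fmt"
	"log"
)

func run() error { // stands in for runServer / runSupervisor
	return errors.New("something went wrong")
}

func realMain() error {
	// deferred statements here would be skipped entirely if we called log.Fatal directly,
	// because a fatal log exits the process without unwinding the stack
	defer fmt.Println("flushing logs") // runs even on the error path

	return run()
}

func main() {
	if err := realMain(); err != nil {
		log.Fatal(err) // exit code 1, but only after realMain's defers have run
	}
}
```

Keeping the outer wrapper as the only place that can terminate the process means every other return path in the servers stays friendly to `defer`.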