Merge pull request #829 from enj/enj/i/wait_shutdown

Ensure concierge and supervisor gracefully exit
Ryan Richard 2021-08-31 11:30:35 -07:00 committed by GitHub
commit b19af2e135
7 changed files with 293 additions and 69 deletions


@@ -6,6 +6,7 @@ package apiserver
import (
"context"
"fmt"
"sync"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
@@ -15,6 +16,7 @@ import (
genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/client-go/pkg/version"
"go.pinniped.dev/internal/controllerinit"
"go.pinniped.dev/internal/issuer"
"go.pinniped.dev/internal/plog"
"go.pinniped.dev/internal/registry/credentialrequest"
@@ -29,7 +31,7 @@ type Config struct {
type ExtraConfig struct {
Authenticator credentialrequest.TokenCredentialRequestAuthenticator
Issuer issuer.ClientCertIssuer
StartControllersPostStartHook func(ctx context.Context)
BuildControllersPostStartHook controllerinit.RunnerBuilder
Scheme *runtime.Scheme
NegotiatedSerializer runtime.NegotiatedSerializer
LoginConciergeGroupVersion schema.GroupVersion
@@ -105,16 +107,39 @@ func (c completedConfig) New() (*PinnipedServer, error) {
return nil, fmt.Errorf("could not install API groups: %w", err)
}
shutdown := &sync.WaitGroup{}
s.GenericAPIServer.AddPostStartHookOrDie("start-controllers",
func(postStartContext genericapiserver.PostStartHookContext) error {
plog.Debug("start-controllers post start hook starting")
ctx, cancel := context.WithCancel(context.Background())
go func() {
defer cancel()
<-postStartContext.StopCh
cancel()
}()
c.ExtraConfig.StartControllersPostStartHook(ctx)
runControllers, err := c.ExtraConfig.BuildControllersPostStartHook(ctx)
if err != nil {
return fmt.Errorf("cannot create run controller func: %w", err)
}
shutdown.Add(1)
go func() {
defer shutdown.Done()
runControllers(ctx)
}()
return nil
},
)
s.GenericAPIServer.AddPreShutdownHookOrDie("stop-controllers",
func() error {
plog.Debug("stop-controllers pre shutdown hook starting")
defer plog.Debug("stop-controllers pre shutdown hook completed")
shutdown.Wait()
return nil
},
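The hunk above is the heart of the concierge change: the post-start hook builds and launches the controllers on a context that is canceled when the server's stop channel closes, and the matching pre-shutdown hook blocks on the same WaitGroup, so the process cannot finish terminating while a controller goroutine is still running. A distilled sketch of that pattern using only the standard library (every name here is illustrative, not taken from the diff):

package main

import (
	"context"
	"sync"
	"time"
)

func main() {
	shutdown := &sync.WaitGroup{}
	ctx, cancel := context.WithCancel(context.Background())

	// "post start hook": launch the long-running work and track it.
	shutdown.Add(1)
	go func() {
		defer shutdown.Done()
		<-ctx.Done() // stand-in for runControllers(ctx), which blocks until ctx is canceled
	}()

	// "pre shutdown hook": trigger cancellation and wait for the work to
	// finish before allowing the process to exit.
	time.AfterFunc(100*time.Millisecond, cancel) // stand-in for the server's stop channel closing
	shutdown.Wait()
}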


@@ -27,6 +27,7 @@ import (
conciergescheme "go.pinniped.dev/internal/concierge/scheme"
"go.pinniped.dev/internal/config/concierge"
"go.pinniped.dev/internal/controller/authenticator/authncache"
"go.pinniped.dev/internal/controllerinit"
"go.pinniped.dev/internal/controllermanager"
"go.pinniped.dev/internal/downward"
"go.pinniped.dev/internal/dynamiccert"
@@ -135,7 +136,7 @@ func (a *App) runServer(ctx context.Context) error {
// Prepare to start the controllers, but defer actually starting them until the
// post start hook of the aggregated API server.
startControllersFunc, err := controllermanager.PrepareControllers(
buildControllers, err := controllermanager.PrepareControllers(
&controllermanager.Config{
ServerInstallationInfo: podInfo,
APIGroupSuffix: *cfg.APIGroupSuffix,
@@ -165,7 +166,7 @@ func (a *App) runServer(ctx context.Context) error {
dynamicServingCertProvider,
authenticators,
certIssuer,
startControllersFunc,
buildControllers,
*cfg.APIGroupSuffix,
scheme,
loginGV,
@@ -190,7 +191,7 @@ func getAggregatedAPIServerConfig(
dynamicCertProvider dynamiccert.Private,
authenticator credentialrequest.TokenCredentialRequestAuthenticator,
issuer issuer.ClientCertIssuer,
startControllersPostStartHook func(context.Context),
buildControllers controllerinit.RunnerBuilder,
apiGroupSuffix string,
scheme *runtime.Scheme,
loginConciergeGroupVersion, identityConciergeGroupVersion schema.GroupVersion,
@@ -227,7 +228,7 @@ func getAggregatedAPIServerConfig(
ExtraConfig: apiserver.ExtraConfig{
Authenticator: authenticator,
Issuer: issuer,
StartControllersPostStartHook: startControllersPostStartHook,
BuildControllersPostStartHook: buildControllers,
Scheme: scheme,
NegotiatedSerializer: codecs,
LoginConciergeGroupVersion: loginConciergeGroupVersion,
@@ -237,7 +238,7 @@ func getAggregatedAPIServerConfig(
return apiServerConfig, nil
}
func Main() {
func main() error { // return an error instead of klog.Fatal to allow defer statements to run
logs.InitLogs()
defer logs.FlushLogs()
@@ -250,7 +251,11 @@ func Main() {
ctx := genericapiserver.SetupSignalContext()
if err := New(ctx, os.Args[1:], os.Stdout, os.Stderr).Run(); err != nil {
return New(ctx, os.Args[1:], os.Stdout, os.Stderr).Run()
}
func Main() {
if err := main(); err != nil {
klog.Fatal(err)
}
}
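The main()/Main() split exists because klog.Fatal ends in os.Exit, which skips deferred functions; returning the error up to Main lets defers such as logs.FlushLogs run before the process dies. A minimal sketch of the shape, assuming the k8s.io/component-base/logs and k8s.io/klog/v2 packages this file already appears to use:

package example

import (
	"fmt"

	"k8s.io/component-base/logs"
	"k8s.io/klog/v2"
)

// run mirrors the error-returning main() in the diff: its defers always
// execute, even on the error path.
func run() error {
	logs.InitLogs()
	defer logs.FlushLogs() // flushed before the process exits on error
	return fmt.Errorf("example failure")
}

// Main is the only place that terminates the process.
func Main() {
	if err := run(); err != nil {
		klog.Fatal(err) // os.Exit happens here, after run's defers have completed
	}
}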


@@ -0,0 +1,87 @@
// Copyright 2021 the Pinniped contributors. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package controllerinit
import (
"context"
"fmt"
"reflect"
"sort"
"time"
)
// Runner is something that can be run, such as a series of controllers. Blocks until context is canceled.
type Runner func(context.Context)
// RunnerWrapper takes a Runner and wraps its execution with other logic. Blocks until context is canceled.
// RunnerWrapper is responsible for the lifetime of the passed in Runner.
type RunnerWrapper func(context.Context, Runner)
// RunnerBuilder is a function that can be used to construct a Runner.
// It is expected to be called in the main go routine since the construction can fail.
type RunnerBuilder func(context.Context) (Runner, error)
// informer is the subset of SharedInformerFactory needed for starting an informer cache and waiting for it to sync.
type informer interface {
Start(stopCh <-chan struct{})
WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool
}
// Prepare returns a RunnerBuilder that, when called:
// 1. Starts all provided informers and waits for them to sync (and fails if they hang)
// 2. Returns a Runner that combines the Runner and RunnerWrapper passed into Prepare
func Prepare(controllers Runner, controllersWrapper RunnerWrapper, informers ...informer) RunnerBuilder {
return func(ctx context.Context) (Runner, error) {
for _, informer := range informers {
informer := informer
informer.Start(ctx.Done())
// prevent us from blocking forever due to a broken informer
waitCtx, waitCancel := context.WithTimeout(ctx, time.Minute)
defer waitCancel()
// wait until the caches are synced before returning
status := informer.WaitForCacheSync(waitCtx.Done())
if unsynced := unsyncedInformers(status); len(unsynced) > 0 {
return nil, fmt.Errorf("failed to sync informers of %s: %v", anyToFullname(informer), unsynced)
}
}
return func(controllerCtx context.Context) {
controllersWrapper(controllerCtx, controllers)
}, nil
}
}
func unsyncedInformers(status map[reflect.Type]bool) []string {
if len(status) == 0 {
return []string{"all:empty"}
}
var names []string
for typ, synced := range status {
if !synced {
names = append(names, typeToFullname(typ))
}
}
sort.Strings(names)
return names
}
func anyToFullname(any interface{}) string {
typ := reflect.TypeOf(any)
return typeToFullname(typ)
}
func typeToFullname(typ reflect.Type) string {
if typ.Kind() == reflect.Ptr {
typ = typ.Elem()
}
return typ.PkgPath() + "." + typ.Name()
}
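Runner, RunnerWrapper, and RunnerBuilder split "construct and sync" (which can fail, so it happens in the main goroutine) from "run until canceled" (which happens in a tracked goroutine). A hypothetical caller of Prepare could look like the sketch below, written as if it lived inside the Pinniped module since controllerinit is an internal package; fakeInformer and the pass-through wrapper are illustrative stand-ins for a SharedInformerFactory and the leader elector:

package main

import (
	"context"
	"fmt"
	"reflect"
	"time"

	"go.pinniped.dev/internal/controllerinit"
)

// fakeInformer implements the Start/WaitForCacheSync subset that Prepare needs;
// in real code this is a SharedInformerFactory.
type fakeInformer struct{}

func (fakeInformer) Start(stopCh <-chan struct{}) {}

func (fakeInformer) WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool {
	return map[reflect.Type]bool{reflect.TypeOf(fakeInformer{}): true} // pretend everything synced
}

func main() {
	// The Runner is the long-running controller loop; it blocks until its context is canceled.
	controllers := func(ctx context.Context) { <-ctx.Done() }

	// The RunnerWrapper decides how the Runner executes; in Pinniped this is the leader elector.
	wrapper := func(ctx context.Context, run controllerinit.Runner) { run(ctx) }

	buildControllers := controllerinit.Prepare(controllers, wrapper, fakeInformer{})

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	// Building starts the informers and waits (up to a minute) for their caches to sync.
	runControllers, err := buildControllers(ctx)
	if err != nil {
		fmt.Println("informers failed to sync:", err)
		return
	}

	runControllers(ctx) // blocks until ctx is canceled
}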


@@ -6,7 +6,6 @@
package controllermanager
import (
"context"
"fmt"
"time"
@@ -27,6 +26,7 @@ import (
"go.pinniped.dev/internal/controller/authenticator/webhookcachefiller"
"go.pinniped.dev/internal/controller/impersonatorconfig"
"go.pinniped.dev/internal/controller/kubecertagent"
"go.pinniped.dev/internal/controllerinit"
"go.pinniped.dev/internal/controllerlib"
"go.pinniped.dev/internal/deploymentref"
"go.pinniped.dev/internal/downward"
@@ -95,7 +95,7 @@ type Config struct {
// Prepare the controllers and their informers and return a function that will start them when called.
//nolint:funlen // Eh, fair, it is a really long function...but it is wiring the world...so...
func PrepareControllers(c *Config) (func(ctx context.Context), error) {
func PrepareControllers(c *Config) (controllerinit.RunnerBuilder, error) {
loginConciergeGroupData, identityConciergeGroupData := groupsuffix.ConciergeAggregatedGroups(c.APIGroupSuffix)
dref, deployment, err := deploymentref.New(c.ServerInstallationInfo)
@@ -303,11 +303,12 @@ func PrepareControllers(c *Config) (func(ctx context.Context), error) {
singletonWorker,
)
// Return a function which starts the informers and controllers.
return func(ctx context.Context) {
informers.startAndWaitForSync(ctx)
go leaderElector(ctx, controllerManager.Start)
}, nil
return controllerinit.Prepare(controllerManager.Start, leaderElector,
informers.kubePublicNamespaceK8s,
informers.kubeSystemNamespaceK8s,
informers.installationNamespaceK8s,
informers.pinniped,
), nil
}
type informers struct {
@@ -345,15 +346,3 @@ func createInformers(
),
}
}
func (i *informers) startAndWaitForSync(ctx context.Context) {
i.kubePublicNamespaceK8s.Start(ctx.Done())
i.kubeSystemNamespaceK8s.Start(ctx.Done())
i.installationNamespaceK8s.Start(ctx.Done())
i.pinniped.Start(ctx.Done())
i.kubePublicNamespaceK8s.WaitForCacheSync(ctx.Done())
i.kubeSystemNamespaceK8s.WaitForCacheSync(ctx.Done())
i.installationNamespaceK8s.WaitForCacheSync(ctx.Done())
i.pinniped.WaitForCacheSync(ctx.Done())
}


@@ -16,6 +16,7 @@ import (
"k8s.io/client-go/tools/leaderelection/resourcelock"
"go.pinniped.dev/internal/constable"
"go.pinniped.dev/internal/controllerinit"
"go.pinniped.dev/internal/downward"
"go.pinniped.dev/internal/kubeclient"
"go.pinniped.dev/internal/plog"
@@ -36,7 +37,7 @@ const ErrNotLeader constable.Error = "write attempt rejected as client is not le
// logic and will coordinate lease release with the input controller starter function.
func New(podInfo *downward.PodInfo, deployment *appsv1.Deployment, opts ...kubeclient.Option) (
*kubeclient.Client,
func(context.Context, func(context.Context)),
controllerinit.RunnerWrapper,
error,
) {
internalClient, err := kubeclient.New(opts...)
@@ -89,7 +90,10 @@ func New(podInfo *downward.PodInfo, deployment *appsv1.Deployment, opts ...kubec
return nil, nil, fmt.Errorf("could not create leader election client: %w", err)
}
controllersWithLeaderElector := func(ctx context.Context, controllers func(context.Context)) {
controllersWithLeaderElector := func(ctx context.Context, controllers controllerinit.Runner) {
plog.Debug("leader election loop start", "identity", identity)
defer plog.Debug("leader election loop shutdown", "identity", identity)
leaderElectorCtx, leaderElectorCancel := context.WithCancel(context.Background()) // purposefully detached context
go func() {

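The hunk above is Pinniped's concrete RunnerWrapper: the controllers only run while this process holds the leader election lease, and the elector loop is started on a purposefully detached context whose lifetime is managed separately from the caller's (the rest of that logic is truncated here). For orientation only, a much-simplified wrapper built directly on client-go's leaderelection package might look like the sketch below; the lock and lease timings are illustrative, and this is not Pinniped's actual implementation:

package example

import (
	"context"
	"time"

	"k8s.io/client-go/tools/leaderelection"
	"k8s.io/client-go/tools/leaderelection/resourcelock"

	"go.pinniped.dev/internal/controllerinit"
)

// newSimpleLeaderElectorWrapper returns a RunnerWrapper that runs the
// controllers only while this process holds the lease.
func newSimpleLeaderElectorWrapper(lock resourcelock.Interface) controllerinit.RunnerWrapper {
	return func(ctx context.Context, controllers controllerinit.Runner) {
		leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
			Lock:            lock,
			LeaseDuration:   15 * time.Second,
			RenewDeadline:   10 * time.Second,
			RetryPeriod:     2 * time.Second,
			ReleaseOnCancel: true, // give up the lease promptly on graceful shutdown
			Callbacks: leaderelection.LeaderCallbacks{
				OnStartedLeading: controllers, // Runner has the same shape as OnStartedLeading
				OnStoppedLeading: func() {},
			},
		})
	}
}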

@@ -14,6 +14,7 @@ import (
"os"
"os/signal"
"strings"
"sync"
"syscall"
"time"
@@ -38,6 +39,7 @@ import (
"go.pinniped.dev/internal/controller/supervisorconfig/ldapupstreamwatcher"
"go.pinniped.dev/internal/controller/supervisorconfig/oidcupstreamwatcher"
"go.pinniped.dev/internal/controller/supervisorstorage"
"go.pinniped.dev/internal/controllerinit"
"go.pinniped.dev/internal/controllerlib"
"go.pinniped.dev/internal/deploymentref"
"go.pinniped.dev/internal/downward"
@@ -56,36 +58,51 @@ const (
defaultResyncInterval = 3 * time.Minute
)
func start(ctx context.Context, l net.Listener, handler http.Handler) {
func startServer(ctx context.Context, shutdown *sync.WaitGroup, l net.Listener, handler http.Handler) {
server := http.Server{Handler: handler}
errCh := make(chan error)
shutdown.Add(1)
go func() {
errCh <- server.Serve(l)
defer shutdown.Done()
err := server.Serve(l)
plog.Debug("server exited", "err", err)
}()
shutdown.Add(1)
go func() {
select {
case err := <-errCh:
plog.Debug("server exited", "err", err)
case <-ctx.Done():
plog.Debug("server context cancelled", "err", ctx.Err())
if err := server.Shutdown(context.Background()); err != nil {
plog.Debug("server shutdown failed", "err", err)
}
defer shutdown.Done()
<-ctx.Done()
plog.Debug("server context cancelled", "err", ctx.Err())
// allow up to a minute grace period for active connections to return to idle
connectionsCtx, connectionsCancel := context.WithTimeout(context.Background(), time.Minute)
defer connectionsCancel()
if err := server.Shutdown(connectionsCtx); err != nil {
plog.Debug("server shutdown failed", "err", err)
}
}()
}
func waitForSignal() os.Signal {
func signalCtx() context.Context {
signalCh := make(chan os.Signal, 1)
signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM)
return <-signalCh
ctx, cancel := context.WithCancel(context.Background())
go func() {
defer cancel()
s := <-signalCh
plog.Debug("saw signal", "signal", s)
}()
return ctx
}
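signalCtx replaces the old blocking waitForSignal helper with a context that is canceled on SIGINT or SIGTERM, giving the controllers and both HTTP servers a single shutdown trigger to share. Since Go 1.16 the standard library can produce the same context directly; a sketch of an equivalent helper (minus the debug log line):

package example

import (
	"context"
	"os"
	"os/signal"
	"syscall"
)

// signalCtx returns a context that is canceled on SIGINT or SIGTERM.
func signalCtx() context.Context {
	// The stop function is deliberately discarded to mirror the helper in the
	// diff, which also keeps its signal registration for the life of the process.
	ctx, _ := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
	return ctx
}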
//nolint:funlen
func startControllers(
ctx context.Context,
func prepareControllers(
cfg *supervisor.Config,
issuerManager *manager.Manager,
dynamicJWKSProvider jwks.DynamicJWKSProvider,
@@ -97,8 +114,8 @@ func startControllers(
pinnipedClient pinnipedclientset.Interface,
kubeInformers kubeinformers.SharedInformerFactory,
pinnipedInformers pinnipedinformers.SharedInformerFactory,
leaderElector func(context.Context, func(context.Context)),
) {
leaderElector controllerinit.RunnerWrapper,
) controllerinit.RunnerBuilder {
federationDomainInformer := pinnipedInformers.Config().V1alpha1().FederationDomains()
secretInformer := kubeInformers.Core().V1().Secrets()
@@ -267,21 +284,27 @@ func startControllers(
),
singletonWorker)
kubeInformers.Start(ctx.Done())
pinnipedInformers.Start(ctx.Done())
// Wait until the caches are synced before returning.
kubeInformers.WaitForCacheSync(ctx.Done())
pinnipedInformers.WaitForCacheSync(ctx.Done())
go leaderElector(ctx, controllerManager.Start)
return controllerinit.Prepare(controllerManager.Start, leaderElector, kubeInformers, pinnipedInformers)
}
func run(podInfo *downward.PodInfo, cfg *supervisor.Config) error {
serverInstallationNamespace := podInfo.Namespace
func startControllers(ctx context.Context, shutdown *sync.WaitGroup, buildControllers controllerinit.RunnerBuilder) error {
runControllers, err := buildControllers(ctx)
if err != nil {
return fmt.Errorf("cannot create run controller func: %w", err)
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
shutdown.Add(1)
go func() {
defer shutdown.Done()
runControllers(ctx)
}()
return nil
}
func runSupervisor(podInfo *downward.PodInfo, cfg *supervisor.Config) error {
serverInstallationNamespace := podInfo.Namespace
dref, supervisorDeployment, err := deploymentref.New(podInfo)
if err != nil {
@@ -339,8 +362,7 @@ func run(podInfo *downward.PodInfo, cfg *supervisor.Config) error {
clientWithoutLeaderElection.Kubernetes.CoreV1().Secrets(serverInstallationNamespace), // writes to kube storage are allowed for non-leaders
)
startControllers(
ctx,
buildControllersFunc := prepareControllers(
cfg,
oidProvidersManager,
dynamicJWKSProvider,
@@ -355,13 +377,20 @@ func run(podInfo *downward.PodInfo, cfg *supervisor.Config) error {
leaderElector,
)
ctx := signalCtx()
shutdown := &sync.WaitGroup{}
if err := startControllers(ctx, shutdown, buildControllersFunc); err != nil {
return err
}
//nolint: gosec // Intentionally binding to all network interfaces.
httpListener, err := net.Listen("tcp", ":8080")
if err != nil {
return fmt.Errorf("cannot create listener: %w", err)
}
defer func() { _ = httpListener.Close() }()
start(ctx, httpListener, oidProvidersManager)
startServer(ctx, shutdown, httpListener, oidProvidersManager)
//nolint: gosec // Intentionally binding to all network interfaces.
httpsListener, err := tls.Listen("tcp", ":8443", &tls.Config{
@@ -384,20 +413,20 @@ func run(podInfo *downward.PodInfo, cfg *supervisor.Config) error {
return fmt.Errorf("cannot create listener: %w", err)
}
defer func() { _ = httpsListener.Close() }()
start(ctx, httpsListener, oidProvidersManager)
startServer(ctx, shutdown, httpsListener, oidProvidersManager)
plog.Debug("supervisor is ready",
"httpAddress", httpListener.Addr().String(),
"httpsAddress", httpsListener.Addr().String(),
)
defer plog.Debug("supervisor exiting")
gotSignal := waitForSignal()
plog.Debug("supervisor exiting", "signal", gotSignal)
shutdown.Wait()
return nil
}
func Main() {
func main() error { // return an error instead of klog.Fatal to allow defer statements to run
logs.InitLogs()
defer logs.FlushLogs()
plog.RemoveKlogGlobalFlags() // move this whenever the below code gets refactored to use cobra
@@ -408,16 +437,20 @@ func Main() {
// Discover in which namespace we are installed.
podInfo, err := downward.Load(os.Args[1])
if err != nil {
klog.Fatal(fmt.Errorf("could not read pod metadata: %w", err))
return fmt.Errorf("could not read pod metadata: %w", err)
}
// Read the server config file.
cfg, err := supervisor.FromPath(os.Args[2])
if err != nil {
klog.Fatal(fmt.Errorf("could not load config: %w", err))
return fmt.Errorf("could not load config: %w", err)
}
if err := run(podInfo, cfg); err != nil {
return runSupervisor(podInfo, cfg)
}
func Main() {
if err := main(); err != nil {
klog.Fatal(err)
}
}


@@ -0,0 +1,81 @@
// Copyright 2021 the Pinniped contributors. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package integration
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/require"
kubeinformers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"go.pinniped.dev/internal/controllerinit"
"go.pinniped.dev/test/testlib"
)
// this just makes some slow read requests which are safe to run in parallel with serial tests, see main_test.go.
func TestControllerInitPrepare_Parallel(t *testing.T) {
_ = testlib.IntegrationEnv(t)
t.Run("with parent context that is never canceled", func(t *testing.T) {
t.Parallel()
// the nil params should never be used in this case
buildControllers := controllerinit.Prepare(nil, nil, buildBrokenInformer(t))
start := time.Now()
runControllers, err := buildControllers(context.Background()) // we expect this to not block forever even with a context.Background()
delta := time.Since(start)
require.EqualError(t, err,
"failed to sync informers of k8s.io/client-go/informers.sharedInformerFactory: "+
"[k8s.io/api/core/v1.Namespace k8s.io/api/core/v1.Node]")
require.Nil(t, runControllers)
require.InDelta(t, time.Minute, delta, float64(30*time.Second))
})
t.Run("with parent context that is canceled early", func(t *testing.T) {
t.Parallel()
// the nil params should never be used in this case
buildControllers := controllerinit.Prepare(nil, nil, buildBrokenInformer(t))
// we expect this to exit sooner because the parent context is shorter
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
t.Cleanup(cancel)
start := time.Now()
runControllers, err := buildControllers(ctx)
delta := time.Since(start)
require.EqualError(t, err,
"failed to sync informers of k8s.io/client-go/informers.sharedInformerFactory: "+
"[k8s.io/api/core/v1.Namespace k8s.io/api/core/v1.Node]")
require.Nil(t, runControllers)
require.InDelta(t, 10*time.Second, delta, float64(15*time.Second))
})
}
func buildBrokenInformer(t *testing.T) kubeinformers.SharedInformerFactory {
t.Helper()
config := testlib.NewClientConfig(t)
config = rest.CopyConfig(config)
config.Impersonate.UserName = "user-with-no-rbac" // so we can test that we correctly detect a cache sync failure
client := kubernetes.NewForConfigOrDie(config)
informers := kubeinformers.NewSharedInformerFactoryWithOptions(client, 0)
// make sure some informers get lazily loaded
_ = informers.Core().V1().Nodes().Informer()
_ = informers.Core().V1().Namespaces().Informer()
return informers
}