Ensure concierge and supervisor gracefully exit
Changes made to both components: 1. Logs are always flushed on process exit 2. Informer cache sync can no longer hang process start up forever Changes made to concierge: 1. Add pre-shutdown hook that waits for controllers to exit cleanly 2. Informer caches are synced in post-start hook Changes made to supervisor: 1. Add shutdown code that waits for controllers to exit cleanly 2. Add shutdown code that waits for active connections to become idle Waiting for controllers to exit cleanly is critical as this allows the leader election logic to release the lock on exit. This reduces the time needed for the next leader to be elected. Signed-off-by: Monis Khan <mok@vmware.com>
This commit is contained in:
parent
e43bd59688
commit
0d285ce993
@ -6,6 +6,7 @@ package apiserver
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"sync"
|
||||||
|
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/runtime"
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
@ -15,6 +16,7 @@ import (
|
|||||||
genericapiserver "k8s.io/apiserver/pkg/server"
|
genericapiserver "k8s.io/apiserver/pkg/server"
|
||||||
"k8s.io/client-go/pkg/version"
|
"k8s.io/client-go/pkg/version"
|
||||||
|
|
||||||
|
"go.pinniped.dev/internal/controllerinit"
|
||||||
"go.pinniped.dev/internal/issuer"
|
"go.pinniped.dev/internal/issuer"
|
||||||
"go.pinniped.dev/internal/plog"
|
"go.pinniped.dev/internal/plog"
|
||||||
"go.pinniped.dev/internal/registry/credentialrequest"
|
"go.pinniped.dev/internal/registry/credentialrequest"
|
||||||
@ -29,7 +31,7 @@ type Config struct {
|
|||||||
type ExtraConfig struct {
|
type ExtraConfig struct {
|
||||||
Authenticator credentialrequest.TokenCredentialRequestAuthenticator
|
Authenticator credentialrequest.TokenCredentialRequestAuthenticator
|
||||||
Issuer issuer.ClientCertIssuer
|
Issuer issuer.ClientCertIssuer
|
||||||
StartControllersPostStartHook func(ctx context.Context)
|
BuildControllersPostStartHook controllerinit.RunnerBuilder
|
||||||
Scheme *runtime.Scheme
|
Scheme *runtime.Scheme
|
||||||
NegotiatedSerializer runtime.NegotiatedSerializer
|
NegotiatedSerializer runtime.NegotiatedSerializer
|
||||||
LoginConciergeGroupVersion schema.GroupVersion
|
LoginConciergeGroupVersion schema.GroupVersion
|
||||||
@ -105,16 +107,39 @@ func (c completedConfig) New() (*PinnipedServer, error) {
|
|||||||
return nil, fmt.Errorf("could not install API groups: %w", err)
|
return nil, fmt.Errorf("could not install API groups: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
shutdown := &sync.WaitGroup{}
|
||||||
s.GenericAPIServer.AddPostStartHookOrDie("start-controllers",
|
s.GenericAPIServer.AddPostStartHookOrDie("start-controllers",
|
||||||
func(postStartContext genericapiserver.PostStartHookContext) error {
|
func(postStartContext genericapiserver.PostStartHookContext) error {
|
||||||
plog.Debug("start-controllers post start hook starting")
|
plog.Debug("start-controllers post start hook starting")
|
||||||
|
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
go func() {
|
go func() {
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
<-postStartContext.StopCh
|
<-postStartContext.StopCh
|
||||||
cancel()
|
|
||||||
}()
|
}()
|
||||||
c.ExtraConfig.StartControllersPostStartHook(ctx)
|
|
||||||
|
runControllers, err := c.ExtraConfig.BuildControllersPostStartHook(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("cannot create run controller func: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
shutdown.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer shutdown.Done()
|
||||||
|
|
||||||
|
runControllers(ctx)
|
||||||
|
}()
|
||||||
|
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
)
|
||||||
|
s.GenericAPIServer.AddPreShutdownHookOrDie("stop-controllers",
|
||||||
|
func() error {
|
||||||
|
plog.Debug("stop-controllers pre shutdown hook starting")
|
||||||
|
defer plog.Debug("stop-controllers pre shutdown hook completed")
|
||||||
|
|
||||||
|
shutdown.Wait()
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
},
|
},
|
||||||
|
@ -27,6 +27,7 @@ import (
|
|||||||
conciergescheme "go.pinniped.dev/internal/concierge/scheme"
|
conciergescheme "go.pinniped.dev/internal/concierge/scheme"
|
||||||
"go.pinniped.dev/internal/config/concierge"
|
"go.pinniped.dev/internal/config/concierge"
|
||||||
"go.pinniped.dev/internal/controller/authenticator/authncache"
|
"go.pinniped.dev/internal/controller/authenticator/authncache"
|
||||||
|
"go.pinniped.dev/internal/controllerinit"
|
||||||
"go.pinniped.dev/internal/controllermanager"
|
"go.pinniped.dev/internal/controllermanager"
|
||||||
"go.pinniped.dev/internal/downward"
|
"go.pinniped.dev/internal/downward"
|
||||||
"go.pinniped.dev/internal/dynamiccert"
|
"go.pinniped.dev/internal/dynamiccert"
|
||||||
@ -135,7 +136,7 @@ func (a *App) runServer(ctx context.Context) error {
|
|||||||
|
|
||||||
// Prepare to start the controllers, but defer actually starting them until the
|
// Prepare to start the controllers, but defer actually starting them until the
|
||||||
// post start hook of the aggregated API server.
|
// post start hook of the aggregated API server.
|
||||||
startControllersFunc, err := controllermanager.PrepareControllers(
|
buildControllers, err := controllermanager.PrepareControllers(
|
||||||
&controllermanager.Config{
|
&controllermanager.Config{
|
||||||
ServerInstallationInfo: podInfo,
|
ServerInstallationInfo: podInfo,
|
||||||
APIGroupSuffix: *cfg.APIGroupSuffix,
|
APIGroupSuffix: *cfg.APIGroupSuffix,
|
||||||
@ -165,7 +166,7 @@ func (a *App) runServer(ctx context.Context) error {
|
|||||||
dynamicServingCertProvider,
|
dynamicServingCertProvider,
|
||||||
authenticators,
|
authenticators,
|
||||||
certIssuer,
|
certIssuer,
|
||||||
startControllersFunc,
|
buildControllers,
|
||||||
*cfg.APIGroupSuffix,
|
*cfg.APIGroupSuffix,
|
||||||
scheme,
|
scheme,
|
||||||
loginGV,
|
loginGV,
|
||||||
@ -190,7 +191,7 @@ func getAggregatedAPIServerConfig(
|
|||||||
dynamicCertProvider dynamiccert.Private,
|
dynamicCertProvider dynamiccert.Private,
|
||||||
authenticator credentialrequest.TokenCredentialRequestAuthenticator,
|
authenticator credentialrequest.TokenCredentialRequestAuthenticator,
|
||||||
issuer issuer.ClientCertIssuer,
|
issuer issuer.ClientCertIssuer,
|
||||||
startControllersPostStartHook func(context.Context),
|
buildControllers controllerinit.RunnerBuilder,
|
||||||
apiGroupSuffix string,
|
apiGroupSuffix string,
|
||||||
scheme *runtime.Scheme,
|
scheme *runtime.Scheme,
|
||||||
loginConciergeGroupVersion, identityConciergeGroupVersion schema.GroupVersion,
|
loginConciergeGroupVersion, identityConciergeGroupVersion schema.GroupVersion,
|
||||||
@ -227,7 +228,7 @@ func getAggregatedAPIServerConfig(
|
|||||||
ExtraConfig: apiserver.ExtraConfig{
|
ExtraConfig: apiserver.ExtraConfig{
|
||||||
Authenticator: authenticator,
|
Authenticator: authenticator,
|
||||||
Issuer: issuer,
|
Issuer: issuer,
|
||||||
StartControllersPostStartHook: startControllersPostStartHook,
|
BuildControllersPostStartHook: buildControllers,
|
||||||
Scheme: scheme,
|
Scheme: scheme,
|
||||||
NegotiatedSerializer: codecs,
|
NegotiatedSerializer: codecs,
|
||||||
LoginConciergeGroupVersion: loginConciergeGroupVersion,
|
LoginConciergeGroupVersion: loginConciergeGroupVersion,
|
||||||
@ -237,7 +238,7 @@ func getAggregatedAPIServerConfig(
|
|||||||
return apiServerConfig, nil
|
return apiServerConfig, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func Main() {
|
func main() error { // return an error instead of klog.Fatal to allow defer statements to run
|
||||||
logs.InitLogs()
|
logs.InitLogs()
|
||||||
defer logs.FlushLogs()
|
defer logs.FlushLogs()
|
||||||
|
|
||||||
@ -250,7 +251,11 @@ func Main() {
|
|||||||
|
|
||||||
ctx := genericapiserver.SetupSignalContext()
|
ctx := genericapiserver.SetupSignalContext()
|
||||||
|
|
||||||
if err := New(ctx, os.Args[1:], os.Stdout, os.Stderr).Run(); err != nil {
|
return New(ctx, os.Args[1:], os.Stdout, os.Stderr).Run()
|
||||||
|
}
|
||||||
|
|
||||||
|
func Main() {
|
||||||
|
if err := main(); err != nil {
|
||||||
klog.Fatal(err)
|
klog.Fatal(err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
87
internal/controllerinit/controllerinit.go
Normal file
87
internal/controllerinit/controllerinit.go
Normal file
@ -0,0 +1,87 @@
|
|||||||
|
// Copyright 2021 the Pinniped contributors. All Rights Reserved.
|
||||||
|
// SPDX-License-Identifier: Apache-2.0
|
||||||
|
|
||||||
|
package controllerinit
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"reflect"
|
||||||
|
"sort"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Runner is something that can be run such as a series of controllers. Blocks until context is canceled.
|
||||||
|
type Runner func(context.Context)
|
||||||
|
|
||||||
|
// RunnerWrapper takes a Runner and wraps its execution with other logic. Blocks until context is canceled.
|
||||||
|
// RunnerWrapper is responsible for the lifetime of the passed in Runner.
|
||||||
|
type RunnerWrapper func(context.Context, Runner)
|
||||||
|
|
||||||
|
// RunnerBuilder is a function that can be used to construct a Runner.
|
||||||
|
// It is expected to be called in the main go routine since the construction can fail.
|
||||||
|
type RunnerBuilder func(context.Context) (Runner, error)
|
||||||
|
|
||||||
|
// informer is the subset of SharedInformerFactory needed for starting an informer cache and waiting for it to sync.
|
||||||
|
type informer interface {
|
||||||
|
Start(stopCh <-chan struct{})
|
||||||
|
WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool
|
||||||
|
}
|
||||||
|
|
||||||
|
// Prepare returns RunnerBuilder that, when called:
|
||||||
|
// 1. Starts all provided informers and waits for them sync (and fails if they hang)
|
||||||
|
// 2. Returns a Runner that combines the Runner and RunnerWrapper passed into Prepare
|
||||||
|
func Prepare(controllers Runner, controllersWrapper RunnerWrapper, informers ...informer) RunnerBuilder {
|
||||||
|
return func(ctx context.Context) (Runner, error) {
|
||||||
|
for _, informer := range informers {
|
||||||
|
informer := informer
|
||||||
|
|
||||||
|
informer.Start(ctx.Done())
|
||||||
|
|
||||||
|
// prevent us from blocking forever due to a broken informer
|
||||||
|
waitCtx, waitCancel := context.WithTimeout(ctx, time.Minute)
|
||||||
|
defer waitCancel()
|
||||||
|
|
||||||
|
// wait until the caches are synced before returning
|
||||||
|
status := informer.WaitForCacheSync(waitCtx.Done())
|
||||||
|
|
||||||
|
if unsynced := unsyncedInformers(status); len(unsynced) > 0 {
|
||||||
|
return nil, fmt.Errorf("failed to sync informers of %s: %v", anyToFullname(informer), unsynced)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return func(controllerCtx context.Context) {
|
||||||
|
controllersWrapper(controllerCtx, controllers)
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func unsyncedInformers(status map[reflect.Type]bool) []string {
|
||||||
|
if len(status) == 0 {
|
||||||
|
return []string{"all:empty"}
|
||||||
|
}
|
||||||
|
|
||||||
|
var names []string
|
||||||
|
|
||||||
|
for typ, synced := range status {
|
||||||
|
if !synced {
|
||||||
|
names = append(names, typeToFullname(typ))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
sort.Strings(names)
|
||||||
|
|
||||||
|
return names
|
||||||
|
}
|
||||||
|
|
||||||
|
func anyToFullname(any interface{}) string {
|
||||||
|
typ := reflect.TypeOf(any)
|
||||||
|
return typeToFullname(typ)
|
||||||
|
}
|
||||||
|
|
||||||
|
func typeToFullname(typ reflect.Type) string {
|
||||||
|
if typ.Kind() == reflect.Ptr {
|
||||||
|
typ = typ.Elem()
|
||||||
|
}
|
||||||
|
return typ.PkgPath() + "." + typ.Name()
|
||||||
|
}
|
@ -6,7 +6,6 @@
|
|||||||
package controllermanager
|
package controllermanager
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -27,6 +26,7 @@ import (
|
|||||||
"go.pinniped.dev/internal/controller/authenticator/webhookcachefiller"
|
"go.pinniped.dev/internal/controller/authenticator/webhookcachefiller"
|
||||||
"go.pinniped.dev/internal/controller/impersonatorconfig"
|
"go.pinniped.dev/internal/controller/impersonatorconfig"
|
||||||
"go.pinniped.dev/internal/controller/kubecertagent"
|
"go.pinniped.dev/internal/controller/kubecertagent"
|
||||||
|
"go.pinniped.dev/internal/controllerinit"
|
||||||
"go.pinniped.dev/internal/controllerlib"
|
"go.pinniped.dev/internal/controllerlib"
|
||||||
"go.pinniped.dev/internal/deploymentref"
|
"go.pinniped.dev/internal/deploymentref"
|
||||||
"go.pinniped.dev/internal/downward"
|
"go.pinniped.dev/internal/downward"
|
||||||
@ -95,7 +95,7 @@ type Config struct {
|
|||||||
|
|
||||||
// Prepare the controllers and their informers and return a function that will start them when called.
|
// Prepare the controllers and their informers and return a function that will start them when called.
|
||||||
//nolint:funlen // Eh, fair, it is a really long function...but it is wiring the world...so...
|
//nolint:funlen // Eh, fair, it is a really long function...but it is wiring the world...so...
|
||||||
func PrepareControllers(c *Config) (func(ctx context.Context), error) {
|
func PrepareControllers(c *Config) (controllerinit.RunnerBuilder, error) {
|
||||||
loginConciergeGroupData, identityConciergeGroupData := groupsuffix.ConciergeAggregatedGroups(c.APIGroupSuffix)
|
loginConciergeGroupData, identityConciergeGroupData := groupsuffix.ConciergeAggregatedGroups(c.APIGroupSuffix)
|
||||||
|
|
||||||
dref, deployment, err := deploymentref.New(c.ServerInstallationInfo)
|
dref, deployment, err := deploymentref.New(c.ServerInstallationInfo)
|
||||||
@ -303,11 +303,12 @@ func PrepareControllers(c *Config) (func(ctx context.Context), error) {
|
|||||||
singletonWorker,
|
singletonWorker,
|
||||||
)
|
)
|
||||||
|
|
||||||
// Return a function which starts the informers and controllers.
|
return controllerinit.Prepare(controllerManager.Start, leaderElector,
|
||||||
return func(ctx context.Context) {
|
informers.kubePublicNamespaceK8s,
|
||||||
informers.startAndWaitForSync(ctx)
|
informers.kubeSystemNamespaceK8s,
|
||||||
go leaderElector(ctx, controllerManager.Start)
|
informers.installationNamespaceK8s,
|
||||||
}, nil
|
informers.pinniped,
|
||||||
|
), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
type informers struct {
|
type informers struct {
|
||||||
@ -345,15 +346,3 @@ func createInformers(
|
|||||||
),
|
),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (i *informers) startAndWaitForSync(ctx context.Context) {
|
|
||||||
i.kubePublicNamespaceK8s.Start(ctx.Done())
|
|
||||||
i.kubeSystemNamespaceK8s.Start(ctx.Done())
|
|
||||||
i.installationNamespaceK8s.Start(ctx.Done())
|
|
||||||
i.pinniped.Start(ctx.Done())
|
|
||||||
|
|
||||||
i.kubePublicNamespaceK8s.WaitForCacheSync(ctx.Done())
|
|
||||||
i.kubeSystemNamespaceK8s.WaitForCacheSync(ctx.Done())
|
|
||||||
i.installationNamespaceK8s.WaitForCacheSync(ctx.Done())
|
|
||||||
i.pinniped.WaitForCacheSync(ctx.Done())
|
|
||||||
}
|
|
||||||
|
@ -16,6 +16,7 @@ import (
|
|||||||
"k8s.io/client-go/tools/leaderelection/resourcelock"
|
"k8s.io/client-go/tools/leaderelection/resourcelock"
|
||||||
|
|
||||||
"go.pinniped.dev/internal/constable"
|
"go.pinniped.dev/internal/constable"
|
||||||
|
"go.pinniped.dev/internal/controllerinit"
|
||||||
"go.pinniped.dev/internal/downward"
|
"go.pinniped.dev/internal/downward"
|
||||||
"go.pinniped.dev/internal/kubeclient"
|
"go.pinniped.dev/internal/kubeclient"
|
||||||
"go.pinniped.dev/internal/plog"
|
"go.pinniped.dev/internal/plog"
|
||||||
@ -36,7 +37,7 @@ const ErrNotLeader constable.Error = "write attempt rejected as client is not le
|
|||||||
// logic and will coordinate lease release with the input controller starter function.
|
// logic and will coordinate lease release with the input controller starter function.
|
||||||
func New(podInfo *downward.PodInfo, deployment *appsv1.Deployment, opts ...kubeclient.Option) (
|
func New(podInfo *downward.PodInfo, deployment *appsv1.Deployment, opts ...kubeclient.Option) (
|
||||||
*kubeclient.Client,
|
*kubeclient.Client,
|
||||||
func(context.Context, func(context.Context)),
|
controllerinit.RunnerWrapper,
|
||||||
error,
|
error,
|
||||||
) {
|
) {
|
||||||
internalClient, err := kubeclient.New(opts...)
|
internalClient, err := kubeclient.New(opts...)
|
||||||
@ -89,7 +90,10 @@ func New(podInfo *downward.PodInfo, deployment *appsv1.Deployment, opts ...kubec
|
|||||||
return nil, nil, fmt.Errorf("could not create leader election client: %w", err)
|
return nil, nil, fmt.Errorf("could not create leader election client: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
controllersWithLeaderElector := func(ctx context.Context, controllers func(context.Context)) {
|
controllersWithLeaderElector := func(ctx context.Context, controllers controllerinit.Runner) {
|
||||||
|
plog.Debug("leader election loop start", "identity", identity)
|
||||||
|
defer plog.Debug("leader election loop shutdown", "identity", identity)
|
||||||
|
|
||||||
leaderElectorCtx, leaderElectorCancel := context.WithCancel(context.Background()) // purposefully detached context
|
leaderElectorCtx, leaderElectorCancel := context.WithCancel(context.Background()) // purposefully detached context
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
|
@ -14,6 +14,7 @@ import (
|
|||||||
"os"
|
"os"
|
||||||
"os/signal"
|
"os/signal"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync"
|
||||||
"syscall"
|
"syscall"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -38,6 +39,7 @@ import (
|
|||||||
"go.pinniped.dev/internal/controller/supervisorconfig/ldapupstreamwatcher"
|
"go.pinniped.dev/internal/controller/supervisorconfig/ldapupstreamwatcher"
|
||||||
"go.pinniped.dev/internal/controller/supervisorconfig/oidcupstreamwatcher"
|
"go.pinniped.dev/internal/controller/supervisorconfig/oidcupstreamwatcher"
|
||||||
"go.pinniped.dev/internal/controller/supervisorstorage"
|
"go.pinniped.dev/internal/controller/supervisorstorage"
|
||||||
|
"go.pinniped.dev/internal/controllerinit"
|
||||||
"go.pinniped.dev/internal/controllerlib"
|
"go.pinniped.dev/internal/controllerlib"
|
||||||
"go.pinniped.dev/internal/deploymentref"
|
"go.pinniped.dev/internal/deploymentref"
|
||||||
"go.pinniped.dev/internal/downward"
|
"go.pinniped.dev/internal/downward"
|
||||||
@ -56,36 +58,51 @@ const (
|
|||||||
defaultResyncInterval = 3 * time.Minute
|
defaultResyncInterval = 3 * time.Minute
|
||||||
)
|
)
|
||||||
|
|
||||||
func start(ctx context.Context, l net.Listener, handler http.Handler) {
|
func startServer(ctx context.Context, shutdown *sync.WaitGroup, l net.Listener, handler http.Handler) {
|
||||||
server := http.Server{Handler: handler}
|
server := http.Server{Handler: handler}
|
||||||
|
|
||||||
errCh := make(chan error)
|
shutdown.Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
errCh <- server.Serve(l)
|
defer shutdown.Done()
|
||||||
|
|
||||||
|
err := server.Serve(l)
|
||||||
|
plog.Debug("server exited", "err", err)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
shutdown.Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
select {
|
defer shutdown.Done()
|
||||||
case err := <-errCh:
|
|
||||||
plog.Debug("server exited", "err", err)
|
<-ctx.Done()
|
||||||
case <-ctx.Done():
|
plog.Debug("server context cancelled", "err", ctx.Err())
|
||||||
plog.Debug("server context cancelled", "err", ctx.Err())
|
|
||||||
if err := server.Shutdown(context.Background()); err != nil {
|
// allow up to a minute grace period for active connections to return to idle
|
||||||
plog.Debug("server shutdown failed", "err", err)
|
connectionsCtx, connectionsCancel := context.WithTimeout(context.Background(), time.Minute)
|
||||||
}
|
defer connectionsCancel()
|
||||||
|
|
||||||
|
if err := server.Shutdown(connectionsCtx); err != nil {
|
||||||
|
plog.Debug("server shutdown failed", "err", err)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
func waitForSignal() os.Signal {
|
func signalCtx() context.Context {
|
||||||
signalCh := make(chan os.Signal, 1)
|
signalCh := make(chan os.Signal, 1)
|
||||||
signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM)
|
signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM)
|
||||||
return <-signalCh
|
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
go func() {
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
s := <-signalCh
|
||||||
|
plog.Debug("saw signal", "signal", s)
|
||||||
|
}()
|
||||||
|
|
||||||
|
return ctx
|
||||||
}
|
}
|
||||||
|
|
||||||
//nolint:funlen
|
//nolint:funlen
|
||||||
func startControllers(
|
func prepareControllers(
|
||||||
ctx context.Context,
|
|
||||||
cfg *supervisor.Config,
|
cfg *supervisor.Config,
|
||||||
issuerManager *manager.Manager,
|
issuerManager *manager.Manager,
|
||||||
dynamicJWKSProvider jwks.DynamicJWKSProvider,
|
dynamicJWKSProvider jwks.DynamicJWKSProvider,
|
||||||
@ -97,8 +114,8 @@ func startControllers(
|
|||||||
pinnipedClient pinnipedclientset.Interface,
|
pinnipedClient pinnipedclientset.Interface,
|
||||||
kubeInformers kubeinformers.SharedInformerFactory,
|
kubeInformers kubeinformers.SharedInformerFactory,
|
||||||
pinnipedInformers pinnipedinformers.SharedInformerFactory,
|
pinnipedInformers pinnipedinformers.SharedInformerFactory,
|
||||||
leaderElector func(context.Context, func(context.Context)),
|
leaderElector controllerinit.RunnerWrapper,
|
||||||
) {
|
) controllerinit.RunnerBuilder {
|
||||||
federationDomainInformer := pinnipedInformers.Config().V1alpha1().FederationDomains()
|
federationDomainInformer := pinnipedInformers.Config().V1alpha1().FederationDomains()
|
||||||
secretInformer := kubeInformers.Core().V1().Secrets()
|
secretInformer := kubeInformers.Core().V1().Secrets()
|
||||||
|
|
||||||
@ -267,21 +284,27 @@ func startControllers(
|
|||||||
),
|
),
|
||||||
singletonWorker)
|
singletonWorker)
|
||||||
|
|
||||||
kubeInformers.Start(ctx.Done())
|
return controllerinit.Prepare(controllerManager.Start, leaderElector, kubeInformers, pinnipedInformers)
|
||||||
pinnipedInformers.Start(ctx.Done())
|
|
||||||
|
|
||||||
// Wait until the caches are synced before returning.
|
|
||||||
kubeInformers.WaitForCacheSync(ctx.Done())
|
|
||||||
pinnipedInformers.WaitForCacheSync(ctx.Done())
|
|
||||||
|
|
||||||
go leaderElector(ctx, controllerManager.Start)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func run(podInfo *downward.PodInfo, cfg *supervisor.Config) error {
|
func startControllers(ctx context.Context, shutdown *sync.WaitGroup, buildControllers controllerinit.RunnerBuilder) error {
|
||||||
serverInstallationNamespace := podInfo.Namespace
|
runControllers, err := buildControllers(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("cannot create run controller func: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
shutdown.Add(1)
|
||||||
defer cancel()
|
go func() {
|
||||||
|
defer shutdown.Done()
|
||||||
|
|
||||||
|
runControllers(ctx)
|
||||||
|
}()
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func runSupervisor(podInfo *downward.PodInfo, cfg *supervisor.Config) error {
|
||||||
|
serverInstallationNamespace := podInfo.Namespace
|
||||||
|
|
||||||
dref, supervisorDeployment, err := deploymentref.New(podInfo)
|
dref, supervisorDeployment, err := deploymentref.New(podInfo)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -339,8 +362,7 @@ func run(podInfo *downward.PodInfo, cfg *supervisor.Config) error {
|
|||||||
clientWithoutLeaderElection.Kubernetes.CoreV1().Secrets(serverInstallationNamespace), // writes to kube storage are allowed for non-leaders
|
clientWithoutLeaderElection.Kubernetes.CoreV1().Secrets(serverInstallationNamespace), // writes to kube storage are allowed for non-leaders
|
||||||
)
|
)
|
||||||
|
|
||||||
startControllers(
|
buildControllersFunc := prepareControllers(
|
||||||
ctx,
|
|
||||||
cfg,
|
cfg,
|
||||||
oidProvidersManager,
|
oidProvidersManager,
|
||||||
dynamicJWKSProvider,
|
dynamicJWKSProvider,
|
||||||
@ -355,13 +377,20 @@ func run(podInfo *downward.PodInfo, cfg *supervisor.Config) error {
|
|||||||
leaderElector,
|
leaderElector,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
ctx := signalCtx()
|
||||||
|
shutdown := &sync.WaitGroup{}
|
||||||
|
|
||||||
|
if err := startControllers(ctx, shutdown, buildControllersFunc); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
//nolint: gosec // Intentionally binding to all network interfaces.
|
//nolint: gosec // Intentionally binding to all network interfaces.
|
||||||
httpListener, err := net.Listen("tcp", ":8080")
|
httpListener, err := net.Listen("tcp", ":8080")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("cannot create listener: %w", err)
|
return fmt.Errorf("cannot create listener: %w", err)
|
||||||
}
|
}
|
||||||
defer func() { _ = httpListener.Close() }()
|
defer func() { _ = httpListener.Close() }()
|
||||||
start(ctx, httpListener, oidProvidersManager)
|
startServer(ctx, shutdown, httpListener, oidProvidersManager)
|
||||||
|
|
||||||
//nolint: gosec // Intentionally binding to all network interfaces.
|
//nolint: gosec // Intentionally binding to all network interfaces.
|
||||||
httpsListener, err := tls.Listen("tcp", ":8443", &tls.Config{
|
httpsListener, err := tls.Listen("tcp", ":8443", &tls.Config{
|
||||||
@ -384,20 +413,20 @@ func run(podInfo *downward.PodInfo, cfg *supervisor.Config) error {
|
|||||||
return fmt.Errorf("cannot create listener: %w", err)
|
return fmt.Errorf("cannot create listener: %w", err)
|
||||||
}
|
}
|
||||||
defer func() { _ = httpsListener.Close() }()
|
defer func() { _ = httpsListener.Close() }()
|
||||||
start(ctx, httpsListener, oidProvidersManager)
|
startServer(ctx, shutdown, httpsListener, oidProvidersManager)
|
||||||
|
|
||||||
plog.Debug("supervisor is ready",
|
plog.Debug("supervisor is ready",
|
||||||
"httpAddress", httpListener.Addr().String(),
|
"httpAddress", httpListener.Addr().String(),
|
||||||
"httpsAddress", httpsListener.Addr().String(),
|
"httpsAddress", httpsListener.Addr().String(),
|
||||||
)
|
)
|
||||||
|
defer plog.Debug("supervisor exiting")
|
||||||
|
|
||||||
gotSignal := waitForSignal()
|
shutdown.Wait()
|
||||||
plog.Debug("supervisor exiting", "signal", gotSignal)
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func Main() {
|
func main() error { // return an error instead of klog.Fatal to allow defer statements to run
|
||||||
logs.InitLogs()
|
logs.InitLogs()
|
||||||
defer logs.FlushLogs()
|
defer logs.FlushLogs()
|
||||||
plog.RemoveKlogGlobalFlags() // move this whenever the below code gets refactored to use cobra
|
plog.RemoveKlogGlobalFlags() // move this whenever the below code gets refactored to use cobra
|
||||||
@ -408,16 +437,20 @@ func Main() {
|
|||||||
// Discover in which namespace we are installed.
|
// Discover in which namespace we are installed.
|
||||||
podInfo, err := downward.Load(os.Args[1])
|
podInfo, err := downward.Load(os.Args[1])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
klog.Fatal(fmt.Errorf("could not read pod metadata: %w", err))
|
return fmt.Errorf("could not read pod metadata: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Read the server config file.
|
// Read the server config file.
|
||||||
cfg, err := supervisor.FromPath(os.Args[2])
|
cfg, err := supervisor.FromPath(os.Args[2])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
klog.Fatal(fmt.Errorf("could not load config: %w", err))
|
return fmt.Errorf("could not load config: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := run(podInfo, cfg); err != nil {
|
return runSupervisor(podInfo, cfg)
|
||||||
|
}
|
||||||
|
|
||||||
|
func Main() {
|
||||||
|
if err := main(); err != nil {
|
||||||
klog.Fatal(err)
|
klog.Fatal(err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
81
test/integration/controllerinit_test.go
Normal file
81
test/integration/controllerinit_test.go
Normal file
@ -0,0 +1,81 @@
|
|||||||
|
// Copyright 2021 the Pinniped contributors. All Rights Reserved.
|
||||||
|
// SPDX-License-Identifier: Apache-2.0
|
||||||
|
|
||||||
|
package integration
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
kubeinformers "k8s.io/client-go/informers"
|
||||||
|
"k8s.io/client-go/kubernetes"
|
||||||
|
"k8s.io/client-go/rest"
|
||||||
|
|
||||||
|
"go.pinniped.dev/internal/controllerinit"
|
||||||
|
"go.pinniped.dev/test/testlib"
|
||||||
|
)
|
||||||
|
|
||||||
|
// this just makes some slow read requests which are safe to run in parallel with serial tests, see main_test.go.
|
||||||
|
func TestControllerInitPrepare_Parallel(t *testing.T) {
|
||||||
|
_ = testlib.IntegrationEnv(t)
|
||||||
|
|
||||||
|
t.Run("with parent context that is never canceled", func(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
// the nil params should never be used in this case
|
||||||
|
buildControllers := controllerinit.Prepare(nil, nil, buildBrokenInformer(t))
|
||||||
|
|
||||||
|
start := time.Now()
|
||||||
|
runControllers, err := buildControllers(context.Background()) // we expect this to not block forever even with a context.Background()
|
||||||
|
delta := time.Since(start)
|
||||||
|
|
||||||
|
require.EqualError(t, err,
|
||||||
|
"failed to sync informers of k8s.io/client-go/informers.sharedInformerFactory: "+
|
||||||
|
"[k8s.io/api/core/v1.Namespace k8s.io/api/core/v1.Node]")
|
||||||
|
require.Nil(t, runControllers)
|
||||||
|
|
||||||
|
require.InDelta(t, time.Minute, delta, float64(30*time.Second))
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("with parent context that is canceled early", func(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
// the nil params should never be used in this case
|
||||||
|
buildControllers := controllerinit.Prepare(nil, nil, buildBrokenInformer(t))
|
||||||
|
|
||||||
|
// we expect this to exit sooner because the parent context is shorter
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||||
|
t.Cleanup(cancel)
|
||||||
|
|
||||||
|
start := time.Now()
|
||||||
|
runControllers, err := buildControllers(ctx)
|
||||||
|
delta := time.Since(start)
|
||||||
|
|
||||||
|
require.EqualError(t, err,
|
||||||
|
"failed to sync informers of k8s.io/client-go/informers.sharedInformerFactory: "+
|
||||||
|
"[k8s.io/api/core/v1.Namespace k8s.io/api/core/v1.Node]")
|
||||||
|
require.Nil(t, runControllers)
|
||||||
|
|
||||||
|
require.InDelta(t, 10*time.Second, delta, float64(15*time.Second))
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func buildBrokenInformer(t *testing.T) kubeinformers.SharedInformerFactory {
|
||||||
|
t.Helper()
|
||||||
|
|
||||||
|
config := testlib.NewClientConfig(t)
|
||||||
|
config = rest.CopyConfig(config)
|
||||||
|
config.Impersonate.UserName = "user-with-no-rbac" // so we can test that we correctly detect a cache sync failure
|
||||||
|
|
||||||
|
client := kubernetes.NewForConfigOrDie(config)
|
||||||
|
|
||||||
|
informers := kubeinformers.NewSharedInformerFactoryWithOptions(client, 0)
|
||||||
|
|
||||||
|
// make sure some informers gets lazily loaded
|
||||||
|
_ = informers.Core().V1().Nodes().Informer()
|
||||||
|
_ = informers.Core().V1().Namespaces().Informer()
|
||||||
|
|
||||||
|
return informers
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user