1e1789f6d1
This change allows configuration of the http and https listeners used by the supervisor. TCP listeners (IPv4 and IPv6, on any interface and port) and Unix domain socket listeners are supported, and either listener may be disabled. Binding the http listener to TCP addresses other than 127.0.0.1 or ::1 is deprecated.

The deployment now uses https health checks. The supervisor can always complete a TLS connection by using a bootstrap certificate signed by an in-memory certificate authority.

To support sidecar containers used by service meshes, Unix domain socket listeners include ACLs that allow writes to the socket file from any runAsUser specified in the pod's containers.

Signed-off-by: Monis Khan <mok@vmware.com>
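For illustration only, here is a minimal sketch (not part of this change) of how a sidecar container in the same pod could reach the supervisor's http listener when it is configured on a Unix domain socket; the socket path below is a hypothetical placeholder, not a value taken from this commit.

// Sketch: call the supervisor's /healthz endpoint over a Unix domain socket from a sidecar.
// The socket path is hypothetical; substitute whatever address the http listener is configured with.
package main

import (
	"context"
	"fmt"
	"net"
	"net/http"
)

func main() {
	client := &http.Client{
		Transport: &http.Transport{
			// Ignore the host portion of the URL and dial the Unix socket directly.
			DialContext: func(ctx context.Context, _, _ string) (net.Conn, error) {
				return (&net.Dialer{}).DialContext(ctx, "unix", "/tmp/supervisor.sock")
			},
		},
	}

	resp, err := client.Get("http://supervisor/healthz")
	if err != nil {
		fmt.Println("health check failed:", err)
		return
	}
	defer resp.Body.Close()
	fmt.Println("health check status:", resp.Status)
}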
534 lines · 16 KiB · Go
// Copyright 2020-2022 the Pinniped contributors. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

// Package server defines the entrypoint for the Pinniped Supervisor server.
package server

import (
	"context"
	"crypto/rand"
	"crypto/tls"
	"fmt"
	"net"
	"net/http"
	"os"
	"os/signal"
	"strconv"
	"strings"
	"sync"
	"syscall"
	"time"

	"github.com/joshlf/go-acl"
	appsv1 "k8s.io/api/apps/v1"
	corev1 "k8s.io/api/core/v1"
	genericapifilters "k8s.io/apiserver/pkg/endpoints/filters"
	kubeinformers "k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/pkg/version"
	"k8s.io/client-go/rest"
	"k8s.io/component-base/logs"
	"k8s.io/klog/v2"
	"k8s.io/klog/v2/klogr"
	"k8s.io/utils/clock"

	configv1alpha1 "go.pinniped.dev/generated/latest/apis/supervisor/config/v1alpha1"
	pinnipedclientset "go.pinniped.dev/generated/latest/client/supervisor/clientset/versioned"
	pinnipedinformers "go.pinniped.dev/generated/latest/client/supervisor/informers/externalversions"
	"go.pinniped.dev/internal/config/supervisor"
	"go.pinniped.dev/internal/controller/supervisorconfig"
	"go.pinniped.dev/internal/controller/supervisorconfig/activedirectoryupstreamwatcher"
	"go.pinniped.dev/internal/controller/supervisorconfig/generator"
	"go.pinniped.dev/internal/controller/supervisorconfig/ldapupstreamwatcher"
	"go.pinniped.dev/internal/controller/supervisorconfig/oidcupstreamwatcher"
	"go.pinniped.dev/internal/controller/supervisorstorage"
	"go.pinniped.dev/internal/controllerinit"
	"go.pinniped.dev/internal/controllerlib"
	"go.pinniped.dev/internal/crypto/ptls"
	"go.pinniped.dev/internal/deploymentref"
	"go.pinniped.dev/internal/downward"
	"go.pinniped.dev/internal/groupsuffix"
	"go.pinniped.dev/internal/kubeclient"
	"go.pinniped.dev/internal/leaderelection"
	"go.pinniped.dev/internal/oidc/jwks"
	"go.pinniped.dev/internal/oidc/provider"
	"go.pinniped.dev/internal/oidc/provider/manager"
	"go.pinniped.dev/internal/plog"
	"go.pinniped.dev/internal/secret"
)

const (
	singletonWorker       = 1
	defaultResyncInterval = 3 * time.Minute
)

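// startServer serves handler on the given listener, and starts a second goroutine that
// gracefully shuts the server down (with up to a one minute grace period for active
// connections) once ctx is cancelled. Both goroutines are tracked via the shutdown wait group.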
func startServer(ctx context.Context, shutdown *sync.WaitGroup, l net.Listener, handler http.Handler) {
	handler = genericapifilters.WithWarningRecorder(handler)
	handler = withBootstrapPaths(handler, "/healthz") // only health checks are allowed for bootstrap connections

	server := http.Server{
		Handler:     handler,
		ConnContext: withBootstrapConnCtx,
	}

	shutdown.Add(1)
	go func() {
		defer shutdown.Done()

		err := server.Serve(l)
		plog.Debug("server exited", "err", err)
	}()

	shutdown.Add(1)
	go func() {
		defer shutdown.Done()

		<-ctx.Done()
		plog.Debug("server context cancelled", "err", ctx.Err())

		// allow up to a minute grace period for active connections to return to idle
		connectionsCtx, connectionsCancel := context.WithTimeout(context.Background(), time.Minute)
		defer connectionsCancel()

		if err := server.Shutdown(connectionsCtx); err != nil {
			plog.Debug("server shutdown failed", "err", err)
		}
	}()
}

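// signalCtx returns a context that is cancelled when the process receives SIGINT or SIGTERM.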
func signalCtx() context.Context {
	signalCh := make(chan os.Signal, 1)
	signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM)

	ctx, cancel := context.WithCancel(context.Background())
	go func() {
		defer cancel()

		s := <-signalCh
		plog.Debug("saw signal", "signal", s)
	}()

	return ctx
}

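// prepareControllers wires up all of the supervisor's controllers (secret storage garbage
// collection, FederationDomain and JWKS handling, TLS certificate observation, symmetric
// secret generation, and the OIDC/LDAP/Active Directory upstream IDP watchers) and returns
// a builder that will run them under the given leader elector.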
//nolint:funlen
func prepareControllers(
	cfg *supervisor.Config,
	issuerManager *manager.Manager,
	dynamicJWKSProvider jwks.DynamicJWKSProvider,
	dynamicTLSCertProvider provider.DynamicTLSCertProvider,
	dynamicUpstreamIDPProvider provider.DynamicUpstreamIDPProvider,
	secretCache *secret.Cache,
	supervisorDeployment *appsv1.Deployment,
	kubeClient kubernetes.Interface,
	pinnipedClient pinnipedclientset.Interface,
	kubeInformers kubeinformers.SharedInformerFactory,
	pinnipedInformers pinnipedinformers.SharedInformerFactory,
	leaderElector controllerinit.RunnerWrapper,
) controllerinit.RunnerBuilder {
	federationDomainInformer := pinnipedInformers.Config().V1alpha1().FederationDomains()
	secretInformer := kubeInformers.Core().V1().Secrets()

	// Create controller manager.
	controllerManager := controllerlib.
		NewManager().
		WithController(
			supervisorstorage.GarbageCollectorController(
				dynamicUpstreamIDPProvider,
				clock.RealClock{},
				kubeClient,
				secretInformer,
				controllerlib.WithInformer,
			),
			singletonWorker,
		).
		WithController(
			supervisorconfig.NewFederationDomainWatcherController(
				issuerManager,
				clock.RealClock{},
				pinnipedClient,
				federationDomainInformer,
				controllerlib.WithInformer,
			),
			singletonWorker,
		).
		WithController(
			supervisorconfig.NewJWKSWriterController(
				cfg.Labels,
				kubeClient,
				pinnipedClient,
				secretInformer,
				federationDomainInformer,
				controllerlib.WithInformer,
			),
			singletonWorker,
		).
		WithController(
			supervisorconfig.NewJWKSObserverController(
				dynamicJWKSProvider,
				secretInformer,
				federationDomainInformer,
				controllerlib.WithInformer,
			),
			singletonWorker,
		).
		WithController(
			supervisorconfig.NewTLSCertObserverController(
				dynamicTLSCertProvider,
				cfg.NamesConfig.DefaultTLSCertificateSecret,
				secretInformer,
				federationDomainInformer,
				controllerlib.WithInformer,
			),
			singletonWorker,
		).
		WithController(
			generator.NewSupervisorSecretsController(
				supervisorDeployment,
				cfg.Labels,
				kubeClient,
				secretInformer,
				func(secret []byte) {
					plog.Debug("setting csrf cookie secret")
					secretCache.SetCSRFCookieEncoderHashKey(secret)
				},
				controllerlib.WithInformer,
				controllerlib.WithInitialEvent,
			),
			singletonWorker,
		).
		WithController(
			generator.NewFederationDomainSecretsController(
				generator.NewSymmetricSecretHelper(
					"pinniped-oidc-provider-hmac-key-",
					cfg.Labels,
					rand.Reader,
					generator.SecretUsageTokenSigningKey,
					func(federationDomainIssuer string, symmetricKey []byte) {
						plog.Debug("setting hmac secret", "issuer", federationDomainIssuer)
						secretCache.SetTokenHMACKey(federationDomainIssuer, symmetricKey)
					},
				),
				func(fd *configv1alpha1.FederationDomainStatus) *corev1.LocalObjectReference {
					return &fd.Secrets.TokenSigningKey
				},
				kubeClient,
				pinnipedClient,
				secretInformer,
				federationDomainInformer,
				controllerlib.WithInformer,
			),
			singletonWorker,
		).
		WithController(
			generator.NewFederationDomainSecretsController(
				generator.NewSymmetricSecretHelper(
					"pinniped-oidc-provider-upstream-state-signature-key-",
					cfg.Labels,
					rand.Reader,
					generator.SecretUsageStateSigningKey,
					func(federationDomainIssuer string, symmetricKey []byte) {
						plog.Debug("setting state signature key", "issuer", federationDomainIssuer)
						secretCache.SetStateEncoderHashKey(federationDomainIssuer, symmetricKey)
					},
				),
				func(fd *configv1alpha1.FederationDomainStatus) *corev1.LocalObjectReference {
					return &fd.Secrets.StateSigningKey
				},
				kubeClient,
				pinnipedClient,
				secretInformer,
				federationDomainInformer,
				controllerlib.WithInformer,
			),
			singletonWorker,
		).
		WithController(
			generator.NewFederationDomainSecretsController(
				generator.NewSymmetricSecretHelper(
					"pinniped-oidc-provider-upstream-state-encryption-key-",
					cfg.Labels,
					rand.Reader,
					generator.SecretUsageStateEncryptionKey,
					func(federationDomainIssuer string, symmetricKey []byte) {
						plog.Debug("setting state encryption key", "issuer", federationDomainIssuer)
						secretCache.SetStateEncoderBlockKey(federationDomainIssuer, symmetricKey)
					},
				),
				func(fd *configv1alpha1.FederationDomainStatus) *corev1.LocalObjectReference {
					return &fd.Secrets.StateEncryptionKey
				},
				kubeClient,
				pinnipedClient,
				secretInformer,
				federationDomainInformer,
				controllerlib.WithInformer,
			),
			singletonWorker,
		).
		WithController(
			oidcupstreamwatcher.New(
				dynamicUpstreamIDPProvider,
				pinnipedClient,
				pinnipedInformers.IDP().V1alpha1().OIDCIdentityProviders(),
				secretInformer,
				klogr.New(),
				controllerlib.WithInformer,
			),
			singletonWorker).
		WithController(
			ldapupstreamwatcher.New(
				dynamicUpstreamIDPProvider,
				pinnipedClient,
				pinnipedInformers.IDP().V1alpha1().LDAPIdentityProviders(),
				secretInformer,
				controllerlib.WithInformer,
			),
			singletonWorker).
		WithController(
			activedirectoryupstreamwatcher.New(
				dynamicUpstreamIDPProvider,
				pinnipedClient,
				pinnipedInformers.IDP().V1alpha1().ActiveDirectoryIdentityProviders(),
				secretInformer,
				controllerlib.WithInformer,
			),
			singletonWorker)

	return controllerinit.Prepare(controllerManager.Start, leaderElector, kubeInformers, pinnipedInformers)
}

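// startControllers builds the controllers and runs them in a goroutine tracked by the shutdown wait group.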
func startControllers(ctx context.Context, shutdown *sync.WaitGroup, buildControllers controllerinit.RunnerBuilder) error {
	runControllers, err := buildControllers(ctx)
	if err != nil {
		return fmt.Errorf("cannot create run controller func: %w", err)
	}

	shutdown.Add(1)
	go func() {
		defer shutdown.Done()

		runControllers(ctx)
	}()

	return nil
}

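// runSupervisor constructs the clients, informers, controllers, and the optional http and
// https listeners, then blocks until every goroutine registered with the shutdown wait
// group has exited.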
//nolint:funlen
func runSupervisor(podInfo *downward.PodInfo, cfg *supervisor.Config) error {
	serverInstallationNamespace := podInfo.Namespace

	dref, supervisorDeployment, supervisorPod, err := deploymentref.New(podInfo)
	if err != nil {
		return fmt.Errorf("cannot create deployment ref: %w", err)
	}

	opts := []kubeclient.Option{
		dref,
		kubeclient.WithMiddleware(groupsuffix.New(*cfg.APIGroupSuffix)),
	}

	client, leaderElector, err := leaderelection.New(
		podInfo,
		supervisorDeployment,
		opts...,
	)
	if err != nil {
		return fmt.Errorf("cannot create k8s client: %w", err)
	}

	clientWithoutLeaderElection, err := kubeclient.New(opts...)
	if err != nil {
		return fmt.Errorf("cannot create k8s client without leader election: %w", err)
	}

	kubeInformers := kubeinformers.NewSharedInformerFactoryWithOptions(
		client.Kubernetes,
		defaultResyncInterval,
		kubeinformers.WithNamespace(serverInstallationNamespace),
	)

	pinnipedInformers := pinnipedinformers.NewSharedInformerFactoryWithOptions(
		client.PinnipedSupervisor,
		defaultResyncInterval,
		pinnipedinformers.WithNamespace(serverInstallationNamespace),
	)

	// Serve the /healthz endpoint and make all other paths result in 404.
	healthMux := http.NewServeMux()
	healthMux.Handle("/healthz", http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) {
		_, _ = writer.Write([]byte("ok"))
	}))

	dynamicJWKSProvider := jwks.NewDynamicJWKSProvider()
	dynamicTLSCertProvider := provider.NewDynamicTLSCertProvider()
	dynamicUpstreamIDPProvider := provider.NewDynamicUpstreamIDPProvider()
	secretCache := secret.Cache{}

	// OIDC endpoints will be served by the oidProvidersManager, and any non-OIDC paths will fallback to the healthMux.
	oidProvidersManager := manager.NewManager(
		healthMux,
		dynamicJWKSProvider,
		dynamicUpstreamIDPProvider,
		&secretCache,
		clientWithoutLeaderElection.Kubernetes.CoreV1().Secrets(serverInstallationNamespace), // writes to kube storage are allowed for non-leaders
	)

	buildControllersFunc := prepareControllers(
		cfg,
		oidProvidersManager,
		dynamicJWKSProvider,
		dynamicTLSCertProvider,
		dynamicUpstreamIDPProvider,
		&secretCache,
		supervisorDeployment,
		client.Kubernetes,
		client.PinnipedSupervisor,
		kubeInformers,
		pinnipedInformers,
		leaderElector,
	)

	ctx := signalCtx()
	shutdown := &sync.WaitGroup{}

	if err := startControllers(ctx, shutdown, buildControllersFunc); err != nil {
		return err
	}

	if e := cfg.Endpoints.HTTP; e.Network != supervisor.NetworkDisabled {
		finishSetupPerms := maybeSetupUnixPerms(e, supervisorPod)

		httpListener, err := net.Listen(e.Network, e.Address)
		if err != nil {
			return fmt.Errorf("cannot create http listener with network %q and address %q: %w", e.Network, e.Address, err)
		}

		if err := finishSetupPerms(); err != nil {
			return fmt.Errorf("cannot setup http listener permissions for network %q and address %q: %w", e.Network, e.Address, err)
		}

		defer func() { _ = httpListener.Close() }()
		startServer(ctx, shutdown, httpListener, oidProvidersManager)
		plog.Debug("supervisor http listener started", "address", httpListener.Addr().String())
	}

	if e := cfg.Endpoints.HTTPS; e.Network != supervisor.NetworkDisabled { //nolint:nestif
		finishSetupPerms := maybeSetupUnixPerms(e, supervisorPod)

		bootstrapCert, err := getBootstrapCert() // generate this in-memory once per process startup
		if err != nil {
			return fmt.Errorf("https listener bootstrap error: %w", err)
		}

		c := ptls.Default(nil)
		c.GetCertificate = func(info *tls.ClientHelloInfo) (*tls.Certificate, error) {
			cert := dynamicTLSCertProvider.GetTLSCert(strings.ToLower(info.ServerName))

			defaultCert := dynamicTLSCertProvider.GetDefaultTLSCert()

			if plog.Enabled(plog.LevelTrace) { // minor CPU optimization as this is generally just noise
				host, port, _ := net.SplitHostPort(info.Conn.LocalAddr().String()) // error is safe to ignore here

				plog.Trace("GetCertificate called",
					"info.ServerName", info.ServerName,
					"foundSNICert", cert != nil,
					"foundDefaultCert", defaultCert != nil,
					"host", host,
					"port", port,
				)
			}

			if cert == nil {
				cert = defaultCert
			}

			if cert == nil {
				setIsBootstrapConn(info.Context()) // make this connection only work for bootstrap requests
				cert = bootstrapCert
			}

			return cert, nil
		}

		httpsListener, err := tls.Listen(e.Network, e.Address, c)
		if err != nil {
			return fmt.Errorf("cannot create https listener with network %q and address %q: %w", e.Network, e.Address, err)
		}

		if err := finishSetupPerms(); err != nil {
			return fmt.Errorf("cannot setup https listener permissions for network %q and address %q: %w", e.Network, e.Address, err)
		}

		defer func() { _ = httpsListener.Close() }()
		startServer(ctx, shutdown, httpsListener, oidProvidersManager)
		plog.Debug("supervisor https listener started", "address", httpsListener.Addr().String())
	}

	plog.Debug("supervisor started")
	defer plog.Debug("supervisor exiting")

	shutdown.Wait()

	return nil
}

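// maybeSetupUnixPerms returns a function that, for Unix domain socket listeners, adds ACL
// entries granting write permission on the socket file to every other runAsUser declared by
// the pod's containers. For all other networks the returned function is a no-op.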
func maybeSetupUnixPerms(endpoint *supervisor.Endpoint, pod *corev1.Pod) func() error {
	if endpoint.Network != supervisor.NetworkUnix {
		return func() error { return nil }
	}

	_ = os.Remove(endpoint.Address) // empty dir volumes persist across container crashes

	return func() error {
		selfUser := int64(os.Getuid())
		var entries []acl.Entry
		for _, container := range pod.Spec.Containers {
			if container.SecurityContext == nil ||
				container.SecurityContext.RunAsUser == nil ||
				*container.SecurityContext.RunAsUser == selfUser {
				continue
			}

			plog.Debug("adding write permission",
				"address", endpoint.Address,
				"uid", *container.SecurityContext.RunAsUser,
			)
			entries = append(entries, acl.Entry{
				Tag:       acl.TagUser,
				Qualifier: strconv.FormatInt(*container.SecurityContext.RunAsUser, 10),
				Perms:     2, // write permission
			})
		}
		return acl.Add(endpoint.Address, entries...) // allow all containers in the pod to write to the socket
	}
}

func main() error { // return an error instead of klog.Fatal to allow defer statements to run
	logs.InitLogs()
	defer logs.FlushLogs()

	klog.Infof("Running %s at %#v", rest.DefaultKubernetesUserAgent(), version.Get())
	klog.Infof("Command-line arguments were: %s %s %s", os.Args[0], os.Args[1], os.Args[2])

	// Discover in which namespace we are installed.
	podInfo, err := downward.Load(os.Args[1])
	if err != nil {
		return fmt.Errorf("could not read pod metadata: %w", err)
	}

	// Read the server config file.
	cfg, err := supervisor.FromPath(os.Args[2])
	if err != nil {
		return fmt.Errorf("could not load config: %w", err)
	}

	return runSupervisor(podInfo, cfg)
}

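// Main is the exported entrypoint for the supervisor server. It terminates the process via
// klog.Fatal when main returns an error.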
func Main() {
	if err := main(); err != nil {
		klog.Fatal(err)
	}
}