ContainerImage.Pinniped/internal/concierge/impersonator/impersonator.go

282 lines
11 KiB
Go
Raw Normal View History

// Copyright 2020-2021 the Pinniped contributors. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package impersonator
import (
"fmt"
"net"
"net/http"
"net/http/httputil"
"net/url"
"strings"
"time"
"k8s.io/apimachinery/pkg/util/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apiserver/pkg/authorization/authorizer"
"k8s.io/apiserver/pkg/endpoints/request"
genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/server/dynamiccertificates"
genericoptions "k8s.io/apiserver/pkg/server/options"
"k8s.io/client-go/rest"
"k8s.io/client-go/transport"
"go.pinniped.dev/internal/constable"
"go.pinniped.dev/internal/httputil/securityheader"
"go.pinniped.dev/internal/kubeclient"
"go.pinniped.dev/internal/plog"
)
// FactoryFunc is a function which can create an impersonator server.
// It returns a function which will start the impersonator server.
// That start function takes a stopCh which can be used to stop the server.
// Once a server has been stopped, don't start it again using the start function.
// Instead, call the factory function again to get a new start function.
type FactoryFunc func(
port int,
dynamicCertProvider dynamiccertificates.CertKeyContentProvider,
impersonationProxySignerCA dynamiccertificates.CAContentProvider,
) (func(stopCh <-chan struct{}) error, error)
func New(
port int,
dynamicCertProvider dynamiccertificates.CertKeyContentProvider, // TODO: we need to check those optional interfaces and see what we need to implement
impersonationProxySignerCA dynamiccertificates.CAContentProvider, // TODO: we need to check those optional interfaces and see what we need to implement
) (func(stopCh <-chan struct{}) error, error) {
return newInternal(port, dynamicCertProvider, impersonationProxySignerCA, nil, nil)
}
func newInternal( //nolint:funlen // yeah, it's kind of long.
port int,
dynamicCertProvider dynamiccertificates.CertKeyContentProvider,
impersonationProxySignerCA dynamiccertificates.CAContentProvider,
clientOpts []kubeclient.Option, // for unit testing, should always be nil in production
recOpts func(*genericoptions.RecommendedOptions), // for unit testing, should always be nil in production
) (func(stopCh <-chan struct{}) error, error) {
var listener net.Listener
constructServer := func() (func(stopCh <-chan struct{}) error, error) {
// bare minimum server side scheme to allow for status messages to be encoded
scheme := runtime.NewScheme()
metav1.AddToGroupVersion(scheme, metav1.Unversioned)
codecs := serializer.NewCodecFactory(scheme)
// this is unused for now but it is a safe value that we could use in the future
defaultEtcdPathPrefix := "/pinniped-impersonation-proxy-registry"
recommendedOptions := genericoptions.NewRecommendedOptions(
defaultEtcdPathPrefix,
codecs.LegacyCodec(),
)
recommendedOptions.Etcd = nil // turn off etcd storage because we don't need it yet
recommendedOptions.SecureServing.ServerCert.GeneratedCert = dynamicCertProvider // serving certs (end user facing)
recommendedOptions.SecureServing.BindPort = port
// wire up the impersonation proxy signer CA as a valid authenticator for client cert auth
// TODO fix comments
kubeClient, err := kubeclient.New(clientOpts...)
if err != nil {
return nil, err
}
kubeClientCA, err := dynamiccertificates.NewDynamicCAFromConfigMapController("client-ca", metav1.NamespaceSystem, "extension-apiserver-authentication", "client-ca-file", kubeClient.Kubernetes)
if err != nil {
return nil, err
}
recommendedOptions.Authentication.ClientCert.ClientCA = "---irrelevant-but-needs-to-be-non-empty---" // drop when we pick up https://github.com/kubernetes/kubernetes/pull/100055
recommendedOptions.Authentication.ClientCert.CAContentProvider = dynamiccertificates.NewUnionCAContentProvider(impersonationProxySignerCA, kubeClientCA)
if recOpts != nil {
recOpts(recommendedOptions)
}
serverConfig := genericapiserver.NewRecommendedConfig(codecs)
// Note that ApplyTo is going to create a network listener and bind to the requested port.
// It puts this listener into serverConfig.SecureServing.Listener.
err = recommendedOptions.ApplyTo(serverConfig)
if serverConfig.SecureServing != nil {
// Set the pointer from the outer function to allow the outer function to close the listener in case
// this function returns an error for any reason anywhere below here.
listener = serverConfig.SecureServing.Listener
}
if err != nil {
return nil, err
}
// loopback authentication to this server does not really make sense since we just proxy everything to KAS
// thus we replace loopback connection config with one that does direct connections to KAS
// loopback config is mainly used by post start hooks, so this is mostly future proofing
serverConfig.LoopbackClientConfig = rest.CopyConfig(kubeClient.ProtoConfig) // assume proto is safe (hooks can override)
// remove the bearer token so our authorizer does not get stomped on by AuthorizeClientBearerToken
// see sanity checks at the end of this function
serverConfig.LoopbackClientConfig.BearerToken = ""
// assume proto config is safe because transport level configs do not use rest.ContentConfig
// thus if we are interacting with actual APIs, they should be using pre-built clients
impersonationProxy, err := newImpersonationReverseProxy(rest.CopyConfig(kubeClient.ProtoConfig))
if err != nil {
return nil, err
}
defaultBuildHandlerChainFunc := serverConfig.BuildHandlerChainFunc
serverConfig.BuildHandlerChainFunc = func(_ http.Handler, c *genericapiserver.Config) http.Handler {
// we ignore the passed in handler because we never have any REST APIs to delegate to
handler := defaultBuildHandlerChainFunc(impersonationProxy, c)
handler = securityheader.Wrap(handler)
return handler
}
// TODO integration test this authorizer logic with system:masters + double impersonation
// overwrite the delegating authorizer with one that only cares about impersonation
// empty string is disallowed because request info has had bugs in the past where it would leave it empty
disallowedVerbs := sets.NewString("", "impersonate")
noImpersonationAuthorizer := &comparableAuthorizer{
AuthorizerFunc: func(a authorizer.Attributes) (authorizer.Decision, string, error) {
// supporting impersonation is not hard, it would just require a bunch of testing
// and configuring the audit layer (to preserve the caller) which we can do later
// we would also want to delete the incoming impersonation headers
// instead of overwriting the delegating authorizer, we would
// actually use it to make the impersonation authorization checks
if disallowedVerbs.Has(a.GetVerb()) {
return authorizer.DecisionDeny, "impersonation is not allowed or invalid verb", nil
}
return authorizer.DecisionAllow, "deferring authorization to kube API server", nil
},
}
// TODO write a big comment explaining wth this is doing
serverConfig.Authorization.Authorizer = noImpersonationAuthorizer
impersonationProxyServer, err := serverConfig.Complete().New("impersonation-proxy", genericapiserver.NewEmptyDelegate())
if err != nil {
return nil, err
}
preparedRun := impersonationProxyServer.PrepareRun()
// wait until the very end to do sanity checks
if preparedRun.Authorizer != noImpersonationAuthorizer {
return nil, constable.Error("invalid mutation of impersonation authorizer detected")
}
// assert that we have a functioning token file to use and no bearer token
if len(preparedRun.LoopbackClientConfig.BearerToken) != 0 || len(preparedRun.LoopbackClientConfig.BearerTokenFile) == 0 {
return nil, constable.Error("invalid impersonator loopback rest config has wrong bearer token semantics")
}
// TODO make sure this is closed on error
_ = preparedRun.SecureServingInfo.Listener
return preparedRun.Run, nil
}
result, err := constructServer()
// If there was any error during construction, then we would like to close the listener to free up the port.
if err != nil {
errs := []error{err}
if listener != nil {
errs = append(errs, listener.Close())
}
return nil, errors.NewAggregate(errs)
}
return result, nil
}
// No-op wrapping around AuthorizerFunc to allow for comparisons.
type comparableAuthorizer struct {
authorizer.AuthorizerFunc
}
func newImpersonationReverseProxy(restConfig *rest.Config) (http.Handler, error) {
serverURL, err := url.Parse(restConfig.Host)
if err != nil {
return nil, fmt.Errorf("could not parse host URL from in-cluster config: %w", err)
}
kubeTransportConfig, err := restConfig.TransportConfig()
if err != nil {
return nil, fmt.Errorf("could not get in-cluster transport config: %w", err)
}
kubeTransportConfig.TLS.NextProtos = []string{"http/1.1"} // TODO huh?
kubeRoundTripper, err := transport.New(kubeTransportConfig)
if err != nil {
return nil, fmt.Errorf("could not get in-cluster transport: %w", err)
}
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// TODO integration test using a bearer token
if len(r.Header.Values("Authorization")) != 0 {
plog.Warning("aggregated API server logic did not delete authorization header but it is always supposed to do so",
"url", r.URL.String(),
"method", r.Method,
)
http.Error(w, "invalid authorization header", http.StatusInternalServerError)
return
}
if err := ensureNoImpersonationHeaders(r); err != nil {
plog.Error("noImpersonationAuthorizer logic did not prevent nested impersonation but it is always supposed to do so",
err,
"url", r.URL.String(),
"method", r.Method,
)
http.Error(w, "invalid impersonation", http.StatusInternalServerError)
return
}
userInfo, ok := request.UserFrom(r.Context())
if !ok {
plog.Warning("aggregated API server logic did not set user info but it is always supposed to do so",
"url", r.URL.String(),
"method", r.Method,
)
http.Error(w, "invalid user", http.StatusInternalServerError)
return
}
if len(userInfo.GetUID()) > 0 {
plog.Warning("rejecting request with UID since we cannot impersonate UIDs",
"url", r.URL.String(),
"method", r.Method,
)
http.Error(w, "unexpected uid", http.StatusUnprocessableEntity)
return
}
plog.Trace("proxying authenticated request",
"url", r.URL.String(),
"method", r.Method,
"username", userInfo.GetName(), // this info leak seems fine for trace level logs
)
reverseProxy := httputil.NewSingleHostReverseProxy(serverURL)
impersonateConfig := transport.ImpersonationConfig{
UserName: userInfo.GetName(),
Groups: userInfo.GetGroups(),
Extra: userInfo.GetExtra(),
}
reverseProxy.Transport = transport.NewImpersonatingRoundTripper(impersonateConfig, kubeRoundTripper)
reverseProxy.FlushInterval = 200 * time.Millisecond // the "watch" verb will not work without this line
// transport.NewImpersonatingRoundTripper clones the request before setting headers
// so this call will not accidentally mutate the input request (see http.Handler docs)
reverseProxy.ServeHTTP(w, r)
}), nil
}
func ensureNoImpersonationHeaders(r *http.Request) error {
for key := range r.Header {
if strings.HasPrefix(key, "Impersonate") {
return fmt.Errorf("%q header already exists", key)
}
}
return nil
}