2021-01-20 00:37:02 +00:00
// Copyright 2020-2021 the Pinniped contributors. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package impersonator
import (
"fmt"
2021-03-10 18:30:06 +00:00
"net"
2021-01-20 00:37:02 +00:00
"net/http"
"net/http/httputil"
"net/url"
"strings"
2021-02-23 01:23:11 +00:00
"time"
2021-01-20 00:37:02 +00:00
2021-03-10 18:30:06 +00:00
"k8s.io/apimachinery/pkg/util/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2021-02-15 23:00:10 +00:00
"k8s.io/apimachinery/pkg/runtime"
2021-03-10 18:30:06 +00:00
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apiserver/pkg/authorization/authorizer"
"k8s.io/apiserver/pkg/endpoints/request"
genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/server/dynamiccertificates"
genericoptions "k8s.io/apiserver/pkg/server/options"
2021-01-20 00:37:02 +00:00
"k8s.io/client-go/rest"
"k8s.io/client-go/transport"
2021-02-18 15:13:24 +00:00
"go.pinniped.dev/internal/constable"
2021-03-10 18:30:06 +00:00
"go.pinniped.dev/internal/httputil/securityheader"
2021-02-09 20:28:56 +00:00
"go.pinniped.dev/internal/kubeclient"
2021-03-10 18:30:06 +00:00
"go.pinniped.dev/internal/plog"
2021-01-20 00:37:02 +00:00
)
2021-03-10 18:30:06 +00:00
// FactoryFunc is a function which can create an impersonator server.
// It returns a function which will start the impersonator server.
// That start function takes a stopCh which can be used to stop the server.
// Once a server has been stopped, don't start it again using the start function.
// Instead, call the factory function again to get a new start function.
type FactoryFunc func (
port int ,
dynamicCertProvider dynamiccertificates . CertKeyContentProvider ,
impersonationProxySignerCA dynamiccertificates . CAContentProvider ,
) ( func ( stopCh <- chan struct { } ) error , error )
func New (
port int ,
dynamicCertProvider dynamiccertificates . CertKeyContentProvider , // TODO: we need to check those optional interfaces and see what we need to implement
impersonationProxySignerCA dynamiccertificates . CAContentProvider , // TODO: we need to check those optional interfaces and see what we need to implement
) ( func ( stopCh <- chan struct { } ) error , error ) {
return newInternal ( port , dynamicCertProvider , impersonationProxySignerCA , nil , nil )
2021-01-20 00:37:02 +00:00
}
2021-03-10 18:30:06 +00:00
func newInternal ( //nolint:funlen // yeah, it's kind of long.
port int ,
dynamicCertProvider dynamiccertificates . CertKeyContentProvider ,
impersonationProxySignerCA dynamiccertificates . CAContentProvider ,
clientOpts [ ] kubeclient . Option , // for unit testing, should always be nil in production
recOpts func ( * genericoptions . RecommendedOptions ) , // for unit testing, should always be nil in production
) ( func ( stopCh <- chan struct { } ) error , error ) {
var listener net . Listener
constructServer := func ( ) ( func ( stopCh <- chan struct { } ) error , error ) {
// bare minimum server side scheme to allow for status messages to be encoded
scheme := runtime . NewScheme ( )
metav1 . AddToGroupVersion ( scheme , metav1 . Unversioned )
codecs := serializer . NewCodecFactory ( scheme )
// this is unused for now but it is a safe value that we could use in the future
defaultEtcdPathPrefix := "/pinniped-impersonation-proxy-registry"
recommendedOptions := genericoptions . NewRecommendedOptions (
defaultEtcdPathPrefix ,
codecs . LegacyCodec ( ) ,
)
recommendedOptions . Etcd = nil // turn off etcd storage because we don't need it yet
recommendedOptions . SecureServing . ServerCert . GeneratedCert = dynamicCertProvider // serving certs (end user facing)
recommendedOptions . SecureServing . BindPort = port
// wire up the impersonation proxy signer CA as a valid authenticator for client cert auth
// TODO fix comments
kubeClient , err := kubeclient . New ( clientOpts ... )
2021-02-09 20:28:56 +00:00
if err != nil {
return nil , err
}
2021-03-10 18:30:06 +00:00
kubeClientCA , err := dynamiccertificates . NewDynamicCAFromConfigMapController ( "client-ca" , metav1 . NamespaceSystem , "extension-apiserver-authentication" , "client-ca-file" , kubeClient . Kubernetes )
if err != nil {
return nil , err
}
recommendedOptions . Authentication . ClientCert . ClientCA = "---irrelevant-but-needs-to-be-non-empty---" // drop when we pick up https://github.com/kubernetes/kubernetes/pull/100055
recommendedOptions . Authentication . ClientCert . CAContentProvider = dynamiccertificates . NewUnionCAContentProvider ( impersonationProxySignerCA , kubeClientCA )
if recOpts != nil {
recOpts ( recommendedOptions )
}
serverConfig := genericapiserver . NewRecommendedConfig ( codecs )
// Note that ApplyTo is going to create a network listener and bind to the requested port.
// It puts this listener into serverConfig.SecureServing.Listener.
err = recommendedOptions . ApplyTo ( serverConfig )
if serverConfig . SecureServing != nil {
// Set the pointer from the outer function to allow the outer function to close the listener in case
// this function returns an error for any reason anywhere below here.
listener = serverConfig . SecureServing . Listener
}
if err != nil {
return nil , err
}
// loopback authentication to this server does not really make sense since we just proxy everything to KAS
// thus we replace loopback connection config with one that does direct connections to KAS
// loopback config is mainly used by post start hooks, so this is mostly future proofing
serverConfig . LoopbackClientConfig = rest . CopyConfig ( kubeClient . ProtoConfig ) // assume proto is safe (hooks can override)
// remove the bearer token so our authorizer does not get stomped on by AuthorizeClientBearerToken
// see sanity checks at the end of this function
serverConfig . LoopbackClientConfig . BearerToken = ""
// assume proto config is safe because transport level configs do not use rest.ContentConfig
// thus if we are interacting with actual APIs, they should be using pre-built clients
impersonationProxy , err := newImpersonationReverseProxy ( rest . CopyConfig ( kubeClient . ProtoConfig ) )
if err != nil {
return nil , err
}
defaultBuildHandlerChainFunc := serverConfig . BuildHandlerChainFunc
serverConfig . BuildHandlerChainFunc = func ( _ http . Handler , c * genericapiserver . Config ) http . Handler {
// we ignore the passed in handler because we never have any REST APIs to delegate to
handler := defaultBuildHandlerChainFunc ( impersonationProxy , c )
handler = securityheader . Wrap ( handler )
return handler
}
2021-01-20 00:37:02 +00:00
2021-03-10 18:30:06 +00:00
// TODO integration test this authorizer logic with system:masters + double impersonation
// overwrite the delegating authorizer with one that only cares about impersonation
// empty string is disallowed because request info has had bugs in the past where it would leave it empty
disallowedVerbs := sets . NewString ( "" , "impersonate" )
noImpersonationAuthorizer := & comparableAuthorizer {
AuthorizerFunc : func ( a authorizer . Attributes ) ( authorizer . Decision , string , error ) {
// supporting impersonation is not hard, it would just require a bunch of testing
// and configuring the audit layer (to preserve the caller) which we can do later
// we would also want to delete the incoming impersonation headers
// instead of overwriting the delegating authorizer, we would
// actually use it to make the impersonation authorization checks
if disallowedVerbs . Has ( a . GetVerb ( ) ) {
return authorizer . DecisionDeny , "impersonation is not allowed or invalid verb" , nil
}
return authorizer . DecisionAllow , "deferring authorization to kube API server" , nil
} ,
}
// TODO write a big comment explaining wth this is doing
serverConfig . Authorization . Authorizer = noImpersonationAuthorizer
impersonationProxyServer , err := serverConfig . Complete ( ) . New ( "impersonation-proxy" , genericapiserver . NewEmptyDelegate ( ) )
if err != nil {
return nil , err
}
preparedRun := impersonationProxyServer . PrepareRun ( )
// wait until the very end to do sanity checks
if preparedRun . Authorizer != noImpersonationAuthorizer {
return nil , constable . Error ( "invalid mutation of impersonation authorizer detected" )
}
// assert that we have a functioning token file to use and no bearer token
if len ( preparedRun . LoopbackClientConfig . BearerToken ) != 0 || len ( preparedRun . LoopbackClientConfig . BearerTokenFile ) == 0 {
return nil , constable . Error ( "invalid impersonator loopback rest config has wrong bearer token semantics" )
}
// TODO make sure this is closed on error
_ = preparedRun . SecureServingInfo . Listener
return preparedRun . Run , nil
}
result , err := constructServer ( )
// If there was any error during construction, then we would like to close the listener to free up the port.
2021-01-20 00:37:02 +00:00
if err != nil {
2021-03-10 18:30:06 +00:00
errs := [ ] error { err }
if listener != nil {
errs = append ( errs , listener . Close ( ) )
}
return nil , errors . NewAggregate ( errs )
2021-01-20 00:37:02 +00:00
}
2021-03-10 18:30:06 +00:00
return result , nil
}
// No-op wrapping around AuthorizerFunc to allow for comparisons.
type comparableAuthorizer struct {
authorizer . AuthorizerFunc
}
2021-01-20 00:37:02 +00:00
2021-03-10 18:30:06 +00:00
func newImpersonationReverseProxy ( restConfig * rest . Config ) ( http . Handler , error ) {
serverURL , err := url . Parse ( restConfig . Host )
2021-01-20 00:37:02 +00:00
if err != nil {
return nil , fmt . Errorf ( "could not parse host URL from in-cluster config: %w" , err )
}
2021-03-10 18:30:06 +00:00
kubeTransportConfig , err := restConfig . TransportConfig ( )
2021-01-20 00:37:02 +00:00
if err != nil {
return nil , fmt . Errorf ( "could not get in-cluster transport config: %w" , err )
}
2021-02-23 01:23:11 +00:00
kubeTransportConfig . TLS . NextProtos = [ ] string { "http/1.1" } // TODO huh?
2021-01-20 00:37:02 +00:00
kubeRoundTripper , err := transport . New ( kubeTransportConfig )
if err != nil {
return nil , fmt . Errorf ( "could not get in-cluster transport: %w" , err )
}
2021-03-10 18:30:06 +00:00
return http . HandlerFunc ( func ( w http . ResponseWriter , r * http . Request ) {
// TODO integration test using a bearer token
if len ( r . Header . Values ( "Authorization" ) ) != 0 {
plog . Warning ( "aggregated API server logic did not delete authorization header but it is always supposed to do so" ,
"url" , r . URL . String ( ) ,
"method" , r . Method ,
)
http . Error ( w , "invalid authorization header" , http . StatusInternalServerError )
return
2021-02-18 15:13:24 +00:00
}
2021-03-10 18:30:06 +00:00
if err := ensureNoImpersonationHeaders ( r ) ; err != nil {
plog . Error ( "noImpersonationAuthorizer logic did not prevent nested impersonation but it is always supposed to do so" ,
err ,
"url" , r . URL . String ( ) ,
"method" , r . Method ,
)
http . Error ( w , "invalid impersonation" , http . StatusInternalServerError )
return
2021-02-18 15:13:24 +00:00
}
2021-01-20 00:37:02 +00:00
2021-03-10 18:30:06 +00:00
userInfo , ok := request . UserFrom ( r . Context ( ) )
2021-02-18 15:13:24 +00:00
if ! ok {
2021-03-10 18:30:06 +00:00
plog . Warning ( "aggregated API server logic did not set user info but it is always supposed to do so" ,
"url" , r . URL . String ( ) ,
"method" , r . Method ,
)
http . Error ( w , "invalid user" , http . StatusInternalServerError )
return
2021-02-18 15:13:24 +00:00
}
2021-01-20 00:37:02 +00:00
2021-03-10 18:30:06 +00:00
if len ( userInfo . GetUID ( ) ) > 0 {
plog . Warning ( "rejecting request with UID since we cannot impersonate UIDs" ,
"url" , r . URL . String ( ) ,
"method" , r . Method ,
)
http . Error ( w , "unexpected uid" , http . StatusUnprocessableEntity )
return
}
2021-01-20 00:37:02 +00:00
2021-03-10 18:30:06 +00:00
plog . Trace ( "proxying authenticated request" ,
"url" , r . URL . String ( ) ,
"method" , r . Method ,
"username" , userInfo . GetName ( ) , // this info leak seems fine for trace level logs
)
2021-01-20 00:37:02 +00:00
2021-03-10 18:30:06 +00:00
reverseProxy := httputil . NewSingleHostReverseProxy ( serverURL )
impersonateConfig := transport . ImpersonationConfig {
UserName : userInfo . GetName ( ) ,
Groups : userInfo . GetGroups ( ) ,
Extra : userInfo . GetExtra ( ) ,
}
reverseProxy . Transport = transport . NewImpersonatingRoundTripper ( impersonateConfig , kubeRoundTripper )
reverseProxy . FlushInterval = 200 * time . Millisecond // the "watch" verb will not work without this line
// transport.NewImpersonatingRoundTripper clones the request before setting headers
// so this call will not accidentally mutate the input request (see http.Handler docs)
reverseProxy . ServeHTTP ( w , r )
} ) , nil
2021-02-18 15:13:24 +00:00
}
2021-02-16 13:15:50 +00:00
func ensureNoImpersonationHeaders ( r * http . Request ) error {
2021-03-02 22:56:54 +00:00
for key := range r . Header {
2021-03-10 18:30:06 +00:00
if strings . HasPrefix ( key , "Impersonate" ) {
2021-03-02 22:56:54 +00:00
return fmt . Errorf ( "%q header already exists" , key )
2021-02-16 13:15:50 +00:00
}
}
2021-02-18 15:13:24 +00:00
2021-03-10 18:30:06 +00:00
return nil
2021-01-20 00:37:02 +00:00
}