diff --git a/internal/concierge/impersonator/impersonator.go b/internal/concierge/impersonator/impersonator.go index 311b2fc4..615e6ea8 100644 --- a/internal/concierge/impersonator/impersonator.go +++ b/internal/concierge/impersonator/impersonator.go @@ -18,9 +18,11 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/serializer" "k8s.io/apimachinery/pkg/util/errors" + "k8s.io/apimachinery/pkg/util/httpstream" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apiserver/pkg/authentication/user" "k8s.io/apiserver/pkg/authorization/authorizer" + "k8s.io/apiserver/pkg/endpoints/filterlatency" "k8s.io/apiserver/pkg/endpoints/handlers/responsewriters" "k8s.io/apiserver/pkg/endpoints/request" genericapirequest "k8s.io/apiserver/pkg/endpoints/request" @@ -148,9 +150,25 @@ func newInternal( //nolint:funlen // yeah, it's kind of long. defaultBuildHandlerChainFunc := serverConfig.BuildHandlerChainFunc serverConfig.BuildHandlerChainFunc = func(_ http.Handler, c *genericapiserver.Config) http.Handler { // We ignore the passed in handler because we never have any REST APIs to delegate to. - handler := impersonationProxyFunc(c) + // This means we are ignoring the admission, discovery, REST storage, etc layers. + doNotDelegate := http.HandlerFunc(func(_ http.ResponseWriter, _ *http.Request) {}) + + // Impersonation proxy business logic with timing information. + impersonationProxyCompleted := filterlatency.TrackCompleted(doNotDelegate) + impersonationProxy := impersonationProxyFunc(c) + handler := http.Handler(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + defer impersonationProxyCompleted.ServeHTTP(w, r) + impersonationProxy.ServeHTTP(w, r) + })) + handler = filterlatency.TrackStarted(handler, "impersonationproxy") + + // The standard Kube handler chain (authn, authz, impersonation, audit, etc). + // See the genericapiserver.DefaultBuildHandlerChain func for details. handler = defaultBuildHandlerChainFunc(handler, c) + + // Always set security headers so browsers do the right thing. handler = securityheader.Wrap(handler) + return handler } @@ -258,22 +276,11 @@ func newImpersonationReverseProxyFunc(restConfig *rest.Config) (func(*genericapi return } - reqInfo, ok := request.RequestInfoFrom(r.Context()) - if !ok { - plog.Warning("aggregated API server logic did not set request info but it is always supposed to do so", - "url", r.URL.String(), - "method", r.Method, - ) - newInternalErrResponse(w, r, c.Serializer, "invalid request info") - return - } - - // when we are running regular requests (e.g., CRUD) we should always be able to use HTTP/2.0 - // since KAS always supports that and it goes through proxies just fine. for long running - // requests (e.g., proxy, watch), we know they use http/1.1 with an upgrade to - // websockets/SPDY (this upgrade is NEVER to HTTP/2.0 as the KAS does not support that). + // KAS only supports upgrades via http/1.1 to websockets/SPDY (upgrades never use http/2.0) + // Thus we default to using http/2.0 when the request is not an upgrade, otherwise we use http/1.1 baseRT := http2RoundTripper - if c.LongRunningFunc(r, reqInfo) { + isUpgradeRequest := httpstream.IsUpgradeRequest(r) + if isUpgradeRequest { baseRT = http1RoundTripper } @@ -282,19 +289,31 @@ func newImpersonationReverseProxyFunc(restConfig *rest.Config) (func(*genericapi plog.WarningErr("rejecting request as we cannot act as the current user", err, "url", r.URL.String(), "method", r.Method, + "isUpgradeRequest", isUpgradeRequest, ) newInternalErrResponse(w, r, c.Serializer, "unimplemented functionality - unable to act as current user") return } - plog.Debug("impersonation proxy servicing request", "method", r.Method, "url", r.URL.String()) - plog.Trace("impersonation proxy servicing request was for user", "method", r.Method, "url", r.URL.String(), + plog.Debug("impersonation proxy servicing request", + "url", r.URL.String(), + "method", r.Method, + "isUpgradeRequest", isUpgradeRequest, + ) + plog.Trace("impersonation proxy servicing request was for user", + "url", r.URL.String(), + "method", r.Method, + "isUpgradeRequest", isUpgradeRequest, "username", userInfo.GetName(), // this info leak seems fine for trace level logs ) // The proxy library used below will panic when the client disconnects abruptly, so in order to // assure that this log message is always printed at the end of this func, it must be deferred. - defer plog.Debug("impersonation proxy finished servicing request", "method", r.Method, "url", r.URL.String()) + defer plog.Debug("impersonation proxy finished servicing request", + "url", r.URL.String(), + "method", r.Method, + "isUpgradeRequest", isUpgradeRequest, + ) reverseProxy := httputil.NewSingleHostReverseProxy(serverURL) reverseProxy.Transport = rt diff --git a/internal/concierge/impersonator/impersonator_test.go b/internal/concierge/impersonator/impersonator_test.go index 9fccbe9e..2999236b 100644 --- a/internal/concierge/impersonator/impersonator_test.go +++ b/internal/concierge/impersonator/impersonator_test.go @@ -19,6 +19,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/serializer" + "k8s.io/apimachinery/pkg/util/httpstream" "k8s.io/apiserver/pkg/authentication/user" "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/apiserver/pkg/features" @@ -71,6 +72,7 @@ func TestImpersonator(t *testing.T) { clientCert *clientCert clientImpersonateUser rest.ImpersonationConfig clientMutateHeaders func(http.Header) + clientNextProtos []string kubeAPIServerClientBearerTokenFile string kubeAPIServerStatusCode int wantKubeAPIServerRequestHeaders http.Header @@ -91,6 +93,31 @@ func TestImpersonator(t *testing.T) { "X-Forwarded-For": {"127.0.0.1"}, }, }, + { + name: "happy path with upgrade", + clientCert: newClientCert(t, ca, "test-username2", []string{"test-group3", "test-group4"}), + kubeAPIServerClientBearerTokenFile: "required-to-be-set", + clientMutateHeaders: func(header http.Header) { + header.Add("Connection", "Upgrade") + header.Add("Upgrade", "spdy/3.1") + + if ok := httpstream.IsUpgradeRequest(&http.Request{Header: header}); !ok { + panic("request must be upgrade in this test") + } + }, + clientNextProtos: []string{"http/1.1"}, // we need to use http1 as http2 does not support upgrades, see http2checkConnHeaders + wantKubeAPIServerRequestHeaders: http.Header{ + "Impersonate-User": {"test-username2"}, + "Impersonate-Group": {"test-group3", "test-group4", "system:authenticated"}, + "Authorization": {"Bearer some-service-account-token"}, + "User-Agent": {"test-agent"}, + "Accept": {"application/vnd.kubernetes.protobuf,application/json"}, + "Accept-Encoding": {"gzip"}, + "X-Forwarded-For": {"127.0.0.1"}, + "Connection": {"Upgrade"}, + "Upgrade": {"spdy/3.1"}, + }, + }, { name: "user is authenticated but the kube API request returns an error", kubeAPIServerStatusCode: http.StatusNotFound, @@ -255,9 +282,10 @@ func TestImpersonator(t *testing.T) { clientKubeconfig := &rest.Config{ Host: "https://127.0.0.1:" + strconv.Itoa(port), TLSClientConfig: rest.TLSClientConfig{ - CAData: ca.Bundle(), - CertData: tt.clientCert.certPEM, - KeyData: tt.clientCert.keyPEM, + CAData: ca.Bundle(), + CertData: tt.clientCert.certPEM, + KeyData: tt.clientCert.keyPEM, + NextProtos: tt.clientNextProtos, }, UserAgent: "test-agent", // BearerToken should be ignored during auth when there are valid client certs, @@ -514,16 +542,31 @@ func TestImpersonatorHTTPHandler(t *testing.T) { metav1.AddToGroupVersion(scheme, metav1.Unversioned) codecs := serializer.NewCodecFactory(scheme) serverConfig := genericapiserver.NewRecommendedConfig(codecs) - serverConfig.Config.LongRunningFunc = func(_ *http.Request, _ *request.RequestInfo) bool { - // take the HTTP/2.0 vs HTTP/1.1 branch randomly to make sure we exercise both branches - return rand.Int()%2 == 0 //nolint:gosec // we don't care whether this is cryptographically secure or not - } w := httptest.NewRecorder() - requestBeforeServe := tt.request.Clone(tt.request.Context()) - impersonatorHTTPHandlerFunc(&serverConfig.Config).ServeHTTP(w, tt.request) - require.Equal(t, requestBeforeServe, tt.request, "ServeHTTP() mutated the request, and it should not per http.Handler docs") + r := tt.request + wantKubeAPIServerRequestHeaders := tt.wantKubeAPIServerRequestHeaders + + // take the isUpgradeRequest branch randomly to make sure we exercise both branches + forceUpgradeRequest := rand.Int()%2 == 0 //nolint:gosec // we do not care if this is cryptographically secure + if forceUpgradeRequest && len(r.Header.Get("Upgrade")) == 0 { + r = r.Clone(r.Context()) + r.Header.Add("Connection", "Upgrade") + r.Header.Add("Upgrade", "spdy/3.1") + + wantKubeAPIServerRequestHeaders = wantKubeAPIServerRequestHeaders.Clone() + if wantKubeAPIServerRequestHeaders == nil { + wantKubeAPIServerRequestHeaders = http.Header{} + } + wantKubeAPIServerRequestHeaders.Add("Connection", "Upgrade") + wantKubeAPIServerRequestHeaders.Add("Upgrade", "spdy/3.1") + } + + requestBeforeServe := r.Clone(r.Context()) + impersonatorHTTPHandlerFunc(&serverConfig.Config).ServeHTTP(w, r) + + require.Equal(t, requestBeforeServe, r, "ServeHTTP() mutated the request, and it should not per http.Handler docs") if tt.wantHTTPStatus != 0 { require.Equalf(t, tt.wantHTTPStatus, w.Code, "fyi, response body was %q", w.Body.String()) } @@ -533,7 +576,7 @@ func TestImpersonatorHTTPHandler(t *testing.T) { if tt.wantHTTPStatus == http.StatusOK || tt.kubeAPIServerStatusCode != http.StatusOK { require.True(t, testKubeAPIServerWasCalled, "Should have proxied the request to the Kube API server, but didn't") - require.Equal(t, tt.wantKubeAPIServerRequestHeaders, testKubeAPIServerSawHeaders) + require.Equal(t, wantKubeAPIServerRequestHeaders, testKubeAPIServerSawHeaders) } else { require.False(t, testKubeAPIServerWasCalled, "Should not have proxied the request to the Kube API server, but did") }