Use http2 for all non-upgrade requests

Instead of using the LongRunningFunc to determine if we can safely
use http2, follow the same logic as the aggregation proxy and only
use http2 when the request is not an upgrade.

Signed-off-by: Monis Khan <mok@vmware.com>
This commit is contained in:
Monis Khan 2021-03-19 13:39:55 -04:00
parent 2749044625
commit c03fe2d1fe
No known key found for this signature in database
GPG Key ID: 52C90ADA01B269B8
2 changed files with 92 additions and 30 deletions

View File

@ -18,9 +18,11 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/httpstream"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apiserver/pkg/authentication/user"
"k8s.io/apiserver/pkg/authorization/authorizer"
"k8s.io/apiserver/pkg/endpoints/filterlatency"
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
"k8s.io/apiserver/pkg/endpoints/request"
genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
@ -148,9 +150,25 @@ func newInternal( //nolint:funlen // yeah, it's kind of long.
defaultBuildHandlerChainFunc := serverConfig.BuildHandlerChainFunc
serverConfig.BuildHandlerChainFunc = func(_ http.Handler, c *genericapiserver.Config) http.Handler {
// We ignore the passed in handler because we never have any REST APIs to delegate to.
handler := impersonationProxyFunc(c)
// This means we are ignoring the admission, discovery, REST storage, etc layers.
doNotDelegate := http.HandlerFunc(func(_ http.ResponseWriter, _ *http.Request) {})
// Impersonation proxy business logic with timing information.
impersonationProxyCompleted := filterlatency.TrackCompleted(doNotDelegate)
impersonationProxy := impersonationProxyFunc(c)
handler := http.Handler(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
defer impersonationProxyCompleted.ServeHTTP(w, r)
impersonationProxy.ServeHTTP(w, r)
}))
handler = filterlatency.TrackStarted(handler, "impersonationproxy")
// The standard Kube handler chain (authn, authz, impersonation, audit, etc).
// See the genericapiserver.DefaultBuildHandlerChain func for details.
handler = defaultBuildHandlerChainFunc(handler, c)
// Always set security headers so browsers do the right thing.
handler = securityheader.Wrap(handler)
return handler
}
@ -258,22 +276,11 @@ func newImpersonationReverseProxyFunc(restConfig *rest.Config) (func(*genericapi
return
}
reqInfo, ok := request.RequestInfoFrom(r.Context())
if !ok {
plog.Warning("aggregated API server logic did not set request info but it is always supposed to do so",
"url", r.URL.String(),
"method", r.Method,
)
newInternalErrResponse(w, r, c.Serializer, "invalid request info")
return
}
// when we are running regular requests (e.g., CRUD) we should always be able to use HTTP/2.0
// since KAS always supports that and it goes through proxies just fine. for long running
// requests (e.g., proxy, watch), we know they use http/1.1 with an upgrade to
// websockets/SPDY (this upgrade is NEVER to HTTP/2.0 as the KAS does not support that).
// KAS only supports upgrades via http/1.1 to websockets/SPDY (upgrades never use http/2.0)
// Thus we default to using http/2.0 when the request is not an upgrade, otherwise we use http/1.1
baseRT := http2RoundTripper
if c.LongRunningFunc(r, reqInfo) {
isUpgradeRequest := httpstream.IsUpgradeRequest(r)
if isUpgradeRequest {
baseRT = http1RoundTripper
}
@ -282,19 +289,31 @@ func newImpersonationReverseProxyFunc(restConfig *rest.Config) (func(*genericapi
plog.WarningErr("rejecting request as we cannot act as the current user", err,
"url", r.URL.String(),
"method", r.Method,
"isUpgradeRequest", isUpgradeRequest,
)
newInternalErrResponse(w, r, c.Serializer, "unimplemented functionality - unable to act as current user")
return
}
plog.Debug("impersonation proxy servicing request", "method", r.Method, "url", r.URL.String())
plog.Trace("impersonation proxy servicing request was for user", "method", r.Method, "url", r.URL.String(),
plog.Debug("impersonation proxy servicing request",
"url", r.URL.String(),
"method", r.Method,
"isUpgradeRequest", isUpgradeRequest,
)
plog.Trace("impersonation proxy servicing request was for user",
"url", r.URL.String(),
"method", r.Method,
"isUpgradeRequest", isUpgradeRequest,
"username", userInfo.GetName(), // this info leak seems fine for trace level logs
)
// The proxy library used below will panic when the client disconnects abruptly, so in order to
// assure that this log message is always printed at the end of this func, it must be deferred.
defer plog.Debug("impersonation proxy finished servicing request", "method", r.Method, "url", r.URL.String())
defer plog.Debug("impersonation proxy finished servicing request",
"url", r.URL.String(),
"method", r.Method,
"isUpgradeRequest", isUpgradeRequest,
)
reverseProxy := httputil.NewSingleHostReverseProxy(serverURL)
reverseProxy.Transport = rt

View File

@ -19,6 +19,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apimachinery/pkg/util/httpstream"
"k8s.io/apiserver/pkg/authentication/user"
"k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/features"
@ -71,6 +72,7 @@ func TestImpersonator(t *testing.T) {
clientCert *clientCert
clientImpersonateUser rest.ImpersonationConfig
clientMutateHeaders func(http.Header)
clientNextProtos []string
kubeAPIServerClientBearerTokenFile string
kubeAPIServerStatusCode int
wantKubeAPIServerRequestHeaders http.Header
@ -91,6 +93,31 @@ func TestImpersonator(t *testing.T) {
"X-Forwarded-For": {"127.0.0.1"},
},
},
{
name: "happy path with upgrade",
clientCert: newClientCert(t, ca, "test-username2", []string{"test-group3", "test-group4"}),
kubeAPIServerClientBearerTokenFile: "required-to-be-set",
clientMutateHeaders: func(header http.Header) {
header.Add("Connection", "Upgrade")
header.Add("Upgrade", "spdy/3.1")
if ok := httpstream.IsUpgradeRequest(&http.Request{Header: header}); !ok {
panic("request must be upgrade in this test")
}
},
clientNextProtos: []string{"http/1.1"}, // we need to use http1 as http2 does not support upgrades, see http2checkConnHeaders
wantKubeAPIServerRequestHeaders: http.Header{
"Impersonate-User": {"test-username2"},
"Impersonate-Group": {"test-group3", "test-group4", "system:authenticated"},
"Authorization": {"Bearer some-service-account-token"},
"User-Agent": {"test-agent"},
"Accept": {"application/vnd.kubernetes.protobuf,application/json"},
"Accept-Encoding": {"gzip"},
"X-Forwarded-For": {"127.0.0.1"},
"Connection": {"Upgrade"},
"Upgrade": {"spdy/3.1"},
},
},
{
name: "user is authenticated but the kube API request returns an error",
kubeAPIServerStatusCode: http.StatusNotFound,
@ -255,9 +282,10 @@ func TestImpersonator(t *testing.T) {
clientKubeconfig := &rest.Config{
Host: "https://127.0.0.1:" + strconv.Itoa(port),
TLSClientConfig: rest.TLSClientConfig{
CAData: ca.Bundle(),
CertData: tt.clientCert.certPEM,
KeyData: tt.clientCert.keyPEM,
CAData: ca.Bundle(),
CertData: tt.clientCert.certPEM,
KeyData: tt.clientCert.keyPEM,
NextProtos: tt.clientNextProtos,
},
UserAgent: "test-agent",
// BearerToken should be ignored during auth when there are valid client certs,
@ -514,16 +542,31 @@ func TestImpersonatorHTTPHandler(t *testing.T) {
metav1.AddToGroupVersion(scheme, metav1.Unversioned)
codecs := serializer.NewCodecFactory(scheme)
serverConfig := genericapiserver.NewRecommendedConfig(codecs)
serverConfig.Config.LongRunningFunc = func(_ *http.Request, _ *request.RequestInfo) bool {
// take the HTTP/2.0 vs HTTP/1.1 branch randomly to make sure we exercise both branches
return rand.Int()%2 == 0 //nolint:gosec // we don't care whether this is cryptographically secure or not
}
w := httptest.NewRecorder()
requestBeforeServe := tt.request.Clone(tt.request.Context())
impersonatorHTTPHandlerFunc(&serverConfig.Config).ServeHTTP(w, tt.request)
require.Equal(t, requestBeforeServe, tt.request, "ServeHTTP() mutated the request, and it should not per http.Handler docs")
r := tt.request
wantKubeAPIServerRequestHeaders := tt.wantKubeAPIServerRequestHeaders
// take the isUpgradeRequest branch randomly to make sure we exercise both branches
forceUpgradeRequest := rand.Int()%2 == 0 //nolint:gosec // we do not care if this is cryptographically secure
if forceUpgradeRequest && len(r.Header.Get("Upgrade")) == 0 {
r = r.Clone(r.Context())
r.Header.Add("Connection", "Upgrade")
r.Header.Add("Upgrade", "spdy/3.1")
wantKubeAPIServerRequestHeaders = wantKubeAPIServerRequestHeaders.Clone()
if wantKubeAPIServerRequestHeaders == nil {
wantKubeAPIServerRequestHeaders = http.Header{}
}
wantKubeAPIServerRequestHeaders.Add("Connection", "Upgrade")
wantKubeAPIServerRequestHeaders.Add("Upgrade", "spdy/3.1")
}
requestBeforeServe := r.Clone(r.Context())
impersonatorHTTPHandlerFunc(&serverConfig.Config).ServeHTTP(w, r)
require.Equal(t, requestBeforeServe, r, "ServeHTTP() mutated the request, and it should not per http.Handler docs")
if tt.wantHTTPStatus != 0 {
require.Equalf(t, tt.wantHTTPStatus, w.Code, "fyi, response body was %q", w.Body.String())
}
@ -533,7 +576,7 @@ func TestImpersonatorHTTPHandler(t *testing.T) {
if tt.wantHTTPStatus == http.StatusOK || tt.kubeAPIServerStatusCode != http.StatusOK {
require.True(t, testKubeAPIServerWasCalled, "Should have proxied the request to the Kube API server, but didn't")
require.Equal(t, tt.wantKubeAPIServerRequestHeaders, testKubeAPIServerSawHeaders)
require.Equal(t, wantKubeAPIServerRequestHeaders, testKubeAPIServerSawHeaders)
} else {
require.False(t, testKubeAPIServerWasCalled, "Should not have proxied the request to the Kube API server, but did")
}