// Copyright 2021 the Pinniped contributors. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 package kubeclient import ( stderrors "errors" "fmt" "io" "io/ioutil" "net/http" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/serializer/streaming" "k8s.io/apimachinery/pkg/util/net" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/watch" restclient "k8s.io/client-go/rest" restclientwatch "k8s.io/client-go/rest/watch" "go.pinniped.dev/internal/plog" ) func handleWatchResponseNewGVK( config *restclient.Config, negotiatedSerializer runtime.NegotiatedSerializer, resp *http.Response, middlewareReq *request, result *mutationResult, ) (*http.Response, error) { // defer non-success cases to client-go if resp.StatusCode != http.StatusOK { return resp, nil } var goRoutineStarted bool defer func() { if goRoutineStarted { return } // always drain and close the body if we do not get to the point of starting our go routine drainAndMaybeCloseBody(resp, true) }() serializerInfo, err := getSerializerInfo(config, negotiatedSerializer, resp, middlewareReq) if err != nil { return nil, err } newResp := &http.Response{} *newResp = *resp newBodyReader, newBodyWriter := io.Pipe() newResp.Body = newBodyReader // client-go is responsible for closing this reader goRoutineStarted = true go func() { var sourceDecoder watch.Decoder defer utilruntime.HandleCrash() defer func() { // the sourceDecoder will close the resp body. we want to make sure the drain the body before // we do that drainAndMaybeCloseBody(resp, false) if sourceDecoder != nil { sourceDecoder.Close() } }() defer newBodyWriter.Close() frameReader := serializerInfo.StreamSerializer.Framer.NewFrameReader(resp.Body) watchEventDecoder := streaming.NewDecoder(frameReader, serializerInfo.StreamSerializer.Serializer) sourceDecoder = restclientwatch.NewDecoder(watchEventDecoder, &passthroughDecoder{}) defer sourceDecoder.Close() frameWriter := serializerInfo.StreamSerializer.Framer.NewFrameWriter(newBodyWriter) watchEventEncoder := streaming.NewEncoder(frameWriter, serializerInfo.StreamSerializer.Serializer) for { ok, err := sendWatchEvent(sourceDecoder, serializerInfo.Serializer, middlewareReq, result, watchEventEncoder) if err != nil { if stderrors.Is(err, io.ErrClosedPipe) { return // calling newBodyReader.Close() will send this to all newBodyWriter.Write() } // CloseWithError always returns nil // all newBodyReader.Read() will get this error _ = newBodyWriter.CloseWithError(err) return } if !ok { return } } }() return newResp, nil } func sendWatchEvent(sourceDecoder watch.Decoder, s runtime.Serializer, middlewareReq *request, result *mutationResult, watchEventEncoder streaming.Encoder) (bool, error) { // partially copied from watch.NewStreamWatcher.receive eventType, obj, err := sourceDecoder.Decode() if err != nil { switch { case stderrors.Is(err, io.EOF): // watch closed normally case stderrors.Is(err, io.ErrUnexpectedEOF): plog.InfoErr("Unexpected EOF during watch stream event decoding", err) case net.IsProbableEOF(err), net.IsTimeout(err): plog.TraceErr("Unable to decode an event from the watch stream", err) default: return false, fmt.Errorf("unexpected watch decode error for %#v: %w", middlewareReq, err) } return false, nil // all errors end watch } unknown, ok := obj.(*runtime.Unknown) if !ok || len(unknown.Raw) == 0 { return false, fmt.Errorf("unexpected decode type: %T", obj) } respData := unknown.Raw fixedRespData, err := maybeRestoreGVK(s, respData, result) if err != nil { return false, fmt.Errorf("unable to restore GVK for %#v: %w", middlewareReq, err) } fixedRespData, err = maybeMutateResponse(s, fixedRespData, middlewareReq, result) if err != nil { return false, fmt.Errorf("unable to mutate response for %#v: %w", middlewareReq, err) } event := &metav1.WatchEvent{ Type: string(eventType), Object: runtime.RawExtension{Raw: fixedRespData}, } if err := watchEventEncoder.Encode(event); err != nil { return false, fmt.Errorf("failed to encode watch event for %#v: %w", middlewareReq, err) } return true, nil } // drainAndMaybeCloseBody attempts to drain and optionallt close the provided body. // // We want to drain used HTTP response bodies so that the underlying TCP connection can be // reused. However, if the underlying response body is extremely large or a never-ending stream, // then we don't want to wait for the read to finish. In these cases, we give up on the TCP // connection and just close the body. func drainAndMaybeCloseBody(resp *http.Response, close bool) { // from k8s.io/client-go/rest/request.go... const maxBodySlurpSize = 2 << 10 if resp.ContentLength <= maxBodySlurpSize { _, _ = io.Copy(ioutil.Discard, &io.LimitedReader{R: resp.Body, N: maxBodySlurpSize}) } if close { resp.Body.Close() } }