c6c2c525a6
Also fix some tests that were broken by bumping golang and dependencies in the previous commits. Note that in addition to changes made to satisfy the linter which do not impact the behavior of the code, this commit also adds ReadHeaderTimeout to all usages of http.Server to satisfy the linter (and because it seemed like a good suggestion).
163 lines
4.9 KiB
Go
163 lines
4.9 KiB
Go
// Copyright 2021-2022 the Pinniped contributors. All Rights Reserved.
|
|
// SPDX-License-Identifier: Apache-2.0
|
|
|
|
package kubeclient
|
|
|
|
import (
|
|
stderrors "errors"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
"k8s.io/apimachinery/pkg/runtime"
|
|
"k8s.io/apimachinery/pkg/runtime/serializer/streaming"
|
|
"k8s.io/apimachinery/pkg/util/net"
|
|
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
|
"k8s.io/apimachinery/pkg/watch"
|
|
restclient "k8s.io/client-go/rest"
|
|
restclientwatch "k8s.io/client-go/rest/watch"
|
|
|
|
"go.pinniped.dev/internal/plog"
|
|
)
|
|
|
|
func handleWatchResponseNewGVK(
|
|
config *restclient.Config,
|
|
negotiatedSerializer runtime.NegotiatedSerializer,
|
|
resp *http.Response,
|
|
middlewareReq *request,
|
|
result *mutationResult,
|
|
) (*http.Response, error) {
|
|
// defer non-success cases to client-go
|
|
if resp.StatusCode != http.StatusOK {
|
|
return resp, nil
|
|
}
|
|
|
|
var goRoutineStarted bool
|
|
defer func() {
|
|
if goRoutineStarted {
|
|
return
|
|
}
|
|
// always drain and close the body if we do not get to the point of starting our go routine
|
|
drainAndMaybeCloseBody(resp, true)
|
|
}()
|
|
|
|
serializerInfo, err := getSerializerInfo(config, negotiatedSerializer, resp, middlewareReq)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
newResp := &http.Response{}
|
|
*newResp = *resp
|
|
|
|
newBodyReader, newBodyWriter := io.Pipe()
|
|
|
|
newResp.Body = newBodyReader // client-go is responsible for closing this reader
|
|
|
|
goRoutineStarted = true
|
|
go func() {
|
|
var sourceDecoder watch.Decoder
|
|
defer utilruntime.HandleCrash()
|
|
defer func() {
|
|
// the sourceDecoder will close the resp body. we want to make sure the drain the body before
|
|
// we do that
|
|
drainAndMaybeCloseBody(resp, false)
|
|
if sourceDecoder != nil {
|
|
sourceDecoder.Close()
|
|
}
|
|
}()
|
|
defer newBodyWriter.Close()
|
|
|
|
frameReader := serializerInfo.StreamSerializer.Framer.NewFrameReader(resp.Body)
|
|
watchEventDecoder := streaming.NewDecoder(frameReader, serializerInfo.StreamSerializer.Serializer)
|
|
sourceDecoder = restclientwatch.NewDecoder(watchEventDecoder, &passthroughDecoder{})
|
|
defer sourceDecoder.Close()
|
|
|
|
frameWriter := serializerInfo.StreamSerializer.Framer.NewFrameWriter(newBodyWriter)
|
|
watchEventEncoder := streaming.NewEncoder(frameWriter, serializerInfo.StreamSerializer.Serializer)
|
|
|
|
for {
|
|
ok, err := sendWatchEvent(sourceDecoder, serializerInfo.Serializer, middlewareReq, result, watchEventEncoder)
|
|
if err != nil {
|
|
if stderrors.Is(err, io.ErrClosedPipe) {
|
|
return // calling newBodyReader.Close() will send this to all newBodyWriter.Write()
|
|
}
|
|
|
|
// CloseWithError always returns nil
|
|
// all newBodyReader.Read() will get this error
|
|
_ = newBodyWriter.CloseWithError(err)
|
|
|
|
return
|
|
}
|
|
|
|
if !ok {
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
|
|
return newResp, nil
|
|
}
|
|
|
|
func sendWatchEvent(sourceDecoder watch.Decoder, s runtime.Serializer, middlewareReq *request, result *mutationResult, watchEventEncoder streaming.Encoder) (bool, error) {
|
|
// partially copied from watch.NewStreamWatcher.receive
|
|
eventType, obj, err := sourceDecoder.Decode()
|
|
if err != nil {
|
|
switch {
|
|
case stderrors.Is(err, io.EOF):
|
|
// watch closed normally
|
|
case stderrors.Is(err, io.ErrUnexpectedEOF):
|
|
plog.InfoErr("Unexpected EOF during watch stream event decoding", err)
|
|
case net.IsProbableEOF(err), net.IsTimeout(err):
|
|
plog.TraceErr("Unable to decode an event from the watch stream", err)
|
|
default:
|
|
return false, fmt.Errorf("unexpected watch decode error for %#v: %w", middlewareReq, err)
|
|
}
|
|
return false, nil // all errors end watch
|
|
}
|
|
|
|
unknown, ok := obj.(*runtime.Unknown)
|
|
if !ok || len(unknown.Raw) == 0 {
|
|
return false, fmt.Errorf("unexpected decode type: %T", obj)
|
|
}
|
|
|
|
respData := unknown.Raw
|
|
fixedRespData, err := maybeRestoreGVK(s, respData, result)
|
|
if err != nil {
|
|
return false, fmt.Errorf("unable to restore GVK for %#v: %w", middlewareReq, err)
|
|
}
|
|
|
|
fixedRespData, err = maybeMutateResponse(s, fixedRespData, middlewareReq, result)
|
|
if err != nil {
|
|
return false, fmt.Errorf("unable to mutate response for %#v: %w", middlewareReq, err)
|
|
}
|
|
|
|
event := &metav1.WatchEvent{
|
|
Type: string(eventType),
|
|
Object: runtime.RawExtension{Raw: fixedRespData},
|
|
}
|
|
|
|
if err := watchEventEncoder.Encode(event); err != nil {
|
|
return false, fmt.Errorf("failed to encode watch event for %#v: %w", middlewareReq, err)
|
|
}
|
|
|
|
return true, nil
|
|
}
|
|
|
|
// drainAndMaybeCloseBody attempts to drain and optionally close the provided body.
//
// We want to drain used HTTP response bodies so that the underlying TCP connection can be
// reused. However, if the underlying response body is extremely large or a never-ending stream,
// then we don't want to wait for the read to finish. In these cases, we give up on the TCP
// connection and just close the body.
//
// The body is only read when resp.ContentLength is at most maxBodySlurpSize, and never
// more than maxBodySlurpSize bytes are read. A negative ContentLength (which net/http
// uses for an unknown length) also passes that check. When close is true the body is
// closed afterwards regardless of whether it was drained.
//
// A nil resp or a nil resp.Body is a no-op.
func drainAndMaybeCloseBody(resp *http.Response, close bool) {
	// Responses produced by custom or fake round trippers may carry no body at all;
	// there is nothing to drain or close in that case.
	if resp == nil || resp.Body == nil {
		return
	}

	// from k8s.io/client-go/rest/request.go...
	const maxBodySlurpSize = 2 << 10
	if resp.ContentLength <= maxBodySlurpSize {
		// Draining is best effort: a read error only means the connection
		// will not be reused, so it is deliberately ignored.
		_, _ = io.Copy(io.Discard, io.LimitReader(resp.Body, maxBodySlurpSize))
	}

	if close {
		// The body is being discarded, so a Close error is not actionable here.
		_ = resp.Body.Close()
	}
}
|