ContainerImage.Pinniped/internal/kubeclient/watch.go
Monis Khan efe1fa89fe Allow multiple Pinnipeds to work on same cluster
Yes, this is a huge commit.

The middleware allows you to customize the API groups of all of the
*.pinniped.dev API groups.

Some notes about other small things in this commit:
- We removed the internal/client package in favor of pkg/conciergeclient. The
  two packages do basically the same thing. I don't think we use the former
  anymore.
- We re-enabled cluster-scoped owner assertions in the integration tests.
  This code was added in internal/ownerref. See a0546942 for when this
  assertion was removed.
- Note: the middlware code is in charge of restoring the GV of a request object,
  so we should never need to write mutations that do that.
- We updated the supervisor secret generation to no longer manually set an owner
  reference to the deployment since the middleware code now does this. I think we
  still need some way to make an initial event for the secret generator
  controller, which involves knowing the namespace and the name of the generated
  secret, so I still wired the deployment through. We could use a namespace/name
  tuple here, but I was lazy.

Signed-off-by: Andrew Keesler <akeesler@vmware.com>
Co-authored-by: Ryan Richard <richardry@vmware.com>
2021-02-02 15:18:41 -08:00

164 lines
4.9 KiB
Go

// Copyright 2021 the Pinniped contributors. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package kubeclient
import (
stderrors "errors"
"fmt"
"io"
"io/ioutil"
"net/http"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer/streaming"
"k8s.io/apimachinery/pkg/util/net"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/watch"
restclient "k8s.io/client-go/rest"
restclientwatch "k8s.io/client-go/rest/watch"
"go.pinniped.dev/internal/plog"
)
func handleWatchResponseNewGVK(
config *restclient.Config,
negotiatedSerializer runtime.NegotiatedSerializer,
resp *http.Response,
middlewareReq *request,
result *mutationResult,
) (*http.Response, error) {
// defer non-success cases to client-go
if resp.StatusCode != http.StatusOK {
return resp, nil
}
var goRoutineStarted bool
defer func() {
if goRoutineStarted {
return
}
// always drain and close the body if we do not get to the point of starting our go routine
drainAndMaybeCloseBody(resp, true)
}()
serializerInfo, err := getSerializerInfo(config, negotiatedSerializer, resp, middlewareReq)
if err != nil {
return nil, err
}
newResp := &http.Response{}
*newResp = *resp
newBodyReader, newBodyWriter := io.Pipe()
newResp.Body = newBodyReader // client-go is responsible for closing this reader
goRoutineStarted = true
go func() {
var sourceDecoder watch.Decoder
defer utilruntime.HandleCrash()
defer func() {
// the sourceDecoder will close the resp body. we want to make sure the drain the body before
// we do that
drainAndMaybeCloseBody(resp, false)
if sourceDecoder != nil {
sourceDecoder.Close()
}
}()
defer newBodyWriter.Close()
frameReader := serializerInfo.StreamSerializer.Framer.NewFrameReader(resp.Body)
watchEventDecoder := streaming.NewDecoder(frameReader, serializerInfo.StreamSerializer.Serializer)
sourceDecoder = restclientwatch.NewDecoder(watchEventDecoder, &passthroughDecoder{})
defer sourceDecoder.Close()
frameWriter := serializerInfo.StreamSerializer.Framer.NewFrameWriter(newBodyWriter)
watchEventEncoder := streaming.NewEncoder(frameWriter, serializerInfo.StreamSerializer.Serializer)
for {
ok, err := sendWatchEvent(sourceDecoder, serializerInfo.Serializer, middlewareReq, result, watchEventEncoder)
if err != nil {
if stderrors.Is(err, io.ErrClosedPipe) {
return // calling newBodyReader.Close() will send this to all newBodyWriter.Write()
}
// CloseWithError always returns nil
// all newBodyReader.Read() will get this error
_ = newBodyWriter.CloseWithError(err)
return
}
if !ok {
return
}
}
}()
return newResp, nil
}
func sendWatchEvent(sourceDecoder watch.Decoder, s runtime.Serializer, middlewareReq *request, result *mutationResult, watchEventEncoder streaming.Encoder) (bool, error) {
// partially copied from watch.NewStreamWatcher.receive
eventType, obj, err := sourceDecoder.Decode()
if err != nil {
switch {
case stderrors.Is(err, io.EOF):
// watch closed normally
case stderrors.Is(err, io.ErrUnexpectedEOF):
plog.InfoErr("Unexpected EOF during watch stream event decoding", err)
case net.IsProbableEOF(err), net.IsTimeout(err):
plog.TraceErr("Unable to decode an event from the watch stream", err)
default:
return false, fmt.Errorf("unexpected watch decode error for %#v: %w", middlewareReq, err)
}
return false, nil // all errors end watch
}
unknown, ok := obj.(*runtime.Unknown)
if !ok || len(unknown.Raw) == 0 {
return false, fmt.Errorf("unexpected decode type: %T", obj)
}
respData := unknown.Raw
fixedRespData, err := maybeRestoreGVK(s, respData, result)
if err != nil {
return false, fmt.Errorf("unable to restore GVK for %#v: %w", middlewareReq, err)
}
fixedRespData, err = maybeMutateResponse(s, fixedRespData, middlewareReq, result)
if err != nil {
return false, fmt.Errorf("unable to mutate response for %#v: %w", middlewareReq, err)
}
event := &metav1.WatchEvent{
Type: string(eventType),
Object: runtime.RawExtension{Raw: fixedRespData},
}
if err := watchEventEncoder.Encode(event); err != nil {
return false, fmt.Errorf("failed to encode watch event for %#v: %w", middlewareReq, err)
}
return true, nil
}
// drainAndMaybeCloseBody attempts to drain and optionallt close the provided body.
//
// We want to drain used HTTP response bodies so that the underlying TCP connection can be
// reused. However, if the underlying response body is extremely large or a never-ending stream,
// then we don't want to wait for the read to finish. In these cases, we give up on the TCP
// connection and just close the body.
func drainAndMaybeCloseBody(resp *http.Response, close bool) {
// from k8s.io/client-go/rest/request.go...
const maxBodySlurpSize = 2 << 10
if resp.ContentLength <= maxBodySlurpSize {
_, _ = io.Copy(ioutil.Discard, &io.LimitedReader{R: resp.Body, N: maxBodySlurpSize})
}
if close {
resp.Body.Close()
}
}