e2e_test: handle hung go routines and readers

Signed-off-by: Monis Khan <mok@vmware.com>
(cherry picked from commit 8ee461ae8a)
This commit is contained in:
Monis Khan 2022-02-08 10:27:26 -05:00 committed by Margo Crawford
parent 7c87d7447c
commit 96d4d3ec7c
2 changed files with 79 additions and 36 deletions

View File

@ -7,7 +7,6 @@ import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
@ -286,25 +285,14 @@ func runPinnipedLoginOIDC(
t.Logf("starting CLI subprocess")
require.NoError(t, cmd.Start())
t.Cleanup(func() {
err := cmd.Wait()
err := cmd.Wait() // handles closing of file descriptors
t.Logf("CLI subprocess exited with code %d", cmd.ProcessState.ExitCode())
require.NoErrorf(t, err, "CLI process did not exit cleanly")
})
// Start a background goroutine to read stderr from the CLI and parse out the login URL.
loginURLChan := make(chan string, 1)
spawnTestGoroutine(t, func() (err error) {
t.Helper()
defer func() {
closeErr := stderr.Close()
if closeErr == nil || errors.Is(closeErr, os.ErrClosed) {
return
}
if err == nil {
err = fmt.Errorf("stderr stream closed with error: %w", closeErr)
}
}()
spawnTestGoroutine(ctx, t, func() error {
reader := bufio.NewReader(testlib.NewLoggerReader(t, "stderr", stderr))
scanner := bufio.NewScanner(reader)
@ -321,16 +309,7 @@ func runPinnipedLoginOIDC(
// Start a background goroutine to read stdout from the CLI and parse out an ExecCredential.
credOutputChan := make(chan clientauthenticationv1beta1.ExecCredential, 1)
spawnTestGoroutine(t, func() (err error) {
defer func() {
closeErr := stdout.Close()
if closeErr == nil || errors.Is(closeErr, os.ErrClosed) {
return
}
if err == nil {
err = fmt.Errorf("stdout stream closed with error: %w", closeErr)
}
}()
spawnTestGoroutine(ctx, t, func() error {
reader := bufio.NewReader(testlib.NewLoggerReader(t, "stdout", stdout))
var out clientauthenticationv1beta1.ExecCredential
if err := json.NewDecoder(reader).Decode(&out); err != nil {
@ -391,12 +370,33 @@ func readAndExpectEmpty(r io.Reader) (err error) {
return nil
}
// Note: Callers should ensure that f eventually returns, otherwise this helper will hang forever in t.Cleanup.
func spawnTestGoroutine(t *testing.T, f func() error) {
// Note: Callers should ensure that f eventually returns, otherwise this helper will leak a go routine.
func spawnTestGoroutine(ctx context.Context, t *testing.T, f func() error) {
t.Helper()
var eg errgroup.Group
t.Cleanup(func() {
require.NoError(t, eg.Wait(), "background goroutine failed")
egCh := make(chan error, 1) // do not block the go routine from exiting even after the select has completed
go func() {
egCh <- eg.Wait()
}()
leewayCh := make(chan struct{})
go func() {
<-ctx.Done()
// give f up to 30 seconds after the context is canceled to return
// this prevents "race" conditions where f is orchestrated via the same context
time.Sleep(30 * time.Second)
close(leewayCh)
}()
select {
case <-leewayCh:
t.Errorf("background goroutine hung: %v", ctx.Err())
case err := <-egCh:
require.NoError(t, err, "background goroutine failed")
}
})
eg.Go(f)
}

View File

@ -19,6 +19,7 @@ import (
"regexp"
"sort"
"strings"
"sync/atomic"
"testing"
"time"
@ -51,7 +52,7 @@ import (
func TestE2EFullIntegration(t *testing.T) { // nolint:gocyclo
env := testlib.IntegrationEnv(t)
ctx, cancelFunc := context.WithTimeout(context.Background(), 5*time.Minute)
ctx, cancelFunc := context.WithTimeout(context.Background(), 10*time.Minute)
defer cancelFunc()
// Build pinniped CLI.
@ -108,6 +109,9 @@ func TestE2EFullIntegration(t *testing.T) { // nolint:gocyclo
// Add an OIDC upstream IDP and try using it to authenticate during kubectl commands.
t.Run("with Supervisor OIDC upstream IDP and automatic flow", func(t *testing.T) {
testCtx, cancel := context.WithTimeout(ctx, 2*time.Minute)
t.Cleanup(cancel)
// Start a fresh browser driver because we don't want to share cookies between the various tests in this file.
page := browsertest.Open(t)
@ -159,7 +163,7 @@ func TestE2EFullIntegration(t *testing.T) { // nolint:gocyclo
// Run "kubectl get namespaces" which should trigger a browser login via the plugin.
start := time.Now()
kubectlCmd := exec.CommandContext(ctx, "kubectl", "get", "namespace", "--kubeconfig", kubeconfigPath)
kubectlCmd := exec.CommandContext(testCtx, "kubectl", "get", "namespace", "--kubeconfig", kubeconfigPath)
kubectlCmd.Env = append(os.Environ(), env.ProxyEnv()...)
// Wrap the stdout and stderr pipes with TeeReaders which will copy each incremental read to an
@ -176,8 +180,8 @@ func TestE2EFullIntegration(t *testing.T) { // nolint:gocyclo
require.NoError(t, kubectlCmd.Start())
t.Cleanup(func() {
// Consume readers so that the tee buffers will contain all the output so far.
_, stdoutReadAllErr := ioutil.ReadAll(stdoutPipe)
_, stderrReadAllErr := ioutil.ReadAll(stderrPipe)
_, stdoutReadAllErr := readAllCtx(testCtx, stdoutPipe)
_, stderrReadAllErr := readAllCtx(testCtx, stderrPipe)
// Note that Wait closes the stdout/stderr pipes, so we don't need to close them ourselves.
waitErr := kubectlCmd.Wait()
@ -195,7 +199,7 @@ func TestE2EFullIntegration(t *testing.T) { // nolint:gocyclo
// Start a background goroutine to read stderr from the CLI and parse out the login URL.
loginURLChan := make(chan string, 1)
spawnTestGoroutine(t, func() error {
spawnTestGoroutine(testCtx, t, func() error {
reader := bufio.NewReader(testlib.NewLoggerReader(t, "stderr", stderrPipe))
scanner := bufio.NewScanner(reader)
for scanner.Scan() {
@ -210,8 +214,8 @@ func TestE2EFullIntegration(t *testing.T) { // nolint:gocyclo
// Start a background goroutine to read stdout from kubectl and return the result as a string.
kubectlOutputChan := make(chan string, 1)
spawnTestGoroutine(t, func() error {
output, err := ioutil.ReadAll(stdoutPipe)
spawnTestGoroutine(testCtx, t, func() error {
output, err := readAllCtx(testCtx, stdoutPipe)
if err != nil {
return err
}
@ -228,7 +232,7 @@ func TestE2EFullIntegration(t *testing.T) { // nolint:gocyclo
require.Fail(t, "timed out waiting for login URL")
case loginURL = <-loginURLChan:
}
t.Logf("navigating to login page")
t.Logf("navigating to login page: %q", loginURL)
require.NoError(t, page.Navigate(loginURL))
// Expect to be redirected to the upstream provider and log in.
@ -254,7 +258,7 @@ func TestE2EFullIntegration(t *testing.T) { // nolint:gocyclo
t.Logf("first kubectl command took %s", time.Since(start).String())
requireUserCanUseKubectlWithoutAuthenticatingAgain(ctx, t, env,
requireUserCanUseKubectlWithoutAuthenticatingAgain(testCtx, t, env,
downstream,
kubeconfigPath,
sessionCachePath,
@ -1049,3 +1053,42 @@ func getSecretNameFromSignature(t *testing.T, signature string, typeLabel string
signatureAsValidName := strings.ToLower(b32.EncodeToString(signatureBytes))
return fmt.Sprintf("pinniped-storage-%s-%s", typeLabel, signatureAsValidName)
}
func readAllCtx(ctx context.Context, r io.Reader) ([]byte, error) {
errCh := make(chan error, 1)
data := &atomic.Value{}
go func() { // copied from io.ReadAll and modified to use the atomic.Value above
b := make([]byte, 0, 512)
data.Store(string(b)) // cast to string to make a copy of the byte slice
for {
if len(b) == cap(b) {
// Add more capacity (let append pick how much).
b = append(b, 0)[:len(b)]
data.Store(string(b)) // cast to string to make a copy of the byte slice
}
n, err := r.Read(b[len(b):cap(b)])
b = b[:len(b)+n]
data.Store(string(b)) // cast to string to make a copy of the byte slice
if err != nil {
if err == io.EOF {
err = nil
}
errCh <- err
return
}
}
}()
select {
case <-ctx.Done():
b, _ := data.Load().(string)
return nil, fmt.Errorf("failed to complete read all: %w, data read so far:\n%q", ctx.Err(), b)
case err := <-errCh:
b, _ := data.Load().(string)
if len(b) == 0 {
return nil, err
}
return []byte(b), err
}
}