From 8ee461ae8a4ee2b0db0690a7afcc4927aef98e71 Mon Sep 17 00:00:00 2001 From: Monis Khan Date: Tue, 8 Feb 2022 10:27:26 -0500 Subject: [PATCH] e2e_test: handle hung go routines and readers Signed-off-by: Monis Khan --- test/integration/cli_test.go | 54 +++++++++++++++---------------- test/integration/e2e_test.go | 61 ++++++++++++++++++++++++++++++------ 2 files changed, 79 insertions(+), 36 deletions(-) diff --git a/test/integration/cli_test.go b/test/integration/cli_test.go index 3c16d11f..263c9154 100644 --- a/test/integration/cli_test.go +++ b/test/integration/cli_test.go @@ -7,7 +7,6 @@ import ( "bytes" "context" "encoding/json" - "errors" "fmt" "io" "io/ioutil" @@ -293,25 +292,14 @@ func runPinnipedLoginOIDC( t.Logf("starting CLI subprocess") require.NoError(t, cmd.Start()) t.Cleanup(func() { - err := cmd.Wait() + err := cmd.Wait() // handles closing of file descriptors t.Logf("CLI subprocess exited with code %d", cmd.ProcessState.ExitCode()) require.NoErrorf(t, err, "CLI process did not exit cleanly") }) // Start a background goroutine to read stderr from the CLI and parse out the login URL. loginURLChan := make(chan string, 1) - spawnTestGoroutine(t, func() (err error) { - t.Helper() - defer func() { - closeErr := stderr.Close() - if closeErr == nil || errors.Is(closeErr, os.ErrClosed) { - return - } - if err == nil { - err = fmt.Errorf("stderr stream closed with error: %w", closeErr) - } - }() - + spawnTestGoroutine(ctx, t, func() error { reader := bufio.NewReader(testlib.NewLoggerReader(t, "stderr", stderr)) scanner := bufio.NewScanner(reader) @@ -328,16 +316,7 @@ func runPinnipedLoginOIDC( // Start a background goroutine to read stdout from the CLI and parse out an ExecCredential. credOutputChan := make(chan clientauthenticationv1beta1.ExecCredential, 1) - spawnTestGoroutine(t, func() (err error) { - defer func() { - closeErr := stdout.Close() - if closeErr == nil || errors.Is(closeErr, os.ErrClosed) { - return - } - if err == nil { - err = fmt.Errorf("stdout stream closed with error: %w", closeErr) - } - }() + spawnTestGoroutine(ctx, t, func() error { reader := bufio.NewReader(testlib.NewLoggerReader(t, "stdout", stdout)) var out clientauthenticationv1beta1.ExecCredential if err := json.NewDecoder(reader).Decode(&out); err != nil { @@ -398,12 +377,33 @@ func readAndExpectEmpty(r io.Reader) (err error) { return nil } -// Note: Callers should ensure that f eventually returns, otherwise this helper will hang forever in t.Cleanup. -func spawnTestGoroutine(t *testing.T, f func() error) { +// Note: Callers should ensure that f eventually returns, otherwise this helper will leak a go routine. +func spawnTestGoroutine(ctx context.Context, t *testing.T, f func() error) { t.Helper() + var eg errgroup.Group t.Cleanup(func() { - require.NoError(t, eg.Wait(), "background goroutine failed") + egCh := make(chan error, 1) // do not block the go routine from exiting even after the select has completed + go func() { + egCh <- eg.Wait() + }() + + leewayCh := make(chan struct{}) + go func() { + <-ctx.Done() + // give f up to 30 seconds after the context is canceled to return + // this prevents "race" conditions where f is orchestrated via the same context + time.Sleep(30 * time.Second) + close(leewayCh) + }() + + select { + case <-leewayCh: + t.Errorf("background goroutine hung: %v", ctx.Err()) + + case err := <-egCh: + require.NoError(t, err, "background goroutine failed") + } }) eg.Go(f) } diff --git a/test/integration/e2e_test.go b/test/integration/e2e_test.go index 6129f7af..b4b5f494 100644 --- a/test/integration/e2e_test.go +++ b/test/integration/e2e_test.go @@ -19,6 +19,7 @@ import ( "regexp" "sort" "strings" + "sync/atomic" "testing" "time" @@ -50,7 +51,7 @@ import ( func TestE2EFullIntegration(t *testing.T) { env := testlib.IntegrationEnv(t) - ctx, cancelFunc := context.WithTimeout(context.Background(), 5*time.Minute) + ctx, cancelFunc := context.WithTimeout(context.Background(), 10*time.Minute) defer cancelFunc() // Build pinniped CLI. @@ -107,6 +108,9 @@ func TestE2EFullIntegration(t *testing.T) { // Add an OIDC upstream IDP and try using it to authenticate during kubectl commands. t.Run("with Supervisor OIDC upstream IDP and automatic flow", func(t *testing.T) { + testCtx, cancel := context.WithTimeout(ctx, 2*time.Minute) + t.Cleanup(cancel) + // Start a fresh browser driver because we don't want to share cookies between the various tests in this file. page := browsertest.Open(t) @@ -158,7 +162,7 @@ func TestE2EFullIntegration(t *testing.T) { // Run "kubectl get namespaces" which should trigger a browser login via the plugin. start := time.Now() - kubectlCmd := exec.CommandContext(ctx, "kubectl", "get", "namespace", "--kubeconfig", kubeconfigPath) + kubectlCmd := exec.CommandContext(testCtx, "kubectl", "get", "namespace", "--kubeconfig", kubeconfigPath) kubectlCmd.Env = append(os.Environ(), env.ProxyEnv()...) // Wrap the stdout and stderr pipes with TeeReaders which will copy each incremental read to an @@ -175,8 +179,8 @@ func TestE2EFullIntegration(t *testing.T) { require.NoError(t, kubectlCmd.Start()) t.Cleanup(func() { // Consume readers so that the tee buffers will contain all the output so far. - _, stdoutReadAllErr := ioutil.ReadAll(stdoutPipe) - _, stderrReadAllErr := ioutil.ReadAll(stderrPipe) + _, stdoutReadAllErr := readAllCtx(testCtx, stdoutPipe) + _, stderrReadAllErr := readAllCtx(testCtx, stderrPipe) // Note that Wait closes the stdout/stderr pipes, so we don't need to close them ourselves. waitErr := kubectlCmd.Wait() @@ -194,7 +198,7 @@ func TestE2EFullIntegration(t *testing.T) { // Start a background goroutine to read stderr from the CLI and parse out the login URL. loginURLChan := make(chan string, 1) - spawnTestGoroutine(t, func() error { + spawnTestGoroutine(testCtx, t, func() error { reader := bufio.NewReader(testlib.NewLoggerReader(t, "stderr", stderrPipe)) scanner := bufio.NewScanner(reader) for scanner.Scan() { @@ -209,8 +213,8 @@ func TestE2EFullIntegration(t *testing.T) { // Start a background goroutine to read stdout from kubectl and return the result as a string. kubectlOutputChan := make(chan string, 1) - spawnTestGoroutine(t, func() error { - output, err := ioutil.ReadAll(stdoutPipe) + spawnTestGoroutine(testCtx, t, func() error { + output, err := readAllCtx(testCtx, stdoutPipe) if err != nil { return err } @@ -227,7 +231,7 @@ func TestE2EFullIntegration(t *testing.T) { require.Fail(t, "timed out waiting for login URL") case loginURL = <-loginURLChan: } - t.Logf("navigating to login page") + t.Logf("navigating to login page: %q", loginURL) require.NoError(t, page.Navigate(loginURL)) // Expect to be redirected to the upstream provider and log in. @@ -253,7 +257,7 @@ func TestE2EFullIntegration(t *testing.T) { t.Logf("first kubectl command took %s", time.Since(start).String()) - requireUserCanUseKubectlWithoutAuthenticatingAgain(ctx, t, env, + requireUserCanUseKubectlWithoutAuthenticatingAgain(testCtx, t, env, downstream, kubeconfigPath, sessionCachePath, @@ -1170,3 +1174,42 @@ func getSecretNameFromSignature(t *testing.T, signature string, typeLabel string signatureAsValidName := strings.ToLower(b32.EncodeToString(signatureBytes)) return fmt.Sprintf("pinniped-storage-%s-%s", typeLabel, signatureAsValidName) } + +func readAllCtx(ctx context.Context, r io.Reader) ([]byte, error) { + errCh := make(chan error, 1) + data := &atomic.Value{} + go func() { // copied from io.ReadAll and modified to use the atomic.Value above + b := make([]byte, 0, 512) + data.Store(string(b)) // cast to string to make a copy of the byte slice + for { + if len(b) == cap(b) { + // Add more capacity (let append pick how much). + b = append(b, 0)[:len(b)] + data.Store(string(b)) // cast to string to make a copy of the byte slice + } + n, err := r.Read(b[len(b):cap(b)]) + b = b[:len(b)+n] + data.Store(string(b)) // cast to string to make a copy of the byte slice + if err != nil { + if err == io.EOF { + err = nil + } + errCh <- err + return + } + } + }() + + select { + case <-ctx.Done(): + b, _ := data.Load().(string) + return nil, fmt.Errorf("failed to complete read all: %w, data read so far:\n%q", ctx.Err(), b) + + case err := <-errCh: + b, _ := data.Load().(string) + if len(b) == 0 { + return nil, err + } + return []byte(b), err + } +}