e2e_test: handle hung go routines and readers

Signed-off-by: Monis Khan <mok@vmware.com>
This commit is contained in:
Monis Khan 2022-02-08 10:27:26 -05:00
parent 1388183bf1
commit 8ee461ae8a
No known key found for this signature in database
GPG Key ID: 52C90ADA01B269B8
2 changed files with 79 additions and 36 deletions

View File

@ -7,7 +7,6 @@ import (
"bytes" "bytes"
"context" "context"
"encoding/json" "encoding/json"
"errors"
"fmt" "fmt"
"io" "io"
"io/ioutil" "io/ioutil"
@ -293,25 +292,14 @@ func runPinnipedLoginOIDC(
t.Logf("starting CLI subprocess") t.Logf("starting CLI subprocess")
require.NoError(t, cmd.Start()) require.NoError(t, cmd.Start())
t.Cleanup(func() { t.Cleanup(func() {
err := cmd.Wait() err := cmd.Wait() // handles closing of file descriptors
t.Logf("CLI subprocess exited with code %d", cmd.ProcessState.ExitCode()) t.Logf("CLI subprocess exited with code %d", cmd.ProcessState.ExitCode())
require.NoErrorf(t, err, "CLI process did not exit cleanly") require.NoErrorf(t, err, "CLI process did not exit cleanly")
}) })
// Start a background goroutine to read stderr from the CLI and parse out the login URL. // Start a background goroutine to read stderr from the CLI and parse out the login URL.
loginURLChan := make(chan string, 1) loginURLChan := make(chan string, 1)
spawnTestGoroutine(t, func() (err error) { spawnTestGoroutine(ctx, t, func() error {
t.Helper()
defer func() {
closeErr := stderr.Close()
if closeErr == nil || errors.Is(closeErr, os.ErrClosed) {
return
}
if err == nil {
err = fmt.Errorf("stderr stream closed with error: %w", closeErr)
}
}()
reader := bufio.NewReader(testlib.NewLoggerReader(t, "stderr", stderr)) reader := bufio.NewReader(testlib.NewLoggerReader(t, "stderr", stderr))
scanner := bufio.NewScanner(reader) scanner := bufio.NewScanner(reader)
@ -328,16 +316,7 @@ func runPinnipedLoginOIDC(
// Start a background goroutine to read stdout from the CLI and parse out an ExecCredential. // Start a background goroutine to read stdout from the CLI and parse out an ExecCredential.
credOutputChan := make(chan clientauthenticationv1beta1.ExecCredential, 1) credOutputChan := make(chan clientauthenticationv1beta1.ExecCredential, 1)
spawnTestGoroutine(t, func() (err error) { spawnTestGoroutine(ctx, t, func() error {
defer func() {
closeErr := stdout.Close()
if closeErr == nil || errors.Is(closeErr, os.ErrClosed) {
return
}
if err == nil {
err = fmt.Errorf("stdout stream closed with error: %w", closeErr)
}
}()
reader := bufio.NewReader(testlib.NewLoggerReader(t, "stdout", stdout)) reader := bufio.NewReader(testlib.NewLoggerReader(t, "stdout", stdout))
var out clientauthenticationv1beta1.ExecCredential var out clientauthenticationv1beta1.ExecCredential
if err := json.NewDecoder(reader).Decode(&out); err != nil { if err := json.NewDecoder(reader).Decode(&out); err != nil {
@ -398,12 +377,33 @@ func readAndExpectEmpty(r io.Reader) (err error) {
return nil return nil
} }
// Note: Callers should ensure that f eventually returns, otherwise this helper will hang forever in t.Cleanup. // Note: Callers should ensure that f eventually returns, otherwise this helper will leak a go routine.
func spawnTestGoroutine(t *testing.T, f func() error) { func spawnTestGoroutine(ctx context.Context, t *testing.T, f func() error) {
t.Helper() t.Helper()
var eg errgroup.Group var eg errgroup.Group
t.Cleanup(func() { t.Cleanup(func() {
require.NoError(t, eg.Wait(), "background goroutine failed") egCh := make(chan error, 1) // do not block the go routine from exiting even after the select has completed
go func() {
egCh <- eg.Wait()
}()
leewayCh := make(chan struct{})
go func() {
<-ctx.Done()
// give f up to 30 seconds after the context is canceled to return
// this prevents "race" conditions where f is orchestrated via the same context
time.Sleep(30 * time.Second)
close(leewayCh)
}()
select {
case <-leewayCh:
t.Errorf("background goroutine hung: %v", ctx.Err())
case err := <-egCh:
require.NoError(t, err, "background goroutine failed")
}
}) })
eg.Go(f) eg.Go(f)
} }

View File

@ -19,6 +19,7 @@ import (
"regexp" "regexp"
"sort" "sort"
"strings" "strings"
"sync/atomic"
"testing" "testing"
"time" "time"
@ -50,7 +51,7 @@ import (
func TestE2EFullIntegration(t *testing.T) { func TestE2EFullIntegration(t *testing.T) {
env := testlib.IntegrationEnv(t) env := testlib.IntegrationEnv(t)
ctx, cancelFunc := context.WithTimeout(context.Background(), 5*time.Minute) ctx, cancelFunc := context.WithTimeout(context.Background(), 10*time.Minute)
defer cancelFunc() defer cancelFunc()
// Build pinniped CLI. // Build pinniped CLI.
@ -107,6 +108,9 @@ func TestE2EFullIntegration(t *testing.T) {
// Add an OIDC upstream IDP and try using it to authenticate during kubectl commands. // Add an OIDC upstream IDP and try using it to authenticate during kubectl commands.
t.Run("with Supervisor OIDC upstream IDP and automatic flow", func(t *testing.T) { t.Run("with Supervisor OIDC upstream IDP and automatic flow", func(t *testing.T) {
testCtx, cancel := context.WithTimeout(ctx, 2*time.Minute)
t.Cleanup(cancel)
// Start a fresh browser driver because we don't want to share cookies between the various tests in this file. // Start a fresh browser driver because we don't want to share cookies between the various tests in this file.
page := browsertest.Open(t) page := browsertest.Open(t)
@ -158,7 +162,7 @@ func TestE2EFullIntegration(t *testing.T) {
// Run "kubectl get namespaces" which should trigger a browser login via the plugin. // Run "kubectl get namespaces" which should trigger a browser login via the plugin.
start := time.Now() start := time.Now()
kubectlCmd := exec.CommandContext(ctx, "kubectl", "get", "namespace", "--kubeconfig", kubeconfigPath) kubectlCmd := exec.CommandContext(testCtx, "kubectl", "get", "namespace", "--kubeconfig", kubeconfigPath)
kubectlCmd.Env = append(os.Environ(), env.ProxyEnv()...) kubectlCmd.Env = append(os.Environ(), env.ProxyEnv()...)
// Wrap the stdout and stderr pipes with TeeReaders which will copy each incremental read to an // Wrap the stdout and stderr pipes with TeeReaders which will copy each incremental read to an
@ -175,8 +179,8 @@ func TestE2EFullIntegration(t *testing.T) {
require.NoError(t, kubectlCmd.Start()) require.NoError(t, kubectlCmd.Start())
t.Cleanup(func() { t.Cleanup(func() {
// Consume readers so that the tee buffers will contain all the output so far. // Consume readers so that the tee buffers will contain all the output so far.
_, stdoutReadAllErr := ioutil.ReadAll(stdoutPipe) _, stdoutReadAllErr := readAllCtx(testCtx, stdoutPipe)
_, stderrReadAllErr := ioutil.ReadAll(stderrPipe) _, stderrReadAllErr := readAllCtx(testCtx, stderrPipe)
// Note that Wait closes the stdout/stderr pipes, so we don't need to close them ourselves. // Note that Wait closes the stdout/stderr pipes, so we don't need to close them ourselves.
waitErr := kubectlCmd.Wait() waitErr := kubectlCmd.Wait()
@ -194,7 +198,7 @@ func TestE2EFullIntegration(t *testing.T) {
// Start a background goroutine to read stderr from the CLI and parse out the login URL. // Start a background goroutine to read stderr from the CLI and parse out the login URL.
loginURLChan := make(chan string, 1) loginURLChan := make(chan string, 1)
spawnTestGoroutine(t, func() error { spawnTestGoroutine(testCtx, t, func() error {
reader := bufio.NewReader(testlib.NewLoggerReader(t, "stderr", stderrPipe)) reader := bufio.NewReader(testlib.NewLoggerReader(t, "stderr", stderrPipe))
scanner := bufio.NewScanner(reader) scanner := bufio.NewScanner(reader)
for scanner.Scan() { for scanner.Scan() {
@ -209,8 +213,8 @@ func TestE2EFullIntegration(t *testing.T) {
// Start a background goroutine to read stdout from kubectl and return the result as a string. // Start a background goroutine to read stdout from kubectl and return the result as a string.
kubectlOutputChan := make(chan string, 1) kubectlOutputChan := make(chan string, 1)
spawnTestGoroutine(t, func() error { spawnTestGoroutine(testCtx, t, func() error {
output, err := ioutil.ReadAll(stdoutPipe) output, err := readAllCtx(testCtx, stdoutPipe)
if err != nil { if err != nil {
return err return err
} }
@ -227,7 +231,7 @@ func TestE2EFullIntegration(t *testing.T) {
require.Fail(t, "timed out waiting for login URL") require.Fail(t, "timed out waiting for login URL")
case loginURL = <-loginURLChan: case loginURL = <-loginURLChan:
} }
t.Logf("navigating to login page") t.Logf("navigating to login page: %q", loginURL)
require.NoError(t, page.Navigate(loginURL)) require.NoError(t, page.Navigate(loginURL))
// Expect to be redirected to the upstream provider and log in. // Expect to be redirected to the upstream provider and log in.
@ -253,7 +257,7 @@ func TestE2EFullIntegration(t *testing.T) {
t.Logf("first kubectl command took %s", time.Since(start).String()) t.Logf("first kubectl command took %s", time.Since(start).String())
requireUserCanUseKubectlWithoutAuthenticatingAgain(ctx, t, env, requireUserCanUseKubectlWithoutAuthenticatingAgain(testCtx, t, env,
downstream, downstream,
kubeconfigPath, kubeconfigPath,
sessionCachePath, sessionCachePath,
@ -1170,3 +1174,42 @@ func getSecretNameFromSignature(t *testing.T, signature string, typeLabel string
signatureAsValidName := strings.ToLower(b32.EncodeToString(signatureBytes)) signatureAsValidName := strings.ToLower(b32.EncodeToString(signatureBytes))
return fmt.Sprintf("pinniped-storage-%s-%s", typeLabel, signatureAsValidName) return fmt.Sprintf("pinniped-storage-%s-%s", typeLabel, signatureAsValidName)
} }
func readAllCtx(ctx context.Context, r io.Reader) ([]byte, error) {
errCh := make(chan error, 1)
data := &atomic.Value{}
go func() { // copied from io.ReadAll and modified to use the atomic.Value above
b := make([]byte, 0, 512)
data.Store(string(b)) // cast to string to make a copy of the byte slice
for {
if len(b) == cap(b) {
// Add more capacity (let append pick how much).
b = append(b, 0)[:len(b)]
data.Store(string(b)) // cast to string to make a copy of the byte slice
}
n, err := r.Read(b[len(b):cap(b)])
b = b[:len(b)+n]
data.Store(string(b)) // cast to string to make a copy of the byte slice
if err != nil {
if err == io.EOF {
err = nil
}
errCh <- err
return
}
}
}()
select {
case <-ctx.Done():
b, _ := data.Load().(string)
return nil, fmt.Errorf("failed to complete read all: %w, data read so far:\n%q", ctx.Err(), b)
case err := <-errCh:
b, _ := data.Load().(string)
if len(b) == 0 {
return nil, err
}
return []byte(b), err
}
}