// Copyright 2023 the Pinniped contributors. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

package integration

import (
	"bytes"
	"context"
	"io"
	"strings"
	"testing"
	"time"

	"github.com/stretchr/testify/require"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/utils/strings/slices"

	"go.pinniped.dev/test/testlib"
)

// TestPodShutdown_Disruptive is intended to test that the Supervisor and Concierge pods can
// perform a graceful shutdown. Most importantly, the leader pods should give up their leases
// before they die.
// Never run this test in parallel, since deleting the pods is disruptive; see main_test.go.
func TestPodShutdown_Disruptive(t *testing.T) {
	// Only run this test in CI on Kind clusters, because something about restarting the pods
	// in this test breaks the "kubectl port-forward" commands that we are using in CI for
	// AKS, EKS, and GKE clusters. The Go code that we wrote for graceful pod shutdown should
	// not be sensitive to which distribution it runs on, so running this test only on Kind
	// should give us sufficient coverage for what we are trying to test here.
	env := testlib.IntegrationEnv(t, testlib.SkipPodRestartAssertions()).
		WithKubeDistribution(testlib.KindDistro)

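	// For the Concierge, ignore the kube-cert-agent pods, since they are not part of the
	// Concierge's main Deployment and therefore are not affected by scaling that Deployment.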
	testShutdownAllPodsOfApp(t, env, env.ConciergeNamespace, env.ConciergeAppName, "-kube-cert-agent-")
	testShutdownAllPodsOfApp(t, env, env.SupervisorNamespace, env.SupervisorAppName, "")
}

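// testShutdownAllPodsOfApp scales the given app's Deployment down to zero replicas, asserts that
// all of its pods shut down gracefully and that the leader gave up its leader election lease, and
// then restores the Deployment to its original scale during test cleanup.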
func testShutdownAllPodsOfApp(t *testing.T, env *testlib.TestEnv, namespace string, appName string, ignorePodsWithNameSubstring string) {
	// Precondition: the app should have some pods running initially.
	initialPods := getRunningPodsByNamePrefix(t, namespace, appName+"-", ignorePodsWithNameSubstring)
	require.Greater(t, len(initialPods), 0, "expected to find some running pods for app %q", appName)

	// Precondition: the leader election lease should contain the name of one of the initial pods as the lease's holder.
	waitForLeaderElectionLeaseToHaveHolderIdentity(t, namespace, appName,
		func(holder string) bool { return holder != "" && slices.Contains(namesOfPods(initialPods), holder) }, 2*time.Minute)

	// Start tailing the logs of all the pods in background goroutines. Each instance of this
	// struct keeps track of one background log tail.
	type podLog struct {
		pod        corev1.Pod    // which pod's logs are being tailed
		tailDoneCh chan struct{} // this channel will be closed when it is safe to read from logsBuf
		logsBuf    *bytes.Buffer // the text of the logs will be put in this buffer
	}
	podLogs := make([]*podLog, 0)
	// Skip tailing pod logs for test runs that are using alternate group suffixes. There seems to be a bug in our
	// kubeclient package which causes an "unable to find resp serialier" (sic) error for pod log API responses when
	// the middleware is active. Since we do not tail pod logs in production code (or anywhere else at this time),
	// we don't need to fix that bug right now just for this test.
	if env.APIGroupSuffix == "pinniped.dev" {
		// For each pod, start tailing its logs.
		for _, pod := range initialPods {
			tailDoneCh, logTailBuf := tailFollowPodLogs(t, pod)
			podLogs = append(podLogs, &podLog{
				pod:        pod,
				tailDoneCh: tailDoneCh,
				logsBuf:    logTailBuf,
			})
		}
	}

	// Scale down the deployment's number of replicas to 0, which will shut down all the pods.
	originalScale := updateDeploymentScale(t, namespace, appName, 0)

	// When the test is over, restore the deployment to the original scale.
	t.Cleanup(func() {
		updateDeploymentScale(t, namespace, appName, originalScale)

		// Wait for all the new pods to be running.
		var newPods []corev1.Pod
		testlib.RequireEventually(t, func(requireEventually *require.Assertions) {
			newPods = getRunningPodsByNamePrefix(t, namespace, appName+"-", ignorePodsWithNameSubstring)
			requireEventually.Len(newPods, originalScale, "wanted pods to return to original scale")
		}, 2*time.Minute, 200*time.Millisecond)

		// After a short time, leader election should have finished and the lease should contain the name of
		// one of the new pods as the lease's holder.
		waitForLeaderElectionLeaseToHaveHolderIdentity(t, namespace, appName,
			func(holder string) bool { return holder != "" && slices.Contains(namesOfPods(newPods), holder) }, 1*time.Minute)

		t.Logf("new pod of Deployment %s/%s has acquired the leader election lease", namespace, appName)
	})

	// Double check: the deployment's previous scale should have equaled the actual number of running pods from
	// the start of the test (before we scaled down).
	require.Equal(t, len(initialPods), originalScale)

	// Now that we have adjusted the scale to 0, the pods should go away.
	// Our pods are intended to gracefully shut down within a few seconds, so fail unless it happens fairly quickly.
	testlib.RequireEventually(t, func(requireEventually *require.Assertions) {
		pods := getRunningPodsByNamePrefix(t, namespace, appName+"-", ignorePodsWithNameSubstring)
		requireEventually.Len(pods, 0, "wanted no pods but found some")
	}, 20*time.Second, 200*time.Millisecond)

	// Look for some interesting log messages in each of the now-dead pods' logs, if we started tailing them above.
	for _, pl := range podLogs {
		// Wait until the background tail of the now-dead pod's logs has finished collecting them.
		t.Logf("waiting for tail of pod logs for pod %q", pl.pod.Name)
		<-pl.tailDoneCh
		// Assert that the Kubernetes generic apiserver library has started and finished a graceful
		// shutdown according to its log messages. This is to make sure that the whole graceful shutdown
		// process was performed successfully and without being blocked.
		require.Containsf(t, pl.logsBuf.String(), `"[graceful-termination] shutdown event","name":"ShutdownInitiated"`,
			"did not find expected message in pod log for pod %q", pl.pod.Name)
		require.Containsf(t, pl.logsBuf.String(), `"[graceful-termination] apiserver is exiting\n"`,
			"did not find expected message in pod log for pod %q", pl.pod.Name)
		t.Logf("found expected graceful-termination messages in the logs of pod %q", pl.pod.Name)
	}

	// The leader election lease should already contain the empty string as the holder, because the old leader
	// pod should have given up the lease during its graceful shutdown.
	waitForLeaderElectionLeaseToHaveHolderIdentity(t, namespace, appName,
		func(holder string) bool { return holder == "" }, 1*time.Minute)
}

// Given a list of pods, return a list of their names.
func namesOfPods(pods []corev1.Pod) []string {
	names := make([]string, len(pods))
	for i, pod := range pods {
		names[i] = pod.Name
	}
	return names
}

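// getRunningPodsByNamePrefix returns all running pods in the namespace whose names start with
// podNamePrefix, except for any pods whose names contain the optional podNameExcludeSubstring.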
func getRunningPodsByNamePrefix(
	t *testing.T,
	namespace string,
	podNamePrefix string,
	podNameExcludeSubstring string,
) (foundPods []corev1.Pod) {
	t.Helper()
	ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
	defer cancel()
	client := testlib.NewKubernetesClientset(t)

	pods, err := client.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{})
	require.NoError(t, err)

	for _, pod := range pods.Items {
		if !strings.HasPrefix(pod.Name, podNamePrefix) {
			continue
		}
		if podNameExcludeSubstring != "" && strings.Contains(pod.Name, podNameExcludeSubstring) {
			continue
		}
		if pod.Status.Phase != corev1.PodRunning {
			continue
		}
		foundPods = append(foundPods, pod)
	}

	return foundPods
}

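// updateDeploymentScale changes the number of replicas of the named Deployment using its scale
// subresource, and returns the Deployment's previous number of replicas.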
func updateDeploymentScale(t *testing.T, namespace string, deploymentName string, newScale int) int {
	t.Helper()
	ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
	defer cancel()
	client := testlib.NewKubernetesClientset(t)

	initialScale, err := client.AppsV1().Deployments(namespace).GetScale(ctx, deploymentName, metav1.GetOptions{})
	require.NoError(t, err)

	desiredScale := initialScale.DeepCopy()
	desiredScale.Spec.Replicas = int32(newScale)
	updatedScale, err := client.AppsV1().Deployments(namespace).UpdateScale(ctx, deploymentName, desiredScale, metav1.UpdateOptions{})
	require.NoError(t, err)
	t.Logf("updated scale of Deployment %s/%s from %d to %d",
		namespace, deploymentName, initialScale.Spec.Replicas, updatedScale.Spec.Replicas)

	return int(initialScale.Spec.Replicas)
}

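// tailFollowPodLogs streams the logs of the given pod into a buffer from a background goroutine.
// It returns a channel which will be closed when the stream has ended, along with the buffer.
// The caller must wait for the channel to be closed before reading from the buffer.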
func tailFollowPodLogs(t *testing.T, pod corev1.Pod) (chan struct{}, *bytes.Buffer) {
	t.Helper()
	done := make(chan struct{})
	var buf bytes.Buffer
	client := testlib.NewKubernetesClientset(t)

	go func() {
		// When this goroutine finishes (or fails), signal that we are done writing to the
		// returned buf, so it is then safe for the caller to read the collected logs from it.
		defer close(done)

		ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
		defer cancel()

		req := client.CoreV1().Pods(pod.Namespace).GetLogs(pod.Name, &corev1.PodLogOptions{
			Follow: true, // keep streaming until completion
		})

		// Opening the stream should return quickly, but because Follow was requested above,
		// the response body will stay open until the pod dies or the context expires.
		body, err := req.Stream(ctx)
		require.NoError(t, err)

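		// io.Copy blocks until the server closes the stream, which happens when the pod
		// finishes terminating or when the context expires, whichever comes first.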
		_, err = io.Copy(&buf, body)
		require.NoError(t, err)

		require.NoError(t, body.Close())
	}()

	return done, &buf
}

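// waitForLeaderElectionLeaseToHaveHolderIdentity polls the named leader election Lease until its
// holderIdentity satisfies the given predicate, or fails the test after waitDuration has elapsed.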
func waitForLeaderElectionLeaseToHaveHolderIdentity(
	t *testing.T,
	namespace string,
	leaseName string,
	holderIdentityPredicate func(string) bool,
	waitDuration time.Duration,
) {
	t.Helper()
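	// Allow twice the polling duration for the context, so that Get calls made near the end of
	// the polling loop below are not cut short by the context deadline.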
	ctx, cancel := context.WithTimeout(context.Background(), waitDuration*2)
	defer cancel()
	client := testlib.NewKubernetesClientset(t)

	testlib.RequireEventually(t, func(requireEventually *require.Assertions) {
		lease, err := client.CoordinationV1().Leases(namespace).Get(ctx, leaseName, metav1.GetOptions{})
		requireEventually.NoError(err)
		// Guard against a nil holderIdentity, so a missing holder fails the assertion instead of panicking.
		requireEventually.NotNil(lease.Spec.HolderIdentity, "leader election lease had no holder identity")
		requireEventually.Truef(holderIdentityPredicate(*lease.Spec.HolderIdentity),
			"leader election lease had holder %q", *lease.Spec.HolderIdentity)
	}, waitDuration, 200*time.Millisecond)
}