// Copyright 2023 the Pinniped contributors. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 package tokenclient import ( "context" "fmt" "time" "github.com/pkg/errors" authenticationv1 "k8s.io/api/authentication/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" "k8s.io/utils/clock" "go.pinniped.dev/internal/backoff" "go.pinniped.dev/internal/plog" ) type WhatToDoWithTokenFunc func(authenticationv1.TokenRequestStatus, metav1.Duration) error type TokenClient struct { namespace string serviceAccountName string k8sClient kubernetes.Interface whatToDoWithToken WhatToDoWithTokenFunc expirationSeconds int64 clock clock.Clock logger plog.Logger } type Opt func(client *TokenClient) func WithExpirationSeconds(expirationSeconds int64) Opt { return func(client *TokenClient) { client.expirationSeconds = expirationSeconds } } func New( namespace string, serviceAccountName string, k8sClient kubernetes.Interface, whatToDoWithToken WhatToDoWithTokenFunc, logger plog.Logger, opts ...Opt, ) *TokenClient { client := &TokenClient{ namespace: namespace, serviceAccountName: serviceAccountName, k8sClient: k8sClient, whatToDoWithToken: whatToDoWithToken, expirationSeconds: 600, clock: clock.RealClock{}, logger: logger, } for _, opt := range opts { opt(client) } return client } type howToFetchTokenFromAPIServer func(tokenRequest *authenticationv1.TokenRequest) (*authenticationv1.TokenRequest, error) func (tokenClient TokenClient) Start(ctx context.Context) { sleeper := make(chan time.Time, 1) // Make sure that the <-sleeper below gets run once immediately. sleeper <- time.Now() for { select { case <-ctx.Done(): tokenClient.logger.Info("TokenClient was cancelled and is stopping") return case <-sleeper: var tokenTTL metav1.Duration err := backoff.WithContext(ctx, &backoff.InfiniteBackoff{ Duration: 10 * time.Millisecond, MaxDuration: 5 * time.Second, Factor: 2.0, }, func(ctx context.Context) (bool, error) { var err error var tokenRequestStatus authenticationv1.TokenRequestStatus tokenRequestStatus, tokenTTL, err = tokenClient.fetchToken( func(tokenRequest *authenticationv1.TokenRequest) (*authenticationv1.TokenRequest, error) { return tokenClient.k8sClient.CoreV1().ServiceAccounts(tokenClient.namespace).CreateToken( ctx, tokenClient.serviceAccountName, tokenRequest, metav1.CreateOptions{}) }, ) if err != nil { tokenClient.logger.Warning(fmt.Sprintf("Could not fetch token: %s\n", err)) // We got an error. Swallow it and ask for retry. return false, nil } err = tokenClient.whatToDoWithToken(tokenRequestStatus, tokenTTL) if err != nil { tokenClient.logger.Warning(fmt.Sprintf("unable to pass token to the caller: %s\n", err)) // We got an error. Swallow it and ask for retry. return false, nil } // We got a token. Stop backing off. return true, nil }) if err != nil { // We were cancelled during our WithContext. We know it was not due to some other // error because our last argument to WithContext above never returns any errors. return } // Schedule ourselves to wake up in the future. time.AfterFunc(tokenTTL.Duration*4/5, func() { sleeper <- time.Now() }) } } } func (tokenClient TokenClient) fetchToken( howToFetchTokenFromAPIServer howToFetchTokenFromAPIServer, ) (authenticationv1.TokenRequestStatus, metav1.Duration, error) { tokenClient.logger.Debug(fmt.Sprintf("refreshing cache at time=%s\n", tokenClient.clock.Now().Format(time.RFC3339))) tokenRequestInput := &authenticationv1.TokenRequest{ Spec: authenticationv1.TokenRequestSpec{ ExpirationSeconds: &tokenClient.expirationSeconds, }, } tokenRequest, err := howToFetchTokenFromAPIServer(tokenRequestInput) emptyMetav1Duration := metav1.Duration{Duration: 0} if err != nil { return authenticationv1.TokenRequestStatus{}, emptyMetav1Duration, errors.Wrap(err, "error creating token") } if tokenRequest == nil { return authenticationv1.TokenRequestStatus{}, emptyMetav1Duration, errors.New("tokenRequest is nil after request") } ttl := metav1.Duration{Duration: tokenRequest.Status.ExpirationTimestamp.Sub(tokenClient.clock.Now())} return tokenRequest.Status, ttl, nil }