9118869d04
Signed-off-by: Monis Khan <mok@vmware.com>
276 lines
8.0 KiB
Go
276 lines
8.0 KiB
Go
/*
|
|
Copyright 2020 VMware, Inc.
|
|
SPDX-License-Identifier: Apache-2.0
|
|
*/
|
|
|
|
// Package app is the command line entry point for placeholder-name.
|
|
package app
|
|
|
|
import (
|
|
"context"
|
|
"crypto/tls"
|
|
"crypto/x509/pkix"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"log"
|
|
"net"
|
|
"net/http"
|
|
"time"
|
|
|
|
"github.com/spf13/cobra"
|
|
"golang.org/x/sync/errgroup"
|
|
corev1 "k8s.io/api/core/v1"
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
"k8s.io/apimachinery/pkg/runtime"
|
|
"k8s.io/apimachinery/pkg/util/intstr"
|
|
"k8s.io/apiserver/pkg/authentication/authenticator"
|
|
"k8s.io/client-go/kubernetes"
|
|
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
|
|
restclient "k8s.io/client-go/rest"
|
|
apiregistrationv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
|
|
aggregationv1client "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset"
|
|
|
|
"github.com/suzerain-io/placeholder-name-api/pkg/apis/placeholder"
|
|
"github.com/suzerain-io/placeholder-name/internal/autoregistration"
|
|
"github.com/suzerain-io/placeholder-name/internal/certauthority"
|
|
"github.com/suzerain-io/placeholder-name/internal/downward"
|
|
"github.com/suzerain-io/placeholder-name/pkg/config"
|
|
"github.com/suzerain-io/placeholder-name/pkg/handlers"
|
|
)
|
|
|
|
// shutdownGracePeriod controls how long active connections are allowed to continue at shutdown.
|
|
const shutdownGracePeriod time.Duration = 5 * time.Second // timeout given to http.Server.Shutdown in runGracefully
|
|
|
|
// App is an object that represents the placeholder-name application.
|
|
type App struct {
|
|
cmd *cobra.Command
|
|
|
|
// CLI flags
|
|
configPath string
|
|
downwardAPIPath string
|
|
|
|
// listen address for healthz serve
|
|
healthAddr string
|
|
|
|
// listen address for main serve
|
|
mainAddr string
|
|
|
|
// webhook authenticates tokens
|
|
webhook authenticator.Token
|
|
}
|
|
|
|
// New constructs a new App with command line args, stdout and stderr.
|
|
func New(args []string, stdout, stderr io.Writer) *App {
|
|
a := &App{
|
|
healthAddr: ":8080",
|
|
mainAddr: ":443",
|
|
}
|
|
|
|
cmd := &cobra.Command{
|
|
Use: `placeholder-name`,
|
|
Long: `placeholder-name provides a generic API for mapping an external
|
|
credential from somewhere to an internal credential to be used for
|
|
authenticating to the Kubernetes API.`,
|
|
RunE: func(cmd *cobra.Command, args []string) error {
|
|
// Load the Kubernetes client configuration (kubeconfig),
|
|
kubeConfig, err := restclient.InClusterConfig()
|
|
if err != nil {
|
|
return fmt.Errorf("could not load in-cluster configuration: %w", err)
|
|
}
|
|
|
|
// explicitly use protobuf when talking to built-in kube APIs
|
|
protoKubeConfig := createProtoKubeConfig(kubeConfig)
|
|
|
|
// Connect to the core Kubernetes API.
|
|
k8s, err := kubernetes.NewForConfig(protoKubeConfig)
|
|
if err != nil {
|
|
return fmt.Errorf("could not initialize Kubernetes client: %w", err)
|
|
}
|
|
|
|
// Connect to the Kubernetes aggregation API.
|
|
aggregation, err := aggregationv1client.NewForConfig(protoKubeConfig)
|
|
if err != nil {
|
|
return fmt.Errorf("could not initialize Kubernetes client: %w", err)
|
|
}
|
|
return a.serve(context.Background(), k8s.CoreV1(), aggregation)
|
|
},
|
|
Args: cobra.NoArgs,
|
|
}
|
|
|
|
cmd.SetArgs(args)
|
|
cmd.SetOut(stdout)
|
|
cmd.SetErr(stderr)
|
|
|
|
cmd.Flags().StringVarP(
|
|
&a.configPath,
|
|
"config",
|
|
"c",
|
|
"placeholder-name.yaml",
|
|
"path to configuration file",
|
|
)
|
|
|
|
cmd.Flags().StringVar(
|
|
&a.downwardAPIPath,
|
|
"downward-api-path",
|
|
"/etc/podinfo",
|
|
"path to Downward API volume mount",
|
|
)
|
|
|
|
a.cmd = cmd
|
|
|
|
return a
|
|
}
|
|
|
|
func (a *App) Run() error {
|
|
return a.cmd.Execute()
|
|
}
|
|
|
|
func (a *App) serve(ctx context.Context, k8s corev1client.CoreV1Interface, aggregation aggregationv1client.Interface) error {
|
|
cfg, err := config.FromPath(a.configPath)
|
|
if err != nil {
|
|
return fmt.Errorf("could not load config: %w", err)
|
|
}
|
|
|
|
webhook, err := config.NewWebhook(cfg.WebhookConfig)
|
|
if err != nil {
|
|
return fmt.Errorf("could create webhook client: %w", err)
|
|
}
|
|
a.webhook = webhook
|
|
|
|
podinfo, err := downward.Load(a.downwardAPIPath)
|
|
if err != nil {
|
|
return fmt.Errorf("could not read pod metadata: %w", err)
|
|
}
|
|
|
|
ca, err := certauthority.New(pkix.Name{CommonName: "Placeholder CA"})
|
|
if err != nil {
|
|
return fmt.Errorf("could not initialize CA: %w", err)
|
|
}
|
|
caBundle, err := ca.Bundle()
|
|
if err != nil {
|
|
return fmt.Errorf("could not read CA bundle: %w", err)
|
|
}
|
|
log.Printf("initialized CA bundle:\n%s", string(caBundle))
|
|
|
|
cert, err := ca.Issue(
|
|
pkix.Name{CommonName: "Placeholder Server"},
|
|
[]string{"placeholder-serve"},
|
|
24*365*time.Hour,
|
|
)
|
|
if err != nil {
|
|
return fmt.Errorf("could not issue serving certificate: %w", err)
|
|
}
|
|
|
|
// Start an errgroup to manage the lifetimes of the various listener goroutines.
|
|
eg, ctx := errgroup.WithContext(ctx)
|
|
|
|
// Dynamically register our v1alpha1 API service.
|
|
service := corev1.Service{
|
|
ObjectMeta: metav1.ObjectMeta{Name: "placeholder-name-api"},
|
|
Spec: corev1.ServiceSpec{
|
|
Ports: []corev1.ServicePort{
|
|
{
|
|
Protocol: corev1.ProtocolTCP,
|
|
Port: 443,
|
|
TargetPort: intstr.IntOrString{IntVal: 443}, //TODO: parse this out of mainAddr
|
|
},
|
|
},
|
|
Selector: podinfo.Labels,
|
|
Type: corev1.ServiceTypeClusterIP,
|
|
},
|
|
}
|
|
apiService := apiregistrationv1.APIService{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: "v1alpha1." + placeholder.GroupName,
|
|
},
|
|
Spec: apiregistrationv1.APIServiceSpec{
|
|
Group: placeholder.GroupName,
|
|
Version: "v1alpha1",
|
|
CABundle: caBundle,
|
|
GroupPriorityMinimum: 2500,
|
|
VersionPriority: 10,
|
|
},
|
|
}
|
|
if err := autoregistration.Setup(ctx, autoregistration.SetupOptions{
|
|
CoreV1: k8s,
|
|
AggregationV1: aggregation,
|
|
Namespace: podinfo.Namespace,
|
|
ServiceTemplate: service,
|
|
APIServiceTemplate: apiService,
|
|
}); err != nil {
|
|
return fmt.Errorf("could not register API service: %w", err)
|
|
}
|
|
|
|
// Start healthz listener
|
|
eg.Go(func() error {
|
|
log.Printf("Starting healthz serve on %v", a.healthAddr)
|
|
server := http.Server{
|
|
BaseContext: func(_ net.Listener) context.Context { return ctx },
|
|
Addr: a.healthAddr,
|
|
Handler: handlers.New(),
|
|
}
|
|
return runGracefully(ctx, &server, eg, server.ListenAndServe)
|
|
})
|
|
|
|
// Start main service listener
|
|
eg.Go(func() error {
|
|
log.Printf("Starting main serve on %v", a.mainAddr)
|
|
server := http.Server{
|
|
BaseContext: func(_ net.Listener) context.Context { return ctx },
|
|
Addr: a.mainAddr,
|
|
TLSConfig: &tls.Config{
|
|
MinVersion: tls.VersionTLS12,
|
|
Certificates: []tls.Certificate{*cert},
|
|
},
|
|
Handler: http.HandlerFunc(a.exampleHandler),
|
|
}
|
|
return runGracefully(ctx, &server, eg, func() error {
|
|
// Doc for ListenAndServeTLS says we can pass empty strings if we configured
|
|
// keypair for TLS in http.Server.TLSConfig.
|
|
return server.ListenAndServeTLS("", "")
|
|
})
|
|
})
|
|
|
|
if err := eg.Wait(); !errors.Is(err, http.ErrServerClosed) {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// exampleHandler is a stub to be replaced with our real server logic.
|
|
func (a *App) exampleHandler(w http.ResponseWriter, r *http.Request) {
|
|
ctx, cancel := context.WithTimeout(r.Context(), 3*time.Second)
|
|
defer cancel()
|
|
|
|
rsp, authenticated, err := a.webhook.AuthenticateToken(ctx, "")
|
|
log.Printf("token response: %+v", rsp)
|
|
log.Printf("token authenticated: %+v", authenticated)
|
|
log.Printf("token err: %+v", err)
|
|
|
|
_, _ = w.Write([]byte("hello world"))
|
|
}
|
|
|
|
// runGracefully runs an http.Server with graceful shutdown.
|
|
func runGracefully(ctx context.Context, srv *http.Server, eg *errgroup.Group, f func() error) error {
|
|
// Start the listener in a child goroutine.
|
|
eg.Go(f)
|
|
|
|
// If/when the context is canceled or times out, initiate shutting down the serve.
|
|
<-ctx.Done()
|
|
|
|
shutdownCtx, cancel := context.WithTimeout(context.Background(), shutdownGracePeriod)
|
|
defer cancel()
|
|
return srv.Shutdown(shutdownCtx)
|
|
}
|
|
|
|
// createProtoKubeConfig returns a copy of the input config with the ContentConfig set to use protobuf.
|
|
// do not use this config to communicate with any CRD based APIs.
|
|
func createProtoKubeConfig(kubeConfig *restclient.Config) *restclient.Config {
|
|
protoKubeConfig := restclient.CopyConfig(kubeConfig)
|
|
const protoThenJSON = runtime.ContentTypeProtobuf + "," + runtime.ContentTypeJSON
|
|
protoKubeConfig.AcceptContentTypes = protoThenJSON
|
|
protoKubeConfig.ContentType = runtime.ContentTypeProtobuf
|
|
return protoKubeConfig
|
|
}
|