Move starting/stopping impersonation proxy server to a new controller

- Watch a ConfigMap to read the configuration of the impersonation
  proxy and reconcile it (an example ConfigMap is sketched below).
- Implement "auto" mode by querying the API for control plane nodes.
- WIP: does not create a load balancer or proper TLS certificates yet.
  Those will come in future commits.
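
For illustration, the controller reads a ConfigMap shaped like the one
below. This is a sketch: the resource name matches the default wired up
in PrepareControllers, the namespace and data values are hypothetical,
and "mode" falls back to "auto" when the ConfigMap or the key is absent.

    apiVersion: v1
    kind: ConfigMap
    metadata:
      name: pinniped-concierge-impersonation-proxy-config
      namespace: concierge  # wherever the Concierge is installed
    data:
      config.yaml: |
        mode: enabled  # one of: auto, enabled, disabled
        endpoint: https://proxy.example.com:8443/
        tls:
          certificateAuthoritySecretName: my-ca-crt
          tlsSecretName: my-tls-certificate-and-key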

Signed-off-by: Margo Crawford <margaretc@vmware.com>
Ryan Richard 2021-02-11 17:22:47 -08:00
parent 9b87906a30
commit 5cd60fa5f9
9 changed files with 958 additions and 104 deletions


@@ -31,9 +31,12 @@ rules:
   resources: [ securitycontextconstraints ]
  verbs: [ use ]
  resourceNames: [ nonroot ]
-- apiGroups: [""]
-  resources: ["users", "groups"]
-  verbs: ["impersonate"]
+- apiGroups: [ "" ]
+  resources: [ "users", "groups" ]
+  verbs: [ "impersonate" ]
+- apiGroups: [ "" ]
+  resources: [ nodes ]
+  verbs: [ list ]
 - apiGroups:
   - #@ pinnipedDevAPIGroupWithPrefix("config.concierge")
   resources: [ credentialissuers ]
@@ -84,9 +87,12 @@ rules:
 - apiGroups: [ "" ]
   resources: [ pods/exec ]
   verbs: [ create ]
-- apiGroups: [apps]
-  resources: [replicasets,deployments]
-  verbs: [get]
+- apiGroups: [ apps ]
+  resources: [ replicasets,deployments ]
+  verbs: [ get ]
+- apiGroups: [ "" ]
+  resources: [ configmaps ]
+  verbs: [ list, get, watch ]
 ---
 kind: RoleBinding
 apiVersion: rbac.authorization.k8s.io/v1


@@ -55,17 +55,21 @@ type Config struct {
 	TLS *TLSConfig `json:"tls,omitempty"`
 }
 
-func FromConfigMap(configMap *v1.ConfigMap) (*Config, error) {
+func NewConfig() *Config {
+	return &Config{Mode: ModeAuto}
+}
+
+func ConfigFromConfigMap(configMap *v1.ConfigMap) (*Config, error) {
 	stringConfig, ok := configMap.Data[ConfigMapDataKey]
 	if !ok {
 		return nil, fmt.Errorf(`ConfigMap is missing expected key "%s"`, ConfigMapDataKey)
 	}
-	var config Config
-	if err := yaml.Unmarshal([]byte(stringConfig), &config); err != nil {
+	config := NewConfig()
+	if err := yaml.Unmarshal([]byte(stringConfig), config); err != nil {
 		return nil, fmt.Errorf("decode yaml: %w", err)
 	}
-	if config.Mode == "" {
-		config.Mode = ModeAuto // set the default value
+	if config.Mode != ModeAuto && config.Mode != ModeEnabled && config.Mode != ModeDisabled {
+		return nil, fmt.Errorf(`illegal value for "mode": %s`, config.Mode)
 	}
-	return &config, nil
+	return config, nil
 }
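
As a quick illustration of the new defaulting and validation (a runnable
sketch, not part of the commit; the `internal` import path only resolves
from within the pinniped module itself):

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"

	"go.pinniped.dev/internal/concierge/impersonator"
)

func main() {
	// NewConfig() supplies the default: auto mode.
	fmt.Println(impersonator.NewConfig().Mode) // auto

	// ConfigFromConfigMap unmarshals the "config.yaml" key over those defaults.
	cm := &corev1.ConfigMap{Data: map[string]string{"config.yaml": "mode: disabled"}}
	config, err := impersonator.ConfigFromConfigMap(cm)
	fmt.Println(config.Mode, err) // disabled <nil>

	// Any other mode value is now rejected instead of being passed through.
	cm.Data["config.yaml"] = "mode: sometimes"
	_, err = impersonator.ConfigFromConfigMap(cm)
	fmt.Println(err) // illegal value for "mode": sometimes
}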


@@ -13,7 +13,12 @@ import (
 	"go.pinniped.dev/internal/here"
 )
 
-func TestFromConfigMap(t *testing.T) {
+func TestNewConfig(t *testing.T) {
+	// It defaults the mode.
+	require.Equal(t, &Config{Mode: ModeAuto}, NewConfig())
+}
+
+func TestConfigFromConfigMap(t *testing.T) {
 	tests := []struct {
 		name       string
 		configMap  *v1.ConfigMap
@@ -27,11 +32,11 @@ func TestFromConfigMap(t *testing.T) {
 			ObjectMeta: metav1.ObjectMeta{},
 			Data: map[string]string{
 				"config.yaml": here.Doc(`
-				mode: enabled
-				endpoint: https://proxy.example.com:8443/
-				tls:
-				  certificateAuthoritySecretName: my-ca-crt
-				  tlsSecretName: my-tls-certificate-and-key
+					mode: enabled
+					endpoint: https://proxy.example.com:8443/
+					tls:
+					  certificateAuthoritySecretName: my-ca-crt
+					  tlsSecretName: my-tls-certificate-and-key
 				`),
 			},
 		},
@@ -59,6 +64,51 @@ func TestFromConfigMap(t *testing.T) {
 				TLS:      nil,
 			},
 		},
+		{
+			name: "valid config with mode enabled",
+			configMap: &v1.ConfigMap{
+				TypeMeta:   metav1.TypeMeta{},
+				ObjectMeta: metav1.ObjectMeta{},
+				Data: map[string]string{
+					"config.yaml": "mode: enabled",
+				},
+			},
+			wantConfig: &Config{
+				Mode:     "enabled",
+				Endpoint: "",
+				TLS:      nil,
+			},
+		},
+		{
+			name: "valid config with mode disabled",
+			configMap: &v1.ConfigMap{
+				TypeMeta:   metav1.TypeMeta{},
+				ObjectMeta: metav1.ObjectMeta{},
+				Data: map[string]string{
+					"config.yaml": "mode: disabled",
+				},
+			},
+			wantConfig: &Config{
+				Mode:     "disabled",
+				Endpoint: "",
+				TLS:      nil,
+			},
+		},
+		{
+			name: "valid config with mode auto",
+			configMap: &v1.ConfigMap{
+				TypeMeta:   metav1.TypeMeta{},
+				ObjectMeta: metav1.ObjectMeta{},
+				Data: map[string]string{
+					"config.yaml": "mode: auto",
+				},
+			},
+			wantConfig: &Config{
+				Mode:     "auto",
+				Endpoint: "",
+				TLS:      nil,
+			},
+		},
 		{
 			name: "wrong key in configmap",
 			configMap: &v1.ConfigMap{
@@ -81,12 +131,23 @@ func TestFromConfigMap(t *testing.T) {
 			},
 			wantError: "decode yaml: error unmarshaling JSON: while decoding JSON: json: cannot unmarshal string into Go value of type impersonator.Config",
 		},
+		{
+			name: "illegal value for mode in configmap",
+			configMap: &v1.ConfigMap{
+				TypeMeta:   metav1.TypeMeta{},
+				ObjectMeta: metav1.ObjectMeta{},
+				Data: map[string]string{
+					"config.yaml": "mode: unexpected-value",
+				},
+			},
+			wantError: `illegal value for "mode": unexpected-value`,
+		},
 	}
 	for _, tt := range tests {
 		test := tt
 		t.Run(test.name, func(t *testing.T) {
-			config, err := FromConfigMap(test.configMap)
+			config, err := ConfigFromConfigMap(test.configMap)
 			require.Equal(t, test.wantConfig, config)
 			if test.wantError != "" {
 				require.EqualError(t, err, test.wantError)


@@ -33,13 +33,13 @@ var allowedHeaders = []string{
 	"Upgrade",
 }
 
-type Proxy struct {
+type proxy struct {
 	cache *authncache.Cache
 	proxy *httputil.ReverseProxy
 	log   logr.Logger
 }
 
-func New(cache *authncache.Cache, log logr.Logger) (*Proxy, error) {
+func New(cache *authncache.Cache, log logr.Logger) (http.Handler, error) {
 	return newInternal(cache, log, func() (*rest.Config, error) {
 		client, err := kubeclient.New()
 		if err != nil {
@@ -49,7 +49,7 @@ func New(cache *authncache.Cache, log logr.Logger) (*Proxy, error) {
 	})
 }
 
-func newInternal(cache *authncache.Cache, log logr.Logger, getConfig func() (*rest.Config, error)) (*Proxy, error) {
+func newInternal(cache *authncache.Cache, log logr.Logger, getConfig func() (*rest.Config, error)) (*proxy, error) {
 	kubeconfig, err := getConfig()
 	if err != nil {
 		return nil, fmt.Errorf("could not get in-cluster config: %w", err)
@@ -71,17 +71,17 @@ func newInternal(cache *authncache.Cache, log logr.Logger, getConfig func() (*re
 		return nil, fmt.Errorf("could not get in-cluster transport: %w", err)
 	}
 
-	proxy := httputil.NewSingleHostReverseProxy(serverURL)
-	proxy.Transport = kubeRoundTripper
+	reverseProxy := httputil.NewSingleHostReverseProxy(serverURL)
+	reverseProxy.Transport = kubeRoundTripper
 
-	return &Proxy{
+	return &proxy{
 		cache: cache,
-		proxy: proxy,
+		proxy: reverseProxy,
 		log:   log,
 	}, nil
 }
 
-func (p *Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+func (p *proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 	log := p.log.WithValues(
 		"url", r.URL.String(),
 		"method", r.Method,


@@ -6,19 +6,10 @@ package server
 import (
 	"context"
-	"crypto/tls"
-	"crypto/x509/pkix"
 	"fmt"
 	"io"
-	"net/http"
 	"time"
 
-	"k8s.io/apimachinery/pkg/util/intstr"
-
-	v1 "k8s.io/api/core/v1"
-
-	"go.pinniped.dev/internal/kubeclient"
-
 	"github.com/spf13/cobra"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
@@ -27,15 +18,11 @@ import (
 	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
 	genericapiserver "k8s.io/apiserver/pkg/server"
 	genericoptions "k8s.io/apiserver/pkg/server/options"
-	"k8s.io/klog/v2"
-	"k8s.io/klog/v2/klogr"
 
 	loginapi "go.pinniped.dev/generated/1.20/apis/concierge/login"
 	loginv1alpha1 "go.pinniped.dev/generated/1.20/apis/concierge/login/v1alpha1"
-	"go.pinniped.dev/internal/certauthority"
 	"go.pinniped.dev/internal/certauthority/dynamiccertauthority"
 	"go.pinniped.dev/internal/concierge/apiserver"
-	"go.pinniped.dev/internal/concierge/impersonator"
 	"go.pinniped.dev/internal/config/concierge"
 	"go.pinniped.dev/internal/controller/authenticator/authncache"
 	"go.pinniped.dev/internal/controllermanager"
@@ -175,64 +162,6 @@ func (a *App) runServer(ctx context.Context) error {
 		return fmt.Errorf("could not create aggregated API server: %w", err)
 	}
 
-	client, err := kubeclient.New()
-	if err != nil {
-		plog.WarningErr("could not create client", err)
-	} else {
-		appNameLabel := cfg.Labels["app"]
-		loadBalancer := v1.Service{
-			Spec: v1.ServiceSpec{
-				Type: "LoadBalancer",
-				Ports: []v1.ServicePort{
-					{
-						TargetPort: intstr.FromInt(8444),
-						Port:       443,
-						Protocol:   v1.ProtocolTCP,
-					},
-				},
-				Selector: map[string]string{"app": appNameLabel},
-			},
-			ObjectMeta: metav1.ObjectMeta{
-				Name:      "impersonation-proxy-load-balancer",
-				Namespace: podInfo.Namespace,
-				Labels:    cfg.Labels,
-			},
-		}
-		_, err = client.Kubernetes.CoreV1().Services(podInfo.Namespace).Create(ctx, &loadBalancer, metav1.CreateOptions{})
-		if err != nil {
-			plog.WarningErr("could not create load balancer", err)
-		}
-	}
-
-	// run proxy handler
-	impersonationCA, err := certauthority.New(pkix.Name{CommonName: "test CA"}, 24*time.Hour)
-	if err != nil {
-		return fmt.Errorf("could not create impersonation CA: %w", err)
-	}
-	impersonationCert, err := impersonationCA.Issue(pkix.Name{}, []string{"impersonation-proxy"}, nil, 24*time.Hour)
-	if err != nil {
-		return fmt.Errorf("could not create impersonation cert: %w", err)
-	}
-	impersonationProxy, err := impersonator.New(authenticators, klogr.New().WithName("impersonation-proxy"))
-	if err != nil {
-		return fmt.Errorf("could not create impersonation proxy: %w", err)
-	}
-	impersonationProxyServer := http.Server{
-		Addr:    "0.0.0.0:8444",
-		Handler: impersonationProxy,
-		TLSConfig: &tls.Config{
-			MinVersion:   tls.VersionTLS12,
-			Certificates: []tls.Certificate{*impersonationCert},
-		},
-	}
-	// todo store CA, cert etc. on the authenticator status
-	go func() {
-		if err := impersonationProxyServer.ListenAndServeTLS("", ""); err != nil {
-			klog.ErrorS(err, "could not serve impersonation proxy")
-		}
-	}()
-
 	// Run the server. Its post-start hook will start the controllers.
 	return server.GenericAPIServer.PrepareRun().Run(ctx.Done())
 }


@@ -0,0 +1,222 @@
// Copyright 2021 the Pinniped contributors. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package impersonatorconfig
import (
"crypto/tls"
"crypto/x509/pkix"
"errors"
"fmt"
"net"
"net/http"
"time"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
corev1informers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
"go.pinniped.dev/internal/certauthority"
"go.pinniped.dev/internal/clusterhost"
"go.pinniped.dev/internal/concierge/impersonator"
pinnipedcontroller "go.pinniped.dev/internal/controller"
"go.pinniped.dev/internal/controllerlib"
"go.pinniped.dev/internal/plog"
)
const (
impersonationProxyPort = ":8444"
)
type impersonatorConfigController struct {
namespace string
configMapResourceName string
k8sClient kubernetes.Interface
configMapsInformer corev1informers.ConfigMapInformer
generatedLoadBalancerServiceName string
startTLSListenerFunc StartTLSListenerFunc
httpHandlerFactory func() (http.Handler, error)
server *http.Server
hasControlPlaneNodes *bool
}
type StartTLSListenerFunc func(network, listenAddress string, config *tls.Config) (net.Listener, error)
func NewImpersonatorConfigController(
namespace string,
configMapResourceName string,
k8sClient kubernetes.Interface,
configMapsInformer corev1informers.ConfigMapInformer,
withInformer pinnipedcontroller.WithInformerOptionFunc,
withInitialEvent pinnipedcontroller.WithInitialEventOptionFunc,
generatedLoadBalancerServiceName string,
startTLSListenerFunc StartTLSListenerFunc,
httpHandlerFactory func() (http.Handler, error),
) controllerlib.Controller {
return controllerlib.New(
controllerlib.Config{
Name: "impersonator-config-controller",
Syncer: &impersonatorConfigController{
namespace: namespace,
configMapResourceName: configMapResourceName,
k8sClient: k8sClient,
configMapsInformer: configMapsInformer,
generatedLoadBalancerServiceName: generatedLoadBalancerServiceName,
startTLSListenerFunc: startTLSListenerFunc,
httpHandlerFactory: httpHandlerFactory,
},
},
withInformer(
configMapsInformer,
pinnipedcontroller.NameAndNamespaceExactMatchFilterFactory(configMapResourceName, namespace),
controllerlib.InformerOption{},
),
// Be sure to run once even if the ConfigMap that the informer is watching doesn't exist.
withInitialEvent(controllerlib.Key{
Namespace: namespace,
Name: configMapResourceName,
}),
)
}
func (c *impersonatorConfigController) Sync(ctx controllerlib.Context) error {
plog.Info("impersonatorConfigController Sync")
configMap, err := c.configMapsInformer.Lister().ConfigMaps(c.namespace).Get(c.configMapResourceName)
notFound := k8serrors.IsNotFound(err)
if err != nil && !notFound {
return fmt.Errorf("failed to get %s/%s configmap: %w", c.namespace, c.configMapResourceName, err)
}
var config *impersonator.Config
if notFound {
plog.Info("Did not find impersonation proxy config: using default config values",
"configmap", c.configMapResourceName,
"namespace", c.namespace,
)
config = impersonator.NewConfig() // use default configuration options
} else {
config, err = impersonator.ConfigFromConfigMap(configMap)
if err != nil {
return fmt.Errorf("invalid impersonator configuration: %v", err)
}
plog.Info("Read impersonation proxy config",
"configmap", c.configMapResourceName,
"namespace", c.namespace,
)
}
// Make a live API call to avoid the cost of having an informer watch all node changes on the cluster,
// since there could be lots and we don't especially care about node changes.
// Once we have concluded that there is or is not a visible control plane, then cache that decision
// to avoid listing nodes very often.
if c.hasControlPlaneNodes == nil {
hasControlPlaneNodes, err := clusterhost.New(c.k8sClient).HasControlPlaneNodes(ctx.Context)
if err != nil {
return err
}
c.hasControlPlaneNodes = &hasControlPlaneNodes
plog.Debug("Queried for control plane nodes", "foundControlPlaneNodes", hasControlPlaneNodes)
}
if (config.Mode == impersonator.ModeAuto && !*c.hasControlPlaneNodes) || config.Mode == impersonator.ModeEnabled {
if err = c.startImpersonator(); err != nil {
return err
}
} else {
if err = c.stopImpersonator(); err != nil {
return err
}
}
// TODO when the proxy is going to run, and the endpoint goes from being not specified to being specified, then the LoadBalancer is deleted
// TODO when the proxy is going to run, and when the endpoint goes from being specified to being not specified, then the LoadBalancer is created
// TODO when auto mode decides that the proxy should be disabled, then it also does not create the LoadBalancer (or it deletes it)
// client, err := kubeclient.New()
// if err != nil {
// plog.WarningErr("could not create client", err)
// } else {
// appNameLabel := cfg.Labels["app"]
// loadBalancer := v1.Service{
// Spec: v1.ServiceSpec{
// Type: "LoadBalancer",
// Ports: []v1.ServicePort{
// {
// TargetPort: intstr.FromInt(8444),
// Port: 443,
// Protocol: v1.ProtocolTCP,
// },
// },
// Selector: map[string]string{"app": appNameLabel},
// },
// ObjectMeta: metav1.ObjectMeta{
// Name: "impersonation-proxy-load-balancer",
// Namespace: podInfo.Namespace,
// Labels: cfg.Labels,
// },
// }
// _, err = client.Kubernetes.CoreV1().Services(podInfo.Namespace).Create(ctx, &loadBalancer, metav1.CreateOptions{})
// if err != nil {
// plog.WarningErr("could not create load balancer", err)
// }
// }
return nil
}
func (c *impersonatorConfigController) stopImpersonator() error {
if c.server != nil {
plog.Info("Stopping impersonation proxy", "port", impersonationProxyPort)
err := c.server.Close()
c.server = nil
if err != nil {
return err
}
}
return nil
}
func (c *impersonatorConfigController) startImpersonator() error {
if c.server != nil {
return nil
}
impersonationCA, err := certauthority.New(pkix.Name{CommonName: "test CA"}, 24*time.Hour)
if err != nil {
return fmt.Errorf("could not create impersonation CA: %w", err)
}
impersonationCert, err := impersonationCA.Issue(pkix.Name{}, []string{"impersonation-proxy"}, nil, 24*time.Hour)
if err != nil {
return fmt.Errorf("could not create impersonation cert: %w", err)
}
handler, err := c.httpHandlerFactory()
if err != nil {
return err
}
listener, err := c.startTLSListenerFunc("tcp", impersonationProxyPort, &tls.Config{
MinVersion: tls.VersionTLS12, // Allow v1.2 because clients like the default `curl` on MacOS don't support 1.3 yet.
GetCertificate: func(info *tls.ClientHelloInfo) (*tls.Certificate, error) {
return impersonationCert, nil
},
})
if err != nil {
return err
}
c.server = &http.Server{Handler: handler}
go func() {
plog.Info("Starting impersonation proxy", "port", impersonationProxyPort)
err = c.server.Serve(listener)
if errors.Is(err, http.ErrServerClosed) {
plog.Info("The impersonation proxy server has shut down")
} else {
plog.Error("Unexpected shutdown of the impersonation proxy server", err)
}
}()
return nil
}
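
For quick reference, the start/stop decision implemented by Sync above
reduces to this table (derived from the mode check and the cached
control-plane-node query):

    mode: enabled   -> start the impersonator, regardless of node roles
    mode: disabled  -> stop the impersonator if it is running
    mode: auto      -> start only when no control plane nodes are visible, otherwise stop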


@@ -0,0 +1,535 @@
// Copyright 2021 the Pinniped contributors. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package impersonatorconfig
import (
"context"
"crypto/tls"
"errors"
"fmt"
"io/ioutil"
"net"
"net/http"
"strings"
"testing"
"time"
"github.com/sclevine/spec"
"github.com/sclevine/spec/report"
"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
kubeinformers "k8s.io/client-go/informers"
kubernetesfake "k8s.io/client-go/kubernetes/fake"
coretesting "k8s.io/client-go/testing"
"k8s.io/client-go/tools/cache"
"go.pinniped.dev/internal/controllerlib"
"go.pinniped.dev/internal/here"
"go.pinniped.dev/internal/testutil"
)
type tlsListenerWrapper struct {
listener net.Listener
closeError error
}
func (t *tlsListenerWrapper) Accept() (net.Conn, error) {
return t.listener.Accept()
}
func (t *tlsListenerWrapper) Close() error {
if t.closeError != nil {
// Really close the connection and then "pretend" that there was an error during close.
_ = t.listener.Close()
return t.closeError
}
return t.listener.Close()
}
func (t *tlsListenerWrapper) Addr() net.Addr {
return t.listener.Addr()
}
func TestImpersonatorConfigControllerOptions(t *testing.T) {
spec.Run(t, "options", func(t *testing.T, when spec.G, it spec.S) {
const installedInNamespace = "some-namespace"
const configMapResourceName = "some-configmap-resource-name"
const generatedLoadBalancerServiceName = "some-service-resource-name"
var r *require.Assertions
var observableWithInformerOption *testutil.ObservableWithInformerOption
var observableWithInitialEventOption *testutil.ObservableWithInitialEventOption
var configMapsInformerFilter controllerlib.Filter
it.Before(func() {
r = require.New(t)
observableWithInformerOption = testutil.NewObservableWithInformerOption()
observableWithInitialEventOption = testutil.NewObservableWithInitialEventOption()
configMapsInformer := kubeinformers.NewSharedInformerFactory(nil, 0).Core().V1().ConfigMaps()
_ = NewImpersonatorConfigController(
installedInNamespace,
configMapResourceName,
nil,
configMapsInformer,
observableWithInformerOption.WithInformer,
observableWithInitialEventOption.WithInitialEvent,
generatedLoadBalancerServiceName,
nil,
nil,
)
configMapsInformerFilter = observableWithInformerOption.GetFilterForInformer(configMapsInformer)
})
when("watching ConfigMap objects", func() {
var subject controllerlib.Filter
var target, wrongNamespace, wrongName, unrelated *corev1.ConfigMap
it.Before(func() {
subject = configMapsInformerFilter
target = &corev1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Name: configMapResourceName, Namespace: installedInNamespace}}
wrongNamespace = &corev1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Name: configMapResourceName, Namespace: "wrong-namespace"}}
wrongName = &corev1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Name: "wrong-name", Namespace: installedInNamespace}}
unrelated = &corev1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Name: "wrong-name", Namespace: "wrong-namespace"}}
})
when("the target ConfigMap changes", func() {
it("returns true to trigger the sync method", func() {
r.True(subject.Add(target))
r.True(subject.Update(target, unrelated))
r.True(subject.Update(unrelated, target))
r.True(subject.Delete(target))
})
})
when("a ConfigMap from another namespace changes", func() {
it("returns false to avoid triggering the sync method", func() {
r.False(subject.Add(wrongNamespace))
r.False(subject.Update(wrongNamespace, unrelated))
r.False(subject.Update(unrelated, wrongNamespace))
r.False(subject.Delete(wrongNamespace))
})
})
when("a ConfigMap with a different name changes", func() {
it("returns false to avoid triggering the sync method", func() {
r.False(subject.Add(wrongName))
r.False(subject.Update(wrongName, unrelated))
r.False(subject.Update(unrelated, wrongName))
r.False(subject.Delete(wrongName))
})
})
when("a ConfigMap with a different name and a different namespace changes", func() {
it("returns false to avoid triggering the sync method", func() {
r.False(subject.Add(unrelated))
r.False(subject.Update(unrelated, unrelated))
r.False(subject.Delete(unrelated))
})
})
})
when("starting up", func() {
it("asks for an initial event because the ConfigMap may not exist yet and it needs to run anyway", func() {
r.Equal(&controllerlib.Key{
Namespace: installedInNamespace,
Name: configMapResourceName,
}, observableWithInitialEventOption.GetInitialEventKey())
})
})
}, spec.Parallel(), spec.Report(report.Terminal{}))
}
func TestImpersonatorConfigControllerSync(t *testing.T) {
spec.Run(t, "Sync", func(t *testing.T, when spec.G, it spec.S) {
const installedInNamespace = "some-namespace"
const configMapResourceName = "some-configmap-resource-name"
const generatedLoadBalancerServiceName = "some-service-resource-name"
var r *require.Assertions
var subject controllerlib.Controller
var kubeAPIClient *kubernetesfake.Clientset
var kubeInformerClient *kubernetesfake.Clientset
var kubeInformers kubeinformers.SharedInformerFactory
var timeoutContext context.Context
var timeoutContextCancel context.CancelFunc
var syncContext *controllerlib.Context
var startTLSListenerFuncWasCalled int
var startTLSListenerFuncError error
var startTLSListenerUponCloseError error
var httpHandlerFactoryFuncError error
var startedTLSListener net.Listener
var startTLSListenerFunc = func(network, listenAddress string, config *tls.Config) (net.Listener, error) {
startTLSListenerFuncWasCalled++
r.Equal("tcp", network)
r.Equal(":8444", listenAddress)
r.Equal(uint16(tls.VersionTLS12), config.MinVersion)
if startTLSListenerFuncError != nil {
return nil, startTLSListenerFuncError
}
var err error
//nolint: gosec // Intentionally binding to all network interfaces.
startedTLSListener, err = tls.Listen(network, ":0", config) // automatically choose the port for unit tests
r.NoError(err)
return &tlsListenerWrapper{listener: startedTLSListener, closeError: startTLSListenerUponCloseError}, nil
}
var closeTLSListener = func() {
if startedTLSListener != nil {
err := startedTLSListener.Close()
// Ignore when the production code has already closed the server because there is nothing to
// clean up in that case.
if err != nil && !strings.Contains(err.Error(), "use of closed network connection") {
r.NoError(err)
}
}
}
var requireTLSServerIsRunning = func() {
r.Greater(startTLSListenerFuncWasCalled, 0)
tr := &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, //nolint:gosec // TODO once we're using certs, do not skip verify
}
client := &http.Client{Transport: tr}
url := "https://" + startedTLSListener.Addr().String()
req, err := http.NewRequestWithContext(context.Background(), "GET", url, nil)
r.NoError(err)
resp, err := client.Do(req)
r.NoError(err)
r.Equal(http.StatusOK, resp.StatusCode)
body, err := ioutil.ReadAll(resp.Body)
r.NoError(resp.Body.Close())
r.NoError(err)
r.Equal("hello world", string(body))
}
var requireTLSServerIsNoLongerRunning = func() {
r.Greater(startTLSListenerFuncWasCalled, 0)
_, err := tls.Dial(
startedTLSListener.Addr().Network(),
startedTLSListener.Addr().String(),
&tls.Config{InsecureSkipVerify: true}, //nolint:gosec // TODO once we're using certs, do not skip verify
)
r.Error(err)
r.Regexp(`dial tcp \[::\]:[0-9]+: connect: connection refused`, err.Error())
}
var requireTLSServerWasNeverStarted = func() {
r.Equal(0, startTLSListenerFuncWasCalled)
}
var waitForInformerCacheToSeeResourceVersion = func(informer cache.SharedIndexInformer, wantVersion string) {
r.Eventually(func() bool {
return informer.LastSyncResourceVersion() == wantVersion
}, 10*time.Second, time.Millisecond)
}
// Defer starting the informers until the last possible moment so that the
// nested Before's can keep adding things to the informer caches.
var startInformersAndController = func() {
// Set this at the last second to allow for injection of server override.
subject = NewImpersonatorConfigController(
installedInNamespace,
configMapResourceName,
kubeAPIClient,
kubeInformers.Core().V1().ConfigMaps(),
controllerlib.WithInformer,
controllerlib.WithInitialEvent,
generatedLoadBalancerServiceName,
startTLSListenerFunc,
func() (http.Handler, error) {
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
_, err := fmt.Fprintf(w, "hello world")
r.NoError(err)
}), httpHandlerFactoryFuncError
},
)
// Set this at the last second to support calling subject.Name().
syncContext = &controllerlib.Context{
Context: timeoutContext,
Name: subject.Name(),
Key: controllerlib.Key{
Namespace: installedInNamespace,
Name: configMapResourceName,
},
}
// Must start informers before calling TestRunSynchronously()
kubeInformers.Start(timeoutContext.Done())
controllerlib.TestRunSynchronously(t, subject)
}
var addImpersonatorConfigMapToTracker = func(resourceName, configYAML string) {
impersonatorConfigMap := &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: resourceName,
Namespace: installedInNamespace,
// Note that this seems to be ignored by the informer during initial creation, so actually
// the informer will see this as resource version "". Leaving it here to express the intent
// that the initial version is version 0.
ResourceVersion: "0",
},
Data: map[string]string{
"config.yaml": configYAML,
},
}
r.NoError(kubeInformerClient.Tracker().Add(impersonatorConfigMap))
}
var updateImpersonatorConfigMapInTracker = func(resourceName, configYAML, newResourceVersion string) {
impersonatorConfigMap := &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: resourceName,
Namespace: installedInNamespace,
// Different resource version compared to the initial version when this resource was created
// so we can tell when the informer cache has cached this newly updated version.
ResourceVersion: newResourceVersion,
},
Data: map[string]string{
"config.yaml": configYAML,
},
}
r.NoError(kubeInformerClient.Tracker().Update(
schema.GroupVersionResource{Version: "v1", Resource: "configmaps"},
impersonatorConfigMap,
installedInNamespace,
))
}
var addNodeWithRoleToTracker = func(role string) {
r.NoError(kubeAPIClient.Tracker().Add(
&corev1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "node",
Labels: map[string]string{"kubernetes.io/node-role": role},
},
},
))
}
it.Before(func() {
r = require.New(t)
timeoutContext, timeoutContextCancel = context.WithTimeout(context.Background(), time.Second*3)
kubeInformerClient = kubernetesfake.NewSimpleClientset()
kubeInformers = kubeinformers.NewSharedInformerFactoryWithOptions(kubeInformerClient, 0,
kubeinformers.WithNamespace(installedInNamespace),
)
kubeAPIClient = kubernetesfake.NewSimpleClientset()
})
it.After(func() {
timeoutContextCancel()
closeTLSListener()
})
when("the ConfigMap does not yet exist in the installation namespace or it was deleted (defaults to auto mode)", func() {
it.Before(func() {
addImpersonatorConfigMapToTracker("some-other-ConfigMap", "foo: bar")
})
when("there are visible control plane nodes", func() {
it.Before(func() {
addNodeWithRoleToTracker("control-plane")
})
it("does not start the impersonator", func() {
startInformersAndController()
r.NoError(controllerlib.TestSync(t, subject, *syncContext))
requireTLSServerWasNeverStarted()
})
})
when("there are not visible control plane nodes", func() {
it.Before(func() {
addNodeWithRoleToTracker("worker")
})
it("automatically starts the impersonator", func() {
startInformersAndController()
r.NoError(controllerlib.TestSync(t, subject, *syncContext))
requireTLSServerIsRunning()
})
})
})
when("sync is called more than once", func() {
it.Before(func() {
addNodeWithRoleToTracker("worker")
})
it("only starts the impersonator once and only lists the cluster's nodes once", func() {
startInformersAndController()
r.NoError(controllerlib.TestSync(t, subject, *syncContext))
r.Equal(
[]coretesting.Action{
coretesting.NewListAction(
schema.GroupVersionResource{Version: "v1", Resource: "nodes"},
schema.GroupVersionKind{Group: "", Version: "v1", Kind: "Node"},
"",
metav1.ListOptions{}),
},
kubeAPIClient.Actions(),
)
r.NoError(controllerlib.TestSync(t, subject, *syncContext))
r.Equal(1, startTLSListenerFuncWasCalled) // wasn't started a second time
requireTLSServerIsRunning() // still running
r.Equal(1, len(kubeAPIClient.Actions())) // no new API calls
})
})
when("getting the control plane nodes returns an error, e.g. when there are no nodes", func() {
it("returns an error", func() {
startInformersAndController()
r.EqualError(controllerlib.TestSync(t, subject, *syncContext), "no nodes found")
requireTLSServerWasNeverStarted()
})
})
when("the http handler factory function returns an error", func() {
it.Before(func() {
addNodeWithRoleToTracker("worker")
httpHandlerFactoryFuncError = errors.New("some factory error")
})
it("returns an error", func() {
startInformersAndController()
r.EqualError(controllerlib.TestSync(t, subject, *syncContext), "some factory error")
requireTLSServerWasNeverStarted()
})
})
when("the configmap is invalid", func() {
it.Before(func() {
addImpersonatorConfigMapToTracker(configMapResourceName, "not yaml")
})
it("returns an error", func() {
startInformersAndController()
r.EqualError(controllerlib.TestSync(t, subject, *syncContext), "invalid impersonator configuration: decode yaml: error unmarshaling JSON: while decoding JSON: json: cannot unmarshal string into Go value of type impersonator.Config")
requireTLSServerWasNeverStarted()
})
})
when("the ConfigMap is already in the installation namespace", func() {
when("the configuration is auto mode with an endpoint", func() {
it.Before(func() {
addImpersonatorConfigMapToTracker(configMapResourceName, here.Doc(`
mode: auto
endpoint: https://proxy.example.com:8443/
`),
)
})
when("there are visible control plane nodes", func() {
it.Before(func() {
addNodeWithRoleToTracker("control-plane")
})
it("does not start the impersonator", func() {
startInformersAndController()
r.NoError(controllerlib.TestSync(t, subject, *syncContext))
requireTLSServerWasNeverStarted()
})
})
when("there are not visible control plane nodes", func() {
it.Before(func() {
addNodeWithRoleToTracker("worker")
})
it("starts the impersonator according to the settings in the ConfigMap", func() {
startInformersAndController()
r.NoError(controllerlib.TestSync(t, subject, *syncContext))
requireTLSServerIsRunning()
})
})
})
when("the configuration is disabled mode", func() {
it.Before(func() {
addImpersonatorConfigMapToTracker(configMapResourceName, "mode: disabled")
addNodeWithRoleToTracker("worker")
})
it("does not start the impersonator", func() {
startInformersAndController()
r.NoError(controllerlib.TestSync(t, subject, *syncContext))
requireTLSServerWasNeverStarted()
})
})
when("the configuration is enabled mode", func() {
it.Before(func() {
addImpersonatorConfigMapToTracker(configMapResourceName, "mode: enabled")
addNodeWithRoleToTracker("control-plane")
})
it("starts the impersonator regardless of the visibility of control plane nodes", func() {
startInformersAndController()
r.NoError(controllerlib.TestSync(t, subject, *syncContext))
requireTLSServerIsRunning()
})
it("returns an error when the tls listener fails to start", func() {
startTLSListenerFuncError = errors.New("tls error")
startInformersAndController()
r.EqualError(controllerlib.TestSync(t, subject, *syncContext), "tls error")
})
})
when("the configuration switches from enabled to disabled mode", func() {
it.Before(func() {
addImpersonatorConfigMapToTracker(configMapResourceName, "mode: enabled")
addNodeWithRoleToTracker("control-plane")
})
it("starts the impersonator, then shuts it down, then starts it again", func() {
startInformersAndController()
r.NoError(controllerlib.TestSync(t, subject, *syncContext))
requireTLSServerIsRunning()
updateImpersonatorConfigMapInTracker(configMapResourceName, "mode: disabled", "1")
waitForInformerCacheToSeeResourceVersion(kubeInformers.Core().V1().ConfigMaps().Informer(), "1")
r.NoError(controllerlib.TestSync(t, subject, *syncContext))
requireTLSServerIsNoLongerRunning()
updateImpersonatorConfigMapInTracker(configMapResourceName, "mode: enabled", "2")
waitForInformerCacheToSeeResourceVersion(kubeInformers.Core().V1().ConfigMaps().Informer(), "2")
r.NoError(controllerlib.TestSync(t, subject, *syncContext))
requireTLSServerIsRunning()
})
when("there is an error while shutting down the server", func() {
it.Before(func() {
startTLSListenerUponCloseError = errors.New("fake server close error")
})
it("returns the error from the sync function", func() {
startInformersAndController()
r.NoError(controllerlib.TestSync(t, subject, *syncContext))
requireTLSServerIsRunning()
updateImpersonatorConfigMapInTracker(configMapResourceName, "mode: disabled", "1")
waitForInformerCacheToSeeResourceVersion(kubeInformers.Core().V1().ConfigMaps().Informer(), "1")
r.EqualError(controllerlib.TestSync(t, subject, *syncContext), "fake server close error")
requireTLSServerIsNoLongerRunning()
})
})
})
})
}, spec.Parallel(), spec.Report(report.Terminal{}))
}


@@ -7,7 +7,9 @@ package controllermanager
 import (
 	"context"
+	"crypto/tls"
 	"fmt"
+	"net/http"
 	"time"
 
 	"k8s.io/apimachinery/pkg/util/clock"
@@ -19,12 +21,14 @@ import (
 	pinnipedclientset "go.pinniped.dev/generated/1.20/client/concierge/clientset/versioned"
 	pinnipedinformers "go.pinniped.dev/generated/1.20/client/concierge/informers/externalversions"
 	"go.pinniped.dev/internal/apiserviceref"
+	"go.pinniped.dev/internal/concierge/impersonator"
 	"go.pinniped.dev/internal/config/concierge"
 	"go.pinniped.dev/internal/controller/apicerts"
 	"go.pinniped.dev/internal/controller/authenticator/authncache"
 	"go.pinniped.dev/internal/controller/authenticator/cachecleaner"
 	"go.pinniped.dev/internal/controller/authenticator/jwtcachefiller"
 	"go.pinniped.dev/internal/controller/authenticator/webhookcachefiller"
+	"go.pinniped.dev/internal/controller/impersonatorconfig"
 	"go.pinniped.dev/internal/controller/issuerconfig"
 	"go.pinniped.dev/internal/controller/kubecertagent"
 	"go.pinniped.dev/internal/controllerlib"
@@ -271,6 +275,28 @@ func PrepareControllers(c *Config) (func(ctx context.Context), error) {
 				klogr.New(),
 			),
 			singletonWorker,
+		).
+		// The impersonation proxy configuration controllers dynamically configure the impersonation proxy feature.
+		WithController(
+			impersonatorconfig.NewImpersonatorConfigController(
+				c.ServerInstallationInfo.Namespace,
+				"pinniped-concierge-impersonation-proxy-config", // TODO this string should come from `c.NamesConfig`
+				client.Kubernetes,
+				informers.installationNamespaceK8s.Core().V1().ConfigMaps(),
+				controllerlib.WithInformer,
+				controllerlib.WithInitialEvent,
+				"pinniped-concierge-impersonation-proxy-load-balancer", // TODO this string should come from `c.NamesConfig`
+				tls.Listen,
+				func() (http.Handler, error) {
+					impersonationProxyHandler, err := impersonator.New(c.AuthenticatorCache, klogr.New().WithName("impersonation-proxy"))
+					if err != nil {
+						return nil, fmt.Errorf("could not create impersonation proxy: %w", err)
+					}
+					return impersonationProxyHandler, nil
+				},
+			),
+			singletonWorker,
 		)
 
 	// Return a function which starts the informers and controllers.

@@ -18,8 +18,10 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/rest"
+	"sigs.k8s.io/yaml"
 
 	loginv1alpha1 "go.pinniped.dev/generated/1.20/apis/concierge/login/v1alpha1"
+	"go.pinniped.dev/internal/concierge/impersonator"
 	"go.pinniped.dev/test/library"
 )
@@ -31,16 +33,16 @@ func TestImpersonationProxy(t *testing.T) {
 		return
 	}
 
-	ctx, cancel := context.WithTimeout(context.Background(), 6*time.Minute)
+	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
 	defer cancel()
 
 	// Create a client using the admin kubeconfig.
-	// adminClient := library.NewClientset(t)
+	adminClient := library.NewKubernetesClientset(t)
 
 	// Create a WebhookAuthenticator.
 	authenticator := library.CreateTestWebhookAuthenticator(ctx, t)
 
-	// Find the address of the ClusterIP service.
+	// The address of the ClusterIP service that points at the impersonation proxy's port
 	proxyServiceURL := fmt.Sprintf("https://%s-proxy.%s.svc.cluster.local", env.ConciergeAppName, env.ConciergeNamespace)
 	t.Logf("making kubeconfig that points to %q", proxyServiceURL)
@@ -56,20 +58,89 @@
 		},
 	}
 
-	clientset, err := kubernetes.NewForConfig(kubeconfig)
+	impersonationProxyClient, err := kubernetes.NewForConfig(kubeconfig)
 	require.NoError(t, err, "unexpected failure from kubernetes.NewForConfig()")
 
+	// TODO if there is already a ConfigMap, remember its contents and delete it, which puts the proxy into its default settings
+	// TODO and in a t.Cleanup() if there was already a ConfigMap at the start of the test, then restore the original contents
+
+	if env.HasCapability(library.HasExternalLoadBalancerProvider) {
+		// Check that load balancer has been created
+		require.Eventually(t, func() bool {
+			return hasLoadBalancerService(ctx, t, adminClient, env.ConciergeNamespace)
+		}, 10*time.Second, 500*time.Millisecond)
+	} else {
+		// Check that no load balancer has been created
+		require.Never(t, func() bool {
+			return hasLoadBalancerService(ctx, t, adminClient, env.ConciergeNamespace)
+		}, 10*time.Second, 500*time.Millisecond)
+
+		// Check that we can't use the impersonation proxy to execute kubectl commands yet
+		_, err = impersonationProxyClient.CoreV1().Namespaces().List(ctx, metav1.ListOptions{})
+		require.EqualError(t, err, "Get \"https://pinniped-concierge-proxy.concierge.svc.cluster.local/api/v1/namespaces\": Service Unavailable")
+
+		// Create configuration to make the impersonation proxy turn on with a hard coded endpoint (without a LoadBalancer)
+		configMap := configMapForConfig(t, impersonator.Config{
+			Mode:     impersonator.ModeEnabled,
+			Endpoint: proxyServiceURL,
+			TLS:      nil,
+		})
+		_, err = adminClient.CoreV1().ConfigMaps(env.ConciergeNamespace).Create(ctx, &configMap, metav1.CreateOptions{})
+		require.NoError(t, err)
+		t.Cleanup(func() {
+			// TODO clean up the ConfigMap at the end of the test, and make sure that it happens before the t.Cleanup() above which is trying to restore the original ConfigMap
+		})
+	}
+
 	t.Run(
 		"access as user",
-		library.AccessAsUserTest(ctx, env.TestUser.ExpectedUsername, clientset),
+		library.AccessAsUserTest(ctx, env.TestUser.ExpectedUsername, impersonationProxyClient),
 	)
 	for _, group := range env.TestUser.ExpectedGroups {
 		group := group
 		t.Run(
 			"access as group "+group,
-			library.AccessAsGroupTest(ctx, group, clientset),
+			library.AccessAsGroupTest(ctx, group, impersonationProxyClient),
 		)
 	}
+
+	// Update configuration to force the proxy to disabled mode
+	configMap := configMapForConfig(t, impersonator.Config{Mode: impersonator.ModeDisabled})
+	_, err = adminClient.CoreV1().ConfigMaps(env.ConciergeNamespace).Update(ctx, &configMap, metav1.UpdateOptions{})
+	require.NoError(t, err)
+
+	// Check that we can't use the impersonation proxy to execute kubectl commands again
+	_, err = impersonationProxyClient.CoreV1().Namespaces().List(ctx, metav1.ListOptions{})
+	require.EqualError(t, err, "Get \"https://pinniped-concierge-proxy.concierge.svc.cluster.local/api/v1/namespaces\": Service Unavailable")
+
+	// if env.HasCapability(library.HasExternalLoadBalancerProvider) {
+	// TODO we started the test with a load balancer, so after forcing the proxy to disable, assert that the LoadBalancer was deleted
+	// }
+}
+
+func configMapForConfig(t *testing.T, config impersonator.Config) corev1.ConfigMap {
+	configString, err := yaml.Marshal(config)
+	require.NoError(t, err)
+	configMap := corev1.ConfigMap{
+		ObjectMeta: metav1.ObjectMeta{Name: "pinniped-concierge-impersonation-proxy-config"},
+		Data: map[string]string{
+			"config.yaml": string(configString),
+		}}
+	return configMap
+}
+
+func hasLoadBalancerService(ctx context.Context, t *testing.T, client kubernetes.Interface, namespace string) bool {
+	t.Helper()
+
+	services, err := client.CoreV1().Services(namespace).List(ctx, metav1.ListOptions{})
+	require.NoError(t, err)
+	for _, service := range services.Items {
+		if service.Spec.Type == corev1.ServiceTypeLoadBalancer {
+			return true
+		}
+	}
+	return false
 }
 
 func makeImpersonationTestToken(t *testing.T, authenticator corev1.TypedLocalObjectReference) string {