WIP: start on publisher controller integration

This commit is contained in:
Andrew Keesler 2020-07-31 12:08:07 -04:00
parent 2e05e032ee
commit 52546fad90
6 changed files with 242 additions and 19 deletions

View File

@ -15,9 +15,9 @@
apiVersion: apiextensions.k8s.io/v1 apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition kind: CustomResourceDefinition
metadata: metadata:
name: logindiscoveryconfigs.suzerain-io.github.io name: logindiscoveryconfigs.placeholder.suzerain-io.github.io
spec: spec:
group: suzerain-io.github.io group: placeholder.suzerain-io.github.io
versions: versions:
#! Any changes to these schemas should also be reflected in the types.go file(s) #! Any changes to these schemas should also be reflected in the types.go file(s)
#! in https://github.com/suzerain-io/placeholder-name-api/tree/main/pkg/apis/placeholder #! in https://github.com/suzerain-io/placeholder-name-api/tree/main/pkg/apis/placeholder

View File

@ -38,6 +38,9 @@ rules:
- apiGroups: [""] - apiGroups: [""]
resources: [services] resources: [services]
verbs: [create, get, list, patch, update, watch] verbs: [create, get, list, patch, update, watch]
- apiGroups: [placeholder.suzerain-io.github.io]
resources: [logindiscoveryconfigs]
verbs: [create, get, list, update, watch]
--- ---
kind: RoleBinding kind: RoleBinding
apiVersion: rbac.authorization.k8s.io/v1 apiVersion: rbac.authorization.k8s.io/v1
@ -108,3 +111,27 @@ roleRef:
#! give permissions for a special configmap of CA bundles that is needed by aggregated api servers #! give permissions for a special configmap of CA bundles that is needed by aggregated api servers
name: extension-apiserver-authentication-reader name: extension-apiserver-authentication-reader
apiGroup: rbac.authorization.k8s.io apiGroup: rbac.authorization.k8s.io
---
kind: Role
apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: #@ data.values.app_name + "-cluster-info-lister-watcher-role"
namespace: kube-public
rules:
- apiGroups: [""]
resources: [configmaps]
verbs: [list, watch] #! TODO: do we neeed a get here for the controller?
---
kind: RoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: #@ data.values.app_name + "-cluster-info-lister-watcher-role-binding"
namespace: kube-public
subjects:
- kind: ServiceAccount
name: #@ data.values.app_name + "-service-account"
namespace: #@ data.values.namespace
roleRef:
kind: Role
name: #@ data.values.app_name + "-cluster-info-lister-watcher-role"
apiGroup: rbac.authorization.k8s.io

View File

@ -6,6 +6,7 @@ SPDX-License-Identifier: Apache-2.0
package apiserver package apiserver
import ( import (
"context"
"fmt" "fmt"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -58,6 +59,7 @@ type Config struct {
type ExtraConfig struct { type ExtraConfig struct {
Webhook authenticator.Token Webhook authenticator.Token
Issuer loginrequest.CertIssuer Issuer loginrequest.CertIssuer
StartControllersPostStartHook func(ctx context.Context)
} }
type PlaceHolderServer struct { type PlaceHolderServer struct {
@ -122,9 +124,17 @@ func (c completedConfig) New() (*PlaceHolderServer, error) {
return nil, fmt.Errorf("install API group error: %w", err) return nil, fmt.Errorf("install API group error: %w", err)
} }
s.GenericAPIServer.AddPostStartHookOrDie("place-holder-post-start-hook", s.GenericAPIServer.AddPostStartHookOrDie("start-controllers",
func(context genericapiserver.PostStartHookContext) error { func(postStartContext genericapiserver.PostStartHookContext) error {
klog.InfoS("post start hook", "foo", "bar") klog.InfoS("post start hook")
ctx, cancel := context.WithCancel(context.Background())
go func() {
<-postStartContext.StopCh
cancel()
}()
c.ExtraConfig.StartControllersPostStartHook(ctx)
return nil return nil
}, },
) )

View File

@ -23,8 +23,9 @@ import (
) )
const ( const (
ClusterInfoNamespace = "kube-public"
clusterInfoName = "cluster-info" clusterInfoName = "cluster-info"
clusterInfoNamespace = "kube-public"
clusterInfoConfigMapKey = "kubeconfig" clusterInfoConfigMapKey = "kubeconfig"
configName = "placeholder-name-config" configName = "placeholder-name-config"
@ -75,7 +76,7 @@ func NewPublisherController(
}, },
withInformer( withInformer(
configMapInformer, configMapInformer,
nameAndNamespaceExactMatchFilterFactory(clusterInfoName, clusterInfoNamespace), nameAndNamespaceExactMatchFilterFactory(clusterInfoName, ClusterInfoNamespace),
controller.InformerOption{}, controller.InformerOption{},
), ),
withInformer( withInformer(
@ -89,7 +90,7 @@ func NewPublisherController(
func (c *publisherController) Sync(ctx controller.Context) error { func (c *publisherController) Sync(ctx controller.Context) error {
configMap, err := c.configMapInformer. configMap, err := c.configMapInformer.
Lister(). Lister().
ConfigMaps(clusterInfoNamespace). ConfigMaps(ClusterInfoNamespace).
Get(clusterInfoName) Get(clusterInfoName)
notFound := k8serrors.IsNotFound(err) notFound := k8serrors.IsNotFound(err)
if err != nil && !notFound { if err != nil && !notFound {
@ -99,7 +100,7 @@ func (c *publisherController) Sync(ctx controller.Context) error {
klog.InfoS( klog.InfoS(
"could not find config map", "could not find config map",
"configmap", "configmap",
klog.KRef(clusterInfoNamespace, clusterInfoName), klog.KRef(ClusterInfoNamespace, clusterInfoName),
) )
return nil return nil
} }

View File

@ -25,20 +25,30 @@ import (
"k8s.io/apiserver/pkg/server/dynamiccertificates" "k8s.io/apiserver/pkg/server/dynamiccertificates"
genericoptions "k8s.io/apiserver/pkg/server/options" genericoptions "k8s.io/apiserver/pkg/server/options"
"k8s.io/apiserver/plugin/pkg/authenticator/token/webhook" "k8s.io/apiserver/plugin/pkg/authenticator/token/webhook"
k8sinformers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes"
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
restclient "k8s.io/client-go/rest" restclient "k8s.io/client-go/rest"
apiregistrationv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1" apiregistrationv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
aggregationv1client "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset" aggregationv1client "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset"
"github.com/suzerain-io/controller-go"
placeholderv1alpha1 "github.com/suzerain-io/placeholder-name-api/pkg/apis/placeholder/v1alpha1" placeholderv1alpha1 "github.com/suzerain-io/placeholder-name-api/pkg/apis/placeholder/v1alpha1"
placeholderclientset "github.com/suzerain-io/placeholder-name-client-go/pkg/generated/clientset/versioned"
placeholderinformers "github.com/suzerain-io/placeholder-name-client-go/pkg/generated/informers/externalversions"
"github.com/suzerain-io/placeholder-name/internal/apiserver" "github.com/suzerain-io/placeholder-name/internal/apiserver"
"github.com/suzerain-io/placeholder-name/internal/autoregistration" "github.com/suzerain-io/placeholder-name/internal/autoregistration"
"github.com/suzerain-io/placeholder-name/internal/certauthority" "github.com/suzerain-io/placeholder-name/internal/certauthority"
"github.com/suzerain-io/placeholder-name/internal/controller/logindiscovery"
"github.com/suzerain-io/placeholder-name/internal/downward" "github.com/suzerain-io/placeholder-name/internal/downward"
"github.com/suzerain-io/placeholder-name/pkg/config" "github.com/suzerain-io/placeholder-name/pkg/config"
) )
// TODO(akeesler): what should these controller settings be?
const (
defaultWorkers = 3
defaultResync = 20 * time.Minute
)
// App is an object that represents the placeholder-name-server application. // App is an object that represents the placeholder-name-server application.
type App struct { type App struct {
cmd *cobra.Command cmd *cobra.Command
@ -94,7 +104,15 @@ authenticating to the Kubernetes API.`,
return fmt.Errorf("could not initialize Kubernetes client: %w", err) return fmt.Errorf("could not initialize Kubernetes client: %w", err)
} }
return a.run(ctx, k8s.CoreV1(), aggregation) // Connect to the placeholder API.
// I think we can't use protobuf encoding here because we are using CRDs
// (for which protobuf encoding is not supported).
placeholder, err := placeholderclientset.NewForConfig(kubeConfig)
if err != nil {
return fmt.Errorf("could not initialize placeholder client: %w", err)
}
return a.run(ctx, k8s, aggregation, placeholder)
}, },
Args: cobra.NoArgs, Args: cobra.NoArgs,
} }
@ -143,8 +161,9 @@ func (a *App) Run() error {
func (a *App) run( func (a *App) run(
ctx context.Context, ctx context.Context,
k8s corev1client.CoreV1Interface, k8s kubernetes.Interface,
aggregation aggregationv1client.Interface, aggregation aggregationv1client.Interface,
placeholder placeholderclientset.Interface,
) error { ) error {
cfg, err := config.FromPath(a.configPath) cfg, err := config.FromPath(a.configPath)
if err != nil { if err != nil {
@ -213,7 +232,7 @@ func (a *App) run(
}, },
} }
if err := autoregistration.Setup(ctx, autoregistration.SetupOptions{ if err := autoregistration.Setup(ctx, autoregistration.SetupOptions{
CoreV1: k8s, CoreV1: k8s.CoreV1(),
AggregationV1: aggregation, AggregationV1: aggregation,
Namespace: podinfo.Namespace, Namespace: podinfo.Namespace,
ServiceTemplate: service, ServiceTemplate: service,
@ -222,7 +241,13 @@ func (a *App) run(
return fmt.Errorf("could not register API service: %w", err) return fmt.Errorf("could not register API service: %w", err)
} }
apiServerConfig, err := a.ConfigServer(cert, webhookTokenAuthenticator, clientCA) cmrf := wireControllerManagerRunFunc(podinfo, k8s, placeholder)
apiServerConfig, err := a.configServer(
cert,
webhookTokenAuthenticator,
clientCA,
cmrf,
)
if err != nil { if err != nil {
return err return err
} }
@ -235,7 +260,12 @@ func (a *App) run(
return server.GenericAPIServer.PrepareRun().Run(ctx.Done()) return server.GenericAPIServer.PrepareRun().Run(ctx.Done())
} }
func (a *App) ConfigServer(cert *tls.Certificate, webhookTokenAuthenticator *webhook.WebhookTokenAuthenticator, ca *certauthority.CA) (*apiserver.Config, error) { func (a *App) configServer(
cert *tls.Certificate,
webhookTokenAuthenticator *webhook.WebhookTokenAuthenticator,
ca *certauthority.CA,
startControllersPostStartHook func(context.Context),
) (*apiserver.Config, error) {
provider, err := createStaticCertKeyProvider(cert) provider, err := createStaticCertKeyProvider(cert)
if err != nil { if err != nil {
return nil, fmt.Errorf("could not create static cert key provider: %w", err) return nil, fmt.Errorf("could not create static cert key provider: %w", err)
@ -252,6 +282,7 @@ func (a *App) ConfigServer(cert *tls.Certificate, webhookTokenAuthenticator *web
ExtraConfig: apiserver.ExtraConfig{ ExtraConfig: apiserver.ExtraConfig{
Webhook: webhookTokenAuthenticator, Webhook: webhookTokenAuthenticator,
Issuer: ca, Issuer: ca,
StartControllersPostStartHook: startControllersPostStartHook,
}, },
} }
return apiServerConfig, nil return apiServerConfig, nil
@ -290,3 +321,41 @@ func createStaticCertKeyProvider(cert *tls.Certificate) (dynamiccertificates.Cer
return dynamiccertificates.NewStaticCertKeyContent("some-name???", certChainPEM, privateKeyPEM) return dynamiccertificates.NewStaticCertKeyContent("some-name???", certChainPEM, privateKeyPEM)
} }
func wireControllerManagerRunFunc(
podinfo *downward.PodInfo,
k8s kubernetes.Interface,
placeholder placeholderclientset.Interface,
) func(ctx context.Context) {
k8sInformers := k8sinformers.NewSharedInformerFactoryWithOptions(
k8s,
defaultResync,
k8sinformers.WithNamespace(
logindiscovery.ClusterInfoNamespace,
),
)
placeholderInformers := placeholderinformers.NewSharedInformerFactoryWithOptions(
placeholder,
defaultResync,
placeholderinformers.WithNamespace(
"integration", // TODO(akeesler): unhardcode this.
),
)
cm := controller.
NewManager().
WithController(
logindiscovery.NewPublisherController(
podinfo.Namespace,
placeholder,
k8sInformers.Core().V1().ConfigMaps(),
placeholderInformers.Placeholder().V1alpha1().LoginDiscoveryConfigs(),
controller.WithInformer,
),
defaultWorkers,
)
return func(ctx context.Context) {
k8sInformers.Start(ctx.Done())
placeholderInformers.Start(ctx.Done())
go cm.Start(ctx)
}
}

View File

@ -0,0 +1,116 @@
/*
Copyright 2020 VMware, Inc.
SPDX-License-Identifier: Apache-2.0
*/
package integration
import (
"context"
"encoding/base64"
"os"
"testing"
"time"
"github.com/stretchr/testify/require"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/rest"
placeholderv1alpha1 "github.com/suzerain-io/placeholder-name-api/pkg/apis/placeholder/v1alpha1"
"github.com/suzerain-io/placeholder-name/test/library"
)
func TestSuccessfulLoginDiscoveryConfig(t *testing.T) {
namespaceName := os.Getenv("PLACEHOLDER_NAME_NAMESPACE")
require.NotEmptyf(t, namespaceName, "must specify PLACEHOLDER_NAME_NAMESPACE env var for integration tests")
client := library.NewPlaceholderNameClientset(t)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
// TODO(akeesler): is there a race here between this test running and the
// placeholder-name-server creating the CR?
config := library.NewClientConfig(t)
expectedLDC := getExpectedLDC(namespaceName, config)
configList, err := client.
PlaceholderV1alpha1().
LoginDiscoveryConfigs(namespaceName).
List(ctx, metav1.ListOptions{})
require.NoError(t, err)
require.Len(t, configList.Items, 1)
require.Equal(t, expectedLDC, configList.Items[0])
}
func TestReconcilingLoginDiscoveryConfig(t *testing.T) {
t.Skip()
namespaceName := os.Getenv("PLACEHOLDER_NAME_NAMESPACE")
require.NotEmptyf(t, namespaceName, "must specify PLACEHOLDER_NAME_NAMESPACE env var for integration tests")
client := library.NewPlaceholderNameClientset(t)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
// TODO(akeesler): is there a race here between this test running and the
// placeholder-name-server creating the CR?
w, err := client.
PlaceholderV1alpha1().
LoginDiscoveryConfigs(namespaceName).
Watch(ctx, metav1.ListOptions{})
require.NoError(t, err)
err = client.
PlaceholderV1alpha1().
LoginDiscoveryConfigs(namespaceName).
Delete(ctx, "placeholder-name-config", metav1.DeleteOptions{})
require.NoError(t, err)
config := library.NewClientConfig(t)
expectedLDC := getExpectedLDC(namespaceName, config)
received := func(et watch.EventType, o runtime.Object) func() bool {
return func() bool {
select {
case e := <-w.ResultChan():
require.Equal(t, et, e.Type)
require.Equal(t, o, e.Object)
return true
default:
return false
}
}
}
require.Eventually(
t,
received(watch.Deleted, expectedLDC),
time.Second,
3*time.Second,
)
require.Eventually(
t,
received(watch.Added, expectedLDC),
time.Second,
3*time.Second,
)
}
func getExpectedLDC(
namespaceName string,
config *rest.Config,
) *placeholderv1alpha1.LoginDiscoveryConfig {
return &placeholderv1alpha1.LoginDiscoveryConfig{
ObjectMeta: metav1.ObjectMeta{
Name: "placeholder-name-config",
Namespace: namespaceName,
},
Spec: placeholderv1alpha1.LoginDiscoveryConfigSpec{
Server: config.Host,
CertificateAuthorityData: base64.StdEncoding.EncodeToString(config.TLSClientConfig.CAData),
},
}
}