Took care of some impersonation cluster ip related todos

This commit is contained in:
Margo Crawford 2021-05-20 11:57:07 -07:00
parent ec25259901
commit 62651eddb0
2 changed files with 239 additions and 61 deletions

View File

@ -246,11 +246,11 @@ func (c *impersonatorConfigController) doSync(syncCtx controllerlib.Context, cre
if err = c.ensureClusterIPServiceIsStarted(ctx, impersonationSpec); err != nil {
return nil, err
}
} // else { // TODO test stopping the cluster ip service
// if err = c.ensureClusterIPServiceIsStopped(ctx); err != nil {
// return nil, err
// }
//}
} else {
if err = c.ensureClusterIPServiceIsStopped(ctx); err != nil {
return nil, err
}
}
nameInfo, err := c.findDesiredTLSCertificateName(impersonationSpec)
if err != nil {
@ -341,6 +341,18 @@ func (c *impersonatorConfigController) loadBalancerExists() (bool, error) {
return true, nil
}
func (c *impersonatorConfigController) clusterIPExists() (bool, error) {
_, err := c.servicesInformer.Lister().Services(c.namespace).Get(c.generatedClusterIPServiceName)
notFound := k8serrors.IsNotFound(err)
if notFound {
return false, nil
}
if err != nil {
return false, err
}
return true, nil
}
func (c *impersonatorConfigController) loadBalancerNeedsUpdate(config *v1alpha1.ImpersonationProxySpec) (bool, error) {
lb, err := c.servicesInformer.Lister().Services(c.namespace).Get(c.generatedLoadBalancerServiceName)
if err != nil {
@ -524,11 +536,14 @@ func (c *impersonatorConfigController) ensureClusterIPServiceIsStarted(ctx conte
Annotations: config.Service.Annotations,
},
}
//running, err := c.ClusterIPExists() // TODO test that clusterip is only created once
//if err != nil {
// return err
running, _ := c.clusterIPExists()
if running {
return nil
}
// if err != nil {
// return err // TODO test this error case
//}
//if running {
// if running {
// needsUpdate, err := c.ClusterIPNeedsUpdate(config) // TODO test updating annotations on clusterip
// if err != nil {
// return err
@ -541,7 +556,7 @@ func (c *impersonatorConfigController) ensureClusterIPServiceIsStarted(ctx conte
// return err
// }
// return nil
//}
// }
plog.Info("creating cluster ip for impersonation proxy",
"service", c.generatedClusterIPServiceName,
"namespace", c.namespace)
@ -549,6 +564,21 @@ func (c *impersonatorConfigController) ensureClusterIPServiceIsStarted(ctx conte
return err
}
func (c *impersonatorConfigController) ensureClusterIPServiceIsStopped(ctx context.Context) error {
running, err := c.clusterIPExists()
if err != nil {
return err
}
if !running {
return nil
}
plog.Info("Deleting cluster ip for impersonation proxy",
"service", c.generatedClusterIPServiceName,
"namespace", c.namespace)
return c.k8sClient.CoreV1().Services(c.namespace).Delete(ctx, c.generatedClusterIPServiceName, metav1.DeleteOptions{})
}
func (c *impersonatorConfigController) ensureTLSSecret(ctx context.Context, nameInfo *certNameInfo, ca *certauthority.CA) error {
secretFromInformer, err := c.secretsInformer.Lister().Secrets(c.namespace).Get(c.tlsSecretName)
notFound := k8serrors.IsNotFound(err)
@ -750,10 +780,9 @@ func (c *impersonatorConfigController) findDesiredTLSCertificateName(config *v1a
// - clusterip and no external endpoint
if config.ExternalEndpoint != "" {
return c.findTLSCertificateNameFromEndpointConfig(config), nil
} // else if config.Service.Type == v1alpha1.ImpersonationProxyServiceTypeClusterIP {
// // c.findTLSCertificateNameFromClusterIPService()
// // TODO implement this
//}
} else if config.Service.Type == v1alpha1.ImpersonationProxyServiceTypeClusterIP {
return c.findTLSCertificateNameFromClusterIPService()
}
return c.findTLSCertificateNameFromLoadBalancer()
}
@ -801,6 +830,24 @@ func (c *impersonatorConfigController) findTLSCertificateNameFromLoadBalancer()
return nil, fmt.Errorf("could not find valid IP addresses or hostnames from load balancer %s/%s", c.namespace, lb.Name)
}
func (c *impersonatorConfigController) findTLSCertificateNameFromClusterIPService() (*certNameInfo, error) {
clusterIP, err := c.servicesInformer.Lister().Services(c.namespace).Get(c.generatedClusterIPServiceName)
notFound := k8serrors.IsNotFound(err)
if notFound {
// We aren't ready and will try again later in this case.
return &certNameInfo{ready: false}, nil
}
if err != nil {
return nil, err
}
ip := clusterIP.Spec.ClusterIP
if ip != "" {
parsedIP := net.ParseIP(ip)
return &certNameInfo{ready: true, selectedIP: parsedIP, clientEndpoint: ip}, nil
}
return &certNameInfo{ready: false}, nil
}
func (c *impersonatorConfigController) createNewTLSSecret(ctx context.Context, ca *certauthority.CA, ip net.IP, hostname string) (*v1.Secret, error) {
var hostnames []string
var ips []net.IP

View File

@ -632,6 +632,17 @@ func TestImpersonatorConfigControllerSync(t *testing.T) {
}
}
var newClusterIPService = func(resourceName string, status corev1.ServiceStatus, spec corev1.ServiceSpec) *corev1.Service {
return &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: resourceName,
Namespace: installedInNamespace,
},
Spec: spec,
Status: status,
}
}
// Anytime an object is added/updated/deleted in the informer's client *after* the informer is started, then we
// need to wait for the informer's cache to asynchronously pick up that change from its "watch".
// If an object is added to the informer's client *before* the informer is started, then waiting is
@ -728,6 +739,14 @@ func TestImpersonatorConfigControllerSync(t *testing.T) {
r.NoError(client.Tracker().Add(loadBalancerService))
}
var addClusterIPServiceToTracker = func(resourceName string, clusterIP string, client *kubernetesfake.Clientset) {
clusterIPService := newClusterIPService(resourceName, corev1.ServiceStatus{}, corev1.ServiceSpec{
Type: corev1.ServiceTypeClusterIP,
ClusterIP: clusterIP,
})
r.NoError(client.Tracker().Add(clusterIPService))
}
var addSecretToTrackers = func(secret *corev1.Secret, clients ...*kubernetesfake.Clientset) {
for _, client := range clients {
secretCopy := secret.DeepCopy()
@ -900,6 +919,15 @@ func TestImpersonatorConfigControllerSync(t *testing.T) {
r.Equal(labels, createdClusterIPService.Labels)
}
var requireClusterIPWasDeleted = func(action coretesting.Action) {
// TODO maybe de-dup this with loadbalancerwasdeleted
deleteAction, ok := action.(coretesting.DeleteAction)
r.True(ok, "should have been able to cast this action to DeleteAction: %v", action)
r.Equal("delete", deleteAction.GetVerb())
r.Equal(clusterIPServiceName, deleteAction.GetName())
r.Equal("services", deleteAction.GetResource().Resource)
}
var requireTLSSecretWasDeleted = func(action coretesting.Action) {
deleteAction, ok := action.(coretesting.DeleteAction)
r.True(ok, "should have been able to cast this action to DeleteAction: %v", action)
@ -1476,6 +1504,38 @@ func TestImpersonatorConfigControllerSync(t *testing.T) {
})
})
when("a clusterip already exists with ingress", func() {
const fakeIP = "127.0.0.123"
it.Before(func() {
addCredentialIssuerToTrackers(v1alpha1.CredentialIssuer{
ObjectMeta: metav1.ObjectMeta{Name: credentialIssuerResourceName},
Spec: v1alpha1.CredentialIssuerSpec{
ImpersonationProxy: &v1alpha1.ImpersonationProxySpec{
Mode: v1alpha1.ImpersonationProxyModeEnabled,
Service: v1alpha1.ImpersonationProxyServiceSpec{
Type: v1alpha1.ImpersonationProxyServiceTypeClusterIP,
},
},
},
}, pinnipedInformerClient, pinnipedAPIClient)
addNodeWithRoleToTracker("worker", kubeAPIClient)
addClusterIPServiceToTracker(clusterIPServiceName, fakeIP, kubeInformerClient)
addClusterIPServiceToTracker(clusterIPServiceName, fakeIP, kubeAPIClient)
})
it("starts the impersonator without creating a clusterip", func() {
startInformersAndController()
r.NoError(runControllerSync())
r.Len(kubeAPIClient.Actions(), 3)
requireNodesListed(kubeAPIClient.Actions()[0])
ca := requireCASecretWasCreated(kubeAPIClient.Actions()[1])
requireTLSSecretWasCreated(kubeAPIClient.Actions()[2], ca)
requireTLSServerIsRunning(ca, fakeIP, map[string]string{fakeIP + ":443": testServerAddr()})
requireCredentialIssuer(newSuccessStrategy(fakeIP, ca))
// requireSigningCertProviderHasLoadedCerts()
})
})
when("a load balancer and a secret already exists", func() {
var caCrt []byte
it.Before(func() {
@ -1988,65 +2048,136 @@ func TestImpersonatorConfigControllerSync(t *testing.T) {
})
when("the configuration switches from enabled to disabled mode", func() {
it.Before(func() {
addSecretToTrackers(signingCASecret, kubeInformerClient)
addCredentialIssuerToTrackers(v1alpha1.CredentialIssuer{
ObjectMeta: metav1.ObjectMeta{Name: credentialIssuerResourceName},
Spec: v1alpha1.CredentialIssuerSpec{
when("service type loadbalancer", func() {
it.Before(func() {
addSecretToTrackers(signingCASecret, kubeInformerClient)
addCredentialIssuerToTrackers(v1alpha1.CredentialIssuer{
ObjectMeta: metav1.ObjectMeta{Name: credentialIssuerResourceName},
Spec: v1alpha1.CredentialIssuerSpec{
ImpersonationProxy: &v1alpha1.ImpersonationProxySpec{
Mode: v1alpha1.ImpersonationProxyModeEnabled,
},
},
}, pinnipedInformerClient, pinnipedAPIClient)
addNodeWithRoleToTracker("worker", kubeAPIClient)
})
it("starts the impersonator and loadbalancer, then shuts it down, then starts it again", func() {
startInformersAndController()
r.NoError(runControllerSync())
requireTLSServerIsRunningWithoutCerts()
r.Len(kubeAPIClient.Actions(), 3)
requireNodesListed(kubeAPIClient.Actions()[0])
requireLoadBalancerWasCreated(kubeAPIClient.Actions()[1])
requireCASecretWasCreated(kubeAPIClient.Actions()[2])
requireCredentialIssuer(newPendingStrategy())
requireSigningCertProviderIsEmpty()
// Simulate the informer cache's background update from its watch.
addObjectFromCreateActionToInformerAndWait(kubeAPIClient.Actions()[1], kubeInformers.Core().V1().Services())
addObjectFromCreateActionToInformerAndWait(kubeAPIClient.Actions()[2], kubeInformers.Core().V1().Secrets())
// Update the CredentialIssuer.
updateCredentialIssuerInInformerAndWait(credentialIssuerResourceName, v1alpha1.CredentialIssuerSpec{
ImpersonationProxy: &v1alpha1.ImpersonationProxySpec{
Mode: v1alpha1.ImpersonationProxyModeDisabled,
},
}, pinnipedInformers.Config().V1alpha1().CredentialIssuers())
r.NoError(runControllerSync())
requireTLSServerIsNoLongerRunning()
r.Len(kubeAPIClient.Actions(), 4)
requireLoadBalancerWasDeleted(kubeAPIClient.Actions()[3])
requireCredentialIssuer(newManuallyDisabledStrategy())
requireSigningCertProviderIsEmpty()
deleteServiceFromTracker(loadBalancerServiceName, kubeInformerClient)
waitForObjectToBeDeletedFromInformer(loadBalancerServiceName, kubeInformers.Core().V1().Services())
// Update the CredentialIssuer again.
updateCredentialIssuerInInformerAndWait(credentialIssuerResourceName, v1alpha1.CredentialIssuerSpec{
ImpersonationProxy: &v1alpha1.ImpersonationProxySpec{
Mode: v1alpha1.ImpersonationProxyModeEnabled,
},
},
}, pinnipedInformerClient, pinnipedAPIClient)
addNodeWithRoleToTracker("worker", kubeAPIClient)
}, pinnipedInformers.Config().V1alpha1().CredentialIssuers())
r.NoError(runControllerSync())
requireTLSServerIsRunningWithoutCerts()
r.Len(kubeAPIClient.Actions(), 5)
requireLoadBalancerWasCreated(kubeAPIClient.Actions()[4])
requireCredentialIssuer(newPendingStrategy())
requireSigningCertProviderIsEmpty()
})
})
it("starts the impersonator and loadbalancer, then shuts it down, then starts it again", func() {
startInformersAndController()
when("service type clusterip", func() {
it.Before(func() {
addSecretToTrackers(signingCASecret, kubeInformerClient)
addCredentialIssuerToTrackers(v1alpha1.CredentialIssuer{
ObjectMeta: metav1.ObjectMeta{Name: credentialIssuerResourceName},
Spec: v1alpha1.CredentialIssuerSpec{
ImpersonationProxy: &v1alpha1.ImpersonationProxySpec{
Mode: v1alpha1.ImpersonationProxyModeEnabled,
Service: v1alpha1.ImpersonationProxyServiceSpec{
Type: v1alpha1.ImpersonationProxyServiceTypeClusterIP,
},
},
},
}, pinnipedInformerClient, pinnipedAPIClient)
addNodeWithRoleToTracker("worker", kubeAPIClient)
})
r.NoError(runControllerSync())
requireTLSServerIsRunningWithoutCerts()
r.Len(kubeAPIClient.Actions(), 3)
requireNodesListed(kubeAPIClient.Actions()[0])
requireLoadBalancerWasCreated(kubeAPIClient.Actions()[1])
requireCASecretWasCreated(kubeAPIClient.Actions()[2])
requireCredentialIssuer(newPendingStrategy())
requireSigningCertProviderIsEmpty()
it("starts the impersonator and clusterip, then shuts it down, then starts it again", func() {
startInformersAndController()
// Simulate the informer cache's background update from its watch.
addObjectFromCreateActionToInformerAndWait(kubeAPIClient.Actions()[1], kubeInformers.Core().V1().Services())
addObjectFromCreateActionToInformerAndWait(kubeAPIClient.Actions()[2], kubeInformers.Core().V1().Secrets())
r.NoError(runControllerSync())
requireTLSServerIsRunningWithoutCerts()
r.Len(kubeAPIClient.Actions(), 3)
requireNodesListed(kubeAPIClient.Actions()[0])
requireClusterIPWasCreated(kubeAPIClient.Actions()[1])
requireCASecretWasCreated(kubeAPIClient.Actions()[2])
requireCredentialIssuer(newPendingStrategy())
requireSigningCertProviderIsEmpty()
// Update the CredentialIssuer.
updateCredentialIssuerInInformerAndWait(credentialIssuerResourceName, v1alpha1.CredentialIssuerSpec{
ImpersonationProxy: &v1alpha1.ImpersonationProxySpec{
Mode: v1alpha1.ImpersonationProxyModeDisabled,
},
}, pinnipedInformers.Config().V1alpha1().CredentialIssuers())
// Simulate the informer cache's background update from its watch.
addObjectFromCreateActionToInformerAndWait(kubeAPIClient.Actions()[1], kubeInformers.Core().V1().Services())
addObjectFromCreateActionToInformerAndWait(kubeAPIClient.Actions()[2], kubeInformers.Core().V1().Secrets())
r.NoError(runControllerSync())
requireTLSServerIsNoLongerRunning()
r.Len(kubeAPIClient.Actions(), 4)
requireLoadBalancerWasDeleted(kubeAPIClient.Actions()[3])
requireCredentialIssuer(newManuallyDisabledStrategy())
requireSigningCertProviderIsEmpty()
// Update the CredentialIssuer.
updateCredentialIssuerInInformerAndWait(credentialIssuerResourceName, v1alpha1.CredentialIssuerSpec{
ImpersonationProxy: &v1alpha1.ImpersonationProxySpec{
Mode: v1alpha1.ImpersonationProxyModeDisabled,
},
}, pinnipedInformers.Config().V1alpha1().CredentialIssuers())
deleteServiceFromTracker(loadBalancerServiceName, kubeInformerClient)
waitForObjectToBeDeletedFromInformer(loadBalancerServiceName, kubeInformers.Core().V1().Services())
r.NoError(runControllerSync())
requireTLSServerIsNoLongerRunning()
r.Len(kubeAPIClient.Actions(), 4)
requireClusterIPWasDeleted(kubeAPIClient.Actions()[3])
requireCredentialIssuer(newManuallyDisabledStrategy())
requireSigningCertProviderIsEmpty()
// Update the CredentialIssuer again.
updateCredentialIssuerInInformerAndWait(credentialIssuerResourceName, v1alpha1.CredentialIssuerSpec{
ImpersonationProxy: &v1alpha1.ImpersonationProxySpec{
Mode: v1alpha1.ImpersonationProxyModeEnabled,
},
}, pinnipedInformers.Config().V1alpha1().CredentialIssuers())
deleteServiceFromTracker(clusterIPServiceName, kubeInformerClient)
waitForObjectToBeDeletedFromInformer(clusterIPServiceName, kubeInformers.Core().V1().Services())
r.NoError(runControllerSync())
requireTLSServerIsRunningWithoutCerts()
r.Len(kubeAPIClient.Actions(), 5)
requireLoadBalancerWasCreated(kubeAPIClient.Actions()[4])
requireCredentialIssuer(newPendingStrategy())
requireSigningCertProviderIsEmpty()
// Update the CredentialIssuer again.
updateCredentialIssuerInInformerAndWait(credentialIssuerResourceName, v1alpha1.CredentialIssuerSpec{
ImpersonationProxy: &v1alpha1.ImpersonationProxySpec{
Mode: v1alpha1.ImpersonationProxyModeEnabled,
Service: v1alpha1.ImpersonationProxyServiceSpec{
Type: v1alpha1.ImpersonationProxyServiceTypeClusterIP,
},
},
}, pinnipedInformers.Config().V1alpha1().CredentialIssuers())
r.NoError(runControllerSync())
requireTLSServerIsRunningWithoutCerts()
r.Len(kubeAPIClient.Actions(), 5)
requireClusterIPWasCreated(kubeAPIClient.Actions()[4])
requireCredentialIssuer(newPendingStrategy())
requireSigningCertProviderIsEmpty()
})
})
})