Skip to content

Commit

Permalink
Merge pull request #227 from gabrielggg/main
Browse files Browse the repository at this point in the history
[Policy Assistant] Add support for k8s native workload traffic
  • Loading branch information
k8s-ci-robot authored Jul 5, 2024
2 parents c7f1995 + 8ec9885 commit 964c353
Show file tree
Hide file tree
Showing 14 changed files with 1,266 additions and 3 deletions.
38 changes: 38 additions & 0 deletions cmd/policy-assistant/pkg/kube/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/pkg/errors"
"github.com/sirupsen/logrus"
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
networkingv1 "k8s.io/api/networking/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -112,6 +113,43 @@ func (k *Kubernetes) CreateNetworkPolicy(policy *networkingv1.NetworkPolicy) (*n
return createdPolicy, errors.Wrapf(err, "unable to create network policy %s/%s", policy.Namespace, policy.Name)
}

func (k *Kubernetes) GetDeploymentsInNamespace(namespace string) ([]appsv1.Deployment, error) {
deploymentList, err := k.ClientSet.AppsV1().Deployments(namespace).List(context.TODO(), metav1.ListOptions{})
if err != nil {
return nil, errors.Wrapf(err, "unable to get deployments in namespace %s", namespace)
}
return deploymentList.Items, nil
}

func (k *Kubernetes) GetDaemonSetsInNamespace(namespace string) ([]appsv1.DaemonSet, error) {
daemonSetList, err := k.ClientSet.AppsV1().DaemonSets(namespace).List(context.TODO(), metav1.ListOptions{})
if err != nil {
return nil, errors.Wrapf(err, "unable to get daemonSets in namespace %s", namespace)
}
return daemonSetList.Items, nil
}

func (k *Kubernetes) GetStatefulSetsInNamespace(namespace string) ([]appsv1.StatefulSet, error) {
statefulSetList, err := k.ClientSet.AppsV1().StatefulSets(namespace).List(context.TODO(), metav1.ListOptions{})
if err != nil {
return nil, errors.Wrapf(err, "unable to get StatefulSets in namespace %s", namespace)
}
return statefulSetList.Items, nil
}

func (k *Kubernetes) GetReplicaSetsInNamespace(namespace string) ([]appsv1.ReplicaSet, error) {
replicaSetList, err := k.ClientSet.AppsV1().ReplicaSets(namespace).List(context.TODO(), metav1.ListOptions{})
if err != nil {
return nil, errors.Wrapf(err, "unable to get ReplicaSets in namespace %s", namespace)
}
return replicaSetList.Items, nil
}

func (k *Kubernetes) GetReplicaSet(namespace string, name string) (*appsv1.ReplicaSet, error) {
replicaSet, err := k.ClientSet.AppsV1().ReplicaSets(namespace).Get(context.TODO(), name, metav1.GetOptions{})
return replicaSet, errors.Wrapf(err, "unable to get replicaSet %s/%s", namespace, name)
}

func (k *Kubernetes) GetService(namespace string, name string) (*v1.Service, error) {
service, err := k.ClientSet.CoreV1().Services(namespace).Get(context.TODO(), name, metav1.GetOptions{})
return service, errors.Wrapf(err, "unable to get service %s/%s", namespace, name)
Expand Down
268 changes: 265 additions & 3 deletions cmd/policy-assistant/pkg/matcher/traffic.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ import (
"strings"

"github.com/mattfenwick/collections/pkg/slice"
"github.com/mattfenwick/cyclonus/pkg/kube"
"github.com/mattfenwick/cyclonus/pkg/utils"
"github.com/olekukonko/tablewriter"
"github.com/sirupsen/logrus"
"golang.org/x/exp/maps"
v1 "k8s.io/api/core/v1"
)
Expand Down Expand Up @@ -57,7 +60,8 @@ func labelsToString(labels map[string]string) string {

type TrafficPeer struct {
Internal *InternalPeer
IP string
// IP external to cluster
IP string
}

func (p *TrafficPeer) Namespace() string {
Expand All @@ -71,10 +75,268 @@ func (p *TrafficPeer) IsExternal() bool {
return p.Internal == nil
}

func (p *TrafficPeer) Translate() TrafficPeer {
//Translates kubernetes workload types to TrafficPeers.
var podsNetworking []*PodNetworking
var podLabels map[string]string
var namespaceLabels map[string]string
var workloadOwner string
var workloadKind string
var internalPeer InternalPeer
workloadOwnerExists := false
workloadMetadata := strings.Split(strings.ToLower(p.Internal.Workload), "/")
if len(workloadMetadata) != 3 || (workloadMetadata[0] == "" || workloadMetadata[1] == "" || workloadMetadata[2] == "") || (workloadMetadata[1] != "daemonset" && workloadMetadata[1] != "statefulset" && workloadMetadata[1] != "replicaset" && workloadMetadata[1] != "deployment" && workloadMetadata[1] != "pod") {
logrus.Fatalf("Bad Workload structure: Types supported are pod, replicaset, deployment, daemonset, statefulset, and 3 fields are required with this structure, <namespace>/<workloadType>/<workloadName>")
}
kubeClient, err := kube.NewKubernetesForContext("")
utils.DoOrDie(err)
ns, err := kubeClient.GetNamespace(workloadMetadata[0])
utils.DoOrDie(err)
kubePods, err := kube.GetPodsInNamespaces(kubeClient, []string{workloadMetadata[0]})
if err != nil {
logrus.Fatalf("unable to read pods from kube, ns '%s': %+v", workloadMetadata[0], err)
}
for _, pod := range kubePods {
if workloadMetadata[1] == "deployment" && pod.OwnerReferences != nil && pod.OwnerReferences[0].Kind == "ReplicaSet" {
kubeReplicaSets, err := kubeClient.GetReplicaSet(workloadMetadata[0], pod.OwnerReferences[0].Name)
if err != nil {
logrus.Fatalf("unable to read Replicaset from kube, rs '%s': %+v", pod.OwnerReferences[0].Name, err)
}
if kubeReplicaSets.OwnerReferences != nil {
workloadOwner = kubeReplicaSets.OwnerReferences[0].Name
workloadKind = "deployment"
}

} else if (workloadMetadata[1] == "daemonset" || workloadMetadata[1] == "statefulset" || workloadMetadata[1] == "replicaset") && pod.OwnerReferences != nil {
workloadOwner = pod.OwnerReferences[0].Name
workloadKind = pod.OwnerReferences[0].Kind
} else if workloadMetadata[1] == "pod" {
workloadOwner = pod.Name
workloadKind = "pod"
}
if strings.ToLower(workloadOwner) == workloadMetadata[2] && strings.ToLower(workloadKind) == workloadMetadata[1] {
podLabels = pod.Labels
namespaceLabels = ns.Labels
podNetworking := PodNetworking{
IP: pod.Status.PodIP,
}
podsNetworking = append(podsNetworking, &podNetworking)
workloadOwnerExists = true

}
}

if !workloadOwnerExists {
logrus.Infof("workload not found on the cluster")
internalPeer = InternalPeer{
Workload: "",
}
} else {
internalPeer = InternalPeer{
Workload: p.Internal.Workload,
PodLabels: podLabels,
NamespaceLabels: namespaceLabels,
Namespace: workloadMetadata[0],
Pods: podsNetworking,
}
}

TranslatedPeer := TrafficPeer{
Internal: &internalPeer,
}
return TranslatedPeer
}

func DeploymentsToTrafficPeers() []TrafficPeer {
//Translates all pods associated with deployments to TrafficPeers.
var deploymentPeers []TrafficPeer
kubeClient, err := kube.NewKubernetesForContext("")
utils.DoOrDie(err)
kubeNamespaces, err := kubeClient.GetAllNamespaces()
if err != nil {
logrus.Fatalf("unable to read namespaces from kube: %+v", err)
}

for _, namespace := range kubeNamespaces.Items {
kubeDeployments, err := kubeClient.GetDeploymentsInNamespace(namespace.Name)
if err != nil {
logrus.Fatalf("unable to read deployments from kube, ns '%s': %+v", namespace.Name, err)
}
for _, deployment := range kubeDeployments {
tmpInternalPeer := InternalPeer{
Workload: namespace.Name + "/deployment/" + deployment.Name,
}
tmpPeer := TrafficPeer{
Internal: &tmpInternalPeer,
}
tmpPeerTranslated := tmpPeer.Translate()
if tmpPeerTranslated.Internal.Workload != "" {
deploymentPeers = append(deploymentPeers, tmpPeerTranslated)
}

}

}

return deploymentPeers
}

func DaemonSetsToTrafficPeers() []TrafficPeer {
//Translates all pods associated with daemonSets to TrafficPeers.
var daemonSetPeers []TrafficPeer
kubeClient, err := kube.NewKubernetesForContext("")
utils.DoOrDie(err)
kubeNamespaces, err := kubeClient.GetAllNamespaces()
if err != nil {
logrus.Fatalf("unable to read namespaces from kube: %+v", err)
}

for _, namespace := range kubeNamespaces.Items {
kubeDaemonSets, err := kubeClient.GetDaemonSetsInNamespace(namespace.Name)
if err != nil {
logrus.Fatalf("unable to read daemonSets from kube, ns '%s': %+v", namespace.Name, err)
}
for _, daemonSet := range kubeDaemonSets {
tmpInternalPeer := InternalPeer{
Workload: namespace.Name + "/daemonset/" + daemonSet.Name,
}
tmpPeer := TrafficPeer{
Internal: &tmpInternalPeer,
}
tmpPeerTranslated := tmpPeer.Translate()
if tmpPeerTranslated.Internal.Workload != "" {
daemonSetPeers = append(daemonSetPeers, tmpPeerTranslated)
}
}

}

return daemonSetPeers
}

func StatefulSetsToTrafficPeers() []TrafficPeer {
//Translates all pods associated with statefulSets to TrafficPeers.
var statefulSetPeers []TrafficPeer
kubeClient, err := kube.NewKubernetesForContext("")
utils.DoOrDie(err)
kubeNamespaces, err := kubeClient.GetAllNamespaces()
if err != nil {
logrus.Fatalf("unable to read namespaces from kube: %+v", err)
}

for _, namespace := range kubeNamespaces.Items {
kubeStatefulSets, err := kubeClient.GetStatefulSetsInNamespace(namespace.Name)
if err != nil {
logrus.Fatalf("unable to read statefulSets from kube, ns '%s': %+v", namespace.Name, err)
}
for _, statefulSet := range kubeStatefulSets {
tmpInternalPeer := InternalPeer{
Workload: namespace.Name + "/statefulset/" + statefulSet.Name,
}
tmpPeer := TrafficPeer{
Internal: &tmpInternalPeer,
}
tmpPeerTranslated := tmpPeer.Translate()
if tmpPeerTranslated.Internal.Workload != "" {
statefulSetPeers = append(statefulSetPeers, tmpPeerTranslated)
}
}

}

return statefulSetPeers
}

func ReplicaSetsToTrafficPeers() []TrafficPeer {
//Translates all pods associated with replicaSets that are not associated with deployments to TrafficPeers.
var replicaSetPeers []TrafficPeer
kubeClient, err := kube.NewKubernetesForContext("")
utils.DoOrDie(err)
kubeNamespaces, err := kubeClient.GetAllNamespaces()
if err != nil {
logrus.Fatalf("unable to read namespaces from kube: %+v", err)
}

for _, namespace := range kubeNamespaces.Items {
kubeReplicaSets, err := kubeClient.GetReplicaSetsInNamespace(namespace.Name)
if err != nil {
logrus.Fatalf("unable to read replicaSets from kube, ns '%s': %+v", namespace.Name, err)
}

for _, replicaSet := range kubeReplicaSets {
if replicaSet.OwnerReferences != nil {
continue
} else {
tmpInternalPeer := InternalPeer{
Workload: namespace.Name + "/replicaset/" + replicaSet.Name,
}
tmpPeer := TrafficPeer{
Internal: &tmpInternalPeer,
}
tmpPeerTranslated := tmpPeer.Translate()
if tmpPeerTranslated.Internal.Workload != "" {
replicaSetPeers = append(replicaSetPeers, tmpPeerTranslated)
}

}
}

}

return replicaSetPeers
}

func PodsToTrafficPeers() []TrafficPeer {
//Translates all pods that are not associated with other workload types (deployment, replicaSet, daemonSet, statefulSet.) to TrafficPeers.
var podPeers []TrafficPeer
kubeClient, err := kube.NewKubernetesForContext("")
utils.DoOrDie(err)
kubeNamespaces, err := kubeClient.GetAllNamespaces()
if err != nil {
logrus.Fatalf("unable to read namespaces from kube: %+v", err)
}

for _, namespace := range kubeNamespaces.Items {
kubePods, err := kube.GetPodsInNamespaces(kubeClient, []string{namespace.Name})
if err != nil {
logrus.Fatalf("unable to read pods from kube, ns '%s': %+v", namespace.Name, err)
}
for _, pod := range kubePods {
if pod.OwnerReferences != nil {
continue
} else {
tmpInternalPeer := InternalPeer{
Workload: namespace.Name + "/pod/" + pod.Name,
}
tmpPeer := TrafficPeer{
Internal: &tmpInternalPeer,
}
tmpPeerTranslated := tmpPeer.Translate()
if tmpPeerTranslated.Internal.Workload != "" {
podPeers = append(podPeers, tmpPeerTranslated)
}
}
}

}

return podPeers
}

// Internal to cluster
type InternalPeer struct {
// optional: if set, will override remaining values with information from cluster
Workload string
PodLabels map[string]string
NamespaceLabels map[string]string
Namespace string
NodeLabels map[string]string
Node string
// optional
Pods []*PodNetworking
}

type PodNetworking struct {
IP string
// don't worry about populating below fields right now
IsHostNetworking bool
NodeLabels []string
}
Loading

0 comments on commit 964c353

Please sign in to comment.