Skip to content

Commit 964c353

Browse files
authored
Merge pull request #227 from gabrielggg/main
[Policy Assistant] Add support for k8s native workload traffic
2 parents c7f1995 + 8ec9885 commit 964c353

14 files changed

+1266
-3
lines changed

cmd/policy-assistant/pkg/kube/kubernetes.go

+38
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66

77
"github.com/pkg/errors"
88
"github.com/sirupsen/logrus"
9+
appsv1 "k8s.io/api/apps/v1"
910
v1 "k8s.io/api/core/v1"
1011
networkingv1 "k8s.io/api/networking/v1"
1112
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -112,6 +113,43 @@ func (k *Kubernetes) CreateNetworkPolicy(policy *networkingv1.NetworkPolicy) (*n
112113
return createdPolicy, errors.Wrapf(err, "unable to create network policy %s/%s", policy.Namespace, policy.Name)
113114
}
114115

116+
func (k *Kubernetes) GetDeploymentsInNamespace(namespace string) ([]appsv1.Deployment, error) {
117+
deploymentList, err := k.ClientSet.AppsV1().Deployments(namespace).List(context.TODO(), metav1.ListOptions{})
118+
if err != nil {
119+
return nil, errors.Wrapf(err, "unable to get deployments in namespace %s", namespace)
120+
}
121+
return deploymentList.Items, nil
122+
}
123+
124+
func (k *Kubernetes) GetDaemonSetsInNamespace(namespace string) ([]appsv1.DaemonSet, error) {
125+
daemonSetList, err := k.ClientSet.AppsV1().DaemonSets(namespace).List(context.TODO(), metav1.ListOptions{})
126+
if err != nil {
127+
return nil, errors.Wrapf(err, "unable to get daemonSets in namespace %s", namespace)
128+
}
129+
return daemonSetList.Items, nil
130+
}
131+
132+
func (k *Kubernetes) GetStatefulSetsInNamespace(namespace string) ([]appsv1.StatefulSet, error) {
133+
statefulSetList, err := k.ClientSet.AppsV1().StatefulSets(namespace).List(context.TODO(), metav1.ListOptions{})
134+
if err != nil {
135+
return nil, errors.Wrapf(err, "unable to get StatefulSets in namespace %s", namespace)
136+
}
137+
return statefulSetList.Items, nil
138+
}
139+
140+
func (k *Kubernetes) GetReplicaSetsInNamespace(namespace string) ([]appsv1.ReplicaSet, error) {
141+
replicaSetList, err := k.ClientSet.AppsV1().ReplicaSets(namespace).List(context.TODO(), metav1.ListOptions{})
142+
if err != nil {
143+
return nil, errors.Wrapf(err, "unable to get ReplicaSets in namespace %s", namespace)
144+
}
145+
return replicaSetList.Items, nil
146+
}
147+
148+
func (k *Kubernetes) GetReplicaSet(namespace string, name string) (*appsv1.ReplicaSet, error) {
149+
replicaSet, err := k.ClientSet.AppsV1().ReplicaSets(namespace).Get(context.TODO(), name, metav1.GetOptions{})
150+
return replicaSet, errors.Wrapf(err, "unable to get replicaSet %s/%s", namespace, name)
151+
}
152+
115153
func (k *Kubernetes) GetService(namespace string, name string) (*v1.Service, error) {
116154
service, err := k.ClientSet.CoreV1().Services(namespace).Get(context.TODO(), name, metav1.GetOptions{})
117155
return service, errors.Wrapf(err, "unable to get service %s/%s", namespace, name)

cmd/policy-assistant/pkg/matcher/traffic.go

+265-3
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,10 @@ import (
55
"strings"
66

77
"github.com/mattfenwick/collections/pkg/slice"
8+
"github.com/mattfenwick/cyclonus/pkg/kube"
9+
"github.com/mattfenwick/cyclonus/pkg/utils"
810
"github.com/olekukonko/tablewriter"
11+
"github.com/sirupsen/logrus"
912
"golang.org/x/exp/maps"
1013
v1 "k8s.io/api/core/v1"
1114
)
@@ -57,7 +60,8 @@ func labelsToString(labels map[string]string) string {
5760

5861
type TrafficPeer struct {
5962
Internal *InternalPeer
60-
IP string
63+
// IP external to cluster
64+
IP string
6165
}
6266

6367
func (p *TrafficPeer) Namespace() string {
@@ -71,10 +75,268 @@ func (p *TrafficPeer) IsExternal() bool {
7175
return p.Internal == nil
7276
}
7377

78+
func (p *TrafficPeer) Translate() TrafficPeer {
79+
//Translates kubernetes workload types to TrafficPeers.
80+
var podsNetworking []*PodNetworking
81+
var podLabels map[string]string
82+
var namespaceLabels map[string]string
83+
var workloadOwner string
84+
var workloadKind string
85+
var internalPeer InternalPeer
86+
workloadOwnerExists := false
87+
workloadMetadata := strings.Split(strings.ToLower(p.Internal.Workload), "/")
88+
if len(workloadMetadata) != 3 || (workloadMetadata[0] == "" || workloadMetadata[1] == "" || workloadMetadata[2] == "") || (workloadMetadata[1] != "daemonset" && workloadMetadata[1] != "statefulset" && workloadMetadata[1] != "replicaset" && workloadMetadata[1] != "deployment" && workloadMetadata[1] != "pod") {
89+
logrus.Fatalf("Bad Workload structure: Types supported are pod, replicaset, deployment, daemonset, statefulset, and 3 fields are required with this structure, <namespace>/<workloadType>/<workloadName>")
90+
}
91+
kubeClient, err := kube.NewKubernetesForContext("")
92+
utils.DoOrDie(err)
93+
ns, err := kubeClient.GetNamespace(workloadMetadata[0])
94+
utils.DoOrDie(err)
95+
kubePods, err := kube.GetPodsInNamespaces(kubeClient, []string{workloadMetadata[0]})
96+
if err != nil {
97+
logrus.Fatalf("unable to read pods from kube, ns '%s': %+v", workloadMetadata[0], err)
98+
}
99+
for _, pod := range kubePods {
100+
if workloadMetadata[1] == "deployment" && pod.OwnerReferences != nil && pod.OwnerReferences[0].Kind == "ReplicaSet" {
101+
kubeReplicaSets, err := kubeClient.GetReplicaSet(workloadMetadata[0], pod.OwnerReferences[0].Name)
102+
if err != nil {
103+
logrus.Fatalf("unable to read Replicaset from kube, rs '%s': %+v", pod.OwnerReferences[0].Name, err)
104+
}
105+
if kubeReplicaSets.OwnerReferences != nil {
106+
workloadOwner = kubeReplicaSets.OwnerReferences[0].Name
107+
workloadKind = "deployment"
108+
}
109+
110+
} else if (workloadMetadata[1] == "daemonset" || workloadMetadata[1] == "statefulset" || workloadMetadata[1] == "replicaset") && pod.OwnerReferences != nil {
111+
workloadOwner = pod.OwnerReferences[0].Name
112+
workloadKind = pod.OwnerReferences[0].Kind
113+
} else if workloadMetadata[1] == "pod" {
114+
workloadOwner = pod.Name
115+
workloadKind = "pod"
116+
}
117+
if strings.ToLower(workloadOwner) == workloadMetadata[2] && strings.ToLower(workloadKind) == workloadMetadata[1] {
118+
podLabels = pod.Labels
119+
namespaceLabels = ns.Labels
120+
podNetworking := PodNetworking{
121+
IP: pod.Status.PodIP,
122+
}
123+
podsNetworking = append(podsNetworking, &podNetworking)
124+
workloadOwnerExists = true
125+
126+
}
127+
}
128+
129+
if !workloadOwnerExists {
130+
logrus.Infof("workload not found on the cluster")
131+
internalPeer = InternalPeer{
132+
Workload: "",
133+
}
134+
} else {
135+
internalPeer = InternalPeer{
136+
Workload: p.Internal.Workload,
137+
PodLabels: podLabels,
138+
NamespaceLabels: namespaceLabels,
139+
Namespace: workloadMetadata[0],
140+
Pods: podsNetworking,
141+
}
142+
}
143+
144+
TranslatedPeer := TrafficPeer{
145+
Internal: &internalPeer,
146+
}
147+
return TranslatedPeer
148+
}
149+
150+
func DeploymentsToTrafficPeers() []TrafficPeer {
151+
//Translates all pods associated with deployments to TrafficPeers.
152+
var deploymentPeers []TrafficPeer
153+
kubeClient, err := kube.NewKubernetesForContext("")
154+
utils.DoOrDie(err)
155+
kubeNamespaces, err := kubeClient.GetAllNamespaces()
156+
if err != nil {
157+
logrus.Fatalf("unable to read namespaces from kube: %+v", err)
158+
}
159+
160+
for _, namespace := range kubeNamespaces.Items {
161+
kubeDeployments, err := kubeClient.GetDeploymentsInNamespace(namespace.Name)
162+
if err != nil {
163+
logrus.Fatalf("unable to read deployments from kube, ns '%s': %+v", namespace.Name, err)
164+
}
165+
for _, deployment := range kubeDeployments {
166+
tmpInternalPeer := InternalPeer{
167+
Workload: namespace.Name + "/deployment/" + deployment.Name,
168+
}
169+
tmpPeer := TrafficPeer{
170+
Internal: &tmpInternalPeer,
171+
}
172+
tmpPeerTranslated := tmpPeer.Translate()
173+
if tmpPeerTranslated.Internal.Workload != "" {
174+
deploymentPeers = append(deploymentPeers, tmpPeerTranslated)
175+
}
176+
177+
}
178+
179+
}
180+
181+
return deploymentPeers
182+
}
183+
184+
func DaemonSetsToTrafficPeers() []TrafficPeer {
185+
//Translates all pods associated with daemonSets to TrafficPeers.
186+
var daemonSetPeers []TrafficPeer
187+
kubeClient, err := kube.NewKubernetesForContext("")
188+
utils.DoOrDie(err)
189+
kubeNamespaces, err := kubeClient.GetAllNamespaces()
190+
if err != nil {
191+
logrus.Fatalf("unable to read namespaces from kube: %+v", err)
192+
}
193+
194+
for _, namespace := range kubeNamespaces.Items {
195+
kubeDaemonSets, err := kubeClient.GetDaemonSetsInNamespace(namespace.Name)
196+
if err != nil {
197+
logrus.Fatalf("unable to read daemonSets from kube, ns '%s': %+v", namespace.Name, err)
198+
}
199+
for _, daemonSet := range kubeDaemonSets {
200+
tmpInternalPeer := InternalPeer{
201+
Workload: namespace.Name + "/daemonset/" + daemonSet.Name,
202+
}
203+
tmpPeer := TrafficPeer{
204+
Internal: &tmpInternalPeer,
205+
}
206+
tmpPeerTranslated := tmpPeer.Translate()
207+
if tmpPeerTranslated.Internal.Workload != "" {
208+
daemonSetPeers = append(daemonSetPeers, tmpPeerTranslated)
209+
}
210+
}
211+
212+
}
213+
214+
return daemonSetPeers
215+
}
216+
217+
func StatefulSetsToTrafficPeers() []TrafficPeer {
218+
//Translates all pods associated with statefulSets to TrafficPeers.
219+
var statefulSetPeers []TrafficPeer
220+
kubeClient, err := kube.NewKubernetesForContext("")
221+
utils.DoOrDie(err)
222+
kubeNamespaces, err := kubeClient.GetAllNamespaces()
223+
if err != nil {
224+
logrus.Fatalf("unable to read namespaces from kube: %+v", err)
225+
}
226+
227+
for _, namespace := range kubeNamespaces.Items {
228+
kubeStatefulSets, err := kubeClient.GetStatefulSetsInNamespace(namespace.Name)
229+
if err != nil {
230+
logrus.Fatalf("unable to read statefulSets from kube, ns '%s': %+v", namespace.Name, err)
231+
}
232+
for _, statefulSet := range kubeStatefulSets {
233+
tmpInternalPeer := InternalPeer{
234+
Workload: namespace.Name + "/statefulset/" + statefulSet.Name,
235+
}
236+
tmpPeer := TrafficPeer{
237+
Internal: &tmpInternalPeer,
238+
}
239+
tmpPeerTranslated := tmpPeer.Translate()
240+
if tmpPeerTranslated.Internal.Workload != "" {
241+
statefulSetPeers = append(statefulSetPeers, tmpPeerTranslated)
242+
}
243+
}
244+
245+
}
246+
247+
return statefulSetPeers
248+
}
249+
250+
func ReplicaSetsToTrafficPeers() []TrafficPeer {
251+
//Translates all pods associated with replicaSets that are not associated with deployments to TrafficPeers.
252+
var replicaSetPeers []TrafficPeer
253+
kubeClient, err := kube.NewKubernetesForContext("")
254+
utils.DoOrDie(err)
255+
kubeNamespaces, err := kubeClient.GetAllNamespaces()
256+
if err != nil {
257+
logrus.Fatalf("unable to read namespaces from kube: %+v", err)
258+
}
259+
260+
for _, namespace := range kubeNamespaces.Items {
261+
kubeReplicaSets, err := kubeClient.GetReplicaSetsInNamespace(namespace.Name)
262+
if err != nil {
263+
logrus.Fatalf("unable to read replicaSets from kube, ns '%s': %+v", namespace.Name, err)
264+
}
265+
266+
for _, replicaSet := range kubeReplicaSets {
267+
if replicaSet.OwnerReferences != nil {
268+
continue
269+
} else {
270+
tmpInternalPeer := InternalPeer{
271+
Workload: namespace.Name + "/replicaset/" + replicaSet.Name,
272+
}
273+
tmpPeer := TrafficPeer{
274+
Internal: &tmpInternalPeer,
275+
}
276+
tmpPeerTranslated := tmpPeer.Translate()
277+
if tmpPeerTranslated.Internal.Workload != "" {
278+
replicaSetPeers = append(replicaSetPeers, tmpPeerTranslated)
279+
}
280+
281+
}
282+
}
283+
284+
}
285+
286+
return replicaSetPeers
287+
}
288+
289+
func PodsToTrafficPeers() []TrafficPeer {
290+
//Translates all pods that are not associated with other workload types (deployment, replicaSet, daemonSet, statefulSet.) to TrafficPeers.
291+
var podPeers []TrafficPeer
292+
kubeClient, err := kube.NewKubernetesForContext("")
293+
utils.DoOrDie(err)
294+
kubeNamespaces, err := kubeClient.GetAllNamespaces()
295+
if err != nil {
296+
logrus.Fatalf("unable to read namespaces from kube: %+v", err)
297+
}
298+
299+
for _, namespace := range kubeNamespaces.Items {
300+
kubePods, err := kube.GetPodsInNamespaces(kubeClient, []string{namespace.Name})
301+
if err != nil {
302+
logrus.Fatalf("unable to read pods from kube, ns '%s': %+v", namespace.Name, err)
303+
}
304+
for _, pod := range kubePods {
305+
if pod.OwnerReferences != nil {
306+
continue
307+
} else {
308+
tmpInternalPeer := InternalPeer{
309+
Workload: namespace.Name + "/pod/" + pod.Name,
310+
}
311+
tmpPeer := TrafficPeer{
312+
Internal: &tmpInternalPeer,
313+
}
314+
tmpPeerTranslated := tmpPeer.Translate()
315+
if tmpPeerTranslated.Internal.Workload != "" {
316+
podPeers = append(podPeers, tmpPeerTranslated)
317+
}
318+
}
319+
}
320+
321+
}
322+
323+
return podPeers
324+
}
325+
326+
// Internal to cluster
74327
type InternalPeer struct {
328+
// optional: if set, will override remaining values with information from cluster
329+
Workload string
75330
PodLabels map[string]string
76331
NamespaceLabels map[string]string
77332
Namespace string
78-
NodeLabels map[string]string
79-
Node string
333+
// optional
334+
Pods []*PodNetworking
335+
}
336+
337+
type PodNetworking struct {
338+
IP string
339+
// don't worry about populating below fields right now
340+
IsHostNetworking bool
341+
NodeLabels []string
80342
}

0 commit comments

Comments
 (0)