Skip to content

Commit

Permalink
restored logs that are ununused by current npm (v2)
Browse files Browse the repository at this point in the history
  • Loading branch information
rayaisaiah committed Mar 3, 2025
1 parent d603453 commit 36eb2d6
Show file tree
Hide file tree
Showing 10 changed files with 135 additions and 0 deletions.
18 changes: 18 additions & 0 deletions npm/pkg/controlplane/controllers/v1/nameSpaceController.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ func (nsc *NamespaceController) needSync(obj interface{}, event string) (string,
return key, needSync
}

klog.Infof("[NAMESPACE %s EVENT] for namespace [%s]", event, key)

needSync = true
return key, needSync
}
Expand Down Expand Up @@ -159,10 +161,14 @@ func (nsc *NamespaceController) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
defer nsc.workqueue.ShutDown()

klog.Info("Starting Namespace controller\n")
klog.Info("Starting workers")
// Launch workers to process namespace resources
go wait.Until(nsc.runWorker, time.Second, stopCh)

klog.Info("Started workers")
<-stopCh
klog.Info("Shutting down workers")
}

func (nsc *NamespaceController) runWorker() {
Expand Down Expand Up @@ -200,6 +206,7 @@ func (nsc *NamespaceController) processNextWorkItem() bool {
// Finally, if no error occurs we Forget this item so it does not
// get queued again until another change happens.
nsc.workqueue.Forget(obj)
klog.Infof("Successfully synced '%s'", key)
return nil
}(obj)
if err != nil {
Expand Down Expand Up @@ -282,6 +289,7 @@ func (nsc *NamespaceController) syncNameSpace(key string) error {
func (nsc *NamespaceController) syncAddNameSpace(nsObj *corev1.Namespace) error {
var err error
corev1NsName, corev1NsLabels := util.GetNSNameWithPrefix(nsObj.ObjectMeta.Name), nsObj.ObjectMeta.Labels
klog.Infof("NAMESPACE CREATING: [%s/%v]", corev1NsName, corev1NsLabels)

// Create ipset for the namespace.
if err = nsc.ipsMgr.CreateSet(corev1NsName, []string{util.IpsetNetHashFlag}); err != nil {
Expand All @@ -300,12 +308,14 @@ func (nsc *NamespaceController) syncAddNameSpace(nsObj *corev1.Namespace) error
// Add the namespace to its label's ipset list.
for nsLabelKey, nsLabelVal := range corev1NsLabels {
labelIpsetName := util.GetNSNameWithPrefix(nsLabelKey)
klog.Infof("Adding namespace %s to ipset list %s", corev1NsName, labelIpsetName)
if err = nsc.ipsMgr.AddToList(labelIpsetName, corev1NsName); err != nil {
metrics.SendErrorLogAndMetric(util.NSID, "[AddNamespace] Error: failed to add namespace %s to ipset list %s with err: %v", corev1NsName, labelIpsetName, err)
return err
}

labelIpsetName = util.GetNSNameWithPrefix(util.GetIpSetFromLabelKV(nsLabelKey, nsLabelVal))
klog.Infof("Adding namespace %s to ipset list %s", corev1NsName, labelIpsetName)
if err = nsc.ipsMgr.AddToList(labelIpsetName, corev1NsName); err != nil {
metrics.SendErrorLogAndMetric(util.NSID, "[AddNamespace] Error: failed to add namespace %s to ipset list %s with err: %v", corev1NsName, labelIpsetName, err)
return err
Expand All @@ -322,6 +332,7 @@ func (nsc *NamespaceController) syncAddNameSpace(nsObj *corev1.Namespace) error
func (nsc *NamespaceController) syncUpdateNameSpace(newNsObj *corev1.Namespace) (metrics.OperationKind, error) {
var err error
newNsName, newNsLabel := util.GetNSNameWithPrefix(newNsObj.ObjectMeta.Name), newNsObj.ObjectMeta.Labels
klog.Infof("NAMESPACE UPDATING:\n namespace: [%s/%v]", newNsName, newNsLabel)

// If previous syncAddNameSpace failed for some reasons
// before caching npm namespace object or syncUpdateNameSpace is called due to namespace creation event,
Expand All @@ -343,6 +354,7 @@ func (nsc *NamespaceController) syncUpdateNameSpace(newNsObj *corev1.Namespace)
// Delete the namespace from its label's ipset list.
for _, nsLabelVal := range deleteFromIPSets {
labelKey := util.GetNSNameWithPrefix(nsLabelVal)
klog.Infof("Deleting namespace %s from ipset list %s", newNsName, labelKey)
if err = nsc.ipsMgr.DeleteFromList(labelKey, newNsName); err != nil {
metrics.SendErrorLogAndMetric(util.NSID, "[UpdateNamespace] Error: failed to delete namespace %s from ipset list %s with err: %v", newNsName, labelKey, err)
return metrics.UpdateOp, fmt.Errorf("failed to delete namespace %s from ipset list %s with err: %w", newNsName, labelKey, err)
Expand All @@ -359,6 +371,7 @@ func (nsc *NamespaceController) syncUpdateNameSpace(newNsObj *corev1.Namespace)
// Add the namespace to its label's ipset list.
for _, nsLabelVal := range addToIPSets {
labelKey := util.GetNSNameWithPrefix(nsLabelVal)
klog.Infof("Adding namespace %s to ipset list %s", newNsName, labelKey)
if err = nsc.ipsMgr.AddToList(labelKey, newNsName); err != nil {
metrics.SendErrorLogAndMetric(util.NSID, "[UpdateNamespace] Error: failed to add namespace %s to ipset list %s with err: %v", newNsName, labelKey, err)
return metrics.UpdateOp, fmt.Errorf("failed to add namespace %s to ipset list %s with err: %w", newNsName, labelKey, err)
Expand All @@ -382,21 +395,26 @@ func (nsc *NamespaceController) syncUpdateNameSpace(newNsObj *corev1.Namespace)

// cleanDeletedNamespace handles deleting namespace from ipset.
func (nsc *NamespaceController) cleanDeletedNamespace(cachedNsKey string) error {
klog.Infof("NAMESPACE DELETING: [%s]", cachedNsKey)
cachedNsObj, exists := nsc.npmNamespaceCache.NsMap[cachedNsKey]
if !exists {
return nil
}

klog.Infof("NAMESPACE DELETING cached labels: [%s/%v]", cachedNsKey, cachedNsObj.LabelsMap)

var err error
// Delete the namespace from its label's ipset list.
for nsLabelKey, nsLabelVal := range cachedNsObj.LabelsMap {
labelIpsetName := util.GetNSNameWithPrefix(nsLabelKey)
klog.Infof("Deleting namespace %s from ipset list %s", cachedNsKey, labelIpsetName)
if err = nsc.ipsMgr.DeleteFromList(labelIpsetName, cachedNsKey); err != nil {
metrics.SendErrorLogAndMetric(util.NSID, "[DeleteNamespace] Error: failed to delete namespace %s from ipset list %s with err: %v", cachedNsKey, labelIpsetName, err)
return err
}

labelIpsetName = util.GetNSNameWithPrefix(util.GetIpSetFromLabelKV(nsLabelKey, nsLabelVal))
klog.Infof("Deleting namespace %s from ipset list %s", cachedNsKey, labelIpsetName)
if err = nsc.ipsMgr.DeleteFromList(labelIpsetName, cachedNsKey); err != nil {
metrics.SendErrorLogAndMetric(util.NSID, "[DeleteNamespace] Error: failed to delete namespace %s from ipset list %s with err: %v", cachedNsKey, labelIpsetName, err)
return err
Expand Down
14 changes: 14 additions & 0 deletions npm/pkg/controlplane/controllers/v1/nameSpaceController_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,26 +133,39 @@ func newNameSpace(name, rv string, labels map[string]string) *corev1.Namespace {
}

func addNamespace(t *testing.T, f *nameSpaceFixture, nsObj *corev1.Namespace) {
t.Logf("Calling add namespace event")
f.nsController.addNamespace(nsObj)
if f.nsController.workqueue.Len() == 0 {
t.Logf("Add Namespace: worker queue length is 0 ")
return
}
f.nsController.processNextWorkItem()
}

func updateNamespace(t *testing.T, f *nameSpaceFixture, oldNsObj, newNsObj *corev1.Namespace) {
addNamespace(t, f, oldNsObj)
t.Logf("Complete add namespace event")

t.Logf("Updating kubeinformer namespace object")
f.kubeInformer.Core().V1().Namespaces().Informer().GetIndexer().Update(newNsObj)

t.Logf("Calling update namespace event")
f.nsController.updateNamespace(oldNsObj, newNsObj)
if f.nsController.workqueue.Len() == 0 {
t.Logf("Update Namespace: worker queue length is 0 ")
return
}
f.nsController.processNextWorkItem()
}

func deleteNamespace(t *testing.T, f *nameSpaceFixture, nsObj *corev1.Namespace, isDeletedFinalStateUnknownObject IsDeletedFinalStateUnknownObject) {
addNamespace(t, f, nsObj)
t.Logf("Complete add namespace event")

t.Logf("Updating kubeinformer namespace object")
f.kubeInformer.Core().V1().Namespaces().Informer().GetIndexer().Delete(nsObj)

t.Logf("Calling delete namespace event")
if isDeletedFinalStateUnknownObject {
tombstone := cache.DeletedFinalStateUnknown{
Key: nsObj.Name,
Expand All @@ -164,6 +177,7 @@ func deleteNamespace(t *testing.T, f *nameSpaceFixture, nsObj *corev1.Namespace,
}

if f.nsController.workqueue.Len() == 0 {
t.Logf("Delete Namespace: worker queue length is 0 ")
return
}
f.nsController.processNextWorkItem()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,9 +177,12 @@ func (c *NetworkPolicyController) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
defer c.workqueue.ShutDown()

klog.Infof("Starting Network Policy worker")
go wait.Until(c.runWorker, time.Second, stopCh)

klog.Infof("Started Network Policy worker")
<-stopCh
klog.Info("Shutting down Network Policy workers")
}

func (c *NetworkPolicyController) runWorker() {
Expand Down Expand Up @@ -216,6 +219,7 @@ func (c *NetworkPolicyController) processNextWorkItem() bool {
// Finally, if no error occurs we Forget this item so it does not
// get queued again until another change happens.
c.workqueue.Forget(obj)
klog.Infof("Successfully synced '%s'", key)
return nil
}(obj)
if err != nil {
Expand Down Expand Up @@ -369,11 +373,13 @@ func (c *NetworkPolicyController) syncAddAndUpdateNetPol(netPolObj *networkingv1

sets, namedPorts, lists, ingressIPCidrs, egressIPCidrs, iptEntries := translatePolicy(netPolObj)
for _, set := range sets {
klog.Infof("Creating set: %v, hashedSet: %v", set, util.GetHashedName(set))
if err = c.ipsMgr.CreateSetNoLock(set, []string{util.IpsetNetHashFlag}); err != nil {
return operationKind, fmt.Errorf("[syncAddAndUpdateNetPol] Error: creating ipset %s with err: %w", set, err)
}
}
for _, set := range namedPorts {
klog.Infof("Creating set: %v, hashedSet: %v", set, util.GetHashedName(set))
if err = c.ipsMgr.CreateSetNoLock(set, []string{util.IpsetIPPortHashFlag}); err != nil {
return operationKind, fmt.Errorf("[syncAddAndUpdateNetPol] Error: creating ipset named port %s with err: %w", set, err)
}
Expand Down Expand Up @@ -484,6 +490,7 @@ func (c *NetworkPolicyController) createCidrsRule(direction, policyName, ns stri
continue
}
setName := policyName + "-in-ns-" + ns + "-" + strconv.Itoa(i) + direction
klog.Infof("Creating set: %v, hashedSet: %v", setName, util.GetHashedName(setName))
if err := c.ipsMgr.CreateSetNoLock(setName, spec); err != nil {
return fmt.Errorf("[createCidrsRule] Error: creating ipset %s with err: %w", ipCidrSet, err)
}
Expand Down Expand Up @@ -514,6 +521,7 @@ func (c *NetworkPolicyController) removeCidrsRule(direction, policyName, ns stri
continue
}
setName := policyName + "-in-ns-" + ns + "-" + strconv.Itoa(i) + direction
klog.Infof("Delete set: %v, hashedSet: %v", setName, util.GetHashedName(setName))
if err := c.ipsMgr.DeleteSet(setName); err != nil {
return fmt.Errorf("[removeCidrsRule] deleting ipset %s with err: %w", ipCidrSet, err)
}
Expand Down
30 changes: 30 additions & 0 deletions npm/pkg/controlplane/controllers/v1/podController.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,15 @@ func (c *PodController) needSync(eventType string, obj interface{}) (string, boo
return key, needSync
}

klog.Infof("[POD %s EVENT] for %s in %s", eventType, podObj.Name, podObj.Namespace)

if !hasValidPodIP(podObj) {
return key, needSync
}

if isHostNetworkPod(podObj) {
klog.Infof("[POD %s EVENT] HostNetwork POD IGNORED: [%s/%s/%s/%+v%s]",
eventType, podObj.GetObjectMeta().GetUID(), podObj.Namespace, podObj.Name, podObj.Labels, podObj.Status.PodIP)
return key, needSync
}

Expand Down Expand Up @@ -128,6 +132,7 @@ func (c *PodController) addPod(obj interface{}) {
func (c *PodController) updatePod(old, newp interface{}) {
key, needSync := c.needSync("UPDATE", newp)
if !needSync {
klog.Infof("[POD UPDATE EVENT] No need to sync this pod")
return
}

Expand All @@ -138,6 +143,7 @@ func (c *PodController) updatePod(old, newp interface{}) {
if oldPod.ResourceVersion == newPod.ResourceVersion {
// Periodic resync will send update events for all known pods.
// Two different versions of the same pods will always have different RVs.
klog.Infof("[POD UPDATE EVENT] Two pods have the same RVs")
return
}
}
Expand All @@ -164,7 +170,9 @@ func (c *PodController) deletePod(obj interface{}) {
}
}

klog.Infof("[POD DELETE EVENT] for %s in %s", podObj.Name, podObj.Namespace)
if isHostNetworkPod(podObj) {
klog.Infof("[POD DELETE EVENT] HostNetwork POD IGNORED: [%s/%s/%s/%+v%s]", podObj.UID, podObj.Namespace, podObj.Name, podObj.Labels, podObj.Status.PodIP)
return
}

Expand All @@ -184,9 +192,12 @@ func (c *PodController) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
defer c.workqueue.ShutDown()

klog.Infof("Starting Pod worker")
go wait.Until(c.runWorker, time.Second, stopCh)

klog.Info("Started Pod workers")
<-stopCh
klog.Info("Shutting down Pod workers")
}

func (c *PodController) runWorker() {
Expand Down Expand Up @@ -224,6 +235,7 @@ func (c *PodController) processNextWorkItem() bool {
// Finally, if no error occurs we Forget this item so it does not
// get queued again until another change happens.
c.workqueue.Forget(obj)
klog.Infof("Successfully synced '%s'", key)
return nil
}(obj)
if err != nil {
Expand Down Expand Up @@ -313,11 +325,14 @@ func (c *PodController) syncPod(key string) error {
}

func (c *PodController) syncAddedPod(podObj *corev1.Pod) error {
klog.Infof("POD CREATING: [%s%s/%s/%s%+v%s]", string(podObj.GetUID()), podObj.Namespace,
podObj.Name, podObj.Spec.NodeName, podObj.Labels, podObj.Status.PodIP)

var err error
podNs := util.GetNSNameWithPrefix(podObj.Namespace)
podKey, _ := cache.MetaNamespaceKeyFunc(podObj)
// Add the pod ip information into namespace's ipset.
klog.Infof("Adding pod %s to ipset %s", podObj.Status.PodIP, podNs)
if err = c.ipsMgr.AddToSet(podNs, podObj.Status.PodIP, util.IpsetNetHashFlag, podKey); err != nil {
return fmt.Errorf("[syncAddedPod] Error: failed to add pod to namespace ipset with err: %w", err)
}
Expand All @@ -328,18 +343,21 @@ func (c *PodController) syncAddedPod(podObj *corev1.Pod) error {

// Get lists of podLabelKey and podLabelKey + podLavelValue ,and then start adding them to ipsets.
for labelKey, labelVal := range podObj.Labels {
klog.Infof("Adding pod %s to ipset %s", npmPodObj.PodIP, labelKey)
if err = c.ipsMgr.AddToSet(labelKey, npmPodObj.PodIP, util.IpsetNetHashFlag, podKey); err != nil {
return fmt.Errorf("[syncAddedPod] Error: failed to add pod to label ipset with err: %w", err)
}

podIPSetName := util.GetIpSetFromLabelKV(labelKey, labelVal)
klog.Infof("Adding pod %s to ipset %s", npmPodObj.PodIP, podIPSetName)
if err = c.ipsMgr.AddToSet(podIPSetName, npmPodObj.PodIP, util.IpsetNetHashFlag, podKey); err != nil {
return fmt.Errorf("[syncAddedPod] Error: failed to add pod to label ipset with err: %w", err)
}
npmPodObj.AppendLabels(map[string]string{labelKey: labelVal}, common.AppendToExistingLabels)
}

// Add pod's named ports from its ipset.
klog.Infof("Adding named port ipsets")
containerPorts := common.GetContainerPortList(podObj)
if err = c.manageNamedPortIpsets(containerPorts, podKey, npmPodObj.PodIP, addNamedPort); err != nil {
return fmt.Errorf("[syncAddedPod] Error: failed to add pod to named port ipset with err: %w", err)
Expand Down Expand Up @@ -378,6 +396,7 @@ func (c *PodController) syncAddAndUpdatePod(newPodObj *corev1.Pod) (metrics.Oper

podKey, _ := cache.MetaNamespaceKeyFunc(newPodObj)
cachedNpmPod, exists := c.podMap[podKey]
klog.Infof("[syncAddAndUpdatePod] updating Pod with key %s", podKey)
// No cached npmPod exists. start adding the pod in a cache
if !exists {
if err = c.syncAddedPod(newPodObj); err != nil {
Expand All @@ -396,10 +415,15 @@ func (c *PodController) syncAddAndUpdatePod(newPodObj *corev1.Pod) (metrics.Oper
// NPM should clean up existing references of cached pod obj and its IP.
// then, re-add new pod obj.
if cachedNpmPod.PodIP != newPodObj.Status.PodIP {
klog.Infof("Pod (Namespace:%s, Name:%s, newUid:%s), has cachedPodIp:%s which is different from PodIp:%s",
newPodObj.Namespace, newPodObj.Name, string(newPodObj.UID), cachedNpmPod.PodIP, newPodObj.Status.PodIP)

klog.Infof("Deleting cached Pod with key:%s first due to IP Mistmatch", podKey)
if err = c.cleanUpDeletedPod(podKey); err != nil {
return metrics.UpdateOp, err
}

klog.Infof("Adding back Pod with key:%s after IP Mistmatch", podKey)
if err = c.syncAddedPod(newPodObj); err != nil {
return metrics.UpdateOp, err
}
Expand All @@ -414,6 +438,7 @@ func (c *PodController) syncAddAndUpdatePod(newPodObj *corev1.Pod) (metrics.Oper

// Delete the pod from its label's ipset.
for _, podIPSetName := range deleteFromIPSets {
klog.Infof("Deleting pod %s from ipset %s", cachedNpmPod.PodIP, podIPSetName)
if err = c.ipsMgr.DeleteFromSet(podIPSetName, cachedNpmPod.PodIP, podKey); err != nil {
return metrics.UpdateOp, fmt.Errorf("[syncAddAndUpdatePod] Error: failed to delete pod from label ipset with err: %w", err)
}
Expand All @@ -427,6 +452,7 @@ func (c *PodController) syncAddAndUpdatePod(newPodObj *corev1.Pod) (metrics.Oper

// Add the pod to its label's ipset.
for _, addIPSetName := range addToIPSets {
klog.Infof("Adding pod %s to ipset %s", newPodObj.Status.PodIP, addIPSetName)
if err = c.ipsMgr.AddToSet(addIPSetName, newPodObj.Status.PodIP, util.IpsetNetHashFlag, podKey); err != nil {
return metrics.UpdateOp, fmt.Errorf("[syncAddAndUpdatePod] Error: failed to add pod to label ipset with err: %w", err)
}
Expand Down Expand Up @@ -469,6 +495,7 @@ func (c *PodController) syncAddAndUpdatePod(newPodObj *corev1.Pod) (metrics.Oper

// cleanUpDeletedPod cleans up all ipset associated with this pod
func (c *PodController) cleanUpDeletedPod(cachedNpmPodKey string) error {
klog.Infof("[cleanUpDeletedPod] deleting Pod with key %s", cachedNpmPodKey)
// If cached npmPod does not exist, return nil
cachedNpmPod, exist := c.podMap[cachedNpmPodKey]
if !exist {
Expand All @@ -484,11 +511,13 @@ func (c *PodController) cleanUpDeletedPod(cachedNpmPodKey string) error {

// Get lists of podLabelKey and podLabelKey + podLavelValue ,and then start deleting them from ipsets
for labelKey, labelVal := range cachedNpmPod.Labels {
klog.Infof("Deleting pod %s from ipset %s", cachedNpmPod.PodIP, labelKey)
if err = c.ipsMgr.DeleteFromSet(labelKey, cachedNpmPod.PodIP, cachedNpmPodKey); err != nil {
return fmt.Errorf("[cleanUpDeletedPod] Error: failed to delete pod from label ipset with err: %w", err)
}

podIPSetName := util.GetIpSetFromLabelKV(labelKey, labelVal)
klog.Infof("Deleting pod %s from ipset %s", cachedNpmPod.PodIP, podIPSetName)
if err = c.ipsMgr.DeleteFromSet(podIPSetName, cachedNpmPod.PodIP, cachedNpmPodKey); err != nil {
return fmt.Errorf("[cleanUpDeletedPod] Error: failed to delete pod from label ipset with err: %w", err)
}
Expand All @@ -509,6 +538,7 @@ func (c *PodController) cleanUpDeletedPod(cachedNpmPodKey string) error {
func (c *PodController) manageNamedPortIpsets(portList []corev1.ContainerPort, podKey string,
podIP string, namedPortOperation NamedPortOperation) error {
for _, port := range portList {
klog.Infof("port is %+v", port)
if port.Name == "" {
continue
}
Expand Down
2 changes: 2 additions & 0 deletions npm/pkg/controlplane/controllers/v1/podController_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ func addPod(t *testing.T, f *podFixture, podObj *corev1.Pod) {

func deletePod(t *testing.T, f *podFixture, podObj *corev1.Pod, isDeletedFinalStateUnknownObject IsDeletedFinalStateUnknownObject) {
addPod(t, f, podObj)
t.Logf("Complete add pod event")

// simulate pod delete event and delete pod object from sharedInformer cache
f.kubeInformer.Core().V1().Pods().Informer().GetIndexer().Delete(podObj)
Expand All @@ -162,6 +163,7 @@ func deletePod(t *testing.T, f *podFixture, podObj *corev1.Pod, isDeletedFinalSt
// Need to make more cases - interestings..
func updatePod(t *testing.T, f *podFixture, oldPodObj *corev1.Pod, newPodObj *corev1.Pod) {
addPod(t, f, oldPodObj)
t.Logf("Complete add pod event")

// simulate pod update event and update the pod to shared informer's cache
f.kubeInformer.Core().V1().Pods().Informer().GetIndexer().Update(newPodObj)
Expand Down
Loading

0 comments on commit 36eb2d6

Please sign in to comment.