-
Notifications
You must be signed in to change notification settings - Fork 194
Open
Labels
is:question — Issue is actually a question.
Description
I currently have a netopeer2 container running in Docker, and I have written a custom sample model, which is as follows.
module client-list {
  namespace "urn:example:client-list";
  prefix cl;

  revision "2025-07-07" {
    description "Network client list monitoring model";
  }

  // Lease states that the "state" leaf of a lease entry can report.
  typedef lease-state {
    type enumeration {
      enum active;
      enum expired;
      enum released;
    }
  }

  // Fields describing one DHCP client lease; reused by the RPC output list.
  grouping dhcp-lease-group {
    description "DHCP lease information grouping";

    leaf ip-address {
      type string;
      description "IP address assigned to client";
    }
    leaf mac-address {
      type string;
      description "MAC address of client";
    }
    leaf hostname {
      type string;
      description "Hostname reported by client";
    }
    leaf lease-expiry {
      type string;
      description "Lease expiry time in human-readable format";
    }
    leaf state {
      type lease-state;
      description "Current lease state";
    }
    leaf last-seen {
      type string;
      description "Last time client was active";
    }
    leaf manufacturer {
      type string;
      description "Manufacturer/Vendor of the device based on MAC OUI";
    }
  }

  rpc get-dhcp-leases {
    description "Retrieve current DHCP lease information";

    input {
      // NOTE(review): the "input" statement of an rpc is optional in YANG, so
      // this placeholder is not needed by the syntax. It is kept so clients
      // that already send it keep validating. The former "config false" was
      // dropped: "config" is ignored inside rpc input/output trees.
      leaf dummy {
        type empty;
        description "Placeholder field to satisfy YANG syntax requirements";
        status deprecated;
      }
    }

    output {
      // One entry per lease, keyed by the assigned IP address.
      list lease {
        key "ip-address";
        description "DHCP client lease entry";
        uses dhcp-lease-group;
      }
    }
  }
}
Below is the Python script I used to subscribe to the RPC defined in this sample model.
#!/usr/bin/env python3
import sysrepo
import subprocess
import os
import time
from typing import Dict, List, Optional, Any
from datetime import datetime
class DHCPService:
    """Answer the ``/client-list:get-dhcp-leases`` RPC with dnsmasq lease data.

    The service opens a sysrepo connection and session, subscribes a callback
    to the RPC, and then sleeps until interrupted. Each RPC call reads the
    dnsmasq lease file, marks a lease ``active`` when its IP appears as a
    complete entry in the kernel ARP table, and resolves the vendor name from
    an IEEE OUI text file loaded at construction time.

    File locations and names are class attributes so that a deployment (or a
    test) can override them without editing the methods.
    """

    OUI_PATH = "/usr/share/ieee-data/oui.txt"
    LEASES_PATH = "/var/lib/misc/dnsmasq.leases"
    ARP_TABLE_PATH = "/proc/net/arp"
    RPC_XPATH = "/client-list:get-dhcp-leases"
    LAN_INTERFACE = "br-lan"
    # Upper bound (seconds) for the arping probe so an RPC can never hang on it.
    ARPING_TIMEOUT = 5

    def __init__(self):
        """Create an unconnected service and load the OUI vendor table."""
        print("Initializing DHCPService...")
        self.conn = None
        self.session = None
        self.subscription = None
        # Maps a 6-hex-digit lowercase MAC prefix to the vendor name.
        self.oui_cache = {}
        self._load_oui_database()
        print("DHCPService initialized successfully")

    @staticmethod
    def _parse_oui_line(line: str) -> Optional[tuple]:
        """Return ``(prefix, vendor)`` for an OUI assignment line, else ``None``.

        Both ``XX-XX-XX   (hex)   Vendor`` and ``XXXXXX   (base 16)   Vendor``
        lines are accepted. The vendor is everything after the marker, so the
        first word of the name is kept for both line kinds.
        """
        for marker in ("(hex)", "(base 16)"):
            head, found, tail = line.partition(marker)
            if not found:
                continue
            tokens = head.split()
            if not tokens:
                # Marker with no prefix in front of it: not an assignment line.
                return None
            prefix = tokens[0].replace("-", "").lower()[:6]
            return prefix, tail.strip()
        return None

    def _load_oui_database(self):
        """Fill ``self.oui_cache`` from ``OUI_PATH``; warn if the file is absent."""
        print("Loading OUI database...")
        if not os.path.exists(self.OUI_PATH):
            print("Warning: OUI database not found")
            return
        with open(self.OUI_PATH, "r", encoding="utf-8", errors="ignore") as f:
            for line in f:
                entry = self._parse_oui_line(line)
                if entry is not None:
                    prefix, vendor = entry
                    self.oui_cache[prefix] = vendor
        print(f"Loaded {len(self.oui_cache)} OUI entries")

    def start(self):
        """Connect to sysrepo, subscribe to the RPC, and block until stopped.

        Any exception is reported and re-raised. ``stop()`` always runs on the
        way out, so callers do not need to release resources themselves.
        """
        try:
            print("Starting sysrepo connection...")
            self.conn = sysrepo.SysrepoConnection()
            self.session = self.conn.start_session("running")
            print("Sysrepo session started successfully")
            print("Subscribing to RPC call...")
            try:
                # NOTE(review): sysrepo-python documents subscribe_rpc_call()
                # as returning None, with the subscription owned by the session
                # and released by session.stop() -- confirm for the installed
                # version. The return value is stored in case it is an object.
                self.subscription = self.session.subscribe_rpc_call(
                    self.RPC_XPATH,
                    self._handle_get_dhcp_leases,
                    asyncio_register=False
                )
                print("RPC subscription completed without errors")
                print("NOTE: To verify subscription is working, try calling the RPC from another session")
            except Exception as sub_error:
                print(f"ERROR: Failed to subscribe to RPC: {str(sub_error)}")
                raise
            print("Service running, waiting for RPC calls...")
            while True:
                time.sleep(1)
        except Exception as e:
            print(f"Error in service startup: {str(e)}")
            raise
        finally:
            self.stop()

    def stop(self):
        """Release the subscription, session and connection; safe to call twice.

        The handles are detached from ``self`` before they are released, so a
        second call (for example from the caller's own error handler after
        ``start()`` already cleaned up) finds nothing left to release.
        """
        print("Stopping service...")
        subscription, self.subscription = self.subscription, None
        session, self.session = self.session, None
        conn, self.conn = self.conn, None
        # Nested try/finally: a failure in one step must not leak the others.
        try:
            if subscription:
                print("Unsubscribing from RPC...")
                subscription.unsubscribe()
        finally:
            try:
                if session:
                    print("Stopping sysrepo session...")
                    session.stop()
            finally:
                if conn:
                    print("Disconnecting from sysrepo...")
                    conn.disconnect()
        print("Service stopped")

    def _handle_get_dhcp_leases(self, session: str, input_params: Dict, event: str, private_data: Any) -> Optional[Dict]:
        """RPC callback: return the current leases as the RPC ``output`` dict.

        The arguments are supplied by sysrepo and are not used here.
        NOTE(review): sysrepo-python documents the first callback argument as
        the RPC xpath string rather than a session object -- confirm; the
        parameter name is kept for compatibility.

        Returns a dict with one ``"lease"`` list whose entries use the leaf
        names of the YANG ``dhcp-lease-group``. Errors are printed and
        re-raised so that the RPC fails visibly.
        """
        print("RPC call received: get-dhcp-leases")
        try:
            leases = self._get_dhcp_leases()
            print(f"Returning {len(leases)} DHCP leases")
            return {
                "lease": [
                    {
                        "ip-address": lease["ip"],
                        "mac-address": lease["mac"],
                        "hostname": lease["hostname"],
                        "lease-expiry": datetime.fromtimestamp(lease["expiry"]).isoformat(),
                        "state": lease["state"],
                        "last-seen": lease["last_seen"],
                        "manufacturer": lease["manufacturer"]
                    }
                    for lease in leases
                ]
            }
        except Exception as e:
            print(f"Error handling RPC call: {str(e)}")
            raise

    def _get_dhcp_leases(self) -> List[Dict]:
        """Parse ``LEASES_PATH`` and annotate every lease with state and vendor.

        Lines are expected as ``expiry mac ip hostname ...``. Lines with fewer
        than four fields, or whose expiry is not an integer, are skipped so
        that one bad line does not fail the whole RPC.

        Raises:
            OSError: if the lease file cannot be opened.
        """
        print("Reading DHCP leases...")
        leases = []
        with open(self.LEASES_PATH, 'r') as f:
            for line in f:
                fields = line.split()
                if len(fields) < 4:
                    continue
                try:
                    expiry = int(fields[0])
                except ValueError:
                    print(f"Warning: skipping malformed lease line: {line.strip()!r}")
                    continue
                leases.append({
                    'expiry': expiry,
                    'mac': fields[1],
                    'ip': fields[2],
                    'hostname': fields[3],
                    'manufacturer': self._get_manufacturer(fields[1])
                })
        print("Checking online devices...")
        online_ips = self._get_online_devices()
        # Set for O(1) membership; the list is kept for the count printed below.
        online_lookup = set(online_ips)
        checked_at = datetime.now().isoformat()
        for lease in leases:
            lease['state'] = 'active' if lease['ip'] in online_lookup else 'expired'
            lease['last_seen'] = checked_at
        print(f"Found {len(leases)} leases ({len(online_ips)} online)")
        return leases

    def _get_manufacturer(self, mac: str) -> str:
        """Return the vendor for *mac* from the OUI cache, or ``"Unknown"``."""
        clean_mac = mac.lower().replace(":", "").replace("-", "")[:6]
        return self.oui_cache.get(clean_mac, "Unknown")

    def _refresh_arp_table(self) -> None:
        """Best-effort arping probe on ``LAN_INTERFACE`` before reading ARP.

        A missing binary or a timeout is reported and otherwise ignored, so
        the ARP table is still read afterwards.
        """
        print("Running arping to refresh ARP table...")
        try:
            subprocess.run(
                ["arping", "-c", "1", "-I", self.LAN_INTERFACE, "-q", "224.0.0.1"],
                check=False,
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
                timeout=self.ARPING_TIMEOUT,
            )
        except (OSError, subprocess.TimeoutExpired) as e:
            print(f"Warning: could not run arping: {str(e)}")

    def _get_online_devices(self) -> List[str]:
        """Return the IPs whose ARP table entry has flags equal to ``0x2``.

        The first line of ``ARP_TABLE_PATH`` is treated as a header. Blank or
        short lines are ignored. An unreadable table yields an empty list.
        """
        self._refresh_arp_table()
        try:
            print("Reading ARP table...")
            online = []
            with open(self.ARP_TABLE_PATH, 'r') as f:
                next(f, None)  # skip the column header line
                for line in f:
                    columns = line.split()
                    # Column 0 is the IP address, column 2 the flags field.
                    if len(columns) >= 3 and columns[2] == '0x2':
                        online.append(columns[0])
            return online
        except OSError as e:
            print(f"Error checking online devices: {str(e)}")
            return []
if __name__ == "__main__":
    # Script entry point: build the service and run it until it is interrupted
    # or fails.
    print("Starting DHCP RPC service...")
    service = DHCPService()
    try:
        # start() releases the sysrepo resources itself in its ``finally``
        # clause, so stop() is not called a second time from the handlers below.
        service.start()
    except KeyboardInterrupt:
        print("\nReceived keyboard interrupt")
    except Exception as e:
        print(f"Service crashed: {str(e)}")
        # Report the failure to the caller (e.g. a container restart policy)
        # instead of exiting with status 0.
        raise SystemExit(1)
After I executed this script, I checked the netopeer2 logs but found no message indicating a successful subscription. Was the subscription actually successful? I want to know how to subscribe correctly. By the way, I cannot use self.subscribe to subscribe, because I use the odl version of netopeer2.
Metadata
Metadata
Assignees
Labels
is:question — Issue is actually a question.