Skip to content

Commit

Permalink
Merge pull request #1007 from xfilo/main
Browse files Browse the repository at this point in the history
OMDSDNOPENAPI - Fixed example in README.md, improved logging and code logic
  • Loading branch information
jokob-sk authored Feb 24, 2025
2 parents 9be9728 + 03bf4f4 commit 3a312fd
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 92 deletions.
2 changes: 1 addition & 1 deletion front/plugins/omada_sdn_openapi/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@

- Settings:

- ![settings_example](/front/plugins/omada_sdn_openapi_import/omada_sdn_openapi_import_settings.png)
- ![settings_example](/front/plugins/omada_sdn_openapi/omada_sdn_openapi_settings.png)

### ℹ️ Other info

Expand Down
173 changes: 82 additions & 91 deletions front/plugins/omada_sdn_openapi/script.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

__author__ = "xfilo"
__version__ = 0.1 # Initial version
__version__ = 0.2 # Rephrased error messages, improved logging and code logic

import os
import sys
Expand Down Expand Up @@ -59,7 +60,7 @@
class OmadaHelper:
@staticmethod
def log(message: str, level: Literal["minimal", "verbose", "debug", "trace"] = "minimal") -> None:
mylog(level, [f"[{pluginName}] {message}"])
mylog(level, [f"[{pluginName}] [{level[:1].upper()}] {message}"])

@staticmethod
def debug(message: str) -> None:
Expand All @@ -75,8 +76,7 @@ def minimal(message: str) -> None:

@staticmethod
def response(response_type: str, response_message: str, response_result: Any = None) -> Dict[str, Any]:
return {"response_type": response_type, "response_message": response_message,
"response_result": response_result}
return {"response_type": response_type, "response_message": response_message, "response_result": response_result}

@staticmethod
def timestamp_to_datetime(ms: int, timezone: str) -> Dict[str, Any]:
Expand Down Expand Up @@ -127,7 +127,7 @@ def normalize_data(input_data: list, input_type: str, site_name: str, timezone:
if not isinstance(input_data, list):
raise Exception(f"Expected a list, but got '{type(input_data)}'.")

OmadaHelper.verbose(f"Starting normalization of {len(input_data)} {input_type}(s) from site {site_name}")
OmadaHelper.verbose(f"Starting normalization of {len(input_data)} {input_type}(s) from site: {site_name}")
default_entry = {
"mac_address": "",
"ip_address": "",
Expand Down Expand Up @@ -158,45 +158,35 @@ def normalize_data(input_data: list, input_type: str, site_name: str, timezone:
entry["name"] = data.get("name", "")

last_seen = OmadaHelper.timestamp_to_datetime(data.get("lastSeen", 0), timezone)
entry["last_seen"] = last_seen.get("response_result") if isinstance(last_seen, dict) and last_seen.get(
"response_type") == "success" else ""
entry["last_seen"] = last_seen.get("response_result") if isinstance(last_seen, dict) and last_seen.get("response_type") == "success" else ""

if input_type == "device":
entry["device_type"] = data.get("type")
if data.get("type", "") != "gateway":
parent_mac = OmadaHelper.normalize_mac(data.get("uplinkDeviceMac"))
parent_mac = parent_mac.get("response_result") if isinstance(parent_mac,
dict) and parent_mac.get(
"response_type") == "success" else ""
entry["parent_node_mac_address"] = parent_mac
entry["parent_node_mac_address"] = parent_mac.get("response_result") if isinstance(parent_mac, dict) and parent_mac.get("response_type") == "success" else ""

if input_type == "client":
entry["vlan_id"] = data.get("vid")
entry["device_type"] = data.get("deviceType")
if data.get("connectDevType", "") == "gateway":
parent_mac = OmadaHelper.normalize_mac(data.get("gatewayMac"))
entry["parent_node_mac_address"] = parent_mac.get("response_result") if isinstance(parent_mac,
dict) and parent_mac.get(
"response_type") == "success" else ""
entry["parent_node_mac_address"] = parent_mac.get("response_result") if isinstance(parent_mac, dict) and parent_mac.get("response_type") == "success" else ""
entry["parent_node_port"] = data.get("port", "")
elif data.get("connectDevType", "") == "switch":
parent_mac = OmadaHelper.normalize_mac(data.get("switchMac"))
entry["parent_node_mac_address"] = parent_mac.get("response_result") if isinstance(parent_mac,
dict) and parent_mac.get(
"response_type") == "success" else ""
entry["parent_node_mac_address"] = parent_mac.get("response_result") if isinstance(parent_mac, dict) and parent_mac.get("response_type") == "success" else ""
entry["parent_node_port"] = data.get("port", "")
elif data.get("connectDevType", "") == "ap":
parent_mac = OmadaHelper.normalize_mac(data.get("apMac"))
entry["parent_node_mac_address"] = parent_mac.get("response_result") if isinstance(parent_mac,
dict) and parent_mac.get(
"response_type") == "success" else ""
entry["parent_node_mac_address"] = parent_mac.get("response_result") if isinstance(parent_mac, dict) and parent_mac.get("response_type") == "success" else ""
entry["parent_node_ssid"] = data.get("ssid", "")

result.append(entry)
OmadaHelper.debug(f"Processed {input_type} entry: {entry}")

msg = f"Successfully normalized {len(result)} {input_type}(s) from site: {site_name}"
OmadaHelper.verbose(msg)
OmadaHelper.minimal(msg)
final_result = OmadaHelper.response("success", msg, result)
except Exception as ex:
msg = f"Failed normalizing {input_type}(s) from site '{site_name}' - error: {str(ex)}"
Expand Down Expand Up @@ -255,7 +245,7 @@ def _get_headers(self, include_auth: bool = True) -> dict:

def _make_request(self, method: str, endpoint: str, **kwargs: Any) -> Dict[str, Any]:
time.sleep(1) # Sleep before making any request so it does not rate-limited
OmadaHelper.verbose(f"{method} request to endpoint: {endpoint}")
OmadaHelper.debug(f"{method} request to endpoint: {endpoint}")
url = f"{getattr(self, 'host')}{endpoint}"
headers = self._get_headers(kwargs.pop('include_auth', True))
try:
Expand All @@ -264,37 +254,42 @@ def _make_request(self, method: str, endpoint: str, **kwargs: Any) -> Dict[str,
data = response.json()
response_type = "error" if data.get("errorCode", 0) != 0 else "success"
msg = f"{method} request completed: {endpoint}"
OmadaHelper.minimal(msg)
OmadaHelper.verbose(msg)
return OmadaHelper.response(response_type, msg, data)
except requests.exceptions.RequestException as ex:
msg = f"{method} request failed: {str(ex)}"
OmadaHelper.minimal(f"{method} request to {endpoint} failed")
OmadaHelper.verbose(msg)
return OmadaHelper.response("error", msg)
OmadaHelper.minimal(f"{method} request failed: {url}")
OmadaHelper.verbose(f"{method} request error: {str(ex)}")
return OmadaHelper.response("error", f"{method} request failed to endpoint '{endpoint}' with error: {str(ex)}")

def authenticate(self) -> Dict[str, any]:
"""Make an endpoint request to get access token."""
OmadaHelper.verbose("Starting authentication process")

# Endpoint request
endpoint = "/openapi/authorize/token?grant_type=client_credentials"
payload = {
"omadacId": getattr(self, 'omada_id'),
"client_id": getattr(self, 'client_id'),
"client_secret": getattr(self, 'client_secret')
}

response = self._make_request("POST", endpoint, json=payload, include_auth=False)
if response["response_type"] == "success":
token_data = response["response_result"]
if token_data.get("errorCode") == 0:
self.access_token = token_data["result"]["accessToken"]
self.refresh_token = token_data["result"]["refreshToken"]
OmadaHelper.minimal("Authentication successful")
return OmadaHelper.response("success", "Authenticated successfully")

OmadaHelper.minimal("Authentication failed")

# Successful endpoint response
if response.get("response_type") == "success":
response_result = response.get("response_result")
error_code = response_result.get("errorCode")
access_token = response_result.get("result").get("accessToken")
refresh_token = response_result.get("result").get("refreshToken")
if error_code == 0 and access_token and refresh_token:
self.access_token = access_token
self.refresh_token = refresh_token
msg = "Successfully authenticated"
OmadaHelper.minimal(msg)
return OmadaHelper.response("success", msg)

# Failed authentication
OmadaHelper.debug(f"Authentication response: {response}")
return OmadaHelper.response("error",
f"Authentication failed - error: {response.get('response_result').get('msg')}")
return OmadaHelper.response("error", f"Authentication failed - error: {response.get('response_message', 'Not provided')}")

def get_clients(self, site_id: str) -> Dict[str, Any]:
"""Make an endpoint request to get all online clients on a site."""
Expand All @@ -308,52 +303,48 @@ def get_devices(self, site_id: str) -> Dict[str, Any]:
endpoint = f"/openapi/v1/{getattr(self, 'omada_id')}/sites/{site_id}/devices?page=1&pageSize={getattr(self, 'page_size')}"
return self._make_request("GET", endpoint)

def get_sites(self) -> Dict[str, Any]:
"""Make an endpoint request to populate all accesible sites."""
OmadaHelper.verbose("Retrieving all accessible sites")
endpoint = f"/openapi/v1/{getattr(self, 'omada_id')}/sites?page=1&pageSize={getattr(self, 'page_size')}"
return self._make_request("GET", endpoint)

def populate_sites(self) -> Dict[str, Any]:
"""Make an endpoint request to populate sites."""
try:
OmadaHelper.verbose("Starting site population process")

# All allowed sites for credentials
all_sites = self.get_sites()["response_result"].get("result").get("data", [])
OmadaHelper.debug(f"Retrieved {len(all_sites)} sites in total")

# All available sites
self.available_sites_dict = {site["siteId"]: site["name"] for site in all_sites}
OmadaHelper.debug(f"Available sites: {self.available_sites_dict}")

# All valid sites from input
active_sites_by_id = {site["siteId"]: site["name"] for site in all_sites if
site["siteId"] in self.requested_sites()}
active_sites_by_name = {site["siteId"]: site["name"] for site in all_sites if
site["name"] in self.requested_sites()}
self.active_sites_dict = active_sites_by_id | active_sites_by_name
OmadaHelper.debug(f"Active sites after filtering: {self.active_sites_dict}")

# If none of the input sites is valid/accessible, default to the first available site
if not self.active_sites_dict:
OmadaHelper.verbose(
"No valid site requested by configuration options, defaulting to first available site")
first_available_site = next(iter(self.available_sites_dict.items()), (None, None))
if first_available_site[0]: # Check if there's an available site
self.active_sites_dict = {first_available_site[0]: first_available_site[1]}
OmadaHelper.debug(f"Using first available site: {first_available_site}")

msg = f"Populated {len(self.active_sites_dict)} active sites"
OmadaHelper.verbose(msg)
result = OmadaHelper.response("success", msg)
except Exception as ex:
OmadaHelper.minimal("Failed to populate sites")
msg = f"Site population error: {str(ex)}"
OmadaHelper.verbose(msg)
result = OmadaHelper.response("error", msg)
"""Make an endpoint request to populate all accessible sites."""
OmadaHelper.verbose("Starting site population process")

return result
# Endpoint request
endpoint = f"/openapi/v1/{getattr(self, 'omada_id')}/sites?page=1&pageSize={getattr(self, 'page_size')}"
response = self._make_request("GET", endpoint)

# Successful endpoint response
if response.get("response_type") == "success":
response_result = response.get("response_result")
if response_result.get("errorCode") == 0:
# All allowed sites for credentials
all_sites = response_result.get("result", "").get("data", [])
OmadaHelper.debug(f"Retrieved {len(all_sites)} sites in total")

# All available sites
self.available_sites_dict = {site["siteId"]: site["name"] for site in all_sites}
OmadaHelper.debug(f"Available sites: {self.available_sites_dict}")

# All valid sites from input
active_sites_by_id = {site["siteId"]: site["name"] for site in all_sites if site["siteId"] in self.requested_sites()}
active_sites_by_name = {site["siteId"]: site["name"] for site in all_sites if site["name"] in self.requested_sites()}
self.active_sites_dict = active_sites_by_id | active_sites_by_name
OmadaHelper.debug(f"Active sites after filtering: {self.active_sites_dict}")

# If none of the input sites is valid/accessible, default to the first available site
if not self.active_sites_dict:
OmadaHelper.verbose("No valid site requested by configuration options, defaulting to first available site")
first_available_site = next(iter(self.available_sites_dict.items()), (None, None))
if first_available_site[0]: # Check if there's an available site
self.active_sites_dict = {first_available_site[0]: first_available_site[1]}
OmadaHelper.debug(f"Using first available site: {first_available_site}")

# Successful site population
msg = f"Successfully populated {len(self.active_sites_dict)} site(s)"
OmadaHelper.minimal(msg)
return OmadaHelper.response("success", msg)

# Failed site population
OmadaHelper.debug(f"Site population response: {response}")
return OmadaHelper.response("error", f"Site population failed - error: {response.get('response_message', 'Not provided')}")

def requested_sites(self) -> list:
"""Returns sites requested by user."""
Expand All @@ -370,8 +361,7 @@ def active_sites(self) -> dict:

def make_entries(plugin_objects: Plugin_Objects, normalized_input_data: dict) -> None:
if normalized_input_data.get("response_type", "error") != "success":
OmadaHelper.minimal(
f"Unable to make entries - error: {normalized_input_data.get('response_message', 'Not provided.')}")
OmadaHelper.minimal(f"Unable to make entries - error: {normalized_input_data.get('response_message', 'Not provided')}")
return

response_result = normalized_input_data.get("response_result", {})
Expand Down Expand Up @@ -407,19 +397,18 @@ def make_entries(plugin_objects: Plugin_Objects, normalized_input_data: dict) ->


def get_entries(plugin_objects: Plugin_Objects) -> Plugin_Objects:
OmadaHelper.minimal("Starting data collection process")
omada_api = OmadaAPI(OPTIONS)

auth_result = omada_api.authenticate()
if auth_result["response_type"] == "error":
OmadaHelper.minimal("Authentication failed, aborting data collection")
OmadaHelper.debug(f"Authentication error - {auth_result['response_message']}")
OmadaHelper.debug(f"{auth_result['response_message']}")
return plugin_objects

sites_result = omada_api.populate_sites()
if sites_result["response_type"] == "error":
OmadaHelper.minimal("Site population failed, aborting data collection")
OmadaHelper.debug(f"Site population error - {auth_result['response_message']}")
OmadaHelper.debug(f"{sites_result['response_message']}")
return plugin_objects

requested_sites = omada_api.requested_sites()
Expand All @@ -428,13 +417,15 @@ def get_entries(plugin_objects: Plugin_Objects) -> Plugin_Objects:

OmadaHelper.verbose(f"Requested sites: {requested_sites}")
OmadaHelper.verbose(f"Available sites: {available_sites}")
OmadaHelper.minimal(f"Active sites: {active_sites}")
OmadaHelper.verbose(f"Active sites: {active_sites}")

OmadaHelper.minimal("Starting data collection process")

for site_id, site_name in active_sites.items():
OmadaHelper.verbose(f"Processing site: {site_name} ({site_id})")

devices_response = omada_api.get_devices(site_id)
if devices_response["response_type"] == "error":
if devices_response["response_type"] != "success":
OmadaHelper.minimal(f"Failed to retrieve devices for site: {site_name}")
else:
devices = devices_response["response_result"].get("result").get("data", [])
Expand All @@ -443,7 +434,7 @@ def get_entries(plugin_objects: Plugin_Objects) -> Plugin_Objects:
make_entries(plugin_objects, devices)

clients_response = omada_api.get_clients(site_id)
if clients_response["response_type"] == "error":
if clients_response["response_type"] != "success":
OmadaHelper.minimal(f"Failed to retrieve clients for site {site_name}")
else:
clients = clients_response["response_result"].get("result").get("data", [])
Expand All @@ -453,7 +444,7 @@ def get_entries(plugin_objects: Plugin_Objects) -> Plugin_Objects:

OmadaHelper.verbose(f"Site complete: {site_name} ({site_id})")

OmadaHelper.minimal("Data collection process completed")
OmadaHelper.minimal("Completed data collection process")
return plugin_objects


Expand Down

0 comments on commit 3a312fd

Please sign in to comment.