diff --git a/CHANGELOG.md b/CHANGELOG.md index 46fa035..cbbf2f0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,10 +1,17 @@ # Changelog - * Updated and rewrote Notification 2.0 listener implementation. Added additional parameters for more control: - `consumer_name`, `shared`, `auto_ack` and `auto_unsubscribe`. Added `unsubscribe` function for removing - subscribers on demand. Both `AsyncListener` and `Listener` now provide consistent `start`/`stop` functions - which take care of coroutine and thread creation. The `listen ` function can still be invoked directly if - necessary. +* Added client-side filtering to many of the standard API (wherever sensibly applicable); These APIs `select` + and `get_all` functions now feature optional `include` and `exclude` parameters which can be used to filter + the results before being wrapped into Python objects; Added multiple matchers including a JSONPath matcher + and a JMESPath matcher. +* Added `QueueListener` and `AsyncQueueListener` classes to the Notification 2.0 toolkit. These pre-defined + listener implementation append new notifications to standard queues that can be monitored/listened to which + makes Notification 2.0 solutions even simpler to implement. +* Updated and rewrote Notification 2.0 listener implementation. Added additional parameters for more control: + `consumer_name`, `shared`, `auto_ack` and `auto_unsubscribe`. Added `unsubscribe` function for removing + subscribers on demand. Both `AsyncListener` and `Listener` now provide consistent `start`/`stop` functions + which take care of coroutine and thread creation. The `listen ` function can still be invoked directly if + necessary. ## Version 3.3.0 diff --git a/buster37.dockerfile b/buster37.dockerfile index d2eb0f8..f43cd5f 100644 --- a/buster37.dockerfile +++ b/buster37.dockerfile @@ -1,6 +1,10 @@ FROM python:3.7-slim-buster -RUN apt-get update && apt-get -y install git +RUN sed -i 's|deb.debian.org/debian|archive.debian.org/debian|g' /etc/apt/sources.list \ + && sed -i 's|security.debian.org/debian-security|archive.debian.org/debian-security|g' /etc/apt/sources.list \ + && apt-get update \ + && apt-get -y install git \ + && rm -rf /var/lib/apt/lists/* COPY requirements.txt / RUN pip install --upgrade pip && pip install -r requirements.txt diff --git a/c8y_api/model/_base.py b/c8y_api/model/_base.py index c959517..fed55fb 100644 --- a/c8y_api/model/_base.py +++ b/c8y_api/model/_base.py @@ -10,7 +10,12 @@ from deprecated import deprecated from c8y_api._base_api import CumulocityRestApi +from c8y_api.model.matcher import JsonMatcher from c8y_api.model._util import _DateUtil, _StringUtil, _QueryUtil +try: + from c8y_api.model.matcher import JmesPathMatcher +except ImportError: + pass def get_by_path(dictionary: dict, path: str, default: Any = None) -> Any: @@ -633,6 +638,8 @@ def __init__(self, c8y: CumulocityRestApi, resource: str): # the default object name would be the resource path element just before # the last event for e.g. /event/events self.object_name = self.resource.split('/')[-1] + # the default JSON matcher for client-side filtering + self.default_matcher = JmesPathMatcher def build_object_path(self, object_id: int | str) -> str: """Build the path to a specific object of this resource. @@ -777,16 +784,33 @@ def _get_count(self, base_query: str) -> int: result_json = self.c8y.get(base_query + '&pageSize=1&withTotalPages=true') return result_json['statistics']['totalPages'] - def _iterate(self, base_query: str, page_number: int | None, limit: int | None, parse_fun): + def _iterate( + self, + base_query: str, + page_number: int | None, + limit: int | None, + include: str | JsonMatcher | None, + exclude: str | JsonMatcher | None, + parse_fun + ): # if no specific page is defined we just start at 1 current_page = page_number if page_number else 1 # we will read page after page until # - we reached the limit, or # - there is no result (i.e. we were at the last page) num_results = 0 + # compile/prepare filter if defined + if isinstance(include, str): + include = self.default_matcher(include) + if isinstance(exclude, str): + exclude = self.default_matcher(exclude) + while True: - results = [parse_fun(x) for x in self._get_page(base_query, current_page)] - # no results, so we are done + results = [ + parse_fun(x) for x in self._get_page(base_query, current_page) + if (not include or include.safe_matches(x)) + and (not exclude or not exclude.safe_matches(x)) + ] if not results: break for result in results: diff --git a/c8y_api/model/_util.py b/c8y_api/model/_util.py index 4d14622..df804f6 100644 --- a/c8y_api/model/_util.py +++ b/c8y_api/model/_util.py @@ -5,7 +5,6 @@ from typing import Union from dateutil import parser -from re import sub class _StringUtil(object): @@ -30,6 +29,26 @@ def to_pascal_case(name: str): return name return parts[0] + "".join([x.title() for x in parts[1:]]) + @staticmethod + def like(expression: str, string: str): + """Check if like-expression matches a string. + + Only supports * at beginning and end. + """ + return ( + expression[1:-1] in string if expression.startswith('*') and expression.endswith('*') + else string.startswith(expression[:-1]) if expression.endswith('*') + else string.endswith(expression[1:]) if expression.startswith('*') + else expression == string + ) + + @staticmethod + def matches(expression: str, string: str): + """Check if regex expression matches a string.""" + try: + return re.search(expression, string) is not None + except re.error: + return False class _QueryUtil(object): @@ -39,7 +58,7 @@ def encode_odata_query_value(value): http://docs.oasis-open.org/odata/odata/v4.01/odata-v4.01-part2-url-conventions.html#sec_URLParsing http://docs.oasis-open.org/odata/odata/v4.01/cs01/abnf/odata-abnf-construction-rules.txt """ # single quotes escaped through single quote - return sub('\'', '\'\'', value) + return re.sub('\'', '\'\'', value) @staticmethod def encode_odata_text_value(value): @@ -47,7 +66,7 @@ def encode_odata_text_value(value): http://docs.oasis-open.org/odata/odata/v4.01/odata-v4.01-part2-url-conventions.html#sec_URLParsing http://docs.oasis-open.org/odata/odata/v4.01/cs01/abnf/odata-abnf-construction-rules.txt """ # single quotes escaped through single quote - encoded_quotes = sub('\'', '\'\'', value) + encoded_quotes = re.sub('\'', '\'\'', value) return encoded_quotes if " " not in encoded_quotes else f"'{encoded_quotes}'" diff --git a/c8y_api/model/administration.py b/c8y_api/model/administration.py index 0a186df..82b7d25 100644 --- a/c8y_api/model/administration.py +++ b/c8y_api/model/administration.py @@ -11,6 +11,7 @@ from c8y_api.model._base import CumulocityResource, SimpleObject from c8y_api.model._parser import SimpleObjectParser, ComplexObjectParser, as_values as parse_as_values from c8y_api.model._util import _DateUtil +from c8y_api.model.matcher import JsonMatcher class PermissionUtil: @@ -261,9 +262,9 @@ def from_json(cls, json: dict) -> GlobalRole: role: GlobalRole = cls._from_json(json, GlobalRole()) # role ID are int for some reason - convert for consistency role.id = str(role.id) - if json['roles'] and json['roles']['references']: + if 'roles' in json and json['roles'] and json['roles']['references']: role.permission_ids = {ref['role']['id'] for ref in json['roles']['references']} - if json['applications']: + if 'applications' in json and json['applications']: role.application_ids = {ref['id'] for ref in json['applications']} return role @@ -939,7 +940,14 @@ def get(self, role_id: str | int) -> InventoryRole: role.c8y = self.c8y # inject c8y connection into instance return role - def select(self, limit: int = None, page_size: int = 1000, page_number: int = None) -> Generator[InventoryRole]: + def select( + self, + limit: int = None, + include: str | JsonMatcher = None, + exclude: str | JsonMatcher = None, + page_size: int = 1000, + page_number: int = None, + ) -> Generator[InventoryRole]: """Get all defined inventory roles. This function is implemented in a lazy fashion - results will only be @@ -949,6 +957,12 @@ def select(self, limit: int = None, page_size: int = 1000, page_number: int = No Args: limit (int): Limit the number of results to this number. + include (str | JsonMatcher): Matcher/expression to filter the query + results (on client side). The inclusion is applied first. + Creates a JMESPath matcher by default for strings. + exclude (str | JsonMatcher): Matcher/expression to filter the query + results (on client side). The exclusion is applied second. + Creates a JMESPath matcher by default for strings. page_size (int): Define the number of objects read (and parsed in one chunk). This is a performance related setting. page_number (int): Pull a specific page; this effectively disables @@ -958,9 +972,15 @@ def select(self, limit: int = None, page_size: int = 1000, page_number: int = No Generator for InventoryRole objects """ base_query = self._prepare_query(page_size=page_size) - return super()._iterate(base_query, page_number, limit, InventoryRole.from_json) + return super()._iterate(base_query, page_number, limit, include, exclude, InventoryRole.from_json) - def get_all(self, limit: int = None, page_size: int = 1000, page_number: int = None) -> List[InventoryRole]: + def get_all( + self, + limit: int = None, + include: str | JsonMatcher = None, exclude: str | JsonMatcher = None, + page_size: int = 1000, + page_number: int = None, + ) -> List[InventoryRole]: """Get all defined inventory roles. This function is a greedy version of the ``select`` function. All @@ -971,7 +991,12 @@ def get_all(self, limit: int = None, page_size: int = 1000, page_number: int = N Returns: List of InventoryRole objects """ - return list(self.select(limit=limit, page_size=page_size, page_number=page_number)) + return list(self.select( + limit=limit, + include=include, + exclude=exclude, + page_size=page_size, + page_number=page_number)) def select_assignments(self, username: str) -> Generator[InventoryRoleAssignment]: """Get all inventory role assignments of a user. @@ -1065,6 +1090,7 @@ def select( only_devices: bool = None, with_subusers_count: bool = None, limit: int = None, + include: str | JsonMatcher = None, exclude: str | JsonMatcher = None, page_size: int = 5, page_number: int = None, as_values: str | tuple | list[str | tuple] = None, @@ -1088,6 +1114,12 @@ def select( with_subusers_count (bool): Whether to include an additional field `subusersCount` which holds the number of direct sub users. limit (int): Limit the number of results to this number. + include (str | JsonMatcher): Matcher/expression to filter the query + results (on client side). The inclusion is applied first. + Creates a JMESPath matcher by default for strings. + exclude (str | JsonMatcher): Matcher/expression to filter the query + results (on client side). The exclusion is applied second. + Creates a JMESPath matcher by default for strings. page_size (int): Define the number of events which are read (and parsed in one chunk). This is a performance related setting. page_number (int): Pull a specific page; this effectively disables @@ -1131,6 +1163,8 @@ def select( base_query, page_number, limit, + include, + exclude, User.from_json if not as_values else lambda x: parse_as_values(x, as_values)) @@ -1141,6 +1175,7 @@ def get_all( owner: str = None, only_devices: bool = None, with_subusers_count: bool = None, + include: str | JsonMatcher = None, exclude: str | JsonMatcher = None, page_size: int = 1000, as_values: str | tuple | list[str|tuple] = None, **kwargs @@ -1160,6 +1195,8 @@ def get_all( owner=owner, only_devices=only_devices, with_subusers_count=with_subusers_count, + include=include, + exclude=exclude, page_size=page_size, as_values=as_values, **kwargs)) @@ -1286,12 +1323,24 @@ def get(self, role_id: int | str) -> GlobalRole: self._global_roles_by_name = {g.name: g for g in self.get_all()} return self._global_roles_by_name[role_id] - def select(self, username: str = None, page_size: int = 5) -> Generator[GlobalRole]: + def select( + self, + username: str = None, + include: str | JsonMatcher = None, + exclude: str | JsonMatcher = None, + page_size: int = 5, + ) -> Generator[GlobalRole]: """Iterate over global roles. Args: username (str): Retrieve global roles assigned to a specified user If omitted, all available global roles are returned + include (str | JsonMatcher): Matcher/expression to filter the query + results (on client side). The inclusion is applied first. + Creates a JMESPath matcher by default for strings. + exclude (str | JsonMatcher): Matcher/expression to filter the query + results (on client side). The exclusion is applied second. + Creates a JMESPath matcher by default for strings. page_size (int): Maximum number of entries fetched per requests; this is a performance setting @@ -1301,12 +1350,24 @@ def select(self, username: str = None, page_size: int = 5) -> Generator[GlobalRo # unfortunately, as selecting by username can't be implemented using the # generic _iterate method, we have to do everything manually. if username: + # compile/prepare filter if defined + if isinstance(include, str): + include = self.default_matcher(include) + if isinstance(exclude, str): + exclude = self.default_matcher(exclude) # select by username query = f'/user/{self.c8y.tenant_id}/users/{username}/groups?pageSize={page_size}¤tPage=' page_number = 1 while True: response_json = self.c8y.get(query + str(page_number)) - references = response_json['references'] + references = ( + response_json['references'] if not include and not exclude + else [ + x for x in response_json['references'] + if (not include or include.safe_matches(x['group'])) + and (not exclude or not exclude.safe_matches(x['group'])) + ] + ) if not references: break for ref_json in references: @@ -1317,21 +1378,39 @@ def select(self, username: str = None, page_size: int = 5) -> Generator[GlobalRo else: # select all base_query = self._prepare_query(page_size=page_size) - yield from super()._iterate(base_query, page_number=None, limit=None, parse_fun=GlobalRole.from_json) + yield from super()._iterate( + base_query, + page_number=None, + limit=None, + include=include, + exclude=exclude, + parse_fun=GlobalRole.from_json) - def get_all(self, username: str = None, page_size: int = 1000) -> List[GlobalRole]: + def get_all( + self, + username: str = None, + include: str | JsonMatcher = None, + exclude: str | JsonMatcher = None, + page_size: int = 1000 + ) -> List[GlobalRole]: """Retrieve global roles. Args: username (str): Retrieve global roles assigned to a specified user If omitted, all available global roles are returned + include (str | JsonMatcher): Matcher/expression to filter the query + results (on client side). The inclusion is applied first. + Creates a JMESPath matcher by default for strings. + exclude (str | JsonMatcher): Matcher/expression to filter the query + results (on client side). The exclusion is applied second. + Creates a JMESPath matcher by default for strings. page_size (int): Maximum number of entries fetched per requests; this is a performance setting Return: List of GlobalRole instances """ - return list(self.select(username=username, page_size=page_size)) + return list(self.select(username=username, include=include, exclude=exclude, page_size=page_size)) def assign_users(self, role_id: int | str, *usernames: str): """Add users to a global role. diff --git a/c8y_api/model/alarms.py b/c8y_api/model/alarms.py index 764c40e..6e571f2 100644 --- a/c8y_api/model/alarms.py +++ b/c8y_api/model/alarms.py @@ -6,6 +6,7 @@ from typing import List, Generator from c8y_api._base_api import CumulocityRestApi +from c8y_api.model.matcher import JsonMatcher from c8y_api.model._base import CumulocityResource, SimpleObject, ComplexObject from c8y_api.model._parser import as_values as parse_as_values, ComplexObjectParser from c8y_api.model._util import _DateUtil @@ -252,8 +253,9 @@ def select(self, updated_before: str | datetime = None, updated_after: str | datetime = None, last_updated_from: str | datetime = None, last_updated_to: str | datetime = None, min_age: timedelta = None, max_age: timedelta = None, - reverse: bool = False, limit: int = None, with_source_assets: bool = None, with_source_devices: bool = None, + reverse: bool = False, limit: int = None, + include: str | JsonMatcher = None, exclude: str | JsonMatcher = None, page_size: int = 1000, page_number: int = None, as_values: str | tuple | list[str | tuple] = None, **kwargs) -> Generator[Alarm]: @@ -303,6 +305,12 @@ def select(self, reverse (bool): Invert the order of results, starting with the most recent one limit (int): Limit the number of results to this number. + include (str | JsonMatcher): Matcher/expression to filter the query + results (on client side). The inclusion is applied first. + Creates a JMESPath matcher by default for strings. + exclude (str | JsonMatcher): Matcher/expression to filter the query + results (on client side). The exclusion is applied second. + Creates a JMESPath matcher by default for strings. page_size (int): Define the number of alarms which are read (and parsed in one chunk). This is a performance related setting. page_number (int): Pull a specific page; this effectively disables @@ -327,12 +335,16 @@ def select(self, last_updated_from=last_updated_from, last_updated_to=last_updated_to, with_source_devices=with_source_devices, with_source_assets=with_source_assets, min_age=min_age, max_age=max_age, - reverse=reverse, page_size=page_size, + reverse=reverse, + filter=filter, + page_size=page_size, **kwargs) return super()._iterate( base_query, page_number, limit, + include, + exclude, Alarm.from_json if not as_values else lambda x: parse_as_values(x, as_values)) @@ -350,6 +362,7 @@ def get_all( min_age: timedelta = None, max_age: timedelta = None, with_source_assets: bool = None, with_source_devices: bool = None, reverse: bool = False, limit: int = None, + include: str | JsonMatcher = None, exclude: str | JsonMatcher = None, page_size: int = 1000, page_number: int = None, as_values: str | tuple | list[str | tuple] = None, **kwargs) -> List[Alarm]: @@ -375,7 +388,8 @@ def get_all( last_updated_from=last_updated_from, last_updated_to=last_updated_to, min_age=min_age, max_age=max_age, reverse=reverse, with_source_devices=with_source_devices, with_source_assets=with_source_assets, - limit=limit, page_size=page_size, page_number=page_number, + limit=limit, include=include, exclude=exclude, + page_size=page_size, page_number=page_number, as_values=as_values, **kwargs)) diff --git a/c8y_api/model/applications.py b/c8y_api/model/applications.py index 31a425a..0fb0f7b 100644 --- a/c8y_api/model/applications.py +++ b/c8y_api/model/applications.py @@ -319,7 +319,7 @@ def select( page_size=page_size, **kwargs ) - return super()._iterate(base_query, page_number, limit, Application.from_json) + return super()._iterate(base_query, page_number, limit, None, None, Application.from_json) def get_all( self, diff --git a/c8y_api/model/audit.py b/c8y_api/model/audit.py index 95e881e..04add22 100644 --- a/c8y_api/model/audit.py +++ b/c8y_api/model/audit.py @@ -7,6 +7,7 @@ from typing import Generator, List, ClassVar from c8y_api._base_api import CumulocityRestApi +from c8y_api.model.matcher import JsonMatcher from c8y_api.model._base import CumulocityResource, ComplexObject from c8y_api.model._parser import ComplexObjectParser, SimpleObjectParser, as_values as parse_as_values from c8y_api.model._util import _DateUtil @@ -206,6 +207,7 @@ def select( before: str | datetime = None, after: str | datetime = None, min_age: timedelta = None, max_age: timedelta = None, reverse: bool = False, limit: int = None, + include: str | JsonMatcher = None, exclude: str | JsonMatcher = None, page_size: int = 1000, page_number: int = None, as_values: str | tuple | list[str | tuple] = None, **kwargs @@ -236,6 +238,12 @@ def select( reverse (bool): Invert the order of results, starting with the most recent one. limit (int): Limit the number of results to this number. + include (str | JsonMatcher): Matcher/expression to filter the query + results (on client side). The inclusion is applied first. + Creates a JMESPath matcher by default for strings. + exclude (str | JsonMatcher): Matcher/expression to filter the query + results (on client side). The exclusion is applied second. + Creates a JMESPath matcher by default for strings. page_size (int): Define the number of objects which are read (and parsed in one chunk). This is a performance related setting. page_number (int): Pull a specific page; this effectively disables @@ -253,12 +261,15 @@ def select( type=type, source=source, application=application, user=user, before=before, after=after, min_age=min_age, max_age=max_age, - reverse=reverse, page_size=page_size, + reverse=reverse, + page_size=page_size, **kwargs) return super()._iterate( base_query, page_number, limit, + include, + exclude, AuditRecord.from_json if not as_values else lambda x: parse_as_values(x, as_values)) @@ -269,6 +280,7 @@ def get_all( before: str | datetime = None, after: str | datetime = None, min_age: timedelta = None, max_age: timedelta = None, reverse: bool = False, limit: int = None, + include: str | JsonMatcher = None, exclude: str | JsonMatcher = None, page_size: int = 1000, page_number: int = None, as_values: str | tuple | list[str | tuple] = None, **kwargs @@ -288,7 +300,9 @@ def get_all( type=type, source=source, application=application, user=user, before=before, after=after, min_age=min_age, max_age=max_age, - reverse=reverse, limit=limit, page_size=page_size, page_number=page_number, + reverse=reverse, limit=limit, + include=include, exclude=exclude, + page_size=page_size, page_number=page_number, as_values=as_values, **kwargs, )) diff --git a/c8y_api/model/events.py b/c8y_api/model/events.py index 61f64b7..3a780e7 100644 --- a/c8y_api/model/events.py +++ b/c8y_api/model/events.py @@ -9,6 +9,7 @@ from typing import Generator, List, BinaryIO from c8y_api._base_api import CumulocityRestApi +from c8y_api.model.matcher import JsonMatcher from c8y_api.model._base import CumulocityResource, SimpleObject, ComplexObject from c8y_api.model._parser import as_values as parse_as_values, ComplexObjectParser from c8y_api.model._util import _DateUtil @@ -275,9 +276,11 @@ def select(self, updated_before: str | datetime = None, updated_after: str | datetime = None, last_updated_from: str | datetime = None, last_updated_to: str | datetime = None, min_age: timedelta = None, max_age: timedelta = None, - reverse: bool = False, limit: int = None, with_source_assets: bool = None, with_source_devices: bool = None, - page_size: int = 1000, page_number: int = None, + reverse: bool = False, limit: int = None, + include: str | JsonMatcher = None, exclude: str | JsonMatcher = None, + page_size: int = 1000, + page_number: int = None, as_values: str | tuple | list[str | tuple] = None, **kwargs) -> Generator[Event]: """Query the database for events and iterate over the results. @@ -325,6 +328,12 @@ def select(self, with_source_devices (bool): Whether also alarms for related source devices should be included. Requires `source` limit (int): Limit the number of results to this number. + include (str | JsonMatcher): Matcher/expression to filter the query + results (on client side). The inclusion is applied first. + Creates a JMESPath matcher by default for strings. + exclude (str | JsonMatcher): Matcher/expression to filter the query + results (on client side). The exclusion is applied second. + Creates a JMESPath matcher by default for strings. page_size (int): Define the number of events which are read (and parsed in one chunk). This is a performance related setting. page_number (int): Pull a specific page; this effectively disables @@ -367,6 +376,8 @@ def select(self, base_query, page_number, limit, + include, + exclude, Event.from_json if not as_values else lambda x: parse_as_values(x, as_values)) @@ -382,8 +393,9 @@ def get_all( updated_before: str | datetime = None, updated_after: str | datetime = None, last_updated_from: str | datetime = None, last_updated_to: str | datetime = None, min_age: timedelta = None, max_age: timedelta = None, - reverse: bool = False, limit: int = None, with_source_assets: bool = None, with_source_devices: bool = None, + reverse: bool = False, limit: int = None, + include: str | JsonMatcher = None, exclude: str | JsonMatcher = None, page_size: int = 1000, page_number: int = None, as_values: str | tuple | list[str | tuple] = None, **kwargs) -> List[Event]: @@ -408,9 +420,11 @@ def get_all( updated_before=updated_before, updated_after=updated_after, last_updated_from=last_updated_from, last_updated_to=last_updated_to, min_age=min_age, max_age=max_age, - reverse=reverse, with_source_devices=with_source_devices, with_source_assets=with_source_assets, - limit=limit, page_size=page_size, page_number=page_number, + reverse=reverse, + limit=limit, + include=include, exclude=exclude, + page_size=page_size, page_number=page_number, as_values=as_values, **kwargs )) diff --git a/c8y_api/model/inventory.py b/c8y_api/model/inventory.py index b070baf..0665b57 100644 --- a/c8y_api/model/inventory.py +++ b/c8y_api/model/inventory.py @@ -5,6 +5,7 @@ from typing import Any, Generator, List +from c8y_api.model.matcher import JsonMatcher from c8y_api.model._base import CumulocityResource from c8y_api.model._parser import as_values as parse_as_values from c8y_api.model._util import _QueryUtil @@ -88,6 +89,7 @@ def get_all( with_latest_values: bool = None, reverse: bool = None, limit: int = None, + include: str | JsonMatcher = None, exclude: str | JsonMatcher = None, page_size: int = 1000, as_values: str | tuple | list[str|tuple] = None, **kwargs) -> List[ManagedObject]: @@ -121,6 +123,8 @@ def get_all( with_latest_values=with_latest_values, reverse=reverse, limit=limit, + include=include, + exclude=exclude, page_size=page_size, as_values=as_values, **kwargs)) @@ -241,6 +245,7 @@ def select( with_parents: bool = None, with_latest_values: bool = None, limit: int = None, + include: str | JsonMatcher = None, exclude: str | JsonMatcher = None, page_size: int = 1000, page_number: int = None, as_values: str | tuple | list[str | tuple] = None, @@ -291,6 +296,12 @@ def select( fragment `c8y_LatestMeasurements, which contains the latest measurement values reported by the device to the platform. limit (int): Limit the number of results to this number. + include (str | JsonMatcher): Matcher/expression to filter the query + results (on client side). The inclusion is applied first. + Creates a JMESPath matcher by default for strings. + exclude (str | JsonMatcher): Matcher/expression to filter the query + results (on client side). The exclusion is applied second. + Creates a JMESPath matcher by default for strings. page_size (int): Define the number of events which are read (and parsed in one chunk). This is a performance related setting. page_number (int): Pull a specific page; this effectively disables @@ -326,6 +337,8 @@ def select( with_parents=with_parents, with_latest_values=with_latest_values, limit=limit, + include=include, + exclude=exclude, page_size=page_size, page_number=page_number, as_values=as_values, @@ -431,13 +444,24 @@ def filter_none(**xs): return {query_key: query, **kwargs} - def _select(self, parse_fun, device_mode: bool, page_number, limit, as_values, **kwargs) -> Generator[Any]: + def _select( + self, + parse_fun, + device_mode: bool, + page_number, + limit, + include, + exclude, + as_values, + **kwargs) -> Generator[Any]: """Generic select function to be used by derived classes as well.""" base_query = self._prepare_inventory_query(device_mode, **kwargs) return super()._iterate( base_query, page_number, limit, + include, + exclude, parse_fun if not as_values else lambda x: parse_as_values(x, as_values)) @@ -614,6 +638,7 @@ def select( # noqa (order) with_parents: bool = None, with_latest_values: bool = None, limit: int = None, + include: str | JsonMatcher = None, exclude: str | JsonMatcher = None, page_size: int = 100, page_number: int = None, as_values: str | tuple | list[str | tuple] = None, @@ -667,6 +692,12 @@ def select( # noqa (order) fragment `c8y_LatestMeasurements, which contains the latest measurement values reported by the device to the platform. limit (int): Limit the number of results to this number. + include (str | JsonMatcher): Matcher/expression to filter the query + results (on client side). The inclusion is applied first. + Creates a JMESPath matcher by default for strings. + exclude (str | JsonMatcher): Matcher/expression to filter the query + results (on client side). The exclusion is applied second. + Creates a JMESPath matcher by default for strings. page_size (int): Define the number of events which are read (and parsed in one chunk). This is a performance related setting. page_number (int): Pull a specific page; this effectively disables @@ -701,6 +732,8 @@ def select( # noqa (order) with_parents=with_parents, with_latest_values=with_latest_values, limit=limit, + include=include, + exclude=exclude, page_size=page_size, page_number=page_number, as_values=as_values, @@ -727,6 +760,8 @@ def get_all( # noqa (changed signature) with_parents: bool = None, with_latest_values: bool = None, limit: int = None, + include: str | JsonMatcher = None, + exclude: str | JsonMatcher = None, page_size: int = 100, page_number: int = None, as_values: str | tuple | list[str | tuple] = None, @@ -760,6 +795,8 @@ def get_all( # noqa (changed signature) with_parents=with_parents, with_latest_values=with_latest_values, limit=limit, + include=include, + exclude=exclude, page_size=page_size, page_number=page_number, as_values=as_values, @@ -866,6 +903,7 @@ def select( # noqa (changed signature) with_parents: bool = None, with_latest_values: bool = None, limit: int = None, + include: str | JsonMatcher = None, exclude: str | JsonMatcher = None, page_size: int = 100, page_number: int = None, as_values: str | tuple | list[str | tuple] = None, @@ -922,6 +960,12 @@ def select( # noqa (changed signature) fragment `c8y_LatestMeasurements, which contains the latest measurement values reported by the device to the platform. limit (int): Limit the number of results to this number. + include (str | JsonMatcher): Matcher/expression to filter the query + results (on client side). The inclusion is applied first. + Creates a JMESPath matcher by default for strings. + exclude (str | JsonMatcher): Matcher/expression to filter the query + results (on client side). The exclusion is applied second. + Creates a JMESPath matcher by default for strings. page_size (int): Define the number of events which are read (and parsed in one chunk). This is a performance related setting. page_number (int): Pull a specific page; this effectively disables @@ -964,6 +1008,8 @@ def select( # noqa (changed signature) with_parents=with_parents, with_latest_values=with_latest_values, limit=limit, + include=include, + exclude=exclude, page_size=page_size, page_number=page_number, as_values=as_values, @@ -1033,6 +1079,7 @@ def get_all( # noqa (changed signature) with_parents: bool = None, with_latest_values: bool = None, limit: int = None, + include: str | JsonMatcher = None, exclude: str | JsonMatcher = None, page_size: int = 100, page_number: int = None, as_values: str | tuple | list[str | tuple] = None, @@ -1065,6 +1112,8 @@ def get_all( # noqa (changed signature) with_parents=with_parents, with_latest_values=with_latest_values, limit=limit, + include=include, + exclude=exclude, page_size=page_size, page_number=page_number, as_values=as_values, diff --git a/c8y_api/model/managedobjects.py b/c8y_api/model/managedobjects.py index 74ccbd2..47a25c9 100644 --- a/c8y_api/model/managedobjects.py +++ b/c8y_api/model/managedobjects.py @@ -697,7 +697,7 @@ class DeviceGroup(ManagedObject): def __init__(self, c8y=None, root: bool = False, name: str = None, owner: str = None, **kwargs): """ Build a new DeviceGroup object. - A type of a device group will always be either `c8y_DeviceGroup` + The `type` of a device group will always be either `c8y_DeviceGroup` or `c8y_DeviceSubGroup` (depending on it's level). This is handled by the API. diff --git a/c8y_api/model/matcher/__init__.py b/c8y_api/model/matcher/__init__.py new file mode 100644 index 0000000..a69f9e4 --- /dev/null +++ b/c8y_api/model/matcher/__init__.py @@ -0,0 +1,55 @@ +from ._matcher import ( + JsonMatcher, + AllMatcher, + match_all, + AnyMatcher, + match_any, + NotMatcher, + match_not, + FragmentMatcher, + fragment, + FieldMatcher, + field, + DescriptionMatcher, + description, + TextMatcher, + text, + CommandMatcher, + command, +) + +__all__ = [ + 'JsonMatcher', + 'AllMatcher', + 'match_all', + 'AnyMatcher', + 'match_any', + 'NotMatcher', + 'match_not', + 'FragmentMatcher', + 'fragment', + 'FieldMatcher', + 'field', + 'DescriptionMatcher', + 'description', + 'TextMatcher', + 'text', + 'CommandMatcher', + 'command', +] + +try: + import jmespath as _jmespath + from ._jmespath_matcher import JmesPathMatcher, jmespath + __all__.append('JmesPathMatcher') + __all__.append('jmespath') +except ImportError: + pass + +try: + import jsonpath_ng as _jsonpath_ng + from ._jsonpath_matcher import JsonPathMatcher, jsonpath + __all__.append('JsonPathMatcher') + __all__.append('jsonpath') +except ImportError: + pass diff --git a/c8y_api/model/matcher/_jmespath_matcher.py b/c8y_api/model/matcher/_jmespath_matcher.py new file mode 100644 index 0000000..2c7f243 --- /dev/null +++ b/c8y_api/model/matcher/_jmespath_matcher.py @@ -0,0 +1,22 @@ +# Copyright (c) 2025 Cumulocity GmbH + +import jmespath as jmespath_lib + +from c8y_api.model.matcher._matcher import JsonMatcher + + +class JmesPathMatcher(JsonMatcher): + """JsonMatcher implementation for JMESPath.""" + + def __init__(self, expression: str, warn_on_error: bool = True): + super().__init__(expression, warn_on_error) + self.compiled_expression = jmespath_lib.compile(expression) + + def matches(self, json: dict) -> bool: + # pylint: disable=broad-exception-caught + return self.compiled_expression.search(json) + + +def jmespath(expression: str) -> JmesPathMatcher: + """Create a JMESPathMatcher from an expression.""" + return JmesPathMatcher(expression) diff --git a/c8y_api/model/matcher/_jsonpath_matcher.py b/c8y_api/model/matcher/_jsonpath_matcher.py new file mode 100644 index 0000000..10685c6 --- /dev/null +++ b/c8y_api/model/matcher/_jsonpath_matcher.py @@ -0,0 +1,22 @@ +# Copyright (c) 2025 Cumulocity GmbH + +from jsonpath_ng.ext import parse + +from c8y_api.model.matcher._matcher import JsonMatcher + + +class JsonPathMatcher(JsonMatcher): + """JsonMatcher implementation for JSONPath.""" + + def __init__(self, expression: str, warn_on_error: bool = True): + super().__init__(expression, warn_on_error) + self.compiled_expression = parse(expression) + + def matches(self, json: dict) -> bool: + # pylint: disable=broad-exception-caught + return self.compiled_expression.find(json) + + +def jsonpath(expression: str) -> JsonPathMatcher: + """Create a JMESPathMatcher from an expression.""" + return JsonPathMatcher(expression) diff --git a/c8y_api/model/matcher/_matcher.py b/c8y_api/model/matcher/_matcher.py new file mode 100644 index 0000000..3b105f0 --- /dev/null +++ b/c8y_api/model/matcher/_matcher.py @@ -0,0 +1,173 @@ +# Copyright (c) 2025 Cumulocity GmbH + +import logging +from abc import ABC, abstractmethod + +from c8y_api.model._util import _StringUtil + + +class JsonMatcher(ABC): + """Abstract base class for all JSON matchers. + + JSON Matchers are used to filter the results of a database query on + client-side. + + See also c8y_api._base.CumulocityResource._iterate + """ + + def __init__(self, expression: str, warn_on_error: bool = False): + self.expression = expression + self.log = logging.getLogger('c8y_api.model.matcher') + self.warn_on_error = warn_on_error + + def __repr__(self): + return f'<{self.__class__.__name__} "{self.expression}">' + + @abstractmethod + def matches(self, json: dict) -> bool: + """Check if a JSON document matches. + + Args: + json (dict): JSON document. + + Returns: + True if the expression of this matcher matches the JSON document. + False otherwise or if the expression could not be evaluated. + """ + + def safe_matches(self, json: dict) -> bool: + """Check if a JSON document matches and log a warning if an + exception occurs.""" + # pylint: disable=broad-exception-caught + try: + return self.matches(json) + except Exception as error: + self.log.warning(f"Matching \"{self.expression}\" failed with error: {error}") + return False + + +class AllMatcher(JsonMatcher): + """Higher level matcher matching if all the enclosed matchers match.""" + + def __init__(self, *matchers: JsonMatcher): + super().__init__(' AND '.join(str(m) for m in matchers)) + self.matchers = matchers + + def matches(self, json: dict) -> bool: + return all(m.matches(json) for m in self.matchers) + + +def match_all(*matchers: JsonMatcher) -> AllMatcher: + """Create a higher level matcher matching if all the enclosed matchers match.""" + return AllMatcher(*matchers) + + +class AnyMatcher(JsonMatcher): + """Higher level matcher matching if any of the enclosed matcher matches.""" + + def __init__(self, *matchers: JsonMatcher): + super().__init__(' OR '.join(str(m) for m in matchers)) + self.matchers = matchers + + def matches(self, json: dict) -> bool: + return any(m.matches(json) for m in self.matchers) + + +def match_any(*matchers: JsonMatcher) -> AnyMatcher: + """Create a higher level matcher matching if any of the enclosed matcher matches.""" + return AnyMatcher(*matchers) + + +class NotMatcher(JsonMatcher): + """Higher level matcher matching if the enclosed matcher doesn't match.""" + + def __init__(self, matcher: JsonMatcher): + super().__init__(f'NOT {matcher}') + self.matcher = matcher + + def matches(self, json: dict) -> bool: + return not self.matcher.matches(json) + + +def match_not(matcher: JsonMatcher) -> NotMatcher: + """Create a higher level matcher matching if the enclosed matcher doesn't match.""" + return NotMatcher(matcher) + + +class FragmentMatcher(JsonMatcher): + """Matcher matching the existence of a top-level fragment.""" + + def __init__(self, name: str): + super().__init__(name) + + def matches(self, json: dict) -> bool: + return self.expression in json + + +def fragment(name: str) -> FragmentMatcher: + """Create a matcher matching the existence of a top-level fragment.""" + return FragmentMatcher(name) + + +class FieldMatcher(JsonMatcher): + """Generic matcher matching the value of a top-level string field.""" + + class Mode: + """The mode of matching.""" + LIKE = 'LIKE' + REGEX = 'REGEX' + + def __init__(self, name: str, expression: str, mode: str = 'LIKE'): + super().__init__(expression) + self.field_name = name + self.mode = mode + + def matches(self, json: dict) -> bool: + return self.field_name in json and ( + (self.mode == 'REGEX' and _StringUtil.matches(self.expression, json[self.field_name])) or + _StringUtil.like(self.expression, json[self.field_name]) + ) + + +def field(name: str, value: str, mode: str = FieldMatcher.Mode.LIKE) -> FieldMatcher: + """Create a matcher matching the value of a top-level string field.""" + return FieldMatcher(name, value, mode) + + +class DescriptionMatcher(FieldMatcher): + """Matcher matching the top-level `description` field of a document.""" + + def __init__(self, expression: str, mode: str = 'LIKE'): + super().__init__('description', expression, mode) + + +def description(name: str, mode: str = FieldMatcher.Mode.LIKE) -> DescriptionMatcher: + """Create a matcher matching the top-level `description` field of a document.""" + return DescriptionMatcher(name, mode) + + +class TextMatcher(FieldMatcher): + """Matcher matching the top-level `text` field of a document.""" + + def __init__(self, expression: str, mode: str = 'LIKE'): + super().__init__('text', expression, mode) + + +def text(name: str, mode: str = FieldMatcher.Mode.LIKE) -> TextMatcher: + """Create a matcher matching the top-level `text` field of a document.""" + return TextMatcher(name, mode) + + +class CommandMatcher(FieldMatcher): + """Matcher matching the `text` field c8y_Command fragment.""" + + def __init__(self, command_text: str, mode: str ='LIKE'): + super().__init__('text', expression=command_text, mode=mode) + + def matches(self, json: dict) -> bool: + return 'c8y_Command' in json and super().matches(json['c8y_Command']) + + +def command(name: str, mode: str = FieldMatcher.Mode.LIKE) -> CommandMatcher: + """Create a matcher matching the `text` field c8y_Command fragment.""" + return CommandMatcher(name, mode) diff --git a/c8y_api/model/measurements.py b/c8y_api/model/measurements.py index e735365..8c94aae 100644 --- a/c8y_api/model/measurements.py +++ b/c8y_api/model/measurements.py @@ -568,6 +568,8 @@ def select( base_query, page_number, limit, + None, + None, Measurement.from_json if not as_values else lambda x: parse_as_values(x, as_values)) diff --git a/c8y_api/model/notification2.py b/c8y_api/model/notification2.py index b979b52..c9054f3 100644 --- a/c8y_api/model/notification2.py +++ b/c8y_api/model/notification2.py @@ -216,7 +216,7 @@ def select( typeFilter=type_filter, page_size=page_size, **kwargs) - return super()._iterate(base_query, page_number, limit, Subscription.from_json) + return super()._iterate(base_query, page_number, limit, None, None, Subscription.from_json) def get_all( self, diff --git a/c8y_api/model/operations.py b/c8y_api/model/operations.py index d15a5bf..080c233 100644 --- a/c8y_api/model/operations.py +++ b/c8y_api/model/operations.py @@ -6,7 +6,7 @@ from typing import Type, List, Generator from c8y_api._base_api import CumulocityRestApi - +from c8y_api.model.matcher import JsonMatcher from c8y_api.model._base import CumulocityResource, ComplexObject, SimpleObject, _DictWrapper from c8y_api.model._parser import ComplexObjectParser, as_values as parse_as_values from c8y_api.model._util import _DateUtil @@ -161,6 +161,7 @@ def select( min_age: timedelta = None, max_age: timedelta = None, date_from: str | datetime = None, date_to: str | datetime = None, reverse: bool = False, limit: int = None, + include: str | JsonMatcher = None, exclude: str | JsonMatcher = None, page_size: int = 1000, page_number: int = None, as_values: str | tuple | list[str | tuple] = None, **kwargs @@ -198,6 +199,12 @@ def select( reverse (bool): Invert the order of results, starting with the most recent one. limit (int): Limit the number of results to this number. + include (str | JsonMatcher): Matcher/expression to filter the query + results (on client side). The inclusion is applied first. + Creates a JMESPath matcher by default for strings. + exclude (str | JsonMatcher): Matcher/expression to filter the query + results (on client side). The exclusion is applied second. + Creates a JMESPath matcher by default for strings. page_size (int): Define the number of operations which are read (and parsed in one chunk). This is a performance related setting. @@ -225,6 +232,8 @@ def select( base_query, page_number, limit, + include, + exclude, Operation.from_json if not as_values else lambda x: parse_as_values(x, as_values)) @@ -237,6 +246,7 @@ def get_all( min_age: timedelta = None, max_age: timedelta = None, date_from: str | datetime = None, date_to: str | datetime = None, reverse: bool = False, limit: int = None, + include: str | JsonMatcher = None, exclude: str | JsonMatcher = None, page_size: int = 1000, page_number: int = None, as_values: str | tuple | list[str | tuple] = None, **kwargs @@ -259,6 +269,7 @@ def get_all( date_from=date_from, date_to=date_to, reverse=reverse, limit=limit, + include=include, exclude=exclude, page_size=page_size, page_number=page_number, as_values=as_values, @@ -548,7 +559,7 @@ def select(self, limit: int = None, page_size: int = 1000, page_number: int = No Generator[BulkOperation]: Iterable of matching BulkOperation objects """ base_query = self._prepare_query(page_size=page_size) - return super()._iterate(base_query, page_number, limit, BulkOperation.from_json) + return super()._iterate(base_query, page_number, limit, None, None, BulkOperation.from_json) def get_all(self, limit: int = None, page_size: int = 1000, page_number: int = None) -> List[BulkOperation]: """ Query the database for bulk operations and return the results diff --git a/c8y_api/model/tenant_options.py b/c8y_api/model/tenant_options.py index cde17c2..db8ae62 100644 --- a/c8y_api/model/tenant_options.py +++ b/c8y_api/model/tenant_options.py @@ -151,7 +151,7 @@ def select(self, category: str = None, limit: int = None, if not category: # select all base_query = self._prepare_query(page_size=page_size) - yield from super()._iterate(base_query, page_number, limit, TenantOption.from_json) + yield from super()._iterate(base_query, page_number, limit, None, None, TenantOption.from_json) else: # select by category, this is just a single request options_json = self.c8y.get(f'{self.resource}/{category}') diff --git a/c8y_api/model/tenants.py b/c8y_api/model/tenants.py index 3df01a8..b692d95 100644 --- a/c8y_api/model/tenants.py +++ b/c8y_api/model/tenants.py @@ -260,7 +260,7 @@ def select( company=company, page_size=page_size, **kwargs) - return super()._iterate(base_query, page_number, limit, Tenant.from_json) + return super()._iterate(base_query, page_number, limit, None, None, Tenant.from_json) def get_all( self, diff --git a/c8y_tk/app/subscription_listener.py b/c8y_tk/app/subscription_listener.py index 915374c..abbc585 100644 --- a/c8y_tk/app/subscription_listener.py +++ b/c8y_tk/app/subscription_listener.py @@ -6,7 +6,7 @@ import logging import threading import time -from typing import Callable, Union +from typing import Callable, Union, List from c8y_api.app import MultiTenantCumulocityApp @@ -29,7 +29,7 @@ class SubscriptionListener: def __init__( self, app: MultiTenantCumulocityApp, - callback: Callable[[list[str]], None] = None, + callback: Callable[[List[str]], None] = None, max_threads: int = 5, blocking: bool = True, polling_interval: float = 3600, @@ -75,9 +75,9 @@ def _cleanup_future(self, future): def add_callback( self, - callback: Callable[[Union[str ,list[str]]], None], + callback: Callable[[Union[str, List[str]]], None], blocking: bool = True, - when: str = 'any', + when: str = 'any' ) -> "SubscriptionListener": """Add a callback function to be invoked if a tenant subscribes to/unsubscribes from the monitored multi-tenant microservice. @@ -262,7 +262,7 @@ def shutdown(self, timeout: float = None): self._executor and self.get_callbacks()): raise TimeoutError(f"Listener thread did not close within the specified timeout ({timeout}s).") - def get_callbacks(self) -> list[Future]: + def get_callbacks(self) -> List[Future]: """Get currently running callbacks. This function can be used to gain direct access to the currently diff --git a/c8y_tk/notification2/__init__.py b/c8y_tk/notification2/__init__.py index 77e806b..913c841 100644 --- a/c8y_tk/notification2/__init__.py +++ b/c8y_tk/notification2/__init__.py @@ -2,4 +2,4 @@ from c8y_tk.notification2.listener import * -__all__ = ['Listener', 'AsyncListener'] +__all__ = ['Listener', 'AsyncListener', 'AsyncQueueListener', 'QueueListener'] diff --git a/c8y_tk/notification2/listener.py b/c8y_tk/notification2/listener.py index 3d3ca27..9886e96 100644 --- a/c8y_tk/notification2/listener.py +++ b/c8y_tk/notification2/listener.py @@ -8,6 +8,7 @@ import threading import uuid from itertools import count +import queue as sync_queue from typing import Callable, Awaitable import certifi @@ -231,7 +232,7 @@ async def _callback(msg): self._is_connected.clear() if self._connection: with contextlib.suppress(Exception): - await self._connection.close() # TODO: add code and reason + await self._connection.close() self._connection = None self._is_running.clear() @@ -586,7 +587,7 @@ def wait(self, timeout=None) -> bool: timeout (float): Timeout in seconds. Returns: - True if the listener has stopped (before timeout), False otherwise. + Whether the listener has stopped (before timeout). """ self._thread.join(timeout=timeout) return not self._thread.is_alive() @@ -621,3 +622,101 @@ def ack(self, msg_id: str = None, payload: str = None) -> None: """ # assuming that we are already listening ... asyncio.run_coroutine_threadsafe(self._listener.ack(msg_id, payload), self._event_loop) + + +class AsyncQueueListener(object): + """Special listener implementation which pushes notification messages + into a standard (async) queue which can be monitored and read.""" + + def __init__( + self, + c8y: CumulocityApi, + subscription_name: str, + subscriber_name: str = None, + consumer_name: str = None, + shared: bool = False, + auto_unsubscribe: bool = True, + queue: asyncio.Queue = None + ): + self.queue = queue or asyncio.Queue() + self.listener = AsyncListener( + c8y=c8y, + subscription_name=subscription_name, + subscriber_name=subscriber_name, + consumer_name=consumer_name, + shared=shared, + auto_ack=True, + auto_unsubscribe=auto_unsubscribe, + ) + + def start(self): + """Start the listener.""" + async def push_message(msg: AsyncListener.Message): + self.queue.put_nowait(msg) + + self.listener.start(push_message) + + def stop(self): + """Stop the listener.""" + self.listener.stop() + + async def wait(self, timeout=None): + """Wait for the listener task to finish. + + Args: + timeout (int): The number of seconds to wait for the listener + to finish. The listener will be cancelled if the timeout + occurs. + """ + await self.listener.wait(timeout=timeout) + + +class QueueListener(object): + """Special listener implementation which pushes notification messages + into a standard (sync) queue which can be monitored and read.""" + + def __init__( + self, + c8y: CumulocityApi, + subscription_name: str, + subscriber_name: str = None, + consumer_name: str = None, + shared: bool = False, + auto_unsubscribe: bool = True, + queue: sync_queue.Queue = None + ): + self.queue = queue or sync_queue.Queue() + self.listener = Listener( + c8y=c8y, + subscription_name=subscription_name, + subscriber_name=subscriber_name, + consumer_name=consumer_name, + shared=shared, + auto_ack=True, + auto_unsubscribe=auto_unsubscribe, + ) + + def start(self): + """Start the listener.""" + + def push_message(msg: AsyncListener.Message): + self.queue.put(msg) + + self.listener.start(push_message) + + def stop(self): + """Stop the listener.""" + self.listener.stop() + + def wait(self, timeout=None) -> bool: + """Wait for the listener task to finish. + + Args: + timeout (int): The number of seconds to wait for the listener + to finish. The listener will be cancelled if the timeout + occurs. + + Returns: + Whether the listener has stopped (before timeout). + """ + return self.listener.wait(timeout=timeout) diff --git a/integration_tests/conftest.py b/integration_tests/conftest.py index c09fea7..126d06f 100644 --- a/integration_tests/conftest.py +++ b/integration_tests/conftest.py @@ -57,7 +57,7 @@ def execute(fun) -> bool: try: fun() return True - except BaseException as e: + except Exception as e: logger.warning(f"Caught exception ignored due to safe call: {e}") return False @@ -67,7 +67,7 @@ def execute(fun) -> bool: @pytest.fixture(scope='function') def auto_delete(logger): """Register a created Cumulocity object for automatic deletion after test function execution.""" - + # pylint: disable=broad-exception-caught objects = [] def register(obj) -> Any: @@ -81,7 +81,7 @@ def register(obj) -> Any: o.delete() except KeyError: pass - except BaseException as e: + except Exception as e: logger.warning(f"Caught exception ignored due to safe call: {e}") @@ -112,35 +112,6 @@ def live_c8y(test_environment) -> CumulocityApi: return SimpleCumulocityApp() - - - -# -# -# @pytest.fixture(scope='session') -# def register_object(logger): -# """Wrap a created Cumulocity object so that it will automatically be deleted -# after a test regardless of an exception or failure.""" -# -# objects = [] -# -# def register(obj) -> Any: -# objects.append(obj) -# return obj -# -# yield register -# -# for o in objects: -# try: -# # Deletion should through a KeyError if object was already deleted -# o.delete() -# logger.warning(f"Object #{o.id} was not deleted by test.") -# except KeyError: -# pass -# except BaseException as e: -# logger.warning(f"Caught exception ignored due to safe call: {e}") - - @pytest.fixture(scope='function') def safe_create(logger, live_c8y, request): """Wrap a created Cumulocity object so that it will automatically be deleted diff --git a/integration_tests/test_alarms.py b/integration_tests/test_alarms.py index 591d504..14c6923 100644 --- a/integration_tests/test_alarms.py +++ b/integration_tests/test_alarms.py @@ -4,6 +4,8 @@ from __future__ import annotations +import datetime +import random from typing import List import pytest @@ -174,6 +176,18 @@ def test_count(live_c8y: CumulocityApi, sample_alarms: List[Alarm]): assert count == len(sample_alarms) +def test_client_side_filtering(live_c8y: CumulocityApi, sample_alarms: List[Alarm]): + """Verify that client-side filtering works as expected.""" + alarm = random.choice(sample_alarms) + + alarms_1 = live_c8y.alarms.get_all(source=alarm.source) + # we need _some_ start date, otherwise the result set would be too big + created_after = alarm.creation_datetime - datetime.timedelta(hours=1) + # not that the filter uses source.id as this is raw JSON format + alarms_2 = live_c8y.alarms.get_all(created_after=created_after, include=f"source.id == '{alarm.source}'") + assert len(alarms_1) == len(alarms_2) + + def test_filter_by_update_time(live_c8y: CumulocityApi, session_device, sample_alarms: List[Alarm]): """Verify that filtering by lastUpdatedTime works as expected.""" diff --git a/integration_tests/test_audits.py b/integration_tests/test_audits.py index 4286dbc..ef0299b 100644 --- a/integration_tests/test_audits.py +++ b/integration_tests/test_audits.py @@ -23,9 +23,9 @@ def test_CR(live_c8y: CumulocityApi, session_device): # noqa (case) application=f'{name}_app', user=live_c8y.username).create() after = _DateUtil.now() - # -> there should be exactly one audit record with that source + # -> there should be at least 1 audit record with that source records = live_c8y.audit_records.get_all(source=session_device.id) - assert len(records) == 1 + assert len(records) >= 1 assert records[0].id == record.id # -> there should be exactly one audit record with that application/user diff --git a/integration_tests/test_devicegroups.py b/integration_tests/test_devicegroups.py index 6e81343..fdd0825 100644 --- a/integration_tests/test_devicegroups.py +++ b/integration_tests/test_devicegroups.py @@ -66,7 +66,7 @@ def test_CRUD(live_c8y: CumulocityApi, safe_executor): with pytest.raises(KeyError): live_c8y.group_inventory.get(root.id) - except BaseException as e: + except Exception as e: safe_executor(root.delete) safe_executor(child1.delete) safe_executor(child2.delete) @@ -110,7 +110,7 @@ def test_CRUD2(live_c8y, safe_executor): # 4) remove the groups live_c8y.group_inventory.delete_trees(root.id) - except BaseException as e: + except Exception as e: safe_executor(live_c8y.group_inventory.delete(root.id, child1.id, child2.id)) raise e @@ -128,7 +128,6 @@ def test_select(live_c8y: CumulocityApi, safe_executor): root.assign_child_group(child1) try: - # 1) select via name (query) # by default, only root folders are selected ids = [x.id for x in live_c8y.group_inventory.select(name=f'Root-{name}')] @@ -149,9 +148,20 @@ def test_select(live_c8y: CumulocityApi, safe_executor): assert ids == [child1.id] assert live_c8y.group_inventory.get_count(parent=root.id, owner=live_c8y.username) == 1 + # 4) select with client-side filtering + filtered_roots_1 = live_c8y.group_inventory.get_all( + type=DeviceGroup.ROOT_TYPE, + include="starts_with(name, 'Root-')", + as_values='name') + filtered_roots_2 = [ + x for x in live_c8y.group_inventory.get_all(type=DeviceGroup.ROOT_TYPE, as_values='name') + if x.startswith('Root-') + ] + assert set(filtered_roots_1) == set(filtered_roots_2) + root.delete_tree() - except BaseException as ex: + except Exception as ex: safe_executor(root.delete) safe_executor(child1.delete) raise ex diff --git a/integration_tests/test_events.py b/integration_tests/test_events.py index 5967388..979814a 100644 --- a/integration_tests/test_events.py +++ b/integration_tests/test_events.py @@ -7,6 +7,7 @@ import datetime as dt import logging import os +import random import tempfile from typing import List @@ -15,6 +16,7 @@ from c8y_api import CumulocityApi from c8y_api.model import Event, Device from c8y_api.model._util import _DateUtil + from util.testing_util import RandomNameGenerator @@ -159,6 +161,18 @@ def test_filter_by_update_time(live_c8y: CumulocityApi, session_device, sample_e assert last_event_after.updated_datetime == max(after_datetimes) +def test_select(live_c8y, sample_events): + """Verify that selecting events works as expected.""" + + # 1) use client-side filtering + event_1 = random.choice(sample_events) + event_2 = live_c8y.events.get_all(source=event_1.source, include=f"type == '{event_1.type}'")[0] + assert event_1.id == event_2.id + + # 2) use type/source + assert live_c8y.events.get_all(type=event_1.type, source=event_1.source)[0].text == event_1.text + + def test_CRUD_attachments(live_c8y: CumulocityApi, session_device: Device, sample_events: List[Event]): # noqa (case) """Verify that creating, reading, updating and deleting of an event attachment works as expected.""" diff --git a/integration_tests/test_global_roles.py b/integration_tests/test_global_roles.py index 0284825..aca65a9 100644 --- a/integration_tests/test_global_roles.py +++ b/integration_tests/test_global_roles.py @@ -43,7 +43,7 @@ def test_CRUD(live_c8y: CumulocityApi): # noqa (case) assert rolename in str(e) -def test_select(live_c8y: CumulocityApi): +def test_select(live_c8y: CumulocityApi, safe_create): """Verify that selection works as expected.""" # (1) get all defined global roles all_roles = live_c8y.global_roles.get_all() @@ -51,7 +51,7 @@ def test_select(live_c8y: CumulocityApi): # (2) create a user and assign roles username = RandomNameGenerator.random_name(2) email = f'{username}@c8y.com' - user = User(live_c8y, username=username, email=email, enabled=True).create() + user = safe_create(User(live_c8y, username=username, email=email, enabled=True)) selected_roles = random.sample(all_roles, k=5) for role in selected_roles: user.assign_global_role(role.id) @@ -60,6 +60,16 @@ def test_select(live_c8y: CumulocityApi): for role in live_c8y.global_roles.get_all(username=username): assert role.id in [x.id for x in selected_roles] + # (4) select with filter + filtered_1 = live_c8y.global_roles.get_all(include="contains(name, 'Global')") + filtered_2 = [x for x in live_c8y.global_roles.get_all() if 'Global' in x.name] + assert {x.name for x in filtered_1} == {x.name for x in filtered_2} + + # (5) select by user with filter + filtered_1 = live_c8y.global_roles.get_all(username=username, include="contains(name, 'a')") + filtered_2 = [x for x in live_c8y.global_roles.get_all(username=username) if 'a' in x.name] + assert {x.name for x in filtered_1} == {x.name for x in filtered_2} + # Cleanup user.delete() diff --git a/integration_tests/test_inventory.py b/integration_tests/test_inventory.py index fcab77d..fe47c6a 100644 --- a/integration_tests/test_inventory.py +++ b/integration_tests/test_inventory.py @@ -2,6 +2,7 @@ # pylint: disable=redefined-outer-name +import random import time from typing import List @@ -9,6 +10,7 @@ from c8y_api import CumulocityApi from c8y_api.model import Event, ManagedObject, Measurement, Count, Value, Device +from c8y_api.model.matcher import jsonpath from util.testing_util import RandomNameGenerator from tests.utils import get_ids @@ -98,6 +100,35 @@ def test_get_by_query(live_c8y: CumulocityApi, similar_objects: List[ManagedObje assert live_c8y.inventory.get_count(query=query) == len(similar_objects) +def test_filtering(live_c8y: CumulocityApi, safe_create): + """Verify that client side filtering works as expected.""" + objects = [ + safe_create(ManagedObject( + live_c8y, + type='c8y_TestObject', + name=f'c8y_TestObject_{i}', + array=random.choices(range(10), k=5) + )) + for i in range(10) + ] + + # using filter parameter (JSONPath) + filtered_1 = live_c8y.inventory.get_all( + type='c8y_TestObject', + fragment='array', + include=jsonpath('$.array[?(@ == 0)]')) + # using Python means + filtered_2 = list(filter( + lambda mo: 'array' in mo and 0 in mo.array, + live_c8y.inventory.select(type='c8y_TestObject', fragment='array') + )) + # -> no difference + assert {x.name for x in filtered_1} == {x.name for x in filtered_2} + + for o in objects: + o.delete() + + def test_get_single_by_query(live_c8y: CumulocityApi, module_factory): """Verify that the get_by function works as expected.""" basename = RandomNameGenerator.random_name(2) diff --git a/integration_tests/test_inventoryroles.py b/integration_tests/test_inventoryroles.py index 4cf979a..627a546 100644 --- a/integration_tests/test_inventoryroles.py +++ b/integration_tests/test_inventoryroles.py @@ -81,6 +81,11 @@ def test_select_inventory_roles(live_c8y: CumulocityApi): # (1) get all defined inventory roles assert live_c8y.inventory_roles.get_all() + # (2) filter + filtered_1 = live_c8y.inventory_roles.get_all(include='description != null') + filtered_2 = [x for x in live_c8y.inventory_roles.get_all() if x.description] + assert {x.id for x in filtered_1} == {x.id for x in filtered_2} + def test_assignments(live_c8y, session_device, module_factory): """Verify that inventory roles can be assigned, retrieved and unassigned.""" diff --git a/integration_tests/test_measurements.py b/integration_tests/test_measurements.py index b74e58d..dd81254 100644 --- a/integration_tests/test_measurements.py +++ b/integration_tests/test_measurements.py @@ -251,11 +251,11 @@ def test_collect_multiple_series(series_fixture, request): assert all(len(v) == len(series_names) for v in values) # -> Each value within the n-tuple belongs to one series # There will be None values (when a series does not define a value - # at that timestamp). Subsequent values will have the same type + # at that timestamp). All actual values will have the same type. assert any(any(e is None for e in v) for v in values) for i in range(0, len(series_names)): - t = type(values[0][i]) - assert all(isinstance(v[i], t) for v in values if v[i]) + actual_values = [v[i] for v in values if v[i] is not None] + assert all(isinstance(v, type(actual_values[0])) for v in actual_values) def test_get_and_collect_series(live_c8y, sample_series_device): diff --git a/integration_tests/test_notification2.py b/integration_tests/test_notification2.py index 3410c54..92880ef 100644 --- a/integration_tests/test_notification2.py +++ b/integration_tests/test_notification2.py @@ -13,9 +13,9 @@ import pytest from c8y_api import CumulocityApi -from c8y_api.model import Device, ManagedObject, Subscription, Measurement, Value, Event, Alarm -from c8y_tk.notification2 import AsyncListener, Listener -from tests.utils import assert_in_any, assert_no_failures, assert_all_in +from c8y_api.model import Device, ManagedObject, Subscription, Measurement, Value, Event, Alarm, Operation +from c8y_tk.notification2 import AsyncListener, Listener, AsyncQueueListener, QueueListener +from tests.utils import assert_in_any, assert_no_failures from util.testing_util import RandomNameGenerator @@ -86,9 +86,8 @@ def build(): return build -# TODO: Add Operation @pytest.mark.parametrize("api_filters, expected", [ - ('*', 'M,E,EwC,A,AwC,MO'), + ('*', 'M,E,EwC,A,AwC,MO,O'), ('M', 'M'), ('E', 'E'), ('EwC', 'E,EwC'), @@ -126,6 +125,7 @@ def test_api_filters(live_c8y: CumulocityApi, sample_object, api_filters, expect mo = sample_object mo['c8y_IsDevice'] = {} + mo['com_cumulocity_model_Agent'] = {} mo.update() sub = Subscription( live_c8y, @@ -153,7 +153,7 @@ def receive_notification(m:Listener.Message): e_id = Event(live_c8y, source=mo.id, type="c8y_TestEvent", time='now', text='text').create().id a_id = Alarm(live_c8y, source=mo.id, type="c8y_TestAlarm", time='now', text='text', severity=Alarm.Severity.WARNING).create().id - # o_id = Operation(live_c8y, device_id=mo.id, c8y_Operation={}).create().id + o_id = Operation(live_c8y, device_id=mo.id, c8y_Operation={}).create().id mo.apply({'some_tag': {}}) time.sleep(1) @@ -169,6 +169,8 @@ def receive_notification(m:Listener.Message): assert e_id in ids if 'alarms' in expected: assert a_id in ids + if 'operations' in expected: + assert o_id in ids finally: # (99) cleanup listener.stop() @@ -666,3 +668,51 @@ async def receive_notification(m:AsyncListener.Message): == (n_listeners-1 if shared else 0)) assert sum("cancelled" in x for x in log_messages) == n_listeners assert_no_failures(caplog) + + +@pytest.mark.asyncio(loop_scope='function') +async def test_asyncio_queue_listener(live_c8y: CumulocityApi, sample_object): + """Verify that the queue listener works as expected.""" + mo = sample_object + sub = create_managed_object_subscription(live_c8y, mo) + + q = asyncio.Queue() + listener = AsyncQueueListener( + c8y=live_c8y, + subscription_name=sub.name, + queue=q, + ) + + listener.start() + try: + await asyncio.sleep(5) # ensure creation + mo.apply({'test_CustomFragment': {'num': 42}}) + msg = await q.get() + assert msg.json['test_CustomFragment']['num'] == 42 + finally: + listener.stop() + await listener.wait() + + +@pytest.mark.asyncio(loop_scope='function') +async def test_queue_listener(live_c8y: CumulocityApi, sample_object): + """Verify that the queue listener works as expected.""" + mo = sample_object + sub = create_managed_object_subscription(live_c8y, mo) + + q = queue.Queue() + listener = QueueListener( + c8y=live_c8y, + subscription_name=sub.name, + queue=q, + ) + + listener.start() + try: + time.sleep(5) # ensure creation + mo.apply({'test_CustomFragment': {'num': 42}}) + msg = q.get() + assert msg.json['test_CustomFragment']['num'] == 42 + finally: + listener.stop() + listener.wait() diff --git a/integration_tests/test_users.py b/integration_tests/test_users.py index 5e3a22e..ad61449 100644 --- a/integration_tests/test_users.py +++ b/integration_tests/test_users.py @@ -5,7 +5,6 @@ import secrets import string import time -from contextlib import suppress from typing import Union, Tuple import pytest @@ -54,24 +53,31 @@ def test_CRUD(live_c8y: CumulocityApi): # noqa (case) assert user.username in str(e) -def test_select_by_name(live_c8y: CumulocityApi): +def test_select_by_name(live_c8y: CumulocityApi, safe_create): """Verify that user selection by name works as expected.""" prefix = RandomNameGenerator.random_name(1) users = [] - try: - for _ in range(0, 5): - username = f'{prefix}-{RandomNameGenerator.random_name(1)}' - email = f'{username}@c8y.com' + for _ in range(0, 5): + username = f'{prefix}-{RandomNameGenerator.random_name(1)}' + email = f'{username}@c8y.com' - user = User(live_c8y, username=username, email=email, enabled=True).create() - users.append(user) + user = safe_create(User(live_c8y, username=username, email=email, enabled=True)) + users.append(user) - selected = live_c8y.users.get_all(username=prefix) - assert {x.id for x in selected} == {x.id for x in users} - finally: - for u in users: - with suppress(Exception): - u.delete() + selected = live_c8y.users.get_all(username=prefix) + assert {x.id for x in selected} == {x.id for x in users} + + +def test_select(live_c8y: CumulocityApi, safe_create): + """Verify that user selection works as expected.""" + # test getting by group (2 == admin should be on all installations) + admin_users = live_c8y.users.get_all(groups=['2']) + for user in admin_users: + assert '2' in live_c8y.users.get(user.username).global_role_ids + + # test getting with a client-side-filter + admin_users_2 = live_c8y.users.get_all(include="contains(groups.references[].group.id, `2`)") + assert {x.id for x in admin_users} == {x.id for x in admin_users_2} def test_get_current(live_c8y: CumulocityApi): diff --git a/pyproject.toml b/pyproject.toml index 47145b6..f1e3d0d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -33,6 +33,11 @@ dependencies = [ "importlib_metadata; python_version < '3.8'" ] +[project.optional-dependencies] +jmespath = ["jmespath"] +jsonpath = ["jsonpath-ng"] +filters = ["jmespath", "jsonpath-ng"] + [project.urls] Homepage = "https://github.com/Cumulocity-IoT/cumulocity-python-api" Source = "https://github.com/Cumulocity-IoT/cumulocity-python-api" diff --git a/requirements.txt b/requirements.txt index bd92448..f000ded 100644 --- a/requirements.txt +++ b/requirements.txt @@ -23,4 +23,6 @@ hatch-vcs importlib_metadata; python_version < '3.8' tenacity certifi -urllib3 \ No newline at end of file +urllib3 +jsonpath-ng +jmespath \ No newline at end of file diff --git a/samples/client_side_filtering.py b/samples/client_side_filtering.py new file mode 100644 index 0000000..f7d5b59 --- /dev/null +++ b/samples/client_side_filtering.py @@ -0,0 +1,107 @@ +import logging + +from c8y_api.app import SimpleCumulocityApp +from c8y_api.model import Device +from c8y_api.model.matcher import field, match_all, match_not +from util.testing_util import load_dotenv + +logging.basicConfig(level=logging.DEBUG) + +""" +This example demonstrates how to use the client-side-filtering feature of the +Python Cumulocity API. + +Client-side filtering is _not_ part of the standard API, it is an extension +which the Python API provides which _can_ be useful in _some_ use cases. It +is important to understand, that client-side-filtering is somewhat a tool of +last resort - it becomes necessary if server-side filtering is not sufficient. +Python being Python already provides nice standard features for filtering the +query results, e.g. via list comprehension or the `filter` function. However, +sometimes it may appear easier for developers to define such a filter directly +with the `select` or `get_all` function, for example in interactive scenarios. + +Client-side filters are defined for the _raw_ JSON structure. Hence, when you +want to use them you must be aware of Cumulocity's JSON data format. + +Performance: Hi there, optimization kids! To put it bluntly - using this +feature does most likely _not_ increase performance in any way. This is +because the actual JSON parsing will happen in any case and this is the +expensive part. So - all the illustrated methods below do only marginally +differ in speeed. +""" + +load_dotenv() # load environment from a .env if present +c8y = SimpleCumulocityApp() +print("CumulocityApp initialized.") +print(f"{c8y.base_url}, Tenant: {c8y.tenant_id}, User:{c8y.username}") + +# Let's create a couple of devices with arbitrary names +d1 = Device(c8y=c8y, type="c8y_TestDevice", name="Some Test Device").create() +d2 = Device(c8y=c8y, type="c8y_TestDevice", name="Device #2").create() +d3 = Device(c8y=c8y, type="c8y_TestDevice", name="Another Device").create() +d4 = Device(c8y=c8y, type="c8y_TestDevice", name="Machine thingy").create() + +print("All devices of type 'c8y_TestDevice':") +for d in c8y.device_inventory.select(type='c8y_TestDevice'): + print(f" - {d.name}") + +# Option 1: filtering devices by name using standard Python filters +# The following select statement will simply list "all" devices (there are +# no DB filters) and subsequently filter the results using a standard Python +# list comprehension: +filtered_devices = [x for x in c8y.device_inventory.select(type='c8y_TestDevice') if 'Device' in x.name] +# -> We will only have devices which are named "Device something" +print("Option #1 result (needs to contain 'Device' string)") +for d in filtered_devices: + print(f" - {d.name}") + +# Option 2: using the client-side filtering with JMESPath filters +# The following statement will simply list "all" devices (there are no DB +# filters) and subsequently filter the results using a JMESPath expression. +# The JMESPath is matched against the unprocessed JSON, hence it is required +# to understand Cumulocity's native JSON formats (but they are close to how +# the Python API resembles it: +filtered_devices_2 = c8y.device_inventory.get_all(type='c8y_TestDevice', include="contains(name, 'Device')") +# -> We will only have devices which are named "Device something" +print("Option #2 result (same thing):") +for d in filtered_devices_2: + print(f" - {d.name}") + +# Option 3: the client-side filtering with custom filters +# The following statement will simply list "all" devices (there are no DB +# filters) and subsequently filter the results using Python matchers. There +# is quite a list of predefined matchers (see c8y_api.model.matchers) and +# custom ones are easy to define. +filtered_devices_3 = c8y.device_inventory.get_all( + type='c8y_TestDevice', + include=field('name', '*Device*'), + exclude=field('name', '*#*')) +# -> We will only have devices which are named "Device something" but +# without '#' anywhere in the name +print("Option #3 result (Same, but no #)") +for d in filtered_devices_3: + print(f" - {d.name}") + +# Option 4: the client-side filtering with nested filters +# The following statement applies the same as above, but using ridiculously +# nested Python matchers to get the same logic. + +filtered_devices_4 = c8y.device_inventory.get_all( + type='c8y_TestDevice', + include=match_all( + field('name', '*Device*'), + match_not( + field('name', '*#*') + ) + )) +# -> Same result +print("Option #4 result (All the same)") +for d in filtered_devices_4: + print(f" - {d.name}") + + +# cleanup +d1.delete() +d2.delete() +d3.delete() +d4.delete() \ No newline at end of file diff --git a/samples/notification2_synchronous.py b/samples/notification2_synchronous.py index 15ef74d..27d6b67 100644 --- a/samples/notification2_synchronous.py +++ b/samples/notification2_synchronous.py @@ -2,7 +2,6 @@ # pylint: disable=missing-function-docstring -import threading import time from c8y_api.app import SimpleCumulocityApp diff --git a/tasks.py b/tasks.py index be2ad20..9bebd59 100644 --- a/tasks.py +++ b/tasks.py @@ -48,7 +48,7 @@ def init(c): def lint(c, scope='all'): """Run PyLint.""" if scope == 'all': - scope = 'c8y_api/ c8y_tk tests integration_tests samples' + scope = 'c8y_api c8y_tk tests integration_tests samples' c.run(f'pylint --rcfile pylintrc --fail-under=9 {scope}') diff --git a/tests/model/test_audits.py b/tests/model/test_audits.py index d75fdcf..d158d74 100644 --- a/tests/model/test_audits.py +++ b/tests/model/test_audits.py @@ -56,6 +56,41 @@ def test_select(fun, params, expected, not_expected): assert ne not in resource +def test_client_side_filtering(): + """Verify that client side filtering works as expected. + + This test prepares a mocked CumulocityApi and runs the get_all function + against it. The REST GET is mocked as well as corresponding matcher + results. The test verifies that the matcher is invoked and applied. + """ + get_data = [ + {'auditRecords': x, 'statistics': {'totalPages': 1}} for x in + [ + [{'id': 1}, {'id': 2}, {'id': 3}], + [] + ] + ] + include_results = [True, False, True] + exclude_results = [True, False] + + # prepare mocked CumulocityApi with mock get response and matcher results + c8y = CumulocityApi(base_url='some.host.com', tenant_id='t123', username='user', password='pass') + c8y.get = Mock(side_effect=get_data) + include_matcher = Mock(safe_matches=Mock(side_effect=include_results)) + exclude_matcher = Mock(safe_matches=Mock(side_effect=exclude_results)) + + # run get_all/select + result = c8y.audit_records.get_all(include=include_matcher, exclude=exclude_matcher) + + # -> result should only contain filtered documents + # 1,2,3 -> 1,3 -> 3 + assert [3] == [x.id for x in result] + # -> include matcher should have been called for each document + assert include_matcher.safe_matches.call_count == len(include_results) + # -> exclude matcher should have been called for each included + assert exclude_matcher.safe_matches.call_count == len(exclude_results) + + @pytest.mark.parametrize('fun', [ AuditRecords.get_all, ]) diff --git a/tests/model/test_base.py b/tests/model/test_base.py index 0c72415..4ffa8e0 100644 --- a/tests/model/test_base.py +++ b/tests/model/test_base.py @@ -582,7 +582,13 @@ def parse_fun(item): res._get_page = Mock(side_effect=get_page) # iterate oder results - result = list(res._iterate(base_query="q", page_number=None, limit=limit, parse_fun=parse_fun)) + result = list(res._iterate( + base_query="q", + page_number=None, + limit=limit, + include=None, + exclude=None, + parse_fun=parse_fun)) result_ids = [x.id for x in result] # check expectation diff --git a/tests/model/test_filtering.py b/tests/model/test_filtering.py new file mode 100644 index 0000000..322abf2 --- /dev/null +++ b/tests/model/test_filtering.py @@ -0,0 +1,61 @@ +# Copyright (c) 2025 Cumulocity GmbH + +from unittest.mock import Mock + +import pytest + +from c8y_api import CumulocityApi +from c8y_api.model import AuditRecords, Events, Alarms, Operations, Inventory, DeviceGroupInventory, DeviceInventory, \ + Users, GlobalRoles + + +@pytest.mark.parametrize('resource_class', [ + Events, + Alarms, + Operations, + AuditRecords, + Inventory, + DeviceInventory, + DeviceGroupInventory, + Users, + GlobalRoles, # additional test in test_global_role.py +]) +def test_client_side_filtering(resource_class): + """Verify that client side filtering works as expected. + + This test prepares a mocked CumulocityApi and runs the get_all function + against it. The REST GET is mocked as well as corresponding matcher + results. The test verifies that the matcher is invoked and applied. + """ + # create mock CumulocityApi instance + c8y = CumulocityApi(base_url='some.host.com', tenant_id='t123', username='user', password='pass') + resource = resource_class(c8y=c8y) + + # prepare mock data and corresponding matchers + # the get function is invoked until there are no results (empty list), the results + # are stored in an array by object name + get_data = [ + {resource.object_name: x, 'statistics': {'totalPages': 1}} for x in + [ + # need to prepare data for all kind of formats ... + [{'id': 1, 'source':{'id': 1}}, {'id': 2, 'source':{'id': 2}}, {'id': 3, 'source':{'id': 3}}], + [] + ] + ] + include_results = [True, False, True] + exclude_results = [True, False] + + c8y.get = Mock(side_effect=get_data) + include_matcher = Mock(safe_matches=Mock(side_effect=include_results)) + exclude_matcher = Mock(safe_matches=Mock(side_effect=exclude_results)) + + # run get_all/select + result = resource.get_all(include=include_matcher, exclude=exclude_matcher) + + # -> result should only contain filtered documents + # 1,2,3 -> 1,3 -> 3 + assert ['3'] == [str(x.id) for x in result] + # -> include matcher should have been called for each document + assert include_matcher.safe_matches.call_count == len(include_results) + # -> exclude matcher should have been called for each included + assert exclude_matcher.safe_matches.call_count == len(exclude_results) diff --git a/tests/model/test_global_role.py b/tests/model/test_global_role.py index b878d4c..6fa51e9 100644 --- a/tests/model/test_global_role.py +++ b/tests/model/test_global_role.py @@ -6,9 +6,11 @@ import json import os +from unittest.mock import Mock import pytest +from c8y_api import CumulocityApi from c8y_api.model import GlobalRole @@ -67,3 +69,45 @@ def test_updating(sample_role: GlobalRole): expected_updates = {'name', 'description'} assert len(sample_role.get_updates()) == len(expected_updates) assert set(sample_role.to_diff_json().keys()) == expected_updates + + +def test_client_side_filtering(): + """Verify that client side filtering works as expected when selecting by username. + + The GlobalRoles API has a special case for selecting groups/global roles by username + which is covered by this test. + + See also `test_filtering.py` for generic client side filtering tests. + """ + + # create mock CumulocityApi instance + c8y = CumulocityApi(base_url='some.host.com', tenant_id='t123', username='user', password='pass') + + # prepare mock data and corresponding matchers + # the get function is invoked until there are no results (empty list), the results + # are stored in an array by object name + get_data = [ + {'references': x, 'statistics': {'totalPages': 1}} for x in + [ + # need to prepare data for all kind of formats ... + [{'group': {'id': str(x)}} for x in [1, 2, 3]], + [] + ] + ] + include_results = [True, False, True] + exclude_results = [True, False] + + c8y.get = Mock(side_effect=get_data) + include_matcher = Mock(safe_matches=Mock(side_effect=include_results)) + exclude_matcher = Mock(safe_matches=Mock(side_effect=exclude_results)) + + # run get_all/select + result = c8y.global_roles.get_all(username='username', include=include_matcher, exclude=exclude_matcher) + + # -> result should only contain filtered documents + # 1,2,3 -> 1,3 -> 3 + assert ['3'] == [str(x.id) for x in result] + # -> include matcher should have been called for each document + assert include_matcher.safe_matches.call_count == len(include_results) + # -> exclude matcher should have been called for each included + assert exclude_matcher.safe_matches.call_count == len(exclude_results) diff --git a/tests/model/test_matchers.py b/tests/model/test_matchers.py new file mode 100644 index 0000000..dc05e7d --- /dev/null +++ b/tests/model/test_matchers.py @@ -0,0 +1,188 @@ +import logging +import re +from unittest.mock import patch, Mock + +import pytest + +from c8y_api.model.matcher import * + + +class MatchingMatcher(JsonMatcher): + """Always matching matcher for test purposes.""" + def matches(self, _): + return True + + +class NotMatchingMatcher(JsonMatcher): + """Never matching matcher for test purposes.""" + def matches(self, _): + return False + + +MATCH = MatchingMatcher('MATCH') +DONT_MATCH = NotMatchingMatcher('DONT_MATCH') + + +def test_logging(caplog): + """Verify that exceptions during 'safe' matching are propagated + as expected.""" + + class FailingMatcher(JsonMatcher): + """Always failing matcher for test purposes.""" + def matches(self, _): + raise ValueError('expected') + + # 1) single matcher raises exception + with pytest.raises(ValueError): + FailingMatcher('FAIL').matches({}) + assert not caplog.records + + # 2) safe_matches returns False and warns + with caplog.at_level(logging.WARNING): + FailingMatcher('FAIL').safe_matches({}) + assert len(caplog.records) == 1 + r0 = caplog.records[0] + assert r0.name == 'c8y_api.model.matcher' + assert r0.levelname == 'WARNING' + assert 'FAIL' in r0.message + + # 3) nested matcher propagates + caplog.clear() + with caplog.at_level(logging.WARNING): + match_all(MATCH, MATCH, FailingMatcher('FAIL')).safe_matches({}) + assert len(caplog.records) == 1 + assert re.search(r'MATCH.*AND.*MATCH.*AND.*FAIL', caplog.messages[0]) + + +def test_fragment_matcher(): + """Verify that the fragment matchers work as expected.""" + assert fragment('fragment').matches({'fragment': {}}) + assert not fragment('fragment').matches({'other': {}}) + + +@patch('c8y_api.model.matcher._matcher._StringUtil.like') +@patch('c8y_api.model.matcher._matcher._StringUtil.matches') +def test_field_matcher(matches_mock, like_mock): + """Verify that description matchers work as expected. + + The field matcher can work in LIKE and REGEX mode, only one of the + respective string util functions are expected to be invoked per + matching attempt. If the field is not present in the JSON, no + matching attempt is expected. + """ + + valid = {'field': 'text'} + not_valid = {'other': 'text'} + + # field present, only like matcher is invoked + like_mock.return_value = True + assert field('field', 'expr').matches(valid) + like_mock.assert_called_once_with('expr', 'text') + + # field present, only like matcher is invoked although not matching + like_mock.reset_mock() + like_mock.return_value = False + assert not field('field', 'expr').matches(valid) + like_mock.assert_called_once_with('expr', 'text') + matches_mock.assert_not_called() + + # field not present, no matcher invoked + like_mock.reset_mock() + assert not field('field', 'expr').matches(not_valid) + like_mock.assert_not_called() + matches_mock.assert_not_called() + + # regex mode, like matcher not invoked + assert field('field', 'expr', mode='REGEX').matches(valid) + like_mock.assert_not_called() + matches_mock.assert_called_once_with('expr', 'text') + + +def test_all_matcher(): + """Verify that ALL matchers work as expected. + + All the enclosed matchers are invoked until one fails. + """ + match1 = Mock(matches=Mock(return_value=True)) + match2 = Mock(matches=Mock(return_value=True)) + dont_match = Mock(matches=Mock(return_value=False)) + match3 = Mock(matches=Mock(return_value=True)) + + assert not match_all(match1, match2, dont_match, match3).matches({}) + match1.matches.assert_called_once_with({}) + match2.matches.assert_called_once_with({}) + dont_match.matches.assert_called_once_with({}) + match3.matches.assert_not_called() + + +def test_any_matcher(): + """Verify that ALL matchers work as expected. + + All the enclosed matchers are invoked until one matches. + """ + match1 = Mock(matches=Mock(return_value=True)) + match2 = Mock(matches=Mock(return_value=True)) + dont_match = Mock(matches=Mock(return_value=False)) + + assert match_any(dont_match, match1, match2).matches({}) + dont_match.matches.assert_called_once_with({}) + match1.matches.assert_called_once_with({}) + match2.matches.assert_not_called() + + +def test_not_matcher(): + """Verify that NOT matcher work as expected.""" + assert not match_not(MATCH).matches({}) + assert match_not(DONT_MATCH).matches({}) + + +def test_description_matcher(): + """Verify that the description matchers are initialized correctly.""" + matcher = description('MATCH') + assert isinstance(matcher, FieldMatcher) + assert matcher.field_name == 'description' + assert matcher.expression == 'MATCH' + + +def test_text_matcher(): + """Verify that the text matchers are initialized correctly.""" + matcher = text('MATCH') + assert isinstance(matcher, FieldMatcher) + assert matcher.field_name == 'text' + assert matcher.expression == 'MATCH' + + +def test_command_matcher(): + """Verify that the text matchers work as expected. + + The command matcher is a regular field matcher, but the matched `text` + field is nested within a `c8y_Command` fragment. + """ + matcher = command('MATCH') + with patch.object(FieldMatcher, 'matches') as matches_mock: + matches_mock.return_value = True + assert matcher.matches({'c8y_Command': 'random'}) + matches_mock.assert_called_once_with('random') + + matches_mock.reset_mock() + assert not matcher.matches({'c8y_Other': 'random'}) + matches_mock.assert_not_called() + + +def test_jmespath_matcher(): + """Verify that the jmespath matchers work as expected.""" + assert jmespath("name == 'NAME'").matches({'name': 'NAME'}) + assert not jmespath("name == 'NAME'").matches({'name': 'RANDOM'}) + with pytest.raises(Exception) as error: + jmespath("*INVALID*").matches({}) + assert "INVALID" in str(error.value) + +# $[?(@.firstName == "John")] + +def test_jsonpath_matcher(): + """Verify that the jsonpath matchers work as expected.""" + assert jsonpath('$.array[?(@ == 0)]').matches({'array': [0, 1, 2]}) + assert not jsonpath('$.array[?(@ == 0)]').matches({}) + with pytest.raises(Exception) as error: + jsonpath("*INVALID*").matches({}) + assert "INVALID" in str(error.value) diff --git a/tests/test_util.py b/tests/test_util.py index 7e2f4e5..b0ec09d 100644 --- a/tests/test_util.py +++ b/tests/test_util.py @@ -29,6 +29,60 @@ def test_snake_to_pascal_case(name, expected): assert _StringUtil.to_pascal_case(name) == expected +@pytest.mark.parametrize( + 'expression, string, expected', + [ + ('abc', 'abc', True), + ('abc', 'abcd', False), + ('*abc', 'abc', True), + ('*abc', 'xabc', True), + ('*abc', 'xabx', False), + ('abc*', 'abc', True), + ('abc*', 'abcx', True), + ('abc*', 'abx', False), + ('*abc*', 'abc', True), + ('*abc*', 'xabcy', True), + ('*abc*', 'xaby', False), + ], + ids=[ + 'exact match', + 'no exact match', + 'ends with', + 'ends with #2', + 'no ends with', + 'starts with', + 'starts with #2', + 'no starts with', + 'contains', + 'contains #2', + 'no contains', + ] +) +def test_like(expression, string, expected): + """Verify that the `like` function works as expected.""" + assert _StringUtil.like(expression, string) == expected + + +@pytest.mark.parametrize( + 'expression, string, expected', + [ + ('abc', 'abc', True), + ('abc', 'xabcy', True), + (r'^abc$', 'xabcy', False), + (r'abc.*', 'abcx', True), + ], + ids=[ + 'exact match', + 'contains', + 'no full match', + 'regex match', + ] +) +def test_matches(expression, string, expected): + """Verify that the `matches` function works as expected.""" + assert _StringUtil.matches(expression, string) == expected + + @patch.dict(os.environ, {'C8Y_SOME': 'some', 'C8Y_THING': 'thing', 'C8YNOT': 'not'}, clear=True) def test_c8y_keys(): """Verify that the C8Y_* keys can be filtered from environment."""