diff --git a/tools/tdgpt/taosanalytics/app.py b/tools/tdgpt/taosanalytics/app.py
index 9848228c6e3e..9a57375efb05 100755
--- a/tools/tdgpt/taosanalytics/app.py
+++ b/tools/tdgpt/taosanalytics/app.py
@@ -1,7 +1,6 @@
 # encoding:utf-8
 # pylint: disable=c0103
 """the main route definition for restful service"""
-import os
 import os.path
 import sys
 
@@ -13,7 +12,7 @@
 from taosanalytics.handlers.anomaly import handle_anomaly
 from taosanalytics.handlers.forecast import handle_forecast
 from taosanalytics.handlers.correlation import handle_correlation
-from taosanalytics.handlers.misc import handle_batch
+from taosanalytics.handlers.misc import do_profile_search, handle_batch, handle_pearsonr
 from taosanalytics.conf import Configure
 from taosanalytics.log import AppLogger
 
@@ -45,7 +44,6 @@ def _init_app():
     app = Flask(__name__)
     app.config["PROPAGATE_EXCEPTIONS"] = True
 
-
 @app.route("/")
 def index():
     """ default rsp """
@@ -111,175 +109,28 @@ def handle_batch_req():
     return handle_batch(request)
 
 
-@app.route('/deploy', methods=['POST'])
+@app.route('/api/v1/deploy', methods=['POST'])
 def deploy_model():
     """deploy model to production environment, e.g. load model to memory, etc."""
     return do_handle_dynamic_model(request)
 
 
-@app.route('/undeploy', methods=['POST'])
+@app.route('/api/v1/undeploy', methods=['POST'])
 def undeploy_model():
     return do_handle_undeploy_model(request)
 
 
-def handle_pearsonr(request, api_version):
-    """
-    Execute pearsonr correlation logic.
-
-    :param request: Flask request object
-    :param api_version: API version to determine the specific implementation of Pearson correlation
-    :return: dict with correlation result or error information
-    """
-    try:
-        # check for rows limitation to reduce the process time
-        req_json, payload, options, data_index, _ = do_check_before_exec(request, False)
-    except ValueError as e:
-        msg = str(e)
-        if msg == SINGLE_COLUMN_ERROR_MSG:
-            msg = 'a second data column is required for pearsonr'
-        return {"msg": msg, "rows": -1}
-    except Exception as e:
-        return {"msg": str(e), "rows": -1}
-
-    if api_version != 'v1':
-        app_logger.log_inst.error('unsupported API version: %s', api_version)
-        return {"msg": f"unsupported API version: {api_version}", "rows": -1}
-
-    try:
-        second_list = get_more_data_list(payload, req_json["schema"])
-        if second_list is None:
-            return {"msg": "a second data column is required for pearsonr", "rows": -1}
-
-        correlation, p_value = pearsonr(payload[data_index], second_list)
-        if not np.isfinite(correlation):
-            correlation = 0.0
-            p_value = 1.0
-
-        correlation = float(correlation)
-        p_value = float(p_value)
-
-        app_logger.log_inst.debug(f"pearsonr correlation: {correlation}, p value: {p_value}")
-        res = {"option": options, "rows": 1, "correlation_coefficient": correlation, "p_value": p_value}
-
-        return res
-    except Exception as e:
-        app_logger.log_inst.error('pearsonr correlation failed, %s', str(e))
-        return {"msg": str(e), "rows": -1}
-
-def do_profile_search(request, api_version):
-    """
-    Execute profile search logic.
-
-    :param request: Flask request object
-    :param api_version: API version to determine the specific implementation of profile search
-    :return: dict with profile search result or error information
-
-    Request body example:
-    Supported normalization values: "min-max", "z-score", "centering", "none".
-    Supported algo types: "dtw", "cosine".
- For "dtw", algo.params may include: - - "radius": search radius for fastdtw - - "min_window": minimum ts window size for profile search - - "max_window": maximum ts window size for profile search - - "window_size_step": step for ts window size between min_window and max_window, only applicable for dtw algo - - "window_sliding_step": step for sliding the ts window when searching - Result selection notes: - - Return the top N similar profiles with "num". - - Or return all profiles with distance below the threshold when using dtw. - - Or return all profiles with similarity above the threshold when using cosine similarity. - - "num" and "threshold" cannot be set at the same time. - - "exclude_contained" is only applicable for dtw and means whether to exclude the worse matched profile in a strict-containment pair, keeping the better one (the match with the smaller distance). For example, if there are two matched profiles with ts window [1, 5] and [2, 4], and one strictly contains the other, the worse match will be excluded if "exclude_contained" is set to true. - - "exclude_source" is applicable for all algorithms and means whether to exclude the matched profile that contains the source profile. For example, if the source profile has ts window [2, 4], the matched profile with ts window [2, 4] will be excluded if "exclude_source" is set to true. - - Threshold-based results are capped at 500 matches. - target_data.ts may be either: - - a unix timestamp list, such as [1, 2, 3, 4, 5, 6] - - a ts window list, such as [[1, 5], [2, 6]] - { - "normalization": "z-score", - "algo": { - "type": "dtw", - "params": { - "radius": 5, - "min_window": 5, - "max_window": 20, - "window_size_step": 2, - "window_sliding_step": 1 - } - }, - "result": { - "num": 3, - "exclude_contained": true, - "exclude_source": true - }, - "source_data": { - "ts": [1000, 2000, 3000, 4000, 5000], - "data": [1, 2, 3, 4, 5] - }, - - "target_data": { - "ts": [1, 2, 3, 4, 5, 6], - "data": [1, 2, 3, 4, 5, 6] - } - } - Response example: - metric_type is either "dtw_distance" or "cosine_similarity". - Sort rule: - - "dtw_distance": smaller "criteria" means more similar (ascending) - - "cosine_similarity": larger "criteria" means more similar (descending) - In each match, "num" is the number of data points in the matched window. 
-    {
-        "rows": 3,
-        "metric_type": "dtw_distance",
-        "matches": [
-            {
-                "criteria": 0.12,
-                "ts_window": [1, 5],
-                "num": 7
-            },
-            {
-                "criteria": 0.21,
-                "ts_window": [2, 6],
-                "num": 5
-            },
-            {
-                "criteria": 0.35,
-                "ts_window": [3, 7],
-                "num": 4
-            }
-        ]
-    }
-
-    """
-    if api_version != 'v1':
-        app_logger.log_inst.error('unsupported API version: %s', api_version)
-        return {"msg": f"unsupported API version: {api_version}", "rows": -1}
-
-    try:
-        req_json = do_initial_check(request)
-    except Exception as e:
-        return {"msg": str(e), "rows": -1}
-
-    try:
-        result = do_profile_search_impl(req_json)
-        app_logger.log_inst.debug("profile-search result: %s", result)
-        return result
-
-    except Exception as e:
-        app_logger.log_inst.error('profile search failed, %s', str(e))
-        return {"msg": str(e), "rows": -1}
-
-
 @app.route('/api/v1/analysis/pearsonr', methods=['POST'])
 def handle_pearsonr_req():
     """handle the pearsonr correlation request """
-    app_logger.log_inst.info('recv pearsonr correlation request from %s', request.remote_addr)
+    AppLogger.info('recv pearsonr correlation request from %s', request.remote_addr)
     return handle_pearsonr(request, api_version='v1')
 
 
 @app.route('/api/v1/analysis/profile-search', methods=['POST'])
 def handle_profile_search_req():
     """handle the profile search request """
-    app_logger.log_inst.info('recv profile search request from %s', request.remote_addr)
+    AppLogger.info('recv profile search request from %s', request.remote_addr)
     return do_profile_search(request, api_version='v1')
diff --git a/tools/tdgpt/taosanalytics/conf.py b/tools/tdgpt/taosanalytics/conf.py
index 1e6afc0f60c2..7e75ac081137 100644
--- a/tools/tdgpt/taosanalytics/conf.py
+++ b/tools/tdgpt/taosanalytics/conf.py
@@ -19,14 +19,14 @@
 except Exception:  # pragma: no cover
     keras = None  # noqa: F841
 
-_ANODE_SECTION_NAME = "taosanode"
-
 
 
 class Configure:
     """ configuration class (singleton) """
     _instance = None
     _lock = __import__('threading').Lock()
+    path: Optional[str]
+    _conf: dict
 
     def __new__(cls, conf_path: Optional[str] = None):
         with cls._lock:
@@ -42,7 +42,7 @@ def __new__(cls, conf_path: Optional[str] = None):
                 if conf_path is not None:
                     print(f"Input configuration file not available. Use default config file: {instance.path}")
 
-                if os.path.exists(instance.path):
+                if instance.path is not None and os.path.exists(instance.path):
                     instance.reload()
                 else:
                     print(
@@ -63,6 +63,7 @@ def get_instance(cls) -> 'Configure':
         """Return the singleton instance, creating it with defaults if not yet initialized."""
         if cls._instance is None:
             cls()
+        assert cls._instance is not None
        return cls._instance
 
     # def _get_default_conf_windows(self):
@@ -156,8 +157,14 @@ def get_img_dir(self) -> str:
     def reload(self):
         """ load the info from config file """
         spec = importlib.util.spec_from_file_location("gunicorn_config", self.path)
+        if spec is None:
+            raise RuntimeError(f"Cannot load configuration spec from {self.path}")
+
         config_module = importlib.util.module_from_spec(spec)
-        spec.loader.exec_module(config_module)
+        if spec.loader is None:
+            raise RuntimeError(f"Cannot load configuration loader from {self.path}")
+
+        spec.loader.exec_module(config_module)  # type: ignore[union-attr]
 
         conf_vars = {}
         for key in dir(config_module):
diff --git a/tools/tdgpt/taosanalytics/handlers/misc.py b/tools/tdgpt/taosanalytics/handlers/misc.py
index 716b7fc260f8..e4be348fd3cd 100644
--- a/tools/tdgpt/taosanalytics/handlers/misc.py
+++ b/tools/tdgpt/taosanalytics/handlers/misc.py
@@ -2,10 +2,12 @@
 """misc handlers: encapsulates miscellaneous tool business logic"""
 
 import numpy as np
+from scipy.stats import pearsonr
 
 from taosanalytics.algo.tool.batch import do_batch_process, update_config
+from taosanalytics.algo.tool.profile_search import do_profile_search_impl
 from taosanalytics.log import AppLogger
-from taosanalytics.util import do_initial_check
+from taosanalytics.util import SINGLE_COLUMN_ERROR_MSG, do_initial_check, do_check_before_exec, get_more_data_list
 
 
 def handle_batch(request):
@@ -42,3 +44,153 @@ def handle_batch(request):
     except Exception as e:
         AppLogger.error('golden batch process failed, %s', str(e))
         return {"msg": str(e), "rows": -1}
+
+
+def handle_pearsonr(request, api_version):
+    """
+    Execute pearsonr correlation logic.
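+
+    A minimal sketch of the underlying metric (scipy.stats.pearsonr, imported
+    above); the numbers are illustrative only:
+
+        >>> from scipy.stats import pearsonr
+        >>> corr, p = pearsonr([1, 2, 3, 4], [2, 4, 6, 8])  # perfectly linear inputs
+        >>> (round(float(corr), 1), round(float(p), 1))
+        (1.0, 0.0)
+
+    Non-finite results (e.g. from a constant input column) are mapped to
+    correlation 0.0 and p-value 1.0 below.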
+
+    :param request: Flask request object
+    :param api_version: API version to determine the specific implementation of Pearson correlation
+    :return: dict with correlation result or error information
+    """
+    try:
+        # check for rows limitation to reduce the process time
+        req_json, payload, options, data_index, _ = do_check_before_exec(request, False)
+    except ValueError as e:
+        msg = str(e)
+        if msg == SINGLE_COLUMN_ERROR_MSG:
+            msg = 'a second data column is required for pearsonr'
+        return {"msg": msg, "rows": -1}
+    except Exception as e:
+        return {"msg": str(e), "rows": -1}
+
+    if api_version != 'v1':
+        AppLogger.error('unsupported API version: %s', api_version)
+        return {"msg": f"unsupported API version: {api_version}", "rows": -1}
+
+    try:
+        second_list = get_more_data_list(payload, req_json["schema"])
+        if second_list is None:
+            return {"msg": "a second data column is required for pearsonr", "rows": -1}
+
+        correlation, p_value = pearsonr(payload[data_index], second_list)
+
+        if not np.isfinite(correlation):
+            correlation = 0.0
+            p_value = 1.0
+        else:
+            correlation = float(correlation)
+            p_value = float(p_value)
+
+        AppLogger.debug(f"pearsonr correlation: {correlation}, p-value: {p_value}")
+        res = {"option": options, "rows": 1, "correlation_coefficient": correlation, "p_value": p_value}
+
+        return res
+    except Exception as e:
+        AppLogger.error('pearsonr correlation failed, %s', str(e))
+        return {"msg": str(e), "rows": -1}
+
+
+def do_profile_search(request, api_version):
+    """
+    Execute profile search logic.
+
+    :param request: Flask request object
+    :param api_version: API version to determine the specific implementation of profile search
+    :return: dict with profile search result or error information
+
+    Request body example:
+    Supported normalization values: "min-max", "z-score", "centering", "none".
+    Supported algo types: "dtw", "cosine".
+    For "dtw", algo.params may include:
+    - "radius": search radius for fastdtw
+    - "min_window": minimum ts window size for profile search
+    - "max_window": maximum ts window size for profile search
+    - "window_size_step": step for ts window size between min_window and max_window, only applicable for dtw algo
+    - "window_sliding_step": step for sliding the ts window when searching
+    Result selection notes:
+    - Return the top N most similar profiles with "num".
+    - Or return all profiles with distance below the threshold when using dtw.
+    - Or return all profiles with similarity above the threshold when using cosine similarity.
+    - "num" and "threshold" cannot be set at the same time.
+    - "exclude_contained" is only applicable for dtw and controls whether to exclude the worse matched profile in a strict-containment pair, keeping the better one (the match with the smaller distance). For example, if there are two matched profiles with ts windows [1, 5] and [2, 4], and one strictly contains the other, the worse match will be excluded if "exclude_contained" is set to true.
+    - "exclude_source" is applicable for all algorithms and controls whether to exclude the matched profile that contains the source profile. For example, if the source profile has ts window [2, 4], the matched profile with ts window [2, 4] will be excluded if "exclude_source" is set to true.
+    - Threshold-based results are capped at 500 matches.
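+    - For example, "threshold": 0.5 (set in place of "num") under dtw returns every
+      window whose dtw distance is below 0.5, up to the 500-match cap; under cosine
+      the comparison is reversed (similarity above the threshold).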
+    target_data.ts may be either:
+    - a unix timestamp list, such as [1, 2, 3, 4, 5, 6]
+    - a ts window list, such as [[1, 5], [2, 6]]
+    {
+        "normalization": "z-score",
+        "algo": {
+            "type": "dtw",
+            "params": {
+                "radius": 5,
+                "min_window": 5,
+                "max_window": 20,
+                "window_size_step": 2,
+                "window_sliding_step": 1
+            }
+        },
+        "result": {
+            "num": 3,
+            "exclude_contained": true,
+            "exclude_source": true
+        },
+        "source_data": {
+            "ts": [1000, 2000, 3000, 4000, 5000],
+            "data": [1, 2, 3, 4, 5]
+        },
+
+        "target_data": {
+            "ts": [1, 2, 3, 4, 5, 6],
+            "data": [1, 2, 3, 4, 5, 6]
+        }
+    }
+    Response example:
+    metric_type is either "dtw_distance" or "cosine_similarity".
+    Sort rule:
+    - "dtw_distance": smaller "criteria" means more similar (ascending)
+    - "cosine_similarity": larger "criteria" means more similar (descending)
+    In each match, "num" is the number of data points in the matched window.
+    {
+        "rows": 3,
+        "metric_type": "dtw_distance",
+        "matches": [
+            {
+                "criteria": 0.12,
+                "ts_window": [1, 5],
+                "num": 5
+            },
+            {
+                "criteria": 0.21,
+                "ts_window": [2, 6],
+                "num": 5
+            },
+            {
+                "criteria": 0.35,
+                "ts_window": [3, 7],
+                "num": 4
+            }
+        ]
+    }
+
+    """
+    if api_version != 'v1':
+        AppLogger.error('unsupported API version: %s', api_version)
+        return {"msg": f"unsupported API version: {api_version}", "rows": -1}
+
+    try:
+        req_json = do_initial_check(request)
+    except Exception as e:
+        return {"msg": str(e), "rows": -1}
+
+    try:
+        result = do_profile_search_impl(req_json)
+        AppLogger.debug("profile-search result: %s", result)
+        return result
+
+    except Exception as e:
+        AppLogger.error('profile search failed, %s', str(e))
+        return {"msg": str(e), "rows": -1}
+
diff --git a/tools/tdgpt/taosanalytics/log.py b/tools/tdgpt/taosanalytics/log.py
index 3a7aa7ae6e38..4588c14809d9 100644
--- a/tools/tdgpt/taosanalytics/log.py
+++ b/tools/tdgpt/taosanalytics/log.py
@@ -9,8 +9,9 @@
 class AppLogger():
     """ system log_inst class (singleton) """
     _LOG_STR_FORMAT = '%(asctime)s - %(threadName)s - %(levelname)s - %(message)s'
-    _instance = None
+    _instance: 'AppLogger | None' = None
     _lock = __import__('threading').Lock()
+    log_inst: logging.Logger
 
     def __new__(cls):
         with cls._lock:
@@ -26,6 +27,7 @@ def get_instance(cls) -> logging.Logger:
 
         if cls._instance is None:
             cls()
+        assert cls._instance is not None
         return cls._instance.log_inst
 
     @classmethod
diff --git a/tools/tdgpt/taosanalytics/service_registry.py b/tools/tdgpt/taosanalytics/service_registry.py
index 732c97dcada3..0be9a51fa8f7 100644
--- a/tools/tdgpt/taosanalytics/service_registry.py
+++ b/tools/tdgpt/taosanalytics/service_registry.py
@@ -11,7 +11,7 @@
 
 import pandas as pd
 
-from taosanalytics.algo.tool.forecaster import ArimaModelForecaster
+from taosanalytics.algo.tool.forecaster import ArimaModelForecaster, ProphetModelForecaster
 from taosanalytics.conf import Configure
 from taosanalytics.log import AppLogger
 from taosanalytics.base import (
@@ -83,8 +83,43 @@ def execute(self):
             }
 
         elif algo_name == 'prophet':
-            # load prophet model and execute
-            raise NotImplementedError("Prophet model is not implemented yet")
+            try:
+                datetime_list = pd.to_datetime(self.ts_list, unit=self.precision, utc=True)
+                df = pd.DataFrame({
+                    'ts': datetime_list,
+                    'y': self.list,
+                })
+
+                # Prophet does not handle timezone-aware datetimes well, so we remove the
+                # timezone info if it exists. We keep it as UTC to avoid any unintended
+                # timezone shifts.
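+                # For example (illustrative only): pd.to_datetime([0, 1000], unit='ms', utc=True)
+                # yields tz-aware UTC timestamps; .dt.tz_convert('UTC') is a no-op on an
+                # already-UTC series, and .dt.tz_localize(None) then drops the tzinfo while
+                # keeping the same wall-clock instants, which is the tz-naive form Prophet expects.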
+                df['ts'] = df['ts'].dt.tz_convert('UTC').dt.tz_localize(None)
+
+            except Exception as e:
+                msg = f"failed to prepare input data for Prophet model forecast: {e}"
+                AppLogger.error(msg)
+                raise RuntimeError(msg) from e
+
+            forecaster = ProphetModelForecaster(self.config_file_path, df, self.rows, alpha=1 - self.conf)
+            result = forecaster.forecast()
+
+            if (result is None or
+                    not isinstance(result, pd.DataFrame) or
+                    not {'yhat', 'yhat_lower', 'yhat_upper'}.issubset(result.columns)):
+                raise RuntimeError(
+                    f"failed to execute forecast with Prophet model forecaster built from config file: {self.config_file_path}")
+
+            result = result.tail(self.rows)
+            result_ts = [self.start_ts + i * self.time_step for i in range(self.rows)]
+
+            res = [result_ts, result['yhat'].tolist()]
+            if self.return_conf:
+                res.append(result['yhat_lower'].tolist())
+                res.append(result['yhat_upper'].tolist())
+
+            return {
+                "mse": None,
+                "model_info": forecaster.get_param(),
+                "res": res
+            }
         elif algo_name == 'holtwinters':
             # load holtwinters model and execute
             raise NotImplementedError("HoltWinters model is not implemented yet")
diff --git a/tools/tdgpt/tests/restful_api_test.py b/tools/tdgpt/tests/restful_api_test.py
index 7b3321ce4903..2fc1ffa8d99a 100644
--- a/tools/tdgpt/tests/restful_api_test.py
+++ b/tools/tdgpt/tests/restful_api_test.py
@@ -469,7 +469,7 @@ def test_deploy_model(self, mock_deploy):
             "message": "Model sample_model deployed successfully"
         }, 200)
 
-        response = self.client.post('/deploy', json={
+        response = self.client.post('/api/v1/deploy', json={
             "model_name": "sample_model",
             "config": {
                 "algo": "arima",
                 "params": {
                     "p": 1,
                     "d": 1,
                     "q": 1
                 }
+            }
+        })
+
+        self.assertEqual(response.status_code, 200)
+        self.assertEqual(response.json["status"], "success")
+        self.assertIn("deployed successfully", response.json["message"])
+        mock_deploy.assert_called_once()
+
     # --- /api/v1/analysis/pearsonr tests ---
     def test_pearsonr_happy_path(self):
@@ -559,9 +567,12 @@ def test_profile_search_happy_path_dtw(self):
         })
 
         self.assertEqual(response.status_code, 200)
-        self.assertEqual(response.json["status"], "success")
-        self.assertIn("deployed successfully", response.json["message"])
-        mock_deploy.assert_called_once()
+
+        res = response.json
+        self.assertEqual(res["rows"], 2)
+        self.assertEqual(res["metric_type"], "dtw_distance")
+        self.assertAlmostEqual(res["matches"][0]["criteria"], 0.0)
+        self.assertEqual(res["matches"][0]["ts_window"], [1, 5])
 
     @patch("taosanalytics.app.do_handle_undeploy_model")
     def test_undeploy_model(self, mock_undeploy):
             "status": "success",
             "message": "Model sample_model undeployed successfully"
         }, 200)
 
-        response = self.client.post('/undeploy', json={
+        response = self.client.post('/api/v1/undeploy', json={
             "model_name": "sample_model"
         })
 
@@ -578,11 +589,7 @@
         self.assertEqual(response.json["status"], "success")
         self.assertIn("undeployed successfully", response.json["message"])
         mock_undeploy.assert_called_once()
-        res = response.json
-        self.assertEqual(res["rows"], 2)
-        self.assertEqual(res["metric_type"], "dtw_distance")
-        self.assertAlmostEqual(res["matches"][0]["criteria"], 0.0)
-        self.assertEqual(res["matches"][0]["ts_window"], [1, 5])
+
 
     def test_profile_search_happy_path_cosine(self):
         """happy path: cosine similarity profile search"""
diff --git a/tools/tdgpt/tests/unit_test.py b/tools/tdgpt/tests/unit_test.py
index c75b8dc6444a..233b83263a92 100644
--- a/tools/tdgpt/tests/unit_test.py
+++ b/tools/tdgpt/tests/unit_test.py
@@ -370,23 +370,6 @@ def _register_dynamic_service_for_algo(self, algo_name):
         loader.register_service_from_file(config_path)
         return service_name, config_path
 
-    def test_dynamic_execute_prophet_not_implemented(self):
-        import os
-
-        service_name = None
-        config_path = None
-        try:
-            service_name, config_path = self._register_dynamic_service_for_algo("prophet")
-            service = loader.get_service(service_name)
-            self.assertIsNotNone(service)
-            with self.assertRaisesRegex(NotImplementedError, "Prophet model is not implemented yet"):
-                service.execute()
-        finally:
-            if service_name and service_name in loader.services:
-                del loader.services[service_name]
-            if config_path and os.path.exists(config_path):
-                os.remove(config_path)
-
     def test_dynamic_execute_holtwinters_not_implemented(self):
         import os