Skip to content
159 changes: 5 additions & 154 deletions tools/tdgpt/taosanalytics/app.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
# encoding:utf-8
# pylint: disable=c0103
"""the main route definition for restful service"""
import os
import os.path
import sys

Expand All @@ -13,7 +12,7 @@
from taosanalytics.handlers.anomaly import handle_anomaly
from taosanalytics.handlers.forecast import handle_forecast
from taosanalytics.handlers.correlation import handle_correlation
from taosanalytics.handlers.misc import handle_batch
from taosanalytics.handlers.misc import do_profile_search, handle_batch, handle_pearsonr

from taosanalytics.conf import Configure
from taosanalytics.log import AppLogger
Expand Down Expand Up @@ -45,7 +44,6 @@ def _init_app():
app = Flask(__name__)
app.config["PROPAGATE_EXCEPTIONS"] = True


@app.route("/")
def index():
""" default rsp """
Expand Down Expand Up @@ -111,175 +109,28 @@ def handle_batch_req():
return handle_batch(request)


@app.route('/api/v1/deploy', methods=['POST'])
def deploy_model():
    """Deploy a model to the production environment (e.g. load the model into memory).

    :return: response produced by the dynamic-model handler
    """
    return do_handle_dynamic_model(request)


@app.route('/api/v1/undeploy', methods=['POST'])
def undeploy_model():
    """Undeploy a previously deployed model (release the resources it holds).

    :return: response produced by the undeploy handler
    """
    return do_handle_undeploy_model(request)
Comment thread
hjxilinx marked this conversation as resolved.
Comment thread
hjxilinx marked this conversation as resolved.


def handle_pearsonr(request, api_version):
    """
    Execute pearsonr correlation logic.

    :param request: Flask request object
    :param api_version: API version to determine the specific implementation of Pearson correlation
    :return: dict with correlation result or error information; on failure the dict
             carries "msg" and "rows": -1
    """
    # Fail fast on an unsupported version BEFORE parsing the request body,
    # consistent with do_profile_search which validates the version first.
    if api_version != 'v1':
        app_logger.log_inst.error('unsupported API version: %s', api_version)
        return {"msg": f"unsupported API version: {api_version}", "rows": -1}

    try:
        # check for rows limitation to reduce the process time
        req_json, payload, options, data_index, _ = do_check_before_exec(request, False)
    except ValueError as e:
        msg = str(e)
        if msg == SINGLE_COLUMN_ERROR_MSG:
            msg = 'a second data column is required for pearsonr'
        return {"msg": msg, "rows": -1}
    except Exception as e:  # service boundary: report any parse failure to the caller
        return {"msg": str(e), "rows": -1}

    try:
        second_list = get_more_data_list(payload, req_json["schema"])
        if second_list is None:
            return {"msg": "a second data column is required for pearsonr", "rows": -1}

        correlation, p_value = pearsonr(payload[data_index], second_list)

        # pearsonr can yield nan for degenerate inputs (e.g. a constant series);
        # report "no correlation" instead of propagating a non-finite value.
        if not np.isfinite(correlation):
            correlation = 0.0
            p_value = 1.0

        # cast numpy scalars to plain floats so the response is JSON-serializable
        correlation = float(correlation)
        p_value = float(p_value)

        # lazy %-args so the message is only built when debug logging is enabled
        app_logger.log_inst.debug('pearsonr correlation: %s, p value: %s', correlation, p_value)

        return {"option": options, "rows": 1,
                "correlation_coefficient": correlation, "p_value": p_value}
    except Exception as e:
        app_logger.log_inst.error('pearsonr correlation failed, %s', str(e))
        return {"msg": str(e), "rows": -1}

def do_profile_search(request, api_version):
    """
    Execute profile search logic.

    :param request: Flask request object
    :param api_version: API version to determine the specific implementation of profile search
    :return: dict with profile search result or error information

    Request body notes:
    Supported normalization values: "min-max", "z-score", "centering", "none".
    Supported algo types: "dtw", "cosine".
    For "dtw", algo.params may include:
        - "radius": search radius for fastdtw
        - "min_window": minimum ts window size for profile search
        - "max_window": maximum ts window size for profile search
        - "window_size_step": step for ts window size between min_window and max_window (dtw only)
        - "window_sliding_step": step for sliding the ts window when searching
    Result selection:
        - "num" returns the top N similar profiles; alternatively "threshold" returns
          all profiles below the dtw distance threshold, or above the cosine similarity
          threshold. "num" and "threshold" cannot be set at the same time.
        - "exclude_contained" (dtw only): in a strict-containment pair of matches
          (e.g. ts windows [1, 5] and [2, 4]) drop the worse match (the larger distance),
          keeping the better one.
        - "exclude_source" (all algorithms): drop a matched profile whose ts window
          contains the source profile's window (e.g. source window [2, 4] excludes a
          match with window [2, 4]).
        - Threshold-based results are capped at 500 matches.
    target_data.ts may be either:
        - a unix timestamp list, such as [1, 2, 3, 4, 5, 6]
        - a ts window list, such as [[1, 5], [2, 6]]

    Request body example:
    {
        "normalization": "z-score",
        "algo": {
            "type": "dtw",
            "params": {
                "radius": 5,
                "min_window": 5,
                "max_window": 20,
                "window_size_step": 2,
                "window_sliding_step": 1
            }
        },
        "result": {
            "num": 3,
            "exclude_contained": true,
            "exclude_source": true
        },
        "source_data": {
            "ts": [1000, 2000, 3000, 4000, 5000],
            "data": [1, 2, 3, 4, 5]
        },
        "target_data": {
            "ts": [1, 2, 3, 4, 5, 6],
            "data": [1, 2, 3, 4, 5, 6]
        }
    }

    Response notes:
    metric_type is either "dtw_distance" or "cosine_similarity".
    Sort rule:
        - "dtw_distance": smaller "criteria" means more similar (ascending)
        - "cosine_similarity": larger "criteria" means more similar (descending)
    In each match, "num" is the number of data points in the matched window.

    Response example:
    {
        "rows": 3,
        "metric_type": "dtw_distance",
        "matches": [
            {"criteria": 0.12, "ts_window": [1, 5], "num": 7},
            {"criteria": 0.21, "ts_window": [2, 6], "num": 5},
            {"criteria": 0.35, "ts_window": [3, 7], "num": 4}
        ]
    }
    """
    # guard clause: only v1 is implemented
    if api_version != 'v1':
        app_logger.log_inst.error('unsupported API version: %s', api_version)
        return {"msg": f"unsupported API version: {api_version}", "rows": -1}

    # validate / parse the incoming request; parse failures are reported without logging
    try:
        parsed_req = do_initial_check(request)
    except Exception as err:
        return {"msg": str(err), "rows": -1}

    # run the actual search; execution failures are logged before being reported
    try:
        search_result = do_profile_search_impl(parsed_req)
    except Exception as err:
        app_logger.log_inst.error('profile search failed, %s', str(err))
        return {"msg": str(err), "rows": -1}

    app_logger.log_inst.debug("profile-search result: %s", search_result)
    return search_result


@app.route('/api/v1/analysis/pearsonr', methods=['POST'])
def handle_pearsonr_req():
    """handle the pearsonr correlation request """
    # log via the instance logger, consistent with the other route handlers;
    # AppLogger.info(...) on the class would bind the format string as self
    app_logger.log_inst.info('recv pearsonr correlation request from %s', request.remote_addr)
    return handle_pearsonr(request, api_version='v1')


@app.route('/api/v1/analysis/profile-search', methods=['POST'])
def handle_profile_search_req():
    """handle the profile search request """
    # log via the instance logger, consistent with the other route handlers;
    # AppLogger.info(...) on the class would bind the format string as self
    app_logger.log_inst.info('recv profile search request from %s', request.remote_addr)
    return do_profile_search(request, api_version='v1')


Expand Down
15 changes: 11 additions & 4 deletions tools/tdgpt/taosanalytics/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@
except Exception: # pragma: no cover
keras = None # noqa: F841

_ANODE_SECTION_NAME = "taosanode"


class Configure:
""" configuration class (singleton) """

_instance = None
_lock = __import__('threading').Lock()
path: Optional[str]
_conf: dict

def __new__(cls, conf_path: Optional[str] = None):
with cls._lock:
Expand All @@ -42,7 +42,7 @@ def __new__(cls, conf_path: Optional[str] = None):
if conf_path is not None:
print(f"Input configuration file not available. Use default config file: {instance.path}")

if os.path.exists(instance.path):
if instance.path is not None and os.path.exists(instance.path):
instance.reload()
else:
print(
Expand All @@ -63,6 +63,7 @@ def get_instance(cls) -> 'Configure':
"""Return the singleton instance, creating it with defaults if not yet initialized."""
if cls._instance is None:
cls()
assert cls._instance is not None
return cls._instance

# def _get_default_conf_windows(self):
Expand Down Expand Up @@ -156,8 +157,14 @@ def get_img_dir(self) -> str:
def reload(self):
""" load the info from config file """
spec = importlib.util.spec_from_file_location("gunicorn_config", self.path)
if spec is None:
raise RuntimeError(f"Cannot load configuration spec from {self.path}")

config_module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(config_module)
if spec.loader is None:
raise RuntimeError(f"Cannot load configuration loader from {self.path}")

spec.loader.exec_module(config_module) # type: ignore[union-attr]

conf_vars = {}
for key in dir(config_module):
Expand Down
Loading
Loading