diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index fd2d5fb4b..2bdb932b4 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -65,7 +65,7 @@ jobs: docker network create --driver bridge delphi-net docker run --rm -d -p 13306:3306 --network delphi-net --name delphi_database_epidata --cap-add=sys_nice delphi_database_epidata docker run --rm -d -p 6379:6379 --network delphi-net --env "REDIS_PASSWORD=1234" --name delphi_redis delphi_redis - + - run: | wget https://raw.githubusercontent.com/eficode/wait-for/master/wait-for @@ -103,7 +103,7 @@ jobs: with: node-version: '16.x' - name: Cache Node.js modules - uses: actions/cache@v2 + uses: actions/cache@v4 with: path: ~/.npm # npm cache files are stored in `~/.npm` on Linux/macOS key: ${{ runner.OS }}-node2-${{ hashFiles('**/package-lock.json') }} diff --git a/src/client/delphi_epidata.py b/src/client/delphi_epidata.py index 998c85281..b64ee2e55 100644 --- a/src/client/delphi_epidata.py +++ b/src/client/delphi_epidata.py @@ -675,6 +675,43 @@ def covid_hosp_facility_lookup(state=None, ccn=None, city=None, zip=None, fips_c # Make the API call return Epidata._request("covid_hosp_facility_lookup", params) + # Fetch Canadian respiratory RVDSS data + @staticmethod + def rvdss( + geo_type, + time_values, + geo_value, + as_of=None, + issues=None, + ): + """Fetch RVDSS data.""" + # Check parameters + if None in (geo_type, time_values, geo_value): + raise EpidataBadRequestException( + "`geo_type`, `geo_value`, and `time_values` are all required" + ) + + # Set up request + params = { + # Fake a time type param so that we can use some helper functions later. 
+ "time_type": "week", + "geo_type": geo_type, + "time_values": Epidata._list(time_values), + } + + if isinstance(geo_value, (list, tuple)): + params["geo_values"] = ",".join(geo_value) + else: + params["geo_values"] = geo_value + if as_of is not None: + params["as_of"] = as_of + if issues is not None: + params["issues"] = Epidata._list(issues) + + # Make the API call + return Epidata._request("rvdss", params) + + @staticmethod def async_epidata(param_list, batch_size=50): """[DEPRECATED] Make asynchronous Epidata calls for a list of parameters.""" diff --git a/src/server/_query.py b/src/server/_query.py index 267a78eb1..58be121b2 100644 --- a/src/server/_query.py +++ b/src/server/_query.py @@ -483,15 +483,29 @@ def apply_issues_filter(self, history_table: str, issues: Optional[TimeValues]) self.where_integers("issue", issues) return self - def apply_as_of_filter(self, history_table: str, as_of: Optional[int]) -> "QueryBuilder": + # Note: This function was originally only used by covidcast, so it assumed + # that `source` and `signal` columns are always available. To make this usable + # for rvdss, too, which doesn't have the `source` or `signal` columns, add the + # `use_source_signal` flag to toggle inclusion of `source` and `signal` columns. + # + # `use_source_signal` defaults to True so if not passed, the function + # retains the historical behavior. 
+ def apply_as_of_filter(self, history_table: str, as_of: Optional[int], use_source_signal: Optional[bool] = True) -> "QueryBuilder": if as_of is not None: self.retable(history_table) sub_condition_asof = "(issue <= :as_of)" self.params["as_of"] = as_of - sub_fields = "max(issue) max_issue, time_type, time_value, `source`, `signal`, geo_type, geo_value" - sub_group = "time_type, time_value, `source`, `signal`, geo_type, geo_value" + alias = self.alias - sub_condition = f"x.max_issue = {alias}.issue AND x.time_type = {alias}.time_type AND x.time_value = {alias}.time_value AND x.source = {alias}.source AND x.signal = {alias}.signal AND x.geo_type = {alias}.geo_type AND x.geo_value = {alias}.geo_value" + source_signal_plain = source_signal_alias = "" + # If `use_source_signal` toggled on, append `source` and `signal` columns to group-by list and join list. + if use_source_signal: + source_signal_plain = ", `source`, `signal`" + source_signal_alias = f" AND x.source = {alias}.source AND x.signal = {alias}.signal" + + sub_fields = f"max(issue) max_issue, time_type, time_value{source_signal_plain}, geo_type, geo_value" + sub_group = f"time_type, time_value{source_signal_plain}, geo_type, geo_value" + sub_condition = f"x.max_issue = {alias}.issue AND x.time_type = {alias}.time_type AND x.time_value = {alias}.time_value{source_signal_alias} AND x.geo_type = {alias}.geo_type AND x.geo_value = {alias}.geo_value" self.subquery = f"JOIN (SELECT {sub_fields} FROM {self.table} WHERE {self.conditions_clause} AND {sub_condition_asof} GROUP BY {sub_group}) x ON {sub_condition}" return self diff --git a/src/server/endpoints/__init__.py b/src/server/endpoints/__init__.py index 6dad05bf3..c0a095475 100644 --- a/src/server/endpoints/__init__.py +++ b/src/server/endpoints/__init__.py @@ -25,6 +25,7 @@ nowcast, paho_dengue, quidel, + rvdss, sensors, twitter, wiki, @@ -59,6 +60,7 @@ nowcast, paho_dengue, quidel, + rvdss, sensors, twitter, wiki, diff --git a/src/server/endpoints/rvdss.py 
b/src/server/endpoints/rvdss.py new file mode 100644 index 000000000..2e9198f01 --- /dev/null +++ b/src/server/endpoints/rvdss.py @@ -0,0 +1,97 @@ +from flask import Blueprint, request + +from .._params import ( + extract_date, + extract_dates, + extract_strings, + parse_time_set, +) +from .._query import execute_query, QueryBuilder +from .._validate import require_all + +bp = Blueprint("rvdss", __name__) + +db_table_name = "rvdss" + +@bp.route("/", methods=("GET", "POST")) +def handle(): + require_all(request, "time_type", "time_values", "geo_type", "geo_values") + + time_set = parse_time_set() + geo_type = extract_strings("geo_type") + geo_values = extract_strings("geo_values") + issues = extract_dates("issues") + as_of = extract_date("as_of") + + # basic query info + q = QueryBuilder(db_table_name, "rv") + + fields_string = [ + "geo_type", + "geo_value", + "region", + "time_type", + ] + fields_int = [ + "epiweek", + "time_value", + "issue", + "week", + "weekorder", + "year", + ] + fields_float = [ + "adv_pct_positive", + "adv_positive_tests", + "adv_tests", + "evrv_pct_positive", + "evrv_positive_tests", + "evrv_tests", + "flu_pct_positive", + "flu_positive_tests", + "flu_tests", + "flua_pct_positive", + "flua_positive_tests", + "flua_tests", + "fluah1n1pdm09_positive_tests", + "fluah3_positive_tests", + "fluauns_positive_tests", + "flub_pct_positive", + "flub_positive_tests", + "flub_tests", + "hcov_pct_positive", + "hcov_positive_tests", + "hcov_tests", + "hmpv_pct_positive", + "hmpv_positive_tests", + "hmpv_tests", + "hpiv1_positive_tests", + "hpiv2_positive_tests", + "hpiv3_positive_tests", + "hpiv4_positive_tests", + "hpiv_pct_positive", + "hpiv_positive_tests", + "hpiv_tests", + "hpivother_positive_tests", + "rsv_pct_positive", + "rsv_positive_tests", + "rsv_tests", + "sarscov2_pct_positive", + "sarscov2_positive_tests", + "sarscov2_tests", + ] + + q.set_sort_order("epiweek", "time_value", "geo_type", "geo_value", "issue") + q.set_fields(fields_string, 
fields_int, fields_float) + + q.where_strings("geo_type", geo_type) + # Only apply geo_values filter if wildcard "*" was NOT used. + if not (len(geo_values) == 1 and geo_values[0] == "*"): + q.where_strings("geo_value", geo_values) + + q.apply_time_filter("time_type", "time_value", time_set) + q.apply_issues_filter(db_table_name, issues) + q.apply_as_of_filter(db_table_name, as_of, use_source_signal=False) + + # send query + return execute_query(str(q), q.params, fields_string, fields_int, fields_float)