Commit f4bc651

first draft of splitting signals

1 parent 089bd24

5 files changed: +203 -67 lines changed

nwss_wastewater/delphi_nwss/constants.py

Lines changed: 11 additions & 1 deletion

@@ -12,6 +12,16 @@

 SIGNALS = ["pcr_conc_smoothed"]
 METRIC_SIGNALS = ["detect_prop_15d", "percentile", "ptc_15d"]
+PROVIDER_NORMS = {
+    "provider": ["CDC_VERILY", "CDC_VERILY", "NWSS", "NWSS", "WWS"],
+    "normalization": [
+        "flow-population",
+        "microbial",
+        "flow-population",
+        "microbial",
+        "microbial",
+    ],
+}
 METRIC_DATES = ["date_start", "date_end"]
 SAMPLE_SITE_NAMES = {
     "wwtp_jurisdiction": "category",
@@ -24,6 +34,6 @@
     "sampling_prior": bool,
     "sample_location_specify": float,
 }
-SIG_DIGITS = 7
+SIG_DIGITS = 4

 NEWLINE = "\n"
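The new PROVIDER_NORMS constant drives the signal split: each provider/normalization pair becomes its own family of exported signals. A minimal sketch (illustrative, not part of the commit) of the names these pairs imply, mirroring the full_sensor_name scheme built in run.py below:

# Illustrative only: enumerate the signal names implied by PROVIDER_NORMS,
# using the sensor_provider_normalization naming from run.py.
PROVIDER_NORMS = {
    "provider": ["CDC_VERILY", "CDC_VERILY", "NWSS", "NWSS", "WWS"],
    "normalization": [
        "flow-population",
        "microbial",
        "flow-population",
        "microbial",
        "microbial",
    ],
}
SIGNALS = ["pcr_conc_smoothed"]
METRIC_SIGNALS = ["detect_prop_15d", "percentile", "ptc_15d"]

for provider, norm in zip(PROVIDER_NORMS["provider"], PROVIDER_NORMS["normalization"]):
    for sensor in [*SIGNALS, *METRIC_SIGNALS]:
        print(f"{sensor}_{provider}_{norm}")
# 5 provider/normalization pairs x 4 sensors = 20 exported signal names.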

nwss_wastewater/delphi_nwss/pull.py

Lines changed: 78 additions & 20 deletions

@@ -7,6 +7,7 @@

 from .constants import (
     SIGNALS,
+    PROVIDER_NORMS,
     METRIC_SIGNALS,
     METRIC_DATES,
     SAMPLE_SITE_NAMES,
@@ -28,7 +29,7 @@ def sig_digit_round(value, n_digits):
     sign_mask = value < 0
     value[sign_mask] *= -1
     exponent = np.ceil(np.log10(value))
-    result = 10**exponent * np.round(value * 10 ** (-exponent), n_digits)
+    result = 10 ** exponent * np.round(value * 10 ** (-exponent), n_digits)
     result[sign_mask] *= -1
     result[zero_mask] = in_value[zero_mask]
     return result
@@ -60,21 +61,66 @@ def warn_string(df, type_dict):
     """


-def add_population(df, df_metric):
-    """Add the population column from df_metric to df, and rename some columns."""
+def reformat(df, df_metric):
+    """Add columns from df_metric to df, and rename some columns.
+
+    Specifically the population and METRIC_SIGNALS columns, and renames date_start to timestamp.
+    """
     # drop unused columns from df_metric
-    df_population = df_metric.loc[:, ["key_plot_id", "date_start", "population_served"]]
+    df_metric_core = df_metric.loc[
+        :, ["key_plot_id", "date_start", "population_served", *METRIC_SIGNALS]
+    ]
     # get matching keys
-    df_population = df_population.rename(columns={"date_start": "timestamp"})
-    df_population = df_population.set_index(["key_plot_id", "timestamp"])
+    df_metric_core = df_metric_core.rename(columns={"date_start": "timestamp"})
+    df_metric_core = df_metric_core.set_index(["key_plot_id", "timestamp"])
     df = df.set_index(["key_plot_id", "timestamp"])

-    df = df.join(df_population)
+    df = df.join(df_metric_core)
     df = df.reset_index()
     return df


-def pull_nwss_data(socrata_token: str):
+def drop_unnormalized(df):
+    """Drop unnormalized rows.
+
+    Return `df` without rows where the normalization scheme isn't actually identified,
+    as we can't classify the kind of signal.
+    """
+    return df[~df["normalization"].isna()]
+
+
+def add_identifier_columns(df):
+    """Add identifier columns.
+
+    Add columns to get more detail than key_plot_id gives; specifically
+    `state`, `provider`, and `signal_name` (provider plus normalization),
+    which gives the signal identifier.
+    """
+    df["state"] = df.key_plot_id.str.extract(
+        r"_(\w\w)_"
+    )  # a pair of alphanumerics surrounded by _
+    df["provider"] = df.key_plot_id.str.extract(
+        r"(.*)_[a-z]{2}_"
+    )  # anything followed by the state
+    df["signal_name"] = df.provider + "_" + df.normalization
+
+
+def check_endpoints(df):
+    """Make sure that there aren't any new signals that we need to add."""
+    # TODO: compare with the existing column name checker,
+    # and add a note about handling errors
+    unique_provider_norms = (
+        df[["provider", "normalization"]]
+        .drop_duplicates()
+        .sort_values(["provider", "normalization"])
+        .reset_index(drop=True)
+    )
+    if not unique_provider_norms.equals(pd.DataFrame(PROVIDER_NORMS)):
+        raise ValueError(
+            f"There are new providers and/or norms. They are\n{unique_provider_norms}"
+        )
+
+
+def pull_nwss_data(token: str):
     """Pull the latest NWSS Wastewater data, and conform it into a dataset.

     The output dataset has:
@@ -95,13 +141,15 @@ def pull_nwss_data(socrata_token: str):
     pd.DataFrame
         Dataframe as described above.
     """
+    # Constants
+    keep_columns = [*SIGNALS, *METRIC_SIGNALS]
     # concentration key types
     type_dict, type_dict_metric = construct_typedicts()

     # Pull data from Socrata API
-    client = Socrata("data.cdc.gov", socrata_token)
-    results_concentration = client.get("g653-rqe2", limit=10**10)
-    results_metric = client.get("2ew6-ywp6", limit=10**10)
+    client = Socrata("data.cdc.gov", token)
+    results_concentration = client.get("g653-rqe2", limit=10 ** 10)
+    results_metric = client.get("2ew6-ywp6", limit=10 ** 10)
     df_metric = pd.DataFrame.from_records(results_metric)
     df_concentration = pd.DataFrame.from_records(results_concentration)
     df_concentration = df_concentration.rename(columns={"date": "timestamp"})
@@ -116,19 +164,29 @@ def pull_nwss_data(socrata_token: str):
     except KeyError as exc:
         raise ValueError(warn_string(df_metric, type_dict_metric)) from exc

+    # if the normalization scheme isn't recorded, why is it even included as a sample site?
+    df = drop_unnormalized(df_concentration)
     # pull 2 letter state labels out of the key_plot_id labels
-    df_concentration["state"] = df_concentration.key_plot_id.str.extract(r"_(\w\w)_")
+    add_identifier_columns(df)

+    # move population and metric signals over to df
+    df = reformat(df, df_metric)
     # round out some of the numeric noise that comes from smoothing
-    df_concentration[SIGNALS[0]] = sig_digit_round(
-        df_concentration[SIGNALS[0]], SIG_DIGITS
-    )
+    for signal in [*SIGNALS, *METRIC_SIGNALS]:
+        df[signal] = sig_digit_round(df[signal], SIG_DIGITS)

-    df_concentration = add_population(df_concentration, df_metric)
     # if there are population NA's, assume the previous value is accurate (most
     # likely introduced by dates only present in one and not the other; even
     # otherwise, best to assume some value rather than break the data)
-    df_concentration.population_served = df_concentration.population_served.ffill()
-
-    keep_columns = ["timestamp", "state", "population_served"]
-    return df_concentration[SIGNALS + keep_columns]
+    df.population_served = df.population_served.ffill()
+    check_endpoints(df)
+    keep_columns.extend(
+        [
+            "timestamp",
+            "state",
+            "population_served",
+            "normalization",
+            "provider",
+        ]
+    )
+    return df[keep_columns]
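With SIG_DIGITS dropped from 7 to 4, sig_digit_round now keeps four significant figures on every signal column rather than just pcr_conc_smoothed. A self-contained sketch of the helper for reference; the hunk above only shows its tail, so the zero handling at the top is reconstructed from context and should be treated as an assumption:

import numpy as np

def sig_digit_round(value, n_digits):
    """Round an array to n_digits significant figures."""
    in_value = value
    value = np.copy(value)  # the sketch copies; the original appears to work in place
    zero_mask = value == 0  # assumed zero handling, inferred from the visible tail
    value[zero_mask] = 1.0  # placeholder so log10 is defined; restored below
    sign_mask = value < 0
    value[sign_mask] *= -1
    # scale each value into (0.1, 1], round to n_digits decimals there, scale back
    exponent = np.ceil(np.log10(value))
    result = 10 ** exponent * np.round(value * 10 ** (-exponent), n_digits)
    result[sign_mask] *= -1
    result[zero_mask] = in_value[zero_mask]
    return result

print(sig_digit_round(np.array([1234567.0, 0.000123456, -9.87654321]), 4))
# -> [1235000.0, 0.0001235, -9.877]: four significant figures each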

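The two regular expressions in add_identifier_columns carry the actual split: state and provider are parsed out of key_plot_id, and signal_name joins provider with the normalization column. A hypothetical example, with key_plot_id values modeled on the test data rather than taken from the live API:

import pandas as pd

# Hypothetical key_plot_id values in the provider_state_siteid shape the
# regexes expect; the extraction lines are the ones added in this commit.
df = pd.DataFrame(
    {
        "key_plot_id": ["CDC_BIOBOT_ak_124", "WWS_tn_2158"],
        "normalization": ["flow-population", "microbial"],
    }
)
df["state"] = df.key_plot_id.str.extract(r"_(\w\w)_")  # two word chars between underscores
df["provider"] = df.key_plot_id.str.extract(r"(.*)_[a-z]{2}_")  # everything before the state
df["signal_name"] = df.provider + "_" + df.normalization
print(df[["state", "provider", "signal_name"]])
#   state    provider                 signal_name
# 0    ak  CDC_BIOBOT  CDC_BIOBOT_flow-population
# 1    tn         WWS               WWS_microbial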
nwss_wastewater/delphi_nwss/run.py

Lines changed: 46 additions & 24 deletions

@@ -29,7 +29,7 @@
 from delphi_utils import S3ArchiveDiffer, get_structured_logger, create_export_csv
 from delphi_utils.nancodes import add_default_nancodes

-from .constants import GEOS, SIGNALS
+from .constants import GEOS, METRIC_SIGNALS, PROVIDER_NORMS, SIGNALS
 from .pull import pull_nwss_data


@@ -50,7 +50,7 @@ def generate_weights(df, column_aggregating="pcr_conc_smoothed"):
     """
     # set the weight of places with na's to zero
     df[f"relevant_pop_{column_aggregating}"] = (
-        df["population_served"] * df[column_aggregating].notna()
+        df["population_served"] * np.abs(df[column_aggregating]).notna()
     )
     # generate the weighted version
     df[f"weighted_{column_aggregating}"] = (
@@ -126,38 +126,60 @@ def run_module(params):
     export_dir = params["common"]["export_dir"]
     socrata_token = params["indicator"]["socrata_token"]
     if "archive" in params:
-        daily_arch_diff = S3ArchiveDiffer(
+        arch_diff = S3ArchiveDiffer(
             params["archive"]["cache_dir"],
             export_dir,
             params["archive"]["bucket_name"],
-            "nchs_mortality",
+            "nwss_wastewater",
             params["archive"]["aws_credentials"],
         )
-        daily_arch_diff.update_cache()
+        arch_diff.update_cache()

     run_stats = []
     ## build the base version of the signal at the most detailed geo level you can get.
     ## compute stuff here or farm out to another function or file
     df_pull = pull_nwss_data(socrata_token)
     ## aggregate
-    for sensor in SIGNALS:
-        df = df_pull.copy()
-        # add weighed column
-        df = generate_weights(df, sensor)
-
-        for geo in GEOS:
-            logger.info("Generating signal and exporting to CSV", metric=sensor)
-            if geo == "nation":
-                agg_df = weighted_nation_sum(df, sensor)
-            else:
-                agg_df = weighted_state_sum(df, geo, sensor)
-            # add se, sample_size, and na codes
-            agg_df = add_needed_columns(agg_df)
-            # actual export
-            dates = create_export_csv(
-                agg_df, geo_res=geo, export_dir=export_dir, sensor=sensor
-            )
-            if len(dates) > 0:
-                run_stats.append((max(dates), len(dates)))
+    # iterate over the providers and the normalizations that they specifically provide
+    for (provider, normalization) in zip(
+        PROVIDER_NORMS["provider"], PROVIDER_NORMS["normalization"]
+    ):
+        # copy by only taking the relevant subsection
+        df_prov_norm = df_pull[
+            (df_pull.provider == provider) & (df_pull.normalization == normalization)
+        ]
+        df_prov_norm = df_prov_norm.drop(["provider", "normalization"], axis=1)
+        for sensor in [*SIGNALS, *METRIC_SIGNALS]:
+            full_sensor_name = sensor + "_" + provider + "_" + normalization
+            df_prov_norm = df_prov_norm.rename(columns={sensor: full_sensor_name})
+            # add weighted column
+            df = generate_weights(df_prov_norm, full_sensor_name)
+            for geo in GEOS:
+                logger.info(
+                    "Generating signal and exporting to CSV", metric=full_sensor_name
+                )
+                if geo == "nation":
+                    agg_df = weighted_nation_sum(df, full_sensor_name)
+                else:
+                    agg_df = weighted_state_sum(df, geo, full_sensor_name)
+                # add se, sample_size, and na codes
+                agg_df = add_needed_columns(agg_df)
+                # actual export
+                dates = create_export_csv(
+                    agg_df, geo_res=geo, export_dir=export_dir, sensor=full_sensor_name
+                )
+                if "archive" in params:
+                    _, common_diffs, new_files = arch_diff.diff_exports()
+                    to_archive = [
+                        f for f, diff in common_diffs.items() if diff is not None
+                    ]
+                    to_archive += new_files
+                    _, fails = arch_diff.archive_exports(to_archive)
+                    succ_common_diffs = {
+                        f: diff for f, diff in common_diffs.items() if f not in fails
+                    }
+                    arch_diff.filter_exports(succ_common_diffs)
+                if len(dates) > 0:
+                    run_stats.append((max(dates), len(dates)))
     ## log this indicator run
     logging(start_time, run_stats, logger)
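generate_weights zeroes out a site's population weight wherever its measurement is NA, so the downstream sums compute a population-weighted mean over reporting sites only. A simplified sketch of that aggregation; the weighted_state_sum body isn't shown in this diff, so the groupby step here is an assumption about its behavior:

import numpy as np
import pandas as pd

# Made-up sites: two in ak (one not reporting), one in tn.
df = pd.DataFrame(
    {
        "state": ["ak", "ak", "tn"],
        "population_served": [10_000, 40_000, 25_000],
        "val": [2.0, np.nan, 3.0],
    }
)
# set the weight of places with NA's to zero, as generate_weights does
df["relevant_pop"] = df["population_served"] * df["val"].notna()
df["weighted_val"] = df["val"].fillna(0) * df["relevant_pop"]
# assumed aggregation: sum of weighted values over sum of relevant population
agg = df.groupby("state").agg(
    pop=("relevant_pop", "sum"), weighted=("weighted_val", "sum")
)
agg["val"] = agg["weighted"] / agg["pop"]
print(agg["val"])  # ak: 2.0 (only the reporting site counts), tn: 3.0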

nwss_wastewater/tests/test_pull.py

Lines changed: 34 additions & 1 deletion

@@ -10,9 +10,11 @@
 import pandas.api.types as ptypes

 from delphi_nwss.pull import (
+    add_identifier_columns,
+    check_endpoints,
     construct_typedicts,
     sig_digit_round,
-    add_population,
+    reformat,
     warn_string,
 )
 import numpy as np
@@ -111,6 +113,15 @@ def test_column_conversions_metric():
     assert all(ptypes.is_numeric_dtype(converted[flo].dtype) for flo in float_typed)


+def test_warn_string():
+    type_dict, type_dict_metric = construct_typedicts()
+    df_conc = pd.read_csv("test_data/conc_data.csv")
+    assert (
+        warn_string(df_conc, type_dict)
+        == "\nExpected column(s) missed, The dataset schema may\nhave changed. Please investigate and amend the code.\n\nColumns needed:\npcr_conc_smoothed\ntimestamp\n\nColumns available:\nUnnamed: 0\nkey_plot_id\ndate\npcr_conc_smoothed\nnormalization\n"
+    )
+
+
 def test_formatting():
     type_dict, type_dict_metric = construct_typedicts()
     df_metric = pd.read_csv("test_data/metric_data.csv", index_col=0)
@@ -132,6 +143,28 @@ def test_formatting():
                 "pcr_conc_smoothed",
                 "normalization",
                 "population_served",
+                "detect_prop_15d",
+                "percentile",
+                "ptc_15d",
             ]
         )
     )
+
+
+def test_identifier_colnames():
+    test_df = pd.read_csv("test_data/conc_data.csv", index_col=0)
+    add_identifier_columns(test_df)
+    assert all(test_df.state.unique() == ["ak", "tn"])
+    assert all(test_df.provider.unique() == ["CDC_BIOBOT", "WWS"])
+    # the only cases where the signal name is wrong are when normalization isn't defined
+    assert all(
+        (test_df.signal_name == test_df.provider + "_" + test_df.normalization)
+        | (test_df.normalization.isna())
+    )
+    assert all(
+        (
+            test_df.signal_name.unique()
+            == ["CDC_BIOBOT_flow-population", np.nan, "WWS_microbial"]
+        )
+        | (pd.isna(test_df.signal_name.unique()))
+    )
