moving over to geomapper's generic function
dsweber2 authored and dshemetov committed Jun 5, 2024
1 parent 61277b2 commit 402f2ab
Showing 3 changed files with 21 additions and 160 deletions.
2 changes: 2 additions & 0 deletions _delphi_utils_python/DEVELOP.md
@@ -54,3 +54,5 @@ When you are finished, the virtual environment can be deactivated and
deactivate
rm -r env
```
## Releasing the module
If you have made enough changes to warrant updating [the PyPI project](https://pypi.org/project/delphi-utils/), note that a release is currently published as part of merging from `main` to `prod`.
74 changes: 18 additions & 56 deletions nwss_wastewater/delphi_nwss/run.py
@@ -26,62 +26,18 @@
from datetime import datetime

import numpy as np
import pandas as pd
from delphi_utils import S3ArchiveDiffer, get_structured_logger, create_export_csv
from delphi_utils import (
    GeoMapper,
    S3ArchiveDiffer,
    get_structured_logger,
    create_export_csv,
)
from delphi_utils.nancodes import add_default_nancodes

from .constants import GEOS, METRIC_SIGNALS, PROVIDER_NORMS, SIGNALS
from .pull import pull_nwss_data


def sum_all_nan(x):
    """Return the sum ignoring NaNs, unless everything is NaN; then return NaN."""
    all_nan = np.isnan(x).all()
    if all_nan:
        return np.nan
    return np.nansum(x)


def generate_weights(df, column_aggregating="pcr_conc_smoothed"):
    """
    Weight column_aggregating by population.

    Generate the relevant population amounts, and create a weighted but
    unnormalized column derived from `column_aggregating`.
    """
    # set the weight of places with NaNs to zero
    df[f"relevant_pop_{column_aggregating}"] = (
        df["population_served"] * np.abs(df[column_aggregating]).notna()
    )
    # generate the weighted version
    df[f"weighted_{column_aggregating}"] = (
        df[column_aggregating] * df[f"relevant_pop_{column_aggregating}"]
    )
    return df


def weighted_state_sum(df: pd.DataFrame, geo: str, sensor: str):
    """Sum the sensor, weighting by population and ignoring NAs, grouped by state."""
    agg_df = df.groupby(["timestamp", geo]).agg(
        {f"relevant_pop_{sensor}": "sum", f"weighted_{sensor}": sum_all_nan}
    )
    agg_df["val"] = agg_df[f"weighted_{sensor}"] / agg_df[f"relevant_pop_{sensor}"]
    agg_df = agg_df.reset_index()
    agg_df = agg_df.rename(columns={"state": "geo_id"})
    return agg_df


def weighted_nation_sum(df: pd.DataFrame, sensor: str):
    """Sum the sensor, weighting by population and ignoring NAs."""
    agg_df = df.groupby("timestamp").agg(
        {f"relevant_pop_{sensor}": "sum", f"weighted_{sensor}": sum_all_nan}
    )
    agg_df["val"] = agg_df[f"weighted_{sensor}"] / agg_df[f"relevant_pop_{sensor}"]
    agg_df = agg_df.reset_index()
    agg_df["geo_id"] = "us"
    return agg_df


def add_needed_columns(df, col_names=None):
    """Short util to add expected columns not found in the dataset."""
    if col_names is None:
@@ -140,7 +96,7 @@ def run_module(params):
    ## build the base version of the signal at the most detailed geo level you can get.
    ## compute stuff here or farm out to another function or file
    df_pull = pull_nwss_data(socrata_token, logger)
    ## aggregate
    geomapper = GeoMapper()
    # iterate over the providers and the normalizations that they specifically provide
    for provider, normalization in zip(
        PROVIDER_NORMS["provider"], PROVIDER_NORMS["normalization"]
@@ -153,16 +109,22 @@
        for sensor in [*SIGNALS, *METRIC_SIGNALS]:
            full_sensor_name = sensor + "_" + provider + "_" + normalization
            df_prov_norm = df_prov_norm.rename(columns={sensor: full_sensor_name})
            # add weighted column
            df = generate_weights(df_prov_norm, full_sensor_name)
            for geo in GEOS:
                logger.info(
                    "Generating signal and exporting to CSV", metric=full_sensor_name
                )
                if geo == "nation":
                    agg_df = weighted_nation_sum(df, full_sensor_name)
                else:
                    agg_df = weighted_state_sum(df, geo, full_sensor_name)
                df_prov_norm["nation"] = "us"
                agg_df = geomapper.aggregate_by_weighted_sum(
                    df_prov_norm,
                    geo,
                    full_sensor_name,
                    "timestamp",
                    "population_served",
                )
                agg_df = agg_df.rename(
                    columns={geo: "geo_id", f"weighted_{full_sensor_name}": "val"}
                )
                # add se, sample_size, and na codes
                agg_df = add_needed_columns(agg_df)
                # actual export
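For context: the removed helpers implemented a population-weighted mean in which NaN readings carry zero weight, i.e. `val = sum(pop_i * x_i) / sum(pop_i)` taken over the non-NaN `x_i` only (and NaN when every reading in a group is NaN). Below is a minimal pandas sketch of that computation; it assumes GeoMapper's `aggregate_by_weighted_sum` (invoked above with the argument order `df, geo, sensor, time_col, population_col`) produces an equivalent `weighted_<sensor>` column, since run.py renames that column directly to `val`. The helper name `weighted_mean_sketch` is hypothetical.

```python
import numpy as np
import pandas as pd


def weighted_mean_sketch(df, geo_col, sensor_col, time_col, weight_col):
    """Population-weighted mean per (time, geo) group, NaNs carrying zero weight.

    A sketch of the removed helpers' semantics, not GeoMapper's implementation.
    """
    out = df.copy()
    # zero weight wherever the reading is NaN, so NaNs drop out of both sums
    out["relevant_pop"] = out[weight_col] * out[sensor_col].notna()
    out["weighted"] = out[sensor_col].fillna(0) * out["relevant_pop"]
    agg = out.groupby([time_col, geo_col], as_index=False)[
        ["relevant_pop", "weighted"]
    ].sum()
    # an all-NaN group gives 0 / 0, which pandas maps to NaN (as sum_all_nan did)
    agg[f"weighted_{sensor_col}"] = agg["weighted"] / agg["relevant_pop"]
    return agg.drop(columns=["relevant_pop", "weighted"])
```

Calling `weighted_mean_sketch(df_prov_norm, geo, full_sensor_name, "timestamp", "population_served")` should then match the old `weighted_state_sum`'s `val` column when `geo` is `"state"`.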
105 changes: 1 addition & 104 deletions nwss_wastewater/tests/test_run.py
@@ -2,110 +2,7 @@
import pandas as pd
from pandas.testing import assert_frame_equal

from delphi_nwss.run import (
    add_needed_columns,
    generate_weights,
    sum_all_nan,
    weighted_state_sum,
    weighted_nation_sum,
)


def test_sum_all_nan():
    """Check that sum_all_nan returns NaN iff everything is NaN."""
    assert sum_all_nan(np.array([3, 5])) == 8
    assert np.isclose(sum_all_nan([np.nan, 3, 5]), 8)
    assert np.isnan(sum_all_nan(np.array([np.nan, np.nan])))


def test_weight_generation():
    dataFrame = pd.DataFrame(
        {
            "a": [1, 2, 3, 4, np.nan],
            "b": [5, 6, 7, 8, 9],
            "population_served": [10, 5, 8, 1, 3],
        }
    )
    weighted = generate_weights(dataFrame, column_aggregating="a")
    weighted_by_hand = pd.DataFrame(
        {
            "a": [1, 2, 3, 4, np.nan],
            "b": [5, 6, 7, 8, 9],
            "population_served": [10, 5, 8, 1, 3],
            "relevant_pop_a": [10, 5, 8, 1, 0],
            "weighted_a": [10.0, 2 * 5.0, 3 * 8, 4.0 * 1, np.nan * 0],
        }
    )
    assert_frame_equal(weighted, weighted_by_hand)
    # operations are in-place
    assert_frame_equal(weighted, dataFrame)


def test_weighted_state_sum():
    dataFrame = pd.DataFrame(
        {
            "state": ["al", "al", "ca", "ca", "nd", "me", "me"],
            "timestamp": np.zeros(7),
            "a": [1, 2, 3, 4, 12, -2, 2],
            "b": [5, 6, 7, np.nan, np.nan, -1, -2],
            "population_served": [10, 5, 8, 1, 3, 1, 2],
        }
    )
    weighted = generate_weights(dataFrame, column_aggregating="b")
    agg = weighted_state_sum(weighted, "state", "b")
    expected_agg = pd.DataFrame(
        {
            "timestamp": np.zeros(4),
            "geo_id": ["al", "ca", "me", "nd"],
            "relevant_pop_b": [10 + 5, 8 + 0, 1 + 2, 0],
            "weighted_b": [5 * 10 + 6 * 5, 7 * 8 + 0, 1 * -1 + -2 * 2, np.nan],
            "val": [80 / 15, 56 / 8, -5 / 3, np.nan],
        }
    )
    assert_frame_equal(agg, expected_agg)

    weighted = generate_weights(dataFrame, column_aggregating="a")
    agg_a = weighted_state_sum(weighted, "state", "a")
    expected_agg_a = pd.DataFrame(
        {
            "timestamp": np.zeros(4),
            "geo_id": ["al", "ca", "me", "nd"],
            "relevant_pop_a": [10 + 5, 8 + 1, 1 + 2, 3],
            "weighted_a": [1 * 10 + 2 * 5, 3 * 8 + 1 * 4, -2 * 1 + 2 * 2, 12 * 3],
            "val": [20 / 15, 28 / 9, (-2 * 1 + 2 * 2) / 3, 36 / 3],
        }
    )
    assert_frame_equal(agg_a, expected_agg_a)


def test_weighted_nation_sum():
    dataFrame = pd.DataFrame(
        {
            "state": ["al", "al", "ca", "ca", "nd"],
            "timestamp": np.hstack((np.zeros(3), np.ones(2))),
            "a": [1, 2, 3, 4, 12],
            "b": [5, 6, 7, np.nan, np.nan],
            "population_served": [10, 5, 8, 1, 3],
        }
    )
    weighted = generate_weights(dataFrame, column_aggregating="a")
    agg = weighted_nation_sum(weighted, "a")
    expected_agg = pd.DataFrame(
        {
            "timestamp": [0.0, 1],
            "relevant_pop_a": [10 + 5 + 8, 1 + 3],
            "weighted_a": [1 * 10 + 2 * 5 + 3 * 8, 1 * 4 + 3 * 12],
            "val": [44 / 23, 40 / 4],
            "geo_id": ["us", "us"],
        }
    )
    assert_frame_equal(agg, expected_agg)
from delphi_nwss.run import add_needed_columns


def test_adding_cols():
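The removed tests exercised the old helpers directly. Below is a hedged sketch of an equivalent check against the GeoMapper call, reusing the fixture from the removed `test_weighted_state_sum`; the expected values assume `weighted_b` matches the removed helpers' `val` (a population-weighted mean), which is how run.py consumes it, and are not taken from GeoMapper's documentation.

```python
import numpy as np
import pandas as pd
from delphi_utils import GeoMapper


def test_aggregate_matches_removed_helpers():
    # fixture copied from the removed test_weighted_state_sum
    df = pd.DataFrame(
        {
            "state": ["al", "al", "ca", "ca", "nd", "me", "me"],
            "timestamp": np.zeros(7),
            "b": [5, 6, 7, np.nan, np.nan, -1, -2],
            "population_served": [10, 5, 8, 1, 3, 1, 2],
        }
    )
    agg = GeoMapper().aggregate_by_weighted_sum(
        df, "state", "b", "timestamp", "population_served"
    )
    # assumed: population-weighted means per state, NaN readings carrying zero weight
    expected = [80 / 15, 56 / 8, -5 / 3, np.nan]
    actual = agg.sort_values("state")["weighted_b"].to_numpy()
    assert np.allclose(actual, expected, equal_nan=True)
```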
