Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
53 commits
Select commit Hold shift + click to select a range
ef1b1d6
list output
shahronak47 May 23, 2024
6710584
update Roxygen version
shahronak47 May 23, 2024
eb838f4
added tests
shahronak47 Jun 4, 2024
2a24cc1
fill_gaps vectorization
shahronak47 Jun 5, 2024
05f1ffa
Merge branch 'DEV' into vectorize-povline
shahronak47 Jun 7, 2024
507ce02
Merge branch 'DEV' into vectorize-povline
shahronak47 Jul 20, 2024
2c9e643
Merge branch 'DEV' into vectorize-povline
shahronak47 Aug 7, 2024
0956f38
comment assert_that
shahronak47 Aug 12, 2024
161d83e
proposed change with grouped by poverty_line
giorgiacek Aug 23, 2024
6cb786c
Merge pull request #412 from PIP-Technical-Team/vectorize-povline-gc
shahronak47 Aug 26, 2024
09ff25a
add tests of multiple lines
Aug 28, 2024
5fabb71
Merge pull request #413 from PIP-Technical-Team/vectorize-povline-AC
shahronak47 Aug 29, 2024
912e1b2
fix bug
Aug 29, 2024
80f35ca
finishing final bug
Aug 29, 2024
68ae810
Merge branch 'DEV' into vectorize-povline_AC
Sep 5, 2024
eae0390
make it dependent of wbpip (>= 0.1.5)
Sep 30, 2024
790097f
upper case year in pip_grp_logic
Sep 30, 2024
d0e362f
add more tests to check of aggregates
Sep 30, 2024
1f2e85d
Merge pull request #421 from PIP-Technical-Team/vectorize-povline_AC
shahronak47 Oct 1, 2024
5e14dbb
fix aggregate level vectorized povline pip_grp_logic()
Oct 5, 2024
c635fe1
Merge branch 'DEV' into vectorize-povline
shahronak47 Oct 5, 2024
4cdc278
unnest_dt
shahronak47 Oct 13, 2024
0550a0b
Merge branch 'vectorize-povline' of https://github.com/PIP-Technical-…
shahronak47 Oct 13, 2024
fbe2d08
update code
Oct 14, 2024
df18a39
update docs
Oct 14, 2024
b316ac1
rm lorenz parameter from as.numeric call
shahronak47 Oct 15, 2024
88daa4d
Merge branch 'vectorize-povline' of https://github.com/PIP-Technical-…
shahronak47 Oct 15, 2024
60aceda
get kv values
shahronak47 Oct 19, 2024
490b56d
push print
shahronak47 Nov 18, 2024
97350cd
Merge branch 'DEV' into vectorize-povline
shahronak47 Mar 19, 2025
aabab6c
Merge branch 'DEV' into vectorize-povline
shahronak47 Mar 26, 2025
4f4142e
update docs
shahronak47 Mar 26, 2025
916a727
Merge branch 'vectorize-povline' of https://github.com/PIP-Technical-…
shahronak47 Mar 26, 2025
6a50c8f
fix few tests
Apr 1, 2025
993859d
update data
Apr 5, 2025
22ae5d9
intermediate fix for the tests
Apr 22, 2025
fb50bd1
fix final pipelines
Apr 22, 2025
b2ec732
use %in%
Apr 23, 2025
f83f1a5
fix docs
May 1, 2025
bca2b2c
remove comment
shahronak47 May 2, 2025
1020d98
switch to collapse join
shahronak47 May 4, 2025
9ada0a5
remove duckplyr
shahronak47 May 5, 2025
1078335
add verbose = 0 to collapse::join
May 5, 2025
b8ca724
half poverty line issue
shahronak47 May 9, 2025
b0d82f0
Merge branch 'vectorize-povline' of https://github.com/PIP-Technical-…
shahronak47 May 9, 2025
0cf7755
add jsonlite
May 6, 2025
611f37b
Update DESCRIPTION
tonyfujs May 6, 2025
16be41b
remove duckplyr
May 12, 2025
3969151
update function
shahronak47 May 14, 2025
1f5214f
while inserting take poverty lines as unique key
shahronak47 May 14, 2025
69d2ee0
pull from DEV
shahronak47 May 14, 2025
12aff6c
create file
shahronak47 May 14, 2025
5ac9b8d
set read only as FALSE
shahronak47 May 14, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ Imports:
fst (>= 0.9.8),
plumber,
urltools,
wbpip,
wbpip (>= 0.1.5),
rlang (>= 1.1.2),
fs,
memoise,
Expand Down
1 change: 1 addition & 0 deletions NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ export(ui_hp_stacked)
export(ui_pc_charts)
export(ui_pc_regional)
export(ui_svy_meta)
export(unnest_dt_longer)
export(update_master_file)
export(valid_years)
export(validate_input_grouped_stats)
Expand Down
27 changes: 17 additions & 10 deletions R/add_agg_stats.R
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ add_agg_stats <- function(df,
aggregated <- lapply(aggregated_list,
ag_average_poverty_stats,
return_cols)
aggregated <- data.table::rbindlist(aggregated)

aggregated <- data.table::rbindlist(aggregated)
aggregated$path <- as.character(aggregated$path)
df <- rbind(df, aggregated)
}

Expand All @@ -45,9 +46,6 @@ ag_average_poverty_stats <- function(df, return_cols) {
na_cols <- return_cols$na_cols
national_cols <- return_cols$national_cols

# This should be removed eventually
# assertthat::assert_that(assertthat::are_equal(length(df$reporting_level), 2))

# STEP 1: Identify groups of variables that will be handled differently ------
## original names
orig_names <- data.table::copy(names(df))
Expand Down Expand Up @@ -77,23 +75,26 @@ ag_average_poverty_stats <- function(df, return_cols) {
## Handle negatives ------
df[, (noneg_vars) :=
lapply(.SD, negative_to_na),
.SDcols = noneg_vars]
.SDcols = noneg_vars, by = poverty_line]

## Handle zeros -------------
df[, (zero_vars) :=
lapply(.SD, zeros_to_na),
.SDcols = zero_vars]
.SDcols = zero_vars, by = poverty_line]


# STEP 3: Calculations ----------
## weighted average ------
wgt_df <- df |>
# this grouping is not necessary, but ensures data.frame as output
collapse::fgroup_by(c("country_code", "reporting_year", "welfare_type")) |>
collapse::get_vars(c("reporting_pop", avg_names)) |>
collapse::fgroup_by(c("country_code", "reporting_year", "welfare_type", "poverty_line")) |>
collapse::get_vars(c("reporting_pop", "poverty_line", avg_names)) |>
collapse::fmean(reporting_pop,
keep.group_vars = FALSE,
keep.group_vars = TRUE,
keep.w = TRUE,
stub = FALSE)
stub = FALSE)|>
collapse::fselect(-country_code, -reporting_year, -welfare_type)



## Sum: National total of reporting vars ------
Expand All @@ -103,7 +104,13 @@ ag_average_poverty_stats <- function(df, return_cols) {

# STEP 4: Format results ----
## Bind resulting tables ----

# first_rows <- df[, .SD[1], by = poverty_line,
# .SDcols = c(nonum_names)]
#
# out <- merge(first_rows, wgt_df, by = "poverty_line", all = TRUE)
out <- cbind(df[1, .SD, .SDcols = nonum_names], wgt_df)
out$path <- fs::path(out$path)

## convert years back to numeric ----
out[, (years_vars) :=
Expand Down
2 changes: 1 addition & 1 deletion R/copy_functions.R
Original file line number Diff line number Diff line change
Expand Up @@ -583,7 +583,7 @@ pipgd_lorenz_curve <- function(
params$gd_params$lq$reg_results$coef[["B"]],
params$gd_params$lq$reg_results$coef[["C"]])

lc <- sapply(
lc <- sapply(
X = x_vec,
FUN = function(x1){
wbpip::value_at_lq(
Expand Down
52 changes: 34 additions & 18 deletions R/duckdb_func.R
Original file line number Diff line number Diff line change
Expand Up @@ -14,26 +14,30 @@ return_if_exists <- function(lkup, povline, cache_file_path, fill_gaps) {
con <- connect_with_retry(cache_file_path)

master_file <- DBI::dbGetQuery(con,
glue::glue("select * from {target_file}")) |>
duckplyr::as_duckplyr_tibble()
glue::glue("select * from {target_file}"))

# It is important to close the read connection before you open a write connection because
# duckdb kind of inherits read_only flag from previous connection object if it is not closed
# More details here https://app.clickup.com/t/868cdpe3q
duckdb::dbDisconnect(con)

data_present_in_master <-
dplyr::inner_join(
collapse::join(
x = master_file,
y = lkup |>
collapse::fselect(country_code, reporting_year, is_interpolated),
by = c("country_code", "reporting_year", "is_interpolated")) |>
dplyr::filter(poverty_line == povline)

collapse::fselect(country_code, reporting_year, is_interpolated, welfare_type),
on = c("country_code", "reporting_year", "is_interpolated", "welfare_type"),
how = "inner",
overid = 2,
verbose = 0) |>
collapse::fsubset(poverty_line %in% povline)
#browser()
keep <- TRUE
if (nrow(data_present_in_master) > 0) {
if (nrow(data_present_in_master) > 0 &&
all(povline %in% data_present_in_master$poverty_line)) {
# Remove the rows from lkup that are present in master
keep <- !with(lkup, paste(country_code, reporting_year, is_interpolated)) %in%
with(data_present_in_master, paste(country_code, reporting_year, is_interpolated))
keep <- !with(lkup, paste(country_code, reporting_year, is_interpolated, welfare_type)) %in%
with(data_present_in_master, paste(country_code, reporting_year, is_interpolated, welfare_type))

lkup <- lkup[keep, ]

Expand All @@ -55,22 +59,34 @@ return_if_exists <- function(lkup, povline, cache_file_path, fill_gaps) {
#' @export
#'
update_master_file <- function(dat, cache_file_path, fill_gaps) {
write_con <- duckdb::dbConnect(duckdb::duckdb(), dbdir = cache_file_path)
write_con <- connect_with_retry(cache_file_path, read_only = FALSE)
target_file <- if (fill_gaps) "fg_master_file" else "rg_master_file"

duckdb::duckdb_register(write_con, "append_data", dat, overwrite = TRUE)
DBI::dbExecute(write_con, glue::glue("INSERT INTO {target_file} SELECT * FROM append_data;"))
unique_keys <- c("country_code", "reporting_year", "is_interpolated", "welfare_type", "poverty_line")

# Insert the rows that don't exist already in the master file
nr <- DBI::dbExecute(write_con, glue::glue("
INSERT INTO {target_file}
SELECT *
FROM append_data AS a
WHERE NOT EXISTS (
SELECT 1
FROM {target_file} AS t
WHERE {glue::glue_collapse(
glue::glue('t.{unique_keys} = a.{unique_keys}'), sep = ' AND ')}
);
"))
duckdb::dbDisconnect(write_con)
message(glue::glue("{target_file} is updated."))
if(nr > 0) message(glue::glue("{target_file} is updated."))

return(nrow(dat))
return(nr)
}

connect_with_retry <- function(db_path, max_attempts = 5, delay_sec = 1) {
connect_with_retry <- function(db_path, max_attempts = 5, delay_sec = 1, read_only = TRUE) {
attempt <- 1
while (attempt <= max_attempts) {
tryCatch({
con <- duckdb::dbConnect(duckdb::duckdb(), dbdir = db_path, read_only = TRUE)
con <- duckdb::dbConnect(duckdb::duckdb(dbdir = db_path, read_only = read_only))
message("Connected on attempt ", attempt)
return(con)
}, error = function(e) {
Expand Down Expand Up @@ -108,7 +124,7 @@ reset_cache <- function(pass = Sys.getenv('PIP_CACHE_LOCAL_KEY'), type = c("both
}

create_duckdb_file <- function(cache_file_path) {
con <- duckdb::dbConnect(duckdb::duckdb(), dbdir = cache_file_path)
con <- connect_with_retry(cache_file_path, read_only = FALSE)
DBI::dbExecute(con, "CREATE OR REPLACE table rg_master_file (
country_code VARCHAR,
survey_id VARCHAR,
Expand Down
20 changes: 14 additions & 6 deletions R/fg_pip.R
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,13 @@ fg_pip <- function(country,
valid_regions = valid_regions,
data_dir = data_dir)

# Join because some data might be coming from cache so it might be absent in metadata
# Join because some data might be coming from cache so it might be absent in
# metadata
ctry_years <- collapse::join(ctry_years, metadata |>
collapse::fselect(intersect(names(ctry_years), names(metadata))),
verbose = 0,how = "inner")
collapse::fselect(intersect(names(ctry_years),
names(metadata))),
verbose = 0,
how = "inner")

results_subset <- vector(mode = "list", length = nrow(ctry_years))

Expand Down Expand Up @@ -113,17 +116,20 @@ fg_pip <- function(country,
# tmp_metadata <- unique(tmp_metadata)
# Add stats columns to data frame
for (stat in seq_along(tmp_stats)) {
tmp_metadata[[names(tmp_stats)[stat]]] <- tmp_stats[[stat]]
tmp_metadata[[names(tmp_stats)[stat]]] <- list(tmp_stats[[stat]])
}
# To allow multiple povline values, we store them in a list and unnest
tmp_metadata <- tmp_metadata %>%
unnest_dt_longer(names(tmp_metadata)[sapply(tmp_metadata, is.list)])

results_subset[[ctry_year_id]] <- tmp_metadata
}
out[[svy_id]] <- results_subset
}
out <- unlist(out, recursive = FALSE)
out <- data.table::rbindlist(out)


# # Remove median
# Remove median
# out[, median := NULL]

# Ensure that out does not have duplicates
Expand All @@ -134,6 +140,8 @@ fg_pip <- function(country,
poverty_line := round(poverty_line, digits = 3) ]

out$path <- as.character(out$path)
if("max_year" %in% names(out)) out$max_year <- NULL

return(list(main_data = out, data_in_cache = data_present_in_master))
}

Expand Down
9 changes: 1 addition & 8 deletions R/pip.R
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
#' lkup = lkups)
#' }
#' @export
#'
pip <- function(country = "ALL",
year = "ALL",
povline = 1.9,
Expand Down Expand Up @@ -103,14 +104,6 @@ pip <- function(country = "ALL",
aux_files = lkup$aux_files
)
# lcv$est_ctrs has all the country_code that we are interested in
# Integrate return_if_exists for following scenario
# 1) country = "AGO" year = 2000 pl = 1.9 should return from master file
# 2) country = "AGO" year = 2019 pl = 1.9 should return pip call
# 3) country = c("CHN", "IND"), year = 2019, 2017 should return half from master file and half from pip call
#
# 4) country = "all" year = 2019
# 5) country = "AGO" year = "all"
# 6) country = "all" year = "all"

cache_file_path <- fs::path(lkup$data_root, 'cache', ext = "duckdb")
if (!file.exists(cache_file_path)) {
Expand Down
9 changes: 6 additions & 3 deletions R/pip_grp.R
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,9 @@ pip_grp <- function(country = "ALL",

# Handle potential (insignificant) difference in poverty_line values that
# may mess-up the grouping
out$poverty_line <- povline
# I don't think we need this: out$poverty_line already has the correct values.
# Additionally, since povline is now vectorized, the line below would not work as expected.
#out$poverty_line <- povline

# Handle aggregations with sub-groups
if (group_by != "none") {
Expand Down Expand Up @@ -283,6 +285,7 @@ pip_aggregate_by <- function(df,

compute_world_aggregates <- function(rgn, cols) {
# Compute stats
# Grouping by poverty line as well since we now have vectorized poverty line values
wld <- rgn[, lapply(.SD,
stats::weighted.mean,
w = reporting_pop,
Expand All @@ -292,10 +295,10 @@ compute_world_aggregates <- function(rgn, cols) {
]
# Compute yearly population WLD totals
tmp <- rgn[, .(reporting_pop = sum(reporting_pop)),
by = .(reporting_year)]
by = .(reporting_year, poverty_line)]


wld <- wld[tmp, on = .(reporting_year = reporting_year)]
wld <- wld[tmp, on = .(reporting_year = reporting_year, poverty_line = poverty_line)]
wld[["region_code"]] <- "WLD"
wld[["region_name"]] <- "World"

Expand Down
7 changes: 4 additions & 3 deletions R/pip_grp_logic.R
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ pip_grp_logic <- function(country = "ALL",
reporting_level <- match.arg(reporting_level)
group_by <- match.arg(group_by)


# Custom aggregations only supported at the national level
# subgroups aggregations only supported for "all" countries
country <- toupper(country)
year <- toupper(year)
if (group_by != "none") {
reporting_level <- "all"
if (!all(country %in% c("ALL", lkup$query_controls$region$values))) {
Expand Down Expand Up @@ -296,7 +296,6 @@ pip_grp_helper <- function(lcv_country,
if (nrow(out) == 0) {
return(pipapi::empty_response_grp)
}

# Handles aggregated distributions
if (reporting_level %in% c("national", "all")) {
out <- add_agg_stats(out,
Expand All @@ -305,7 +304,9 @@ pip_grp_helper <- function(lcv_country,

# Handle potential (insignificant) difference in poverty_line values that
# may mess-up the grouping
out$poverty_line <- povline
# I don't think we need this: out$poverty_line already has the correct values.
# Additionally, since povline is now vectorized, the line below would not work as expected.
# out$poverty_line <- povline

add_vars_out_of_pipeline(out, fill_gaps = TRUE, lkup = lkup)

Expand Down
8 changes: 5 additions & 3 deletions R/rg_pip.R
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ rg_pip <- function(country,
reporting_level = tmp_metadata$reporting_level,
path = tmp_metadata$path
)

tmp_stats <- wbpip:::prod_compute_pip_stats(
welfare = svy_data$df0$welfare,
povline = povline,
Expand All @@ -71,9 +70,12 @@ rg_pip <- function(country,
)
# Add stats columns to data frame
for (j in seq_along(tmp_stats)) {
tmp_metadata[[names(tmp_stats)[j]]] <- tmp_stats[[j]]
tmp_metadata[[names(tmp_stats)[j]]] <- list(tmp_stats[[j]])
}

# To allow multiple povline values, we store them in a list and unnest
tmp_metadata <-
tmp_metadata %>%
unnest_dt_longer(names(tmp_metadata)[sapply(tmp_metadata, is.list)])
out[[i]] <- tmp_metadata
}
#browser()
Expand Down
35 changes: 33 additions & 2 deletions R/utils.R
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,6 @@ censor_stats <- function(df, censored_table) {
setDT(df)
setDT(censored_table)


# Create a binary column to mark rows for removal based on 'all' statistic
df[, to_remove := FALSE]
censor_all <- censored_table[statistic == "all", .(id)]
Expand All @@ -396,7 +395,8 @@ censor_stats <- function(df, censored_table) {
censor_stats <- censored_table[statistic != "all"]
if (nrow(censor_stats) > 0) {
# Perform a non-equi join to mark relevant statistics
df[censor_stats, on = .(tmp_id = id), mult = "first",
# mult = "first" is commented out because, with multiple povline values, there is more than one row
df[censor_stats, on = .(tmp_id = id), #mult = "first",
unique(censor_stats$statistic) := NA_real_]
}

Expand Down Expand Up @@ -1361,4 +1361,35 @@ add_vars_out_of_pipeline <- function(out, fill_gaps, lkup) {
invisible(out)
}

#' An efficient tidyr::unnest_longer
#'
#' Expands one or more list columns of a table into regular columns, giving
#' one output row per unnested element. Every column not named in `cols` is
#' used as a grouping key and repeated alongside the unnested values.
#'
#' @details
#' Rows are grouped by all columns that are not in `cols`, so input rows that
#' share identical values in every one of those columns fall into the same
#' group and their list elements are concatenated.
#' NOTE(review): within a group, the list columns presumably need to unnest to
#' compatible lengths for data.table to combine them -- confirm against callers.
#'
#' @param tbl a dataframe/tibble/data.table
#' @param cols one (or more) column names in `tbl`, given as a character
#'   vector, identifying the list columns to unnest
#'
#' @return A longer data.table with the non-list columns first, followed by
#'   the unnested `cols`. If `cols` is empty, `tbl` is returned unchanged
#'   (converted to a data.table).
#' @export
#'
#' @examples
#' \dontrun{
#' # data.table() keeps a list() argument as a single list column, whereas
#' # data.frame() would splice its elements into separate ordinary columns.
#' df <- data.table::data.table(
#'   a = LETTERS[1:5],
#'   b = LETTERS[6:10],
#'   list_column1 = list(c(LETTERS[1:5]), "F", "G", "H", "I"),
#'   list_column2 = list(c(LETTERS[1:5]), "F", "G", "H", "K")
#' )
#' # The first row expands to five rows; the result has nine rows in total.
#' unnest_dt_longer(df, grep("^list_column", names(df), value = TRUE))
#' }
unnest_dt_longer <- function(tbl, cols) {
  tbl <- data.table::as.data.table(tbl)

  # Nothing to unnest: hand the table back instead of running a grouped
  # query with an empty .SDcols.
  if (length(cols) == 0L) {
    return(tbl)
  }

  # Every column that is not being unnested becomes a grouping key. The names
  # are turned into symbols so they can be spliced into `by = list(...)`.
  key_syms <- rlang::syms(setdiff(colnames(tbl), cols))

  # Per group, flatten each list column into an atomic vector; data.table
  # repeats the key values for every element produced.
  out <- eval(
    rlang::expr(
      tbl[, lapply(.SD, unlist), by = list(!!!key_syms), .SDcols = cols]
    )
  )

  # Restore the exact column names (keys first, then the unnested columns),
  # renaming by reference rather than through `colnames<-`.
  data.table::setnames(out, c(as.character(key_syms), cols))

  out
}

Loading