Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
54 commits
Select commit Hold shift + click to select a range
0beb12c
first draft
shahronak47 Dec 12, 2024
8f799dd
bring all functions in pipapi
shahronak47 Dec 13, 2024
5adc8bf
finish 3rd case
shahronak47 Dec 13, 2024
8d0d547
draft for case 4
shahronak47 Dec 16, 2024
48669d7
change all case
shahronak47 Dec 16, 2024
6efb333
fix case 4
shahronak47 Dec 16, 2024
af7bad1
draft push
shahronak47 Dec 22, 2024
dbbfa79
making sure everything works except country and year all
shahronak47 Dec 30, 2024
fa96de3
new version
shahronak47 Jan 1, 2025
cd752f3
fix for new implementation
shahronak47 Jan 2, 2025
3da0aff
change for fill_gaps
Jan 2, 2025
55ccf31
use more keys for joining
shahronak47 Jan 6, 2025
4d40952
final touches
shahronak47 Jan 9, 2025
050fa11
add data'
Jan 9, 2025
49d5d55
Speed comparison
shahronak47 Jan 9, 2025
f4295b2
time complete
Jan 9, 2025
7e377aa
add more stats
Jan 9, 2025
82902af
more comparison
Jan 9, 2025
2726d0e
update vignettee
Jan 10, 2025
819dc36
update numbers
shahronak47 Jan 10, 2025
33bb2f8
call connection object only once
shahronak47 Jan 11, 2025
21d4331
update speed comparison
Jan 14, 2025
83cc628
include dcos
shahronak47 Jan 14, 2025
79b05e2
fix conflicts
shahronak47 Jan 14, 2025
b6795ad
rm missing comma
shahronak47 Jan 14, 2025
9aaf684
push draft
shahronak47 Jan 18, 2025
7c52503
ready for separate master files
shahronak47 Jan 19, 2025
4f03a8d
remove bugs
shahronak47 Jan 19, 2025
70ed16e
fix docs
shahronak47 Jan 22, 2025
d1214ee
fix few issues
shahronak47 Jan 22, 2025
dc8685b
update caching
shahronak47 Jan 24, 2025
047b1a2
update
Jan 24, 2025
fb62569
update timing
Jan 24, 2025
c2c5fbb
add updates
shahronak47 Jan 27, 2025
82c8903
Vignette builder
shahronak47 Jan 27, 2025
d054382
separate read and write connection
shahronak47 Feb 7, 2025
f668d2e
added reset cache function
shahronak47 Feb 9, 2025
2d4c9f4
move connection in func;reset_cache ready for API
shahronak47 Feb 12, 2025
0fd49fc
option to query live data
shahronak47 Feb 13, 2025
dc65a50
Merge branch 'DEV' into implement-duckdb
shahronak47 Feb 14, 2025
6ed301e
add an API endpoint
shahronak47 Feb 14, 2025
5bcec83
Merge branch 'implement-duckdb' of https://github.com/PIP-Technical-T…
shahronak47 Feb 14, 2025
80eca1d
fix-fg_pip_local
Feb 19, 2025
a6cec56
early response for empty table
shahronak47 Feb 20, 2025
5ed78a4
Merge branch 'implement-duckdb' of https://github.com/PIP-Technical-T…
shahronak47 Feb 20, 2025
f75b011
add test file for testing cache
Feb 21, 2025
344d001
add default pipapi.query_live_data option and clean it up a little
Feb 21, 2025
f37d565
create file if it doesn't exist
Feb 25, 2025
a61cdb7
Merge branch 'implement-duckdb' of https://github.com/PIP-Technical-T…
Feb 25, 2025
3d8b3d0
lineup_year issue solve
Feb 26, 2025
8514b1b
fix tests
Feb 26, 2025
5afb426
change condition
shahronak47 Feb 28, 2025
c0f30aa
depend on latest duckplyr
shahronak47 Mar 6, 2025
1623ca8
update vignette
shahronak47 Mar 12, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .Rbuildignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,5 @@ TEMP/
^docs$
^pkgdown$
CONTRIBUTING.md
^doc$
^Meta$
5 changes: 3 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@ TEMP/
TEMP.R
renv/
.Rprofile
inst/doc
tests/testdata/app_data/
.Renviron
docs
logs/
/sessionInfoLog
demo.duckdb*
/doc/
/Meta/
11 changes: 8 additions & 3 deletions DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ Suggests:
covr,
testthat,
spelling,
knitr,
rmarkdown,
markdown,
assertthat,
Expand All @@ -36,7 +35,9 @@ Suggests:
lintr,
withr,
devtools,
stringr
stringr,
knitr,
dplyr
Language: en-US
Imports:
data.table,
Expand All @@ -58,7 +59,11 @@ Imports:
joyn,
yaml,
purrr,
future
future,
glue,
DBI,
duckdb,
duckplyr
Remotes:
PIP-Technical-Team/wbpip@DEV
Depends:
Expand Down
2 changes: 2 additions & 0 deletions NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ export(pip_grp)
export(pip_grp_logic)
export(pipgd_lorenz_curve)
export(return_correct_version)
export(return_if_exists)
export(select_off_alt_agg)
export(select_reporting_level)
export(select_user_aggs)
Expand All @@ -39,6 +40,7 @@ export(ui_hp_stacked)
export(ui_pc_charts)
export(ui_pc_regional)
export(ui_svy_meta)
export(update_master_file)
export(valid_years)
export(validate_input_grouped_stats)
export(version_dataframe)
Expand Down
2 changes: 1 addition & 1 deletion R/add_agg_stats.R
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ ag_average_poverty_stats <- function(df, return_cols) {
national_cols <- return_cols$national_cols

# This should be removed eventually
assertthat::assert_that(assertthat::are_equal(length(df$reporting_level), 2))
# assertthat::assert_that(assertthat::are_equal(length(df$reporting_level), 2))

# STEP 1: Identify groups of variables that will be handled differently ------
## original names
Expand Down
1 change: 0 additions & 1 deletion R/copy_functions.R
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,6 @@ gd_compute_headcount_lq <- function(
#' By default, the best fitting Lorenz parameterization (quadratic or beta) is
#' selected.
#'
#' @param params list of parameters
#' @param welfare numeric vector of cumulative share of welfare (income/consumption)
#' @param weight numeric vector of cumulative share of the population
#' @param lorenz either "lb" or "lq"
Expand Down
2 changes: 1 addition & 1 deletion R/create_countries_vctr.R
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ get_user_alt_gt <- function(user_gt, off_gt) {
return(out)
}

#' Helper function to define user_{var}_code
#' Helper function to define user_\{var\}_code
#'
#' @param x character: Grouping type needed by user
#'
Expand Down
202 changes: 202 additions & 0 deletions R/duckdb_func.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
#' Return the rows of the table if they exist in master file
#'
#' Splits the requested lookup rows into (a) rows whose statistics are already
#' cached in the duckdb master table and (b) rows that still need to be
#' computed. When the `pipapi.query_live_data` option is TRUE the cache is
#' bypassed entirely and `lkup` is returned untouched.
#'
#' @inheritParams subset_lkup
#' @param con Connection object to duckdb table
#' @param fill_gaps logical: if TRUE read from the fill-gaps cache table
#'   ("fg_master_file"), otherwise from the regular one ("rg_master_file")
#'
#' @return list with two elements: `data_present_in_master` (cached rows
#'   matching the request at the given poverty line, or NULL when querying
#'   live data) and `lkup` (the lookup with already-cached rows removed)
#' @export
#'
return_if_exists <- function(lkup, povline, con, fill_gaps) {
  # It is not possible to append to parquet file https://stackoverflow.com/questions/39234391/how-to-append-data-to-an-existing-parquet-file
  # Writing entire data will be very costly as data keeps on growing, better is to save data in duckdb and append to it.
  if (!getOption("pipapi.query_live_data")) {
    # Pick the cache table that matches the type of request.
    target_file <- if (fill_gaps) "fg_master_file" else "rg_master_file"

    # Read the whole cache table into memory; all filtering happens in R below.
    master_file <- DBI::dbGetQuery(con,
                                   glue::glue("select * from {target_file}")) |>
      duckplyr::as_duckplyr_tibble()

    # Cached rows whose (country, year, interpolation) key is requested,
    # restricted to the exact poverty line asked for. A key cached at a
    # different poverty line does NOT count as a hit.
    data_present_in_master <-
      dplyr::inner_join(
        x = master_file,
        y = lkup |>
          collapse::fselect(country_code, reporting_year, is_interpolated),
        by = c("country_code", "reporting_year", "is_interpolated")) |>
      dplyr::filter(poverty_line == povline)

    keep <- TRUE
    if (nrow(data_present_in_master) > 0) {
      # Remove the rows from lkup that are present in master.
      # Keys are compared as space-pasted strings; NOTE(review): assumes the
      # key columns contain no embedded spaces that could make keys collide.
      keep <- !with(lkup, paste(country_code, reporting_year, is_interpolated)) %in%
        with(data_present_in_master, paste(country_code, reporting_year, is_interpolated))

      lkup <- lkup[keep, ]

      message("Returning data from cache.")
    }
  } else {
    # Live query requested: skip the cache entirely.
    data_present_in_master <- NULL
  }
  # nrow(data_present_in_master) should be equal to sum(keep)
  return(list(data_present_in_master = data_present_in_master, lkup = lkup))
}

#' Update master file with the contents of the dataframe
#'
#' Opens a write connection to the duckdb cache file, appends `dat` to the
#' relevant master table (fill-gaps or regular) and disconnects. The
#' connection is guaranteed to be released even if the append fails.
#'
#' @inheritParams pip
#' @param dat Dataframe to be appended
#' @param cache_file_path path where cache file is saved
#' @param fill_gaps logical: append to "fg_master_file" if TRUE, otherwise
#'   to "rg_master_file"
#'
#' @return number of rows updated
#' @export
#'
update_master_file <- function(dat, cache_file_path, fill_gaps) {
  write_con <- duckdb::dbConnect(duckdb::duckdb(), dbdir = cache_file_path)
  # Release the connection even if registering or inserting fails half-way;
  # the original code leaked the connection on error.
  on.exit(duckdb::dbDisconnect(write_con), add = TRUE)

  target_file <- if (fill_gaps) "fg_master_file" else "rg_master_file"

  # Expose `dat` to duckdb as a temporary view and append it in one statement.
  duckdb::duckdb_register(write_con, "append_data", dat, overwrite = TRUE)
  # Unregister before disconnecting (after = FALSE runs this handler first).
  on.exit(duckdb::duckdb_unregister(write_con, "append_data"),
          add = TRUE, after = FALSE)
  DBI::dbExecute(write_con, glue::glue("INSERT INTO {target_file} SELECT * FROM append_data;"))
  message(glue::glue("{target_file} is updated."))

  return(nrow(dat))
}


#' Reset the cache. Only to be used internally
#'
#' Deletes all rows from the regular ("rg") and/or fill-gaps ("fg") master
#' tables in the duckdb cache file, after verifying that the caller-supplied
#' key matches the server-side key.
#'
#' @param pass character: key supplied by the caller; defaults to the local
#'   environment variable `PIP_CACHE_LOCAL_KEY`
#' @param type character: which cache table(s) to clear — "both" (default),
#'   "rg" or "fg"
#' @param lkup lkup list; only `lkup$data_root` is used to locate the cache
#'
#' @noRd
reset_cache <- function(pass = Sys.getenv('PIP_CACHE_LOCAL_KEY'), type = c("both", "rg", "fg"), lkup) {
  # lkup will be passed through API and will not be an argument to endpoint, same as pip call
  # Checks if the keys match across local and server before resetting the cache.
  server_key <- Sys.getenv('PIP_CACHE_SERVER_KEY')
  # An unset server key must be rejected explicitly: otherwise two unset env
  # vars both yield "" and the equality check would let any caller wipe the
  # cache without a key.
  if (!nzchar(server_key) || pass != server_key) {
    rlang::abort("Either key not set or incorrect key!")
  }

  cache_file_path <- fs::path(lkup$data_root, 'cache', ext = "duckdb")
  write_con <- duckdb::dbConnect(duckdb::duckdb(), dbdir = cache_file_path)
  # Release the connection even if a DELETE fails.
  on.exit(duckdb::dbDisconnect(write_con), add = TRUE)

  type <- match.arg(type)
  if (type == "both") type <- c("rg", "fg")
  if ("rg" %in% type) {
    DBI::dbExecute(write_con, "DELETE from rg_master_file")
  }
  if ("fg" %in% type) {
    DBI::dbExecute(write_con, "DELETE from fg_master_file")
  }
}

#' Create (or re-create) the duckdb cache file with empty master tables
#'
#' Opens/creates the duckdb database at `cache_file_path` and (re)creates the
#' two empty cache tables: `rg_master_file` (regular estimates) and
#' `fg_master_file` (fill-gaps estimates, which carries the extra
#' interpolation-id columns). `CREATE OR REPLACE` drops any existing contents.
#' The connection is released even if a DDL statement fails.
#'
#' @param cache_file_path path where the duckdb cache file is (to be) saved
#'
#' @return NULL, invisibly (called for its side effect)
#' @noRd
create_duckdb_file <- function(cache_file_path) {
  con <- duckdb::dbConnect(duckdb::duckdb(), dbdir = cache_file_path)
  # Release the connection even if one of the CREATE statements errors;
  # the original code leaked the connection on failure.
  on.exit(duckdb::dbDisconnect(con), add = TRUE)

  DBI::dbExecute(con, "CREATE OR REPLACE table rg_master_file (
              country_code VARCHAR,
              survey_id VARCHAR,
              cache_id VARCHAR,
              wb_region_code VARCHAR,
              reporting_year DOUBLE,
              surveyid_year VARCHAR,
              survey_year DOUBLE,
              survey_time VARCHAR,
              survey_acronym VARCHAR,
              survey_coverage VARCHAR,
              survey_comparability DOUBLE,
              comparable_spell VARCHAR,
              welfare_type VARCHAR,
              reporting_level VARCHAR,
              survey_mean_lcu DOUBLE,
              survey_mean_ppp DOUBLE,
              survey_median_ppp DOUBLE,
              survey_median_lcu DOUBLE,
              predicted_mean_ppp DOUBLE,
              ppp DOUBLE,
              cpi DOUBLE,
              reporting_pop DOUBLE,
              reporting_gdp DOUBLE,
              reporting_pce DOUBLE,
              pop_data_level VARCHAR,
              gdp_data_level VARCHAR,
              pce_data_level VARCHAR,
              cpi_data_level VARCHAR,
              ppp_data_level VARCHAR,
              distribution_type VARCHAR,
              gd_type VARCHAR,
              is_interpolated BOOLEAN,
              is_used_for_line_up BOOLEAN,
              is_used_for_aggregation BOOLEAN,
              estimation_type VARCHAR,
              display_cp DOUBLE,
              path VARCHAR,
              country_name VARCHAR,
              africa_split VARCHAR,
              africa_split_code VARCHAR,
              region_name VARCHAR,
              region_code VARCHAR,
              world VARCHAR,
              world_code VARCHAR,
              poverty_line DOUBLE,
              mean DOUBLE,
              median DOUBLE,
              headcount DOUBLE,
              poverty_gap DOUBLE,
              poverty_severity DOUBLE,
              watts DOUBLE

)")

  # Same layout as rg_master_file plus interpolation_id / data_interpolation_id
  # and a different column order; the two schemas must stay in sync with the
  # dataframes appended by update_master_file().
  DBI::dbExecute(con, "CREATE OR REPLACE table fg_master_file (
              country_code VARCHAR,
              survey_id VARCHAR,
              cache_id VARCHAR,
              wb_region_code VARCHAR,
              reporting_year DOUBLE,
              surveyid_year VARCHAR,
              survey_year DOUBLE,
              survey_time VARCHAR,
              survey_acronym VARCHAR,
              survey_coverage VARCHAR,
              survey_comparability DOUBLE,
              comparable_spell VARCHAR,
              welfare_type VARCHAR,
              reporting_level VARCHAR,
              survey_mean_lcu DOUBLE,
              survey_mean_ppp DOUBLE,
              survey_median_ppp DOUBLE,
              survey_median_lcu DOUBLE,
              predicted_mean_ppp DOUBLE,
              ppp DOUBLE,
              cpi DOUBLE,
              reporting_pop DOUBLE,
              reporting_gdp DOUBLE,
              reporting_pce DOUBLE,
              pop_data_level VARCHAR,
              gdp_data_level VARCHAR,
              pce_data_level VARCHAR,
              cpi_data_level VARCHAR,
              ppp_data_level VARCHAR,
              distribution_type VARCHAR,
              gd_type VARCHAR,
              is_interpolated BOOLEAN,
              is_used_for_line_up BOOLEAN,
              is_used_for_aggregation BOOLEAN,
              estimation_type VARCHAR,
              interpolation_id VARCHAR,
              display_cp DOUBLE,
              country_name VARCHAR,
              africa_split VARCHAR,
              africa_split_code VARCHAR,
              region_name VARCHAR,
              region_code VARCHAR,
              world VARCHAR,
              world_code VARCHAR,
              path VARCHAR,
              data_interpolation_id VARCHAR,
              poverty_line DOUBLE,
              mean DOUBLE,
              median DOUBLE,
              headcount DOUBLE,
              poverty_gap DOUBLE,
              poverty_severity DOUBLE,
              watts DOUBLE
)")
  invisible(NULL)
}
29 changes: 17 additions & 12 deletions R/fg_pip.R
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#' Compute the main PIP poverty and inequality statistics for imputed years.
#'
#' @inheritParams pip
#' @param con duckdb connection object
#' @return data.frame
#' @keywords internal
fg_pip <- function(country,
Expand All @@ -12,14 +13,14 @@ fg_pip <- function(country,
welfare_type,
reporting_level,
ppp,
lkup) {
lkup,
con) {

valid_regions <- lkup$query_controls$region$values
interpolation_list <- lkup$interpolation_list
data_dir <- lkup$data_root
ref_lkup <- lkup$ref_lkup


# Handle interpolation
metadata <- subset_lkup(
country = country,
Expand All @@ -28,8 +29,14 @@ fg_pip <- function(country,
reporting_level = reporting_level,
lkup = ref_lkup,
valid_regions = valid_regions,
data_dir = data_dir
data_dir = data_dir,
povline = povline,
con = con,
fill_gaps = TRUE
)

data_present_in_master <- metadata$data_present_in_master
metadata <- metadata$lkup
# Remove aggregate distribution if popshare is specified
# TEMPORARY FIX UNTIL popshare is supported for aggregate distributions
metadata <- filter_lkup(metadata = metadata,
Expand All @@ -38,7 +45,7 @@ fg_pip <- function(country,

# Return empty dataframe if no metadata is found
if (nrow(metadata) == 0) {
return(pipapi::empty_response)
return(list(main_data = empty_response, data_in_cache = data_present_in_master))
}

unique_survey_files <- unique(metadata$data_interpolation_id)
Expand All @@ -54,7 +61,6 @@ fg_pip <- function(country,
# Extract country-years for which stats will be computed from the same files
# tmp_metadata <- interpolation_list[[unique_survey_files[svy_id]]]$tmp_metadata
iteration <- interpolation_list[[unique_survey_files[svy_id]]]

svy_data <- get_svy_data(svy_id = iteration$cache_ids,
reporting_level = iteration$reporting_level,
path = iteration$paths)
Expand All @@ -66,10 +72,14 @@ fg_pip <- function(country,
valid_regions = valid_regions,
data_dir = data_dir)

# Join because some data might be coming from cache so it might be absent in metadata
ctry_years <- collapse::join(ctry_years, metadata |>
collapse::fselect(intersect(names(ctry_years), names(metadata))),
verbose = 0,how = "inner")

results_subset <- vector(mode = "list", length = nrow(ctry_years))

for (ctry_year_id in seq_along(ctry_years$interpolation_id)) {

# Extract records to be used for a single country-year estimation
interp_id <- ctry_years[["interpolation_id"]][ctry_year_id]
tmp_metadata <- metadata[metadata$interpolation_id == interp_id, ]
Expand All @@ -96,19 +106,14 @@ fg_pip <- function(country,
}
#
# tmp_metadata <- unique(tmp_metadata)

# Add stats columns to data frame
for (stat in seq_along(tmp_stats)) {
tmp_metadata[[names(tmp_stats)[stat]]] <- tmp_stats[[stat]]
}


results_subset[[ctry_year_id]] <- tmp_metadata
}

out[[svy_id]] <- results_subset
}

out <- unlist(out, recursive = FALSE)
out <- data.table::rbindlist(out)

Expand All @@ -124,7 +129,7 @@ fg_pip <- function(country,
poverty_line := round(poverty_line, digits = 3) ]


return(out)
return(list(main_data = out, data_in_cache = data_present_in_master))
}

#' Remove duplicated rows created during the interpolation process
Expand Down
Loading