2 changes: 1 addition & 1 deletion .github/workflows/main.yml
@@ -7,7 +7,7 @@ on:

env:
REGISTRY: ghcr.io
IMAGE_NAME: nrel/api-umbrella
IMAGE_NAME: natlabrockies/api-umbrella
DOCKER_BUILDKIT: 1
TESTS_GLOB: "test/**/test_*.rb"

2 changes: 1 addition & 1 deletion .github/workflows/release.yml
@@ -7,7 +7,7 @@ on:

env:
REGISTRY: ghcr.io
IMAGE_NAME: nrel/api-umbrella
IMAGE_NAME: natlabrockies/api-umbrella
DOCKER_BUILDKIT: 1

jobs:
2 changes: 1 addition & 1 deletion Dockerfile-opensearch
@@ -1,2 +1,2 @@
FROM public.ecr.aws/opensearchproject/opensearch:2.17.1
FROM public.ecr.aws/opensearchproject/opensearch:3.3.2
RUN /usr/share/opensearch/bin/opensearch-plugin install --batch mapper-murmur3
2 changes: 2 additions & 0 deletions config/schema.cue
@@ -380,6 +380,8 @@ import "path"
analytics_v0_summary_start_time: string | *"2013-07-01T00:00:00.000Z"
analytics_v0_summary_end_time?: string
analytics_v0_summary_filter?: string
analytics_v0_summary_db_timeout: uint | *900 // 15 minutes
analytics_v0_summary_analytics_timeout: uint | *2400 // 40 minutes
max_body_size: string | *"1m"
allowed_signup_embed_urls_regex?: string
default_host?: string
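These two settings are plain seconds in the schema. Judging from the analytics.lua changes later in this diff, analytics_v0_summary_db_timeout is multiplied by 1000 and applied as a per-query PostgreSQL statement_timeout, while analytics_v0_summary_analytics_timeout is passed directly to the OpenSearch search timeout. A rough, illustrative sketch of what the 900-second default amounts to on the database side (session-level form shown only for clarity; the real code sets it per query):

BEGIN;
-- 900 s (the analytics_v0_summary_db_timeout default), expressed in milliseconds:
SET LOCAL statement_timeout = 900000;
-- ... long-running summary aggregation would run here ...
COMMIT;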
2 changes: 1 addition & 1 deletion config/test.yml
@@ -83,7 +83,7 @@ web:
user_name: api_umbrella
password: dev_password
contact_form_email: default-test-contact-email@example.com
analytics_v0_summary_start_time: "2013-07-01T00:00:00.000-06:00"
analytics_v0_summary_start_time: "2013-06-01T00:00:00.000-06:00"
analytics_v0_summary_end_time: "2013-08-31T23:59:59.999-06:00"
router:
trusted_proxies:
8 changes: 7 additions & 1 deletion db/schema.sql
@@ -67,7 +67,7 @@ CREATE FUNCTION api_umbrella.analytics_cache_extract_unique_user_ids() RETURNS t
AS $$
BEGIN
IF (jsonb_typeof(NEW.data->'aggregations'->'unique_user_ids'->'buckets') = 'array') THEN
NEW.unique_user_ids := (SELECT array_agg(DISTINCT bucket->>'key')::uuid[] FROM jsonb_array_elements(NEW.data->'aggregations'->'unique_user_ids'->'buckets') AS bucket);
NEW.unique_user_ids := (SELECT array_agg(DISTINCT bucket->'key'->>'user_id')::uuid[] FROM jsonb_array_elements(NEW.data->'aggregations'->'unique_user_ids'->'buckets') AS bucket);
END IF;

RETURN NEW;
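The changed extraction above follows a new shape for the unique_user_ids aggregation buckets: the user id now sits inside a nested key object instead of being the bucket key itself (presumably because the aggregation now returns structured keys). A minimal illustration of the two paths, using a made-up bucket value:

-- Old bucket shape: {"key": "6f6f95d4-...", "doc_count": 3}              -> bucket->>'key'
-- New bucket shape: {"key": {"user_id": "6f6f95d4-..."}, "doc_count": 3} -> bucket->'key'->>'user_id'
SELECT ('{"key": {"user_id": "6f6f95d4-00aa-4fb7-9d41-97e1a5a19b32"}}'::jsonb -> 'key' ->> 'user_id')::uuid;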
@@ -765,6 +765,9 @@ CREATE TABLE api_umbrella.analytics_cache (
created_at timestamp with time zone DEFAULT transaction_timestamp() NOT NULL,
updated_at timestamp with time zone DEFAULT transaction_timestamp() NOT NULL,
unique_user_ids uuid[],
data_date character varying GENERATED ALWAYS AS (((((((data -> 'aggregations'::text) -> 'hits_over_time'::text) -> 'buckets'::text) -> 0) ->> 'key_as_string'::text))::character varying) STORED,
hit_count bigint GENERATED ALWAYS AS (((((((data -> 'aggregations'::text) -> 'hits_over_time'::text) -> 'buckets'::text) -> 0) ->> 'doc_count'::text))::bigint) STORED,
response_time_average bigint GENERATED ALWAYS AS (round(((((data -> 'aggregations'::text) -> 'response_time_average'::text) ->> 'value'::text))::numeric)) STORED,
CONSTRAINT analytics_cache_enforce_single_date_bucket CHECK ((NOT (jsonb_array_length((((data -> 'aggregations'::text) -> 'hits_over_time'::text) -> 'buckets'::text)) > 1)))
);

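The three STORED generated columns added above precompute the date bucket, hit count, and average response time from the cached OpenSearch JSON at write time, so the rewritten summary SQL later in this diff can aggregate plain columns instead of re-parsing data on every row. A simplified, hypothetical sketch of the pattern (demo table and values only, not the production schema):

CREATE TABLE analytics_cache_demo (
    id text PRIMARY KEY,
    data jsonb NOT NULL,
    hit_count bigint GENERATED ALWAYS AS (
        ((data -> 'aggregations' -> 'hits_over_time' -> 'buckets' -> 0) ->> 'doc_count')::bigint
    ) STORED
);
INSERT INTO analytics_cache_demo (id, data)
    VALUES ('demo', '{"aggregations": {"hits_over_time": {"buckets": [{"key_as_string": "2013-07-01T00:00:00.000-06:00", "doc_count": 42}]}}}');
SELECT id, hit_count FROM analytics_cache_demo; -- hit_count = 42, computed once at insert time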
@@ -2801,6 +2804,7 @@ ALTER TABLE ONLY api_umbrella.rate_limits
-- PostgreSQL database dump complete
--


INSERT INTO api_umbrella.lapis_migrations (name) VALUES ('1498350289');
INSERT INTO api_umbrella.lapis_migrations (name) VALUES ('1554823736');
INSERT INTO api_umbrella.lapis_migrations (name) VALUES ('1560722058');
@@ -2820,3 +2824,5 @@ INSERT INTO api_umbrella.lapis_migrations (name) VALUES ('1701483732');
INSERT INTO api_umbrella.lapis_migrations (name) VALUES ('1721347955');
INSERT INTO api_umbrella.lapis_migrations (name) VALUES ('1738353016');
INSERT INTO api_umbrella.lapis_migrations (name) VALUES ('1753472899');
INSERT INTO api_umbrella.lapis_migrations (name) VALUES ('1769633747');
INSERT INTO api_umbrella.lapis_migrations (name) VALUES ('1769732670');
2 changes: 1 addition & 1 deletion docker-compose.yml
@@ -5,7 +5,7 @@ services:
dockerfile: Dockerfile
target: test
cache_from:
- ${DOCKER_IMAGE_CACHE_FROM:-ghcr.io/nrel/api-umbrella:dev-env-main}
- ${DOCKER_IMAGE_CACHE_FROM:-ghcr.io/natlabrockies/api-umbrella:dev-env-main}
entrypoint: /app/docker/dev/docker-entrypoint
command: /app/docker/dev/docker-start
volumes:
2 changes: 1 addition & 1 deletion src/api-umbrella/cli/migrate.lua
@@ -62,7 +62,7 @@ return function()
local clean_lines = {}
local removing_comments = true
for _, line in ipairs(lines) do
if not removing_comments or (line ~= "" and not startswith(line, "--")) then
if (not removing_comments or (line ~= "" and not startswith(line, "--"))) and not startswith(line, "\\restrict") and not startswith(line, "\\unrestrict") then
if startswith(line, "COMMENT ON EXTENSION") then
line = "-- " .. line
end
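For context, recent pg_dump releases wrap their output in \restrict/\unrestrict psql meta-commands, which the migration loader's statement-by-statement replay of schema.sql cannot execute, so the added condition simply drops those lines. A dump wrapped this way looks roughly like the following (the token is randomly generated per dump; this one is made up):

\restrict dRsU4kQxExampleToken
-- PostgreSQL database dump
SET statement_timeout = 0;
-- ... schema statements ...
\unrestrict dRsU4kQxExampleToken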
2 changes: 1 addition & 1 deletion src/api-umbrella/version.txt
@@ -1 +1 @@
1.6.0
1.6.6
164 changes: 109 additions & 55 deletions src/api-umbrella/web-app/actions/v0/analytics.lua
@@ -15,10 +15,17 @@ local stable_object_hash = require "api-umbrella.utils.stable_object_hash"
local t = require("api-umbrella.web-app.utils.gettext").gettext
local time = require "api-umbrella.utils.time"

local db_statement_timeout_ms = config["web"]["analytics_v0_summary_db_timeout"] * 1000

local _M = {}

local function generate_organization_summary(start_time, end_time, recent_start_time, filters)
local cache_id = "analytics_summary:organization:" .. start_time .. ":" .. end_time .. ":" .. recent_start_time .. ":" .. stable_object_hash(filters)
local function generate_organization_summary(organization_name, start_time, end_time, recent_start_time, filters)
local cache_id = "analytics_summary:organization:" .. organization_name .. ":" .. start_time .. ":" .. end_time .. ":" .. recent_start_time .. ":" .. stable_object_hash({
filters = filters,
timezone = config["analytics"]["timezone"],
max_buckets = config["opensearch"]["max_buckets"],
analytics_v0_summary_filter = config["web"]["analytics_v0_summary_filter"],
})
local cache = Cache:find(cache_id)
if cache then
ngx.log(ngx.NOTICE, "Using cached analytics response for " .. cache_id)
@@ -38,50 +45,65 @@ local function generate_organization_summary(start_time, end_time, recent_start_
if config["web"]["analytics_v0_summary_filter"] then
search:set_search_query_string(config["web"]["analytics_v0_summary_filter"])
end
search:set_timeout(20 * 60) -- 20 minutes
search:set_timeout(config["web"]["analytics_v0_summary_analytics_timeout"])
search:set_permission_scope(filters)

local aggregate_sql = [[
WITH interval_rows AS (
SELECT
substring(data_date from 1 for :date_key_length) AS interval_date,
hit_count,
unique_user_ids,
response_time_average
FROM analytics_cache
WHERE id IN :ids
),
interval_unique_user_ids AS (
SELECT
interval_date,
array_agg(DISTINCT user_ids.user_id) FILTER (WHERE user_ids.user_id IS NOT NULL) AS unique_user_ids
FROM interval_rows
CROSS JOIN LATERAL unnest(unique_user_ids) AS user_ids(user_id)
GROUP BY interval_date
),
interval_counts AS (
SELECT
interval_date,
SUM(hit_count) AS hit_count,
SUM(response_time_average) AS response_time_average
FROM interval_rows
GROUP BY interval_date
),
interval_totals AS (
SELECT
interval_counts.interval_date,
interval_counts.hit_count,
interval_unique_user_ids.unique_user_ids,
interval_counts.response_time_average
FROM interval_counts
NATURAL LEFT JOIN interval_unique_user_ids
ORDER BY interval_date
),
all_unique_users AS (
SELECT COUNT(DISTINCT user_ids.user_id) FILTER (WHERE user_ids.user_id IS NOT NULL) AS total_unique_users
FROM interval_totals
LEFT JOIN LATERAL unnest(interval_totals.unique_user_ids) AS user_ids(user_id) ON true
)
SELECT jsonb_build_object(
'hits', jsonb_build_object(
:interval_name, jsonb_agg(jsonb_build_array(interval_totals.interval_date, COALESCE(interval_totals.hit_count, 0))),
'total', SUM(interval_totals.hit_count)
),
'active_api_keys', jsonb_build_object(
:interval_name, jsonb_agg(jsonb_build_array(interval_totals.interval_date, COALESCE(array_length(interval_totals.unique_user_ids, 1), 0))),
'total', (
SELECT COUNT(DISTINCT user_ids.id)
FROM unnest(array_accum(interval_totals.unique_user_ids)) AS user_ids(id)
)
'total', (SELECT total_unique_users FROM all_unique_users)
),
'average_response_times', jsonb_build_object(
:interval_name, jsonb_agg(jsonb_build_array(interval_totals.interval_date, interval_totals.response_time_average)),
'average', ROUND(SUM(CASE WHEN interval_totals.response_time_average IS NOT NULL AND interval_totals.hit_count IS NOT NULL THEN interval_totals.response_time_average * interval_totals.hit_count END) / SUM(CASE WHEN interval_totals.response_time_average IS NOT NULL AND interval_totals.hit_count IS NOT NULL THEN interval_totals.hit_count END))
)
) AS response
FROM (
SELECT
interval_date,
hit_count,
response_time_average,
(
SELECT array_agg(DISTINCT user_id)
FROM unnest(interval_agg.user_ids) AS user_id
LEFT JOIN api_users ON user_id = api_users.id
WHERE user_id IS NOT NULL AND api_users.disabled_at IS NULL
) AS unique_user_ids
FROM (
SELECT
substring(data->'aggregations'->'hits_over_time'->'buckets'->0->>'key_as_string' from 1 for :date_key_length) AS interval_date,
SUM((data->'aggregations'->'hits_over_time'->'buckets'->0->>'doc_count')::bigint) AS hit_count,
array_accum(unique_user_ids) AS user_ids,
SUM(ROUND((data->'aggregations'->'response_time_average'->>'value')::numeric)) AS response_time_average
FROM analytics_cache
WHERE id IN :ids
GROUP BY interval_date
ORDER BY interval_date
) AS interval_agg
) AS interval_totals
FROM interval_totals
]]
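This rewritten aggregate_sql reads the new generated columns and keys each interval on a prefix of data_date, so the same query serves both rollups: the monthly call below passes :date_key_length = 7 and the daily "recent" call passes 10. For example, with a bucket date like the one used in config/test.yml:

SELECT substring('2013-07-01T00:00:00.000-06:00' from 1 for 7);  -- '2013-07'    (monthly key)
SELECT substring('2013-07-01T00:00:00.000-06:00' from 1 for 10); -- '2013-07-01' (daily key)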

-- Expire the monthly data in 3 months. While the historical data shouldn't
@@ -97,7 +119,7 @@ local function generate_organization_summary(start_time, end_time, recent_start_
date_key_length = 7,
}, {
fatal = true,
statement_timeout = 5 * 60 * 1000, -- 5 minutes
statement_timeout = db_statement_timeout_ms,
})[1]["response"]

search:set_start_time(recent_start_time)
@@ -112,13 +134,26 @@ local function generate_organization_summary(start_time, end_time, recent_start_
date_key_length = 10,
}, {
fatal = true,
statement_timeout = 5 * 60 * 1000, -- 5 minutes
statement_timeout = db_statement_timeout_ms,
})[1]["response"]

response["hits"]["recent"] = recent_response["hits"]
response["active_api_keys"]["recent"] = recent_response["active_api_keys"]
response["average_response_times"]["recent"] = recent_response["average_response_times"]

-- Only cache the data if it includes the expected latest month of data, and
-- also includes all of the expected months and days. This prevents returning
-- and caching an incomplete summary when the underlying analytics queries
-- fail for certain time periods, and forces the summary to wait until all of
-- the underlying data is cached before it is returned.
local last_month = response["hits"]["monthly"][#response["hits"]["monthly"]]
local expected_last_month = string.sub(end_time, 1, 7)
local last_day = response["hits"]["recent"]["daily"][#response["hits"]["recent"]["daily"]]
local expected_last_day = string.sub(end_time, 1, 10)
if last_month[1] ~= expected_last_month or last_day[1] ~= expected_last_day or #analytics_cache_ids ~= #response["hits"]["monthly"] or #recent_analytics_cache_ids ~= #response["hits"]["recent"]["daily"] then
return nil, "incomplete data"
end

local response_json = json_encode(response)
expires_at = ngx.now() + 60 * 60 * 24 * 2 -- 2 days
Cache:upsert(cache_id, response_json, expires_at)
@@ -127,6 +162,8 @@ local function generate_organization_summary(start_time, end_time, recent_start_
end

local function generate_production_apis_summary(start_time, end_time, recent_start_time)
local any_err = false

local data = {
organizations = {},
}
@@ -139,7 +176,7 @@ local function generate_production_apis_summary(start_time, end_time, recent_sta
WHERE api_backends.status_description = 'Production'
]], nil, {
fatal = true,
statement_timeout = 5 * 60 * 1000, -- 5 minutes
statement_timeout = db_statement_timeout_ms,
})
data["organization_count"] = int64_to_json_number(counts[1]["organization_count"])
data["api_backend_count"] = int64_to_json_number(counts[1]["api_backend_count"])
@@ -162,7 +199,7 @@ local function generate_production_apis_summary(start_time, end_time, recent_sta
ORDER BY api_backends.organization_name
]], nil, {
fatal = true,
statement_timeout = 5 * 60 * 1000, -- 5 minutes
statement_timeout = db_statement_timeout_ms,
})
for _, organization in ipairs(organizations) do
local filters = {
@@ -190,18 +227,32 @@ local function generate_production_apis_summary(start_time, end_time, recent_sta
end

ngx.log(ngx.NOTICE, 'Fetching analytics for organization "' .. organization["organization_name"] .. '"')
local organization_data = generate_organization_summary(start_time, end_time, recent_start_time, filters)
organization_data["name"] = organization["organization_name"]
organization_data["api_backend_count"] = int64_to_json_number(organization["api_backend_count"])
organization_data["api_backend_url_match_count"] = int64_to_json_number(organization["api_backend_url_match_count"])
table.insert(data["organizations"], organization_data)
local organization_data, organization_data_err = generate_organization_summary(organization["organization_name"], start_time, end_time, recent_start_time, filters)
if organization_data_err then
ngx.log(ngx.ERR, 'Analytics for organization "' .. organization["organization_name"] .. '" failed: ', organization_data_err)
any_err = true
else
organization_data["name"] = organization["organization_name"]
organization_data["api_backend_count"] = int64_to_json_number(organization["api_backend_count"])
organization_data["api_backend_url_match_count"] = int64_to_json_number(organization["api_backend_url_match_count"])
table.insert(data["organizations"], organization_data)
end
end

ngx.log(ngx.NOTICE, "Fetching analytics for all organizations")
local all_data = generate_organization_summary(start_time, end_time, recent_start_time, all_filters)
data["all"] = all_data
local all_data, all_data_err = generate_organization_summary("all", start_time, end_time, recent_start_time, all_filters)
if all_data_err then
ngx.log(ngx.ERR, "Analytics for all organization failed: ", all_data_err)
any_err = true
else
data["all"] = all_data
end

return data
if any_err then
return nil, "incomplete data"
else
return data
end
end

local function generate_summary()
@@ -236,21 +287,24 @@ local function generate_summary()
date_tz:set(icu_date.fields.MILLISECOND, 0)
local recent_start_time = date_tz:format(format_iso8601)

local response = {
production_apis = generate_production_apis_summary(start_time, end_time, recent_start_time),
start_time = time.timestamp_ms_to_iso8601(start_time_ms),
end_time = time.timestamp_ms_to_iso8601(end_time_ms),
timezone = date_tz:get_time_zone_id(),
}

response["cached_at"] = time.timestamp_to_iso8601(ngx.now())
local production_apis, production_apis_err = generate_production_apis_summary(start_time, end_time, recent_start_time)
if production_apis_err then
ngx.log(ngx.ERR, "Production APIs summary error: ", production_apis_err)
else
local response = {
production_apis = production_apis,
start_time = time.timestamp_ms_to_iso8601(start_time_ms),
end_time = time.timestamp_ms_to_iso8601(end_time_ms),
timezone = date_tz:get_time_zone_id(),
}

local cache_id = "analytics_summary"
local response_json = json_encode(response)
local expires_at = ngx.now() + 60 * 60 * 24 * 2 -- 2 days
Cache:upsert(cache_id, response_json, expires_at)
response["cached_at"] = time.timestamp_to_iso8601(ngx.now())

return response_json
local cache_id = "analytics_summary"
local response_json = json_encode(response)
local expires_at = nil -- Never expire
Cache:upsert(cache_id, response_json, expires_at)
end
end

function _M.summary(self)
1 change: 1 addition & 0 deletions src/api-umbrella/web-app/hooks/init_preload_modules.lua
@@ -3,6 +3,7 @@ require "api-umbrella.utils.active_config_store.build_web_app_active_config"
require "api-umbrella.utils.active_config_store.fetch_published_config_for_setting_active_config"
require "api-umbrella.utils.active_config_store.polling_set_active_config"
require "api-umbrella.utils.api_key_prefixer"
require "api-umbrella.utils.append_array"
require "api-umbrella.utils.array_includes"
require "api-umbrella.utils.array_last"
require "api-umbrella.utils.build_url"