Skip to content
This repository was archived by the owner on Apr 2, 2024. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ jobs:
sudo mv sccache-*/sccache /usr/local/bin/sccache
sccache --show-stats

- uses: Swatinem/rust-cache@v2
# - uses: Swatinem/rust-cache@v2

- name: Install cargo-pgx
run: |
Expand All @@ -44,7 +44,7 @@ jobs:
- name: Initialize pgx
if: ${{ steps.cache-pgx.outputs.cache-hit != 'true' }}
run: cargo pgx init --pg${{ matrix.postgres.version }} download

- name: Run cargo test
run: cargo pgx test pg${{ matrix.postgres.version }}

Expand Down
22 changes: 21 additions & 1 deletion docs/sql-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,18 @@ the specified span_id.
```
function TABLE(trace_id trace_id, parent_span_id bigint, span_id bigint, dist integer, path bigint[]) **ps_trace.upstream_spans**(_trace_id trace_id, _span_id bigint, _max_dist integer DEFAULT NULL::integer)
```
### _prom_catalog._actually_delete_series_and_labels
Internal utility function which only exists due to restrictions on plpgsql
transaction control and SECURITY DEFINER
```
procedure void **_prom_catalog._actually_delete_series_and_labels**(IN metric_schema text, IN metric_table text, IN metric_series_table text, IN deletion_epoch bigint)
```
### _prom_catalog._lock_and_set_epoch
Internal utility function which only exists due to restrictions on plpgsql
transaction control and SECURITY DEFINER
```
procedure void **_prom_catalog._lock_and_set_epoch**(IN new_current_epoch bigint, IN new_delete_epoch bigint)
```
### _prom_catalog.attach_series_partition

```
Expand Down Expand Up @@ -575,7 +587,7 @@ function void **_prom_catalog.delay_compression_job**(ht_table text, new_start t
### _prom_catalog.delete_expired_series

```
function void **_prom_catalog.delete_expired_series**(metric_schema text, metric_table text, metric_series_table text, ran_at timestamp with time zone, present_epoch bigint, last_updated_epoch timestamp with time zone)
procedure void **_prom_catalog.delete_expired_series**(IN metric_schema text, IN metric_table text, IN metric_series_table text, IN run_at timestamp with time zone)
```
### _prom_catalog.delete_series_catalog_row

Expand Down Expand Up @@ -832,6 +844,14 @@ function TABLE(hypertable_name text, node_name text, node_up boolean) **_prom_ca
```
function TABLE(hypertable_name text, table_bytes bigint, index_bytes bigint, toast_bytes bigint, total_bytes bigint) **_prom_catalog.hypertable_remote_size**(schema_name_in text)
```
### _prom_catalog.initialize_current_epoch
This function can be used to initialize the current epoch to the epoch value
of `ran_at`. The value will only be adjusted if `current_epoch` is 0, which is
the default value set by schema migrations.
This is necessary to prevent spurious epoch aborts during ingestion.
```
function bigint **_prom_catalog.initialize_current_epoch**(ran_at timestamp with time zone)
```
### _prom_catalog.insert_exemplar_row

```
Expand Down
208 changes: 140 additions & 68 deletions migration/idempotent/001-base.sql
Original file line number Diff line number Diff line change
Expand Up @@ -864,11 +864,14 @@ CREATE OR REPLACE FUNCTION _prom_catalog.delete_series_catalog_row(
SET search_path = pg_catalog, pg_temp
AS
$$
DECLARE
locally_observed_epoch BIGINT;
BEGIN
SELECT current_epoch FROM _prom_catalog.ids_epoch FOR UPDATE INTO STRICT locally_observed_epoch;
EXECUTE FORMAT(
'UPDATE prom_data_series.%1$I SET delete_epoch = current_epoch+1 FROM _prom_catalog.ids_epoch WHERE delete_epoch IS NULL AND id = ANY($1)',
'UPDATE prom_data_series.%1$I s SET delete_epoch = $1 WHERE s.delete_epoch IS NULL AND id = ANY($2)',
metric_table
) USING series_ids;
) USING locally_observed_epoch, series_ids;
RETURN;
END;
$$
Expand Down Expand Up @@ -1980,12 +1983,12 @@ CREATE OR REPLACE FUNCTION _prom_catalog.epoch_abort(user_epoch BIGINT)
VOLATILE
SET search_path = pg_catalog, pg_temp
AS $func$
DECLARE db_epoch BIGINT;
DECLARE current_delete_epoch BIGINT;
BEGIN
SELECT current_epoch FROM _prom_catalog.ids_epoch LIMIT 1
INTO db_epoch;
RAISE EXCEPTION 'epoch % to old to continue INSERT, current: %',
user_epoch, db_epoch
SELECT delete_epoch FROM _prom_catalog.ids_epoch LIMIT 1
INTO current_delete_epoch;
RAISE EXCEPTION 'epoch % older than oldest delete epoch %, unable to INSERT',
user_epoch, current_delete_epoch
USING ERRCODE='PS001';
END;
$func$ LANGUAGE PLPGSQL;
Expand Down Expand Up @@ -2058,6 +2061,34 @@ data newer than `newer_than` in any metric table.
Note: See _prom_catalog.mark_series_to_be_dropped_as_unused for context.
';

-- Seeds `current_epoch` in `_prom_catalog.ids_epoch` from the timestamp `ran_at`.
--
-- Only rows whose `current_epoch` is still 0 are touched; any other value is
-- left unchanged.
--
-- Parameters:
--   ran_at  timestamp whose Unix-epoch seconds become the new `current_epoch`
-- Returns:
--   the `current_epoch` stored in `_prom_catalog.ids_epoch` after the
--   (possibly no-op) update
CREATE OR REPLACE FUNCTION _prom_catalog.initialize_current_epoch(ran_at TIMESTAMPTZ)
    RETURNS BIGINT
    LANGUAGE plpgsql
    VOLATILE
    --security definer so that the caller does not need its own permissions on ids_epoch
    SECURITY DEFINER
    SET search_path = pg_catalog, pg_temp
AS $func$
DECLARE
    -- seconds since the Unix epoch for `ran_at`, coerced to BIGINT on assignment
    ran_at_seconds BIGINT := EXTRACT (EPOCH FROM ran_at);
BEGIN
    UPDATE _prom_catalog.ids_epoch AS ie
       SET current_epoch = ran_at_seconds
     WHERE ie.current_epoch = 0;

    -- read the value back instead of using RETURNING: the UPDATE matches no
    -- row when the epoch was already initialised
    RETURN (SELECT ie.current_epoch FROM _prom_catalog.ids_epoch AS ie);
END
$func$;
--redundant given schema settings but extra caution for security definers
REVOKE ALL ON FUNCTION _prom_catalog.initialize_current_epoch(TIMESTAMPTZ) FROM PUBLIC;
GRANT EXECUTE ON FUNCTION _prom_catalog.initialize_current_epoch(TIMESTAMPTZ) TO prom_writer;
GRANT EXECUTE ON FUNCTION _prom_catalog.initialize_current_epoch(TIMESTAMPTZ) TO prom_maintenance;
COMMENT ON FUNCTION _prom_catalog.initialize_current_epoch(TIMESTAMPTZ)
IS
'This function can be used to initialize the current epoch to the epoch value
of `ran_at`. The value will only be adjusted if `current_epoch` is 0, which is
the default value set by schema migrations.
This is necessary to prevent spurious epoch aborts during ingestion.';


-- Marks series which we will drop soon as unused.
-- A series is unused if there is no data newer than `drop_point` which
-- references that series.
Expand All @@ -2076,6 +2107,7 @@ CREATE OR REPLACE FUNCTION _prom_catalog.mark_series_to_be_dropped_as_unused(
AS $func$
DECLARE
check_time TIMESTAMPTZ;
locally_observed_epoch BIGINT;
BEGIN

-- We determine if a series is unused by "looking back" over data older
Expand All @@ -2099,6 +2131,8 @@ BEGIN
SELECT drop_point OPERATOR(pg_catalog.+) pg_catalog.interval '1 hour'
INTO check_time;

SELECT current_epoch FROM _prom_catalog.ids_epoch FOR UPDATE INTO STRICT locally_observed_epoch;

EXECUTE format(
$query$
WITH potentially_drop_series AS (
Expand All @@ -2113,11 +2147,10 @@ BEGIN
SELECT _prom_catalog.get_confirmed_unused_series('%1$s','%2$s','%3$s', array_agg(series_id), %5$L) as ids
FROM potentially_drop_series
) -- we want this next statement to be the last one in the txn since it could block series fetch (both of them update delete_epoch)
UPDATE prom_data_series.%3$I SET delete_epoch = current_epoch+1
FROM _prom_catalog.ids_epoch
WHERE delete_epoch IS NULL
AND id IN (SELECT unnest(ids) FROM confirmed_drop_series)
$query$, metric_schema, metric_table, metric_series_table, drop_point, check_time);
UPDATE prom_data_series.%3$I s SET delete_epoch = %6$L
WHERE s.delete_epoch IS NULL
AND s.id IN (SELECT unnest(ids) FROM confirmed_drop_series)
$query$, metric_schema, metric_table, metric_series_table, drop_point, check_time, locally_observed_epoch);
END
$func$
LANGUAGE PLPGSQL;
Expand All @@ -2136,59 +2169,106 @@ This function is designed to be used in the context of dropping metric
chunks, see `_prom_catalog.drop_metric_chunks`.
';

CREATE OR REPLACE FUNCTION _prom_catalog.delete_expired_series(
metric_schema TEXT, metric_table TEXT, metric_series_table TEXT, ran_at TIMESTAMPTZ, present_epoch BIGINT, last_updated_epoch TIMESTAMPTZ
CREATE OR REPLACE PROCEDURE _prom_catalog.delete_expired_series(
metric_schema TEXT, metric_table TEXT, metric_series_table TEXT, run_at TIMESTAMPTZ
)
RETURNS VOID
--security definer to add jobs as the logged-in user
AS $func$
DECLARE
deletion_epoch BIGINT;
run_at_seconds BIGINT;
epoch_duration_seconds BIGINT;
BEGIN
-- Note: We cannot use SET in the procedure declaration because we do transaction control
-- and we can _only_ use SET LOCAL in a procedure which _does_ transaction control
SET LOCAL search_path = pg_catalog, pg_temp;

SELECT EXTRACT (EPOCH FROM _prom_catalog.get_default_value('epoch_duration')::INTERVAL)::BIGINT INTO STRICT epoch_duration_seconds;
SELECT EXTRACT (EPOCH FROM run_at)::BIGINT INTO STRICT run_at_seconds;
Comment thread
sumerman marked this conversation as resolved.

-- If we marked a series for deletion `epoch_duration_seconds` ago, we can delete it now
deletion_epoch := run_at_seconds - epoch_duration_seconds;

-- First we advance the current epoch and delete epoch.
-- This corresponds to the PrepareDeleteTx label in our model.
CALL _prom_catalog._lock_and_set_epoch(run_at_seconds, deletion_epoch);

-- Release lock on ids_epoch
COMMIT;

-- reset search path after transaction end
SET LOCAL search_path = pg_catalog, pg_temp;

-- Now we recheck the delete conditions, and delete series.
-- This corresponds to the ActuallyDeleteTx and Resurrect in our model.
CALL _prom_catalog._actually_delete_series_and_labels(metric_schema, metric_table, metric_series_table, deletion_epoch);
COMMIT;
END
$func$
LANGUAGE PLPGSQL;

--redundant given schema settings but extra caution for security definers
REVOKE ALL ON PROCEDURE _prom_catalog.delete_expired_series(text, text, text, TIMESTAMPTZ) FROM PUBLIC;
GRANT EXECUTE ON PROCEDURE _prom_catalog.delete_expired_series(text, text, text, TIMESTAMPTZ) TO prom_maintenance;

-- Locks `_prom_catalog.ids_epoch` row-wise and then overwrites both epoch
-- columns with the supplied values.
--
-- Parameters:
--   new_current_epoch  value written to `current_epoch`
--   new_delete_epoch   value written to `delete_epoch`
--
-- The procedure does not commit; the row lock is held until the caller ends
-- the surrounding transaction.
CREATE OR REPLACE PROCEDURE _prom_catalog._lock_and_set_epoch(
    new_current_epoch BIGINT,
    new_delete_epoch BIGINT
)
    -- security definer to modify the `ids_epoch` table
    SECURITY DEFINER
    SET search_path = pg_catalog, pg_temp
    LANGUAGE sql
AS $func$
    -- acquire the row lock(s) explicitly before writing
    SELECT ie.current_epoch, ie.delete_epoch
      FROM _prom_catalog.ids_epoch AS ie
       FOR UPDATE;

    -- no WHERE clause: every row of ids_epoch is rewritten
    -- NOTE(review): presumably the table holds exactly one row -- confirm
    UPDATE _prom_catalog.ids_epoch
       SET (current_epoch, delete_epoch) = (new_current_epoch, new_delete_epoch);
$func$;
--redundant given schema settings but extra caution for security definers
REVOKE ALL ON PROCEDURE _prom_catalog._lock_and_set_epoch(BIGINT, BIGINT) FROM PUBLIC;
GRANT EXECUTE ON PROCEDURE _prom_catalog._lock_and_set_epoch(BIGINT, BIGINT) TO prom_maintenance;
COMMENT ON PROCEDURE _prom_catalog._lock_and_set_epoch(BIGINT, BIGINT) IS
'Internal utility function which only exists due to restrictions on plpgsql
transaction control and SECURITY DEFINER';

CREATE OR REPLACE PROCEDURE _prom_catalog._actually_delete_series_and_labels(metric_schema TEXT, metric_table TEXT, metric_series_table TEXT, deletion_epoch BIGINT)
LANGUAGE PLPGSQL
-- security definer to modify the `ids_epoch` table
SECURITY DEFINER
VOLATILE
SET search_path = pg_catalog, pg_temp
AS $func$
DECLARE
label_array int[];
next_epoch BIGINT;
deletion_epoch BIGINT;
epoch_duration INTERVAL;
BEGIN
next_epoch := present_epoch + 1;
-- technically we can delete any ID <= current_epoch - 1
-- but it's always safe to leave them around for a bit longer
deletion_epoch := present_epoch - 4;

EXECUTE format($query$
-- recheck that the series IDs we might delete are actually dead
WITH dead_series AS (
SELECT potential.id
FROM
(
SELECT id
FROM prom_data_series.%3$I
WHERE delete_epoch <= %4$L
) as potential
LEFT JOIN LATERAL (
SELECT 1
FROM %1$I.%2$I metric_data
WHERE metric_data.series_id = potential.id
LIMIT 1
) as lateral_exists(indicator) ON (TRUE)
WHERE indicator IS NULL
), deleted_series AS (
DELETE FROM prom_data_series.%3$I
WHERE delete_epoch <= %4$L
AND id IN (SELECT id FROM dead_series) -- concurrency means we need this qual in both
RETURNING id, labels
), resurrected_series AS (
UPDATE prom_data_series.%3$I
SET delete_epoch = NULL
-- recheck that the series IDs we might delete are actually dead
WITH dead_series AS (
SELECT potential.id
FROM
(
SELECT id
FROM prom_data_series.%3$I
WHERE delete_epoch <= %4$L
AND id NOT IN (SELECT id FROM dead_series) -- concurrency means we need this qual in both
)
SELECT ARRAY(SELECT DISTINCT unnest(labels) as label_id
FROM deleted_series)
) as potential
LEFT JOIN LATERAL (
SELECT 1
FROM %1$I.%2$I metric_data
WHERE metric_data.series_id = potential.id
LIMIT 1
) as lateral_exists(indicator) ON (TRUE)
WHERE indicator IS NULL
), deleted_series AS (
DELETE FROM prom_data_series.%3$I
WHERE delete_epoch <= %4$L
AND id IN (SELECT id FROM dead_series) -- concurrency means we need this qual in both
RETURNING id, labels
), resurrected_series AS (
UPDATE prom_data_series.%3$I
SET delete_epoch = NULL
WHERE delete_epoch <= %4$L
AND id NOT IN (SELECT id FROM dead_series) -- concurrency means we need this qual in both
)
SELECT ARRAY(SELECT DISTINCT unnest(labels) as label_id
FROM deleted_series)
$query$, metric_schema, metric_table, metric_series_table, deletion_epoch) INTO label_array;


-- Now we check if there are any labels which we can remove.
-- This is not reflected in our series cache model.
IF array_length(label_array, 1) > 0 THEN
--jit interacts poorly with the multi-partition query below
SET LOCAL jit = 'off';
Expand Down Expand Up @@ -2222,24 +2302,16 @@ BEGIN
DELETE FROM _prom_catalog.label
WHERE id IN (SELECT * FROM confirmed_drop_labels) AND key != '__name__';
$query$, metric_series_table) USING label_array;

SET LOCAL jit = DEFAULT;
END IF;

SELECT _prom_catalog.get_default_value('epoch_duration')::INTERVAL INTO STRICT epoch_duration;

IF ran_at > last_updated_epoch + epoch_duration THEN
-- we only want to increment the epoch every epoch_duration
UPDATE _prom_catalog.ids_epoch
SET (current_epoch, last_update_time) = (next_epoch, now())
WHERE current_epoch < next_epoch;
END IF;
END
$func$
LANGUAGE PLPGSQL;
$func$;
--redundant given schema settings but extra caution for security definers
REVOKE ALL ON FUNCTION _prom_catalog.delete_expired_series(text, text, text, timestamptz, BIGINT, timestamptz) FROM PUBLIC;
GRANT EXECUTE ON FUNCTION _prom_catalog.delete_expired_series(text, text, text, timestamptz, BIGINT, timestamptz) TO prom_maintenance;
REVOKE ALL ON PROCEDURE _prom_catalog._actually_delete_series_and_labels(TEXT, TEXT, TEXT, BIGINT) FROM PUBLIC;
GRANT EXECUTE ON PROCEDURE _prom_catalog._actually_delete_series_and_labels(TEXT, TEXT, TEXT, BIGINT) TO prom_maintenance;
COMMENT ON PROCEDURE _prom_catalog._actually_delete_series_and_labels(TEXT, TEXT, TEXT, BIGINT) IS
'Internal utility function which only exists due to restrictions on plpgsql
transaction control and SECURITY DEFINER';

CREATE OR REPLACE FUNCTION _prom_catalog.set_app_name(full_name text)
RETURNS VOID
Expand Down
15 changes: 5 additions & 10 deletions migration/idempotent/011-maintenance.sql
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,6 @@ DECLARE
metric_series_table NAME;
is_metric_view BOOLEAN;
time_dimension_id INT;
last_updated TIMESTAMPTZ;
present_epoch BIGINT;
lastT TIMESTAMPTZ;
startT TIMESTAMPTZ;
BEGIN
Expand Down Expand Up @@ -131,11 +129,10 @@ BEGIN
ORDER BY range_end DESC
LIMIT 1;
END IF;
-- end this txn so we're not holding any locks on the catalog

SELECT current_epoch, last_update_time INTO present_epoch, last_updated FROM
_prom_catalog.ids_epoch LIMIT 1;
PERFORM _prom_catalog.initialize_current_epoch(ran_at);
Comment thread
sumerman marked this conversation as resolved.
-- end this txn so we're not holding any locks on the catalog
COMMIT;

-- reset search path after transaction end
SET LOCAL search_path = pg_catalog, pg_temp;

Expand All @@ -144,7 +141,7 @@ BEGIN
-- we may still have old ones to delete
lastT := pg_catalog.clock_timestamp();
PERFORM _prom_catalog.set_app_name(pg_catalog.format('promscale maintenance: data retention: metric %s: delete expired series', metric_name));
PERFORM _prom_catalog.delete_expired_series(metric_schema, metric_table, metric_series_table, ran_at, present_epoch, last_updated);
CALL _prom_catalog.delete_expired_series(metric_schema, metric_table, metric_series_table, ran_at);
IF log_verbose THEN
RAISE LOG 'promscale maintenance: data retention: metric %: done deleting expired series as only action in %', metric_name, pg_catalog.clock_timestamp() OPERATOR(pg_catalog.-) lastT;
RAISE LOG 'promscale maintenance: data retention: metric %: finished in %', metric_name, pg_catalog.clock_timestamp() OPERATOR(pg_catalog.-) startT;
Expand All @@ -170,8 +167,6 @@ BEGIN
IF log_verbose THEN
RAISE LOG 'promscale maintenance: data retention: metric %: done dropping chunks in %', metric_name, pg_catalog.clock_timestamp() OPERATOR(pg_catalog.-) lastT;
END IF;
SELECT current_epoch, last_update_time INTO present_epoch, last_updated FROM
_prom_catalog.ids_epoch LIMIT 1;
COMMIT;
-- reset search path after transaction end
SET LOCAL search_path = pg_catalog, pg_temp;
Expand All @@ -180,7 +175,7 @@ BEGIN
-- transaction 4
lastT := pg_catalog.clock_timestamp();
PERFORM _prom_catalog.set_app_name(pg_catalog.format('promscale maintenance: data retention: metric %s: delete expired series', metric_name));
PERFORM _prom_catalog.delete_expired_series(metric_schema, metric_table, metric_series_table, ran_at, present_epoch, last_updated);
CALL _prom_catalog.delete_expired_series(metric_schema, metric_table, metric_series_table, ran_at);
IF log_verbose THEN
RAISE LOG 'promscale maintenance: data retention: metric %: done deleting expired series in %', metric_name, pg_catalog.clock_timestamp() OPERATOR(pg_catalog.-) lastT;
RAISE LOG 'promscale maintenance: data retention: metric %: finished in %', metric_name, pg_catalog.clock_timestamp() OPERATOR(pg_catalog.-) startT;
Expand Down
Loading