diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 29b955d6..cc84d211 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -28,7 +28,7 @@ jobs: sudo mv sccache-*/sccache /usr/local/bin/sccache sccache --show-stats - - uses: Swatinem/rust-cache@v2 +# - uses: Swatinem/rust-cache@v2 - name: Install cargo-pgx run: | @@ -44,7 +44,7 @@ jobs: - name: Initialize pgx if: ${{ steps.cache-pgx.outputs.cache-hit != 'true' }} run: cargo pgx init --pg${{ matrix.postgres.version }} download - + - name: Run cargo test run: cargo pgx test pg${{ matrix.postgres.version }} diff --git a/docs/sql-api.md b/docs/sql-api.md index 489a7ab3..aa044cbf 100644 --- a/docs/sql-api.md +++ b/docs/sql-api.md @@ -492,6 +492,18 @@ the specified span_id. ``` function TABLE(trace_id trace_id, parent_span_id bigint, span_id bigint, dist integer, path bigint[]) **ps_trace.upstream_spans**(_trace_id trace_id, _span_id bigint, _max_dist integer DEFAULT NULL::integer) ``` +### _prom_catalog._actually_delete_series_and_labels +Internal utility function which only exists due to restrictions on plpgsql +transaction control and SECURITY DEFINER +``` +procedure void **_prom_catalog._actually_delete_series_and_labels**(IN metric_schema text, IN metric_table text, IN metric_series_table text, IN deletion_epoch bigint) +``` +### _prom_catalog._lock_and_set_epoch +Internal utility function which only exists due to restrictions on plpgsql +transaction control and SECURITY DEFINER +``` +procedure void **_prom_catalog._lock_and_set_epoch**(IN new_current_epoch bigint, IN new_delete_epoch bigint) +``` ### _prom_catalog.attach_series_partition ``` @@ -575,7 +587,7 @@ function void **_prom_catalog.delay_compression_job**(ht_table text, new_start t ### _prom_catalog.delete_expired_series ``` -function void **_prom_catalog.delete_expired_series**(metric_schema text, metric_table text, metric_series_table text, ran_at timestamp with time zone, present_epoch bigint, last_updated_epoch timestamp with time zone) +procedure void **_prom_catalog.delete_expired_series**(IN metric_schema text, IN metric_table text, IN metric_series_table text, IN run_at timestamp with time zone) ``` ### _prom_catalog.delete_series_catalog_row @@ -832,6 +844,14 @@ function TABLE(hypertable_name text, node_name text, node_up boolean) **_prom_ca ``` function TABLE(hypertable_name text, table_bytes bigint, index_bytes bigint, toast_bytes bigint, total_bytes bigint) **_prom_catalog.hypertable_remote_size**(schema_name_in text) ``` +### _prom_catalog.initialize_current_epoch +This function can be used to initialize the current epoch to the epoch value +of `ran_at`. The value will only be adjusted if `current_epoch` is 0, which is +the default value set by schema migrations. +This is necessary to prevent spurious epoch aborts during ingestion. +``` +function bigint **_prom_catalog.initialize_current_epoch**(ran_at timestamp with time zone) +``` ### _prom_catalog.insert_exemplar_row ``` diff --git a/migration/idempotent/001-base.sql b/migration/idempotent/001-base.sql index 8449e447..e722d351 100644 --- a/migration/idempotent/001-base.sql +++ b/migration/idempotent/001-base.sql @@ -864,11 +864,14 @@ CREATE OR REPLACE FUNCTION _prom_catalog.delete_series_catalog_row( SET search_path = pg_catalog, pg_temp AS $$ +DECLARE + locally_observed_epoch BIGINT; BEGIN + SELECT current_epoch FROM _prom_catalog.ids_epoch FOR UPDATE INTO STRICT locally_observed_epoch; EXECUTE FORMAT( - 'UPDATE prom_data_series.%1$I SET delete_epoch = current_epoch+1 FROM _prom_catalog.ids_epoch WHERE delete_epoch IS NULL AND id = ANY($1)', + 'UPDATE prom_data_series.%1$I s SET delete_epoch = $1 WHERE s.delete_epoch IS NULL AND id = ANY($2)', metric_table - ) USING series_ids; + ) USING locally_observed_epoch, series_ids; RETURN; END; $$ @@ -1980,12 +1983,12 @@ CREATE OR REPLACE FUNCTION _prom_catalog.epoch_abort(user_epoch BIGINT) VOLATILE SET search_path = pg_catalog, pg_temp AS $func$ -DECLARE db_epoch BIGINT; +DECLARE current_delete_epoch BIGINT; BEGIN - SELECT current_epoch FROM _prom_catalog.ids_epoch LIMIT 1 - INTO db_epoch; - RAISE EXCEPTION 'epoch % to old to continue INSERT, current: %', - user_epoch, db_epoch + SELECT delete_epoch FROM _prom_catalog.ids_epoch LIMIT 1 + INTO current_delete_epoch; + RAISE EXCEPTION 'epoch % older than oldest delete epoch %, unable to INSERT', + user_epoch, current_delete_epoch USING ERRCODE='PS001'; END; $func$ LANGUAGE PLPGSQL; @@ -2058,6 +2061,34 @@ data newer than `newer_than` in any metric table. Note: See _prom_catalog.mark_series_to_be_dropped_as_unused for context. '; +CREATE OR REPLACE FUNCTION _prom_catalog.initialize_current_epoch(ran_at TIMESTAMPTZ) + RETURNS BIGINT + --security definer to have permissions on ids_epoch table + SECURITY DEFINER + VOLATILE + SET search_path = pg_catalog, pg_temp +AS $func$ + DECLARE + epoch BIGINT; + BEGIN + epoch := EXTRACT (EPOCH FROM ran_at); + UPDATE _prom_catalog.ids_epoch e SET current_epoch = epoch WHERE e.current_epoch = 0; + RETURN (SELECT current_epoch FROM _prom_catalog.ids_epoch); + END +$func$ + LANGUAGE plpgsql; +--redundant given schema settings but extra caution for security definers +REVOKE ALL ON FUNCTION _prom_catalog.initialize_current_epoch(TIMESTAMPTZ) FROM PUBLIC; +GRANT EXECUTE ON FUNCTION _prom_catalog.initialize_current_epoch(TIMESTAMPTZ) TO prom_writer; +GRANT EXECUTE ON FUNCTION _prom_catalog.initialize_current_epoch(TIMESTAMPTZ) TO prom_maintenance; +COMMENT ON FUNCTION _prom_catalog.initialize_current_epoch(TIMESTAMPTZ) + IS +'This function can be used to initialize the current epoch to the epoch value +of `ran_at`. The value will only be adjusted if `current_epoch` is 0, which is +the default value set by schema migrations. +This is necessary to prevent spurious epoch aborts during ingestion.'; + + -- Marks series which we will drop soon as unused. -- A series is unused if there is no data newer than `drop_point` which -- references that series. @@ -2076,6 +2107,7 @@ CREATE OR REPLACE FUNCTION _prom_catalog.mark_series_to_be_dropped_as_unused( AS $func$ DECLARE check_time TIMESTAMPTZ; + locally_observed_epoch BIGINT; BEGIN -- We determine if a series is unused by "looking back" over data older @@ -2099,6 +2131,8 @@ BEGIN SELECT drop_point OPERATOR(pg_catalog.+) pg_catalog.interval '1 hour' INTO check_time; + SELECT current_epoch FROM _prom_catalog.ids_epoch FOR UPDATE INTO STRICT locally_observed_epoch; + EXECUTE format( $query$ WITH potentially_drop_series AS ( @@ -2113,11 +2147,10 @@ BEGIN SELECT _prom_catalog.get_confirmed_unused_series('%1$s','%2$s','%3$s', array_agg(series_id), %5$L) as ids FROM potentially_drop_series ) -- we want this next statement to be the last one in the txn since it could block series fetch (both of them update delete_epoch) - UPDATE prom_data_series.%3$I SET delete_epoch = current_epoch+1 - FROM _prom_catalog.ids_epoch - WHERE delete_epoch IS NULL - AND id IN (SELECT unnest(ids) FROM confirmed_drop_series) - $query$, metric_schema, metric_table, metric_series_table, drop_point, check_time); + UPDATE prom_data_series.%3$I s SET delete_epoch = %6$L + WHERE s.delete_epoch IS NULL + AND s.id IN (SELECT unnest(ids) FROM confirmed_drop_series) + $query$, metric_schema, metric_table, metric_series_table, drop_point, check_time, locally_observed_epoch); END $func$ LANGUAGE PLPGSQL; @@ -2136,59 +2169,106 @@ This function is designed to be used in the context of dropping metric chunks, see `_prom_catalog.drop_metric_chunks`. '; -CREATE OR REPLACE FUNCTION _prom_catalog.delete_expired_series( - metric_schema TEXT, metric_table TEXT, metric_series_table TEXT, ran_at TIMESTAMPTZ, present_epoch BIGINT, last_updated_epoch TIMESTAMPTZ +CREATE OR REPLACE PROCEDURE _prom_catalog.delete_expired_series( + metric_schema TEXT, metric_table TEXT, metric_series_table TEXT, run_at TIMESTAMPTZ ) - RETURNS VOID - --security definer to add jobs as the logged-in user +AS $func$ +DECLARE + deletion_epoch BIGINT; + run_at_seconds BIGINT; + epoch_duration_seconds BIGINT; +BEGIN + -- Note: We cannot use SET in the procedure declaration because we do transaction control + -- and we can _only_ use SET LOCAL in a procedure which _does_ transaction control + SET LOCAL search_path = pg_catalog, pg_temp; + + SELECT EXTRACT (EPOCH FROM _prom_catalog.get_default_value('epoch_duration')::INTERVAL)::BIGINT INTO STRICT epoch_duration_seconds; + SELECT EXTRACT (EPOCH FROM run_at)::BIGINT INTO STRICT run_at_seconds; + + -- If we marked a series for deletion `epoch_duration_seconds` ago, we can delete it now + deletion_epoch := run_at_seconds - epoch_duration_seconds; + + -- First we advance the current epoch and delete epoch. + -- This corresponds to the PrepareDeleteTx label in our model. + CALL _prom_catalog._lock_and_set_epoch(run_at_seconds, deletion_epoch); + + -- Release lock on ids_epoch + COMMIT; + + -- reset search path after transaction end + SET LOCAL search_path = pg_catalog, pg_temp; + + -- Now we recheck the delete conditions, and delete series. + -- This corresponds to the ActuallyDeleteTx and Resurrect in our model. + CALL _prom_catalog._actually_delete_series_and_labels(metric_schema, metric_table, metric_series_table, deletion_epoch); + COMMIT; +END +$func$ +LANGUAGE PLPGSQL; + +--redundant given schema settings but extra caution for security definers +REVOKE ALL ON PROCEDURE _prom_catalog.delete_expired_series(text, text, text, TIMESTAMPTZ) FROM PUBLIC; +GRANT EXECUTE ON PROCEDURE _prom_catalog.delete_expired_series(text, text, text, TIMESTAMPTZ) TO prom_maintenance; + +CREATE OR REPLACE PROCEDURE _prom_catalog._lock_and_set_epoch(new_current_epoch BIGINT, new_delete_epoch BIGINT) + LANGUAGE sql + -- security definer to modify the `ids_epoch` table + SECURITY DEFINER + SET search_path = pg_catalog, pg_temp +AS $func$ + SELECT * FROM _prom_catalog.ids_epoch FOR UPDATE; + UPDATE _prom_catalog.ids_epoch SET current_epoch = new_current_epoch, delete_epoch = new_delete_epoch; +$func$; +--redundant given schema settings but extra caution for security definers +REVOKE ALL ON PROCEDURE _prom_catalog._lock_and_set_epoch(BIGINT, BIGINT) FROM PUBLIC; +GRANT EXECUTE ON PROCEDURE _prom_catalog._lock_and_set_epoch(BIGINT, BIGINT) TO prom_maintenance; +COMMENT ON PROCEDURE _prom_catalog._lock_and_set_epoch(BIGINT, BIGINT) IS +'Internal utility function which only exists due to restrictions on plpgsql +transaction control and SECURITY DEFINER'; + +CREATE OR REPLACE PROCEDURE _prom_catalog._actually_delete_series_and_labels(metric_schema TEXT, metric_table TEXT, metric_series_table TEXT, deletion_epoch BIGINT) + LANGUAGE PLPGSQL + -- security definer to modify the `ids_epoch` table SECURITY DEFINER - VOLATILE SET search_path = pg_catalog, pg_temp AS $func$ DECLARE label_array int[]; - next_epoch BIGINT; - deletion_epoch BIGINT; - epoch_duration INTERVAL; BEGIN - next_epoch := present_epoch + 1; - -- technically we can delete any ID <= current_epoch - 1 - -- but it's always safe to leave them around for a bit longer - deletion_epoch := present_epoch - 4; - EXECUTE format($query$ - -- recheck that the series IDs we might delete are actually dead - WITH dead_series AS ( - SELECT potential.id - FROM - ( - SELECT id - FROM prom_data_series.%3$I - WHERE delete_epoch <= %4$L - ) as potential - LEFT JOIN LATERAL ( - SELECT 1 - FROM %1$I.%2$I metric_data - WHERE metric_data.series_id = potential.id - LIMIT 1 - ) as lateral_exists(indicator) ON (TRUE) - WHERE indicator IS NULL - ), deleted_series AS ( - DELETE FROM prom_data_series.%3$I - WHERE delete_epoch <= %4$L - AND id IN (SELECT id FROM dead_series) -- concurrency means we need this qual in both - RETURNING id, labels - ), resurrected_series AS ( - UPDATE prom_data_series.%3$I - SET delete_epoch = NULL + -- recheck that the series IDs we might delete are actually dead + WITH dead_series AS ( + SELECT potential.id + FROM + ( + SELECT id + FROM prom_data_series.%3$I WHERE delete_epoch <= %4$L - AND id NOT IN (SELECT id FROM dead_series) -- concurrency means we need this qual in both - ) - SELECT ARRAY(SELECT DISTINCT unnest(labels) as label_id - FROM deleted_series) + ) as potential + LEFT JOIN LATERAL ( + SELECT 1 + FROM %1$I.%2$I metric_data + WHERE metric_data.series_id = potential.id + LIMIT 1 + ) as lateral_exists(indicator) ON (TRUE) + WHERE indicator IS NULL + ), deleted_series AS ( + DELETE FROM prom_data_series.%3$I + WHERE delete_epoch <= %4$L + AND id IN (SELECT id FROM dead_series) -- concurrency means we need this qual in both + RETURNING id, labels + ), resurrected_series AS ( + UPDATE prom_data_series.%3$I + SET delete_epoch = NULL + WHERE delete_epoch <= %4$L + AND id NOT IN (SELECT id FROM dead_series) -- concurrency means we need this qual in both + ) + SELECT ARRAY(SELECT DISTINCT unnest(labels) as label_id + FROM deleted_series) $query$, metric_schema, metric_table, metric_series_table, deletion_epoch) INTO label_array; - + -- Now we check if there are any labels which we can remove. + -- This is not reflected in our series cache model. IF array_length(label_array, 1) > 0 THEN --jit interacts poorly why the multi-partition query below SET LOCAL jit = 'off'; @@ -2222,24 +2302,16 @@ BEGIN DELETE FROM _prom_catalog.label WHERE id IN (SELECT * FROM confirmed_drop_labels) AND key != '__name__'; $query$, metric_series_table) USING label_array; - SET LOCAL jit = DEFAULT; END IF; - - SELECT _prom_catalog.get_default_value('epoch_duration')::INTERVAL INTO STRICT epoch_duration; - - IF ran_at > last_updated_epoch + epoch_duration THEN - -- we only want to increment the epoch every epoch_duration - UPDATE _prom_catalog.ids_epoch - SET (current_epoch, last_update_time) = (next_epoch, now()) - WHERE current_epoch < next_epoch; - END IF; END -$func$ -LANGUAGE PLPGSQL; +$func$; --redundant given schema settings but extra caution for security definers -REVOKE ALL ON FUNCTION _prom_catalog.delete_expired_series(text, text, text, timestamptz, BIGINT, timestamptz) FROM PUBLIC; -GRANT EXECUTE ON FUNCTION _prom_catalog.delete_expired_series(text, text, text, timestamptz, BIGINT, timestamptz) TO prom_maintenance; +REVOKE ALL ON PROCEDURE _prom_catalog._actually_delete_series_and_labels(TEXT, TEXT, TEXT, BIGINT) FROM PUBLIC; +GRANT EXECUTE ON PROCEDURE _prom_catalog._actually_delete_series_and_labels(TEXT, TEXT, TEXT, BIGINT) TO prom_maintenance; +COMMENT ON PROCEDURE _prom_catalog._actually_delete_series_and_labels(TEXT, TEXT, TEXT, BIGINT) IS +'Internal utility function which only exists due to restrictions on plpgsql +transaction control and SECURITY DEFINER'; CREATE OR REPLACE FUNCTION _prom_catalog.set_app_name(full_name text) RETURNS VOID diff --git a/migration/idempotent/011-maintenance.sql b/migration/idempotent/011-maintenance.sql index f20105b9..5b54acb4 100644 --- a/migration/idempotent/011-maintenance.sql +++ b/migration/idempotent/011-maintenance.sql @@ -90,8 +90,6 @@ DECLARE metric_series_table NAME; is_metric_view BOOLEAN; time_dimension_id INT; - last_updated TIMESTAMPTZ; - present_epoch BIGINT; lastT TIMESTAMPTZ; startT TIMESTAMPTZ; BEGIN @@ -131,11 +129,10 @@ BEGIN ORDER BY range_end DESC LIMIT 1; END IF; - -- end this txn so we're not holding any locks on the catalog - - SELECT current_epoch, last_update_time INTO present_epoch, last_updated FROM - _prom_catalog.ids_epoch LIMIT 1; + PERFORM _prom_catalog.initialize_current_epoch(ran_at); + -- end this txn so we're not holding any locks on the catalog COMMIT; + -- reset search path after transaction end SET LOCAL search_path = pg_catalog, pg_temp; @@ -144,7 +141,7 @@ BEGIN -- we may still have old ones to delete lastT := pg_catalog.clock_timestamp(); PERFORM _prom_catalog.set_app_name(pg_catalog.format('promscale maintenance: data retention: metric %s: delete expired series', metric_name)); - PERFORM _prom_catalog.delete_expired_series(metric_schema, metric_table, metric_series_table, ran_at, present_epoch, last_updated); + CALL _prom_catalog.delete_expired_series(metric_schema, metric_table, metric_series_table, ran_at); IF log_verbose THEN RAISE LOG 'promscale maintenance: data retention: metric %: done deleting expired series as only action in %', metric_name, pg_catalog.clock_timestamp() OPERATOR(pg_catalog.-) lastT; RAISE LOG 'promscale maintenance: data retention: metric %: finished in %', metric_name, pg_catalog.clock_timestamp() OPERATOR(pg_catalog.-) startT; @@ -170,8 +167,6 @@ BEGIN IF log_verbose THEN RAISE LOG 'promscale maintenance: data retention: metric %: done dropping chunks in %', metric_name, pg_catalog.clock_timestamp() OPERATOR(pg_catalog.-) lastT; END IF; - SELECT current_epoch, last_update_time INTO present_epoch, last_updated FROM - _prom_catalog.ids_epoch LIMIT 1; COMMIT; -- reset search path after transaction end SET LOCAL search_path = pg_catalog, pg_temp; @@ -180,7 +175,7 @@ BEGIN -- transaction 4 lastT := pg_catalog.clock_timestamp(); PERFORM _prom_catalog.set_app_name(pg_catalog.format('promscale maintenance: data retention: metric %s: delete expired series', metric_name)); - PERFORM _prom_catalog.delete_expired_series(metric_schema, metric_table, metric_series_table, ran_at, present_epoch, last_updated); + CALL _prom_catalog.delete_expired_series(metric_schema, metric_table, metric_series_table, ran_at); IF log_verbose THEN RAISE LOG 'promscale maintenance: data retention: metric %: done deleting expired series in %', metric_name, pg_catalog.clock_timestamp() OPERATOR(pg_catalog.-) lastT; RAISE LOG 'promscale maintenance: data retention: metric %: finished in %', metric_name, pg_catalog.clock_timestamp() OPERATOR(pg_catalog.-) startT; diff --git a/migration/incremental/035-logical-to-time-based-epoch.sql b/migration/incremental/035-logical-to-time-based-epoch.sql new file mode 100644 index 00000000..3404f862 --- /dev/null +++ b/migration/incremental/035-logical-to-time-based-epoch.sql @@ -0,0 +1,14 @@ +ALTER TABLE _prom_catalog.ids_epoch ADD COLUMN + delete_epoch BIGINT NULL DEFAULT NULL; + +-- We will reinitialize the value of `current_epoch` anyway, so it's easier to +-- start from a common base. +UPDATE _prom_catalog.ids_epoch SET current_epoch = 0 WHERE current_epoch <> 0; + +-- We're changing the signatures of these functions, so they must be dropped. +DROP FUNCTION IF EXISTS _prom_catalog.mark_series_to_be_dropped_as_unused(TEXT, TEXT, TEXT, TIMESTAMPTZ); +DROP PROCEDURE IF EXISTS _prom_catalog.drop_metric_chunks(TEXT, TEXT, TIMESTAMPTZ, TIMESTAMPTZ, BOOLEAN); + +-- We're replacing this function with a procedure, so it must be dropped. +DROP FUNCTION IF EXISTS _prom_catalog.delete_expired_series(TEXT, TEXT, TEXT, TIMESTAMPTZ, BIGINT, TIMESTAMPTZ); + diff --git a/sql-tests/testdata/drop_metric.sql b/sql-tests/testdata/drop_metric.sql index 91228845..c24d848b 100644 --- a/sql-tests/testdata/drop_metric.sql +++ b/sql-tests/testdata/drop_metric.sql @@ -2,7 +2,7 @@ \set QUIET 1 \i 'testdata/scripts/pgtap-1.2.0.sql' -SELECT * FROM plan(49); +SELECT * FROM plan(45); -- -- Moved from TestSQLDropMetricChunk -- @@ -16,11 +16,10 @@ BEGIN -- Avoid randomness in chunk interval size by setting explicitly. PERFORM _prom_catalog.get_or_create_metric_table_name('test'); PERFORM public.set_chunk_time_interval('prom_data.test', interval '8 hours'); - -- Set 1h epoch duration to prevent changing defaults from affecting this test's outcome. - PERFORM _prom_catalog.set_default_value('epoch_duration', (interval '1 hour')::text); + -- Set 4h epoch duration to prevent changing defaults from affecting this test's outcome. + PERFORM _prom_catalog.set_default_value('epoch_duration', (interval '4 hour')::text); - - -- this series (s1) will be deleted along with it's label + -- this series (s1) will be deleted along with its label SELECT f.series_id FROM _prom_catalog.get_or_create_series_id_for_kv_array('test', ARRAY['__name__', 'name1'], ARRAY['test', 'value1']) f INTO STRICT s1_series_id; @@ -33,7 +32,7 @@ BEGIN INSERT INTO prom_data.test(time,value,series_id) VALUES - -- this will be dropped immediately (notice it's one second before the midnight) + -- this will be dropped immediately (notice it's one second before midnight) ('2009-11-10 23:59:59.999+00',0.1,s1_series_id), -- same as the above ('2009-11-10 23:59:59.999+00',0.1,s3_series_id), @@ -43,7 +42,7 @@ BEGIN ('2009-11-11 05:00:00+00', 0.1,s3_series_id); PERFORM - CASE current_epoch > 0::BIGINT + 1 WHEN true THEN _prom_catalog.epoch_abort(0) + CASE current_epoch > 0::BIGINT WHEN true THEN _prom_catalog.epoch_abort(0) END FROM _prom_catalog.ids_epoch LIMIT 1; @@ -75,11 +74,10 @@ $fnc$; -- The first attempt to drop the chunks. CALL _prom_catalog.drop_metric_chunks('prom_data', 'test', E'2009-11-11 00:00:05+00'); SELECT asserts_before_deletion('after the first timestamp'); --- Attempting to drop chunks while incrementally moving `run_at` by an hour --- reruns shouldn't change anything until the epoch advances beyond current_epoch + 4 --- --- And current_epoch advances every time ran_at advances for the length of an epoch --- duration. Which we set to be 1h at the beginning of this test. + +-- Calling drop_metric_chunks shouldn't result in deletion from the series +-- table until `epoch_duration` time has passed. We set this to 4h at the +-- beginning of this test. CALL _prom_catalog.drop_metric_chunks('prom_data', 'test', E'2009-11-11 00:00:05+00'); SELECT asserts_before_deletion('after iter 0'); CALL _prom_catalog.drop_metric_chunks('prom_data', 'test', E'2009-11-11 00:00:05+00', now() + '1 hours'); @@ -88,8 +86,6 @@ CALL _prom_catalog.drop_metric_chunks('prom_data', 'test', E'2009-11-11 00:00:05 SELECT asserts_before_deletion('after iter 2'); CALL _prom_catalog.drop_metric_chunks('prom_data', 'test', E'2009-11-11 00:00:05+00', now() + '3 hours'); SELECT asserts_before_deletion('after iter 3'); -CALL _prom_catalog.drop_metric_chunks('prom_data', 'test', E'2009-11-11 00:00:05+00', now() + '4 hours'); -SELECT asserts_before_deletion('after iter 4'); CREATE FUNCTION asserts_after_deletion(msg TEXT) @@ -105,27 +101,27 @@ BEGIN END; $fnc$; --- Now current_epoch advanced far enough and it is the time to actually drop the unused series +-- When `epoch_duration` time has passed, we drop the unused series. +CALL _prom_catalog.drop_metric_chunks('prom_data', 'test', E'2009-11-11 00:00:05+00', now() + '4 hours'); +SELECT asserts_after_deletion('after iter 4'); CALL _prom_catalog.drop_metric_chunks('prom_data', 'test', E'2009-11-11 00:00:05+00', now() + '5 hours'); -SELECT asserts_after_deletion('after iter 5'); +SELECT asserts_after_deletion('after iter 5'); CALL _prom_catalog.drop_metric_chunks('prom_data', 'test', E'2009-11-11 00:00:05+00', now() + '6 hours'); -SELECT asserts_after_deletion('after iter 6'); +SELECT asserts_after_deletion('after iter 6'); CALL _prom_catalog.drop_metric_chunks('prom_data', 'test', E'2009-11-11 00:00:05+00', now() + '7 hours'); -SELECT asserts_after_deletion('after iter 7'); +SELECT asserts_after_deletion('after iter 7'); CALL _prom_catalog.drop_metric_chunks('prom_data', 'test', E'2009-11-11 00:00:05+00', now() + '8 hours'); -SELECT asserts_after_deletion('after iter 8'); -CALL _prom_catalog.drop_metric_chunks('prom_data', 'test', E'2009-11-11 00:00:05+00', now() + '9 hours'); SELECT asserts_after_deletion('after all iterations'); SELECT throws_like( 'SELECT - CASE current_epoch > 0::BIGINT + 1 WHEN true THEN _prom_catalog.epoch_abort(0) + CASE current_epoch > 1 WHEN true THEN _prom_catalog.epoch_abort(0) END FROM _prom_catalog.ids_epoch LIMIT 1;', - 'epoch 0 to old to continue INSERT, current: %', + format('epoch 0 older than oldest delete epoch %s, unable to INSERT', delete_epoch), 'Epoch has changed after a series was dropped' - ); + ) FROM _prom_catalog.ids_epoch; -- The end SELECT * FROM finish(true); \ No newline at end of file diff --git a/sql-tests/testdata/initialize-current-epoch.sql b/sql-tests/testdata/initialize-current-epoch.sql new file mode 100644 index 00000000..4101e44b --- /dev/null +++ b/sql-tests/testdata/initialize-current-epoch.sql @@ -0,0 +1,29 @@ +\unset ECHO +\set QUIET 1 +\i 'testdata/scripts/pgtap-1.2.0.sql' + +SELECT * FROM plan(5); + +CREATE FUNCTION test_initialize_current_epoch() + RETURNS SETOF TEXT + LANGUAGE plpgsql VOLATILE AS +$fnc$ +DECLARE + now_ts TIMESTAMPTZ; + now_epoch_s BIGINT; +BEGIN + now_ts := now(); + now_epoch_s := EXTRACT (EPOCH FROM now_ts); + RETURN NEXT is(current_epoch, 0::BIGINT) FROM _prom_catalog.ids_epoch; + RETURN NEXT is(_prom_catalog.initialize_current_epoch(now_ts), now_epoch_s); + RETURN NEXT is(current_epoch, now_epoch_s) FROM _prom_catalog.ids_epoch; + RETURN NEXT is(_prom_catalog.initialize_current_epoch(now_ts + '1 minute'::interval), now_epoch_s); + RETURN NEXT is(current_epoch, now_epoch_s) FROM _prom_catalog.ids_epoch; + RETURN; +END; +$fnc$; + +SELECT test_initialize_current_epoch(); + +-- The end +SELECT * FROM finish(true); \ No newline at end of file