This repository was archived by the owner on Apr 2, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 19
Switch from logical to time-based epoch #512
Closed
Closed
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -474,7 +474,7 @@ BEGIN | |
| id bigint NOT NULL, | ||
| metric_id int NOT NULL, | ||
| labels prom_api.label_array NOT NULL, | ||
| delete_epoch BIGINT NULL DEFAULT NULL, | ||
| mark_for_deletion_epoch TIMESTAMPTZ NULL DEFAULT NULL, | ||
| CHECK(labels[1] = %2$L AND labels[1] IS NOT NULL), | ||
| CHECK(metric_id = %3$L), | ||
| CONSTRAINT series_labels_id_%3$s UNIQUE(labels) INCLUDE (id), | ||
|
|
@@ -488,7 +488,7 @@ BEGIN | |
| --these indexes are logically on all series tables but they cannot be defined on the parent due to | ||
| --dump/restore issues. | ||
| EXECUTE format('CREATE INDEX series_labels_%s ON prom_data_series.%I USING GIN (labels)', NEW.id, NEW.table_name); | ||
| EXECUTE format('CREATE INDEX series_delete_epoch_id_%s ON prom_data_series.%I (delete_epoch, id) WHERE delete_epoch IS NOT NULL', NEW.id, NEW.table_name); | ||
| EXECUTE format('CREATE INDEX series_mark_for_deletion_epoch_id_%s ON prom_data_series.%I (mark_for_deletion_epoch) INCLUDE (id) WHERE mark_for_deletion_epoch IS NOT NULL', NEW.id, NEW.table_name); | ||
|
|
||
| EXECUTE format('ALTER TABLE prom_data_series.%1$I OWNER TO prom_admin', NEW.table_name); | ||
| EXECUTE format('GRANT ALL PRIVILEGES ON TABLE prom_data_series.%I TO prom_admin', NEW.table_name); | ||
|
|
@@ -866,7 +866,7 @@ AS | |
| $$ | ||
| BEGIN | ||
| EXECUTE FORMAT( | ||
| 'UPDATE prom_data_series.%1$I SET delete_epoch = current_epoch+1 FROM _prom_catalog.ids_epoch WHERE delete_epoch IS NULL AND id = ANY($1)', | ||
| 'UPDATE prom_data_series.%1$I SET mark_for_deletion_epoch = current_epoch FROM _prom_catalog.global_epoch WHERE mark_for_deletion_epoch IS NULL AND id = ANY($1)', | ||
| metric_table | ||
| ) USING series_ids; | ||
| RETURN; | ||
|
|
@@ -1285,7 +1285,7 @@ AS $func$ | |
| BEGIN | ||
| EXECUTE FORMAT($query$ | ||
| UPDATE prom_data_series.%1$I | ||
| SET delete_epoch = NULL | ||
| SET mark_for_deletion_epoch = NULL | ||
| WHERE id = $1 | ||
| $query$, metric_table) using series_id; | ||
| END | ||
|
|
@@ -1322,7 +1322,7 @@ BEGIN | |
| ), existing AS ( | ||
| SELECT | ||
| id, | ||
| CASE WHEN delete_epoch IS NOT NULL THEN | ||
| CASE WHEN mark_for_deletion_epoch IS NOT NULL THEN | ||
| _prom_catalog.resurrect_series_ids(%1$L, id) | ||
| END | ||
| FROM prom_data_series.%1$I as series | ||
|
|
@@ -1367,7 +1367,7 @@ BEGIN | |
| ), existing AS ( | ||
| SELECT | ||
| id, | ||
| CASE WHEN delete_epoch IS NOT NULL THEN | ||
| CASE WHEN mark_for_deletion_epoch IS NOT NULL THEN | ||
| _prom_catalog.resurrect_series_ids(%1$L, id) | ||
| END | ||
| FROM prom_data_series.%1$I as series | ||
|
|
@@ -1397,7 +1397,7 @@ BEGIN | |
| WITH existing AS ( | ||
| SELECT | ||
| id, | ||
| CASE WHEN delete_epoch IS NOT NULL THEN | ||
| CASE WHEN mark_for_deletion_epoch IS NOT NULL THEN | ||
| _prom_catalog.resurrect_series_ids(%1$L, id) | ||
| END | ||
| FROM prom_data_series.%1$I as series | ||
|
|
@@ -1975,23 +1975,23 @@ COMMENT ON FUNCTION prom_api.reset_metric_compression_setting(TEXT) | |
| IS 'resets the compression setting for a specific metric to using the default'; | ||
| GRANT EXECUTE ON FUNCTION prom_api.reset_metric_compression_setting(TEXT) TO prom_admin; | ||
|
|
||
| CREATE OR REPLACE FUNCTION _prom_catalog.epoch_abort(user_epoch BIGINT) | ||
| CREATE OR REPLACE FUNCTION _prom_catalog.epoch_abort(user_epoch TIMESTAMPTZ) | ||
| RETURNS VOID | ||
| VOLATILE | ||
| SET search_path = pg_catalog, pg_temp | ||
| AS $func$ | ||
| DECLARE db_epoch BIGINT; | ||
| DECLARE db_delete_epoch TIMESTAMPTZ; | ||
| BEGIN | ||
| SELECT current_epoch FROM _prom_catalog.ids_epoch LIMIT 1 | ||
| INTO db_epoch; | ||
| RAISE EXCEPTION 'epoch % to old to continue INSERT, current: %', | ||
| user_epoch, db_epoch | ||
| SELECT delete_epoch FROM _prom_catalog.global_epoch LIMIT 1 | ||
| INTO db_delete_epoch; | ||
| RAISE EXCEPTION 'epoch % to old to continue INSERT, current DB delete epoch: %', | ||
| user_epoch, db_delete_epoch | ||
| USING ERRCODE='PS001'; | ||
| END; | ||
| $func$ LANGUAGE PLPGSQL; | ||
| COMMENT ON FUNCTION _prom_catalog.epoch_abort(BIGINT) | ||
| COMMENT ON FUNCTION _prom_catalog.epoch_abort(TIMESTAMPTZ) | ||
| IS 'ABORT an INSERT transaction due to the ID epoch being out of date'; | ||
| GRANT EXECUTE ON FUNCTION _prom_catalog.epoch_abort TO prom_writer; | ||
| GRANT EXECUTE ON FUNCTION _prom_catalog.epoch_abort(TIMESTAMPTZ) TO prom_writer; | ||
|
|
||
| -- Given a `metric_schema`, `metric_table`, and `series_table`, this function | ||
| -- returns all series ids in `potential_series_ids` which are not referenced by | ||
|
|
@@ -2113,9 +2113,9 @@ BEGIN | |
| SELECT _prom_catalog.get_confirmed_unused_series('%1$s','%2$s','%3$s', array_agg(series_id), %5$L) as ids | ||
| FROM potentially_drop_series | ||
| ) -- we want this next statement to be the last one in the txn since it could block series fetch (both of them update delete_epoch) | ||
| UPDATE prom_data_series.%3$I SET delete_epoch = current_epoch+1 | ||
| FROM _prom_catalog.ids_epoch | ||
| WHERE delete_epoch IS NULL | ||
| UPDATE prom_data_series.%3$I SET mark_for_deletion_epoch = current_epoch | ||
| FROM _prom_catalog.global_epoch | ||
| WHERE mark_for_deletion_epoch IS NULL | ||
| AND id IN (SELECT unnest(ids) FROM confirmed_drop_series) | ||
| $query$, metric_schema, metric_table, metric_series_table, drop_point, check_time); | ||
| END | ||
|
|
@@ -2137,7 +2137,7 @@ chunks, see `_prom_catalog.drop_metric_chunks`. | |
| '; | ||
|
|
||
| CREATE OR REPLACE FUNCTION _prom_catalog.delete_expired_series( | ||
| metric_schema TEXT, metric_table TEXT, metric_series_table TEXT, ran_at TIMESTAMPTZ, present_epoch BIGINT, last_updated_epoch TIMESTAMPTZ | ||
| metric_schema TEXT, metric_table TEXT, metric_series_table TEXT, ran_at TIMESTAMPTZ | ||
| ) | ||
| RETURNS VOID | ||
| --security definer to add jobs as the logged-in user | ||
|
|
@@ -2147,21 +2147,13 @@ CREATE OR REPLACE FUNCTION _prom_catalog.delete_expired_series( | |
| AS $func$ | ||
| DECLARE | ||
| label_array int[]; | ||
| next_epoch BIGINT; | ||
| deletion_epoch BIGINT; | ||
| max_deletion_time TIMESTAMPTZ; | ||
| deletion_time TIMESTAMPTZ; | ||
| epoch_duration INTERVAL; | ||
| BEGIN | ||
| next_epoch := present_epoch + 1; | ||
| -- technically we can delete any ID <= current_epoch - 1 | ||
| -- but it's always safe to leave them around for a bit longer | ||
| deletion_epoch := present_epoch - 4; | ||
|
|
||
| SELECT _prom_catalog.get_default_value('epoch_duration')::INTERVAL INTO STRICT epoch_duration; | ||
|
|
||
| -- we don't want to delete too soon | ||
| IF ran_at < last_updated_epoch + epoch_duration THEN | ||
| RETURN; | ||
| END IF; | ||
| deletion_time := ran_at - epoch_duration; | ||
|
|
||
| EXECUTE format($query$ | ||
| -- recheck that the series IDs we might delete are actually dead | ||
|
|
@@ -2171,7 +2163,7 @@ BEGIN | |
| ( | ||
| SELECT id | ||
| FROM prom_data_series.%3$I | ||
| WHERE delete_epoch <= %4$L | ||
| WHERE mark_for_deletion_epoch < %4$L | ||
| ) as potential | ||
| LEFT JOIN LATERAL ( | ||
| SELECT 1 | ||
|
|
@@ -2182,19 +2174,21 @@ BEGIN | |
| WHERE indicator IS NULL | ||
| ), deleted_series AS ( | ||
| DELETE FROM prom_data_series.%3$I | ||
| WHERE delete_epoch <= %4$L | ||
| WHERE mark_for_deletion_epoch < %4$L | ||
| AND id IN (SELECT id FROM dead_series) -- concurrency means we need this qual in both | ||
| RETURNING id, labels | ||
| RETURNING id, labels, mark_for_deletion_epoch | ||
| ), resurrected_series AS ( | ||
| UPDATE prom_data_series.%3$I | ||
| SET delete_epoch = NULL | ||
| WHERE delete_epoch <= %4$L | ||
| SET mark_for_deletion_epoch = NULL | ||
| WHERE mark_for_deletion_epoch < %4$L | ||
| AND id NOT IN (SELECT id FROM dead_series) -- concurrency means we need this qual in both | ||
| ) | ||
| SELECT ARRAY(SELECT DISTINCT unnest(labels) as label_id | ||
| FROM deleted_series) | ||
| $query$, metric_schema, metric_table, metric_series_table, deletion_epoch) INTO label_array; | ||
|
|
||
| SELECT | ||
| array_agg(DISTINCT labels.label) as label_id | ||
| , max(mark_for_deletion_epoch) as max_deletion_time | ||
| FROM deleted_series d, | ||
| LATERAL unnest(d.labels) as labels(label) | ||
| $query$, metric_schema, metric_table, metric_series_table, deletion_time) INTO label_array, max_deletion_time; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Where did |
||
|
|
||
| IF array_length(label_array, 1) > 0 THEN | ||
| --jit interacts poorly why the multi-partition query below | ||
|
|
@@ -2233,16 +2227,15 @@ BEGIN | |
| SET LOCAL jit = DEFAULT; | ||
| END IF; | ||
|
|
||
| UPDATE _prom_catalog.ids_epoch | ||
| SET (current_epoch, last_update_time) = (next_epoch, now()) | ||
| WHERE current_epoch < next_epoch; | ||
| UPDATE _prom_catalog.global_epoch e | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I realize this isn't the final protocol implementation, nonetheless I want to point out that this update happens at a different point and with different locking in the proven protocol. |
||
| SET (current_epoch, delete_epoch) = (ran_at, COALESCE(max_deletion_time, (SELECT delete_epoch FROM _prom_catalog.global_epoch))); | ||
| RETURN; | ||
| END | ||
| $func$ | ||
| LANGUAGE PLPGSQL; | ||
| --redundant given schema settings but extra caution for security definers | ||
| REVOKE ALL ON FUNCTION _prom_catalog.delete_expired_series(text, text, text, timestamptz, BIGINT, timestamptz) FROM PUBLIC; | ||
| GRANT EXECUTE ON FUNCTION _prom_catalog.delete_expired_series(text, text, text, timestamptz, BIGINT, timestamptz) TO prom_maintenance; | ||
| REVOKE ALL ON FUNCTION _prom_catalog.delete_expired_series(text, text, text, timestamptz) FROM PUBLIC; | ||
| GRANT EXECUTE ON FUNCTION _prom_catalog.delete_expired_series(text, text, text, timestamptz) TO prom_maintenance; | ||
|
|
||
| CREATE OR REPLACE FUNCTION _prom_catalog.set_app_name(full_name text) | ||
| RETURNS VOID | ||
|
|
@@ -2491,7 +2484,7 @@ BEGIN | |
| %2$s | ||
| FROM | ||
| prom_data_series.%1$I AS series | ||
| WHERE delete_epoch IS NULL | ||
| WHERE mark_for_deletion_epoch IS NULL | ||
| $$, view_name, label_value_cols); | ||
|
|
||
| IF NOT view_exists THEN | ||
|
|
||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -78,6 +78,24 @@ GRANT EXECUTE ON FUNCTION _prom_catalog.drop_metric_chunk_data(text, text, times | |
| COMMENT ON FUNCTION _prom_catalog.drop_metric_chunk_data(text, text, timestamptz) | ||
| IS 'drop chunks from schema_name.metric_name containing data older than older_than.'; | ||
|
|
||
| CREATE OR REPLACE FUNCTION _prom_catalog.initialize_current_epoch() | ||
| RETURNS VOID | ||
| --security definer to have permissions on global_epoch table | ||
| SECURITY DEFINER | ||
| VOLATILE | ||
| SET search_path = pg_catalog, pg_temp | ||
| AS $func$ | ||
| UPDATE _prom_catalog.global_epoch e SET current_epoch = now() | ||
| WHERE e.current_epoch = 'epoch'::TIMESTAMPTZ; | ||
| $func$ | ||
| LANGUAGE SQL; | ||
| --redundant given schema settings but extra caution for security definers | ||
| REVOKE ALL ON FUNCTION _prom_catalog.initialize_current_epoch() FROM PUBLIC; | ||
| GRANT EXECUTE ON FUNCTION _prom_catalog.initialize_current_epoch() TO prom_maintenance; | ||
| COMMENT ON FUNCTION _prom_catalog.initialize_current_epoch() | ||
| IS 'The current_epoch field of _prom_catalog.global_epoch is initialized to a value in the past. | ||
| This must be correctly initialized to prevent spurious epoch aborts during ingestion.'; | ||
|
|
||
| --drop chunks from metrics tables and delete the appropriate series. | ||
| CREATE OR REPLACE PROCEDURE _prom_catalog.drop_metric_chunks( | ||
| schema_name TEXT, metric_name TEXT, older_than TIMESTAMPTZ, ran_at TIMESTAMPTZ = now(), log_verbose BOOLEAN = FALSE | ||
|
|
@@ -90,8 +108,8 @@ DECLARE | |
| metric_series_table NAME; | ||
| is_metric_view BOOLEAN; | ||
| time_dimension_id INT; | ||
| last_updated TIMESTAMPTZ; | ||
| present_epoch BIGINT; | ||
| present_epoch TIMESTAMPTZ; | ||
| delete_epoch TIMESTAMPTZ; | ||
| lastT TIMESTAMPTZ; | ||
| startT TIMESTAMPTZ; | ||
| BEGIN | ||
|
|
@@ -133,8 +151,10 @@ BEGIN | |
| END IF; | ||
| -- end this txn so we're not holding any locks on the catalog | ||
|
|
||
| SELECT current_epoch, last_update_time INTO present_epoch, last_updated FROM | ||
| _prom_catalog.ids_epoch LIMIT 1; | ||
| -- ensure that current_epoch is not set to the default initial value | ||
| PERFORM _prom_catalog.initialize_current_epoch(); | ||
| SELECT e.current_epoch, e.delete_epoch INTO present_epoch, delete_epoch FROM | ||
| _prom_catalog.global_epoch e LIMIT 1; | ||
| COMMIT; | ||
| -- reset search path after transaction end | ||
| SET LOCAL search_path = pg_catalog, pg_temp; | ||
|
|
@@ -144,7 +164,7 @@ BEGIN | |
| -- we may still have old ones to delete | ||
| lastT := pg_catalog.clock_timestamp(); | ||
| PERFORM _prom_catalog.set_app_name(pg_catalog.format('promscale maintenance: data retention: metric %s: delete expired series', metric_name)); | ||
| PERFORM _prom_catalog.delete_expired_series(metric_schema, metric_table, metric_series_table, ran_at, present_epoch, last_updated); | ||
| PERFORM _prom_catalog.delete_expired_series(metric_schema, metric_table, metric_series_table, ran_at); | ||
| IF log_verbose THEN | ||
| RAISE LOG 'promscale maintenance: data retention: metric %: done deleting expired series as only action in %', metric_name, pg_catalog.clock_timestamp() OPERATOR(pg_catalog.-) lastT; | ||
| RAISE LOG 'promscale maintenance: data retention: metric %: finished in %', metric_name, pg_catalog.clock_timestamp() OPERATOR(pg_catalog.-) startT; | ||
|
|
@@ -170,8 +190,8 @@ BEGIN | |
| IF log_verbose THEN | ||
| RAISE LOG 'promscale maintenance: data retention: metric %: done dropping chunks in %', metric_name, pg_catalog.clock_timestamp() OPERATOR(pg_catalog.-) lastT; | ||
| END IF; | ||
| SELECT current_epoch, last_update_time INTO present_epoch, last_updated FROM | ||
| _prom_catalog.ids_epoch LIMIT 1; | ||
| SELECT e.current_epoch, e.delete_epoch INTO present_epoch, delete_epoch FROM | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not sure this line is even necessary now, when 183 doesn't reference it. |
||
| _prom_catalog.global_epoch e LIMIT 1; | ||
| COMMIT; | ||
| -- reset search path after transaction end | ||
| SET LOCAL search_path = pg_catalog, pg_temp; | ||
|
|
@@ -180,7 +200,7 @@ BEGIN | |
| -- transaction 4 | ||
| lastT := pg_catalog.clock_timestamp(); | ||
| PERFORM _prom_catalog.set_app_name(pg_catalog.format('promscale maintenance: data retention: metric %s: delete expired series', metric_name)); | ||
| PERFORM _prom_catalog.delete_expired_series(metric_schema, metric_table, metric_series_table, ran_at, present_epoch, last_updated); | ||
| PERFORM _prom_catalog.delete_expired_series(metric_schema, metric_table, metric_series_table, ran_at); | ||
| IF log_verbose THEN | ||
| RAISE LOG 'promscale maintenance: data retention: metric %: done deleting expired series in %', metric_name, pg_catalog.clock_timestamp() OPERATOR(pg_catalog.-) lastT; | ||
| RAISE LOG 'promscale maintenance: data retention: metric %: finished in %', metric_name, pg_catalog.clock_timestamp() OPERATOR(pg_catalog.-) startT; | ||
|
|
||
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.