diff --git a/CHANGELOG.md b/CHANGELOG.md index 5ae48ebc..f8eb1770 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -28,6 +28,7 @@ We use the following categories for changes: ### Added - Telemetry for active series and last updated [#534] +- Add support for merge chunks during compression [#583] ### Fixed - Column conflict when creating a metric view with a label called `series` diff --git a/migration/idempotent/001-base.sql b/migration/idempotent/001-base.sql index b2b82b1b..bf65e73c 100644 --- a/migration/idempotent/001-base.sql +++ b/migration/idempotent/001-base.sql @@ -6,7 +6,8 @@ SELECT * FROM ( VALUES - ('chunk_interval' , (INTERVAL '8 hours')::text), + ('chunk_interval' , (INTERVAL '1 hour')::text), + ('compress_chunk_interval' , (INTERVAL '24 hours')::text), ('retention_period' , (90 * INTERVAL '1 day')::text), ('metric_compression' , (exists(select 1 from pg_catalog.pg_proc where proname = 'compress_chunk')::text)), ('trace_retention_period' , (30 * INTERVAL '1 days')::text), @@ -105,6 +106,40 @@ $func$ LANGUAGE SQL STABLE PARALLEL SAFE; GRANT EXECUTE ON FUNCTION _prom_catalog.get_default_chunk_interval() TO prom_reader; +CREATE OR REPLACE FUNCTION _prom_catalog.get_default_compress_chunk_interval() + RETURNS INTERVAL + SET search_path = pg_catalog, pg_temp +AS $func$ + SELECT _prom_catalog.get_default_value('compress_chunk_interval')::pg_catalog.interval; +$func$ +LANGUAGE SQL STABLE PARALLEL SAFE; +GRANT EXECUTE ON FUNCTION _prom_catalog.get_default_compress_chunk_interval() TO prom_reader; + +-- Calculate compress chunk interval based on chunk interval +-- This is necessary due to staggered chunk intervals we are using in the system +CREATE OR REPLACE FUNCTION _prom_catalog.calculate_compress_chunk_interval(chunk_interval INTERVAL, compress_chunk_interval INTERVAL) + RETURNS INTERVAL + SET search_path = pg_catalog, pg_temp +AS $func$ + SELECT FLOOR(EXTRACT(EPOCH FROM compress_chunk_interval)/EXTRACT(EPOCH FROM chunk_interval)) * chunk_interval +$func$ +LANGUAGE SQL STABLE PARALLEL SAFE; +GRANT EXECUTE ON FUNCTION _prom_catalog.calculate_compress_chunk_interval(INTERVAL, INTERVAL) TO prom_reader; + +-- Get chunk interval from metric hypertable +CREATE OR REPLACE FUNCTION _prom_catalog.get_chunk_interval(schema_name NAME, table_name NAME) + RETURNS INTERVAL + SET search_path = pg_catalog, pg_temp +AS $func$ + SELECT time_interval + FROM timescaledb_information.dimensions d + WHERE d.hypertable_schema = get_chunk_interval.schema_name + AND d.hypertable_name = get_chunk_interval.table_name + AND d.column_name = 'time'; +$func$ +LANGUAGE SQL STABLE PARALLEL SAFE; +GRANT EXECUTE ON FUNCTION _prom_catalog.get_chunk_interval(NAME, NAME) TO prom_reader; + CREATE OR REPLACE FUNCTION _prom_catalog.get_timescale_major_version() RETURNS INT SET search_path = pg_catalog, pg_temp @@ -123,6 +158,18 @@ $func$ LANGUAGE SQL STABLE PARALLEL SAFE; GRANT EXECUTE ON FUNCTION _prom_catalog.get_timescale_minor_version() TO prom_reader; +-- Merge chunks functionality is only supported from TSDB version 2.9 and up +CREATE OR REPLACE FUNCTION _prom_catalog.has_merge_chunks_support() + RETURNS BOOLEAN + SET search_path = pg_catalog, pg_temp +AS $func$ + SELECT _prom_catalog.get_timescale_major_version() > 2 + OR (_prom_catalog.get_timescale_major_version() = 2 + AND _prom_catalog.get_timescale_minor_version() >= 9); +$func$ +LANGUAGE SQL STABLE PARALLEL SAFE; +GRANT EXECUTE ON FUNCTION _prom_catalog.get_timescale_major_version() TO prom_reader; + CREATE OR REPLACE FUNCTION _prom_catalog.get_default_retention_period() RETURNS INTERVAL SET search_path = pg_catalog, pg_temp @@ -463,6 +510,9 @@ BEGIN IF _prom_catalog.is_timescaledb_installed() AND _prom_catalog.get_default_compression_setting() THEN PERFORM prom_api.set_compression_on_metric_table(NEW.table_name, TRUE); + IF _prom_catalog.has_merge_chunks_support() THEN + PERFORM prom_api.set_metric_compress_chunk_interval(new.metric_name, prom_api.get_metric_compress_chunk_interval(NEW.metric_name)); + END IF; END IF; EXECUTE format('GRANT ALL PRIVILEGES ON TABLE %I.%I TO prom_admin', NEW.table_schema, NEW.table_name); @@ -1509,10 +1559,8 @@ AS $func$ RETURN prom_api.get_default_chunk_interval(); END IF; - SELECT time_interval - INTO STRICT _chunk_interval - FROM timescaledb_information.dimensions - WHERE hypertable_schema = 'prom_data' AND hypertable_name = _table_name AND column_name = 'time'; + SELECT _prom_catalog.get_chunk_interval('prom_data', _table_name) + INTO STRICT _chunk_interval; RETURN _chunk_interval; END @@ -1540,6 +1588,152 @@ COMMENT ON FUNCTION prom_api.reset_metric_chunk_interval(TEXT) IS 'resets the chunk interval for a specific metric to using the default'; GRANT EXECUTE ON FUNCTION prom_api.reset_metric_chunk_interval(TEXT) TO prom_admin; +CREATE OR REPLACE FUNCTION _prom_catalog.set_compress_chunk_interval_on_metric_table(metric_name TEXT, new_interval INTERVAL) + RETURNS void + VOLATILE + SECURITY DEFINER + SET search_path = pg_catalog, pg_temp +AS $func$ + DECLARE + _table_name TEXT; + BEGIN + IF NOT _prom_catalog.is_timescaledb_installed() THEN + RAISE EXCEPTION 'cannot set compress chunk time interval without timescaledb installed'; + END IF; + + SELECT table_name + INTO STRICT _table_name + FROM _prom_catalog.get_or_create_metric_table_name(metric_name); + + EXECUTE format($$ + ALTER TABLE prom_data.%I SET ( + timescaledb.compress_chunk_time_interval = '%s' + ) $$, + _table_name, + _prom_catalog.calculate_compress_chunk_interval(_prom_catalog.get_chunk_interval('prom_data', _table_name), new_interval) + ); + END +$func$ +LANGUAGE PLPGSQL; +--redundant given schema settings but extra caution for security definers +REVOKE ALL ON FUNCTION _prom_catalog.set_compress_chunk_interval_on_metric_table(TEXT, INTERVAL) FROM PUBLIC; +GRANT EXECUTE ON FUNCTION _prom_catalog.set_compress_chunk_interval_on_metric_table(TEXT, INTERVAL) TO prom_admin; + +CREATE OR REPLACE FUNCTION prom_api.set_default_compress_chunk_interval(compress_chunk_interval INTERVAL) + RETURNS BOOLEAN + VOLATILE + SET search_path = pg_catalog, pg_temp +AS $$ + SELECT _prom_catalog.set_default_value('compress_chunk_interval', compress_chunk_interval::pg_catalog.text); + + SELECT _prom_catalog.set_compress_chunk_interval_on_metric_table(metric_name, chunk_interval) + FROM _prom_catalog.metric + WHERE default_compress_chunk_interval + AND NOT is_view; + + SELECT true; +$$ +LANGUAGE SQL; +COMMENT ON FUNCTION prom_api.set_default_compress_chunk_interval(INTERVAL) +IS 'set the compress chunk interval for any metrics (existing and new) without an explicit override'; +GRANT EXECUTE ON FUNCTION prom_api.set_default_chunk_interval(INTERVAL) TO prom_admin; + +CREATE OR REPLACE FUNCTION prom_api.get_default_compress_chunk_interval() + RETURNS INTERVAL + SET search_path = pg_catalog, pg_temp +AS $func$ + SELECT _prom_catalog.get_default_value('compress_chunk_interval')::pg_catalog.interval; +$func$ + LANGUAGE SQL; +COMMENT ON FUNCTION prom_api.get_default_chunk_interval() + IS 'Get the default compress chunk interval for all metrics'; +GRANT EXECUTE ON FUNCTION prom_api.get_default_compress_chunk_interval() TO prom_admin; + +CREATE OR REPLACE FUNCTION prom_api.set_metric_compress_chunk_interval(metric_name TEXT, compress_chunk_interval INTERVAL) + RETURNS BOOLEAN + VOLATILE + SET search_path = pg_catalog, pg_temp +AS $func$ +BEGIN + --use get_or_create_metric_table_name because we want to be able to set /before/ any data is ingested + --needs to run before update so row exists before update. + PERFORM _prom_catalog.get_or_create_metric_table_name(set_metric_compress_chunk_interval.metric_name); + + UPDATE _prom_catalog.metric SET default_compress_chunk_interval = false + WHERE id IN (SELECT id FROM _prom_catalog.get_metric_table_name_if_exists('prom_data', set_metric_compress_chunk_interval.metric_name)) + AND NOT is_view; + + IF NOT FOUND THEN + RETURN false; + END IF; + + PERFORM _prom_catalog.set_compress_chunk_interval_on_metric_table(metric_name, compress_chunk_interval); + + RETURN true; +END +$func$ +LANGUAGE plpgsql; +COMMENT ON FUNCTION prom_api.set_metric_compress_chunk_interval(TEXT, INTERVAL) +IS 'set a compress chunk interval for a specific metric (this overrides the default)'; +GRANT EXECUTE ON FUNCTION prom_api.set_metric_compress_chunk_interval(TEXT, INTERVAL) TO prom_admin; + +CREATE OR REPLACE FUNCTION prom_api.get_metric_compress_chunk_interval(metric_name TEXT) + RETURNS INTERVAL + SET search_path = pg_catalog, pg_temp +AS $func$ + DECLARE + _table_name TEXT; + _is_default BOOLEAN; + _compress_chunk_interval INTERVAL; + BEGIN + SELECT table_name, default_compress_chunk_interval + INTO STRICT _table_name, _is_default + FROM _prom_catalog.metric WHERE table_schema = 'prom_data' AND metric.metric_name = get_metric_compress_chunk_interval.metric_name; + IF _is_default THEN + RETURN prom_api.get_default_compress_chunk_interval(); + END IF; + + SELECT _timescaledb_internal.to_interval (dim.compress_interval_length) + INTO STRICT _compress_chunk_interval + FROM _timescaledb_catalog.hypertable ht, + _timescaledb_catalog.dimension dim + WHERE dim.hypertable_id = ht.id + AND ht.schema_name = 'prom_data' + AND ht.table_name = _table_name; + + RETURN _compress_chunk_interval; + END +$func$ +LANGUAGE plpgsql; +COMMENT ON FUNCTION prom_api.get_metric_compress_chunk_interval(TEXT) + IS 'Get the compress chunk interval for a specific metric, or the default compress chunk interval if not explicitly set'; +GRANT EXECUTE ON FUNCTION prom_api.get_metric_compress_chunk_interval(TEXT) TO prom_admin; + +CREATE OR REPLACE FUNCTION prom_api.reset_metric_compress_chunk_interval(metric_name TEXT) + RETURNS BOOLEAN + VOLATILE + SET search_path = pg_catalog, pg_temp +AS $func$ +BEGIN + UPDATE _prom_catalog.metric SET default_compress_chunk_interval = true + WHERE id = (SELECT id FROM _prom_catalog.get_metric_table_name_if_exists('prom_data', metric_name)) + AND NOT is_view; + + IF NOT FOUND THEN + RETURN false; + END IF; + + PERFORM _prom_catalog.set_compress_chunk_interval_on_metric_table(metric_name, + _prom_catalog.get_default_compress_chunk_interval()); + + RETURN true; +END +$func$ +LANGUAGE plpgsql; +COMMENT ON FUNCTION prom_api.reset_metric_compress_chunk_interval(TEXT) +IS 'resets the compress chunk interval for a specific metric to using the default'; +GRANT EXECUTE ON FUNCTION prom_api.reset_metric_chunk_interval(TEXT) TO prom_admin; + CREATE OR REPLACE FUNCTION _prom_catalog.get_metric_retention_period(schema_name TEXT, metric_name TEXT) RETURNS INTERVAL SET search_path = pg_catalog, pg_temp diff --git a/migration/incremental/035-set-compress-chunk-interval.sql b/migration/incremental/035-set-compress-chunk-interval.sql new file mode 100644 index 00000000..cd7e7d05 --- /dev/null +++ b/migration/incremental/035-set-compress-chunk-interval.sql @@ -0,0 +1,63 @@ +ALTER TABLE _prom_catalog.metric ADD COLUMN default_compress_chunk_interval BOOLEAN NOT NULL DEFAULT true; + +-- Set chunk interval and compress chunk interval to new defaults. +-- Chunk interval will only be updated if it is set to the previous initial a.k.a. non-user default. +DO $$ +DECLARE + r RECORD; + _chunk_time_initial_default TEXT; + _previous_chunk_time_initial_default TEXT = (INTERVAL '8 hours')::TEXT; + _chunk_time_interval INTERVAL; +BEGIN + SELECT coalesce(d.value, _previous_chunk_time_initial_default) + INTO _chunk_time_initial_default + FROM _prom_catalog.default d + WHERE d.key = 'chunk_interval'; + + IF NOT FOUND THEN + _chunk_time_initial_default := _previous_chunk_time_initial_default; + END IF; + + IF NOT _prom_catalog.is_timescaledb_oss() + THEN + + FOR r IN + SELECT * + FROM _prom_catalog.metric m + WHERE NOT m.is_view + LOOP + IF _chunk_time_initial_default = _previous_chunk_time_initial_default + AND r.default_chunk_interval + THEN + SELECT INTERVAL '1 hour' * (1.0+((random()*0.01)-0.005)) + INTO STRICT _chunk_time_interval; + + EXECUTE public.set_chunk_time_interval( + format('prom_data.%I', r.table_name), + _chunk_time_interval + ); + ELSE + SELECT time_interval + INTO STRICT _chunk_time_interval + FROM _prom_catalog.metric m + INNER JOIN timescaledb_information.dimensions d + ON m.table_name = d.hypertable_name + AND m.table_schema = d.hypertable_schema + WHERE m.metric_name = r.metric_name + AND d.column_name = 'time'; + END IF; + + BEGIN + EXECUTE format($query$ + ALTER TABLE prom_data.%I SET ( + timescaledb.compress_chunk_time_interval = '%s' + ) $query$, + r.table_name, + (FLOOR(EXTRACT(EPOCH FROM INTERVAL '24 hours')/EXTRACT(EPOCH FROM _chunk_time_interval)) * _chunk_time_interval)::text + ); + EXCEPTION WHEN SQLSTATE '01000' THEN + END; + END LOOP; + END IF; +END +$$;