Skip to content

Add materialization invalidations API #8027

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .unreleased/pr_8027
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Implements: #8027 Add materialization invalidations API
1 change: 1 addition & 0 deletions cmake/ScriptFiles.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ set(SOURCE_FILES
job_api.sql
policy_api.sql
policy_internal.sql
cagg_api.sql
cagg_utils.sql
cagg_migrate.sql
job_stat_history_log_retention.sql
Expand Down
166 changes: 166 additions & 0 deletions sql/cagg_api.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
-- This file and its contents are licensed under the Apache License 2.0.
-- Please see the included NOTICE for copyright information and
-- LICENSE-APACHE for a copy of the license.

-- Look up the materialization hypertable id and the bucket width of a
-- continuous aggregate.
--
-- Returns a record with two fields: "materialization_id" (taken from
-- mat_hypertable_id in the continuous aggregate catalog) and
-- "bucket_width" (cast to an interval). Raises an error with SQLSTATE
-- wrong_object_type when the relation is not a continuous aggregate.
CREATE OR REPLACE FUNCTION _timescaledb_functions.get_materialization_info(
    continuous_aggregate REGCLASS
) RETURNS RECORD AS
$body$
DECLARE
    result RECORD;
BEGIN
    -- Find the catalog row whose schema-qualified user view resolves to
    -- the relation passed in. The bucket width comes from the row set
    -- produced by the lateral call (presumably a column of
    -- cagg_get_bucket_function_info -- confirm against its definition).
    SELECT cagg.mat_hypertable_id AS materialization_id,
           CAST(bucket_width AS interval) AS bucket_width
      INTO result
      FROM _timescaledb_catalog.continuous_agg AS cagg
     CROSS JOIN LATERAL _timescaledb_functions.cagg_get_bucket_function_info(cagg.mat_hypertable_id)
     WHERE CAST(format('%I.%I', cagg.user_view_schema, cagg.user_view_name) AS regclass) = continuous_aggregate;

    -- No matching catalog row means the relation is something else
    -- (plain table, hypertable, ordinary view, ...).
    IF NOT FOUND THEN
        RAISE EXCEPTION '"%" is not a continuous aggregate', continuous_aggregate
            USING ERRCODE = 'wrong_object_type';
    END IF;

    RETURN result;
END
$body$
LANGUAGE plpgsql
SET search_path TO pg_catalog, pg_temp;

-- Add new invalidations to the materialization invalidation log
-- (variant for ranges of timestamp without time zone).
--
-- This will add the range to the materialization invalidations for
-- the continuous aggregate. The range will automatically be "aligned"
-- to the bucket width to ensure that it covers all buckets that it
-- touches.
--
-- Parameters:
--   continuous_aggregate: the continuous aggregate to invalidate; an
--       error is raised by get_materialization_info if the relation
--       is not a continuous aggregate.
--   invalidation: the range to mark as invalid.
CREATE OR REPLACE PROCEDURE _timescaledb_functions.add_materialization_invalidations(
    continuous_aggregate REGCLASS,
    invalidation TSRANGE
) AS
$body$
DECLARE
    info RECORD := _timescaledb_functions.get_materialization_info(continuous_aggregate);
    aligned TSRANGE := _timescaledb_functions.align_to_bucket(info.bucket_width, invalidation);
BEGIN
    -- The bounds are timestamps without time zone. Passing them
    -- directly to to_unix_microseconds() would implicitly cast them to
    -- timestamptz using the session time zone and shift the stored
    -- values by the session's UTC offset, so that reading the log back
    -- as a TSRANGE returns a different range than the one added.
    -- Interpret the bounds as UTC explicitly to keep the conversion
    -- independent of the session time zone.
    INSERT INTO _timescaledb_catalog.continuous_aggs_materialization_invalidation_log
        (materialization_id, lowest_modified_value, greatest_modified_value)
    VALUES (info.materialization_id,
            _timescaledb_functions.to_unix_microseconds(lower(aligned) AT TIME ZONE 'UTC'),
            _timescaledb_functions.to_unix_microseconds(upper(aligned) AT TIME ZONE 'UTC'));
END
$body$
LANGUAGE plpgsql
SET search_path TO pg_catalog, pg_temp;

-- Add new invalidations to the materialization invalidation log
-- (variant for ranges of timestamp with time zone).
--
-- The range is aligned to the bucket width of the continuous
-- aggregate before it is written to the log.
--
-- Parameters:
--   continuous_aggregate: the continuous aggregate to invalidate; an
--       error is raised by get_materialization_info if the relation
--       is not a continuous aggregate.
--   invalidation: the range to mark as invalid.
CREATE OR REPLACE PROCEDURE _timescaledb_functions.add_materialization_invalidations(
    continuous_aggregate REGCLASS,
    invalidation TSTZRANGE
) AS
$body$
DECLARE
    info RECORD := _timescaledb_functions.get_materialization_info(continuous_aggregate);
    aligned TSTZRANGE := _timescaledb_functions.align_to_bucket(info.bucket_width, invalidation);
BEGIN
    -- Name the target columns explicitly so the statement does not
    -- depend on the physical column order of the catalog table.
    INSERT INTO _timescaledb_catalog.continuous_aggs_materialization_invalidation_log
        (materialization_id, lowest_modified_value, greatest_modified_value)
    VALUES (info.materialization_id,
            _timescaledb_functions.to_unix_microseconds(lower(aligned)),
            _timescaledb_functions.to_unix_microseconds(upper(aligned)));
END
$body$
LANGUAGE plpgsql
SET search_path TO pg_catalog, pg_temp;

-- Return the entries of the materialization invalidation log in raw
-- (internal time) form for the given time type.
--
-- Only entries whose two endpoints both lie within the internal
-- minimum and maximum for the type are returned. An endpoint equal to
-- the minimum (for the lower end) or the maximum (for the upper end)
-- is returned as NULL; per the original description these represent
-- '-infinity' and '+infinity', and the values are Unix microseconds.
CREATE OR REPLACE FUNCTION _timescaledb_functions.get_raw_materialization_ranges(typ regtype)
RETURNS TABLE (materialization_id integer,
               lowest_modified_value bigint,
               greatest_modified_value bigint)
AS $$
    WITH
      -- Internal time limits for the requested type, computed once.
      bounds AS MATERIALIZED (
        SELECT _timescaledb_functions.get_internal_time_min(typ) AS lo,
               _timescaledb_functions.get_internal_time_max(typ) AS hi
      )
    SELECT inval_log.materialization_id,
           NULLIF(inval_log.lowest_modified_value, bounds.lo),
           NULLIF(inval_log.greatest_modified_value, bounds.hi)
      FROM _timescaledb_catalog.continuous_aggs_materialization_invalidation_log AS inval_log
     CROSS JOIN bounds
     WHERE inval_log.lowest_modified_value >= bounds.lo
       AND inval_log.lowest_modified_value <= bounds.hi
       AND inval_log.greatest_modified_value >= bounds.lo
       AND inval_log.greatest_modified_value <= bounds.hi
$$
LANGUAGE SQL
SET search_path TO pg_catalog, pg_temp;

-- Return the pending materialization invalidations of a continuous
-- aggregate that fall inside a restriction (timestamptz variant).
--
-- The restriction is first aligned to the bucket width of the
-- continuous aggregate. The result is a single row holding a
-- multirange: the invalidated ranges intersected with the aligned
-- restriction. Because the final query is an aggregate without
-- GROUP BY, a row is returned even when nothing matches; the
-- multirange is NULL in that case.
--
-- NOTE(review): the previous header stated that this function modifies
-- the materialization invalidation table. The statements below only
-- read from it -- confirm whether a helper is expected to write.
CREATE OR REPLACE FUNCTION _timescaledb_functions.get_materialization_invalidations(
    continuous_aggregate REGCLASS,
    restriction TSTZRANGE
) RETURNS TABLE (invalidations TSTZMULTIRANGE) AS
$body$
DECLARE
    cagg_info RECORD;
    aligned_restriction TSTZRANGE;
BEGIN
    -- Raises if the relation is not a continuous aggregate.
    cagg_info := _timescaledb_functions.get_materialization_info(continuous_aggregate);
    aligned_restriction := _timescaledb_functions.align_to_bucket(cagg_info.bucket_width, restriction);

    RETURN QUERY
    WITH
      -- One multirange of invalidated time per materialization.
      invalidated AS (
        SELECT raw.materialization_id,
               range_agg(_timescaledb_functions.make_multirange_from_internal_time(
                             CAST(NULL AS tstzrange),
                             raw.lowest_modified_value,
                             raw.greatest_modified_value)) AS inval_ranges
          FROM _timescaledb_functions.get_raw_materialization_ranges(CAST('timestamptz' AS regtype)) AS raw
         GROUP BY raw.materialization_id
      )
    -- Clip the invalidated ranges to the aligned restriction.
    SELECT range_agg(invalidated.inval_ranges * multirange(aligned_restriction))
      FROM invalidated
     WHERE invalidated.materialization_id = cagg_info.materialization_id
       AND invalidated.inval_ranges && aligned_restriction;
END
$body$
LANGUAGE plpgsql
SET search_path TO pg_catalog, pg_temp;

-- Return the pending materialization invalidations of a continuous
-- aggregate that fall inside a restriction (timestamp without time
-- zone variant).
--
-- The restriction is first aligned to the bucket width of the
-- continuous aggregate. The result is a single row holding a
-- multirange: the invalidated ranges intersected with the aligned
-- restriction. Because the final query is an aggregate without
-- GROUP BY, a row is returned even when nothing matches; the
-- multirange is NULL in that case.
CREATE OR REPLACE FUNCTION _timescaledb_functions.get_materialization_invalidations(
    continuous_aggregate REGCLASS,
    restriction TSRANGE
) RETURNS TABLE (invalidations TSMULTIRANGE) AS
$body$
DECLARE
    cagg_info RECORD;
    aligned_restriction TSRANGE;
BEGIN
    -- Raises if the relation is not a continuous aggregate.
    cagg_info := _timescaledb_functions.get_materialization_info(continuous_aggregate);
    aligned_restriction := _timescaledb_functions.align_to_bucket(cagg_info.bucket_width, restriction);

    RETURN QUERY
    WITH
      -- One multirange of invalidated time per materialization.
      invalidated AS (
        SELECT raw.materialization_id,
               range_agg(_timescaledb_functions.make_multirange_from_internal_time(
                             CAST(NULL AS tsrange),
                             raw.lowest_modified_value,
                             raw.greatest_modified_value)) AS inval_ranges
          FROM _timescaledb_functions.get_raw_materialization_ranges(CAST('timestamp' AS regtype)) AS raw
         GROUP BY raw.materialization_id
      )
    -- Clip the invalidated ranges to the aligned restriction.
    SELECT range_agg(invalidated.inval_ranges * multirange(aligned_restriction))
      FROM invalidated
     WHERE invalidated.materialization_id = cagg_info.materialization_id
       AND invalidated.inval_ranges && aligned_restriction;
END
$body$
LANGUAGE plpgsql
SET search_path TO pg_catalog, pg_temp;
6 changes: 6 additions & 0 deletions sql/updates/reverse-dev.sql
Original file line number Diff line number Diff line change
Expand Up @@ -237,3 +237,9 @@ DROP FUNCTION _timescaledb_functions.make_multirange_from_internal_time(TSRANGE,
DROP FUNCTION _timescaledb_functions.make_range_from_internal_time(ANYRANGE, ANYELEMENT, ANYELEMENT);
DROP FUNCTION _timescaledb_functions.get_internal_time_min(REGTYPE);
DROP FUNCTION _timescaledb_functions.get_internal_time_max(REGTYPE);
-- Downgrade: remove the materialization invalidations API objects
-- that are created in sql/cagg_api.sql (both overloads of each
-- procedure/function are dropped by their argument types).
DROP PROCEDURE _timescaledb_functions.add_materialization_invalidations(REGCLASS,TSRANGE);
DROP PROCEDURE _timescaledb_functions.add_materialization_invalidations(REGCLASS,TSTZRANGE);
DROP FUNCTION _timescaledb_functions.get_raw_materialization_ranges(REGTYPE);
DROP FUNCTION _timescaledb_functions.get_materialization_invalidations(REGCLASS, TSTZRANGE);
DROP FUNCTION _timescaledb_functions.get_materialization_invalidations(REGCLASS, TSRANGE);
DROP FUNCTION _timescaledb_functions.get_materialization_info(REGCLASS);
202 changes: 202 additions & 0 deletions tsl/test/expected/cagg_api.out
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
-- This file and its contents are licensed under the Timescale License.
-- Please see the included NOTICE for copyright information and
-- LICENSE-TIMESCALE for a copy of the license.
\c :TEST_DBNAME :ROLE_SUPERUSER
SET timezone TO CET;
SET datestyle TO ISO;
CREATE VIEW continuous_aggregates AS
SELECT mat_hypertable_id AS materialization_id,
format('%I.%I', user_view_schema, user_view_name)::regclass AS continuous_aggregate
FROM _timescaledb_catalog.hypertable
JOIN _timescaledb_catalog.continuous_agg
ON hypertable.id = continuous_agg.mat_hypertable_id;
CREATE TABLE hyper_ts (time timestamp NOT NULL, value float);
CREATE TABLE hyper_tstz (time timestamptz NOT NULL, value float);
CREATE TABLE hyper_multi (time timestamptz NOT NULL, device int, value float);
CREATE TABLE hyper_no_cagg (time timestamptz NOT NULL, device int, value float);
CREATE TABLE normal_ts(time timestamp NOT NULL, value float);
SELECT * FROM create_hypertable('hyper_ts', 'time');
WARNING: column type "timestamp without time zone" used for "time" does not follow best practices
hypertable_id | schema_name | table_name | created
---------------+-------------+------------+---------
1 | public | hyper_ts | t
(1 row)

SELECT * FROM create_hypertable('hyper_tstz', 'time');
hypertable_id | schema_name | table_name | created
---------------+-------------+------------+---------
2 | public | hyper_tstz | t
(1 row)

SELECT * FROM create_hypertable('hyper_no_cagg', 'time');
hypertable_id | schema_name | table_name | created
---------------+-------------+---------------+---------
3 | public | hyper_no_cagg | t
(1 row)

SELECT * FROM create_hypertable('hyper_multi', 'time', 'device', 4);
hypertable_id | schema_name | table_name | created
---------------+-------------+-------------+---------
4 | public | hyper_multi | t
(1 row)

INSERT INTO hyper_ts
SELECT time, ceil(random() * 100)::int
FROM generate_series('2025-01-01'::timestamp,
'2025-01-06', '1m') time;
INSERT INTO hyper_tstz
SELECT time, ceil(random() * 100)::int
FROM generate_series('2025-01-01'::timestamptz,
'2025-01-06', '1m') time;
CREATE MATERIALIZED VIEW ts_temperature_1h
WITH (timescaledb.continuous) AS
SELECT time_bucket('1 hour', time), avg(value)
FROM hyper_ts
GROUP BY 1;
NOTICE: refreshing continuous aggregate "ts_temperature_1h"
CREATE MATERIALIZED VIEW ts_temperature_15m
WITH (timescaledb.continuous) AS
SELECT time_bucket('15 minutes', time), avg(value)
FROM hyper_ts
GROUP BY 1;
NOTICE: refreshing continuous aggregate "ts_temperature_15m"
CREATE MATERIALIZED VIEW tstz_temperature_1h
WITH (timescaledb.continuous) AS
SELECT time_bucket('1 hour', time), avg(value)
FROM hyper_tstz
GROUP BY 1;
NOTICE: refreshing continuous aggregate "tstz_temperature_1h"
CREATE MATERIALIZED VIEW tstz_temperature_15m
WITH (timescaledb.continuous) AS
SELECT time_bucket('15 minutes', time), avg(value)
FROM hyper_tstz
GROUP BY 1;
NOTICE: refreshing continuous aggregate "tstz_temperature_15m"
CREATE MATERIALIZED VIEW multi_temperature_15m
WITH (timescaledb.continuous) AS
SELECT time_bucket('15 minutes', time), avg(value)
FROM hyper_multi
GROUP BY 1;
NOTICE: continuous aggregate "multi_temperature_15m" is already up-to-date
SET search_path TO _timescaledb_functions, public;
-- These are not part of the API, but we test them here just to make
-- sure they work as expected.
SELECT table_name, get_materialization_info(table_name)
FROM (
VALUES ('tstz_temperature_15m'), ('multi_temperature_15m')
) t(table_name);
table_name | get_materialization_info
-----------------------+--------------------------
tstz_temperature_15m | (8,"@ 15 mins")
multi_temperature_15m | (9,"@ 15 mins")
(2 rows)

\set ON_ERROR_STOP 0
SELECT get_materialization_info('hyper_no_cagg');
ERROR: "public.hyper_no_cagg" is not a continuous aggregate
\set ON_ERROR_STOP 1
-- This is not part of the API either, but added a test here to make
-- sure that it works as expected.
SELECT materialization_id,
to_timestamp(lowest_modified_value),
to_timestamp(greatest_modified_value)
FROM get_raw_materialization_ranges('timestamptz');
materialization_id | to_timestamp | to_timestamp
--------------------+--------------+--------------
(0 rows)

-- Here are tests of the API
SELECT *
INTO before
FROM _timescaledb_catalog.continuous_aggs_materialization_invalidation_log;
CALL _timescaledb_functions.add_materialization_invalidations(
'tstz_temperature_15m'::regclass,
'["2025-04-25 11:10:00+02","2025-04-26 11:14:00+02"]'::tstzrange
);
CALL _timescaledb_functions.add_materialization_invalidations(
'ts_temperature_15m'::regclass,
'["2025-04-25 11:10:00+02","2025-04-26 11:14:00+02"]'::tsrange
);
-- Custom refresh function that iterate over the ranges inside the
-- restriction and refresh them. This is to check that the refresh
-- function does the right thing with the ranges returned by the API
-- function.
CREATE PROCEDURE custom_refresh(cagg REGCLASS, restriction ANYRANGE) AS
$body$
DECLARE
inval restriction%TYPE;
BEGIN
FOR inval IN
SELECT UNNEST(invalidations)
FROM _timescaledb_functions.get_materialization_invalidations(cagg, restriction)
LOOP
RAISE NOTICE 'Updating range %', inval;
CALL refresh_continuous_aggregate(cagg, lower(inval), upper(inval));
COMMIT;
END LOOP;
END
$body$
LANGUAGE plpgsql;
SELECT continuous_aggregate,
to_timestamp(lhs.lowest_modified_value),
to_timestamp(lhs.greatest_modified_value)
FROM _timescaledb_catalog.continuous_aggs_materialization_invalidation_log lhs
JOIN continuous_aggregates USING (materialization_id)
LEFT JOIN before rhs ON row(lhs.*) = row(rhs.*)
WHERE lhs.materialization_id IS NULL OR rhs.materialization_id IS NULL;
continuous_aggregate | to_timestamp | to_timestamp
----------------------+------------------------+------------------------
ts_temperature_15m | 2025-04-25 11:00:00+02 | 2025-04-26 11:15:00+02
tstz_temperature_15m | 2025-04-25 11:00:00+02 | 2025-04-26 11:15:00+02
(2 rows)

SELECT materialization_id,
to_timestamp(lowest_modified_value),
to_timestamp(greatest_modified_value)
FROM get_raw_materialization_ranges('timestamptz');
materialization_id | to_timestamp | to_timestamp
--------------------+------------------------+------------------------
6 | 2025-04-25 11:00:00+02 | 2025-04-26 11:15:00+02
8 | 2025-04-25 11:00:00+02 | 2025-04-26 11:15:00+02
(2 rows)

SELECT * FROM _timescaledb_functions.get_materialization_invalidations(
'ts_temperature_15m'::regclass,
'["2025-04-25","2025-04-26"]'::tsrange
);
invalidations
-------------------------------------------------
{["2025-04-25 09:00:00","2025-04-26 00:00:00")}
(1 row)

CALL custom_refresh('ts_temperature_15m', '["2025-04-25","2025-04-26"]'::tsrange);
NOTICE: Updating range ["2025-04-25 09:00:00","2025-04-26 00:00:00")
SELECT * FROM _timescaledb_functions.get_materialization_invalidations(
'ts_temperature_15m'::regclass,
'["2025-04-25","2025-04-26"]'::tsrange
);
invalidations
--------------------------------------------------------
{["2025-04-25 00:00:00","2025-04-25 08:59:59.999999")}
(1 row)

SELECT * FROM _timescaledb_functions.get_materialization_invalidations(
'tstz_temperature_15m'::regclass,
'["2025-04-25","2025-04-26"]'::tstzrange
);
invalidations
-------------------------------------------------------
{["2025-04-25 11:00:00+02","2025-04-26 00:00:00+02")}
(1 row)

CALL custom_refresh('tstz_temperature_15m', '["2025-04-25","2025-04-26"]'::tsrange);
NOTICE: Updating range ["2025-04-25 09:00:00","2025-04-26 00:00:00")
SELECT * FROM _timescaledb_functions.get_materialization_invalidations(
'tstz_temperature_15m'::regclass,
'["2025-04-25","2025-04-26"]'::tstzrange
);
invalidations
--------------------------------------------------------------
{["2025-04-25 00:00:00+02","2025-04-25 10:59:59.999999+02")}
(1 row)

Loading
Loading