-
Notifications
You must be signed in to change notification settings - Fork 943
Add materialization invalidations API #8027
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
Implements: #8027 Add materialization invalidations API |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,166 @@ | ||
-- This file and its contents are licensed under the Apache License 2.0. | ||
-- Please see the included NOTICE for copyright information and | ||
-- LICENSE-APACHE for a copy of the license. | ||
|
||
-- Get information about the materialization table and bucket width. | ||
CREATE OR REPLACE FUNCTION _timescaledb_functions.get_materialization_info( | ||
continuous_aggregate REGCLASS | ||
) RETURNS RECORD AS | ||
mkindahl marked this conversation as resolved.
Show resolved
Hide resolved
|
||
$body$ | ||
DECLARE | ||
info RECORD; | ||
BEGIN | ||
SELECT mat_hypertable_id AS materialization_id, | ||
bucket_width::interval AS bucket_width | ||
INTO info | ||
FROM _timescaledb_catalog.continuous_agg, | ||
LATERAL _timescaledb_functions.cagg_get_bucket_function_info(mat_hypertable_id) | ||
WHERE format('%I.%I', user_view_schema, user_view_name)::regclass = continuous_aggregate; | ||
|
||
IF NOT FOUND THEN | ||
RAISE '"%" is not a continuous aggregate', continuous_aggregate | ||
USING ERRCODE = 'wrong_object_type'; | ||
END IF; | ||
|
||
RETURN info; | ||
END | ||
$body$ | ||
LANGUAGE plpgsql | ||
SET search_path TO pg_catalog, pg_temp; | ||
|
||
-- Add new invalidations to the materialization invalidation log. | ||
-- | ||
-- This will add the range to the materialization invalidations for | ||
-- the continuous aggregate. The range will automatically be "aligned" | ||
-- to the bucket width to ensure that it covers all buckets that it | ||
-- touches. | ||
CREATE OR REPLACE PROCEDURE _timescaledb_functions.add_materialization_invalidations( | ||
continuous_aggregate regclass, | ||
invalidation tsrange | ||
mkindahl marked this conversation as resolved.
Show resolved
Hide resolved
|
||
) AS | ||
$body$ | ||
DECLARE | ||
info RECORD := _timescaledb_functions.get_materialization_info(continuous_aggregate); | ||
aligned TSRANGE := _timescaledb_functions.align_to_bucket(info.bucket_width, invalidation); | ||
BEGIN | ||
INSERT INTO _timescaledb_catalog.continuous_aggs_materialization_invalidation_log | ||
VALUES (info.materialization_id, | ||
_timescaledb_functions.to_unix_microseconds(lower(aligned)), | ||
_timescaledb_functions.to_unix_microseconds(upper(aligned))); | ||
mkindahl marked this conversation as resolved.
Show resolved
Hide resolved
|
||
END | ||
$body$ | ||
LANGUAGE plpgsql | ||
SET search_path TO pg_catalog, pg_temp; | ||
|
||
CREATE OR REPLACE PROCEDURE _timescaledb_functions.add_materialization_invalidations( | ||
continuous_aggregate REGCLASS, | ||
invalidation TSTZRANGE | ||
) AS | ||
$body$ | ||
DECLARE | ||
info RECORD := _timescaledb_functions.get_materialization_info(continuous_aggregate); | ||
aligned TSTZRANGE := _timescaledb_functions.align_to_bucket(info.bucket_width, invalidation); | ||
BEGIN | ||
INSERT INTO _timescaledb_catalog.continuous_aggs_materialization_invalidation_log | ||
VALUES (info.materialization_id, | ||
_timescaledb_functions.to_unix_microseconds(lower(aligned)), | ||
_timescaledb_functions.to_unix_microseconds(upper(aligned))); | ||
END | ||
$body$ | ||
LANGUAGE plpgsql | ||
SET search_path TO pg_catalog, pg_temp; | ||
|
||
-- Get raw ranges from the materialization invalidation log | ||
-- | ||
-- This is a cleaned-up version of the timestamps, still in Unix | ||
-- microseconds, with nulls for '-infinity' and '+infinity' and | ||
-- invalid entries removed. | ||
CREATE OR REPLACE FUNCTION _timescaledb_functions.get_raw_materialization_ranges(typ regtype) | ||
mkindahl marked this conversation as resolved.
Show resolved
Hide resolved
|
||
RETURNS TABLE (materialization_id integer, | ||
mkindahl marked this conversation as resolved.
Show resolved
Hide resolved
|
||
lowest_modified_value bigint, | ||
greatest_modified_value bigint) | ||
AS $$ | ||
WITH | ||
min_max_values AS MATERIALIZED ( | ||
SELECT _timescaledb_functions.get_internal_time_min(typ) AS min, | ||
_timescaledb_functions.get_internal_time_max(typ) AS max | ||
) | ||
SELECT materialization_id, | ||
NULLIF(lowest_modified_value, min_max_values.min), | ||
NULLIF(greatest_modified_value, min_max_values.max) | ||
FROM _timescaledb_catalog.continuous_aggs_materialization_invalidation_log, min_max_values | ||
WHERE lowest_modified_value | ||
BETWEEN min_max_values.min | ||
AND min_max_values.max | ||
AND greatest_modified_value | ||
BETWEEN min_max_values.min | ||
AND min_max_values.max | ||
$$ | ||
LANGUAGE SQL | ||
SET search_path TO pg_catalog, pg_temp; | ||
|
||
-- Get materialization invalidations for a continuous aggregate. | ||
-- | ||
-- Note that this will modify the materialization invalidation table | ||
-- to be able to extract the restricted range of invalidations. | ||
CREATE OR REPLACE FUNCTION _timescaledb_functions.get_materialization_invalidations( | ||
continuous_aggregate REGCLASS, | ||
restriction TSTZRANGE | ||
) RETURNS TABLE (invalidations TSTZMULTIRANGE) AS | ||
$body$ | ||
DECLARE | ||
info RECORD := _timescaledb_functions.get_materialization_info(continuous_aggregate); | ||
aligned TSTZRANGE := _timescaledb_functions.align_to_bucket(info.bucket_width, restriction); | ||
BEGIN | ||
-- Compute the multirange for the invalidations inside the | ||
-- restriction passed down to the function and return the ranges. | ||
RETURN QUERY | ||
WITH | ||
ranges AS ( | ||
SELECT materialization_id, | ||
range_agg(_timescaledb_functions.make_multirange_from_internal_time( | ||
null::tstzrange, | ||
lowest_modified_value, | ||
greatest_modified_value)) AS invals | ||
FROM _timescaledb_functions.get_raw_materialization_ranges('timestamptz'::regtype) | ||
GROUP BY materialization_id | ||
) | ||
SELECT range_agg(invals * multirange(aligned)) | ||
FROM ranges | ||
WHERE invals && aligned | ||
AND materialization_id = info.materialization_id; | ||
END | ||
$body$ | ||
LANGUAGE plpgsql | ||
SET search_path TO pg_catalog, pg_temp; | ||
|
||
CREATE OR REPLACE FUNCTION _timescaledb_functions.get_materialization_invalidations( | ||
continuous_aggregate REGCLASS, | ||
restriction TSRANGE | ||
) RETURNS TABLE (invalidations TSMULTIRANGE) AS | ||
$body$ | ||
DECLARE | ||
info RECORD := _timescaledb_functions.get_materialization_info(continuous_aggregate); | ||
aligned TSRANGE := _timescaledb_functions.align_to_bucket(info.bucket_width, restriction); | ||
BEGIN | ||
-- Compute the multirange for the invalidations inside the | ||
-- restriction passed down to the function and return the ranges. | ||
RETURN QUERY | ||
WITH | ||
ranges AS ( | ||
SELECT materialization_id, | ||
range_agg(_timescaledb_functions.make_multirange_from_internal_time( | ||
null::tsrange, | ||
lowest_modified_value, | ||
greatest_modified_value)) AS invals | ||
FROM _timescaledb_functions.get_raw_materialization_ranges('timestamp'::regtype) | ||
GROUP BY materialization_id | ||
) | ||
SELECT range_agg(invals * multirange(aligned)) | ||
FROM ranges | ||
WHERE invals && aligned | ||
AND materialization_id = info.materialization_id; | ||
END | ||
$body$ | ||
LANGUAGE plpgsql | ||
SET search_path TO pg_catalog, pg_temp; |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,202 @@ | ||
-- This file and its contents are licensed under the Timescale License. | ||
-- Please see the included NOTICE for copyright information and | ||
-- LICENSE-TIMESCALE for a copy of the license. | ||
\c :TEST_DBNAME :ROLE_SUPERUSER | ||
SET timezone TO CET; | ||
SET datestyle TO ISO; | ||
CREATE VIEW continuous_aggregates AS | ||
SELECT mat_hypertable_id AS materialization_id, | ||
format('%I.%I', user_view_schema, user_view_name)::regclass AS continuous_aggregate | ||
FROM _timescaledb_catalog.hypertable | ||
JOIN _timescaledb_catalog.continuous_agg | ||
ON hypertable.id = continuous_agg.mat_hypertable_id; | ||
CREATE TABLE hyper_ts (time timestamp NOT NULL, value float); | ||
CREATE TABLE hyper_tstz (time timestamptz NOT NULL, value float); | ||
CREATE TABLE hyper_multi (time timestamptz NOT NULL, device int, value float); | ||
CREATE TABLE hyper_no_cagg (time timestamptz NOT NULL, device int, value float); | ||
CREATE TABLE normal_ts(time timestamp NOT NULL, value float); | ||
SELECT * FROM create_hypertable('hyper_ts', 'time'); | ||
WARNING: column type "timestamp without time zone" used for "time" does not follow best practices | ||
hypertable_id | schema_name | table_name | created | ||
---------------+-------------+------------+--------- | ||
1 | public | hyper_ts | t | ||
(1 row) | ||
|
||
SELECT * FROM create_hypertable('hyper_tstz', 'time'); | ||
hypertable_id | schema_name | table_name | created | ||
---------------+-------------+------------+--------- | ||
2 | public | hyper_tstz | t | ||
(1 row) | ||
|
||
SELECT * FROM create_hypertable('hyper_no_cagg', 'time'); | ||
hypertable_id | schema_name | table_name | created | ||
---------------+-------------+---------------+--------- | ||
3 | public | hyper_no_cagg | t | ||
(1 row) | ||
|
||
SELECT * FROM create_hypertable('hyper_multi', 'time', 'device', 4); | ||
hypertable_id | schema_name | table_name | created | ||
---------------+-------------+-------------+--------- | ||
4 | public | hyper_multi | t | ||
(1 row) | ||
|
||
INSERT INTO hyper_ts | ||
SELECT time, ceil(random() * 100)::int | ||
FROM generate_series('2025-01-01'::timestamp, | ||
'2025-01-06', '1m') time; | ||
INSERT INTO hyper_tstz | ||
SELECT time, ceil(random() * 100)::int | ||
FROM generate_series('2025-01-01'::timestamptz, | ||
'2025-01-06', '1m') time; | ||
CREATE MATERIALIZED VIEW ts_temperature_1h | ||
WITH (timescaledb.continuous) AS | ||
SELECT time_bucket('1 hour', time), avg(value) | ||
FROM hyper_ts | ||
GROUP BY 1; | ||
NOTICE: refreshing continuous aggregate "ts_temperature_1h" | ||
CREATE MATERIALIZED VIEW ts_temperature_15m | ||
WITH (timescaledb.continuous) AS | ||
SELECT time_bucket('15 minutes', time), avg(value) | ||
FROM hyper_ts | ||
GROUP BY 1; | ||
NOTICE: refreshing continuous aggregate "ts_temperature_15m" | ||
CREATE MATERIALIZED VIEW tstz_temperature_1h | ||
WITH (timescaledb.continuous) AS | ||
SELECT time_bucket('1 hour', time), avg(value) | ||
FROM hyper_tstz | ||
GROUP BY 1; | ||
NOTICE: refreshing continuous aggregate "tstz_temperature_1h" | ||
CREATE MATERIALIZED VIEW tstz_temperature_15m | ||
WITH (timescaledb.continuous) AS | ||
SELECT time_bucket('15 minutes', time), avg(value) | ||
FROM hyper_tstz | ||
GROUP BY 1; | ||
NOTICE: refreshing continuous aggregate "tstz_temperature_15m" | ||
CREATE MATERIALIZED VIEW multi_temperature_15m | ||
WITH (timescaledb.continuous) AS | ||
SELECT time_bucket('15 minutes', time), avg(value) | ||
FROM hyper_multi | ||
GROUP BY 1; | ||
NOTICE: continuous aggregate "multi_temperature_15m" is already up-to-date | ||
mkindahl marked this conversation as resolved.
Show resolved
Hide resolved
|
||
SET search_path TO _timescaledb_functions, public; | ||
-- These are not part of the API, but we test them here just to make | ||
-- sure they work as expected. | ||
SELECT table_name, get_materialization_info(table_name) | ||
FROM ( | ||
VALUES ('tstz_temperature_15m'), ('multi_temperature_15m') | ||
) t(table_name); | ||
table_name | get_materialization_info | ||
-----------------------+-------------------------- | ||
tstz_temperature_15m | (8,"@ 15 mins") | ||
multi_temperature_15m | (9,"@ 15 mins") | ||
(2 rows) | ||
|
||
\set ON_ERROR_STOP 0 | ||
SELECT get_materialization_info('hyper_no_cagg'); | ||
ERROR: "public.hyper_no_cagg" is not a continuous aggregate | ||
\set ON_ERROR_STOP 1 | ||
-- This is not part of the API either, but added a test here to make | ||
-- sure that it works as expected. | ||
SELECT materialization_id, | ||
to_timestamp(lowest_modified_value), | ||
to_timestamp(greatest_modified_value) | ||
FROM get_raw_materialization_ranges('timestamptz'); | ||
materialization_id | to_timestamp | to_timestamp | ||
--------------------+--------------+-------------- | ||
(0 rows) | ||
|
||
-- Here are tests of the API | ||
SELECT * | ||
INTO before | ||
FROM _timescaledb_catalog.continuous_aggs_materialization_invalidation_log; | ||
CALL _timescaledb_functions.add_materialization_invalidations( | ||
'tstz_temperature_15m'::regclass, | ||
mkindahl marked this conversation as resolved.
Show resolved
Hide resolved
|
||
'["2025-04-25 11:10:00+02","2025-04-26 11:14:00+02"]'::tstzrange | ||
); | ||
CALL _timescaledb_functions.add_materialization_invalidations( | ||
'ts_temperature_15m'::regclass, | ||
'["2025-04-25 11:10:00+02","2025-04-26 11:14:00+02"]'::tsrange | ||
); | ||
-- Custom refresh function that iterate over the ranges inside the | ||
-- restriction and refresh them. This is to check that the refresh | ||
-- function does the right thing with the ranges returned by the API | ||
-- function. | ||
CREATE PROCEDURE custom_refresh(cagg REGCLASS, restriction ANYRANGE) AS | ||
$body$ | ||
DECLARE | ||
inval restriction%TYPE; | ||
BEGIN | ||
FOR inval IN | ||
SELECT UNNEST(invalidations) | ||
FROM _timescaledb_functions.get_materialization_invalidations(cagg, restriction) | ||
LOOP | ||
RAISE NOTICE 'Updating range %', inval; | ||
CALL refresh_continuous_aggregate(cagg, lower(inval), upper(inval)); | ||
COMMIT; | ||
END LOOP; | ||
END | ||
$body$ | ||
LANGUAGE plpgsql; | ||
SELECT continuous_aggregate, | ||
to_timestamp(lhs.lowest_modified_value), | ||
to_timestamp(lhs.greatest_modified_value) | ||
FROM _timescaledb_catalog.continuous_aggs_materialization_invalidation_log lhs | ||
JOIN continuous_aggregates USING (materialization_id) | ||
LEFT JOIN before rhs ON row(lhs.*) = row(rhs.*) | ||
WHERE lhs.materialization_id IS NULL OR rhs.materialization_id IS NULL; | ||
continuous_aggregate | to_timestamp | to_timestamp | ||
----------------------+------------------------+------------------------ | ||
ts_temperature_15m | 2025-04-25 11:00:00+02 | 2025-04-26 11:15:00+02 | ||
tstz_temperature_15m | 2025-04-25 11:00:00+02 | 2025-04-26 11:15:00+02 | ||
(2 rows) | ||
|
||
SELECT materialization_id, | ||
to_timestamp(lowest_modified_value), | ||
to_timestamp(greatest_modified_value) | ||
FROM get_raw_materialization_ranges('timestamptz'); | ||
materialization_id | to_timestamp | to_timestamp | ||
--------------------+------------------------+------------------------ | ||
6 | 2025-04-25 11:00:00+02 | 2025-04-26 11:15:00+02 | ||
8 | 2025-04-25 11:00:00+02 | 2025-04-26 11:15:00+02 | ||
(2 rows) | ||
|
||
SELECT * FROM _timescaledb_functions.get_materialization_invalidations( | ||
'ts_temperature_15m'::regclass, | ||
'["2025-04-25","2025-04-26"]'::tsrange | ||
); | ||
invalidations | ||
------------------------------------------------- | ||
{["2025-04-25 09:00:00","2025-04-26 00:00:00")} | ||
(1 row) | ||
|
||
CALL custom_refresh('ts_temperature_15m', '["2025-04-25","2025-04-26"]'::tsrange); | ||
NOTICE: Updating range ["2025-04-25 09:00:00","2025-04-26 00:00:00") | ||
SELECT * FROM _timescaledb_functions.get_materialization_invalidations( | ||
'ts_temperature_15m'::regclass, | ||
'["2025-04-25","2025-04-26"]'::tsrange | ||
); | ||
invalidations | ||
-------------------------------------------------------- | ||
{["2025-04-25 00:00:00","2025-04-25 08:59:59.999999")} | ||
(1 row) | ||
|
||
SELECT * FROM _timescaledb_functions.get_materialization_invalidations( | ||
'tstz_temperature_15m'::regclass, | ||
'["2025-04-25","2025-04-26"]'::tstzrange | ||
); | ||
invalidations | ||
------------------------------------------------------- | ||
{["2025-04-25 11:00:00+02","2025-04-26 00:00:00+02")} | ||
(1 row) | ||
|
||
CALL custom_refresh('tstz_temperature_15m', '["2025-04-25","2025-04-26"]'::tsrange); | ||
NOTICE: Updating range ["2025-04-25 09:00:00","2025-04-26 00:00:00") | ||
SELECT * FROM _timescaledb_functions.get_materialization_invalidations( | ||
'tstz_temperature_15m'::regclass, | ||
'["2025-04-25","2025-04-26"]'::tstzrange | ||
); | ||
invalidations | ||
-------------------------------------------------------------- | ||
{["2025-04-25 00:00:00+02","2025-04-25 10:59:59.999999+02")} | ||
(1 row) | ||
|
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.