diff --git a/.unreleased/pr_8018 b/.unreleased/pr_8018
new file mode 100644
index 00000000000..791b1513a01
--- /dev/null
+++ b/.unreleased/pr_8018
@@ -0,0 +1 @@
+Implements: #8018 Add spin-lock during recompression on unique constraints
diff --git a/tsl/src/compression/recompress.c b/tsl/src/compression/recompress.c
index bf98127e81a..1a719bd3756 100644
--- a/tsl/src/compression/recompress.c
+++ b/tsl/src/compression/recompress.c
@@ -5,6 +5,7 @@
  */
 #include
+#include "debug_point.h"
 #include
 #include
@@ -28,6 +29,14 @@
 #include "ts_catalog/chunk_column_stats.h"
 #include "ts_catalog/compression_settings.h"
+/*
+ * Timing parameters for spin locking heuristics.
+ * These are the same as used by Postgres for truncate locking during lazy vacuum.
+ * https://github.com/postgres/postgres/blob/4a0650d359c5981270039eeb634c3b7427aa0af5/src/backend/access/heap/vacuumlazy.c#L82
+ */
+#define RECOMPRESS_EXCLUSIVE_LOCK_WAIT_INTERVAL 50 /* ms */
+#define RECOMPRESS_EXCLUSIVE_LOCK_TIMEOUT 5000 /* ms */
+
 static bool fetch_uncompressed_chunk_into_tuplesort(Tuplesortstate *tuplesortstate,
 													Relation uncompressed_chunk_rel,
 													Snapshot snapshot);
@@ -50,6 +59,8 @@ static bool check_changed_group(CompressedSegmentInfo *current_segment, TupleTab
 								int nsegmentby_cols);
 static void recompress_segment(Tuplesortstate *tuplesortstate, Relation compressed_chunk_rel,
 							   RowCompressor *row_compressor);
+static void try_updating_chunk_status(Chunk *uncompressed_chunk, Relation uncompressed_chunk_rel);
+
 /*
  * Recompress an existing chunk by decompressing the batches
  * that are affected by the addition of newer data. The existing
@@ -533,38 +544,7 @@ recompress_chunk_segmentwise_impl(Chunk *uncompressed_chunk)
 	 */
 	if (ConditionalLockRelation(uncompressed_chunk_rel, ExclusiveLock))
 	{
-		TableScanDesc scan = table_beginscan(uncompressed_chunk_rel, GetLatestSnapshot(), 0, 0);
-		hypercore_scan_set_skip_compressed(scan, true);
-		ScanDirection scan_dir = uncompressed_chunk_rel->rd_tableam == hypercore_routine() ?
-									 ForwardScanDirection :
-									 BackwardScanDirection;
-		TupleTableSlot *slot = table_slot_create(uncompressed_chunk_rel, NULL);
-
-		/* Doing a backwards scan with assumption that newly inserted tuples
-		 * are most likely at the end of the heap.
-		 */
-		bool has_tuples = false;
-		if (table_scan_getnextslot(scan, scan_dir, slot))
-		{
-			has_tuples = true;
-		}
-
-		ExecDropSingleTupleTableSlot(slot);
-		table_endscan(scan);
-
-		if (!has_tuples)
-		{
-			if (ts_chunk_clear_status(uncompressed_chunk,
-									  CHUNK_STATUS_COMPRESSED_UNORDERED |
-										  CHUNK_STATUS_COMPRESSED_PARTIAL))
-				ereport(DEBUG1,
-						(errmsg("cleared chunk status for recompression: \"%s.%s\"",
-								NameStr(uncompressed_chunk->fd.schema_name),
-								NameStr(uncompressed_chunk->fd.table_name))));
-
-			/* changed chunk status, so invalidate any plans involving this chunk */
-			CacheInvalidateRelcacheByRelid(uncompressed_chunk_id);
-		}
+		try_updating_chunk_status(uncompressed_chunk, uncompressed_chunk_rel);
 	}
 	else if (has_unique_constraints)
 	{
@@ -575,13 +555,46 @@ recompress_chunk_segmentwise_impl(Chunk *uncompressed_chunk)
 		 * and speculative insertion could potentially cause false negatives during
 		 * constraint checking. For now, our best option here is to bail.
 		 *
-		 * This can be improved by using a spin lock to wait for the ExclusiveLock
-		 * or bail out if we can't get it in time.
+		 * We use a spin lock to wait for the ExclusiveLock or bail out if we can't get it in time.
 		 */
-		ereport(ERROR,
-				(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
-				 errmsg("aborting recompression due to concurrent DML on uncompressed "
-						"data, retrying with next policy run")));
+
+		int lock_retry = 0;
+		while (true)
+		{
+			if (ConditionalLockRelation(uncompressed_chunk_rel, ExclusiveLock))
+			{
+				try_updating_chunk_status(uncompressed_chunk, uncompressed_chunk_rel);
+				break;
+			}
+
+			/*
+			 * Check for interrupts while trying to (re-)acquire the exclusive
+			 * lock.
+			 */
+			CHECK_FOR_INTERRUPTS();
+
+			if (++lock_retry >
+				(RECOMPRESS_EXCLUSIVE_LOCK_TIMEOUT / RECOMPRESS_EXCLUSIVE_LOCK_WAIT_INTERVAL))
+			{
+				/*
+				 * We failed to establish the lock in the specified number of
+				 * retries. This means we give up trying to get the exclusive
+				 * lock and abort the recompression operation.
+				 */
+				ereport(ERROR,
+						(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+						 errmsg("aborting recompression due to concurrent DML on uncompressed "
+								"data, retrying with next policy run")));
+				break;
+			}
+
+			(void) WaitLatch(MyLatch,
+							 WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
+							 RECOMPRESS_EXCLUSIVE_LOCK_WAIT_INTERVAL,
+							 WAIT_EVENT_VACUUM_TRUNCATE);
+			ResetLatch(MyLatch);
+			DEBUG_WAITPOINT("chunk_recompress_after_latch");
+		}
 	}
 	table_close(uncompressed_chunk_rel, NoLock);
@@ -866,3 +879,47 @@ delete_tuple_for_recompression(Relation rel, ItemPointer tid, Snapshot snapshot)
 	return result == TM_Ok;
 }
+
+/* Check if we can update the chunk status to fully compressed after segmentwise recompression.
+ * We can only do this if there were no concurrent DML operations, so we check to see if there are
+ * any uncompressed tuples in the chunk after compression.
+ * If there aren't, we can update the chunk status.
+ *
+ * Note: Caller is expected to have an ExclusiveLock on the uncompressed_chunk
+ */
+static void
+try_updating_chunk_status(Chunk *uncompressed_chunk, Relation uncompressed_chunk_rel)
+{
+	TableScanDesc scan = table_beginscan(uncompressed_chunk_rel, GetLatestSnapshot(), 0, 0);
+	hypercore_scan_set_skip_compressed(scan, true);
+	ScanDirection scan_dir = uncompressed_chunk_rel->rd_tableam == hypercore_routine() ?
+								 ForwardScanDirection :
+								 BackwardScanDirection;
+	TupleTableSlot *slot = table_slot_create(uncompressed_chunk_rel, NULL);
+
+	/* Doing a backwards scan with assumption that newly inserted tuples
+	 * are most likely at the end of the heap.
+	 */
+	bool has_tuples = false;
+	if (table_scan_getnextslot(scan, scan_dir, slot))
+	{
+		has_tuples = true;
+	}
+
+	ExecDropSingleTupleTableSlot(slot);
+	table_endscan(scan);
+
+	if (!has_tuples)
+	{
+		if (ts_chunk_clear_status(uncompressed_chunk,
+								  CHUNK_STATUS_COMPRESSED_UNORDERED |
+									  CHUNK_STATUS_COMPRESSED_PARTIAL))
+			ereport(DEBUG1,
+					(errmsg("cleared chunk status for recompression: \"%s.%s\"",
+							NameStr(uncompressed_chunk->fd.schema_name),
+							NameStr(uncompressed_chunk->fd.table_name))));
+
+		/* changed chunk status, so invalidate any plans involving this chunk */
+		CacheInvalidateRelcacheByRelid(uncompressed_chunk->table_id);
+	}
+}
diff --git a/tsl/test/t/005_recompression_spin_lock_test.pl b/tsl/test/t/005_recompression_spin_lock_test.pl
new file mode 100644
index 00000000000..fbab1372ada
--- /dev/null
+++ b/tsl/test/t/005_recompression_spin_lock_test.pl
@@ -0,0 +1,178 @@
+# This file and its contents are licensed under the Timescale License.
+# Please see the included NOTICE for copyright information and
+# LICENSE-TIMESCALE for a copy of the license.
+
+# This TAP test verifies the behavior when recompression is attempted on a chunk
+# with unique constraints while concurrent DML is happening
+
+use strict;
+use warnings;
+use TimescaleNode;
+use Data::Dumper;
+use Test::More;
+
+# Test setup
+my $node = TimescaleNode->create('node');
+
+# Create table with a unique constraint
+my $result = $node->safe_psql(
+	'postgres', q{
+	CREATE TABLE sensor_data (
+	time timestamptz not null,
+	sensor_id integer not null,
+	cpu double precision null,
+	temperature double precision null,
+	UNIQUE(time, sensor_id)
+	);
+	}
+);
+is($result, '', 'create table with unique constraint');
+
+# Create hypertable
+$result = $node->safe_psql(
+	'postgres', q{
+	SELECT table_name FROM create_hypertable('sensor_data','time', chunk_time_interval => INTERVAL '1 month');
+	}
+);
+is($result, 'sensor_data', 'create hypertable');
+
+# Insert data
+$result = $node->safe_psql(
+	'postgres', q{
+	INSERT INTO sensor_data
+	SELECT
+	time + (INTERVAL '1 minute' * random()) AS time,
+	sensor_id,
+	random() AS cpu,
+	random() * 100 AS temperature
+	FROM
+	generate_series('2022-01-01', '2022-01-07', INTERVAL '1 hour') AS g1(time),
+	generate_series(1, 50, 1) AS g2(sensor_id)
+	ORDER BY time;
+	}
+);
+is($result, '', 'insert data');
+
+# Define count query
+my $count_query = "SELECT count(*) FROM sensor_data;";
+
+# Count inserted rows
+my $num_rows = $node->safe_psql('postgres', $count_query);
+is($num_rows, 7250, 'validate inserted rows');
+
+# Enable compression
+$result = $node->safe_psql(
+	'postgres', q{
+	ALTER TABLE sensor_data SET (
+	timescaledb.compress,
+	timescaledb.compress_segmentby = 'sensor_id',
+	timescaledb.compress_orderby = 'time'
+	);
+	}
+);
+is($result, '', 'enable compression');
+
+# Compress the chunk
+my $compress_query =
+  "SELECT count(*) FROM (SELECT compress_chunk(show_chunks('sensor_data')));";
+$result = $node->safe_psql('postgres', $compress_query);
+is($result, '1', 'compress chunk');
+
+# Insert more data to make the chunk partial
+$result = $node->safe_psql(
+	'postgres', q{
+	INSERT INTO sensor_data
+	SELECT
+	time + (INTERVAL '1 minute' * random()) AS time,
+	sensor_id,
+	random() AS cpu,
+	random() * 100 AS temperature
+	FROM
+	generate_series('2022-01-01', '2022-01-07', INTERVAL '1 hour') AS g1(time),
+	generate_series(51, 55, 1) AS g2(sensor_id)
+	ORDER BY time;
+	}
+);
+is($result, '', 'insert more data to make the chunk partial');
+
+# Create psql sessions
+my $s1 = $node->background_psql('postgres');
+my $s2 = $node->background_psql('postgres');
+my $s3 = $node->background_psql('postgres');
+
+# Enable segmentwise recompression
+$s1->query_safe("SET timescaledb.enable_segmentwise_recompression TO on;");
+
+# Enable waiting to acquire an exclusive lock
+$s1->query_safe("SET timescaledb.enable_recompress_waiting TO on;");
+
+# TEST 1:
+# Session 1 tries to acquire an exclusive lock at the end of recompression but is blocked due to the inserts by Session 2
+# We use session 3 to set up debug waitpoints
+# Begin txns in all sessions
+
+$s1->query_safe("BEGIN;");
+$s2->query_safe("BEGIN;");
+$s3->query_safe("BEGIN;");
+
+# We enable the debug_waitpoint after the latch and release it after s2 aborts
+# This allows s1 to successfully acquire the lock the second time around
+
+$s3->query_safe(
+	"SELECT debug_waitpoint_enable('chunk_recompress_after_latch');");
+
+
+# Get lock data
+$result = $node->safe_psql('postgres',
+	"SELECT relation::regclass::text, mode FROM pg_locks WHERE relation::regclass::text LIKE '%hyper_1_%chunk' and
+granted;"
+);
+is($result, '', "verify no locks exist on the chunk");
+
+# Session 2: Insert rows into the chunk
+$s2->query_until(
+	'', q{
+	INSERT INTO sensor_data VALUES
+	('2022-01-05 12:00:00', 100, 0.5, 25.5),
+	('2022-01-05 13:00:00', 101, 0.6, 26.5);
+});
+
+# Session 1: Recompress the chunk; this blocks until session 2 releases its locks
+# We have to use 'query_until('', ...)' so that the test immediately fires the next query
+$s1->query_until(
+	'', q{
+	SELECT compress_chunk(show_chunks('sensor_data'));
+});
+
+# Session 2 immediately aborts, releasing the RowExclusiveLock on the table
+$s2->query_safe("ABORT");
+
+# Release the debug waitpoint so that recompression succeeds
+$s3->query_safe(
+	"SELECT debug_waitpoint_release('chunk_recompress_after_latch');");
+
+# Verify ExclusiveLock on uncompressed chunk
+$result = $node->safe_psql('postgres',
+	"SELECT relation::regclass::text FROM pg_locks WHERE relation::regclass::text LIKE '%hyper_1_%chunk' AND granted AND mode = 'ExclusiveLock';"
+);
+is( $result,
+	'_timescaledb_internal._hyper_1_1_chunk',
+	"verify ExclusiveLock on uncompressed chunk");
+
+# Verify AccessShareLock on compressed chunk
+$result = $node->safe_psql('postgres',
+	"SELECT relation::regclass::text FROM pg_locks WHERE relation::regclass::text LIKE '%hyper_1_%chunk' AND granted AND mode = 'AccessShareLock'"
+);
+is( $result,
+	"_timescaledb_internal._hyper_1_1_chunk",
+	"verify AccessShareLock on internal compressed chunk");
+
+# Clean up
+$s1->query_safe("ROLLBACK;");
+
+$s3->query_safe("ROLLBACK;");
+$s3->quit();
+
+$s2->quit();
+$s1->quit();
+
+done_testing();
diff --git a/tsl/test/t/CMakeLists.txt b/tsl/test/t/CMakeLists.txt
index 0dff888c25e..f0a80f8ed35 100644
--- a/tsl/test/t/CMakeLists.txt
+++ b/tsl/test/t/CMakeLists.txt
@@ -8,6 +8,7 @@ set(PROVE_DEBUG_TEST_FILES 003_mvcc_cagg.pl)
 if((${PG_VERSION_MAJOR} GREATER_EQUAL "16"))
   list(APPEND PROVE_TEST_FILES 004_truncate_or_delete_spin_lock.pl)
+  list(APPEND PROVE_DEBUG_TEST_FILES 005_recompression_spin_lock_test.pl)
 endif()
 if(CMAKE_BUILD_TYPE MATCHES Debug)
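
Note on the locking heuristic: despite the "spin-lock" name, the retry loop added to recompress.c is a bounded trylock/sleep pattern. It attempts ConditionalLockRelation(), and on failure sleeps on the process latch for RECOMPRESS_EXCLUSIVE_LOCK_WAIT_INTERVAL (50 ms), giving up with a serialization error after RECOMPRESS_EXCLUSIVE_LOCK_TIMEOUT / RECOMPRESS_EXCLUSIVE_LOCK_WAIT_INTERVAL = 100 attempts, roughly 5 seconds, mirroring the truncate-lock heuristic in PostgreSQL's lazy vacuum. Below is a minimal standalone sketch of the same pattern, using plain pthreads and nanosleep in place of the PostgreSQL lock manager and latch machinery; acquire_with_timeout and the LOCK_* constants are illustrative names, not part of the patch.

/*
 * Illustrative sketch only: bounded trylock/sleep retry, mirroring the
 * RECOMPRESS_EXCLUSIVE_LOCK_* constants added in recompress.c.
 */
#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>
#include <time.h>

#define LOCK_WAIT_INTERVAL_MS 50 /* sleep between attempts, ms */
#define LOCK_TIMEOUT_MS 5000	 /* give up after this long, ms */

static bool
acquire_with_timeout(pthread_mutex_t *lock)
{
	int lock_retry = 0;

	while (true)
	{
		if (pthread_mutex_trylock(lock) == 0)
			return true; /* got the lock, caller proceeds */

		if (++lock_retry > (LOCK_TIMEOUT_MS / LOCK_WAIT_INTERVAL_MS))
			return false; /* timed out, caller bails out */

		/* Stand-in for WaitLatch(): sleep before retrying. */
		struct timespec ts;
		ts.tv_sec = 0;
		ts.tv_nsec = LOCK_WAIT_INTERVAL_MS * 1000000L;
		nanosleep(&ts, NULL);
	}
}

int
main(void)
{
	pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;

	if (acquire_with_timeout(&lock))
	{
		printf("lock acquired\n");
		pthread_mutex_unlock(&lock);
	}
	else
		printf("gave up after ~%d ms\n", LOCK_TIMEOUT_MS);

	return 0;
}

In the actual patch, waiting on the latch with WL_EXIT_ON_PM_DEATH and calling CHECK_FOR_INTERRUPTS() between attempts keeps the backend responsive to query cancellation and postmaster death while it waits, which the sketch above only approximates with a plain sleep.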