Add recompression spin lock #8018

Merged: 3 commits, May 6, 2025
Changes from 1 commit
1 change: 1 addition & 0 deletions .unreleased/pr_8018
@@ -0,0 +1 @@
Implements: #8018 Add spin-lock during recompression on unique constraints
131 changes: 93 additions & 38 deletions tsl/src/compression/recompress.c
@@ -28,6 +28,14 @@
#include "ts_catalog/chunk_column_stats.h"
#include "ts_catalog/compression_settings.h"

/*
* Timing parameters for spin locking heuristics.
* These are the same as used by Postgres for truncate locking during lazy vacuum.
* https://github.com/postgres/postgres/blob/4a0650d359c5981270039eeb634c3b7427aa0af5/src/backend/access/heap/vacuumlazy.c#L82
*/
#define RECOMPRESS_EXCLUSIVE_LOCK_WAIT_INTERVAL 50 /* ms */
#define RECOMPRESS_EXCLUSIVE_LOCK_TIMEOUT 5000 /* ms */
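/*
 * With these settings, the retry loop below makes at most
 * RECOMPRESS_EXCLUSIVE_LOCK_TIMEOUT / RECOMPRESS_EXCLUSIVE_LOCK_WAIT_INTERVAL
 * = 100 lock attempts, sleeping 50 ms between attempts, i.e. roughly five
 * seconds of waiting before recompression gives up.
 */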

static bool fetch_uncompressed_chunk_into_tuplesort(Tuplesortstate *tuplesortstate,
Relation uncompressed_chunk_rel,
Snapshot snapshot);
@@ -50,6 +58,8 @@
int nsegmentby_cols);
static void recompress_segment(Tuplesortstate *tuplesortstate, Relation compressed_chunk_rel,
RowCompressor *row_compressor);
static void try_updating_chunk_status(Chunk *uncompressed_chunk, Relation uncompressed_chunk_rel);

/*
* Recompress an existing chunk by decompressing the batches
* that are affected by the addition of newer data. The existing
@@ -533,38 +543,7 @@
*/
if (ConditionalLockRelation(uncompressed_chunk_rel, ExclusiveLock))
{
TableScanDesc scan = table_beginscan(uncompressed_chunk_rel, GetLatestSnapshot(), 0, 0);
hypercore_scan_set_skip_compressed(scan, true);
ScanDirection scan_dir = uncompressed_chunk_rel->rd_tableam == hypercore_routine() ?
ForwardScanDirection :
BackwardScanDirection;
TupleTableSlot *slot = table_slot_create(uncompressed_chunk_rel, NULL);

/* Doing a backwards scan with assumption that newly inserted tuples
* are most likely at the end of the heap.
*/
bool has_tuples = false;
if (table_scan_getnextslot(scan, scan_dir, slot))
{
has_tuples = true;
}

ExecDropSingleTupleTableSlot(slot);
table_endscan(scan);

if (!has_tuples)
{
if (ts_chunk_clear_status(uncompressed_chunk,
CHUNK_STATUS_COMPRESSED_UNORDERED |
CHUNK_STATUS_COMPRESSED_PARTIAL))
ereport(DEBUG1,
(errmsg("cleared chunk status for recompression: \"%s.%s\"",
NameStr(uncompressed_chunk->fd.schema_name),
NameStr(uncompressed_chunk->fd.table_name))));

/* changed chunk status, so invalidate any plans involving this chunk */
CacheInvalidateRelcacheByRelid(uncompressed_chunk_id);
}
try_updating_chunk_status(uncompressed_chunk, uncompressed_chunk_rel);
}
else if (has_unique_constraints)
{
@@ -575,13 +554,45 @@
* and speculative insertion could potentially cause false negatives during
* constraint checking. For now, our best option here is to bail.
*
* This can be improved by using a spin lock to wait for the ExclusiveLock
* or bail out if we can't get it in time.
* We use a spin lock to wait for the ExclusiveLock or bail out if we can't get it in time.
*/
ereport(ERROR,
(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
errmsg("aborting recompression due to concurrent DML on uncompressed "
"data, retrying with next policy run")));

int lock_retry = 0;
while (true)
{
if (ConditionalLockRelation(uncompressed_chunk_rel, ExclusiveLock))
{
try_updating_chunk_status(uncompressed_chunk, uncompressed_chunk_rel);
break;

}

Check warning (Codecov / codecov/patch) on tsl/src/compression/recompress.c#L565-L566: added lines #L565-L566 were not covered by tests.
Comment on lines +564 to +568

[Member] Wondering if this should be after the waitlatch, since you will enter this branch after failure on the first ConditionalLockRelation. Does it make sense?

[Member Author] I did it this way because I wanted to keep the structure consistent with how Postgres implements a similar spin lock.
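For illustration, the ordering the reviewer is describing would look roughly like this (wait first, then retry, since the loop is only entered after one failed ConditionalLockRelation). This is a hypothetical restructuring, not what the PR merged:

int lock_retry = 0;
while (true)
{
	CHECK_FOR_INTERRUPTS();

	if (++lock_retry >
		(RECOMPRESS_EXCLUSIVE_LOCK_TIMEOUT / RECOMPRESS_EXCLUSIVE_LOCK_WAIT_INTERVAL))
		ereport(ERROR,
				(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
				 errmsg("aborting recompression due to concurrent DML on "
						"uncompressed data, retrying with next policy run")));

	/* Wait before retrying, since the first attempt already failed. */
	(void) WaitLatch(MyLatch,
					 WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
					 RECOMPRESS_EXCLUSIVE_LOCK_WAIT_INTERVAL,
					 WAIT_EVENT_VACUUM_TRUNCATE);
	ResetLatch(MyLatch);

	if (ConditionalLockRelation(uncompressed_chunk_rel, ExclusiveLock))
	{
		try_updating_chunk_status(uncompressed_chunk, uncompressed_chunk_rel);
		break;
	}
}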


/*
* Check for interrupts while trying to (re-)acquire the exclusive
* lock.
*/
CHECK_FOR_INTERRUPTS();

if (++lock_retry >
(RECOMPRESS_EXCLUSIVE_LOCK_TIMEOUT / RECOMPRESS_EXCLUSIVE_LOCK_WAIT_INTERVAL))
{
/*
 * We failed to establish the lock in the specified number of
 * retries. This means we give up trying to get the exclusive
 * lock and abort the recompression operation.
 */
ereport(ERROR,
(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
errmsg("aborting recompression due to concurrent DML on uncompressed "
"data, retrying with next policy run")));
break;
}

(void) WaitLatch(MyLatch,
WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
RECOMPRESS_EXCLUSIVE_LOCK_WAIT_INTERVAL,
WAIT_EVENT_VACUUM_TRUNCATE);
ResetLatch(MyLatch);
}
}

table_close(uncompressed_chunk_rel, NoLock);
@@ -866,3 +877,47 @@

return result == TM_Ok;
}

/* Check if we can update the chunk status to fully compressed after segmentwise recompression.
 * We can only do this if there were no concurrent DML operations, so we check to see if there
 * are any uncompressed tuples in the chunk after compression.
 * If there aren't, we can update the chunk status.
 *
 * Note: Caller is expected to have an ExclusiveLock on the uncompressed_chunk
 */
static void
try_updating_chunk_status(Chunk *uncompressed_chunk, Relation uncompressed_chunk_rel)
{
TableScanDesc scan = table_beginscan(uncompressed_chunk_rel, GetLatestSnapshot(), 0, 0);
hypercore_scan_set_skip_compressed(scan, true);
ScanDirection scan_dir = uncompressed_chunk_rel->rd_tableam == hypercore_routine() ?
ForwardScanDirection :
BackwardScanDirection;
TupleTableSlot *slot = table_slot_create(uncompressed_chunk_rel, NULL);

/* Doing a backwards scan with assumption that newly inserted tuples
* are most likely at the end of the heap.
*/
bool has_tuples = false;
if (table_scan_getnextslot(scan, scan_dir, slot))
{
has_tuples = true;
}

ExecDropSingleTupleTableSlot(slot);
table_endscan(scan);

if (!has_tuples)
{
if (ts_chunk_clear_status(uncompressed_chunk,
CHUNK_STATUS_COMPRESSED_UNORDERED |
CHUNK_STATUS_COMPRESSED_PARTIAL))
ereport(DEBUG1,
(errmsg("cleared chunk status for recompression: \"%s.%s\"",
NameStr(uncompressed_chunk->fd.schema_name),
NameStr(uncompressed_chunk->fd.table_name))));

/* changed chunk status, so invalidate any plans involving this chunk */
CacheInvalidateRelcacheByRelid(uncompressed_chunk->table_id);
}
}
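Taken together, the locking flow this file's changes implement can be summarized with the condensed sketch below. Names and constants come from the diff above; any GUC gating (the test below toggles timescaledb.enable_recompress_waiting) is elided, so treat this as an illustration rather than the exact merged code:

/* Condensed sketch of the locking flow above; an illustration, not the
 * exact merged code. */
if (ConditionalLockRelation(uncompressed_chunk_rel, ExclusiveLock))
{
	/* Lock acquired on the first try: safe to re-check for uncompressed
	 * tuples and clear the chunk status. */
	try_updating_chunk_status(uncompressed_chunk, uncompressed_chunk_rel);
}
else if (has_unique_constraints)
{
	/* Concurrent DML holds conflicting locks; retry on a timed loop. */
	int lock_retry = 0;

	while (!ConditionalLockRelation(uncompressed_chunk_rel, ExclusiveLock))
	{
		CHECK_FOR_INTERRUPTS();

		/* 5000 ms / 50 ms = at most 100 attempts before giving up. */
		if (++lock_retry >
			(RECOMPRESS_EXCLUSIVE_LOCK_TIMEOUT / RECOMPRESS_EXCLUSIVE_LOCK_WAIT_INTERVAL))
			ereport(ERROR,
					(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
					 errmsg("aborting recompression due to concurrent DML on "
							"uncompressed data, retrying with next policy run")));

		/* Sleep on the process latch so the wait stays interruptible. */
		(void) WaitLatch(MyLatch,
						 WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
						 RECOMPRESS_EXCLUSIVE_LOCK_WAIT_INTERVAL,
						 WAIT_EVENT_VACUUM_TRUNCATE);
		ResetLatch(MyLatch);
	}

	try_updating_chunk_status(uncompressed_chunk, uncompressed_chunk_rel);
}

table_close(uncompressed_chunk_rel, NoLock);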
164 changes: 164 additions & 0 deletions tsl/test/t/005_recompression_spin_lock_test.pl
@@ -0,0 +1,164 @@
# This file and its contents are licensed under the Timescale License.
# Please see the included NOTICE for copyright information and
# LICENSE-TIMESCALE for a copy of the license.

# This TAP test verifies the behavior when recompression is attempted on a chunk
# with unique constraints while concurrent DML is happening

use strict;
use warnings;
use TimescaleNode;
use Data::Dumper;
use Test::More;

# Test setup
my $node = TimescaleNode->create('node');

# Create table with a unique constraint
my $result = $node->safe_psql(
'postgres', q{
CREATE TABLE sensor_data (
time timestamptz not null,
sensor_id integer not null,
cpu double precision null,
temperature double precision null,
UNIQUE(time, sensor_id)
);
}
);
is($result, '', 'create table with unique constraint');

# Create hypertable
$result = $node->safe_psql(
'postgres', q{
SELECT table_name FROM create_hypertable('sensor_data','time', chunk_time_interval => INTERVAL '1 month');
}
);
is($result, 'sensor_data', 'create hypertable');

# Insert data
$result = $node->safe_psql(
'postgres', q{
INSERT INTO sensor_data
SELECT
time + (INTERVAL '1 minute' * random()) AS time,
sensor_id,
random() AS cpu,
random()* 100 AS temperature
FROM
generate_series('2022-01-01', '2022-01-07', INTERVAL '1 hour') AS g1(time),
generate_series(1, 50, 1) AS g2(sensor_id)
ORDER BY time;
}
);
is($result, '', 'insert data');

# Define count query
my $count_query = "SELECT count(*) FROM sensor_data;";

# Count inserted rows
my $num_rows = $node->safe_psql('postgres', $count_query);
is($num_rows, 7250, 'validate inserted rows');
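# 7250 = 145 hourly timestamps ('2022-01-01' .. '2022-01-07' inclusive) * 50 sensors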

# Enable compression
$result = $node->safe_psql(
'postgres', q{
ALTER TABLE sensor_data SET (
timescaledb.compress,
timescaledb.compress_segmentby = 'sensor_id',
timescaledb.compress_orderby = 'time'
);
}
);
is($result, '', 'enable compression');

# Compress the chunk
my $compress_query =
"SELECT count(*) FROM (SELECT compress_chunk(show_chunks('sensor_data')));";
$result = $node->safe_psql('postgres', $compress_query);
is($result, '1', 'compress chunk');

# Insert more data to make the chunk partial
$result = $node->safe_psql(
'postgres', q{
INSERT INTO sensor_data
SELECT
time + (INTERVAL '1 minute' * random()) AS time,
sensor_id,
random() AS cpu,
random()* 100 AS temperature
FROM
generate_series('2022-01-01', '2022-01-07', INTERVAL '1 hour') AS g1(time),
generate_series(51, 55, 1) AS g2(sensor_id)
ORDER BY time;
}
);
is($result, '', 'insert more data to make the chunk partial');

# Create psql sessions
my $s1 = $node->background_psql('postgres');
my $s2 = $node->background_psql('postgres');

# Enable segmentwise recompression
$s1->query_safe("SET timescaledb.enable_segmentwise_recompression TO on;");

# Enable waiting to acquire an exclusive lock
$s1->query_safe("SET timescaledb.enable_recompress_waiting TO on;");

# TEST 1:
# Session 1 tries to acquire an exclusive lock at the end of recompression but
# is blocked by Session 2's uncommitted inserts. It spin-waits until Session 2
# aborts, then acquires the lock and completes.

# Begin txns in both sessions

$result = $s1->query_safe("BEGIN;");

$result = $s2->query_safe("BEGIN;");

# Get lock data
$result = $node->safe_psql('postgres',
"SELECT relation::regclass::text, mode FROM pg_locks WHERE relation::regclass::text LIKE '%hyper_1_%chunk' and granted;"
);
is($result, '', "verify no locks exist on the chunk");

# Session 2: Insert rows into the chunk, taking row-level locks that will
# block Session 1 from acquiring the ExclusiveLock until they are released
$s2->query_until(
'', q{
INSERT INTO sensor_data VALUES
('2022-01-05 12:00:00', 100, 0.5, 25.5),
('2022-01-05 13:00:00', 101, 0.6, 26.5);
});

# We have to use 'query_until('', ...)' so that the test immediately fires the next query;
# otherwise s1 times out and throws an error
$s1->query_until(
'', q{
SELECT compress_chunk(show_chunks('sensor_data'));
});

# Session 2 immediately aborts, releasing the RowExclusiveLock on the table
$result = $s2->query_safe("ABORT");

# Verify ExclusiveLock on uncompressed chunk
$result = $node->safe_psql('postgres',
"SELECT relation::regclass::text FROM pg_locks WHERE relation::regclass::text LIKE '%hyper_1_%chunk' AND granted AND mode = 'ExclusiveLock';"
);
is( $result,
'_timescaledb_internal._hyper_1_1_chunk',
"verify ExclusiveLock on uncompressed chunk");

# Verify AccessShareLock on compressed chunk
$result = $node->safe_psql('postgres',
"SELECT relation::regclass::text FROM pg_locks WHERE relation::regclass::text LIKE '%hyper_1_%chunk' AND granted AND mode = 'AccessShareLock'"
);
is( $result,
"_timescaledb_internal._hyper_1_1_chunk",
"verify AccessShareLock on internal compressed chunk");

# Clean up
$result = $s1->query_safe("ROLLBACK;");

$s2->quit();
$s1->quit();

done_testing();
3 changes: 2 additions & 1 deletion tsl/test/t/CMakeLists.txt
@@ -7,7 +7,8 @@ set(PROVE_DEBUG_TEST_FILES 003_mvcc_cagg.pl)
# versions.

if((${PG_VERSION_MAJOR} GREATER_EQUAL "16"))
list(APPEND PROVE_TEST_FILES 004_truncate_or_delete_spin_lock.pl)
list(APPEND PROVE_TEST_FILES 004_truncate_or_delete_spin_lock.pl
005_recompression_spin_lock_test.pl)
endif()

if(CMAKE_BUILD_TYPE MATCHES Debug)