
Add recompression spin lock #8018


Merged
merged 3 commits on May 6, 2025
1 change: 1 addition & 0 deletions .unreleased/pr_8018
@@ -0,0 +1 @@
Implements: #8018 Add spin-lock during recompression on unique constraints
133 changes: 95 additions & 38 deletions tsl/src/compression/recompress.c
@@ -5,6 +5,7 @@
*/

#include <postgres.h>
#include "debug_point.h"
#include <parser/parse_coerce.h>
#include <parser/parse_relation.h>
#include <utils/inval.h>
@@ -28,6 +29,14 @@
#include "ts_catalog/chunk_column_stats.h"
#include "ts_catalog/compression_settings.h"

/*
* Timing parameters for spin locking heuristics.
* These are the same as used by Postgres for truncate locking during lazy vacuum.
* https://github.com/postgres/postgres/blob/4a0650d359c5981270039eeb634c3b7427aa0af5/src/backend/access/heap/vacuumlazy.c#L82
*/
#define RECOMPRESS_EXCLUSIVE_LOCK_WAIT_INTERVAL 50 /* ms */
#define RECOMPRESS_EXCLUSIVE_LOCK_TIMEOUT 5000 /* ms */
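/*
 * With these values, the retry loop below attempts the lock at most
 * RECOMPRESS_EXCLUSIVE_LOCK_TIMEOUT / RECOMPRESS_EXCLUSIVE_LOCK_WAIT_INTERVAL
 * = 5000 / 50 = 100 times, i.e. a blocked recompression waits roughly five
 * seconds in total before erroring out.
 */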

static bool fetch_uncompressed_chunk_into_tuplesort(Tuplesortstate *tuplesortstate,
Relation uncompressed_chunk_rel,
Snapshot snapshot);
@@ -50,6 +59,8 @@ static bool check_changed_group(CompressedSegmentInfo *current_segment, TupleTab
int nsegmentby_cols);
static void recompress_segment(Tuplesortstate *tuplesortstate, Relation compressed_chunk_rel,
RowCompressor *row_compressor);
static void try_updating_chunk_status(Chunk *uncompressed_chunk, Relation uncompressed_chunk_rel);

/*
* Recompress an existing chunk by decompressing the batches
* that are affected by the addition of newer data. The existing
@@ -533,38 +544,7 @@ recompress_chunk_segmentwise_impl(Chunk *uncompressed_chunk)
*/
if (ConditionalLockRelation(uncompressed_chunk_rel, ExclusiveLock))
{
TableScanDesc scan = table_beginscan(uncompressed_chunk_rel, GetLatestSnapshot(), 0, 0);
hypercore_scan_set_skip_compressed(scan, true);
ScanDirection scan_dir = uncompressed_chunk_rel->rd_tableam == hypercore_routine() ?
ForwardScanDirection :
BackwardScanDirection;
TupleTableSlot *slot = table_slot_create(uncompressed_chunk_rel, NULL);

/* Doing a backwards scan with assumption that newly inserted tuples
* are most likely at the end of the heap.
*/
bool has_tuples = false;
if (table_scan_getnextslot(scan, scan_dir, slot))
{
has_tuples = true;
}

ExecDropSingleTupleTableSlot(slot);
table_endscan(scan);

if (!has_tuples)
{
if (ts_chunk_clear_status(uncompressed_chunk,
CHUNK_STATUS_COMPRESSED_UNORDERED |
CHUNK_STATUS_COMPRESSED_PARTIAL))
ereport(DEBUG1,
(errmsg("cleared chunk status for recompression: \"%s.%s\"",
NameStr(uncompressed_chunk->fd.schema_name),
NameStr(uncompressed_chunk->fd.table_name))));

/* changed chunk status, so invalidate any plans involving this chunk */
CacheInvalidateRelcacheByRelid(uncompressed_chunk_id);
}
try_updating_chunk_status(uncompressed_chunk, uncompressed_chunk_rel);
}
else if (has_unique_constraints)
{
@@ -575,13 +555,46 @@ recompress_chunk_segmentwise_impl(Chunk *uncompressed_chunk)
* and speculative insertion could potentially cause false negatives during
* constraint checking. For now, our best option here is to bail.
*
* This can be improved by using a spin lock to wait for the ExclusiveLock
* or bail out if we can't get it in time.
* We use a spin lock to wait for the ExclusiveLock or bail out if we can't get it in time.
*/
ereport(ERROR,
(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
errmsg("aborting recompression due to concurrent DML on uncompressed "
"data, retrying with next policy run")));

int lock_retry = 0;
while (true)
{
if (ConditionalLockRelation(uncompressed_chunk_rel, ExclusiveLock))
{
try_updating_chunk_status(uncompressed_chunk, uncompressed_chunk_rel);
break;
}
Comment on lines +564 to +568

Member: Wondering if this should be after the WaitLatch, since you will enter this branch after failure on the first ConditionalLockRelation. Does it make sense?

Member (Author): I did it this way because I wanted to keep the structure consistent with how Postgres implements a similar spin lock.
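
For context, below is a rough sketch of the analogous loop in Postgres' lazy vacuum truncation (linked above), with error reporting and progress bookkeeping elided. There too, the lock attempt comes first and the latch wait only runs after a failed attempt:

int lock_retry = 0;
while (true)
{
	/* Try the lock first; on success, proceed with the truncate */
	if (ConditionalLockRelation(rel, AccessExclusiveLock))
		break;

	/* Check for interrupts while trying to (re-)acquire the lock */
	CHECK_FOR_INTERRUPTS();

	/* Give up once the 5000 ms budget is exhausted */
	if (++lock_retry >
		(VACUUM_TRUNCATE_LOCK_TIMEOUT / VACUUM_TRUNCATE_LOCK_WAIT_INTERVAL))
		return;

	/* Otherwise sleep 50 ms and retry */
	(void) WaitLatch(MyLatch,
					 WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
					 VACUUM_TRUNCATE_LOCK_WAIT_INTERVAL,
					 WAIT_EVENT_VACUUM_TRUNCATE);
	ResetLatch(MyLatch);
}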


/*
* Check for interrupts while trying to (re-)acquire the exclusive
* lock.
*/
CHECK_FOR_INTERRUPTS();

if (++lock_retry >
(RECOMPRESS_EXCLUSIVE_LOCK_TIMEOUT / RECOMPRESS_EXCLUSIVE_LOCK_WAIT_INTERVAL))
{
/*
* We failed to establish the lock in the specified number of
* retries. This means we give up trying to get the exclusive lock and abort
* the recompression operation.
*/
ereport(ERROR,
(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
errmsg("aborting recompression due to concurrent DML on uncompressed "
"data, retrying with next policy run")));
break;
}

(void) WaitLatch(MyLatch,
WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
RECOMPRESS_EXCLUSIVE_LOCK_WAIT_INTERVAL,
WAIT_EVENT_VACUUM_TRUNCATE);
ResetLatch(MyLatch);
DEBUG_WAITPOINT("chunk_recompress_after_latch");
}
}

table_close(uncompressed_chunk_rel, NoLock);
@@ -866,3 +879,47 @@ delete_tuple_for_recompression(Relation rel, ItemPointer tid, Snapshot snapshot)

return result == TM_Ok;
}

/* Check if we can update the chunk status to fully compressed after segmentwise recompression
* We can only do this if there were no concurrent DML operations, so we check to see if there are
* any uncompressed tuples in the chunk after compression.
* If there aren't, we can update the chunk status
*
* Note: Caller is expected to have an ExclusiveLock on the uncompressed_chunk
*/
static void
try_updating_chunk_status(Chunk *uncompressed_chunk, Relation uncompressed_chunk_rel)
{
TableScanDesc scan = table_beginscan(uncompressed_chunk_rel, GetLatestSnapshot(), 0, 0);
hypercore_scan_set_skip_compressed(scan, true);
ScanDirection scan_dir = uncompressed_chunk_rel->rd_tableam == hypercore_routine() ?
ForwardScanDirection :
BackwardScanDirection;
TupleTableSlot *slot = table_slot_create(uncompressed_chunk_rel, NULL);

/* Doing a backwards scan with assumption that newly inserted tuples
* are most likely at the end of the heap.
*/
bool has_tuples = false;
if (table_scan_getnextslot(scan, scan_dir, slot))
{
has_tuples = true;
}

ExecDropSingleTupleTableSlot(slot);
table_endscan(scan);

if (!has_tuples)
{
if (ts_chunk_clear_status(uncompressed_chunk,
CHUNK_STATUS_COMPRESSED_UNORDERED |
CHUNK_STATUS_COMPRESSED_PARTIAL))
ereport(DEBUG1,
(errmsg("cleared chunk status for recompression: \"%s.%s\"",
NameStr(uncompressed_chunk->fd.schema_name),
NameStr(uncompressed_chunk->fd.table_name))));

/* changed chunk status, so invalidate any plans involving this chunk */
CacheInvalidateRelcacheByRelid(uncompressed_chunk->table_id);
}
}
178 changes: 178 additions & 0 deletions tsl/test/t/005_recompression_spin_lock_test.pl
@@ -0,0 +1,178 @@
# This file and its contents are licensed under the Timescale License.
# Please see the included NOTICE for copyright information and
# LICENSE-TIMESCALE for a copy of the license.

# This TAP test verifies the behavior when recompression is attempted on a chunk
# with unique constraints while concurrent DML is happening

use strict;
use warnings;
use TimescaleNode;
use Data::Dumper;
use Test::More;

# Test setup
my $node = TimescaleNode->create('node');

# Create table with a unique constraint
my $result = $node->safe_psql(
'postgres', q{
CREATE TABLE sensor_data (
time timestamptz not null,
sensor_id integer not null,
cpu double precision null,
temperature double precision null,
UNIQUE(time, sensor_id)
);
}
);
is($result, '', 'create table with unique constraint');

# Create hypertable
$result = $node->safe_psql(
'postgres', q{
SELECT table_name FROM create_hypertable('sensor_data','time', chunk_time_interval => INTERVAL '1 month');
}
);
is($result, 'sensor_data', 'create hypertable');

# Insert data
$result = $node->safe_psql(
'postgres', q{
INSERT INTO sensor_data
SELECT
time + (INTERVAL '1 minute' * random()) AS time,
sensor_id,
random() AS cpu,
random()* 100 AS temperature
FROM
generate_series('2022-01-01', '2022-01-07', INTERVAL '1 hour') AS g1(time),
generate_series(1, 50, 1) AS g2(sensor_id)
ORDER BY time;
}
);
is($result, '', 'insert data');

# Define count query
my $count_query = "SELECT count(*) FROM sensor_data;";

# Count inserted rows
my $num_rows = $node->safe_psql('postgres', $count_query);
is($num_rows, 7250, 'validate inserted rows');

# Enable compression
$result = $node->safe_psql(
'postgres', q{
ALTER TABLE sensor_data SET (
timescaledb.compress,
timescaledb.compress_segmentby = 'sensor_id',
timescaledb.compress_orderby = 'time'
);
}
);
is($result, '', 'enable compression');

# Compress the chunk
my $compress_query =
"SELECT count(*) FROM (SELECT compress_chunk(show_chunks('sensor_data')));";
$result = $node->safe_psql('postgres', $compress_query);
is($result, '1', 'compress chunk');

# Insert more data to make the chunk partial
$result = $node->safe_psql(
'postgres', q{
INSERT INTO sensor_data
SELECT
time + (INTERVAL '1 minute' * random()) AS time,
sensor_id,
random() AS cpu,
random()* 100 AS temperature
FROM
generate_series('2022-01-01', '2022-01-07', INTERVAL '1 hour') AS g1(time),
generate_series(51, 55, 1) AS g2(sensor_id)
ORDER BY time;
}
);
is($result, '', 'insert more data to make the chunk partial');

# Create psql sessions
my $s1 = $node->background_psql('postgres');
my $s2 = $node->background_psql('postgres');
my $s3 = $node->background_psql('postgres');

# Enable segmentwise recompression
$s1->query_safe("SET timescaledb.enable_segmentwise_recompression TO on;");

# Enable waiting to acquire an exclusive lock
$s1->query_safe("SET timescaledb.enable_recompress_waiting TO on;");

# TEST 1:
# Session 1 tries to acquire an exclusive lock at the end of recompression but is blocked due to the inserts by Session 2
# We use session 3 to set up debug waitpoints
# Begin txns in all sessions

$s1->query_safe("BEGIN;");
$s2->query_safe("BEGIN;");
$s3->query_safe("BEGIN;");

# We enable the debug_waitpoint after the latch and release it after s2 aborts
# This allows s1 to successfully acquire the lock the second time around

$s3->query_safe(
"SELECT debug_waitpoint_enable('chunk_recompress_after_latch');");


# Get lock data
$result = $node->safe_psql('postgres',
"SELECT relation::regclass::text, mode FROM pg_locks WHERE relation::regclass::text LIKE '%hyper_1_%chunk' and granted;"
);
is($result, '', "verify no locks exist on the chunk");

# Session 2: Insert rows into the chunk, taking a RowExclusiveLock that
# blocks session 1's ExclusiveLock acquisition below
$s2->query_until(
'', q{
INSERT INTO sensor_data VALUES
('2022-01-05 12:00:00', 100, 0.5, 25.5),
('2022-01-05 13:00:00', 101, 0.6, 26.5);
});

# We have to use 'query_until('', ...)' so that the test immediately fires the next query
$s1->query_until(
'', q{
SELECT compress_chunk(show_chunks('sensor_data'));
});

# Session 2 immediately aborts, releasing the RowExclusiveLock on the table
$s2->query_safe("ABORT");

# Release the debug waitpoint so that recompression succeeds
$s3->query_safe(
"SELECT debug_waitpoint_release('chunk_recompress_after_latch');");

# Verify ExclusiveLock on uncompressed chunk
$result = $node->safe_psql('postgres',
"SELECT relation::regclass::text FROM pg_locks WHERE relation::regclass::text LIKE '%hyper_1_%chunk' AND granted AND mode = 'ExclusiveLock';"
);
is( $result,
'_timescaledb_internal._hyper_1_1_chunk',
"verify ExclusiveLock on uncompressed chunk");

# Verify AccessShareLock on the uncompressed chunk
$result = $node->safe_psql('postgres',
"SELECT relation::regclass::text FROM pg_locks WHERE relation::regclass::text LIKE '%hyper_1_%chunk' AND granted AND mode = 'AccessShareLock'"
);
is( $result,
"_timescaledb_internal._hyper_1_1_chunk",
"verify AccessShareLock on internal compressed chunk");

# Clean up
$s1->query_safe("ROLLBACK;");

$s3->query_safe("ROLLBACK;");
$s3->quit();

$s2->quit();
$s1->quit();

done_testing();
1 change: 1 addition & 0 deletions tsl/test/t/CMakeLists.txt
@@ -8,6 +8,7 @@ set(PROVE_DEBUG_TEST_FILES 003_mvcc_cagg.pl)

if((${PG_VERSION_MAJOR} GREATER_EQUAL "16"))
list(APPEND PROVE_TEST_FILES 004_truncate_or_delete_spin_lock.pl)
list(APPEND PROVE_DEBUG_TEST_FILES 005_recompression_spin_lock_test.pl)
endif()

if(CMAKE_BUILD_TYPE MATCHES Debug)