
Commit 0971d13

Add recompression spin lock (#8018)
Recompression tries to take an ExclusiveLock at the end of compression in order to update the chunk status from partial to fully compressed. When unique constraints are present on the table, this status update is mandatory, and recompression is aborted if the lock cannot be acquired. This change adds a spin lock that waits for the ExclusiveLock, used only when unique constraints are present on the table.
1 parent 0886bec commit 0971d13
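In outline, the change follows PostgreSQL's lazy-vacuum truncate-lock heuristic: retry ConditionalLockRelation() on a latch-based timer and raise a serialization-failure error once a bounded number of attempts is exhausted. A condensed sketch of the retry loop the diff below adds to recompress.c (only the locking logic is shown; the full version also places a debug waitpoint after the latch wait for the TAP test):

    int lock_retry = 0;
    while (true)
    {
        if (ConditionalLockRelation(uncompressed_chunk_rel, ExclusiveLock))
        {
            /* Lock acquired: clear the partial/unordered status if no uncompressed tuples remain */
            try_updating_chunk_status(uncompressed_chunk, uncompressed_chunk_rel);
            break;
        }

        CHECK_FOR_INTERRUPTS();

        /* 5000 ms timeout / 50 ms wait interval = at most 100 attempts before giving up */
        if (++lock_retry > (RECOMPRESS_EXCLUSIVE_LOCK_TIMEOUT / RECOMPRESS_EXCLUSIVE_LOCK_WAIT_INTERVAL))
            ereport(ERROR,
                    (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
                     errmsg("aborting recompression due to concurrent DML on uncompressed "
                            "data, retrying with next policy run")));

        /* Sleep on the process latch rather than busy-waiting, as lazy vacuum does for truncation */
        (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
                         RECOMPRESS_EXCLUSIVE_LOCK_WAIT_INTERVAL, WAIT_EVENT_VACUUM_TRUNCATE);
        ResetLatch(MyLatch);
    }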

File tree

.unreleased/pr_8018
tsl/src/compression/recompress.c
tsl/test/t/005_recompression_spin_lock_test.pl
tsl/test/t/CMakeLists.txt

4 files changed: +275 -38 lines changed


.unreleased/pr_8018

+1
@@ -0,0 +1 @@
+Implements: #8018 Add spin-lock during recompression on unique constraints

tsl/src/compression/recompress.c

+95 -38
@@ -5,6 +5,7 @@
  */
 
 #include <postgres.h>
+#include "debug_point.h"
 #include <parser/parse_coerce.h>
 #include <parser/parse_relation.h>
 #include <utils/inval.h>
@@ -28,6 +29,14 @@
 #include "ts_catalog/chunk_column_stats.h"
 #include "ts_catalog/compression_settings.h"
 
+/*
+ * Timing parameters for spin locking heuristics.
+ * These are the same as used by Postgres for truncate locking during lazy vacuum.
+ * https://github.com/postgres/postgres/blob/4a0650d359c5981270039eeb634c3b7427aa0af5/src/backend/access/heap/vacuumlazy.c#L82
+ */
+#define RECOMPRESS_EXCLUSIVE_LOCK_WAIT_INTERVAL 50 /* ms */
+#define RECOMPRESS_EXCLUSIVE_LOCK_TIMEOUT 5000 /* ms */
+
 static bool fetch_uncompressed_chunk_into_tuplesort(Tuplesortstate *tuplesortstate,
                                                     Relation uncompressed_chunk_rel,
                                                     Snapshot snapshot);
@@ -50,6 +59,8 @@ static bool check_changed_group(CompressedSegmentInfo *current_segment, TupleTab
                                 int nsegmentby_cols);
 static void recompress_segment(Tuplesortstate *tuplesortstate, Relation compressed_chunk_rel,
                                RowCompressor *row_compressor);
+static void try_updating_chunk_status(Chunk *uncompressed_chunk, Relation uncompressed_chunk_rel);
+
 /*
  * Recompress an existing chunk by decompressing the batches
  * that are affected by the addition of newer data. The existing
@@ -533,38 +544,7 @@ recompress_chunk_segmentwise_impl(Chunk *uncompressed_chunk)
     */
    if (ConditionalLockRelation(uncompressed_chunk_rel, ExclusiveLock))
    {
-       TableScanDesc scan = table_beginscan(uncompressed_chunk_rel, GetLatestSnapshot(), 0, 0);
-       hypercore_scan_set_skip_compressed(scan, true);
-       ScanDirection scan_dir = uncompressed_chunk_rel->rd_tableam == hypercore_routine() ?
-                                    ForwardScanDirection :
-                                    BackwardScanDirection;
-       TupleTableSlot *slot = table_slot_create(uncompressed_chunk_rel, NULL);
-
-       /* Doing a backwards scan with assumption that newly inserted tuples
-        * are most likely at the end of the heap.
-        */
-       bool has_tuples = false;
-       if (table_scan_getnextslot(scan, scan_dir, slot))
-       {
-           has_tuples = true;
-       }
-
-       ExecDropSingleTupleTableSlot(slot);
-       table_endscan(scan);
-
-       if (!has_tuples)
-       {
-           if (ts_chunk_clear_status(uncompressed_chunk,
-                                     CHUNK_STATUS_COMPRESSED_UNORDERED |
-                                         CHUNK_STATUS_COMPRESSED_PARTIAL))
-               ereport(DEBUG1,
-                       (errmsg("cleared chunk status for recompression: \"%s.%s\"",
-                               NameStr(uncompressed_chunk->fd.schema_name),
-                               NameStr(uncompressed_chunk->fd.table_name))));
-
-           /* changed chunk status, so invalidate any plans involving this chunk */
-           CacheInvalidateRelcacheByRelid(uncompressed_chunk_id);
-       }
+       try_updating_chunk_status(uncompressed_chunk, uncompressed_chunk_rel);
    }
    else if (has_unique_constraints)
    {
@@ -575,13 +555,46 @@ recompress_chunk_segmentwise_impl(Chunk *uncompressed_chunk)
        * and speculative insertion could potentially cause false negatives during
        * constraint checking. For now, our best option here is to bail.
        *
-       * This can be improved by using a spin lock to wait for the ExclusiveLock
-       * or bail out if we can't get it in time.
+       * We use a spin lock to wait for the ExclusiveLock or bail out if we can't get it in time.
        */
-      ereport(ERROR,
-              (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
-               errmsg("aborting recompression due to concurrent DML on uncompressed "
-                      "data, retrying with next policy run")));
+
+      int lock_retry = 0;
+      while (true)
+      {
+          if (ConditionalLockRelation(uncompressed_chunk_rel, ExclusiveLock))
+          {
+              try_updating_chunk_status(uncompressed_chunk, uncompressed_chunk_rel);
+              break;
+          }
+
+          /*
+           * Check for interrupts while trying to (re-)acquire the exclusive
+           * lock.
+           */
+          CHECK_FOR_INTERRUPTS();
+
+          if (++lock_retry >
+              (RECOMPRESS_EXCLUSIVE_LOCK_TIMEOUT / RECOMPRESS_EXCLUSIVE_LOCK_WAIT_INTERVAL))
+          {
+              /*
+               * We failed to establish the lock in the specified number of
+               * retries. This means we give up trying to get the exclusive lock and abort the
+               * recompression operation.
+               */
+              ereport(ERROR,
+                      (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+                       errmsg("aborting recompression due to concurrent DML on uncompressed "
+                              "data, retrying with next policy run")));
+              break;
+          }
+
+          (void) WaitLatch(MyLatch,
+                           WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
+                           RECOMPRESS_EXCLUSIVE_LOCK_WAIT_INTERVAL,
+                           WAIT_EVENT_VACUUM_TRUNCATE);
+          ResetLatch(MyLatch);
+          DEBUG_WAITPOINT("chunk_recompress_after_latch");
+      }
    }
 
    table_close(uncompressed_chunk_rel, NoLock);
@@ -866,3 +879,47 @@ delete_tuple_for_recompression(Relation rel, ItemPointer tid, Snapshot snapshot)
 
    return result == TM_Ok;
 }
+
+/* Check if we can update the chunk status to fully compressed after segmentwise recompression
+ * We can only do this if there were no concurrent DML operations, so we check to see if there are
+ * any uncompressed tuples in the chunk after compression.
+ * If there aren't, we can update the chunk status
+ *
+ * Note: Caller is expected to have an ExclusiveLock on the uncompressed_chunk
+ */
+static void
+try_updating_chunk_status(Chunk *uncompressed_chunk, Relation uncompressed_chunk_rel)
+{
+   TableScanDesc scan = table_beginscan(uncompressed_chunk_rel, GetLatestSnapshot(), 0, 0);
+   hypercore_scan_set_skip_compressed(scan, true);
+   ScanDirection scan_dir = uncompressed_chunk_rel->rd_tableam == hypercore_routine() ?
+                                ForwardScanDirection :
+                                BackwardScanDirection;
+   TupleTableSlot *slot = table_slot_create(uncompressed_chunk_rel, NULL);
+
+   /* Doing a backwards scan with assumption that newly inserted tuples
+    * are most likely at the end of the heap.
+    */
+   bool has_tuples = false;
+   if (table_scan_getnextslot(scan, scan_dir, slot))
+   {
+       has_tuples = true;
+   }
+
+   ExecDropSingleTupleTableSlot(slot);
+   table_endscan(scan);
+
+   if (!has_tuples)
+   {
+       if (ts_chunk_clear_status(uncompressed_chunk,
+                                 CHUNK_STATUS_COMPRESSED_UNORDERED |
+                                     CHUNK_STATUS_COMPRESSED_PARTIAL))
+           ereport(DEBUG1,
+                   (errmsg("cleared chunk status for recompression: \"%s.%s\"",
+                           NameStr(uncompressed_chunk->fd.schema_name),
+                           NameStr(uncompressed_chunk->fd.table_name))));
+
+       /* changed chunk status, so invalidate any plans involving this chunk */
+       CacheInvalidateRelcacheByRelid(uncompressed_chunk->table_id);
+   }
+}
tsl/test/t/005_recompression_spin_lock_test.pl

+178

@@ -0,0 +1,178 @@
+# This file and its contents are licensed under the Timescale License.
+# Please see the included NOTICE for copyright information and
+# LICENSE-TIMESCALE for a copy of the license.
+
+# This TAP test verifies the behavior when recompression is attempted on a chunk
+# with unique constraints while concurrent DML is happening
+
+use strict;
+use warnings;
+use TimescaleNode;
+use Data::Dumper;
+use Test::More;
+
+# Test setup
+my $node = TimescaleNode->create('node');
+
+# Create table with a unique constraint
+my $result = $node->safe_psql(
+	'postgres', q{
+	CREATE TABLE sensor_data (
+	time timestamptz not null,
+	sensor_id integer not null,
+	cpu double precision null,
+	temperature double precision null,
+	UNIQUE(time, sensor_id)
+	);
+	}
+);
+is($result, '', 'create table with unique constraint');
+
+# Create hypertable
+$result = $node->safe_psql(
+	'postgres', q{
+	SELECT table_name FROM create_hypertable('sensor_data','time', chunk_time_interval => INTERVAL '1 month');
+	}
+);
+is($result, 'sensor_data', 'create hypertable');
+
+# Insert data
+$result = $node->safe_psql(
+	'postgres', q{
+	INSERT INTO sensor_data
+	SELECT
+	time + (INTERVAL '1 minute' * random()) AS time,
+	sensor_id,
+	random() AS cpu,
+	random()* 100 AS temperature
+	FROM
+	generate_series('2022-01-01', '2022-01-07', INTERVAL '1 hour') AS g1(time),
+	generate_series(1, 50, 1) AS g2(sensor_id)
+	ORDER BY time;
+	}
+);
+is($result, '', 'insert data');
+
+# Define count query
+my $count_query = "SELECT count(*) FROM sensor_data;";
+
+# Count inserted rows
+my $num_rows = $node->safe_psql('postgres', $count_query);
+is($num_rows, 7250, 'validate inserted rows');
+
+# Enable compression
+$result = $node->safe_psql(
+	'postgres', q{
+	ALTER TABLE sensor_data SET (
+	timescaledb.compress,
+	timescaledb.compress_segmentby = 'sensor_id',
+	timescaledb.compress_orderby = 'time'
+	);
+	}
+);
+is($result, '', 'enable compression');
+
+# Compress the chunk
+my $compress_query =
+  "SELECT count(*) FROM (SELECT compress_chunk(show_chunks('sensor_data')));";
+$result = $node->safe_psql('postgres', $compress_query);
+is($result, '1', 'compress chunk');
+
+# Insert more data to make the chunk partial
+$result = $node->safe_psql(
+	'postgres', q{
+	INSERT INTO sensor_data
+	SELECT
+	time + (INTERVAL '1 minute' * random()) AS time,
+	sensor_id,
+	random() AS cpu,
+	random()* 100 AS temperature
+	FROM
+	generate_series('2022-01-01', '2022-01-07', INTERVAL '1 hour') AS g1(time),
+	generate_series(51, 55, 1) AS g2(sensor_id)
+	ORDER BY time;
+	}
+);
+is($result, '', 'insert more data to make the chunk partial');
+
+# Create psql sessions
+my $s1 = $node->background_psql('postgres');
+my $s2 = $node->background_psql('postgres');
+my $s3 = $node->background_psql('postgres');
+
+# Enable segmentwise recompression
+$s1->query_safe("SET timescaledb.enable_segmentwise_recompression TO on;");
+
+# Enable waiting to acquire an exclusive lock
+$s1->query_safe("SET timescaledb.enable_recompress_waiting TO on;");
+
+# TEST 1:
+# Session 1 tries to acquire an exclusive lock at the end of recompression but is blocked due to the inserts by Session 2
+# We use session 3 to set up debug waitpoints
+# Begin txns in all sessions
+
+$s1->query_safe("BEGIN;");
+$s2->query_safe("BEGIN;");
+$s3->query_safe("BEGIN;");
+
+# We enable the debug_waitpoint after the latch and release it after s2 aborts
+# This allows s1 to successfully acquire the lock the second time around
+
+$s3->query_safe(
+	"SELECT debug_waitpoint_enable('chunk_recompress_after_latch');");
+
+
+# Get lock data
+$result = $node->safe_psql('postgres',
+	"SELECT relation::regclass::text, mode FROM pg_locks WHERE relation::regclass::text LIKE '%hyper_1_%chunk' and granted;"
+);
+is($result, '', "verify no locks exist on the chunk");
+
+# Session 2: Insert rows into the chunk
+# This blocks until session 2 releases its locks
+$s2->query_until(
+	'', q{
+	INSERT INTO sensor_data VALUES
+	('2022-01-05 12:00:00', 100, 0.5, 25.5),
+	('2022-01-05 13:00:00', 101, 0.6, 26.5);
+});
+
+# We have to use 'query_until('', ...)' so that the test immediately fires the next query
+$s1->query_until(
+	'', q{
+	SELECT compress_chunk(show_chunks('sensor_data'));
+});
+
+# Session 2 immediately aborts, releasing the RowExclusiveLock on the table
+$s2->query_safe("ABORT");
+
+# Release the debug waitpoint so that recompression succeeds
+$s3->query_safe(
+	"SELECT debug_waitpoint_release('chunk_recompress_after_latch');");
+
+# Verify ExclusiveLock on uncompressed chunk
+$result = $node->safe_psql('postgres',
+	"SELECT relation::regclass::text FROM pg_locks WHERE relation::regclass::text LIKE '%hyper_1_%chunk' AND granted AND mode = 'ExclusiveLock';"
+);
+is( $result,
+	'_timescaledb_internal._hyper_1_1_chunk',
+	"verify ExclusiveLock on uncompressed chunk");
+
+# Verify AccessShareLock on compressed chunk
+$result = $node->safe_psql('postgres',
+	"SELECT relation::regclass::text FROM pg_locks WHERE relation::regclass::text LIKE '%hyper_1_%chunk' AND granted AND mode = 'AccessShareLock'"
+);
+is( $result,
+	"_timescaledb_internal._hyper_1_1_chunk",
+	"verify AccessShareLock on internal compressed chunk");
+
+# Clean up
+$s1->query_safe("ROLLBACK;");
+
+$s3->query_safe("ROLLBACK;");
+$s3->quit();
+
+$s2->quit();
+$s1->quit();
+
+done_testing();

tsl/test/t/CMakeLists.txt

+1
@@ -8,6 +8,7 @@ set(PROVE_DEBUG_TEST_FILES 003_mvcc_cagg.pl)
 
 if((${PG_VERSION_MAJOR} GREATER_EQUAL "16"))
   list(APPEND PROVE_TEST_FILES 004_truncate_or_delete_spin_lock.pl)
+  list(APPEND PROVE_DEBUG_TEST_FILES 005_recompression_spin_lock_test.pl)
 endif()
 
 if(CMAKE_BUILD_TYPE MATCHES Debug)

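For orientation, the scenario the new lock path covers can be reproduced by hand. A minimal SQL sketch mirroring the TAP test above (table definition, GUC names, and functions are taken from the test; this sequence is illustrative and not part of the commit, and exact chunk names will vary):

    CREATE TABLE sensor_data (
        time timestamptz NOT NULL,
        sensor_id integer NOT NULL,
        cpu double precision,
        UNIQUE (time, sensor_id)
    );
    SELECT create_hypertable('sensor_data', 'time', chunk_time_interval => INTERVAL '1 month');
    ALTER TABLE sensor_data SET (
        timescaledb.compress,
        timescaledb.compress_segmentby = 'sensor_id',
        timescaledb.compress_orderby = 'time'
    );

    INSERT INTO sensor_data VALUES ('2022-01-01', 1, 0.5);
    SELECT compress_chunk(show_chunks('sensor_data'));

    -- New rows land in the uncompressed part of the chunk, marking it partial
    INSERT INTO sensor_data VALUES ('2022-01-02', 2, 0.7);

    -- Recompressing the partial chunk takes the ExclusiveLock at the end; with the
    -- unique constraint present, concurrent DML now triggers the spin-lock wait
    SET timescaledb.enable_segmentwise_recompression TO on;
    SET timescaledb.enable_recompress_waiting TO on;
    SELECT compress_chunk(show_chunks('sensor_data'));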