 #include "ts_catalog/chunk_column_stats.h"
 #include "ts_catalog/compression_settings.h"
 
+/*
+ * Timing parameters for spin locking heuristics.
+ * These are the same as used by Postgres for truncate locking during lazy vacuum.
+ * https://github.com/postgres/postgres/blob/4a0650d359c5981270039eeb634c3b7427aa0af5/src/backend/access/heap/vacuumlazy.c#L82
+ */
+#define RECOMPRESS_EXCLUSIVE_LOCK_WAIT_INTERVAL 50 /* ms */
+#define RECOMPRESS_EXCLUSIVE_LOCK_TIMEOUT 5000 /* ms */
+
 static bool fetch_uncompressed_chunk_into_tuplesort(Tuplesortstate *tuplesortstate,
 													Relation uncompressed_chunk_rel,
 													Snapshot snapshot);
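(A minimal illustration, not part of the patch: these two constants bound the retry loop added later in this diff to RECOMPRESS_EXCLUSIVE_LOCK_TIMEOUT / RECOMPRESS_EXCLUSIVE_LOCK_WAIT_INTERVAL = 5000 / 50 = 100 lock attempts, with a 50 ms sleep between attempts, so roughly five seconds of waiting before recompression gives up. The helper macro below is hypothetical and only spells out that arithmetic.)

/* Illustrative only: retry budget implied by the timeout and wait interval. */
#define RECOMPRESS_EXCLUSIVE_LOCK_MAX_RETRIES \
	(RECOMPRESS_EXCLUSIVE_LOCK_TIMEOUT / RECOMPRESS_EXCLUSIVE_LOCK_WAIT_INTERVAL) /* 100 */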
@@ -50,6 +58,8 @@ static bool check_changed_group(CompressedSegmentInfo *current_segment, TupleTab
 									int nsegmentby_cols);
 static void recompress_segment(Tuplesortstate *tuplesortstate, Relation compressed_chunk_rel,
 								RowCompressor *row_compressor);
+static void try_updating_chunk_status(Chunk *uncompressed_chunk, Relation uncompressed_chunk_rel);
+
 /*
  * Recompress an existing chunk by decompressing the batches
  * that are affected by the addition of newer data. The existing
@@ -533,38 +543,7 @@ recompress_chunk_segmentwise_impl(Chunk *uncompressed_chunk)
 	 */
 	if (ConditionalLockRelation(uncompressed_chunk_rel, ExclusiveLock))
 	{
-		TableScanDesc scan = table_beginscan(uncompressed_chunk_rel, GetLatestSnapshot(), 0, 0);
-		hypercore_scan_set_skip_compressed(scan, true);
-		ScanDirection scan_dir = uncompressed_chunk_rel->rd_tableam == hypercore_routine() ?
-									 ForwardScanDirection :
-									 BackwardScanDirection;
-		TupleTableSlot *slot = table_slot_create(uncompressed_chunk_rel, NULL);
-
-		/* Doing a backwards scan with assumption that newly inserted tuples
-		 * are most likely at the end of the heap.
-		 */
-		bool has_tuples = false;
-		if (table_scan_getnextslot(scan, scan_dir, slot))
-		{
-			has_tuples = true;
-		}
-
-		ExecDropSingleTupleTableSlot(slot);
-		table_endscan(scan);
-
-		if (!has_tuples)
-		{
-			if (ts_chunk_clear_status(uncompressed_chunk,
-									  CHUNK_STATUS_COMPRESSED_UNORDERED |
-										  CHUNK_STATUS_COMPRESSED_PARTIAL))
-				ereport(DEBUG1,
-						(errmsg("cleared chunk status for recompression: \"%s.%s\"",
-								NameStr(uncompressed_chunk->fd.schema_name),
-								NameStr(uncompressed_chunk->fd.table_name))));
-
-			/* changed chunk status, so invalidate any plans involving this chunk */
-			CacheInvalidateRelcacheByRelid(uncompressed_chunk_id);
-		}
+		try_updating_chunk_status(uncompressed_chunk, uncompressed_chunk_rel);
 	}
 	else if (has_unique_constraints)
 	{
@@ -575,13 +554,44 @@ recompress_chunk_segmentwise_impl(Chunk *uncompressed_chunk)
 		 * and speculative insertion could potentially cause false negatives during
 		 * constraint checking. For now, our best option here is to bail.
 		 *
-		 * This can be improved by using a spin lock to wait for the ExclusiveLock
-		 * or bail out if we can't get it in time.
+		 * We use a spin lock to wait for the ExclusiveLock, or bail out if we can't get it in time.
 		 */
-		ereport(ERROR,
-				(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
-				 errmsg("aborting recompression due to concurrent DML on uncompressed "
-						"data, retrying with next policy run")));
+
+		int lock_retry = 0;
+		while (true)
+		{
+			if (ConditionalLockRelation(uncompressed_chunk_rel, ExclusiveLock))
+			{
+				try_updating_chunk_status(uncompressed_chunk, uncompressed_chunk_rel);
+				break;
+			}
+
+			/*
+			 * Check for interrupts while trying to (re-)acquire the exclusive
+			 * lock.
+			 */
+			CHECK_FOR_INTERRUPTS();
+
+			if (++lock_retry >
+				(RECOMPRESS_EXCLUSIVE_LOCK_TIMEOUT / RECOMPRESS_EXCLUSIVE_LOCK_WAIT_INTERVAL))
+			{
+				/*
+				 * We failed to establish the lock in the specified number of
+				 * retries. This means we give up trying to get the exclusive
+				 * lock and abort the recompression operation.
+				 */
+				ereport(ERROR,
+						(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+						 errmsg("aborting recompression due to concurrent DML on uncompressed "
+								"data, retrying with next policy run")));
+				break;
+			}
+
+			(void) WaitLatch(MyLatch,
+							 WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
+							 RECOMPRESS_EXCLUSIVE_LOCK_WAIT_INTERVAL,
+							 WAIT_EVENT_VACUUM_TRUNCATE);
+			ResetLatch(MyLatch);
+		}
 	}
 
 	table_close(uncompressed_chunk_rel, NoLock);
@@ -866,3 +876,46 @@ delete_tuple_for_recompression(Relation rel, ItemPointer tid, Snapshot snapshot)
 
 	return result == TM_Ok;
 }
+
+/* Check if we can update the chunk status to fully compressed after segmentwise recompression.
+ * We can only do this if there were no concurrent DML operations, so we check to see if there are
+ * any uncompressed tuples in the chunk after compression.
+ * If there aren't, we can update the chunk status.
+ *
+ * Note: Caller is expected to have an ExclusiveLock on the uncompressed_chunk.
+ */
+static void try_updating_chunk_status(Chunk *uncompressed_chunk, Relation uncompressed_chunk_rel)
+{
+	TableScanDesc scan = table_beginscan(uncompressed_chunk_rel, GetLatestSnapshot(), 0, 0);
+	hypercore_scan_set_skip_compressed(scan, true);
+	ScanDirection scan_dir = uncompressed_chunk_rel->rd_tableam == hypercore_routine() ?
+								 ForwardScanDirection :
+								 BackwardScanDirection;
+	TupleTableSlot *slot = table_slot_create(uncompressed_chunk_rel, NULL);
+
+	/* Doing a backwards scan with assumption that newly inserted tuples
+	 * are most likely at the end of the heap.
+	 */
+	bool has_tuples = false;
+	if (table_scan_getnextslot(scan, scan_dir, slot))
+	{
+		has_tuples = true;
+	}
+
+	ExecDropSingleTupleTableSlot(slot);
+	table_endscan(scan);
+
+	if (!has_tuples)
+	{
+		if (ts_chunk_clear_status(uncompressed_chunk,
+								  CHUNK_STATUS_COMPRESSED_UNORDERED |
+									  CHUNK_STATUS_COMPRESSED_PARTIAL))
+			ereport(DEBUG1,
+					(errmsg("cleared chunk status for recompression: \"%s.%s\"",
+							NameStr(uncompressed_chunk->fd.schema_name),
+							NameStr(uncompressed_chunk->fd.table_name))));
+
+		/* changed chunk status, so invalidate any plans involving this chunk */
+		CacheInvalidateRelcacheByRelid(uncompressed_chunk->table_id);
+	}
+}