 #include <nodes/hypertable_modify.h>
 #include <ts_catalog/array_utils.h>
 
-static struct decompress_batches_stats
-decompress_batches_indexscan(Relation in_rel, Relation out_rel, Relation index_rel,
-							 Snapshot snapshot, ScanKeyData *index_scankeys, int num_index_scankeys,
-							 ScanKeyData *heap_scankeys, int num_heap_scankeys,
-							 ScanKeyData *mem_scankeys, int num_mem_scankeys,
-							 Bitmapset *null_columns, List *is_nulls);
+static struct decompress_batches_stats decompress_batches_indexscan(
+	Relation in_rel, Relation out_rel, Relation index_rel, Snapshot snapshot,
+	ScanKeyData *index_scankeys, int num_index_scankeys, ScanKeyData *heap_scankeys,
+	int num_heap_scankeys, ScanKeyData *mem_scankeys, int num_mem_scankeys,
+	OnConflictAction on_conflict, bool *skip_insert, Bitmapset *null_columns, List *is_nulls);
 static struct decompress_batches_stats
 decompress_batches_seqscan(Relation in_rel, Relation out_rel, Snapshot snapshot,
 						   ScanKeyData *scankeys, int num_scankeys, ScanKeyData *mem_scankeys,
-						   int num_mem_scankeys, Bitmapset *null_columns, List *is_nulls);
+						   int num_mem_scankeys, OnConflictAction on_conflict, bool *skip_insert,
+						   Bitmapset *null_columns, List *is_nulls);
 
-static bool batch_matches(RowDecompressor *decompressor, ScanKeyData *scankeys, int num_scankeys);
+static bool batch_matches(RowDecompressor *decompressor, ScanKeyData *scankeys, int num_scankeys,
+						  OnConflictAction on_conflict, bool *skip_insert);
 static void process_predicates(Chunk *ch, CompressionSettings *settings, List *predicates,
 							   ScanKeyData **mem_scankeys, int *num_mem_scankeys,
 							   List **heap_filters, List **index_filters, List **is_null);
 static Relation find_matching_index(Relation comp_chunk_rel, List **index_filters,
 									List **heap_filters);
-static Bitmapset *compressed_insert_key_columns(Relation relation);
+static Bitmapset *compressed_insert_key_columns(Relation relation, bool *covering);
 static BatchFilter *make_batchfilter(char *column_name, StrategyNumber strategy, Oid collation,
 									 RegProcedure opcode, Const *value, bool is_null_check,
 									 bool is_null, bool is_array_op);
@@ -84,7 +85,20 @@ decompress_batches_for_insert(const ChunkInsertState *cis, TupleTableSlot *slot)
 	CompressionSettings *settings = ts_compression_settings_get(cis->compressed_chunk_table_id);
 	Assert(settings);
 
-	Bitmapset *key_columns = compressed_insert_key_columns(out_rel);
+	bool covering;
+	Bitmapset *key_columns = compressed_insert_key_columns(out_rel, &covering);
+	OnConflictAction on_conflict = ONCONFLICT_UPDATE;
+	/*
+	 * When no ON CONFLICT clause is specified and the index is covering, we can
+	 * error out before decompressing anything.
+	 * For ON CONFLICT DO NOTHING with a covering index we can skip decompression
+	 * and abort the insert when we find a matching tuple.
+	 * For ON CONFLICT DO UPDATE we need to decompress the tuple on match.
+	 */
+	if (covering && cis->cds->dispatch)
+	{
+		on_conflict = chunk_dispatch_get_on_conflict_action(cis->cds->dispatch);
+	}
 	Bitmapset *index_columns = NULL;
 	Bitmapset *null_columns = NULL;
 	struct decompress_batches_stats stats;
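/*
 * Editor's note (not part of the patch): the hunk above decides which
 * ON CONFLICT behaviour the insert path may use. The standalone sketch below
 * models that decision; the names (ModelOnConflictAction,
 * model_effective_on_conflict, dispatch_present) are illustrative assumptions,
 * while in the patch the statement's action comes from
 * chunk_dispatch_get_on_conflict_action() and `covering` from
 * compressed_insert_key_columns().
 */
#include <stdbool.h>
#include <stdio.h>

typedef enum
{
	MODEL_ONCONFLICT_NONE,	  /* plain INSERT, no ON CONFLICT clause */
	MODEL_ONCONFLICT_NOTHING, /* INSERT ... ON CONFLICT DO NOTHING */
	MODEL_ONCONFLICT_UPDATE	  /* INSERT ... ON CONFLICT DO UPDATE */
} ModelOnConflictAction;

/*
 * The fast paths (error out early, or skip the insert) are only taken when the
 * key columns cover a single unique constraint and the dispatch state is
 * available; otherwise the code falls back to DO UPDATE semantics, which
 * always decompress matching batches.
 */
static ModelOnConflictAction
model_effective_on_conflict(bool covering, bool dispatch_present,
							ModelOnConflictAction stmt_action)
{
	if (covering && dispatch_present)
		return stmt_action;
	return MODEL_ONCONFLICT_UPDATE;
}

int
main(void)
{
	/* Non-covering key columns: even DO NOTHING must take the decompression path. */
	printf("%d\n", model_effective_on_conflict(false, true, MODEL_ONCONFLICT_NOTHING));
	/* Covering index and plain INSERT: a match can raise the unique violation early. */
	printf("%d\n", model_effective_on_conflict(true, true, MODEL_ONCONFLICT_NONE));
	return 0;
}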
@@ -113,6 +127,7 @@ decompress_batches_for_insert(const ChunkInsertState *cis, TupleTableSlot *slot)
 										  &index_columns,
 										  &num_index_scankeys);
 
+	bool skip_insert = false;
 	if (index_rel)
 	{
 		/*
@@ -147,6 +162,8 @@ decompress_batches_for_insert(const ChunkInsertState *cis, TupleTableSlot *slot)
 											   num_heap_scankeys,
 											   mem_scankeys,
 											   num_mem_scankeys,
+											   on_conflict,
+											   &skip_insert,
 											   NULL, /* no null column check for non-segmentby
 														columns */
 											   NIL);
@@ -171,12 +188,19 @@ decompress_batches_for_insert(const ChunkInsertState *cis, TupleTableSlot *slot)
 											 num_heap_scankeys,
 											 mem_scankeys,
 											 num_mem_scankeys,
+											 on_conflict,
+											 &skip_insert,
 											 null_columns,
 											 NIL);
 		bms_free(key_columns);
 	}
 
 	Assert(cis->cds != NULL);
+	if (skip_insert)
+	{
+		cis->cds->skip_insert = true;
+	}
+
 	cis->cds->batches_decompressed += stats.batches_decompressed;
 	cis->cds->tuples_decompressed += stats.tuples_decompressed;
 
@@ -260,6 +284,8 @@ decompress_batches_for_update_delete(HypertableModifyState *ht_state, Chunk *chu
 													num_scankeys,
 													mem_scankeys,
 													num_mem_scankeys,
+													ONCONFLICT_UPDATE,
+													NULL,
 													null_columns,
 													is_null);
 		/* close the selected index */
@@ -274,6 +300,8 @@ decompress_batches_for_update_delete(HypertableModifyState *ht_state, Chunk *chu
 												  num_scankeys,
 												  mem_scankeys,
 												  num_mem_scankeys,
+												  ONCONFLICT_UPDATE,
+												  NULL,
 												  null_columns,
 												  is_null);
 	}
@@ -318,6 +346,7 @@ decompress_batches_indexscan(Relation in_rel, Relation out_rel, Relation index_r
 							 Snapshot snapshot, ScanKeyData *index_scankeys, int num_index_scankeys,
 							 ScanKeyData *heap_scankeys, int num_heap_scankeys,
 							 ScanKeyData *mem_scankeys, int num_mem_scankeys,
+							 OnConflictAction on_conflict, bool *skip_insert,
 							 Bitmapset *null_columns, List *is_nulls)
 {
 	HeapTuple compressed_tuple;
@@ -404,12 +433,22 @@ decompress_batches_indexscan(Relation in_rel, Relation out_rel, Relation index_r
 								  decompressor.compressed_datums,
 								  decompressor.compressed_is_nulls);
 
-		if (num_mem_scankeys && !batch_matches(&decompressor, mem_scankeys, num_mem_scankeys))
+		if (num_mem_scankeys &&
+			!batch_matches(&decompressor, mem_scankeys, num_mem_scankeys, on_conflict, skip_insert))
 		{
 			row_decompressor_reset(&decompressor);
 			continue;
 		}
 
+		if (skip_insert && *skip_insert)
+		{
+			row_decompressor_close(&decompressor);
+			index_endscan(scan);
+			index_close(index_rel, AccessShareLock);
+			ExecDropSingleTupleTableSlot(slot);
+			return stats;
+		}
+
 		write_logical_replication_msg_decompression_start();
 		result = delete_compressed_tuple(&decompressor, snapshot, compressed_tuple);
 		/* skip reporting error if isolation level is < Repeatable Read
@@ -470,7 +509,8 @@ decompress_batches_indexscan(Relation in_rel, Relation out_rel, Relation index_r
 static struct decompress_batches_stats
 decompress_batches_seqscan(Relation in_rel, Relation out_rel, Snapshot snapshot,
 						   ScanKeyData *scankeys, int num_scankeys, ScanKeyData *mem_scankeys,
-						   int num_mem_scankeys, Bitmapset *null_columns, List *is_nulls)
+						   int num_mem_scankeys, OnConflictAction on_conflict, bool *skip_insert,
+						   Bitmapset *null_columns, List *is_nulls)
 {
 	RowDecompressor decompressor;
 	bool decompressor_initialized = false;
@@ -534,12 +574,21 @@ decompress_batches_seqscan(Relation in_rel, Relation out_rel, Snapshot snapshot,
 								  decompressor.compressed_datums,
 								  decompressor.compressed_is_nulls);
 
-		if (num_mem_scankeys && !batch_matches(&decompressor, mem_scankeys, num_mem_scankeys))
+		if (num_mem_scankeys &&
+			!batch_matches(&decompressor, mem_scankeys, num_mem_scankeys, on_conflict, skip_insert))
 		{
 			row_decompressor_reset(&decompressor);
 			continue;
 		}
 
+		if (skip_insert && *skip_insert)
+		{
+			row_decompressor_close(&decompressor);
+			ExecDropSingleTupleTableSlot(slot);
+			table_endscan(scan);
+			return stats;
+		}
+
 		write_logical_replication_msg_decompression_start();
 		result = delete_compressed_tuple(&decompressor, snapshot, compressed_tuple);
 		/* skip reporting error if isolation level is < Repeatable Read
@@ -582,7 +631,8 @@ decompress_batches_seqscan(Relation in_rel, Relation out_rel, Snapshot snapshot,
 }
 
 static bool
-batch_matches(RowDecompressor *decompressor, ScanKeyData *scankeys, int num_scankeys)
+batch_matches(RowDecompressor *decompressor, ScanKeyData *scankeys, int num_scankeys,
+			  OnConflictAction on_conflict, bool *skip_insert)
 {
 	int num_tuples = decompress_batch(decompressor);
 
@@ -599,6 +649,16 @@ batch_matches(RowDecompressor *decompressor, ScanKeyData *scankeys, int num_scan
 #endif
 	if (valid)
 	{
+		if (on_conflict == ONCONFLICT_NONE)
+		{
+			ereport(ERROR,
+					(errcode(ERRCODE_UNIQUE_VIOLATION),
+					 errmsg("duplicate key value violates unique constraint")));
+		}
+		if (on_conflict == ONCONFLICT_NOTHING)
+		{
+			*skip_insert = true;
+		}
 		return true;
 	}
 }
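/*
 * Editor's note (not part of the patch): a compact, self-contained model of
 * what a matching batch means for each conflict action in batch_matches()
 * above. The Sketch* names are stand-ins; in the real code ONCONFLICT_NONE
 * raises the unique-violation error via ereport() rather than returning a
 * value, and ONCONFLICT_NOTHING reports the skip through *skip_insert.
 */
#include <stdio.h>

typedef enum
{
	SKETCH_ONCONFLICT_NONE,
	SKETCH_ONCONFLICT_NOTHING,
	SKETCH_ONCONFLICT_UPDATE
} SketchOnConflictAction;

typedef enum
{
	SKETCH_RAISE_UNIQUE_VIOLATION, /* no ON CONFLICT clause: error out, chunk left untouched */
	SKETCH_SKIP_INSERT,			   /* DO NOTHING: drop the new tuple, keep the batch compressed */
	SKETCH_DECOMPRESS_BATCH		   /* DO UPDATE: decompress so the executor can resolve the conflict */
} SketchMatchOutcome;

static SketchMatchOutcome
sketch_on_batch_match(SketchOnConflictAction on_conflict)
{
	switch (on_conflict)
	{
		case SKETCH_ONCONFLICT_NONE:
			return SKETCH_RAISE_UNIQUE_VIOLATION;
		case SKETCH_ONCONFLICT_NOTHING:
			return SKETCH_SKIP_INSERT;
		case SKETCH_ONCONFLICT_UPDATE:
		default:
			return SKETCH_DECOMPRESS_BATCH;
	}
}

int
main(void)
{
	printf("DO NOTHING on match -> %d (skip insert)\n",
		   (int) sketch_on_batch_match(SKETCH_ONCONFLICT_NOTHING));
	return 0;
}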
@@ -743,9 +803,12 @@ decompress_chunk_walker(PlanState *ps, struct decompress_chunk_context *ctx)
  * In case of multiple unique indexes we have to return the shared columns.
  * For expression indexes we ignore the columns with expressions, for partial
  * indexes we ignore predicate.
+ *
+ * The covering flag is set to true if we have a single constraint that is covered
+ * by all the columns present in the Bitmapset.
  */
 static Bitmapset *
-compressed_insert_key_columns(Relation relation)
+compressed_insert_key_columns(Relation relation, bool *covering)
 {
 	Bitmapset *shared_attrs = NULL; /* indexed columns */
 	ListCell *l;
@@ -792,8 +855,24 @@ compressed_insert_key_columns(Relation relation)
 		}
 		index_close(indexDesc, AccessShareLock);
 
-		shared_attrs = shared_attrs ? bms_intersect(idx_attrs, shared_attrs) : idx_attrs;
+		if (!shared_attrs)
+		{
+			/* First iteration */
+			shared_attrs = idx_attrs;
+			/* We only optimize unique constraint checks for non-partial indexes. */
+			*covering = indexDesc->rd_indpred == NIL;
+		}
+		else
+		{
+			shared_attrs = bms_intersect(idx_attrs, shared_attrs);
+			*covering = false;
+		}
 
+		/* When multiple unique indexes are present, in theory there could be no shared
+		 * columns even though that is very unlikely as they will probably at least share
+		 * the partitioning columns. But since we are looking at chunk indexes here that
+		 * is not guaranteed.
+		 */
 		if (!shared_attrs)
 			return NULL;
 	}
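/*
 * Editor's note (not part of the patch): a self-contained toy of the
 * key-column logic above, using plain bitmasks instead of PostgreSQL's
 * Bitmapset. Each unique index contributes its key columns; the result is
 * their intersection, and per the function's header comment `covering` only
 * stays true when a single non-partial unique index is involved, since an
 * intersection of several constraints no longer proves a violation of any one
 * of them. All names below are illustrative.
 */
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

typedef struct ToyUniqueIndex
{
	uint32_t columns; /* bit i set: column i is part of the index key */
	bool is_partial;  /* partial indexes disable the fast path */
} ToyUniqueIndex;

static uint32_t
toy_insert_key_columns(const ToyUniqueIndex *indexes, int nindexes, bool *covering)
{
	uint32_t shared = 0;
	bool first = true;

	for (int i = 0; i < nindexes; i++)
	{
		if (first)
		{
			/* First iteration: take the index's columns as the candidate set. */
			shared = indexes[i].columns;
			*covering = !indexes[i].is_partial;
			first = false;
		}
		else
		{
			/* Multiple unique indexes: keep only the shared columns. */
			shared &= indexes[i].columns;
			*covering = false;
		}
		if (shared == 0)
			return 0; /* nothing shared between the unique indexes */
	}
	return shared;
}

int
main(void)
{
	/* {c0,c1} and {c1,c2}: only c1 is shared, and it covers neither constraint. */
	ToyUniqueIndex idx[] = { { 0x3, false }, { 0x6, false } };
	bool covering = false;
	uint32_t shared = toy_insert_key_columns(idx, 2, &covering);
	printf("shared=0x%x covering=%d\n", (unsigned) shared, (int) covering);
	return 0;
}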