@@ -1150,7 +1150,7 @@ func TestDistributor_Push_LabelRemoval(t *testing.T) {
1150
1150
})
1151
1151
1152
1152
// Push the series to the distributor
1153
- req := mockWriteRequest (tc .inputSeries , 1 , 1 )
1153
+ req := mockWriteRequest ([]labels. Labels { tc .inputSeries } , 1 , 1 )
1154
1154
_ , err = ds [0 ].Push (ctx , req )
1155
1155
require .NoError (t , err )
1156
1156
@@ -1166,6 +1166,47 @@ func TestDistributor_Push_LabelRemoval(t *testing.T) {
1166
1166
}
1167
1167
}
1168
1168
1169
+ func TestDistributor_Push_LabelRemoval_RemovingNameLabelWillError (t * testing.T ) {
1170
+ ctx := user .InjectOrgID (context .Background (), "user" )
1171
+ type testcase struct {
1172
+ inputSeries labels.Labels
1173
+ expectedSeries labels.Labels
1174
+ removeReplica bool
1175
+ removeLabels []string
1176
+ }
1177
+
1178
+ tc := testcase {
1179
+ removeReplica : true ,
1180
+ removeLabels : []string {"__name__" },
1181
+ inputSeries : labels.Labels {
1182
+ {Name : "__name__" , Value : "some_metric" },
1183
+ {Name : "cluster" , Value : "one" },
1184
+ {Name : "__replica__" , Value : "two" },
1185
+ },
1186
+ expectedSeries : labels.Labels {},
1187
+ }
1188
+
1189
+ var err error
1190
+ var limits validation.Limits
1191
+ flagext .DefaultValues (& limits )
1192
+ limits .DropLabels = tc .removeLabels
1193
+ limits .AcceptHASamples = tc .removeReplica
1194
+
1195
+ ds , _ , _ := prepare (t , prepConfig {
1196
+ numIngesters : 2 ,
1197
+ happyIngesters : 2 ,
1198
+ numDistributors : 1 ,
1199
+ shardByAllLabels : true ,
1200
+ limits : & limits ,
1201
+ })
1202
+
1203
+ // Push the series to the distributor
1204
+ req := mockWriteRequest ([]labels.Labels {tc .inputSeries }, 1 , 1 )
1205
+ _ , err = ds [0 ].Push (ctx , req )
1206
+ require .Error (t , err )
1207
+ assert .Equal (t , "rpc error: code = Code(400) desc = sample missing metric name" , err .Error ())
1208
+ }
1209
+
1169
1210
func TestDistributor_Push_ShouldGuaranteeShardingTokenConsistencyOverTheTime (t * testing.T ) {
1170
1211
ctx := user .InjectOrgID (context .Background (), "user" )
1171
1212
tests := map [string ]struct {
@@ -1254,7 +1295,7 @@ func TestDistributor_Push_ShouldGuaranteeShardingTokenConsistencyOverTheTime(t *
1254
1295
})
1255
1296
1256
1297
// Push the series to the distributor
1257
- req := mockWriteRequest (testData .inputSeries , 1 , 1 )
1298
+ req := mockWriteRequest ([]labels. Labels { testData .inputSeries } , 1 , 1 )
1258
1299
_ , err := ds [0 ].Push (ctx , req )
1259
1300
require .NoError (t , err )
1260
1301
@@ -1312,7 +1353,7 @@ func TestDistributor_Push_LabelNameValidation(t *testing.T) {
1312
1353
shuffleShardSize : 1 ,
1313
1354
skipLabelNameValidation : tc .skipLabelNameValidationCfg ,
1314
1355
})
1315
- req := mockWriteRequest (tc .inputLabels , 42 , 100000 )
1356
+ req := mockWriteRequest ([]labels. Labels { tc .inputLabels } , 42 , 100000 )
1316
1357
req .SkipLabelNameValidation = tc .skipLabelNameValidationReq
1317
1358
_ , err := ds [0 ].Push (ctx , req )
1318
1359
if tc .errExpected {
@@ -1790,7 +1831,7 @@ func TestDistributor_MetricsForLabelMatchers(t *testing.T) {
1790
1831
ctx := user .InjectOrgID (context .Background (), "test" )
1791
1832
1792
1833
for _ , series := range fixtures {
1793
- req := mockWriteRequest (series .lbls , series .value , series .timestamp )
1834
+ req := mockWriteRequest ([]labels. Labels { series .lbls } , series .value , series .timestamp )
1794
1835
_ , err := ds [0 ].Push (ctx , req )
1795
1836
require .NoError (t , err )
1796
1837
}
@@ -1875,15 +1916,16 @@ func mustNewMatcher(t labels.MatchType, n, v string) *labels.Matcher {
1875
1916
return m
1876
1917
}
1877
1918
1878
- func mockWriteRequest (lbls labels.Labels , value float64 , timestampMs int64 ) * cortexpb.WriteRequest {
1879
- samples := []cortexpb.Sample {
1880
- {
1919
+ func mockWriteRequest (lbls []labels.Labels , value float64 , timestampMs int64 ) * cortexpb.WriteRequest {
1920
+ samples := make ([]cortexpb.Sample , len (lbls ))
1921
+ for i := range lbls {
1922
+ samples [i ] = cortexpb.Sample {
1881
1923
TimestampMs : timestampMs ,
1882
1924
Value : value ,
1883
- },
1925
+ }
1884
1926
}
1885
1927
1886
- return cortexpb .ToWriteRequest ([]labels. Labels { lbls } , samples , nil , cortexpb .API )
1928
+ return cortexpb .ToWriteRequest (lbls , samples , nil , cortexpb .API )
1887
1929
}
1888
1930
1889
1931
type prepConfig struct {
@@ -2644,27 +2686,33 @@ func TestDistributor_Push_Relabel(t *testing.T) {
2644
2686
ctx := user .InjectOrgID (context .Background (), "user" )
2645
2687
2646
2688
type testcase struct {
2647
- inputSeries labels.Labels
2689
+ name string
2690
+ inputSeries []labels.Labels
2648
2691
expectedSeries labels.Labels
2649
2692
metricRelabelConfigs []* relabel.Config
2650
2693
}
2651
2694
2652
2695
cases := []testcase {
2653
- // No relabel config.
2654
2696
{
2655
- inputSeries : labels.Labels {
2656
- {Name : "__name__" , Value : "foo" },
2657
- {Name : "cluster" , Value : "one" },
2697
+ name : "with no relabel config" ,
2698
+ inputSeries : []labels.Labels {
2699
+ {
2700
+ {Name : "__name__" , Value : "foo" },
2701
+ {Name : "cluster" , Value : "one" },
2702
+ },
2658
2703
},
2659
2704
expectedSeries : labels.Labels {
2660
2705
{Name : "__name__" , Value : "foo" },
2661
2706
{Name : "cluster" , Value : "one" },
2662
2707
},
2663
2708
},
2664
2709
{
2665
- inputSeries : labels.Labels {
2666
- {Name : "__name__" , Value : "foo" },
2667
- {Name : "cluster" , Value : "one" },
2710
+ name : "with hardcoded replace" ,
2711
+ inputSeries : []labels.Labels {
2712
+ {
2713
+ {Name : "__name__" , Value : "foo" },
2714
+ {Name : "cluster" , Value : "one" },
2715
+ },
2668
2716
},
2669
2717
expectedSeries : labels.Labels {
2670
2718
{Name : "__name__" , Value : "foo" },
@@ -2680,37 +2728,126 @@ func TestDistributor_Push_Relabel(t *testing.T) {
2680
2728
},
2681
2729
},
2682
2730
},
2731
+ {
2732
+ name : "with drop action" ,
2733
+ inputSeries : []labels.Labels {
2734
+ {
2735
+ {Name : "__name__" , Value : "foo" },
2736
+ {Name : "cluster" , Value : "one" },
2737
+ },
2738
+ {
2739
+ {Name : "__name__" , Value : "bar" },
2740
+ {Name : "cluster" , Value : "two" },
2741
+ },
2742
+ },
2743
+ expectedSeries : labels.Labels {
2744
+ {Name : "__name__" , Value : "bar" },
2745
+ {Name : "cluster" , Value : "two" },
2746
+ },
2747
+ metricRelabelConfigs : []* relabel.Config {
2748
+ {
2749
+ SourceLabels : []model.LabelName {"__name__" },
2750
+ Action : relabel .Drop ,
2751
+ Regex : relabel .MustNewRegexp ("(foo)" ),
2752
+ },
2753
+ },
2754
+ },
2683
2755
}
2684
2756
2685
2757
for _ , tc := range cases {
2686
- var err error
2687
- var limits validation.Limits
2688
- flagext .DefaultValues (& limits )
2689
- limits .MetricRelabelConfigs = tc .metricRelabelConfigs
2758
+ t .Run (tc .name , func (t * testing.T ) {
2759
+ var err error
2760
+ var limits validation.Limits
2761
+ flagext .DefaultValues (& limits )
2762
+ limits .MetricRelabelConfigs = tc .metricRelabelConfigs
2690
2763
2691
- ds , ingesters , _ := prepare (t , prepConfig {
2692
- numIngesters : 2 ,
2693
- happyIngesters : 2 ,
2694
- numDistributors : 1 ,
2695
- shardByAllLabels : true ,
2696
- limits : & limits ,
2697
- })
2764
+ ds , ingesters , _ := prepare (t , prepConfig {
2765
+ numIngesters : 2 ,
2766
+ happyIngesters : 2 ,
2767
+ numDistributors : 1 ,
2768
+ shardByAllLabels : true ,
2769
+ limits : & limits ,
2770
+ })
2698
2771
2699
- // Push the series to the distributor
2700
- req := mockWriteRequest (tc .inputSeries , 1 , 1 )
2701
- _ , err = ds [0 ].Push (ctx , req )
2702
- require .NoError (t , err )
2772
+ // Push the series to the distributor
2773
+ req := mockWriteRequest (tc .inputSeries , 1 , 1 )
2774
+ _ , err = ds [0 ].Push (ctx , req )
2775
+ require .NoError (t , err )
2703
2776
2704
- // Since each test pushes only 1 series, we do expect the ingester
2705
- // to have received exactly 1 series
2706
- for i := range ingesters {
2707
- timeseries := ingesters [i ].series ()
2708
- assert .Equal (t , 1 , len (timeseries ))
2709
- for _ , v := range timeseries {
2710
- assert .Equal (t , tc .expectedSeries , cortexpb .FromLabelAdaptersToLabels (v .Labels ))
2777
+ // Since each test pushes only 1 series, we do expect the ingester
2778
+ // to have received exactly 1 series
2779
+ for i := range ingesters {
2780
+ timeseries := ingesters [i ].series ()
2781
+ assert .Equal (t , 1 , len (timeseries ))
2782
+ for _ , v := range timeseries {
2783
+ assert .Equal (t , tc .expectedSeries , cortexpb .FromLabelAdaptersToLabels (v .Labels ))
2784
+ }
2711
2785
}
2712
- }
2786
+ })
2787
+ }
2788
+ }
2789
+
2790
+ func TestDistributor_Push_RelabelDropWillExportMetricOfDroppedSamples (t * testing.T ) {
2791
+ metricRelabelConfigs := []* relabel.Config {
2792
+ {
2793
+ SourceLabels : []model.LabelName {"__name__" },
2794
+ Action : relabel .Drop ,
2795
+ Regex : relabel .MustNewRegexp ("(foo)" ),
2796
+ },
2797
+ }
2798
+
2799
+ inputSeries := []labels.Labels {
2800
+ {
2801
+ {Name : "__name__" , Value : "foo" },
2802
+ {Name : "cluster" , Value : "one" },
2803
+ },
2804
+ {
2805
+ {Name : "__name__" , Value : "bar" },
2806
+ {Name : "cluster" , Value : "two" },
2807
+ },
2808
+ }
2809
+
2810
+ var err error
2811
+ var limits validation.Limits
2812
+ flagext .DefaultValues (& limits )
2813
+ limits .MetricRelabelConfigs = metricRelabelConfigs
2814
+
2815
+ ds , ingesters , regs := prepare (t , prepConfig {
2816
+ numIngesters : 2 ,
2817
+ happyIngesters : 2 ,
2818
+ numDistributors : 1 ,
2819
+ shardByAllLabels : true ,
2820
+ limits : & limits ,
2821
+ })
2822
+
2823
+ regs [0 ].MustRegister (validation .DiscardedSamples )
2824
+ validation .DiscardedSamples .Reset ()
2825
+
2826
+ // Push the series to the distributor
2827
+ req := mockWriteRequest (inputSeries , 1 , 1 )
2828
+ ctx := user .InjectOrgID (context .Background (), "user1" )
2829
+ _ , err = ds [0 ].Push (ctx , req )
2830
+ require .NoError (t , err )
2831
+
2832
+ // Since each test pushes only 1 series, we do expect the ingester
2833
+ // to have received exactly 1 series
2834
+ for i := range ingesters {
2835
+ timeseries := ingesters [i ].series ()
2836
+ assert .Equal (t , 1 , len (timeseries ))
2713
2837
}
2838
+
2839
+ metrics := []string {"cortex_distributor_received_samples_total" , "cortex_discarded_samples_total" }
2840
+
2841
+ expectedMetrics := `
2842
+ # HELP cortex_discarded_samples_total The total number of samples that were discarded.
2843
+ # TYPE cortex_discarded_samples_total counter
2844
+ cortex_discarded_samples_total{reason="relabel_configuration",user="user1"} 1
2845
+ # HELP cortex_distributor_received_samples_total The total number of received samples, excluding rejected and deduped samples.
2846
+ # TYPE cortex_distributor_received_samples_total counter
2847
+ cortex_distributor_received_samples_total{user="user1"} 1
2848
+ `
2849
+
2850
+ require .NoError (t , testutil .GatherAndCompare (regs [0 ], strings .NewReader (expectedMetrics ), metrics ... ))
2714
2851
}
2715
2852
2716
2853
func countMockIngestersCalls (ingesters []mockIngester , name string ) int {
0 commit comments