@@ -854,22 +854,36 @@ async fn batch_write_tag_index(batch_tasks: Vec<Task>, writer: &CacheWriter) {
854
854
let mut tag_name_builder = BinaryBuilder :: new ( ) ;
855
855
let mut tag_value_builder = BinaryBuilder :: new ( ) ;
856
856
857
- batch_tasks. into_iter ( ) . for_each ( |mut task| {
858
- if let Task :: TagIndex ( duration, series_id, ref mut names, ref mut values, metric_id) = task
859
- {
860
- remove_default_tag ( names, values) ;
861
-
862
- names. iter ( ) . zip ( values. iter ( ) ) . for_each ( |( name, value) | {
863
- metrics_id_builder. append_value ( metric_id. 0 ) ;
864
- tag_name_builder. append_value ( name. to_byte_slice ( ) ) ;
865
- tag_value_builder. append_value ( value. to_byte_slice ( ) ) ;
866
- series_id_builder. append_value ( series_id. 0 ) ;
867
- field_duration_builder. append_value ( duration. as_millis ( ) as u64 ) ;
868
- } ) ;
869
- } else {
870
- error ! ( "Some task are not tag index." ) ;
871
- }
872
- } ) ;
857
+ let mut start_ts: i64 = 0 ;
858
+ let mut end_ts: i64 = 0 ;
859
+ let task_len = batch_tasks. len ( ) ;
860
+
861
+ batch_tasks
862
+ . into_iter ( )
863
+ . enumerate ( )
864
+ . for_each ( |( index, mut task) | {
865
+ if let Task :: TagIndex ( duration, series_id, ref mut names, ref mut values, metric_id) =
866
+ task
867
+ {
868
+ if index == 0 {
869
+ start_ts = duration. as_millis ( ) as i64 ;
870
+ } else if index == task_len - 1 {
871
+ end_ts = duration. as_millis ( ) as i64 ;
872
+ }
873
+
874
+ remove_default_tag ( names, values) ;
875
+
876
+ names. iter ( ) . zip ( values. iter ( ) ) . for_each ( |( name, value) | {
877
+ metrics_id_builder. append_value ( metric_id. 0 ) ;
878
+ tag_name_builder. append_value ( name. to_byte_slice ( ) ) ;
879
+ tag_value_builder. append_value ( value. to_byte_slice ( ) ) ;
880
+ series_id_builder. append_value ( series_id. 0 ) ;
881
+ field_duration_builder. append_value ( duration. as_millis ( ) as u64 ) ;
882
+ } ) ;
883
+ } else {
884
+ error ! ( "Some task are not tag index." ) ;
885
+ }
886
+ } ) ;
873
887
874
888
let arrays: Vec < ArrayRef > = vec ! [
875
889
Arc :: new( metrics_id_builder. finish( ) ) ,
@@ -883,7 +897,7 @@ async fn batch_write_tag_index(batch_tasks: Vec<Task>, writer: &CacheWriter) {
883
897
. storage
884
898
. write ( WriteRequest {
885
899
batch,
886
- time_range : ( 0 .. 10 ) . into ( ) ,
900
+ time_range : ( Timestamp ( start_ts ) .. Timestamp ( end_ts ) ) . into ( ) ,
887
901
enable_check : true ,
888
902
} )
889
903
. await
@@ -900,9 +914,19 @@ async fn batch_write_series(batch_tasks: Vec<Task>, writer: &CacheWriter) {
900
914
let mut name_binary_values: Vec < & [ u8 ] > = Vec :: new ( ) ;
901
915
let mut value_binary_values: Vec < & [ u8 ] > = Vec :: new ( ) ;
902
916
917
+ let mut start_ts: i64 = 0 ;
918
+ let mut end_ts: i64 = 0 ;
919
+ let task_len = batch_tasks. len ( ) ;
920
+
903
921
let mut offsets: Vec < i32 > = vec ! [ 0 ; batch_tasks. len( ) + 1 ] ;
904
922
batch_tasks. iter ( ) . enumerate ( ) . for_each ( |( index, task) | {
905
923
if let Task :: Series ( duration, id, key, metric_id) = task {
924
+ if index == 0 {
925
+ start_ts = duration. as_millis ( ) as i64 ;
926
+ } else if index == task_len - 1 {
927
+ end_ts = duration. as_millis ( ) as i64 ;
928
+ }
929
+
906
930
metric_id_builder. append_value ( metric_id. 0 ) ;
907
931
series_id_builder. append_value ( id. 0 ) ;
908
932
field_duration_builder. append_value ( duration. as_millis ( ) as u64 ) ;
@@ -952,7 +976,7 @@ async fn batch_write_series(batch_tasks: Vec<Task>, writer: &CacheWriter) {
952
976
. storage
953
977
. write ( WriteRequest {
954
978
batch,
955
- time_range : ( 0 .. 10 ) . into ( ) ,
979
+ time_range : ( Timestamp ( start_ts ) .. Timestamp ( end_ts ) ) . into ( ) ,
956
980
enable_check : true ,
957
981
} )
958
982
. await
@@ -970,18 +994,31 @@ async fn batch_write_metrics(batch_tasks: Vec<Task>, writer: &CacheWriter) {
970
994
let mut field_type_builder = UInt8Builder :: new ( ) ;
971
995
let mut field_duration_builder = UInt64Builder :: new ( ) ;
972
996
973
- batch_tasks. into_iter ( ) . for_each ( |task| {
974
- if let Task :: Metric ( current, name, field_name, field_type) = task {
975
- metric_id_builder. append_value ( hash ( & name) ) ;
976
- metric_name_builder. append_value ( name) ;
977
- field_id_builder. append_value ( hash ( field_name. to_byte_slice ( ) ) ) ;
978
- field_name_builder. append_value ( field_name) ;
979
- field_type_builder. append_value ( field_type) ;
980
- field_duration_builder. append_value ( current. as_millis ( ) as u64 ) ;
981
- } else {
982
- error ! ( "Some task are not metric." ) ;
983
- }
984
- } ) ;
997
+ let mut start_ts: i64 = 0 ;
998
+ let mut end_ts: i64 = 0 ;
999
+ let task_len = batch_tasks. len ( ) ;
1000
+
1001
+ batch_tasks
1002
+ . into_iter ( )
1003
+ . enumerate ( )
1004
+ . for_each ( |( index, task) | {
1005
+ if let Task :: Metric ( duration, name, field_name, field_type) = task {
1006
+ if index == 0 {
1007
+ start_ts = duration. as_millis ( ) as i64 ;
1008
+ } else if index == task_len - 1 {
1009
+ end_ts = duration. as_millis ( ) as i64 ;
1010
+ }
1011
+
1012
+ metric_id_builder. append_value ( hash ( & name) ) ;
1013
+ metric_name_builder. append_value ( name) ;
1014
+ field_id_builder. append_value ( hash ( field_name. to_byte_slice ( ) ) ) ;
1015
+ field_name_builder. append_value ( field_name) ;
1016
+ field_type_builder. append_value ( field_type) ;
1017
+ field_duration_builder. append_value ( duration. as_millis ( ) as u64 ) ;
1018
+ } else {
1019
+ error ! ( "Some task are not metric." ) ;
1020
+ }
1021
+ } ) ;
985
1022
986
1023
vec ! [
987
1024
Arc :: new( metric_name_builder. finish( ) ) ,
0 commit comments