12
12
// See the License for the specific language governing permissions and
13
13
// limitations under the License.
14
14
15
- use std:: cmp:: { Ord , Ordering , PartialEq , PartialOrd } ;
16
- use std:: collections:: { BTreeSet , HashMap , btree_set} ;
15
+ use std:: collections:: HashMap ;
17
16
18
17
use async_trait:: async_trait;
19
18
use prost:: Message ;
@@ -188,36 +187,6 @@ pub struct LogRecord {
188
187
pub scope_dropped_attributes_count : u32 ,
189
188
}
190
189
191
- /// A wrapper around `LogRecord` that implements `Ord` to allow insertion of log records into a
192
- /// `BTreeSet`.
193
- #[ derive( Debug ) ]
194
- struct OrdLogRecord ( LogRecord ) ;
195
-
196
- impl Ord for OrdLogRecord {
197
- fn cmp ( & self , other : & Self ) -> Ordering {
198
- self . 0
199
- . service_name
200
- . cmp ( & other. 0 . service_name )
201
- . then ( self . 0 . timestamp_nanos . cmp ( & other. 0 . timestamp_nanos ) )
202
- }
203
- }
204
-
205
- impl PartialOrd for OrdLogRecord {
206
- fn partial_cmp ( & self , other : & Self ) -> Option < Ordering > {
207
- Some ( self . cmp ( other) )
208
- }
209
- }
210
-
211
- impl PartialEq for OrdLogRecord {
212
- fn eq ( & self , other : & Self ) -> bool {
213
- self . 0 . timestamp_nanos == other. 0 . timestamp_nanos
214
- && self . 0 . service_name == other. 0 . service_name
215
- && self . 0 . body == other. 0 . body
216
- }
217
- }
218
-
219
- impl Eq for OrdLogRecord { }
220
-
221
190
struct ParsedLogRecords {
222
191
doc_batch : DocBatchV2 ,
223
192
num_log_records : u64 ,
@@ -301,11 +270,11 @@ impl OtlpGrpcLogsService {
301
270
let num_log_records = log_records. len ( ) as u64 ;
302
271
let mut error_message = String :: new ( ) ;
303
272
304
- let mut doc_batch_builder = JsonDocBatchV2Builder :: default ( ) ;
273
+ let mut doc_batch_builder = JsonDocBatchV2Builder :: with_num_docs ( num_log_records as usize ) ;
305
274
let mut doc_uid_generator = DocUidGenerator :: default ( ) ;
306
275
for log_record in log_records {
307
276
let doc_uid = doc_uid_generator. next_doc_uid ( ) ;
308
- if let Err ( error) = doc_batch_builder. add_doc ( doc_uid, log_record. 0 ) {
277
+ if let Err ( error) = doc_batch_builder. add_doc ( doc_uid, log_record) {
309
278
error ! ( error=?error, "failed to JSON serialize span" ) ;
310
279
error_message = format ! ( "failed to JSON serialize span: {error:?}" ) ;
311
280
num_parse_errors += 1 ;
@@ -393,19 +362,24 @@ impl LogsService for OtlpGrpcLogsService {
393
362
}
394
363
}
395
364
396
- fn parse_otlp_logs (
397
- request : ExportLogsServiceRequest ,
398
- ) -> Result < BTreeSet < OrdLogRecord > , OtlpLogsError > {
399
- let mut log_records = BTreeSet :: new ( ) ;
400
- for resource_log in request. resource_logs {
365
+ fn parse_otlp_logs ( request : ExportLogsServiceRequest ) -> Result < Vec < LogRecord > , OtlpLogsError > {
366
+ let num_log_records = request
367
+ . resource_logs
368
+ . iter ( )
369
+ . flat_map ( |resource_log| resource_log. scope_logs . iter ( ) )
370
+ . map ( |scope_logs| scope_logs. log_records . len ( ) )
371
+ . sum ( ) ;
372
+ let mut log_records = Vec :: with_capacity ( num_log_records) ;
373
+
374
+ for resource_logs in request. resource_logs {
401
375
let mut resource_attributes = extract_attributes (
402
- resource_log
376
+ resource_logs
403
377
. resource
404
378
. clone ( )
405
379
. map ( |rsrc| rsrc. attributes )
406
380
. unwrap_or_default ( ) ,
407
381
) ;
408
- let resource_dropped_attributes_count = resource_log
382
+ let resource_dropped_attributes_count = resource_logs
409
383
. resource
410
384
. map ( |rsrc| rsrc. dropped_attributes_count )
411
385
. unwrap_or ( 0 ) ;
@@ -414,31 +388,31 @@ fn parse_otlp_logs(
414
388
Some ( JsonValue :: String ( value) ) => value. to_string ( ) ,
415
389
_ => "unknown_service" . to_string ( ) ,
416
390
} ;
417
- for scope_log in resource_log . scope_logs {
418
- let scope_name = scope_log
391
+ for scope_logs in resource_logs . scope_logs {
392
+ let scope_name = scope_logs
419
393
. scope
420
394
. as_ref ( )
421
395
. map ( |scope| & scope. name )
422
396
. filter ( |name| !name. is_empty ( ) ) ;
423
- let scope_version = scope_log
397
+ let scope_version = scope_logs
424
398
. scope
425
399
. as_ref ( )
426
400
. map ( |scope| & scope. version )
427
401
. filter ( |version| !version. is_empty ( ) ) ;
428
402
let scope_attributes = extract_attributes (
429
- scope_log
403
+ scope_logs
430
404
. scope
431
405
. clone ( )
432
406
. map ( |scope| scope. attributes )
433
407
. unwrap_or_default ( ) ,
434
408
) ;
435
- let scope_dropped_attributes_count = scope_log
409
+ let scope_dropped_attributes_count = scope_logs
436
410
. scope
437
411
. as_ref ( )
438
412
. map ( |scope| scope. dropped_attributes_count )
439
413
. unwrap_or ( 0 ) ;
440
414
441
- for log_record in scope_log . log_records {
415
+ for log_record in scope_logs . log_records {
442
416
let observed_timestamp_nanos = if log_record. observed_time_unix_nano == 0 {
443
417
// As per OTEL model spec, this field SHOULD be set once the
444
418
// event is observed by OpenTelemetry. If it's not set, we
@@ -503,7 +477,7 @@ fn parse_otlp_logs(
503
477
scope_attributes : scope_attributes. clone ( ) ,
504
478
scope_dropped_attributes_count,
505
479
} ;
506
- log_records. insert ( OrdLogRecord ( log_record) ) ;
480
+ log_records. push ( log_record) ;
507
481
}
508
482
}
509
483
}
@@ -512,15 +486,15 @@ fn parse_otlp_logs(
512
486
513
487
/// An iterator of JSON OTLP log records for use in the doc processor.
514
488
pub struct JsonLogIterator {
515
- logs : btree_set :: IntoIter < OrdLogRecord > ,
489
+ logs : std :: vec :: IntoIter < LogRecord > ,
516
490
current_log_idx : usize ,
517
491
num_logs : usize ,
518
492
avg_log_size : usize ,
519
493
avg_log_size_rem : usize ,
520
494
}
521
495
522
496
impl JsonLogIterator {
523
- fn new ( logs : BTreeSet < OrdLogRecord > , num_bytes : usize ) -> Self {
497
+ fn new ( logs : Vec < LogRecord > , num_bytes : usize ) -> Self {
524
498
let num_logs = logs. len ( ) ;
525
499
let avg_log_size = num_bytes. checked_div ( num_logs) . unwrap_or ( 0 ) ;
526
500
let avg_log_size_rem = avg_log_size + num_bytes. checked_rem ( num_logs) . unwrap_or ( 0 ) ;
@@ -539,9 +513,10 @@ impl Iterator for JsonLogIterator {
539
513
type Item = ( JsonValue , usize ) ;
540
514
541
515
fn next ( & mut self ) -> Option < Self :: Item > {
542
- let log_opt = self . logs . next ( ) . map ( |OrdLogRecord ( log) | {
543
- serde_json:: to_value ( log) . expect ( "`LogRecord` should be JSON serializable" )
544
- } ) ;
516
+ let log_opt = self
517
+ . logs
518
+ . next ( )
519
+ . map ( |log| serde_json:: to_value ( log) . expect ( "`LogRecord` should be JSON serializable" ) ) ;
545
520
if log_opt. is_some ( ) {
546
521
self . current_log_idx += 1 ;
547
522
}
0 commit comments