Skip to content

Commit fb6be52

Browse files
authored
Remove deduplication of logs and traces by service name and timestamp (#5853)
1 parent 8b21de3 commit fb6be52

File tree

3 files changed

+58
-107
lines changed

3 files changed

+58
-107
lines changed

quickwit/quickwit-ingest/src/ingest_v2/mod.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,14 @@ impl JsonDocBatchV2Builder {
174174
doc_lengths: self.doc_lengths,
175175
}
176176
}
177+
178+
pub fn with_num_docs(num_docs: usize) -> Self {
179+
Self {
180+
doc_uids: Vec::with_capacity(num_docs),
181+
doc_lengths: Vec::with_capacity(num_docs),
182+
..Default::default()
183+
}
184+
}
177185
}
178186

179187
/// Helper struct to build an [`IngestRequestV2`].

quickwit/quickwit-opentelemetry/src/otlp/logs.rs

Lines changed: 28 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,7 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
use std::cmp::{Ord, Ordering, PartialEq, PartialOrd};
16-
use std::collections::{BTreeSet, HashMap, btree_set};
15+
use std::collections::HashMap;
1716

1817
use async_trait::async_trait;
1918
use prost::Message;
@@ -188,36 +187,6 @@ pub struct LogRecord {
188187
pub scope_dropped_attributes_count: u32,
189188
}
190189

191-
/// A wrapper around `LogRecord` that implements `Ord` to allow insertion of log records into a
192-
/// `BTreeSet`.
193-
#[derive(Debug)]
194-
struct OrdLogRecord(LogRecord);
195-
196-
impl Ord for OrdLogRecord {
197-
fn cmp(&self, other: &Self) -> Ordering {
198-
self.0
199-
.service_name
200-
.cmp(&other.0.service_name)
201-
.then(self.0.timestamp_nanos.cmp(&other.0.timestamp_nanos))
202-
}
203-
}
204-
205-
impl PartialOrd for OrdLogRecord {
206-
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
207-
Some(self.cmp(other))
208-
}
209-
}
210-
211-
impl PartialEq for OrdLogRecord {
212-
fn eq(&self, other: &Self) -> bool {
213-
self.0.timestamp_nanos == other.0.timestamp_nanos
214-
&& self.0.service_name == other.0.service_name
215-
&& self.0.body == other.0.body
216-
}
217-
}
218-
219-
impl Eq for OrdLogRecord {}
220-
221190
struct ParsedLogRecords {
222191
doc_batch: DocBatchV2,
223192
num_log_records: u64,
@@ -301,11 +270,11 @@ impl OtlpGrpcLogsService {
301270
let num_log_records = log_records.len() as u64;
302271
let mut error_message = String::new();
303272

304-
let mut doc_batch_builder = JsonDocBatchV2Builder::default();
273+
let mut doc_batch_builder = JsonDocBatchV2Builder::with_num_docs(num_log_records as usize);
305274
let mut doc_uid_generator = DocUidGenerator::default();
306275
for log_record in log_records {
307276
let doc_uid = doc_uid_generator.next_doc_uid();
308-
if let Err(error) = doc_batch_builder.add_doc(doc_uid, log_record.0) {
277+
if let Err(error) = doc_batch_builder.add_doc(doc_uid, log_record) {
309278
error!(error=?error, "failed to JSON serialize span");
310279
error_message = format!("failed to JSON serialize span: {error:?}");
311280
num_parse_errors += 1;
@@ -393,19 +362,24 @@ impl LogsService for OtlpGrpcLogsService {
393362
}
394363
}
395364

396-
fn parse_otlp_logs(
397-
request: ExportLogsServiceRequest,
398-
) -> Result<BTreeSet<OrdLogRecord>, OtlpLogsError> {
399-
let mut log_records = BTreeSet::new();
400-
for resource_log in request.resource_logs {
365+
fn parse_otlp_logs(request: ExportLogsServiceRequest) -> Result<Vec<LogRecord>, OtlpLogsError> {
366+
let num_log_records = request
367+
.resource_logs
368+
.iter()
369+
.flat_map(|resource_log| resource_log.scope_logs.iter())
370+
.map(|scope_logs| scope_logs.log_records.len())
371+
.sum();
372+
let mut log_records = Vec::with_capacity(num_log_records);
373+
374+
for resource_logs in request.resource_logs {
401375
let mut resource_attributes = extract_attributes(
402-
resource_log
376+
resource_logs
403377
.resource
404378
.clone()
405379
.map(|rsrc| rsrc.attributes)
406380
.unwrap_or_default(),
407381
);
408-
let resource_dropped_attributes_count = resource_log
382+
let resource_dropped_attributes_count = resource_logs
409383
.resource
410384
.map(|rsrc| rsrc.dropped_attributes_count)
411385
.unwrap_or(0);
@@ -414,31 +388,31 @@ fn parse_otlp_logs(
414388
Some(JsonValue::String(value)) => value.to_string(),
415389
_ => "unknown_service".to_string(),
416390
};
417-
for scope_log in resource_log.scope_logs {
418-
let scope_name = scope_log
391+
for scope_logs in resource_logs.scope_logs {
392+
let scope_name = scope_logs
419393
.scope
420394
.as_ref()
421395
.map(|scope| &scope.name)
422396
.filter(|name| !name.is_empty());
423-
let scope_version = scope_log
397+
let scope_version = scope_logs
424398
.scope
425399
.as_ref()
426400
.map(|scope| &scope.version)
427401
.filter(|version| !version.is_empty());
428402
let scope_attributes = extract_attributes(
429-
scope_log
403+
scope_logs
430404
.scope
431405
.clone()
432406
.map(|scope| scope.attributes)
433407
.unwrap_or_default(),
434408
);
435-
let scope_dropped_attributes_count = scope_log
409+
let scope_dropped_attributes_count = scope_logs
436410
.scope
437411
.as_ref()
438412
.map(|scope| scope.dropped_attributes_count)
439413
.unwrap_or(0);
440414

441-
for log_record in scope_log.log_records {
415+
for log_record in scope_logs.log_records {
442416
let observed_timestamp_nanos = if log_record.observed_time_unix_nano == 0 {
443417
// As per OTEL model spec, this field SHOULD be set once the
444418
// event is observed by OpenTelemetry. If it's not set, we
@@ -503,7 +477,7 @@ fn parse_otlp_logs(
503477
scope_attributes: scope_attributes.clone(),
504478
scope_dropped_attributes_count,
505479
};
506-
log_records.insert(OrdLogRecord(log_record));
480+
log_records.push(log_record);
507481
}
508482
}
509483
}
@@ -512,15 +486,15 @@ fn parse_otlp_logs(
512486

513487
/// An iterator of JSON OTLP log records for use in the doc processor.
514488
pub struct JsonLogIterator {
515-
logs: btree_set::IntoIter<OrdLogRecord>,
489+
logs: std::vec::IntoIter<LogRecord>,
516490
current_log_idx: usize,
517491
num_logs: usize,
518492
avg_log_size: usize,
519493
avg_log_size_rem: usize,
520494
}
521495

522496
impl JsonLogIterator {
523-
fn new(logs: BTreeSet<OrdLogRecord>, num_bytes: usize) -> Self {
497+
fn new(logs: Vec<LogRecord>, num_bytes: usize) -> Self {
524498
let num_logs = logs.len();
525499
let avg_log_size = num_bytes.checked_div(num_logs).unwrap_or(0);
526500
let avg_log_size_rem = avg_log_size + num_bytes.checked_rem(num_logs).unwrap_or(0);
@@ -539,9 +513,10 @@ impl Iterator for JsonLogIterator {
539513
type Item = (JsonValue, usize);
540514

541515
fn next(&mut self) -> Option<Self::Item> {
542-
let log_opt = self.logs.next().map(|OrdLogRecord(log)| {
543-
serde_json::to_value(log).expect("`LogRecord` should be JSON serializable")
544-
});
516+
let log_opt = self
517+
.logs
518+
.next()
519+
.map(|log| serde_json::to_value(log).expect("`LogRecord` should be JSON serializable"));
545520
if log_opt.is_some() {
546521
self.current_log_idx += 1;
547522
}

quickwit/quickwit-opentelemetry/src/otlp/traces.rs

Lines changed: 22 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,8 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
use std::cmp::{Ord, Ordering, PartialEq, PartialOrd};
16-
use std::collections::{BTreeSet, HashMap, btree_set};
15+
use std::cmp::PartialEq;
16+
use std::collections::HashMap;
1717
use std::str::FromStr;
1818

1919
use async_trait::async_trait;
@@ -323,43 +323,6 @@ impl Span {
323323
}
324324
}
325325

326-
/// A wrapper around `Span` that implements `Ord` to allow insertion of spans into a `BTreeSet`.
327-
#[derive(Debug)]
328-
struct OrdSpan(Span);
329-
330-
impl Ord for OrdSpan {
331-
/// Sort spans by trace ID, span name, start timestamp, and span ID in an attempt to group the
332-
/// spans by trace ID and span name in the same docstore blocks. At some point, the
333-
/// cost–benefit of this approach should be evaluated via a benchmark and revisited if
334-
/// necessary.
335-
fn cmp(&self, other: &Self) -> Ordering {
336-
self.0
337-
.trace_id
338-
.cmp(&other.0.trace_id)
339-
.then(self.0.span_name.cmp(&other.0.span_name))
340-
.then(
341-
self.0
342-
.span_start_timestamp_nanos
343-
.cmp(&other.0.span_start_timestamp_nanos),
344-
)
345-
.then(self.0.span_id.cmp(&other.0.span_id))
346-
}
347-
}
348-
349-
impl PartialOrd for OrdSpan {
350-
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
351-
Some(self.cmp(other))
352-
}
353-
}
354-
355-
impl PartialEq for OrdSpan {
356-
fn eq(&self, other: &Self) -> bool {
357-
self.cmp(other) == Ordering::Equal
358-
}
359-
}
360-
361-
impl Eq for OrdSpan {}
362-
363326
#[derive(Debug, Clone)]
364327
pub struct SpanKind(i32);
365328

@@ -650,10 +613,14 @@ impl Link {
650613
}
651614
}
652615

653-
fn parse_otlp_spans(
654-
request: ExportTraceServiceRequest,
655-
) -> Result<BTreeSet<OrdSpan>, OtlpTracesError> {
656-
let mut spans = BTreeSet::new();
616+
fn parse_otlp_spans(request: ExportTraceServiceRequest) -> Result<Vec<Span>, OtlpTracesError> {
617+
let num_spans = request
618+
.resource_spans
619+
.iter()
620+
.flat_map(|resource_spans| resource_spans.scope_spans.iter())
621+
.map(|scope_spans| scope_spans.spans.len())
622+
.sum();
623+
let mut spans = Vec::with_capacity(num_spans);
657624

658625
for resource_spans in request.resource_spans {
659626
let resource = resource_spans
@@ -664,7 +631,7 @@ fn parse_otlp_spans(
664631
let scope = scope_spans.scope.map(Scope::from_otlp).unwrap_or_default();
665632
for span in scope_spans.spans {
666633
let span = Span::from_otlp(span, &resource, &scope)?;
667-
spans.insert(OrdSpan(span));
634+
spans.push(span);
668635
}
669636
}
670637
}
@@ -765,11 +732,11 @@ impl OtlpGrpcTracesService {
765732
let mut num_parse_errors = 0;
766733
let mut error_message = String::new();
767734

768-
let mut doc_batch_builder = JsonDocBatchV2Builder::default();
735+
let mut doc_batch_builder = JsonDocBatchV2Builder::with_num_docs(num_spans as usize);
769736
let mut doc_uid_generator = DocUidGenerator::default();
770737
for span in spans {
771738
let doc_uid = doc_uid_generator.next_doc_uid();
772-
if let Err(error) = doc_batch_builder.add_doc(doc_uid, span.0) {
739+
if let Err(error) = doc_batch_builder.add_doc(doc_uid, span) {
773740
error!(error=?error, "failed to JSON serialize span");
774741
error_message = format!("failed to JSON serialize span: {error:?}");
775742
num_parse_errors += 1;
@@ -859,15 +826,15 @@ impl TraceService for OtlpGrpcTracesService {
859826

860827
/// An iterator of JSON OTLP spans for use in the doc processor.
861828
pub struct JsonSpanIterator {
862-
spans: btree_set::IntoIter<OrdSpan>,
829+
spans: std::vec::IntoIter<Span>,
863830
current_span_idx: usize,
864831
num_spans: usize,
865832
avg_span_size: usize,
866833
avg_span_size_rem: usize,
867834
}
868835

869836
impl JsonSpanIterator {
870-
fn new(spans: BTreeSet<OrdSpan>, num_bytes: usize) -> Self {
837+
fn new(spans: Vec<Span>, num_bytes: usize) -> Self {
871838
let num_spans = spans.len();
872839
let avg_span_size = num_bytes.checked_div(num_spans).unwrap_or(0);
873840
let avg_span_size_rem = avg_span_size + num_bytes.checked_rem(num_spans).unwrap_or(0);
@@ -886,9 +853,10 @@ impl Iterator for JsonSpanIterator {
886853
type Item = (JsonValue, usize);
887854

888855
fn next(&mut self) -> Option<Self::Item> {
889-
let span_opt = self.spans.next().map(|OrdSpan(span)| {
890-
serde_json::to_value(span).expect("`Span` should be JSON serializable")
891-
});
856+
let span_opt = self
857+
.spans
858+
.next()
859+
.map(|span| serde_json::to_value(span).expect("`Span` should be JSON serializable"));
892860
if span_opt.is_some() {
893861
self.current_span_idx += 1;
894862
}
@@ -1348,7 +1316,7 @@ mod tests {
13481316

13491317
#[test]
13501318
fn test_json_span_iterator() {
1351-
let mut json_span_iterator = JsonSpanIterator::new(BTreeSet::new(), 0);
1319+
let mut json_span_iterator = JsonSpanIterator::new(Vec::new(), 0);
13521320
assert!(json_span_iterator.next().is_none());
13531321

13541322
let span_0 = Span {
@@ -1384,7 +1352,7 @@ mod tests {
13841352
links: Vec::new(),
13851353
};
13861354

1387-
let spans = BTreeSet::from_iter([OrdSpan(span_0.clone())]);
1355+
let spans = vec![span_0.clone()];
13881356
let mut json_span_iterator = JsonSpanIterator::new(spans, 3);
13891357

13901358
assert_eq!(
@@ -1396,7 +1364,7 @@ mod tests {
13961364
let mut span_1 = span_0.clone();
13971365
span_1.span_id = SpanId::new([3; 8]);
13981366

1399-
let spans = BTreeSet::from_iter([OrdSpan(span_0.clone()), OrdSpan(span_1.clone())]);
1367+
let spans = vec![span_0.clone(), span_1.clone()];
14001368
let mut json_span_iterator = JsonSpanIterator::new(spans, 7);
14011369

14021370
assert_eq!(

0 commit comments

Comments
 (0)