Skip to content

Commit afcc711

Browse files
authored
fix(sdk-trace-base): always wait on pending export in SimpleSpanProcessor (open-telemetry#5303)
1 parent fcd96de commit afcc711

File tree

3 files changed

+57
-42
lines changed

3 files changed

+57
-42
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@ For semantic convention package changes, see the [semconv CHANGELOG](packages/se
104104

105105
* fix(exporter-zipkin): remove usages of deprecated `url.parse` from `node:url` [#5390](https://github.com/open-telemetry/opentelemetry-js/pull/5390) @chancancode
106106
* fix(sdk-metrics): do not export from `PeriodicExportingMetricReader` when there are no metrics to export. [#5288](https://github.com/open-telemetry/opentelemetry-js/pull/5288) @jacksonweber
107+
* fix(sdk-trace-base): always wait on pending export in SimpleSpanProcessor. [#5303](https://github.com/open-telemetry/opentelemetry-js/pull/5303) @anuraaga
107108

108109
### :books: (Refine Doc)
109110

packages/opentelemetry-sdk-trace-base/src/export/SimpleSpanProcessor.ts

Lines changed: 20 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ import {
2020
ExportResultCode,
2121
globalErrorHandler,
2222
BindOnceFuture,
23-
ExportResult,
2423
} from '@opentelemetry/core';
2524
import { Span } from '../Span';
2625
import { SpanProcessor } from '../SpanProcessor';
@@ -38,16 +37,15 @@ import { Resource } from '@opentelemetry/resources';
3837
*/
3938
export class SimpleSpanProcessor implements SpanProcessor {
4039
private _shutdownOnce: BindOnceFuture<void>;
41-
private _unresolvedExports: Set<Promise<void>>;
40+
private _pendingExports: Set<Promise<void>>;
4241

4342
constructor(private readonly _exporter: SpanExporter) {
4443
this._shutdownOnce = new BindOnceFuture(this._shutdown, this);
45-
this._unresolvedExports = new Set<Promise<void>>();
44+
this._pendingExports = new Set<Promise<void>>();
4645
}
4746

4847
async forceFlush(): Promise<void> {
49-
// await unresolved resources before resolving
50-
await Promise.all(Array.from(this._unresolvedExports));
48+
await Promise.all(Array.from(this._pendingExports));
5149
if (this._exporter.forceFlush) {
5250
await this._exporter.forceFlush();
5351
}
@@ -64,43 +62,26 @@ export class SimpleSpanProcessor implements SpanProcessor {
6462
return;
6563
}
6664

67-
const doExport = () =>
68-
internal
69-
._export(this._exporter, [span])
70-
.then((result: ExportResult) => {
71-
if (result.code !== ExportResultCode.SUCCESS) {
72-
globalErrorHandler(
73-
result.error ??
74-
new Error(
75-
`SimpleSpanProcessor: span export failed (status ${result})`
76-
)
77-
);
78-
}
79-
})
80-
.catch(error => {
81-
globalErrorHandler(error);
82-
});
65+
const pendingExport = this._doExport(span).catch(err =>
66+
globalErrorHandler(err)
67+
);
68+
// Enqueue this export to the pending list so it can be flushed by the user.
69+
this._pendingExports.add(pendingExport);
70+
pendingExport.finally(() => this._pendingExports.delete(pendingExport));
71+
}
8372

84-
// Avoid scheduling a promise to make the behavior more predictable and easier to test
73+
private async _doExport(span: ReadableSpan): Promise<void> {
8574
if (span.resource.asyncAttributesPending) {
86-
const exportPromise = (span.resource as Resource)
87-
.waitForAsyncAttributes?.()
88-
.then(
89-
() => {
90-
if (exportPromise != null) {
91-
this._unresolvedExports.delete(exportPromise);
92-
}
93-
return doExport();
94-
},
95-
err => globalErrorHandler(err)
96-
);
75+
// Ensure resource is fully resolved before exporting.
76+
await (span.resource as Resource).waitForAsyncAttributes?.();
77+
}
9778

98-
// store the unresolved exports
99-
if (exportPromise != null) {
100-
this._unresolvedExports.add(exportPromise);
101-
}
102-
} else {
103-
void doExport();
79+
const result = await internal._export(this._exporter, [span]);
80+
if (result.code !== ExportResultCode.SUCCESS) {
81+
throw (
82+
result.error ??
83+
new Error(`SimpleSpanProcessor: span export failed (status ${result})`)
84+
);
10485
}
10586
}
10687

packages/opentelemetry-sdk-trace-base/test/common/export/SimpleSpanProcessor.test.ts

Lines changed: 36 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -217,7 +217,40 @@ describe('SimpleSpanProcessor', () => {
217217
);
218218
});
219219

220-
it('should await doExport() and delete from _unresolvedExports', async () => {
220+
it('should await doExport() and delete from _pendingExports', async () => {
221+
const testExporterWithDelay = new TestExporterWithDelay();
222+
const processor = new SimpleSpanProcessor(testExporterWithDelay);
223+
const spanContext: SpanContext = {
224+
traceId: 'a3cda95b652f4a1592b449d5929fda1b',
225+
spanId: '5e0c63257de34c92',
226+
traceFlags: TraceFlags.SAMPLED,
227+
};
228+
const tracer = provider.getTracer('default') as Tracer;
229+
const span = new SpanImpl({
230+
scope: tracer.instrumentationScope,
231+
resource: tracer['_resource'],
232+
context: ROOT_CONTEXT,
233+
spanContext,
234+
name: 'span-name',
235+
kind: SpanKind.CLIENT,
236+
spanLimits: tracer.getSpanLimits(),
237+
spanProcessor: tracer['_spanProcessor'],
238+
});
239+
processor.onStart(span, ROOT_CONTEXT);
240+
processor.onEnd(span);
241+
242+
assert.strictEqual(processor['_pendingExports'].size, 1);
243+
244+
await processor.forceFlush();
245+
246+
assert.strictEqual(processor['_pendingExports'].size, 0);
247+
248+
const exportedSpans = testExporterWithDelay.getFinishedSpans();
249+
250+
assert.strictEqual(exportedSpans.length, 1);
251+
});
252+
253+
it('should await doExport() and delete from _pendingExports with async resource', async () => {
221254
const testExporterWithDelay = new TestExporterWithDelay();
222255
const processor = new SimpleSpanProcessor(testExporterWithDelay);
223256

@@ -249,11 +282,11 @@ describe('SimpleSpanProcessor', () => {
249282
processor.onStart(span, ROOT_CONTEXT);
250283
processor.onEnd(span);
251284

252-
assert.strictEqual(processor['_unresolvedExports'].size, 1);
285+
assert.strictEqual(processor['_pendingExports'].size, 1);
253286

254287
await processor.forceFlush();
255288

256-
assert.strictEqual(processor['_unresolvedExports'].size, 0);
289+
assert.strictEqual(processor['_pendingExports'].size, 0);
257290

258291
const exportedSpans = testExporterWithDelay.getFinishedSpans();
259292

0 commit comments

Comments
 (0)