Skip to content

Commit 6642228

Browse files
committed
Fixing the dev & shared queue consumer telemetry so we can see what’s going on
1 parent d39145d commit 6642228

File tree

5 files changed

+57
-48
lines changed

5 files changed

+57
-48
lines changed

apps/webapp/app/env.server.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ const EnvironmentSchema = z.object({
137137
INTERNAL_OTEL_TRACE_EXPORTER_AUTH_HEADER_VALUE: z.string().optional(),
138138
INTERNAL_OTEL_TRACE_LOGGING_ENABLED: z.string().default("1"),
139139
// the value is the sampling denominator: "20" means 1 in 20 traces (5%) will be sampled (sampled = recorded)
140-
INTERNAL_OTEL_TRACE_SAMPING_RATE: z.string().default("20"),
140+
INTERNAL_OTEL_TRACE_SAMPLING_RATE: z.string().default("20"),
141141
INTERNAL_OTEL_TRACE_INSTRUMENT_PRISMA_ENABLED: z.string().default("0"),
142142
});
143143

apps/webapp/app/v3/marqs/devQueueConsumer.server.ts

+15-3
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,14 @@ import { generateFriendlyId } from "../friendlyIdentifiers";
1616
import { marqs } from "~/v3/marqs/index.server";
1717
import { CancelAttemptService } from "../services/cancelAttempt.server";
1818
import { CompleteAttemptService } from "../services/completeAttempt.server";
19-
import { attributesFromAuthenticatedEnv } from "../tracer.server";
19+
import {
20+
SEMINTATTRS_FORCE_RECORDING,
21+
attributesFromAuthenticatedEnv,
22+
tracer,
23+
} from "../tracer.server";
2024
import { DevSubscriber, devPubSub } from "./devPubSub.server";
2125
import { CancelTaskRunService } from "../services/cancelTaskRun.server";
2226

23-
const tracer = trace.getTracer("devQueueConsumer");
24-
2527
const MessageBody = z.discriminatedUnion("type", [
2628
z.object({
2729
type: z.literal("EXECUTE"),
@@ -165,6 +167,11 @@ export class DevQueueConsumer {
165167

166168
logger.debug("Unsubscribed from background worker channel", { id });
167169
}
170+
171+
// We need to end the current span
172+
if (this._currentSpan) {
173+
this._currentSpan.end();
174+
}
168175
}
169176

170177
async #cancelInProgressRunsAndAttempts(reason: string) {
@@ -284,6 +291,10 @@ export class DevQueueConsumer {
284291
this._currentSpan.setAttribute("tasks.period.failures", this._taskFailures);
285292
this._currentSpan.setAttribute("tasks.period.successes", this._taskSuccesses);
286293

294+
logger.debug("Ending DevQueueConsumer.doWork() trace", {
295+
isRecording: this._currentSpan.isRecording(),
296+
});
297+
287298
this._currentSpan.end();
288299
}
289300

@@ -294,6 +305,7 @@ export class DevQueueConsumer {
294305
kind: SpanKind.CONSUMER,
295306
attributes: {
296307
...attributesFromAuthenticatedEnv(this.env),
308+
[SEMINTATTRS_FORCE_RECORDING]: true,
297309
},
298310
},
299311
ROOT_CONTEXT

apps/webapp/app/v3/marqs/index.server.ts

+7-23
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ export class MarQS {
139139
public async dequeueMessageInEnv(env: AuthenticatedEnvironment) {
140140
return this.#trace(
141141
"dequeueMessageInEnv",
142-
async (span, abort) => {
142+
async (span) => {
143143
const parentQueue = this.keys.envSharedQueueKey(env);
144144

145145
// Read the parent queue for matching queues
@@ -150,7 +150,6 @@ export class MarQS {
150150
);
151151

152152
if (!messageQueue) {
153-
abort();
154153
return;
155154
}
156155

@@ -167,7 +166,6 @@ export class MarQS {
167166
});
168167

169168
if (!messageData) {
170-
abort();
171169
return;
172170
}
173171

@@ -181,8 +179,6 @@ export class MarQS {
181179
[SemanticAttributes.CONCURRENCY_KEY]: message.concurrencyKey,
182180
[SemanticAttributes.PARENT_QUEUE]: message.parentQueue,
183181
});
184-
} else {
185-
abort();
186182
}
187183

188184
return message;
@@ -204,7 +200,7 @@ export class MarQS {
204200
public async dequeueMessageInSharedQueue() {
205201
return this.#trace(
206202
"dequeueMessageInSharedQueue",
207-
async (span, abort) => {
203+
async (span) => {
208204
const parentQueue = constants.SHARED_QUEUE;
209205

210206
// Read the parent queue for matching queues
@@ -215,7 +211,6 @@ export class MarQS {
215211
);
216212

217213
if (!messageQueue) {
218-
abort();
219214
return;
220215
}
221216

@@ -233,7 +228,6 @@ export class MarQS {
233228
});
234229

235230
if (!messageData) {
236-
abort();
237231
return;
238232
}
239233

@@ -247,8 +241,6 @@ export class MarQS {
247241
[SemanticAttributes.CONCURRENCY_KEY]: message.concurrencyKey,
248242
[SemanticAttributes.PARENT_QUEUE]: message.parentQueue,
249243
});
250-
} else {
251-
abort();
252244
}
253245

254246
return message;
@@ -355,17 +347,12 @@ export class MarQS {
355347

356348
async #trace<T>(
357349
name: string,
358-
fn: (span: Span, abort: () => void) => Promise<T>,
359-
options?: SpanOptions
350+
fn: (span: Span) => Promise<T>,
351+
options?: SpanOptions & { sampleRate?: number }
360352
): Promise<T> {
361353
return tracer.startActiveSpan(name, options ?? {}, async (span) => {
362-
let _abort = false;
363-
let aborter = () => {
364-
_abort = true;
365-
};
366-
367354
try {
368-
return await fn(span, aborter);
355+
return await fn(span);
369356
} catch (e) {
370357
if (e instanceof Error) {
371358
span.recordException(e);
@@ -375,9 +362,7 @@ export class MarQS {
375362

376363
throw e;
377364
} finally {
378-
if (!_abort) {
379-
span.end();
380-
}
365+
span.end();
381366
}
382367
});
383368
}
@@ -480,7 +465,7 @@ export class MarQS {
480465
) {
481466
return this.#trace(
482467
"getRandomQueueFromParentQueue",
483-
async (span, abort) => {
468+
async (span) => {
484469
const { range, selectionId } = await queuePriorityStrategy.nextCandidateSelection(
485470
parentQueue
486471
);
@@ -497,7 +482,6 @@ export class MarQS {
497482
);
498483

499484
if (typeof choice !== "string") {
500-
abort();
501485
return;
502486
}
503487

apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts

+10-15
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,4 @@
1-
import {
2-
Context,
3-
ROOT_CONTEXT,
4-
Span,
5-
SpanKind,
6-
context,
7-
propagation,
8-
trace,
9-
} from "@opentelemetry/api";
1+
import { Context, ROOT_CONTEXT, Span, SpanKind, context, trace } from "@opentelemetry/api";
102
import {
113
Machine,
124
ProdTaskRunExecution,
@@ -28,16 +20,15 @@ import {
2820
import { z } from "zod";
2921
import { prisma } from "~/db.server";
3022
import { logger } from "~/services/logger.server";
31-
import { generateFriendlyId } from "../friendlyIdentifiers";
23+
import { singleton } from "~/utils/singleton";
3224
import { marqs } from "~/v3/marqs/index.server";
3325
import { EnvironmentVariablesRepository } from "../environmentVariables/environmentVariablesRepository.server";
34-
import { CancelAttemptService } from "../services/cancelAttempt.server";
26+
import { generateFriendlyId } from "../friendlyIdentifiers";
3527
import { socketIo } from "../handleSocketIo.server";
36-
import { singleton } from "~/utils/singleton";
37-
import { RestoreCheckpointService } from "../services/restoreCheckpoint.server";
3828
import { findCurrentWorkerDeployment } from "../models/workerDeployment.server";
39-
40-
const tracer = trace.getTracer("sharedQueueConsumer");
29+
import { CancelAttemptService } from "../services/cancelAttempt.server";
30+
import { RestoreCheckpointService } from "../services/restoreCheckpoint.server";
31+
import { tracer } from "../tracer.server";
4132

4233
const WithTraceContext = z.object({
4334
traceparent: z.string().optional(),
@@ -154,6 +145,10 @@ export class SharedQueueConsumer {
154145

155146
logger.debug("Stopping shared queue consumer");
156147
this._enabled = false;
148+
149+
if (this._currentSpan) {
150+
this._currentSpan.end();
151+
}
157152
}
158153

159154
async #cancelInProgressAttempts(reason: string) {

apps/webapp/app/v3/tracer.server.ts

+24-6
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
3030
import { singleton } from "~/utils/singleton";
3131
import { LoggerSpanExporter } from "./telemetry/loggerExporter.server";
3232

33+
export const SEMINTATTRS_FORCE_RECORDING = "forceRecording";
34+
3335
class CustomWebappSampler implements Sampler {
3436
constructor(private readonly _baseSampler: Sampler) {}
3537

@@ -49,8 +51,22 @@ class CustomWebappSampler implements Sampler {
4951
return { decision: SamplingDecision.NOT_RECORD };
5052
}
5153

54+
// If the span has the forceRecording attribute, always record it
55+
if (attributes[SEMINTATTRS_FORCE_RECORDING]) {
56+
return { decision: SamplingDecision.RECORD_AND_SAMPLED };
57+
}
58+
5259
// For all other spans, defer to the base sampler
53-
return this._baseSampler.shouldSample(context, traceId, name, spanKind, attributes, links);
60+
const result = this._baseSampler.shouldSample(
61+
context,
62+
traceId,
63+
name,
64+
spanKind,
65+
attributes,
66+
links
67+
);
68+
69+
return result;
5470
}
5571

5672
toString(): string {
@@ -63,16 +79,16 @@ export const tracer = singleton("tracer", getTracer);
6379
function getTracer() {
6480
diag.setLogger(new DiagConsoleLogger(), DiagLogLevel.ERROR);
6581

66-
const samplingRate = 1.0 / Math.max(parseInt(env.INTERNAL_OTEL_TRACE_SAMPING_RATE, 10), 1);
82+
const samplingRate = 1.0 / Math.max(parseInt(env.INTERNAL_OTEL_TRACE_SAMPLING_RATE, 10), 1);
6783

6884
const provider = new NodeTracerProvider({
6985
forceFlushTimeoutMillis: 500,
7086
resource: new Resource({
7187
[SEMRESATTRS_SERVICE_NAME]: env.SERVICE_NAME,
7288
}),
7389
sampler: new ParentBasedSampler({
74-
root: new CustomWebappSampler(new TraceIdRatioBasedSampler(samplingRate)), // 5% sampling
75-
}), // 5% sampling
90+
root: new CustomWebappSampler(new TraceIdRatioBasedSampler(samplingRate)),
91+
}),
7692
});
7793

7894
if (env.INTERNAL_OTEL_TRACE_EXPORTER_URL) {
@@ -92,13 +108,15 @@ function getTracer() {
92108
provider.addSpanProcessor(
93109
new BatchSpanProcessor(exporter, {
94110
maxExportBatchSize: 512,
95-
scheduledDelayMillis: 200,
111+
scheduledDelayMillis: 1000,
96112
exportTimeoutMillis: 30000,
97113
maxQueueSize: 2048,
98114
})
99115
);
100116

101-
console.log(`🔦 Tracer: OTLP exporter enabled to ${env.INTERNAL_OTEL_TRACE_EXPORTER_URL}`);
117+
console.log(
118+
`🔦 Tracer: OTLP exporter enabled to ${env.INTERNAL_OTEL_TRACE_EXPORTER_URL} (sampling = ${samplingRate})`
119+
);
102120
} else {
103121
if (env.INTERNAL_OTEL_TRACE_LOGGING_ENABLED === "1") {
104122
console.log(`🔦 Tracer: Logger exporter enabled`);

0 commit comments

Comments
 (0)