Skip to content

Commit e88ca9d

Browse files
authored
fix(instrumentation-kafkajs): fix instr to work with [email protected] and earlier (#2787)
The tests broke on [email protected] and earlier. The instrumentation crashed on [email protected] and earlier. Refs: #2784 (comment) Fixes: #2784
1 parent b6d6d94 commit e88ca9d

File tree

4 files changed

+18
-20
lines changed

4 files changed

+18
-20
lines changed

.github/component-label-map.yml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,11 @@ pkg:instrumentation-ioredis:
131131
- plugins/node/opentelemetry-instrumentation-ioredis/**
132132
- packages/opentelemetry-test-utils/**
133133
- packages/opentelemetry-redis-common/**
134+
pkg:instrumentation-kafkajs:
135+
- changed-files:
136+
- any-glob-to-any-file:
137+
- plugins/node/instrumentation-kafkajs/**
138+
- packages/opentelemetry-test-utils/**
134139
pkg:instrumentation-knex:
135140
- changed-files:
136141
- any-glob-to-any-file:

plugins/node/instrumentation-kafkajs/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ This package uses `@opentelemetry/semantic-conventions` version `1.30+`, which i
6969
| --------------------- | ------------------------------------- | ------------------------------------------------------------ |
7070
| Consumer | `messaging.process.duration` | Duration of processing operation. [1] |
7171
| Consumer | `messaging.client.consumed.messages` | Number of messages that were delivered to the application. |
72-
| Consumer and Producer | `messaging.client.operation.duration` | Number of messages that were delivered to the application. |
72+
| Consumer and Producer | `messaging.client.operation.duration` | Number of messages that were delivered to the application. (Only emitted for [email protected] and later.) |
7373
| Producer | `messaging.client.sent.messages` | Number of messages producer attempted to send to the broker. |
7474

7575
**[1] `messaging.process.duration`:** In the context of `eachBatch`, this metric will be emitted once for each message but the value reflects the duration of the entire batch.

plugins/node/instrumentation-kafkajs/src/instrumentation.ts

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -251,10 +251,13 @@ export class KafkaJsInstrumentation extends InstrumentationBase<KafkaJsInstrumen
251251
private _setKafkaEventListeners(kafkaObj: KafkaEventEmitter) {
252252
if (kafkaObj[EVENT_LISTENERS_SET]) return;
253253

254-
kafkaObj.on(
255-
kafkaObj.events.REQUEST,
256-
this._recordClientDurationMetric.bind(this)
257-
);
254+
// The REQUEST Consumer event was added in [email protected].
255+
if (kafkaObj.events?.REQUEST) {
256+
kafkaObj.on(
257+
kafkaObj.events.REQUEST,
258+
this._recordClientDurationMetric.bind(this)
259+
);
260+
}
258261

259262
kafkaObj[EVENT_LISTENERS_SET] = true;
260263
}

plugins/node/instrumentation-kafkajs/test/kafkajs.test.ts

Lines changed: 5 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -220,9 +220,7 @@ describe('instrumentation-kafkajs', () => {
220220
);
221221
instrumentation.disable();
222222
instrumentation.enable();
223-
producer = kafka.producer({
224-
createPartitioner: kafkajs.Partitioners.LegacyPartitioner,
225-
});
223+
producer = kafka.producer();
226224
}
227225
beforeEach(() => {
228226
initializeProducer();
@@ -479,9 +477,7 @@ describe('instrumentation-kafkajs', () => {
479477
});
480478
instrumentation.disable();
481479
instrumentation.enable();
482-
producer = kafka.producer({
483-
createPartitioner: kafkajs.Partitioners.LegacyPartitioner,
484-
});
480+
producer = kafka.producer();
485481
});
486482

487483
it('error in send create failed span', async () => {
@@ -634,9 +630,7 @@ describe('instrumentation-kafkajs', () => {
634630
instrumentation.disable();
635631
instrumentation.setConfig(config);
636632
instrumentation.enable();
637-
producer = kafka.producer({
638-
createPartitioner: kafkajs.Partitioners.LegacyPartitioner,
639-
});
633+
producer = kafka.producer();
640634
});
641635

642636
it('producer hook add span attribute with value from message', async () => {
@@ -671,9 +665,7 @@ describe('instrumentation-kafkajs', () => {
671665
instrumentation.disable();
672666
instrumentation.setConfig(config);
673667
instrumentation.enable();
674-
producer = kafka.producer({
675-
createPartitioner: kafkajs.Partitioners.LegacyPartitioner,
676-
});
668+
producer = kafka.producer();
677669
});
678670

679671
it('producer hook add span attribute with value from message', async () => {
@@ -1227,9 +1219,7 @@ describe('instrumentation-kafkajs', () => {
12271219
storeRunConfig();
12281220
instrumentation.disable();
12291221
instrumentation.enable();
1230-
producer = kafka.producer({
1231-
createPartitioner: kafkajs.Partitioners.LegacyPartitioner,
1232-
});
1222+
producer = kafka.producer();
12331223
consumer = kafka.consumer({ groupId: 'testing-group-id' });
12341224
});
12351225

0 commit comments

Comments
 (0)