Skip to content

Commit fc13916

Browse files
chore: Added kafkajs instrumentation and versioned tests skeleton (#2224)
Co-authored-by: Bob Evans <[email protected]>
1 parent 89df06a commit fc13916

File tree

10 files changed

+236
-6
lines changed

10 files changed

+236
-6
lines changed

docker-compose.yml

+43-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
version: "3"
22
services:
3+
34
elasticsearch:
45
container_name: nr_node_elastic
56
image: docker.elastic.co/elasticsearch/elasticsearch:8.8.0
@@ -21,11 +22,45 @@ services:
2122
interval: 30s
2223
timeout: 10s
2324
retries: 5
25+
26+
# Kafka setup based on the e2e tests in node-rdkafka. Needs both the
27+
# `zookeeper` and `kafka` services.
28+
zookeeper:
29+
container_name: nr_node_kafka_zookeeper
30+
image: confluentinc/cp-zookeeper
31+
ports:
32+
- '2181:2181'
33+
environment:
34+
ZOOKEEPER_CLIENT_PORT: 2181
35+
ZOOKEEPER_TICK_TIME: 2000
36+
kafka:
37+
container_name: nr_node_kafka
38+
image: confluentinc/cp-kafka
39+
links:
40+
- zookeeper
41+
ports:
42+
- '9092:9092'
43+
healthcheck:
44+
test: /usr/bin/kafka-cluster cluster-id --bootstrap-server localhost:9092 || exit 1
45+
interval: 1s
46+
timeout: 60s
47+
retries: 60
48+
environment:
49+
KAFKA_BROKER_ID: 1
50+
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
51+
KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://localhost:9092'
52+
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
53+
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
54+
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
55+
KAFKA_DEFAULT_REPLICATION_FACTOR: 1
56+
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
57+
2458
memcached:
2559
container_name: nr_node_memcached
2660
image: memcached
2761
ports:
2862
- "11211:11211"
63+
2964
mongodb_3:
3065
container_name: nr_node_mongodb
3166
platform: linux/amd64
@@ -37,6 +72,7 @@ services:
3772
interval: 1s
3873
timeout: 10s
3974
retries: 30
75+
4076
mongodb_5:
4177
container_name: nr_node_mongodb_5
4278
image: library/mongo:5
@@ -47,6 +83,7 @@ services:
4783
interval: 1s
4884
timeout: 10s
4985
retries: 30
86+
5087
mysql:
5188
container_name: nr_node_mysql
5289
platform: linux/amd64
@@ -60,6 +97,7 @@ services:
6097
interval: 1s
6198
timeout: 10s
6299
retries: 30
100+
63101
redis:
64102
container_name: nr_node_redis
65103
image: redis
@@ -70,6 +108,7 @@ services:
70108
interval: 1s
71109
timeout: 10s
72110
retries: 30
111+
73112
cassandra:
74113
container_name: nr_node_cassandra
75114
platform: linux/amd64
@@ -80,13 +119,15 @@ services:
80119
test: [ "CMD", "cqlsh", "-u cassandra", "-p cassandra"]
81120
interval: 5s
82121
timeout: 10s
83-
retries: 6
122+
retries: 6
123+
84124
# pg 9.2 has built in healthcheck
85125
pg:
86126
container_name: nr_node_postgres
87127
image: postgres:9.2
88128
ports:
89129
- "5432:5432"
130+
90131
pg_prisma:
91132
container_name: nr_node_postgres_prisma
92133
image: postgres:15
@@ -100,6 +141,7 @@ services:
100141
interval: 1s
101142
timeout: 10s
102143
retries: 30
144+
103145
rmq:
104146
container_name: nr_node_rabbit
105147
image: rabbitmq:3

lib/instrumentation/kafkajs.js

+11
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
/*
2+
* Copyright 2024 New Relic Corporation. All rights reserved.
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
'use strict'
7+
8+
// eslint-disable-next-line no-unused-vars
9+
module.exports = function initialize(_agent, kafkajs, _moduleName, shim) {
10+
// Put instrumentation code here for kafkajs.Kafka.producer and kafkajs.Kafka.consumer
11+
}

lib/instrumentations.js

+1
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ module.exports = function instrumentations() {
2929
'fastify': { type: InstrumentationDescriptor.TYPE_WEB_FRAMEWORK },
3030
'generic-pool': { type: InstrumentationDescriptor.TYPE_GENERIC },
3131
'ioredis': { type: InstrumentationDescriptor.TYPE_DATASTORE },
32+
'kafkajs': { type: InstrumentationDescriptor.TYPE_MESSAGE },
3233
'koa': { module: './instrumentation/koa' },
3334
'langchain': { module: './instrumentation/langchain' },
3435
'memcached': { type: InstrumentationDescriptor.TYPE_DATASTORE },

package.json

+1
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,7 @@
164164
"public-docs": "jsdoc -c ./jsdoc-conf.jsonc && cp examples/shim/*.png out/",
165165
"publish-docs": "./bin/publish-docs.sh",
166166
"services": "docker compose up -d --wait",
167+
"services:stop": "docker compose down",
167168
"smoke": "npm run ssl && time tap test/smoke/**/**/*.tap.js --timeout=180 --no-coverage",
168169
"ssl": "./bin/ssl.sh",
169170
"sub-install": "node test/bin/install_sub_deps",

test/lib/cache-buster.js

-5
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,6 @@
55

66
'use strict'
77

8-
/**
9-
* Utility method to remove a set of modules from the require cache.
10-
*
11-
* @param {string[]} modules The set of module names to remove from the cache.
12-
*/
138
module.exports = {
149
/**
1510
* Removes explicitly named modules from the require cache.

test/lib/params.js

+3
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,9 @@
66
'use strict'
77

88
module.exports = {
9+
kafka_host: process.env.NR_NODE_TEST_KAFKA_HOST || '127.0.0.1',
10+
kafka_port: process.env.NR_NODE_TEST_KAFKA_PORT || 9092,
11+
912
memcached_host: process.env.NR_NODE_TEST_MEMCACHED_HOST || 'localhost',
1013
memcached_port: process.env.NR_NODE_TEST_MEMCACHED_PORT || 11211,
1114

test/versioned/kafkajs/kafka.tap.js

+66
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
* Copyright 2024 New Relic Corporation. All rights reserved.
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
'use strict'
7+
8+
const tap = require('tap')
9+
const helper = require('../../lib/agent_helper')
10+
const params = require('../../lib/params')
11+
const { removeModules } = require('../../lib/cache-buster')
12+
const utils = require('./utils')
13+
14+
const broker = `${params.kafka_host}:${params.kafka_port}`
15+
16+
tap.beforeEach(async (t) => {
17+
t.context.agent = helper.instrumentMockedAgent()
18+
19+
const { Kafka, logLevel } = require('kafkajs')
20+
t.context.Kafka = Kafka
21+
const topic = utils.randomTopic()
22+
t.context.topic = topic
23+
24+
const kafka = new Kafka({
25+
clientId: 'kafka-test',
26+
brokers: [broker],
27+
logLevel: logLevel.NOTHING
28+
})
29+
await utils.createTopic({ topic, kafka })
30+
31+
const producer = kafka.producer()
32+
await producer.connect()
33+
t.context.producer = producer
34+
const consumer = kafka.consumer({ groupId: 'kafka' })
35+
await consumer.connect()
36+
t.context.consumer = consumer
37+
})
38+
39+
tap.afterEach(async (t) => {
40+
helper.unloadAgent(t.context.agent)
41+
removeModules(['kafkajs'])
42+
await t.context.consumer.disconnect()
43+
await t.context.producer.disconnect()
44+
})
45+
46+
tap.test('stub', async (t) => {
47+
const { consumer, producer, topic } = t.context
48+
const message = 'test message'
49+
50+
await consumer.subscribe({ topics: [topic], fromBeginning: true })
51+
const testPromise = new Promise((resolve) => {
52+
consumer.run({
53+
eachMessage: async ({ message: actualMessage }) => {
54+
t.equal(actualMessage.value.toString(), message)
55+
resolve()
56+
}
57+
})
58+
})
59+
await utils.waitForConsumersToJoinGroup({ consumer })
60+
await producer.send({
61+
acks: 1,
62+
topic,
63+
messages: [{ key: 'key', value: message }]
64+
})
65+
await testPromise
66+
})

test/versioned/kafkajs/newrelic.js

+28
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
* Copyright 2021 New Relic Corporation. All rights reserved.
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
'use strict'
7+
8+
exports.config = {
9+
app_name: ['My Application'],
10+
license_key: 'license key here',
11+
logging: {
12+
level: 'trace',
13+
filepath: '../../newrelic_agent.log'
14+
},
15+
utilization: {
16+
detect_aws: false,
17+
detect_pcf: false,
18+
detect_azure: false,
19+
detect_gcp: false,
20+
detect_docker: false
21+
},
22+
distributed_tracing: {
23+
enabled: true
24+
},
25+
transaction_tracer: {
26+
enabled: true
27+
}
28+
}

test/versioned/kafkajs/package.json

+19
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
{
2+
"name": "kafka-tests",
3+
"targets": [{"name":"kafkajs","minAgentVersion":"11.19.0"}],
4+
"version": "0.0.0",
5+
"private": true,
6+
"tests": [
7+
{
8+
"engines": {
9+
"node": ">=16"
10+
},
11+
"dependencies": {
12+
"kafkajs": ">=2.0.0"
13+
},
14+
"files": [
15+
"kafka.tap.js"
16+
]
17+
}
18+
]
19+
}

test/versioned/kafkajs/utils.js

+64
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Copyright 2024 New Relic Corporation. All rights reserved.
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
'use strict'
7+
const { makeId } = require('../../../lib/util/hashes')
8+
const utils = module.exports
9+
10+
/**
11+
* Creates a random topic to be used for testing
12+
* @param {string} [prefix=test-topic] topic prefix
13+
* @returns {string} topic name with random id appended
14+
*/
15+
utils.randomTopic = (prefix = 'test-topic') => {
16+
return `${prefix}-${makeId()}`
17+
}
18+
19+
/**
20+
* Creates a topic with the admin class
21+
* @param {object} params to function
22+
* @param {object} params.kafka instance of kafka.Kafka
23+
* @param {string} params.topic topic name
24+
*/
25+
utils.createTopic = async ({ kafka, topic }) => {
26+
const admin = kafka.admin()
27+
try {
28+
await admin.connect()
29+
await admin.createTopics({
30+
waitForLeaders: true,
31+
topics: [{ topic, numPartitions: 1, replicationFactor: 1, configEntries: [] }]
32+
})
33+
} finally {
34+
await admin.disconnect()
35+
}
36+
}
37+
38+
/**
39+
* Waits for consumer to join the group
40+
*
41+
* @param {object} params to function
42+
* @param {object} params.consumer instance of kafkajs.Kafka.consumer
43+
* @param {number} [params.maxWait=10000] how long to wait for consumer to join group
44+
* @returns {Promise}
45+
*
46+
*/
47+
utils.waitForConsumersToJoinGroup = ({ consumer, maxWait = 10000 }) =>
48+
new Promise((resolve, reject) => {
49+
const timeoutId = setTimeout(() => {
50+
consumer.disconnect().then(() => {
51+
reject()
52+
})
53+
}, maxWait)
54+
consumer.on(consumer.events.GROUP_JOIN, (event) => {
55+
clearTimeout(timeoutId)
56+
resolve(event)
57+
})
58+
consumer.on(consumer.events.CRASH, (event) => {
59+
clearTimeout(timeoutId)
60+
consumer.disconnect().then(() => {
61+
reject(event.payload.error)
62+
})
63+
})
64+
})

0 commit comments

Comments
 (0)