Skip to content

chore: Added kafkajs instrumentation and versioned tests skeleton #2224

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
May 30, 2024
Merged
44 changes: 43 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
version: "3"
services:

elasticsearch:
container_name: nr_node_elastic
image: docker.elastic.co/elasticsearch/elasticsearch:8.8.0
Expand All @@ -21,11 +22,45 @@ services:
interval: 30s
timeout: 10s
retries: 5

# Kafka setup based on the e2e tests in node-rdkafka. Needs both the
# `zookeeper` and `kafka` services.
zookeeper:
container_name: nr_node_kafka_zookeeper
image: confluentinc/cp-zookeeper
ports:
- '2181:2181'
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
kafka:
container_name: nr_node_kafka
image: confluentinc/cp-kafka
links:
- zookeeper
ports:
- '9092:9092'
healthcheck:
test: /usr/bin/kafka-cluster cluster-id --bootstrap-server localhost:9092 || exit 1
interval: 1s
timeout: 60s
retries: 60
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://localhost:9092'
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_DEFAULT_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1

memcached:
container_name: nr_node_memcached
image: memcached
ports:
- "11211:11211"

mongodb_3:
container_name: nr_node_mongodb
platform: linux/amd64
Expand All @@ -37,6 +72,7 @@ services:
interval: 1s
timeout: 10s
retries: 30

mongodb_5:
container_name: nr_node_mongodb_5
image: library/mongo:5
Expand All @@ -47,6 +83,7 @@ services:
interval: 1s
timeout: 10s
retries: 30

mysql:
container_name: nr_node_mysql
platform: linux/amd64
Expand All @@ -60,6 +97,7 @@ services:
interval: 1s
timeout: 10s
retries: 30

redis:
container_name: nr_node_redis
image: redis
Expand All @@ -70,6 +108,7 @@ services:
interval: 1s
timeout: 10s
retries: 30

cassandra:
container_name: nr_node_cassandra
platform: linux/amd64
Expand All @@ -80,13 +119,15 @@ services:
test: [ "CMD", "cqlsh", "-u cassandra", "-p cassandra"]
interval: 5s
timeout: 10s
retries: 6
retries: 6

# pg 9.2 has built in healthcheck
pg:
container_name: nr_node_postgres
image: postgres:9.2
ports:
- "5432:5432"

pg_prisma:
container_name: nr_node_postgres_prisma
image: postgres:15
Expand All @@ -100,6 +141,7 @@ services:
interval: 1s
timeout: 10s
retries: 30

rmq:
container_name: nr_node_rabbit
image: rabbitmq:3
Expand Down
11 changes: 11 additions & 0 deletions lib/instrumentation/node-rdkafka.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
/*
* Copyright 2024 New Relic Corporation. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*/

'use strict'

/**
 * Instrumentation entry point for the `node-rdkafka` module. Currently a
 * skeleton: it is registered as a MESSAGE-type instrumentation in
 * `lib/instrumentations.js`, but performs no wrapping yet.
 *
 * @param {object} _agent - New Relic agent instance (unused in the stub).
 * @param {object} Kafka - The resolved `node-rdkafka` module export.
 * @param {string} _moduleName - Name of the module being instrumented (unused in the stub).
 * @param {object} shim - Shim API that will be used to wrap Kafka internals.
 */
// eslint-disable-next-line no-unused-vars
module.exports = function initialize(_agent, Kafka, _moduleName, shim) {
// Put instrumentation code here for Kafka.Producer and Kafka.Consumer
}
1 change: 1 addition & 0 deletions lib/instrumentations.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ module.exports = function instrumentations() {
'memcached': { type: InstrumentationDescriptor.TYPE_DATASTORE },
'mongodb': { type: InstrumentationDescriptor.TYPE_DATASTORE },
'mysql': { module: './instrumentation/mysql' },
'node-rdkafka': { type: InstrumentationDescriptor.TYPE_MESSAGE },
'openai': { type: InstrumentationDescriptor.TYPE_GENERIC },
'pg': { type: InstrumentationDescriptor.TYPE_DATASTORE },
'pino': { module: './instrumentation/pino' },
Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@
"public-docs": "jsdoc -c ./jsdoc-conf.jsonc && cp examples/shim/*.png out/",
"publish-docs": "./bin/publish-docs.sh",
"services": "docker compose up -d --wait",
"services:stop": "docker compose down",
"smoke": "npm run ssl && time tap test/smoke/**/**/*.tap.js --timeout=180 --no-coverage",
"ssl": "./bin/ssl.sh",
"sub-install": "node test/bin/install_sub_deps",
Expand Down
5 changes: 0 additions & 5 deletions test/lib/cache-buster.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,6 @@

'use strict'

/**
* Utility method to remove a set of modules from the require cache.
*
* @param {string[]} modules The set of module names to remove from the cache.
*/
module.exports = {
/**
* Removes explicitly named modules from the require cache.
Expand Down
69 changes: 69 additions & 0 deletions test/versioned/node-rdkafka/kafka.tap.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Copyright 2024 New Relic Corporation. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*/

'use strict'

const tap = require('tap')
const helper = require('../../lib/agent_helper')
const { removeModules } = require('../../lib/cache-buster')

tap.beforeEach(async (t) => {
  t.context.agent = helper.instrumentMockedAgent()

  const Kafka = require('node-rdkafka')
  t.context.Kafka = Kafka

  const broker = '127.0.0.1:9092'

  // Bring up a producer and block until the broker handshake completes.
  const producer = new Kafka.Producer({
    'metadata.broker.list': broker
  })
  producer.connect()
  producer.setPollInterval(10)
  await new Promise((resolve) => {
    producer.on('ready', resolve)
  })
  t.context.producer = producer

  // Same dance for the consumer.
  const consumer = new Kafka.KafkaConsumer({
    'metadata.broker.list': broker,
    'group.id': 'kafka'
  })
  consumer.connect()
  await new Promise((resolve) => {
    consumer.on('ready', resolve)
  })
  t.context.consumer = consumer
})

tap.afterEach(async (t) => {
  helper.unloadAgent(t.context.agent)
  removeModules(['node-rdkafka'])

  // `disconnect` reports completion via callback; adapt it to a promise and
  // tear the clients down one at a time, producer first.
  const disconnect = (client) => {
    return new Promise((resolve) => {
      client.disconnect(resolve)
    })
  }
  await disconnect(t.context.producer)
  await disconnect(t.context.consumer)
})

tap.test('stub', { timeout: 10_000 }, (t) => {
  const { consumer, producer } = t.context
  const topic = 'test-topic'
  const expected = 'test message'

  // Assert the round-trip payload matches, then finish the test.
  consumer.on('data', (data) => {
    t.equal(data.value.toString(), expected)
    t.end()
  })
  consumer.subscribe([topic])
  consumer.consume()

  // Give the consumer a moment to join the group before producing.
  setTimeout(() => {
    producer.produce(topic, null, Buffer.from(expected), 'key')
  }, 500)
})
28 changes: 28 additions & 0 deletions test/versioned/node-rdkafka/newrelic.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright 2024 New Relic Corporation. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*/

'use strict'

// Agent configuration consumed by the node-rdkafka versioned tests.
// The app name and license key are placeholders; the agent is mocked in tests.
exports.config = {
app_name: ['My Application'],
license_key: 'license key here',
logging: {
// Trace-level logging so versioned-test failures can be diagnosed from the log file.
level: 'trace',
filepath: '../../newrelic_agent.log'
},
utilization: {
// Disable all cloud/container detection so test runs make no metadata lookups.
detect_aws: false,
detect_pcf: false,
detect_azure: false,
detect_gcp: false,
detect_docker: false
},
distributed_tracing: {
enabled: true
},
transaction_tracer: {
enabled: true
}
}
19 changes: 19 additions & 0 deletions test/versioned/node-rdkafka/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
{
"name": "kafka-tests",
"targets": [{"name":"node-rdkafka","minAgentVersion":"11.18.0"}],
"version": "0.0.0",
"private": true,
"tests": [
{
"engines": {
"node": ">=16"
},
"dependencies": {
"node-rdkafka": ">=3.0.0"
},
"files": [
"kafka.tap.js"
]
}
]
}
Loading