Skip to content

Commit 5ceb596

Browse files
feat: send config to transports on init (#1930)
* feat: send config to transports on init * test: integration for transports using pino config
1 parent d9911b0 commit 5ceb596

10 files changed

+297
-3
lines changed

lib/multistream.js

+10
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ function multistream (streamsArray, opts) {
2121
const res = {
2222
write,
2323
add,
24+
emit,
2425
flushSync,
2526
end,
2627
minLevel: 0,
@@ -79,6 +80,14 @@ function multistream (streamsArray, opts) {
7980
}
8081
}
8182

83+
function emit (...args) {
84+
for (const { stream } of this.streams) {
85+
if (typeof stream.emit === 'function') {
86+
stream.emit(...args)
87+
}
88+
}
89+
}
90+
8291
function flushSync () {
8392
for (const { stream } of this.streams) {
8493
if (typeof stream.flushSync === 'function') {
@@ -153,6 +162,7 @@ function multistream (streamsArray, opts) {
153162
minLevel: level,
154163
streams,
155164
clone,
165+
emit,
156166
flushSync,
157167
[metadata]: true
158168
}

lib/transport.js

+2
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,8 @@ function transport (fullOptions) {
111111
options.dedupe = dedupe
112112
}
113113

114+
options.pinoWillSendConfig = true
115+
114116
return buildStream(fixTarget(target), options, worker)
115117

116118
function fixTarget (origin) {

package.json

+2-2
Original file line numberDiff line numberDiff line change
@@ -104,14 +104,14 @@
104104
"atomic-sleep": "^1.0.0",
105105
"fast-redact": "^3.1.1",
106106
"on-exit-leak-free": "^2.1.0",
107-
"pino-abstract-transport": "^1.1.0",
107+
"pino-abstract-transport": "^1.2.0",
108108
"pino-std-serializers": "^6.0.0",
109109
"process-warning": "^3.0.0",
110110
"quick-format-unescaped": "^4.0.3",
111111
"real-require": "^0.2.0",
112112
"safe-stable-stringify": "^2.3.1",
113113
"sonic-boom": "^3.7.0",
114-
"thread-stream": "^2.0.0"
114+
"thread-stream": "^2.6.0"
115115
},
116116
"tsd": {
117117
"directory": "test/types"

pino.js

+4
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,10 @@ function pino (...args) {
161161
assertDefaultLevelFound(level, customLevels, useOnlyCustomLevels)
162162
const levels = mappings(customLevels, useOnlyCustomLevels)
163163

164+
if (typeof stream.emit === 'function') {
165+
stream.emit('message', { code: 'PINO_CONFIG', config: { levels, messageKey, errorKey } })
166+
}
167+
164168
assertLevelComparison(levelComparison)
165169
const levelCompFunc = genLevelComparison(levelComparison)
166170

test/basic.test.js

+33
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,39 @@ test('serializers can return undefined to strip field', async ({ equal }) => {
229229
equal('test' in result, false)
230230
})
231231

232+
test('streams receive a message event with PINO_CONFIG', ({ match, end }) => {
233+
const stream = sink()
234+
stream.once('message', (message) => {
235+
match(message, {
236+
code: 'PINO_CONFIG',
237+
config: {
238+
errorKey: 'err',
239+
levels: {
240+
labels: {
241+
10: 'trace',
242+
20: 'debug',
243+
30: 'info',
244+
40: 'warn',
245+
50: 'error',
246+
60: 'fatal'
247+
},
248+
values: {
249+
debug: 20,
250+
error: 50,
251+
fatal: 60,
252+
info: 30,
253+
trace: 10,
254+
warn: 40
255+
}
256+
},
257+
messageKey: 'msg'
258+
}
259+
})
260+
end()
261+
})
262+
pino(stream)
263+
})
264+
232265
test('does not explode with a circular ref', async ({ doesNotThrow }) => {
233266
const stream = sink()
234267
const instance = pino(stream)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
'use strict'
2+
3+
const build = require('pino-abstract-transport')
4+
const { pipeline, Transform } = require('stream')
5+
module.exports = () => {
6+
return build(function (source) {
7+
const myTransportStream = new Transform({
8+
autoDestroy: true,
9+
objectMode: true,
10+
transform (chunk, enc, cb) {
11+
const {
12+
time,
13+
level,
14+
[source.messageKey]: body,
15+
[source.errorKey]: error,
16+
...attributes
17+
} = chunk
18+
this.push(JSON.stringify({
19+
severityText: source.levels.labels[level],
20+
body,
21+
attributes,
22+
...(error && { error })
23+
}))
24+
cb()
25+
}
26+
})
27+
pipeline(source, myTransportStream, () => {})
28+
return myTransportStream
29+
}, {
30+
enablePipelining: true,
31+
expectPinoConfig: true
32+
})
33+
}
+19
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
'use strict'
2+
3+
const { parentPort, workerData } = require('worker_threads')
4+
const { Writable } = require('stream')
5+
6+
module.exports = (options) => {
7+
const myTransportStream = new Writable({
8+
autoDestroy: true,
9+
write (chunk, enc, cb) {
10+
parentPort.postMessage({
11+
code: 'EVENT',
12+
name: 'workerData',
13+
args: [workerData]
14+
})
15+
cb()
16+
}
17+
})
18+
return myTransportStream
19+
}

test/multistream.test.js

+14-1
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ const pino = require('../')
88
const multistream = pino.multistream
99
const proxyquire = require('proxyquire')
1010
const strip = require('strip-ansi')
11-
const { file } = require('./helper')
11+
const { file, sink } = require('./helper')
1212

1313
test('sends to multiple streams using string levels', function (t) {
1414
let messageCount = 0
@@ -246,6 +246,19 @@ test('supports pretty print', function (t) {
246246
log.info('pretty print')
247247
})
248248

249+
test('emit propagates events to each stream', function (t) {
250+
t.plan(3)
251+
const handler = function (data) {
252+
t.equal(data.msg, 'world')
253+
}
254+
const streams = [sink(), sink(), sink()]
255+
streams.forEach(function (s) {
256+
s.once('hello', handler)
257+
})
258+
const stream = multistream(streams)
259+
stream.emit('hello', { msg: 'world' })
260+
})
261+
249262
test('children support custom levels', function (t) {
250263
const stream = writeStream(function (data, enc, cb) {
251264
t.equal(JSON.parse(data).msg, 'bar')

test/transport/core.test.js

+13
Original file line numberDiff line numberDiff line change
@@ -453,6 +453,19 @@ test('pino.transport with target pino-pretty', async ({ match, teardown }) => {
453453
match(strip(actual), /\[.*\] INFO.*hello/)
454454
})
455455

456+
test('sets worker data informing the transport that pino will send its config', ({ match, plan, teardown }) => {
457+
plan(1)
458+
const transport = pino.transport({
459+
target: join(__dirname, '..', 'fixtures', 'transport-worker-data.js')
460+
})
461+
teardown(transport.end.bind(transport))
462+
const instance = pino(transport)
463+
transport.once('workerData', (workerData) => {
464+
match(workerData.workerData, { pinoWillSendConfig: true })
465+
})
466+
instance.info('hello')
467+
})
468+
456469
test('stdout in worker', async ({ not }) => {
457470
let actual = ''
458471
const child = execa(process.argv[0], [join(__dirname, '..', 'fixtures', 'transport-main.js')])
+167
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
'use strict'
2+
3+
const os = require('os')
4+
const { join } = require('path')
5+
const { readFile } = require('fs').promises
6+
const writeStream = require('flush-write-stream')
7+
const { watchFileCreated, file } = require('../helper')
8+
const { test } = require('tap')
9+
const pino = require('../../')
10+
11+
const { pid } = process
12+
const hostname = os.hostname()
13+
14+
function serializeError (error) {
15+
return {
16+
type: error.name,
17+
message: error.message,
18+
stack: error.stack
19+
}
20+
}
21+
22+
function parseLogs (buffer) {
23+
return JSON.parse(`[${buffer.toString().replace(/}{/g, '},{')}]`)
24+
}
25+
26+
test('transport uses pino config', async ({ same, teardown, plan }) => {
27+
plan(1)
28+
const destination = file()
29+
const transport = pino.transport({
30+
pipeline: [{
31+
target: join(__dirname, '..', 'fixtures', 'transport-uses-pino-config.js')
32+
}, {
33+
target: 'pino/file',
34+
options: { destination }
35+
}]
36+
})
37+
teardown(transport.end.bind(transport))
38+
const instance = pino({
39+
messageKey: 'customMessageKey',
40+
errorKey: 'customErrorKey',
41+
customLevels: { custom: 35 }
42+
}, transport)
43+
44+
const error = new Error('bar')
45+
instance.custom('foo')
46+
instance.error(error)
47+
await watchFileCreated(destination)
48+
const result = parseLogs(await readFile(destination))
49+
50+
same(result, [{
51+
severityText: 'custom',
52+
body: 'foo',
53+
attributes: {
54+
pid,
55+
hostname
56+
}
57+
}, {
58+
severityText: 'error',
59+
body: 'bar',
60+
attributes: {
61+
pid,
62+
hostname
63+
},
64+
error: serializeError(error)
65+
}])
66+
})
67+
68+
test('transport uses pino config without customizations', async ({ same, teardown, plan }) => {
69+
plan(1)
70+
const destination = file()
71+
const transport = pino.transport({
72+
pipeline: [{
73+
target: join(__dirname, '..', 'fixtures', 'transport-uses-pino-config.js')
74+
}, {
75+
target: 'pino/file',
76+
options: { destination }
77+
}]
78+
})
79+
teardown(transport.end.bind(transport))
80+
const instance = pino(transport)
81+
82+
const error = new Error('qux')
83+
instance.info('baz')
84+
instance.error(error)
85+
await watchFileCreated(destination)
86+
const result = parseLogs(await readFile(destination))
87+
88+
same(result, [{
89+
severityText: 'info',
90+
body: 'baz',
91+
attributes: {
92+
pid,
93+
hostname
94+
}
95+
}, {
96+
severityText: 'error',
97+
body: 'qux',
98+
attributes: {
99+
pid,
100+
hostname
101+
},
102+
error: serializeError(error)
103+
}])
104+
})
105+
106+
test('transport uses pino config with multistream', async ({ same, teardown, plan }) => {
107+
plan(2)
108+
const destination = file()
109+
const messages = []
110+
const stream = writeStream(function (data, enc, cb) {
111+
const message = JSON.parse(data)
112+
delete message.time
113+
messages.push(message)
114+
cb()
115+
})
116+
const transport = pino.transport({
117+
pipeline: [{
118+
target: join(__dirname, '..', 'fixtures', 'transport-uses-pino-config.js')
119+
}, {
120+
target: 'pino/file',
121+
options: { destination }
122+
}]
123+
})
124+
teardown(transport.end.bind(transport))
125+
const instance = pino({
126+
messageKey: 'customMessageKey',
127+
errorKey: 'customErrorKey',
128+
customLevels: { custom: 35 }
129+
}, pino.multistream([transport, { stream }]))
130+
131+
const error = new Error('buzz')
132+
const serializedError = serializeError(error)
133+
instance.custom('fizz')
134+
instance.error(error)
135+
await watchFileCreated(destination)
136+
const result = parseLogs(await readFile(destination))
137+
138+
same(result, [{
139+
severityText: 'custom',
140+
body: 'fizz',
141+
attributes: {
142+
pid,
143+
hostname
144+
}
145+
}, {
146+
severityText: 'error',
147+
body: 'buzz',
148+
attributes: {
149+
pid,
150+
hostname
151+
},
152+
error: serializedError
153+
}])
154+
155+
same(messages, [{
156+
level: 35,
157+
pid,
158+
hostname,
159+
customMessageKey: 'fizz'
160+
}, {
161+
level: 50,
162+
pid,
163+
hostname,
164+
customErrorKey: serializedError,
165+
customMessageKey: 'buzz'
166+
}])
167+
})

0 commit comments

Comments
 (0)