Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ jobs:
contents: read
strategy:
matrix:
node-version: [18, 20]
node-version: [20, 22, 24]
steps:
- name: Check out repo
uses: actions/checkout@v5
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# pino-elasticsearch  [![Build Status](https://github.com/pinojs/pino-elasticsearch/workflows/CI/badge.svg)](https://github.com/pinojs/pino-elasticsearch/actions) [![Coverage Status](https://coveralls.io/repos/github/pinojs/pino-elasticsearch/badge.svg?branch=master)](https://coveralls.io/github/pinojs/pino-elasticsearch?branch=master)
# pino-elasticsearch  [![Build Status](https://github.com/pinojs/pino-elasticsearch/workflows/CI/badge.svg)](https://github.com/pinojs/pino-elasticsearch/actions)

Load [pino](https://github.com/pinojs/pino) logs into
[Elasticsearch](https://www.elastic.co/products/elasticsearch).
Expand Down
4 changes: 2 additions & 2 deletions lib.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
/* eslint no-prototype-builtins: 0 */

const split = require('split2')
const { Client } = require('@elastic/elasticsearch')
const { Client: DefaultClient } = require('@elastic/elasticsearch')

function initializeBulkHandler (opts, client, splitter) {
const esVersion = Number(opts.esVersion || opts['es-version']) || 7
Expand Down Expand Up @@ -58,7 +58,7 @@ function initializeBulkHandler (opts, client, splitter) {
}
}

function pinoElasticSearch (opts = {}) {
function pinoElasticSearch (opts = {}, { Client = DefaultClient } = {}) {
if (opts['flush-bytes']) {
process.emitWarning('The "flush-bytes" option has been deprecated, use "flushBytes" instead')
}
Expand Down
66 changes: 33 additions & 33 deletions test/acceptance.test.js
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
'use strict'

const { once } = require('events')
const test = require('node:test')
const { once } = require('node:events')
const { pino } = require('pino')
const elastic = require('../')
const { teardown, beforeEach, test } = require('tap')
const { Client } = require('@elastic/elasticsearch')
const ecsFormat = require('@elastic/ecs-pino-format')

const elastic = require('../')

const index = 'pinotest'
const streamIndex = 'logs-pino-test'
const type = 'log'
Expand Down Expand Up @@ -41,8 +42,8 @@ async function esWaitCluster () {
}
}

teardown(() => {
client.close()
test.after(async () => {
await client.close()
})

let esVersion = 7
Expand All @@ -51,7 +52,7 @@ let esMinor = 15
const supportsDatastreams =
() => esVersion > 7 || (esVersion === 7 && esMinor >= 9)

beforeEach(async () => {
test.beforeEach(async () => {
if (!await esIsRunning()) {
await esWaitCluster()
}
Expand Down Expand Up @@ -79,15 +80,15 @@ test('store a log line', { timeout }, async (t) => {
setImmediate(() => instance.end())

const [stats] = await once(instance, 'insert')
t.equal(stats.total, 1)
t.assert.equal(stats.total, 1)
const documents = await client.helpers.search({
index,
type: esVersion >= 7 ? undefined : type,
body: {
query: { match_all: {} }
}
})
t.equal(documents[0].msg, 'hello world')
t.assert.equal(documents[0].msg, 'hello world')
})

test('Ignores a boolean line even though it is JSON-parsable', { timeout }, (t) => {
Expand All @@ -96,8 +97,8 @@ test('Ignores a boolean line even though it is JSON-parsable', { timeout }, (t)
const instance = elastic({ index, type, node, auth })

instance.on('unknown', (obj, body) => {
t.equal(obj, 'true', 'Object is parsed')
t.equal(body, 'Boolean value ignored', 'Message is emitted')
t.assert.equal(obj, 'true', 'Object is parsed')
t.assert.equal(body, 'Boolean value ignored', 'Message is emitted')
})

instance.write('true\n')
Expand All @@ -109,8 +110,8 @@ test('Ignores "null" being parsed as json', { timeout }, (t) => {
const instance = elastic({ index, type, node, auth })

instance.on('unknown', (obj, body) => {
t.equal(obj, 'null', 'Object is parsed')
t.equal(body, 'Null value ignored', 'Message is emitted')
t.assert.equal(obj, 'null', 'Object is parsed')
t.assert.equal(body, 'Null value ignored', 'Message is emitted')
})

instance.write('null\n')
Expand All @@ -122,7 +123,7 @@ test('Can process number being parsed as json', { timeout }, (t) => {
const instance = elastic({ index, type, node, auth })

instance.on('unknown', (obj, body) => {
t.error(obj, body)
t.assert.fail(obj, body)
})

instance.write('12\n')
Expand All @@ -145,15 +146,15 @@ test('store an deeply nested log line', { timeout }, async (t) => {
setImmediate(() => instance.end())

const [stats] = await once(instance, 'insert')
t.equal(stats.total, 1)
t.assert.equal(stats.total, 1)
const documents = await client.helpers.search({
index,
type: esVersion >= 7 ? undefined : type,
body: {
query: { match_all: {} }
}
})
t.equal(documents[0].deeply.nested.hello, 'world', 'obj gets linearized')
t.assert.equal(documents[0].deeply.nested.hello, 'world', 'obj gets linearized')
})

test('store lines in bulk', { timeout }, async (t) => {
Expand All @@ -171,7 +172,7 @@ test('store lines in bulk', { timeout }, async (t) => {
setImmediate(() => instance.end())

const [stats] = await once(instance, 'insert')
t.equal(stats.total, 5)
t.assert.equal(stats.total, 5)
const documents = await client.helpers.search({
index,
type: esVersion >= 7 ? undefined : type,
Expand All @@ -180,7 +181,7 @@ test('store lines in bulk', { timeout }, async (t) => {
}
})
for (const doc of documents) {
t.equal(doc.msg, 'hello world')
t.assert.equal(doc.msg, 'hello world')
}
})

Expand All @@ -200,15 +201,15 @@ test('replaces date in index', { timeout }, async (t) => {
setImmediate(() => instance.end())

const [stats] = await once(instance, 'insert')
t.equal(stats.total, 1)
t.assert.equal(stats.total, 1)
const documents = await client.helpers.search({
index: index.replace('%{DATE}', new Date().toISOString().substring(0, 10)),
type: esVersion >= 7 ? undefined : type,
body: {
query: { match_all: {} }
}
})
t.equal(documents[0].msg, 'hello world')
t.assert.equal(documents[0].msg, 'hello world')
})

test('replaces date in index during bulk insert', { timeout }, async (t) => {
Expand All @@ -232,7 +233,7 @@ test('replaces date in index during bulk insert', { timeout }, async (t) => {
setImmediate(() => instance.end())

const [stats] = await once(instance, 'insert')
t.equal(stats.total, 5)
t.assert.equal(stats.total, 5)
const documents = await client.helpers.search({
index: index.replace('%{DATE}', new Date().toISOString().substring(0, 10)),
type: esVersion >= 7 ? undefined : type,
Expand All @@ -241,7 +242,7 @@ test('replaces date in index during bulk insert', { timeout }, async (t) => {
}
})
for (const doc of documents) {
t.equal(doc.msg, 'hello world')
t.assert.equal(doc.msg, 'hello world')
}
})

Expand All @@ -256,23 +257,23 @@ test('Use ecs format', { timeout }, async (t) => {
setImmediate(() => instance.end())

const [stats] = await once(instance, 'insert')
t.equal(stats.total, 1)
t.assert.equal(stats.total, 1)
const documents = await client.helpers.search({
index,
type: esVersion >= 7 ? undefined : type,
body: {
query: { match_all: {} }
}
})
t.type(documents[0]['@timestamp'], 'string')
t.assert.equal(typeof documents[0]['@timestamp'], 'string')
})

test('dynamic index name', { timeout }, async (t) => {
t.plan(4)

let indexNameGenerated
const index = function (time) {
t.match(time, new Date().toISOString().substring(0, 10))
t.assert.equal(time.startsWith(new Date().toISOString().substring(0, 10)), true)
indexNameGenerated = `dynamic-index-${process.pid}`
return indexNameGenerated
}
Expand All @@ -285,23 +286,23 @@ test('dynamic index name', { timeout }, async (t) => {
setImmediate(() => instance.end())

const [stats] = await once(instance, 'insert')
t.equal(stats.total, 1)
t.assert.equal(stats.total, 1)
const documents = await client.helpers.search({
index: indexNameGenerated,
type: esVersion >= 7 ? undefined : type,
body: {
query: { match_all: {} }
}
})
t.equal(documents[0].msg, 'hello world')
t.assert.equal(documents[0].msg, 'hello world')
})

test('dynamic index name during bulk insert', { timeout }, async (t) => {
t.plan(12)

let indexNameGenerated
const index = function (time) {
t.match(time, new Date().toISOString().substring(0, 10))
t.assert.equal(time.startsWith(new Date().toISOString().substring(0, 10)), true)
indexNameGenerated = `dynamic-index-${process.pid + 1}`
return indexNameGenerated
}
Expand All @@ -318,7 +319,7 @@ test('dynamic index name during bulk insert', { timeout }, async (t) => {
setImmediate(() => instance.end())

const [stats] = await once(instance, 'insert')
t.equal(stats.total, 5)
t.assert.equal(stats.total, 5)
const documents = await client.helpers.search({
index: indexNameGenerated,
type: esVersion >= 7 ? undefined : type,
Expand All @@ -327,7 +328,7 @@ test('dynamic index name during bulk insert', { timeout }, async (t) => {
}
})
for (const doc of documents) {
t.equal(doc.msg, 'hello world')
t.assert.equal(doc.msg, 'hello world')
}
})

Expand All @@ -354,7 +355,7 @@ test('handle datastreams during bulk insert', { timeout }, async (t) => {

// Assert
const [stats] = await once(instance, 'insert')
t.equal(stats.successful, 5)
t.assert.equal(stats.successful, 5)

const documents = await client.helpers.search({
index: streamIndex,
Expand All @@ -365,10 +366,9 @@ test('handle datastreams during bulk insert', { timeout }, async (t) => {
})

for (let i = 0; i < documents.length; i++) {
t.equal(documents[i]['@timestamp'], logEntries[i].time)
t.assert.equal(documents[i]['@timestamp'], logEntries[i].time)
}
} else {
t.comment('The current elasticsearch version does not support datastreams yet!')
t.diagnostic('The current elasticsearch version does not support datastreams yet!')
}
t.end()
})
21 changes: 14 additions & 7 deletions test/cli.test.js
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
'use strict'
const test = require('tap').test

const test = require('node:test')
const proxyquire = require('proxyquire')

test('CLI: arg node should passed to client constructor', async (t) => {
t.plan(1)
const cli = proxyquire('../cli.js', {
pump: () => { },
'./lib.js': (opts) => {
t.same(opts, { node: 'https://custom-node-url:9999' })
t.assert.deepEqual(opts, { node: 'https://custom-node-url:9999' })
return {
on: () => { }
}
Expand All @@ -17,10 +19,11 @@ test('CLI: arg node should passed to client constructor', async (t) => {
})

test('CLI: arg rejectUnauthorized, if set to \'true\', should passed as true (bool) to client constructor', async (t) => {
t.plan(1)
const cli = proxyquire('../cli.js', {
pump: () => { },
'./lib.js': (opts) => {
t.same(opts, {
t.assert.deepEqual(opts, {
node: 'https://custom-node-url:9999',
rejectUnauthorized: true
})
Expand All @@ -37,10 +40,11 @@ test('CLI: arg rejectUnauthorized, if set to \'true\', should passed as true (bo
})

test('CLI: arg rejectUnauthorized, if set to \'false\', should passed as false (bool) to client constructor', async (t) => {
t.plan(1)
const cli = proxyquire('../cli.js', {
pump: () => { },
'./lib.js': (opts) => {
t.same(opts, {
t.assert.deepEqual(opts, {
node: 'https://custom-node-url:9999',
rejectUnauthorized: false
})
Expand All @@ -57,10 +61,11 @@ test('CLI: arg rejectUnauthorized, if set to \'false\', should passed as false (
})

test('CLI: arg rejectUnauthorized, if set to anything instead of true or false, should passed as true (bool) to client constructor', async (t) => {
t.plan(1)
const cli = proxyquire('../cli.js', {
pump: () => { },
'./lib.js': (opts) => {
t.same(opts, {
t.assert.deepEqual(opts, {
node: 'https://custom-node-url:9999',
rejectUnauthorized: true
})
Expand All @@ -77,10 +82,11 @@ test('CLI: arg rejectUnauthorized, if set to anything instead of true or false,
})

test('CLI: if arg.read-config is set, should read the config file and passed the value (only allowed values)', async (t) => {
t.plan(1)
const cli = proxyquire('../cli.js', {
pump: () => { },
'./lib.js': (opts) => {
t.same(opts, {
t.assert.deepEqual(opts, {
index: 'custom-index',
node: 'https://127.0.0.1:9200',
rejectUnauthorized: false,
Expand All @@ -107,10 +113,11 @@ test('CLI: if arg.read-config is set, should read the config file and passed the
})

test('CLI: arg opType should be passed to client constructor', async (t) => {
t.plan(1)
const cli = proxyquire('../cli.js', {
pump: () => { },
'./lib.js': (opts) => {
t.same(opts, {
t.assert.deepEqual(opts, {
node: 'https://custom-node-url:9999',
opType: 'create'
})
Expand Down
Loading
Loading