Skip to content

Commit a5de20b

Browse files
feat: add highwatermakr option to subscription queue (#1197)
1 parent 52d438c commit a5de20b

File tree

10 files changed

+127
-4
lines changed

10 files changed

+127
-4
lines changed

docs/api/options.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@
7676
- `subscription.keepAlive`: `Integer` Optional interval in ms to send the `GQL_CONNECTION_KEEP_ALIVE` message.
7777
- `subscription.fullWsTransport`: `Boolean` Enable sending every operation via WS.
7878
- `subscription.wsDefaultSubprotocol`: `String` Set the default subprotocol in case the client does not specify one. See [Supported subprotocols](../graphql-over-websocket.md#websocket-subprotocol)
79+
- `subscription.queueHighWaterMark`: `Number` Set the high water mark for the internal readable stream queue used by each subscription connection. This controls the buffering behavior of subscription messages. When the queue reaches this limit, backpressure is applied to prevent memory issues. Defaults to Node.js stream default (16 for object mode). See [Node.js Stream Buffering](https://nodejs.org/api/stream.html#buffering) for more details.
7980

8081
- `persistedQueries`: A hash/query map to resolve the full query text using it's unique hash. Overrides `persistedQueryProvider`.
8182
- `onlyPersisted`: Boolean. Flag to control whether to allow graphql queries other than persisted. When `true`, it'll make the server reject any queries that are not present in the `persistedQueries` option above. It will also disable any ide available (graphiql). Requires `persistedQueries` to be set, and overrides `persistedQueryProvider`.

index.d.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -537,6 +537,7 @@ declare namespace mercurius {
537537
keepAlive?: number,
538538
fullWsTransport?: boolean,
539539
wsDefaultSubprotocol?: string,
540+
queueHighWaterMark?: number,
540541
};
541542
/**
542543
* Persisted queries, overrides persistedQueryProvider.

index.js

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,7 @@ const mercurius = fp(async function (app, opts) {
134134
let keepAlive
135135
let fullWsTransport
136136
let wsDefaultSubprotocol
137+
let queueHighWaterMark
137138

138139
if (typeof subscriptionOpts === 'object') {
139140
if (subscriptionOpts.pubsub) {
@@ -149,6 +150,7 @@ const mercurius = fp(async function (app, opts) {
149150
keepAlive = subscriptionOpts.keepAlive
150151
fullWsTransport = subscriptionOpts.fullWsTransport
151152
wsDefaultSubprotocol = subscriptionOpts.wsDefaultSubprotocol
153+
queueHighWaterMark = subscriptionOpts.queueHighWaterMark
152154
} else if (subscriptionOpts === true) {
153155
emitter = mq()
154156
subscriber = new PubSub(emitter)
@@ -190,6 +192,11 @@ const mercurius = fp(async function (app, opts) {
190192
throw new MER_ERR_INVALID_OPTS('wsDefaultSubprotocol must be either graphql-ws or graphql-transport-ws')
191193
}
192194
}
195+
if (queueHighWaterMark) {
196+
if (typeof queueHighWaterMark !== 'number' || queueHighWaterMark <= 0) {
197+
throw new MER_ERR_INVALID_OPTS('queueHighWaterMark must be a positive number')
198+
}
199+
}
193200

194201
fastifyGraphQl.schema = schema
195202

@@ -226,6 +233,7 @@ const mercurius = fp(async function (app, opts) {
226233
keepAlive,
227234
fullWsTransport,
228235
wsDefaultSubprotocol,
236+
queueHighWaterMark,
229237
additionalRouteOptions: opts.additionalRouteOptions,
230238
csrfConfig
231239
})

lib/routes.js

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,7 @@ module.exports = async function (app, opts) {
204204
keepAlive,
205205
fullWsTransport,
206206
wsDefaultSubprotocol,
207+
queueHighWaterMark,
207208
additionalRouteOptions
208209
} = opts
209210

@@ -349,6 +350,7 @@ module.exports = async function (app, opts) {
349350
keepAlive,
350351
fullWsTransport,
351352
wsDefaultSubprotocol,
353+
queueHighWaterMark,
352354
errorFormatter
353355
})
354356
} else {

lib/subscriber.js

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,11 +37,12 @@ class PubSub {
3737

3838
// One context - and queue for each subscription
3939
class SubscriptionContext {
40-
constructor ({ pubsub, fastify }) {
40+
constructor ({ pubsub, fastify, queueHighWaterMark }) {
4141
this.fastify = fastify
4242
this.pubsub = pubsub
4343
this.queue = new Readable({
4444
objectMode: true,
45+
highWaterMark: queueHighWaterMark,
4546
read: () => {}
4647
})
4748
this.closed = false

lib/subscription-connection.js

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ module.exports = class SubscriptionConnection {
2222
keepAlive,
2323
fullWsTransport,
2424
wsDefaultSubprotocol,
25+
queueHighWaterMark,
2526
errorFormatter
2627
}) {
2728
this.fastify = fastify
@@ -38,6 +39,7 @@ module.exports = class SubscriptionConnection {
3839
this.keepAlive = keepAlive
3940
this.fullWsTransport = fullWsTransport
4041
this.wsDefaultSubprotocol = wsDefaultSubprotocol
42+
this.queueHighWaterMark = queueHighWaterMark
4143
this.errorFormatter = errorFormatter
4244
this.headers = {}
4345
this.resolverContext = null
@@ -267,7 +269,8 @@ module.exports = class SubscriptionConnection {
267269

268270
const sc = new SubscriptionContext({
269271
fastify: this.fastify,
270-
pubsub: this.subscriber
272+
pubsub: this.subscriber,
273+
queueHighWaterMark: this.queueHighWaterMark
271274
})
272275

273276
// Trigger preSubscriptionParsing hook

lib/subscription.js

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,10 @@ const { kHooks } = require('./symbols')
66
const SubscriptionConnection = require('./subscription-connection')
77
const { isValidClientProtocol } = require('./subscription-protocol')
88

9-
function createConnectionHandler ({ subscriber, fastify, onConnect, onDisconnect, entityResolversFactory, subscriptionContextFn, keepAlive, fullWsTransport, wsDefaultSubprotocol, errorFormatter }) {
9+
function createConnectionHandler ({
10+
subscriber, fastify, onConnect, onDisconnect, entityResolversFactory, subscriptionContextFn, keepAlive,
11+
fullWsTransport, wsDefaultSubprotocol, queueHighWaterMark, errorFormatter
12+
}) {
1013
return async (socket, request) => {
1114
if (!isValidClientProtocol(socket.protocol, wsDefaultSubprotocol)) {
1215
console.log('wrong websocket protocol: ' + socket.protocol)
@@ -49,13 +52,14 @@ function createConnectionHandler ({ subscriber, fastify, onConnect, onDisconnect
4952
keepAlive,
5053
fullWsTransport,
5154
wsDefaultSubprotocol,
55+
queueHighWaterMark,
5256
errorFormatter
5357
})
5458
}
5559
}
5660

5761
module.exports = async function (fastify, opts) {
58-
const { getOptions, subscriber, verifyClient, onConnect, onDisconnect, entityResolversFactory, subscriptionContextFn, keepAlive, fullWsTransport, wsDefaultSubprotocol, errorFormatter } = opts
62+
const { getOptions, subscriber, verifyClient, onConnect, onDisconnect, entityResolversFactory, subscriptionContextFn, keepAlive, fullWsTransport, wsDefaultSubprotocol, queueHighWaterMark, errorFormatter } = opts
5963

6064
// If `fastify.websocketServer` exists, it means `@fastify/websocket` already registered.
6165
// Without this check, @fastify/websocket will be registered multiple times and raises FST_ERR_DEC_ALREADY_PRESENT.
@@ -80,6 +84,7 @@ module.exports = async function (fastify, opts) {
8084
keepAlive,
8185
fullWsTransport,
8286
wsDefaultSubprotocol,
87+
queueHighWaterMark,
8388
errorFormatter
8489
})
8590
})

test/options.test.js

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,3 +98,33 @@ test('invalid wsDefaultSubprotocol', async t => {
9898
message: 'Invalid options: wsDefaultSubprotocol must be either graphql-ws or graphql-transport-ws'
9999
})
100100
})
101+
102+
test('invalid queueHighWaterMark', async t => {
103+
const app = Fastify()
104+
t.after(() => app.close())
105+
106+
app.register(mercurius, {
107+
subscription: {
108+
queueHighWaterMark: 'invalid'
109+
}
110+
})
111+
112+
await t.assert.rejects(app.ready(), {
113+
message: 'Invalid options: queueHighWaterMark must be a positive number'
114+
})
115+
})
116+
117+
test('invalid queueHighWaterMark', async t => {
118+
const app = Fastify()
119+
t.after(() => app.close())
120+
121+
app.register(mercurius, {
122+
subscription: {
123+
queueHighWaterMark: -1
124+
}
125+
})
126+
127+
await t.assert.rejects(app.ready(), {
128+
message: 'Invalid options: queueHighWaterMark must be a positive number'
129+
})
130+
})

test/subscription.test.js

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -697,6 +697,70 @@ test('subscription with custom pubsub with custom params on subscribe method', (
697697
})
698698
})
699699

700+
test('subscription queue has highWaterMark when queueHighWaterMark is provided', async (t) => {
701+
const emitter = new EventEmitter()
702+
class SpyPubSub {
703+
async subscribe (_, queue) {
704+
emitter.emit('onsubscription', queue)
705+
}
706+
}
707+
708+
const app = Fastify()
709+
t.after(() => app.close())
710+
711+
const pubsub = new SpyPubSub()
712+
713+
const schema = `
714+
type Query {
715+
_placeholder: String
716+
}
717+
718+
type Subscription {
719+
onSubscription: String
720+
}
721+
`
722+
723+
const resolvers = {
724+
Subscription: {
725+
onSubscription: {
726+
subscribe: (root, args, { pubsub }) => pubsub.subscribe('ON_SUBSCRIPTION')
727+
}
728+
}
729+
}
730+
731+
app.register(GQL, {
732+
schema,
733+
resolvers,
734+
subscription: {
735+
pubsub,
736+
queueHighWaterMark: 2
737+
}
738+
})
739+
740+
await app.listen({ port: 0 })
741+
742+
const ws = new WebSocket('ws://localhost:' + (app.server.address()).port + '/graphql', 'graphql-ws')
743+
const client = WebSocket.createWebSocketStream(ws, { encoding: 'utf8', objectMode: true })
744+
t.after(() => client.destroy())
745+
client.setEncoding('utf8')
746+
747+
client.write(JSON.stringify({ type: 'connection_init' }))
748+
client.write(JSON.stringify({
749+
id: 1,
750+
type: 'start',
751+
payload: {
752+
query: `
753+
subscription {
754+
onSubscription
755+
}
756+
`
757+
}
758+
}))
759+
760+
const [queue] = await once(emitter, 'onsubscription')
761+
t.assert.strictEqual(queue._readableState.highWaterMark, 2)
762+
})
763+
700764
test('subscription server sends update to subscriptions with custom context', (t, done) => {
701765
const app = Fastify()
702766
t.after(() => app.close())

test/types/index.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -362,6 +362,14 @@ app.register(mercurius, {
362362
}
363363
})
364364

365+
app.register(mercurius, {
366+
schema,
367+
resolvers,
368+
subscription: {
369+
queueHighWaterMark: 100
370+
}
371+
})
372+
365373
app.register(mercurius, {
366374
schema,
367375
resolvers,

0 commit comments

Comments
 (0)