# Extension pipelining

`websocket-extensions` models the extension negotiation and processing pipeline
of the WebSocket protocol. Between the driver parsing messages from the TCP
stream and handing those messages off to the application, there may exist a
stack of extensions that transform messages in some way.

In the parlance of this framework, a *session* refers to a single instance of an
extension, acting on a particular socket on either the server or the client
side. A session may transform messages both incoming to the application and
outgoing from the application; for example, the `permessage-deflate` extension
compresses outgoing messages and decompresses incoming messages. Message streams
in either direction are independent; that is, incoming and outgoing messages
cannot be assumed to 'pair up' as in a request-response protocol.

Asynchronous processing of messages poses a number of problems that this
pipeline construction is intended to solve.


## Overview

Logically, we have the following:


    +-------------+  out  +---+     +---+     +---+       +--------+
    |             |------>|   |---->|   |---->|   |------>|        |
    | Application |       | A |     | B |     | C |       | Driver |
    |             |<------|   |<----|   |<----|   |<------|        |
    +-------------+  in   +---+     +---+     +---+       +--------+

                         \                         /
                          +-----------o-----------+
                                      |
                                  sessions


For outgoing messages, the driver receives the result of

    C.outgoing(B.outgoing(A.outgoing(message)))

or, equivalently, `[A, B, C].reduce(((m, ext) => ext.outgoing(m)), message)`.

For incoming messages, the application receives the result of

    A.incoming(B.incoming(C.incoming(message)))

or, equivalently, `[C, B, A].reduce(((m, ext) => ext.incoming(m)), message)`.

A session is of the following type, to borrow notation from pseudo-Haskell:

    type Session = {
      incoming :: Message -> Message
      outgoing :: Message -> Message
      close    :: () -> ()
    }

(That `() -> ()` syntax is intended to mean that `close()` is a nullary void
method; I apologise to any Haskell readers for not using the right monad.)

The `incoming()` and `outgoing()` methods perform message transformation in the
respective directions; `close()` is called when a socket closes so the session
can release any resources it's holding, for example a DEFLATE de/compression
context.

However, because this is JavaScript, the `incoming()` and `outgoing()` methods
may be asynchronous (indeed, `permessage-deflate` is based on `zlib`, whose API
is stream-based). So their interface is strictly:

    type Session = {
      incoming :: Message -> Callback -> ()
      outgoing :: Message -> Callback -> ()
      close    :: () -> ()
    }

    type Callback = Either Error Message -> ()

This means a message *m2* can be pushed into a session while it's still
processing the preceding message *m1*. The messages can be processed
concurrently but they *must* be given to the next session in line (or to the
application) in the same order they came in. Applications will expect to receive
messages in the order they arrived over the wire, and sessions require this too.
So ordering of messages must be preserved throughout the pipeline.

Consider the following highly simplified extension that deflates messages on the
wire. `message` is a value conforming to the type:

    type Message = {
      rsv1   :: Boolean
      rsv2   :: Boolean
      rsv3   :: Boolean
      opcode :: Number
      data   :: Buffer
    }
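
For instance, a single-frame text message carrying the string `'Hello'` could be
represented by a value like the following (a hypothetical illustration, not code
from the library):

```js
// A sample Message value: an uncompressed, single-frame text message
var message = {
  rsv1:   false,
  rsv2:   false,
  rsv3:   false,
  opcode: 1, // 1 means a text frame in the WebSocket protocol
  data:   Buffer.from('Hello')
};
```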

Here's the extension:

```js
var zlib = require('zlib');

var deflate = {
  outgoing: function(message, callback) {
    zlib.deflateRaw(message.data, function(error, result) {
      if (error) return callback(error, null);
      message.rsv1 = true;
      message.data = result;
      callback(null, message);
    });
  },

  incoming: function(message, callback) {
    // decompress inbound messages (elided)
  },

  close: function() {
    // no state to clean up
  }
};
```

We can call it with a large message followed by a small one, and the small one
will be returned first:

```js
var crypto = require('crypto'),
    large  = crypto.randomBytes(1 << 14),
    small  = Buffer.from('hi');

deflate.outgoing({ data: large }, function() {
  console.log(1, 'large');
});

deflate.outgoing({ data: small }, function() {
  console.log(2, 'small');
});

/* prints:  2 'small'
            1 'large' */
```

So a session that processes messages asynchronously may fail to preserve message
ordering.

Now, this extension is stateless, so it can process messages in any order and
still produce the same output. But some extensions are stateful and require
message order to be preserved.

For example, when using `permessage-deflate` without `no_context_takeover` set,
the session retains a DEFLATE de/compression context between messages, which
accumulates state as it consumes data (later messages can refer to sections of
previous ones to improve compression). Reordering parts of the DEFLATE stream
will result in a failed decompression. Messages must be decompressed in the same
order they were compressed by the peer in order for the DEFLATE protocol to
work.
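
To make that statefulness concrete, here is a small sketch using Node's
streaming `zlib` API (an illustration of context takeover, not code from this
library): one long-lived DEFLATE context is shared by every message on the
session, so each message's output depends on the messages compressed before it.

```js
var zlib = require('zlib');

// One long-lived raw-DEFLATE context for the whole session; Z_SYNC_FLUSH
// makes each write() flush out a complete chunk for that message
var deflate = zlib.createDeflateRaw({ flush: zlib.constants.Z_SYNC_FLUSH });

deflate.on('data', function(chunk) {
  // chunks are emitted in write() order; the peer must feed them to its
  // inflater in the same order, or its shared context is corrupted
});

deflate.write('hello, extension pipeline');
deflate.write('hello, extension pipeline'); // compresses to a few bytes, by
                                            // back-referencing the first write
```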

Finally, there is the problem of closing a socket. When a WebSocket is closed by
the application, or receives a closing request from the other peer, there may be
messages outgoing from the application and incoming from the peer in the
pipeline. If we close the socket and pipeline immediately, two problems arise:

* We may send our own closing frame to the peer before all prior messages we
  sent have been written to the socket, and before we have finished processing
  all prior messages from the peer
* The session may be instructed to close its resources (e.g. its de/compression
  context) while it's in the middle of processing a message, or before it has
  received messages that are upstream of it in the pipeline

Essentially, we must defer closing the sessions and sending a closing frame
until after all prior messages have exited the pipeline.


## Design goals

* Message order must be preserved between the protocol driver, the extension
  sessions, and the application
* Messages should be handed off to sessions and endpoints as soon as possible,
  to maximise throughput of stateless sessions
* The closing procedure should block any further messages from entering the
  pipeline, and should allow all existing messages to drain
* Sessions should be closed as soon as possible to prevent them holding memory
  and other resources when they have no more messages to handle
* The closing API should allow the caller to detect when the pipeline is empty
  and it is safe to continue the WebSocket closing procedure
* Individual extensions should remain as simple as possible to facilitate
  modularity and independent authorship

The final point about modularity is an important one: this framework is designed
to facilitate extensions existing as plugins, by decoupling the protocol driver,
extensions, and application. In an ideal world, plugins should only need to
contain code for their specific functionality, and not solve these problems that
apply to all sessions. Also, solving some of these problems requires
consideration of all active sessions collectively, which an individual session
is incapable of doing.

For example, it is entirely possible to take the simple `deflate` extension
above and wrap its `incoming()` and `outgoing()` methods in two `Transform`
streams, producing this type:

    type Session = {
      incoming :: TransformStream
      outgoing :: TransformStream
      close    :: () -> ()
    }

The `Transform` class makes it easy to wrap an async function such that message
order is preserved:

```js
var stream  = require('stream'),
    session = new stream.Transform({ objectMode: true });

session._transform = function(message, _, callback) {
  var self = this;
  deflate.outgoing(message, function(error, result) {
    self.push(result);
    callback();
  });
};
```

However, this has a negative impact on throughput: it works by deferring
`callback()` until the async function has 'returned', which blocks `Transform`
from passing further input into the `_transform()` method until the current
message is dealt with completely. This would prevent sessions from processing
messages concurrently, and would unnecessarily reduce the throughput of
stateless extensions.

So, input should be handed off to sessions as soon as possible, and all we need
is a mechanism to reorder the output so that message order is preserved for the
next session in line.
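
As an illustration, here is one minimal sketch of such a reordering mechanism
(hypothetical code, not this library's actual `Functor` implementation): every
message is handed to the wrapped session method immediately, and a queue
releases the results strictly in input order.

```js
function OrderedFunctor(session, method) {
  this._session = session;
  this._method  = method;
  this._queue   = [];
}

OrderedFunctor.prototype.call = function(message, callback) {
  var queue  = this._queue,
      record = { done: false, error: null, message: null, callback: callback };

  // Reserve a slot in arrival order, then start processing immediately
  queue.push(record);

  this._session[this._method](message, function(error, result) {
    record.done    = true;
    record.error   = error;
    record.message = result;

    // Release every completed record at the head of the queue, in order;
    // a finished message waits here until all earlier messages are done
    while (queue.length > 0 && queue[0].done) {
      var next = queue.shift();
      next.callback(next.error, next.message);
    }
  });
};
```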


## Solution

We now describe the model implemented here and how it meets the above design
goals. The diagram above, in which a stack of extensions sits between the driver
and the application, describes the data flow, but not the object graph. That
looks like this:


          +--------+
          | Driver |
          +---o----+
              |
              V
        +------------+      +----------+
        | Extensions o----->| Pipeline |
        +------------+      +-----o----+
                                  |
                  +---------------+---------------+
                  |               |               |
            +-----o----+    +-----o----+    +-----o----+
            | Cell [A] |    | Cell [B] |    | Cell [C] |
            +----------+    +----------+    +----------+


A driver using this framework holds an instance of the `Extensions` class, which
it uses to register extension plugins, negotiate headers and transform messages.
The `Extensions` instance itself holds a `Pipeline`, which contains an array of
`Cell` objects, each of which wraps one of the sessions.


### Message processing

Both the `Pipeline` and `Cell` classes have `incoming()` and `outgoing()`
methods; the `Pipeline` interface pushes messages into the pipe, delegates the
message to each `Cell` in turn, then returns it to the driver. Outgoing
messages pass through `A` then `B` then `C`, and incoming messages in the
reverse order.

Internally, a `Cell` contains two `Functor` objects. A `Functor` wraps an async
function and makes sure its output messages maintain the order of its input
messages. This name is due to [@fronx](https://github.com/fronx), on the basis
that, by preserving message order, the abstraction preserves the *mapping*
between input and output messages. To use our simple `deflate` extension from
above:

```js
var functor = new Functor(deflate, 'outgoing');

functor.call({ data: large }, function() {
  console.log(1, 'large');
});

functor.call({ data: small }, function() {
  console.log(2, 'small');
});

/* -> 1 'large'
      2 'small' */
```

A `Cell` contains two of these, one for each direction:


                                 +-----------------------+
                          +----->| Functor [A, incoming] |
        +----------+      |      +-----------------------+
        | Cell [A] o------+
        +----------+      |      +-----------------------+
                          +----->| Functor [A, outgoing] |
                                 +-----------------------+


This satisfies the message transformation requirements: the `Pipeline` simply
loops over the cells in the appropriate direction to transform each message.
Because each `Cell` will preserve message order, we can pass a message to the
next `Cell` in line as soon as the current `Cell` returns it. This gives each
`Cell` all the messages in order while maximising throughput.
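
A rough sketch of that loop for the outgoing direction (hypothetical code,
assuming the callback-based cell interface described above; the incoming
direction is the same but walks the cells in reverse):

```js
function Pipeline(cells) {
  this._cells = cells; // array of cells exposing outgoing(message, callback)
}

Pipeline.prototype.outgoing = function(message, callback) {
  var cells = this._cells;

  var step = function(index, msg) {
    // Past the last cell: the transformed message goes to the driver
    if (index === cells.length) return callback(null, msg);

    // Hand the message to the next cell as soon as this one releases it
    cells[index].outgoing(msg, function(error, result) {
      if (error) return callback(error, null);
      step(index + 1, result);
    });
  };

  step(0, message);
};
```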


### Session closing

We want to close each session as soon as possible, after all existing messages
have drained. To do this, each `Cell` begins with a pending message counter in
each direction, labelled `in` and `out` below.


                            +----------+
                            | Pipeline |
                            +-----o----+
                                  |
                  +---------------+---------------+
                  |               |               |
            +-----o----+    +-----o----+    +-----o----+
            | Cell [A] |    | Cell [B] |    | Cell [C] |
            +----------+    +----------+    +----------+
              in:  0          in:  0          in:  0
              out: 0          out: 0          out: 0


When a message *m1* enters the pipeline, say in the `outgoing` direction, we
increment the `pending.out` counter on all cells immediately.


                            +----------+
                      m1 => | Pipeline |
                            +-----o----+
                                  |
                  +---------------+---------------+
                  |               |               |
            +-----o----+    +-----o----+    +-----o----+
            | Cell [A] |    | Cell [B] |    | Cell [C] |
            +----------+    +----------+    +----------+
              in:  0          in:  0          in:  0
              out: 1          out: 1          out: 1


*m1* is handed off to `A`; meanwhile a second message *m2* arrives in the same
direction. All `pending.out` counters are again incremented.


                            +----------+
                      m2 => | Pipeline |
                            +-----o----+
                                  |
                  +---------------+---------------+
               m1 |               |               |
            +-----o----+    +-----o----+    +-----o----+
            | Cell [A] |    | Cell [B] |    | Cell [C] |
            +----------+    +----------+    +----------+
              in:  0          in:  0          in:  0
              out: 2          out: 2          out: 2


When the first cell's `A.outgoing` functor finishes processing *m1*, the first
`pending.out` counter is decremented and *m1* is handed off to cell `B`.


                            +----------+
                            | Pipeline |
                            +-----o----+
                                  |
                  +---------------+---------------+
               m2 |            m1 |               |
            +-----o----+    +-----o----+    +-----o----+
            | Cell [A] |    | Cell [B] |    | Cell [C] |
            +----------+    +----------+    +----------+
              in:  0          in:  0          in:  0
              out: 1          out: 2          out: 2


As `B` finishes with *m1*, and as `A` finishes with *m2*, the `pending.out`
counters continue to decrement.


                            +----------+
                            | Pipeline |
                            +-----o----+
                                  |
                  +---------------+---------------+
                  |            m2 |            m1 |
            +-----o----+    +-----o----+    +-----o----+
            | Cell [A] |    | Cell [B] |    | Cell [C] |
            +----------+    +----------+    +----------+
              in:  0          in:  0          in:  0
              out: 0          out: 1          out: 2


Say `C` is a little slow, and begins processing *m2* while still processing
*m1*. That's fine: the `Functor` mechanism will keep *m1* ahead of *m2* in the
output.


                            +----------+
                            | Pipeline |
                            +-----o----+
                                  |
                  +---------------+---------------+
                  |               |            m2 | m1
            +-----o----+    +-----o----+    +-----o----+
            | Cell [A] |    | Cell [B] |    | Cell [C] |
            +----------+    +----------+    +----------+
              in:  0          in:  0          in:  0
              out: 0          out: 0          out: 2


Once all messages are dealt with, the counters return to `0`.


                            +----------+
                            | Pipeline |
                            +-----o----+
                                  |
                  +---------------+---------------+
                  |               |               |
            +-----o----+    +-----o----+    +-----o----+
            | Cell [A] |    | Cell [B] |    | Cell [C] |
            +----------+    +----------+    +----------+
              in:  0          in:  0          in:  0
              out: 0          out: 0          out: 0


The same process applies in the `incoming` direction, the only difference being
that messages are passed to `C` first.

This makes closing the sessions quite simple. When the driver wants to close the
socket, it calls `Pipeline.close()`. This *immediately* calls `close()` on all
the cells. If a cell has `in == out == 0`, then it immediately calls
`session.close()`. Otherwise, it stores the closing call and defers it until
`in` and `out` have both ticked down to zero. The pipeline will not accept new
messages after `close()` has been called, so we know the pending counts will not
increase after this point.

This means each session is closed as soon as possible: `A` can close while the
slow `C` session is still working, because it knows there are no more messages
on the way. Similarly, `C` will defer closing if `close()` is called while *m1*
is still in `B`, and *m2* in `A`, because its pending count means it knows it
has work yet to do, even if it has not received those messages yet. This concern
cannot be addressed by extensions acting only on their own local state, unless
we pollute individual extensions by making them all implement this same
mechanism.
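
As a rough sketch (with hypothetical field names, and using a standard `Promise`
where the library uses a simpler construction, as discussed below), a cell's
deferred close might look like this:

```js
function Cell(session) {
  this._session = session;
  this._pending = { in: 0, out: 0 }; // incremented as messages enter the pipe
}

// Resolves immediately if nothing is pending, otherwise defers until both
// counters have ticked down to zero
Cell.prototype.close = function() {
  var self = this;
  this._closed = this._closed || new Promise(function(resolve) {
    self._resolveClose = resolve;
  });
  this._flushClose();
  return this._closed;
};

// Called whenever one of the cell's functors finishes with a message
Cell.prototype._decrement = function(direction) {
  this._pending[direction] -= 1;
  this._flushClose();
};

Cell.prototype._flushClose = function() {
  if (!this._resolveClose) return;
  if (this._pending.in === 0 && this._pending.out === 0) {
    this._session.close();
    this._resolveClose();
    this._resolveClose = null;
  }
};
```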

The actual closing API at each level is slightly different:

    type Session = {
      close :: () -> ()
    }

    type Cell = {
      close :: () -> Promise ()
    }

    type Pipeline = {
      close :: Callback -> ()
    }

This might appear inconsistent so it's worth explaining. Remember that a
`Pipeline` holds a list of `Cell` objects, each wrapping a `Session`. The driver
talks (via the `Extensions` API) to the `Pipeline` interface, and it wants
`Pipeline.close()` to do two things: close all the sessions, and tell it when
it's safe to start the closing procedure (i.e. when all messages have drained
from the pipe and been handed off to the application or socket). A callback API
works well for that.

At the other end of the stack, `Session.close()` is a nullary void method with
no callback or promise API because we don't care what it does, and whatever it
does do will not block the WebSocket protocol; we're not going to hold off
processing messages while a session closes its de/compression context. We just
tell it to close itself, and don't want to wait while it does that.

In the middle, `Cell.close()` returns a promise rather than using a callback.
This is for two reasons. First, `Cell.close()` might not do anything
immediately; it might have to defer its effect while messages drain. So, if
given a callback, it would have to store it in a queue for later execution.
Callbacks work fine if your method does something and can then invoke the
callback itself, but if you need to store callbacks somewhere so another method
can execute them, a promise is a better fit. Second, it better serves the
purposes of `Pipeline.close()`: it wants to call `close()` on each of a list of
cells, and wait for all of them to finish. This is simple and idiomatic using
promises:

```js
var closed = cells.map((cell) => cell.close());
Promise.all(closed).then(callback);
```

(We don't actually use a full *Promises/A+* compatible promise here, we use a
much simplified construction that acts as a callback aggregator, resolves
synchronously, and does not support chaining, but the principle is the same.)
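
A toy version of that construction (a hypothetical sketch for illustration, not
the library's actual class) might look like this:

```js
function MiniPromise() {
  this._callbacks = [];
  this._resolved  = false;
}

// No chaining: then() just registers a callback, invoking it immediately
// (synchronously) if the promise has already resolved
MiniPromise.prototype.then = function(callback) {
  if (this._resolved) callback();
  else this._callbacks.push(callback);
};

MiniPromise.prototype.resolve = function() {
  if (this._resolved) return;
  this._resolved = true;
  this._callbacks.forEach(function(callback) { callback(); });
};

// Aggregates a list of promises into one that resolves when they all have
MiniPromise.all = function(promises) {
  var result  = new MiniPromise(),
      pending = promises.length;

  if (pending === 0) result.resolve();

  promises.forEach(function(promise) {
    promise.then(function() {
      pending -= 1;
      if (pending === 0) result.resolve();
    });
  });

  return result;
};
```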


### Error handling

We've not mentioned error handling so far, but it bears some explanation. The
above counter system still applies, but behaves slightly differently in the
presence of errors.

Say we push three messages into the pipe in the outgoing direction:


                            +----------+
              m3, m2, m1 => | Pipeline |
                            +-----o----+
                                  |
                  +---------------+---------------+
                  |               |               |
            +-----o----+    +-----o----+    +-----o----+
            | Cell [A] |    | Cell [B] |    | Cell [C] |
            +----------+    +----------+    +----------+
              in:  0          in:  0          in:  0
              out: 3          out: 3          out: 3


They pass through the cells successfully up to this point:


                            +----------+
                            | Pipeline |
                            +-----o----+
                                  |
                  +---------------+---------------+
               m3 |            m2 |            m1 |
            +-----o----+    +-----o----+    +-----o----+
            | Cell [A] |    | Cell [B] |    | Cell [C] |
            +----------+    +----------+    +----------+
              in:  0          in:  0          in:  0
              out: 1          out: 2          out: 3


At this point, session `B` produces an error while processing *m2*; that is,
*m2* becomes *e2*. *m1* is still in the pipeline, and *m3* is queued behind
*m2*. What ought to happen is that *m1* is handed off to the socket, then *m2*
is released to the driver, which will detect the error and begin closing the
socket. No further processing should be done on *m3* and it should not be
released to the driver after the error is emitted.

To handle this, we allow errors to pass down the pipeline just like messages do,
to maintain ordering. But, once a cell sees its session produce an error, or it
receives an error from upstream, it should refuse to accept any further
messages. Session `B` might have begun processing *m3* by the time it produces
the error *e2*, but `C` will have been given *e2* before it receives *m3*, and
can simply drop *m3*.

Now, say *e2* reaches the slow session `C` while *m1* is still present. `C` will
never receive *m3*, since it was dropped upstream. Under the present model, its
`out` counter will be `3` but it is only going to emit two more values: *m1* and
*e2*. In order for closing to work, we need to decrement `out` to reflect this.
The situation should look like this:


                            +----------+
                            | Pipeline |
                            +-----o----+
                                  |
                  +---------------+---------------+
                  |               |            e2 | m1
            +-----o----+    +-----o----+    +-----o----+
            | Cell [A] |    | Cell [B] |    | Cell [C] |
            +----------+    +----------+    +----------+
              in:  0          in:  0          in:  0
              out: 0          out: 0          out: 2


When a cell sees its session emit an error, or when it receives an error from
upstream, it sets its pending count in the appropriate direction to equal the
number of messages it is *currently* processing. It will not accept any messages
after it sees the error, so this will allow the counter to reach zero.
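
In sketch form (hypothetical names again, building on the `Cell` sketch above
with an `_errored` flag and an `_inProgress` counter per direction), the rule
amounts to something like:

```js
// On an error, stop accepting input in this direction and shrink the pending
// counter to only the messages this cell has already started processing
Cell.prototype._handleError = function(direction) {
  this._errored[direction] = true;
  this._pending[direction] = this._inProgress[direction];
};

// Any message arriving in a direction that has seen an error is dropped
Cell.prototype._accept = function(direction, message, callback) {
  if (this._errored[direction]) return;
  this._inProgress[direction] += 1;
  // ... hand the message to this direction's functor here
};
```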

Note that while *e2* is in the pipeline, `Pipeline` should drop any further
messages in the outgoing direction, but should continue to accept incoming
messages. Until *e2* makes it out of the pipe to the driver, behind previous
successful messages, the driver does not know an error has happened, and a
message may arrive over the socket and make it all the way through the incoming
pipe in the meantime. We only halt processing in the affected direction, to
avoid doing unnecessary work, since messages arriving after an error should not
be processed.

Some unnecessary work may happen; for example, any messages already in the
pipeline behind *m2* will be processed by `A`, since it's upstream of the
error. Those messages will be dropped by `B`.


## Alternative ideas

I am considering implementing `Functor` as an object-mode transform stream
rather than what is essentially an async function. Being object-mode, a stream
would preserve message boundaries and would also possibly help address
back-pressure. I'm not sure whether this would require external API changes so
that such streams could be connected to the downstream driver's streams.


## Acknowledgements

Credit is due to [@mnowster](https://github.com/mnowster) for helping with the
design and to [@fronx](https://github.com/fronx) for helping name things.