# Extension pipelining

`websocket-extensions` models the extension negotiation and processing pipeline
of the WebSocket protocol. Between the driver parsing messages from the TCP
stream and handing those messages off to the application, there may exist a
stack of extensions that transform the message somehow.

In the parlance of this framework, a *session* refers to a single instance of an
extension, acting on a particular socket on either the server or the client
side. A session may transform messages both incoming to the application and
outgoing from the application; for example, the `permessage-deflate` extension
compresses outgoing messages and decompresses incoming messages. Message streams
in either direction are independent; that is, incoming and outgoing messages
cannot be assumed to 'pair up' as in a request-response protocol.

Asynchronous processing of messages poses a number of problems that this
pipeline construction is intended to solve.

## Overview

Logically, we have the following:


    +-------------+  out  +---+   +---+   +---+   +--------+
    |             |------>|   |-->|   |-->|   |-->|        |
    | Application |       | A |   | B |   | C |   | Driver |
    |             |<------|   |<--|   |<--|   |<--|        |
    +-------------+  in   +---+   +---+   +---+   +--------+

                          \                   /
                           +--------o--------+
                                    |
                                sessions

For outgoing messages, the driver receives the result of

    C.outgoing(B.outgoing(A.outgoing(message)))

    or, [A, B, C].reduce(((m, ext) => ext.outgoing(m)), message)

For incoming messages, the application receives the result of

    A.incoming(B.incoming(C.incoming(message)))

    or, [C, B, A].reduce(((m, ext) => ext.incoming(m)), message)

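To make the folding concrete, here is a runnable sketch using three toy
synchronous extensions (hypothetical stand-ins for real sessions, not part of
the library) whose transforms simply tag the message, so the composition order
is visible in the output:

```javascript
// Toy extensions: each one appends its own name to the message,
// making the order of application visible.
var A = { outgoing: function(m) { return m + ':A'; }, incoming: function(m) { return m + ':A'; } },
    B = { outgoing: function(m) { return m + ':B'; }, incoming: function(m) { return m + ':B'; } },
    C = { outgoing: function(m) { return m + ':C'; }, incoming: function(m) { return m + ':C'; } };

// Outgoing messages flow A -> B -> C; incoming messages flow C -> B -> A
var outgoing = [A, B, C].reduce(((m, ext) => ext.outgoing(m)), 'msg'),
    incoming = [C, B, A].reduce(((m, ext) => ext.incoming(m)), 'msg');

console.log(outgoing); // -> 'msg:A:B:C'
console.log(incoming); // -> 'msg:C:B:A'
```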
A session is of the following type, to borrow notation from pseudo-Haskell:

    type Session = {
      incoming :: Message -> Message
      outgoing :: Message -> Message
      close    :: () -> ()
    }

(That `() -> ()` syntax is intended to mean that `close()` is a nullary void
method; I apologise to any Haskell readers for not using the right monad.)

The `incoming()` and `outgoing()` methods perform message transformation in the
respective directions; `close()` is called when a socket closes so the session
can release any resources it's holding, for example a DEFLATE de/compression
context.

However, because this is JavaScript, the `incoming()` and `outgoing()` methods
may be asynchronous (indeed, `permessage-deflate` is based on `zlib`, whose API
is stream-based). So their interface is strictly:

    type Session = {
      incoming :: Message -> Callback -> ()
      outgoing :: Message -> Callback -> ()
      close    :: () -> ()
    }

    type Callback = Either Error Message -> ()

This means a message *m2* can be pushed into a session while it's still
processing the preceding message *m1*. The messages can be processed
concurrently, but they *must* be given to the next session in line (or to the
application) in the same order they came in. Applications will expect to receive
messages in the order they arrived over the wire, and sessions require this too.
So ordering of messages must be preserved throughout the pipeline.

Consider the following highly simplified extension that deflates messages on the
wire. `message` is a value conforming to the type:

    type Message = {
      rsv1   :: Boolean
      rsv2   :: Boolean
      rsv3   :: Boolean
      opcode :: Number
      data   :: Buffer
    }

Here's the extension:

```js
var zlib = require('zlib');

var deflate = {
  outgoing: function(message, callback) {
    zlib.deflateRaw(message.data, function(error, result) {
      message.rsv1 = true;
      message.data = result;
      callback(error, message);
    });
  },

  incoming: function(message, callback) {
    // decompress inbound messages (elided)
  },

  close: function() {
    // no state to clean up
  }
};
```

We can call it with a large message followed by a small one, and the small one
will be returned first:

```js
var crypto = require('crypto'),
    large  = crypto.randomBytes(1 << 14),
    small  = Buffer.from('hi');

deflate.outgoing({ data: large }, function() {
  console.log(1, 'large');
});

deflate.outgoing({ data: small }, function() {
  console.log(2, 'small');
});

/* prints:  2 'small'
            1 'large' */
```

So a session that processes messages asynchronously may fail to preserve message
ordering.

Now, this extension is stateless, so it can process messages in any order and
still produce the same output. But some extensions are stateful and require
message order to be preserved.

For example, when using `permessage-deflate` without `no_context_takeover` set,
the session retains a DEFLATE de/compression context between messages, which
accumulates state as it consumes data (later messages can refer to sections of
previous ones to improve compression). Reordering parts of the DEFLATE stream
will result in a failed decompression. Messages must be decompressed in the same
order they were compressed by the peer in order for the DEFLATE protocol to
work.

Finally, there is the problem of closing a socket. When a WebSocket is closed by
the application, or receives a closing request from the other peer, there may be
messages outgoing from the application and incoming from the peer still in the
pipeline. If we close the socket and pipeline immediately, two problems arise:

* We may send our own closing frame to the peer before all prior messages we
  sent have been written to the socket, and before we have finished processing
  all prior messages from the peer
* The session may be instructed to close its resources (e.g. its de/compression
  context) while it's in the middle of processing a message, or before it has
  received messages that are upstream of it in the pipeline

Essentially, we must defer closing the sessions and sending a closing frame
until after all prior messages have exited the pipeline.


171
172* Message order must be preserved between the protocol driver, the extension
173 sessions, and the application
174* Messages should be handed off to sessions and endpoints as soon as possible,
175 to maximise throughput of stateless sessions
176* The closing procedure should block any further messages from entering the
177 pipeline, and should allow all existing messages to drain
178* Sessions should be closed as soon as possible to prevent them holding memory
179 and other resources when they have no more messages to handle
180* The closing API should allow the caller to detect when the pipeline is empty
181 and it is safe to continue the WebSocket closing procedure
182* Individual extensions should remain as simple as possible to facilitate
183 modularity and independent authorship
184
185The final point about modularity is an important one: this framework is designed
186to facilitate extensions existing as plugins, by decoupling the protocol driver,
187extensions, and application. In an ideal world, plugins should only need to
188contain code for their specific functionality, and not solve these problems that
189apply to all sessions. Also, solving some of these problems requires
190consideration of all active sessions collectively, which an individual session
191is incapable of doing.
192
For example, it is entirely possible to take the simple `deflate` extension
above and wrap its `incoming()` and `outgoing()` methods in two `Transform`
streams, producing this type:

    type Session = {
      incoming :: TransformStream
      outgoing :: TransformStream
      close    :: () -> ()
    }

The `Transform` class makes it easy to wrap an async function such that message
order is preserved:

```js
var stream  = require('stream'),
    session = new stream.Transform({ objectMode: true });

session._transform = function(message, _, callback) {
  var self = this;
  deflate.outgoing(message, function(error, result) {
    self.push(result);
    callback();
  });
};
```

However, this has a negative impact on throughput: it works by deferring
`callback()` until the async function has 'returned', which blocks `Transform`
from passing further input into the `_transform()` method until the current
message is dealt with completely. This would prevent sessions from processing
messages concurrently, and would unnecessarily reduce the throughput of
stateless extensions.

So, input should be handed off to sessions as soon as possible, and all we need
is a mechanism to reorder the output so that message order is preserved for the
next session in line.


## Solution

We now describe the model implemented here and how it meets the above design
goals. The diagram above, where a stack of extensions sits between the driver
and the application, describes the data flow, but not the object graph. That
looks like this:


    +--------+
    | Driver |
    +---o----+
        |
        V
    +------------+      +----------+
    | Extensions o----->| Pipeline |
    +------------+      +-----o----+
                              |
              +---------------+---------------+
              |               |               |
        +-----o----+    +-----o----+    +-----o----+
        | Cell [A] |    | Cell [B] |    | Cell [C] |
        +----------+    +----------+    +----------+


A driver using this framework holds an instance of the `Extensions` class, which
it uses to register extension plugins, negotiate headers and transform messages.
The `Extensions` instance itself holds a `Pipeline`, which contains an array of
`Cell` objects, each of which wraps one of the sessions.


### Message processing

Both the `Pipeline` and `Cell` classes have `incoming()` and `outgoing()`
methods; the `Pipeline` interface pushes messages into the pipe, delegates each
message to every `Cell` in turn, then returns it to the driver. Outgoing
messages pass through `A` then `B` then `C`, and incoming messages in the
reverse order.

Internally, a `Cell` contains two `Functor` objects. A `Functor` wraps an async
function and makes sure its output messages maintain the order of its input
messages. This name is due to [@fronx](https://github.com/fronx), on the basis
that, by preserving message order, the abstraction preserves the *mapping*
between input and output messages. To use our simple `deflate` extension from
above:

```js
var functor = new Functor(deflate, 'outgoing');

functor.call({ data: large }, function() {
  console.log(1, 'large');
});

functor.call({ data: small }, function() {
  console.log(2, 'small');
});

/* -> 1 'large'
      2 'small' */
```
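
The README doesn't show how a `Functor` achieves this, so here is a minimal
sketch of one way to implement it — an illustration of the idea, not the
library's actual `Functor`: every call claims a slot in a queue immediately, the
wrapped session method runs concurrently, and completed slots are only released
from the head of the queue, in input order.

```javascript
// Sketch of an order-preserving async wrapper (not the library's real code).
function Functor(session, method) {
  this._session = session;
  this._method  = method;
  this._queue   = [];
}

Functor.prototype.call = function(message, callback) {
  var record = { done: false, error: null, message: null, callback: callback },
      queue  = this._queue;

  // Claim a slot in call order before handing the message to the session
  queue.push(record);

  this._session[this._method](message, function(error, result) {
    record.done    = true;
    record.error   = error;
    record.message = result;

    // Release every completed record at the head of the queue, in order;
    // a finished message waits here until all earlier messages are done
    while (queue.length > 0 && queue[0].done) {
      var next = queue.shift();
      next.callback(next.error, next.message);
    }
  });
};
```

Even if the second call's work finishes first, its result is held back until
the first call has completed, which is exactly the reordering behaviour shown
above.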

A `Cell` contains two of these, one for each direction:


                        +-----------------------+
                  +---->| Functor [A, incoming] |
    +----------+  |     +-----------------------+
    | Cell [A] o--+
    +----------+  |     +-----------------------+
                  +---->| Functor [A, outgoing] |
                        +-----------------------+


This satisfies the message transformation requirements: the `Pipeline` simply
loops over the cells in the appropriate direction to transform each message.
Because each `Cell` will preserve message order, we can pass a message to the
next `Cell` in line as soon as the current `Cell` returns it. This gives each
`Cell` all the messages in order while maximising throughput.

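As a sketch of that looping (illustrative only — the real `Pipeline` also
tracks pending counts and closing state, and the function name `pipe` is an
invention for this example), a message can be threaded through the cells with a
simple recursive step:

```javascript
// Thread a message through a list of cells in the given direction,
// passing each cell's output to the next cell as soon as it arrives.
function pipe(cells, direction, message, callback) {
  // Outgoing messages visit A, B, C; incoming messages visit C, B, A
  var ordered = (direction === 'outgoing') ? cells : cells.slice().reverse();

  var step = function(index, error, msg) {
    if (error || index === ordered.length) return callback(error, msg);
    ordered[index][direction](msg, function(err, result) {
      step(index + 1, err, result);
    });
  };
  step(0, null, message);
}
```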
### Session closing

We want to close each session as soon as possible, after all existing messages
have drained. To do this, each `Cell` begins with a pending message counter in
each direction, labelled `in` and `out` below.


                        +----------+
                        | Pipeline |
                        +-----o----+
                              |
              +---------------+---------------+
              |               |               |
        +-----o----+    +-----o----+    +-----o----+
        | Cell [A] |    | Cell [B] |    | Cell [C] |
        +----------+    +----------+    +----------+
            in:  0          in:  0          in:  0
            out: 0          out: 0          out: 0


When a message *m1* enters the pipeline, say in the `outgoing` direction, we
increment the `pending.out` counter on all cells immediately.

                        +----------+
                  m1 => | Pipeline |
                        +-----o----+
                              |
              +---------------+---------------+
              |               |               |
        +-----o----+    +-----o----+    +-----o----+
        | Cell [A] |    | Cell [B] |    | Cell [C] |
        +----------+    +----------+    +----------+
            in:  0          in:  0          in:  0
            out: 1          out: 1          out: 1


*m1* is handed off to `A`; meanwhile, a second message *m2* arrives in the same
direction. All `pending.out` counters are again incremented.


                        +----------+
                  m2 => | Pipeline |
                        +-----o----+
                              |
              +---------------+---------------+
          m1  |               |               |
        +-----o----+    +-----o----+    +-----o----+
        | Cell [A] |    | Cell [B] |    | Cell [C] |
        +----------+    +----------+    +----------+
            in:  0          in:  0          in:  0
            out: 2          out: 2          out: 2


When the first cell's `A.outgoing` functor finishes processing *m1*, the first
`pending.out` counter is decremented and *m1* is handed off to cell `B`.


                        +----------+
                        | Pipeline |
                        +-----o----+
                              |
              +---------------+---------------+
          m2  |           m1  |               |
        +-----o----+    +-----o----+    +-----o----+
        | Cell [A] |    | Cell [B] |    | Cell [C] |
        +----------+    +----------+    +----------+
            in:  0          in:  0          in:  0
            out: 1          out: 2          out: 2


As `B` finishes with *m1*, and as `A` finishes with *m2*, the `pending.out`
counters continue to decrement.


                        +----------+
                        | Pipeline |
                        +-----o----+
                              |
              +---------------+---------------+
              |           m2  |           m1  |
        +-----o----+    +-----o----+    +-----o----+
        | Cell [A] |    | Cell [B] |    | Cell [C] |
        +----------+    +----------+    +----------+
            in:  0          in:  0          in:  0
            out: 0          out: 1          out: 2


Say `C` is a little slow, and begins processing *m2* while still working on
*m1*. That's fine: the `Functor` mechanism will keep *m1* ahead of *m2* in the
output.


                        +----------+
                        | Pipeline |
                        +-----o----+
                              |
              +---------------+---------------+
              |               |           m2  |  m1
        +-----o----+    +-----o----+    +-----o----+
        | Cell [A] |    | Cell [B] |    | Cell [C] |
        +----------+    +----------+    +----------+
            in:  0          in:  0          in:  0
            out: 0          out: 0          out: 2


Once all messages are dealt with, the counters return to `0`.


                        +----------+
                        | Pipeline |
                        +-----o----+
                              |
              +---------------+---------------+
              |               |               |
        +-----o----+    +-----o----+    +-----o----+
        | Cell [A] |    | Cell [B] |    | Cell [C] |
        +----------+    +----------+    +----------+
            in:  0          in:  0          in:  0
            out: 0          out: 0          out: 0


The same process applies in the `incoming` direction, the only difference being
that messages are passed to `C` first.

This makes closing the sessions quite simple. When the driver wants to close the
socket, it calls `Pipeline.close()`. This *immediately* calls `close()` on all
the cells. If a cell has `in == out == 0`, then it immediately calls
`session.close()`. Otherwise, it stores the closing call and defers it until
`in` and `out` have both ticked down to zero. The pipeline will not accept new
messages after `close()` has been called, so we know the pending counts will not
increase after this point.

This means each session is closed as soon as possible: `A` can close while the
slow `C` session is still working, because it knows there are no more messages
on the way. Similarly, `C` will defer closing if `close()` is called while *m1*
is still in `B` and *m2* in `A`, because its pending count tells it that it
still has work to do, even if it has not received those messages yet. This
concern cannot be addressed by extensions acting only on their own local state,
unless we pollute individual extensions by making them all implement this same
mechanism.

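The counter-and-deferral idea can be sketched as follows (illustrative code
with invented method names, not the library's actual `Cell`): each direction
has a pending counter, and `close()` only reaches the session once both
counters hit zero.

```javascript
// Sketch of a cell that defers session.close() until all pending
// messages in both directions have drained.
function Cell(session) {
  this._session = session;
  this._pending = { incoming: 0, outgoing: 0 };
  this._closing = false;
}

// Called when a message enters the pipeline in the given direction
Cell.prototype.incr = function(direction) {
  this._pending[direction] += 1;
};

// Called when this cell finishes processing a message
Cell.prototype.decr = function(direction) {
  this._pending[direction] -= 1;
  this._tryClose();
};

Cell.prototype.close = function() {
  this._closing = true;
  this._tryClose();
};

Cell.prototype._tryClose = function() {
  // Only close the session once a close was requested and no messages remain
  if (this._closing && this._pending.incoming === 0 && this._pending.outgoing === 0) {
    this._closing = false;
    this._session.close();
  }
};
```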
The actual closing API at each level is slightly different:

    type Session = {
      close :: () -> ()
    }

    type Cell = {
      close :: () -> Promise ()
    }

    type Pipeline = {
      close :: Callback -> ()
    }

This might appear inconsistent, so it's worth explaining. Remember that a
`Pipeline` holds a list of `Cell` objects, each wrapping a `Session`. The driver
talks (via the `Extensions` API) to the `Pipeline` interface, and it wants
`Pipeline.close()` to do two things: close all the sessions, and tell me when
it's safe to start the closing procedure (i.e. when all messages have drained
from the pipe and been handed off to the application or socket). A callback API
works well for that.

At the other end of the stack, `Session.close()` is a nullary void method with
no callback or promise API because we don't care what it does, and whatever it
does do will not block the WebSocket protocol; we're not going to hold off
processing messages while a session closes its de/compression context. We just
tell it to close itself, and don't want to wait while it does that.

In the middle, `Cell.close()` returns a promise rather than using a callback.
This is for two reasons. First, `Cell.close()` might not do anything
immediately; it might have to defer its effect while messages drain. So, if
given a callback, it would have to store it in a queue for later execution.
Callbacks work fine if your method does something and can then invoke the
callback itself, but if you need to store callbacks somewhere so another method
can execute them, a promise is a better fit. Second, it better serves the
purposes of `Pipeline.close()`: it wants to call `close()` on each of a list of
cells, and wait for all of them to finish. This is simple and idiomatic using
promises:

```js
var closed = cells.map((cell) => cell.close());
Promise.all(closed).then(callback);
```

(We don't actually use a full *Promises/A+*-compatible promise here; we use a
much simplified construction that acts as a callback aggregator, resolves
synchronously, and does not support chaining, but the principle is the same.)
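
Such a simplified construction might look like this — a hypothetical `Pledge`
class for illustration, under the assumptions just stated (synchronous
resolution, callback aggregation, no chaining); the library's own helper may
differ:

```javascript
// A minimal callback-aggregating 'promise': then() registers callbacks,
// done() fires them all, and all() waits for a list of pledges.
function Pledge() {
  this._done      = false;
  this._callbacks = [];
}

Pledge.prototype.then = function(callback) {
  if (this._done) callback();
  else this._callbacks.push(callback);
};

Pledge.prototype.done = function() {
  this._done = true;
  var callbacks = this._callbacks;
  this._callbacks = [];
  callbacks.forEach(function(cb) { cb(); });
};

Pledge.all = function(list) {
  var pledge  = new Pledge(),
      pending = list.length;

  if (pending === 0) pledge.done();

  list.forEach(function(item) {
    item.then(function() {
      pending -= 1;
      if (pending === 0) pledge.done();
    });
  });

  return pledge;
};
```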


### Error handling

We've not mentioned error handling so far, but it bears some explanation. The
counter system described above still applies, but behaves slightly differently
in the presence of errors.

Say we push three messages into the pipe in the outgoing direction:


                        +----------+
          m3, m2, m1 => | Pipeline |
                        +-----o----+
                              |
              +---------------+---------------+
              |               |               |
        +-----o----+    +-----o----+    +-----o----+
        | Cell [A] |    | Cell [B] |    | Cell [C] |
        +----------+    +----------+    +----------+
            in:  0          in:  0          in:  0
            out: 3          out: 3          out: 3


They pass through the cells successfully up to this point:

                        +----------+
                        | Pipeline |
                        +-----o----+
                              |
              +---------------+---------------+
          m3  |           m2  |           m1  |
        +-----o----+    +-----o----+    +-----o----+
        | Cell [A] |    | Cell [B] |    | Cell [C] |
        +----------+    +----------+    +----------+
            in:  0          in:  0          in:  0
            out: 1          out: 2          out: 3


At this point, session `B` produces an error while processing *m2*; that is,
*m2* becomes *e2*. *m1* is still in the pipeline, and *m3* is queued behind
*m2*. What ought to happen is that *m1* is handed off to the socket, then *m2*
is released to the driver, which will detect the error and begin closing the
socket. No further processing should be done on *m3*, and it should not be
released to the driver after the error is emitted.

To handle this, we allow errors to pass down the pipeline just like messages do,
to maintain ordering. But once a cell sees its session produce an error, or
receives an error from upstream, it should refuse to accept any further
messages. Session `B` might have begun processing *m3* by the time it produces
the error *e2*, but `C` will have been given *e2* before it receives *m3*, and
can simply drop *m3*.

Now, say *e2* reaches the slow session `C` while *m1* is still present. `C`
will never receive *m3*, since *m3* was dropped upstream. Under the present
model, `C`'s `out` counter is `3`, but it is only going to emit two more
values: *m1* and *e2*. In order for closing to work, we need to decrement `out`
to reflect this. The situation should look like this:

                        +----------+
                        | Pipeline |
                        +-----o----+
                              |
              +---------------+---------------+
              |               |           e2  |  m1
        +-----o----+    +-----o----+    +-----o----+
        | Cell [A] |    | Cell [B] |    | Cell [C] |
        +----------+    +----------+    +----------+
            in:  0          in:  0          in:  0
            out: 0          out: 0          out: 2


When a cell sees its session emit an error, or when it receives an error from
upstream, it sets its pending count in the appropriate direction to equal the
number of messages it is *currently* processing. It will not accept any messages
after it sees the error, so this will allow the counter to reach zero.

Note that while *e2* is in the pipeline, `Pipeline` should drop any further
messages in the outgoing direction, but should continue to accept incoming
messages. Until *e2* makes it out of the pipe to the driver, behind previous
successful messages, the driver does not know an error has happened, and a
message may arrive over the socket and make it all the way through the incoming
pipe in the meantime. We only halt processing in the affected direction, to
avoid doing unnecessary work, since messages arriving after an error should not
be processed.

Some unnecessary work may still happen: for example, any messages already in the
pipeline behind *m2* will be processed by `A`, since it's upstream of the
error. Those messages will then be dropped by `B`.
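
The error-gating rule can be sketched like this (illustrative code with an
invented helper name `guardedCall` and an ad-hoc cell object; not the library's
implementation): once a cell has observed an error, it drops any later input,
and it resets its pending count to the number of in-flight messages so the
counter can still reach zero.

```javascript
// Process one outgoing message through a cell's session, enforcing the
// rule that no messages are accepted after an error has been seen.
function guardedCall(cell, message, callback) {
  if (cell.errored) return;          // drop messages arriving after an error

  cell.processing += 1;
  cell.session.outgoing(message, function(error, result) {
    cell.processing -= 1;
    if (error) {
      cell.errored = true;
      // Only the currently in-flight messages will ever be emitted now,
      // so the pending count is reset to match
      cell.pending = cell.processing;
    }
    callback(error, result);
  });
}
```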


## Alternative ideas

I am considering implementing `Functor` as an object-mode transform stream
rather than what is essentially an async function. Being object-mode, a stream
would preserve message boundaries and would also possibly help address
back-pressure. I'm not sure whether this would require external API changes so
that such streams could be connected to the downstream driver's streams.


## Acknowledgements

Credit is due to [@mnowster](https://github.com/mnowster) for helping with the
design and to [@fronx](https://github.com/fronx) for helping name things.