forked from mirrors/gecko-dev
		
	
		
			
				
	
	
		
			598 lines
		
	
	
	
		
			16 KiB
		
	
	
	
		
			JavaScript
		
	
	
	
	
	
			
		
		
	
	
			598 lines
		
	
	
	
		
			16 KiB
		
	
	
	
		
			JavaScript
		
	
	
	
	
	
| 'use strict';
 | |
| 
 | |
| const assert = require('assert');
 | |
| const EventEmitter = require('events');
 | |
| const { createServer } = require('http');
 | |
| const { Duplex } = require('stream');
 | |
| const { randomBytes } = require('crypto');
 | |
| 
 | |
| const createWebSocketStream = require('../lib/stream');
 | |
| const Sender = require('../lib/sender');
 | |
| const WebSocket = require('..');
 | |
| const { EMPTY_BUFFER } = require('../lib/constants');
 | |
| 
 | |
| describe('createWebSocketStream', () => {
 | |
|   it('is exposed as a property of the `WebSocket` class', () => {
 | |
|     assert.strictEqual(WebSocket.createWebSocketStream, createWebSocketStream);
 | |
|   });
 | |
| 
 | |
|   it('returns a `Duplex` stream', () => {
 | |
|     const duplex = createWebSocketStream(new EventEmitter());
 | |
| 
 | |
|     assert.ok(duplex instanceof Duplex);
 | |
|   });
 | |
| 
 | |
|   it('passes the options object to the `Duplex` constructor', (done) => {
 | |
|     const wss = new WebSocket.Server({ port: 0 }, () => {
 | |
|       const ws = new WebSocket(`ws://localhost:${wss.address().port}`);
 | |
|       const duplex = createWebSocketStream(ws, {
 | |
|         allowHalfOpen: false,
 | |
|         encoding: 'utf8'
 | |
|       });
 | |
| 
 | |
|       duplex.on('data', (chunk) => {
 | |
|         assert.strictEqual(chunk, 'hi');
 | |
| 
 | |
|         duplex.on('close', () => {
 | |
|           wss.close(done);
 | |
|         });
 | |
|       });
 | |
|     });
 | |
| 
 | |
|     wss.on('connection', (ws) => {
 | |
|       ws.send(Buffer.from('hi'));
 | |
|       ws.close();
 | |
|     });
 | |
|   });
 | |
| 
 | |
|   describe('The returned stream', () => {
 | |
|     it('buffers writes if `readyState` is `CONNECTING`', (done) => {
 | |
|       const chunk = randomBytes(1024);
 | |
|       const wss = new WebSocket.Server({ port: 0 }, () => {
 | |
|         const ws = new WebSocket(`ws://localhost:${wss.address().port}`);
 | |
| 
 | |
|         assert.strictEqual(ws.readyState, WebSocket.CONNECTING);
 | |
| 
 | |
|         const duplex = createWebSocketStream(ws);
 | |
| 
 | |
|         duplex.write(chunk);
 | |
|       });
 | |
| 
 | |
|       wss.on('connection', (ws) => {
 | |
|         ws.on('message', (message, isBinary) => {
 | |
|           ws.on('close', (code, reason) => {
 | |
|             assert.deepStrictEqual(message, chunk);
 | |
|             assert.ok(isBinary);
 | |
|             assert.strictEqual(code, 1005);
 | |
|             assert.strictEqual(reason, EMPTY_BUFFER);
 | |
|             wss.close(done);
 | |
|           });
 | |
|         });
 | |
| 
 | |
|         ws.close();
 | |
|       });
 | |
|     });
 | |
| 
 | |
|     it('errors if a write occurs when `readyState` is `CLOSING`', (done) => {
 | |
|       const wss = new WebSocket.Server({ port: 0 }, () => {
 | |
|         const ws = new WebSocket(`ws://localhost:${wss.address().port}`);
 | |
|         const duplex = createWebSocketStream(ws);
 | |
| 
 | |
|         duplex.on('error', (err) => {
 | |
|           assert.ok(duplex.destroyed);
 | |
|           assert.ok(err instanceof Error);
 | |
|           assert.strictEqual(
 | |
|             err.message,
 | |
|             'WebSocket is not open: readyState 2 (CLOSING)'
 | |
|           );
 | |
| 
 | |
|           duplex.on('close', () => {
 | |
|             wss.close(done);
 | |
|           });
 | |
|         });
 | |
| 
 | |
|         ws.on('open', () => {
 | |
|           ws._receiver.on('conclude', () => {
 | |
|             duplex.write('hi');
 | |
|           });
 | |
|         });
 | |
|       });
 | |
| 
 | |
|       wss.on('connection', (ws) => {
 | |
|         ws.close();
 | |
|       });
 | |
|     });
 | |
| 
 | |
|     it('errors if a write occurs when `readyState` is `CLOSED`', (done) => {
 | |
|       const wss = new WebSocket.Server({ port: 0 }, () => {
 | |
|         const ws = new WebSocket(`ws://localhost:${wss.address().port}`);
 | |
|         const duplex = createWebSocketStream(ws);
 | |
| 
 | |
|         duplex.on('error', (err) => {
 | |
|           assert.ok(duplex.destroyed);
 | |
|           assert.ok(err instanceof Error);
 | |
|           assert.strictEqual(
 | |
|             err.message,
 | |
|             'WebSocket is not open: readyState 3 (CLOSED)'
 | |
|           );
 | |
| 
 | |
|           duplex.on('close', () => {
 | |
|             wss.close(done);
 | |
|           });
 | |
|         });
 | |
| 
 | |
|         ws.on('close', () => {
 | |
|           duplex.write('hi');
 | |
|         });
 | |
|       });
 | |
| 
 | |
|       wss.on('connection', (ws) => {
 | |
|         ws.close();
 | |
|       });
 | |
|     });
 | |
| 
 | |
|     it('does not error if `_final()` is called while connecting', (done) => {
 | |
|       const wss = new WebSocket.Server({ port: 0 }, () => {
 | |
|         const ws = new WebSocket(`ws://localhost:${wss.address().port}`);
 | |
| 
 | |
|         assert.strictEqual(ws.readyState, WebSocket.CONNECTING);
 | |
| 
 | |
|         const duplex = createWebSocketStream(ws);
 | |
| 
 | |
|         duplex.on('close', () => {
 | |
|           wss.close(done);
 | |
|         });
 | |
| 
 | |
|         duplex.resume();
 | |
|         duplex.end();
 | |
|       });
 | |
|     });
 | |
| 
 | |
|     it('makes `_final()` a noop if no socket is assigned', (done) => {
 | |
|       const server = createServer();
 | |
| 
 | |
|       server.on('upgrade', (request, socket) => {
 | |
|         socket.on('end', socket.end);
 | |
| 
 | |
|         const headers = [
 | |
|           'HTTP/1.1 101 Switching Protocols',
 | |
|           'Upgrade: websocket',
 | |
|           'Connection: Upgrade',
 | |
|           'Sec-WebSocket-Accept: foo'
 | |
|         ];
 | |
| 
 | |
|         socket.write(headers.concat('\r\n').join('\r\n'));
 | |
|       });
 | |
| 
 | |
|       server.listen(() => {
 | |
|         const called = [];
 | |
|         const ws = new WebSocket(`ws://localhost:${server.address().port}`);
 | |
|         const duplex = WebSocket.createWebSocketStream(ws);
 | |
|         const final = duplex._final;
 | |
| 
 | |
|         duplex._final = (callback) => {
 | |
|           called.push('final');
 | |
|           assert.strictEqual(ws.readyState, WebSocket.CLOSING);
 | |
|           assert.strictEqual(ws._socket, null);
 | |
| 
 | |
|           final(callback);
 | |
|         };
 | |
| 
 | |
|         duplex.on('error', (err) => {
 | |
|           called.push('error');
 | |
|           assert.ok(err instanceof Error);
 | |
|           assert.strictEqual(
 | |
|             err.message,
 | |
|             'Invalid Sec-WebSocket-Accept header'
 | |
|           );
 | |
|         });
 | |
| 
 | |
|         duplex.on('finish', () => {
 | |
|           called.push('finish');
 | |
|         });
 | |
| 
 | |
|         duplex.on('close', () => {
 | |
|           assert.deepStrictEqual(called, ['final', 'error']);
 | |
|           server.close(done);
 | |
|         });
 | |
| 
 | |
|         ws.on('upgrade', () => {
 | |
|           process.nextTick(() => {
 | |
|             duplex.end();
 | |
|           });
 | |
|         });
 | |
|       });
 | |
|     });
 | |
| 
 | |
|     it('reemits errors', (done) => {
 | |
|       let duplexCloseEventEmitted = false;
 | |
|       let serverClientCloseEventEmitted = false;
 | |
| 
 | |
|       const wss = new WebSocket.Server({ port: 0 }, () => {
 | |
|         const ws = new WebSocket(`ws://localhost:${wss.address().port}`);
 | |
|         const duplex = createWebSocketStream(ws);
 | |
| 
 | |
|         duplex.on('error', (err) => {
 | |
|           assert.ok(err instanceof RangeError);
 | |
|           assert.strictEqual(err.code, 'WS_ERR_INVALID_OPCODE');
 | |
|           assert.strictEqual(
 | |
|             err.message,
 | |
|             'Invalid WebSocket frame: invalid opcode 5'
 | |
|           );
 | |
| 
 | |
|           duplex.on('close', () => {
 | |
|             duplexCloseEventEmitted = true;
 | |
|             if (serverClientCloseEventEmitted) wss.close(done);
 | |
|           });
 | |
|         });
 | |
|       });
 | |
| 
 | |
|       wss.on('connection', (ws) => {
 | |
|         ws._socket.write(Buffer.from([0x85, 0x00]));
 | |
|         ws.on('close', (code, reason) => {
 | |
|           assert.strictEqual(code, 1002);
 | |
|           assert.deepStrictEqual(reason, EMPTY_BUFFER);
 | |
| 
 | |
|           serverClientCloseEventEmitted = true;
 | |
|           if (duplexCloseEventEmitted) wss.close(done);
 | |
|         });
 | |
|       });
 | |
|     });
 | |
| 
 | |
|     it('does not swallow errors that may occur while destroying', (done) => {
 | |
|       const frame = Buffer.concat(
 | |
|         Sender.frame(Buffer.from([0x22, 0xfa, 0xec, 0x78]), {
 | |
|           fin: true,
 | |
|           rsv1: true,
 | |
|           opcode: 0x02,
 | |
|           mask: false,
 | |
|           readOnly: false
 | |
|         })
 | |
|       );
 | |
| 
 | |
|       const wss = new WebSocket.Server(
 | |
|         {
 | |
|           perMessageDeflate: true,
 | |
|           port: 0
 | |
|         },
 | |
|         () => {
 | |
|           const ws = new WebSocket(`ws://localhost:${wss.address().port}`);
 | |
|           const duplex = createWebSocketStream(ws);
 | |
| 
 | |
|           duplex.on('error', (err) => {
 | |
|             assert.ok(err instanceof Error);
 | |
|             assert.strictEqual(err.code, 'Z_DATA_ERROR');
 | |
|             assert.strictEqual(err.errno, -3);
 | |
| 
 | |
|             duplex.on('close', () => {
 | |
|               wss.close(done);
 | |
|             });
 | |
|           });
 | |
| 
 | |
|           let bytesRead = 0;
 | |
| 
 | |
|           ws.on('open', () => {
 | |
|             ws._socket.on('data', (chunk) => {
 | |
|               bytesRead += chunk.length;
 | |
|               if (bytesRead === frame.length) duplex.destroy();
 | |
|             });
 | |
|           });
 | |
|         }
 | |
|       );
 | |
| 
 | |
|       wss.on('connection', (ws) => {
 | |
|         ws._socket.write(frame);
 | |
|       });
 | |
|     });
 | |
| 
 | |
|     it("does not suppress the throwing behavior of 'error' events", (done) => {
 | |
|       const wss = new WebSocket.Server({ port: 0 }, () => {
 | |
|         const ws = new WebSocket(`ws://localhost:${wss.address().port}`);
 | |
|         createWebSocketStream(ws);
 | |
|       });
 | |
| 
 | |
|       wss.on('connection', (ws) => {
 | |
|         ws._socket.write(Buffer.from([0x85, 0x00]));
 | |
|       });
 | |
| 
 | |
|       assert.strictEqual(process.listenerCount('uncaughtException'), 1);
 | |
| 
 | |
|       const [listener] = process.listeners('uncaughtException');
 | |
| 
 | |
|       process.removeAllListeners('uncaughtException');
 | |
|       process.once('uncaughtException', (err) => {
 | |
|         assert.ok(err instanceof Error);
 | |
|         assert.strictEqual(
 | |
|           err.message,
 | |
|           'Invalid WebSocket frame: invalid opcode 5'
 | |
|         );
 | |
| 
 | |
|         process.on('uncaughtException', listener);
 | |
|         wss.close(done);
 | |
|       });
 | |
|     });
 | |
| 
 | |
|     it("is destroyed after 'end' and 'finish' are emitted (1/2)", (done) => {
 | |
|       const wss = new WebSocket.Server({ port: 0 }, () => {
 | |
|         const events = [];
 | |
|         const ws = new WebSocket(`ws://localhost:${wss.address().port}`);
 | |
|         const duplex = createWebSocketStream(ws);
 | |
| 
 | |
|         duplex.on('end', () => {
 | |
|           events.push('end');
 | |
|           assert.ok(duplex.destroyed);
 | |
|         });
 | |
| 
 | |
|         duplex.on('close', () => {
 | |
|           assert.deepStrictEqual(events, ['finish', 'end']);
 | |
|           wss.close(done);
 | |
|         });
 | |
| 
 | |
|         duplex.on('finish', () => {
 | |
|           events.push('finish');
 | |
|           assert.ok(!duplex.destroyed);
 | |
|           assert.ok(duplex.readable);
 | |
| 
 | |
|           duplex.resume();
 | |
|         });
 | |
| 
 | |
|         ws.on('close', () => {
 | |
|           duplex.end();
 | |
|         });
 | |
|       });
 | |
| 
 | |
|       wss.on('connection', (ws) => {
 | |
|         ws.send('foo');
 | |
|         ws.close();
 | |
|       });
 | |
|     });
 | |
| 
 | |
|     it("is destroyed after 'end' and 'finish' are emitted (2/2)", (done) => {
 | |
|       const wss = new WebSocket.Server({ port: 0 }, () => {
 | |
|         const events = [];
 | |
|         const ws = new WebSocket(`ws://localhost:${wss.address().port}`);
 | |
|         const duplex = createWebSocketStream(ws);
 | |
| 
 | |
|         duplex.on('end', () => {
 | |
|           events.push('end');
 | |
|           assert.ok(!duplex.destroyed);
 | |
|           assert.ok(duplex.writable);
 | |
| 
 | |
|           duplex.end();
 | |
|         });
 | |
| 
 | |
|         duplex.on('close', () => {
 | |
|           assert.deepStrictEqual(events, ['end', 'finish']);
 | |
|           wss.close(done);
 | |
|         });
 | |
| 
 | |
|         duplex.on('finish', () => {
 | |
|           events.push('finish');
 | |
|         });
 | |
| 
 | |
|         duplex.resume();
 | |
|       });
 | |
| 
 | |
|       wss.on('connection', (ws) => {
 | |
|         ws.close();
 | |
|       });
 | |
|     });
 | |
| 
 | |
|     it('handles backpressure (1/3)', (done) => {
 | |
|       const wss = new WebSocket.Server({ port: 0 }, () => {
 | |
|         // eslint-disable-next-line no-unused-vars
 | |
|         const ws = new WebSocket(`ws://localhost:${wss.address().port}`);
 | |
|       });
 | |
| 
 | |
|       wss.on('connection', (ws) => {
 | |
|         const duplex = createWebSocketStream(ws);
 | |
| 
 | |
|         duplex.resume();
 | |
| 
 | |
|         duplex.on('drain', () => {
 | |
|           duplex.on('close', () => {
 | |
|             wss.close(done);
 | |
|           });
 | |
| 
 | |
|           duplex.end();
 | |
|         });
 | |
| 
 | |
|         const chunk = randomBytes(1024);
 | |
|         let ret;
 | |
| 
 | |
|         do {
 | |
|           ret = duplex.write(chunk);
 | |
|         } while (ret !== false);
 | |
|       });
 | |
|     });
 | |
| 
 | |
|     it('handles backpressure (2/3)', (done) => {
 | |
|       const wss = new WebSocket.Server(
 | |
|         { port: 0, perMessageDeflate: true },
 | |
|         () => {
 | |
|           const called = [];
 | |
|           const ws = new WebSocket(`ws://localhost:${wss.address().port}`);
 | |
|           const duplex = createWebSocketStream(ws);
 | |
|           const read = duplex._read;
 | |
| 
 | |
|           duplex._read = () => {
 | |
|             duplex._read = read;
 | |
|             called.push('read');
 | |
|             assert.ok(ws._receiver._writableState.needDrain);
 | |
|             read();
 | |
|             assert.ok(ws._socket.isPaused());
 | |
|           };
 | |
| 
 | |
|           ws.on('open', () => {
 | |
|             ws._socket.on('pause', () => {
 | |
|               duplex.resume();
 | |
|             });
 | |
| 
 | |
|             ws._receiver.on('drain', () => {
 | |
|               called.push('drain');
 | |
|               assert.ok(!ws._socket.isPaused());
 | |
|               duplex.end();
 | |
|             });
 | |
| 
 | |
|             const opts = {
 | |
|               fin: true,
 | |
|               opcode: 0x02,
 | |
|               mask: false,
 | |
|               readOnly: false
 | |
|             };
 | |
| 
 | |
|             const list = [
 | |
|               ...Sender.frame(randomBytes(16 * 1024), { rsv1: false, ...opts }),
 | |
|               ...Sender.frame(Buffer.alloc(1), { rsv1: true, ...opts })
 | |
|             ];
 | |
| 
 | |
|             // This hack is used because there is no guarantee that more than
 | |
|             // 16 KiB will be sent as a single TCP packet.
 | |
|             ws._socket.push(Buffer.concat(list));
 | |
|           });
 | |
| 
 | |
|           duplex.on('close', () => {
 | |
|             assert.deepStrictEqual(called, ['read', 'drain']);
 | |
|             wss.close(done);
 | |
|           });
 | |
|         }
 | |
|       );
 | |
|     });
 | |
| 
 | |
|     it('handles backpressure (3/3)', (done) => {
 | |
|       const wss = new WebSocket.Server(
 | |
|         { port: 0, perMessageDeflate: true },
 | |
|         () => {
 | |
|           const called = [];
 | |
|           const ws = new WebSocket(`ws://localhost:${wss.address().port}`);
 | |
|           const duplex = createWebSocketStream(ws);
 | |
|           const read = duplex._read;
 | |
| 
 | |
|           duplex._read = () => {
 | |
|             called.push('read');
 | |
|             assert.ok(!ws._receiver._writableState.needDrain);
 | |
|             read();
 | |
|             assert.ok(!ws._socket.isPaused());
 | |
|             duplex.end();
 | |
|           };
 | |
| 
 | |
|           ws.on('open', () => {
 | |
|             ws._receiver.on('drain', () => {
 | |
|               called.push('drain');
 | |
|               assert.ok(ws._socket.isPaused());
 | |
|               duplex.resume();
 | |
|             });
 | |
| 
 | |
|             const opts = {
 | |
|               fin: true,
 | |
|               opcode: 0x02,
 | |
|               mask: false,
 | |
|               readOnly: false
 | |
|             };
 | |
| 
 | |
|             const list = [
 | |
|               ...Sender.frame(randomBytes(16 * 1024), { rsv1: false, ...opts }),
 | |
|               ...Sender.frame(Buffer.alloc(1), { rsv1: true, ...opts })
 | |
|             ];
 | |
| 
 | |
|             ws._socket.push(Buffer.concat(list));
 | |
|           });
 | |
| 
 | |
|           duplex.on('close', () => {
 | |
|             assert.deepStrictEqual(called, ['drain', 'read']);
 | |
|             wss.close(done);
 | |
|           });
 | |
|         }
 | |
|       );
 | |
|     });
 | |
| 
 | |
|     it('can be destroyed (1/2)', (done) => {
 | |
|       const wss = new WebSocket.Server({ port: 0 }, () => {
 | |
|         const error = new Error('Oops');
 | |
|         const ws = new WebSocket(`ws://localhost:${wss.address().port}`);
 | |
|         const duplex = createWebSocketStream(ws);
 | |
| 
 | |
|         duplex.on('error', (err) => {
 | |
|           assert.strictEqual(err, error);
 | |
| 
 | |
|           duplex.on('close', () => {
 | |
|             wss.close(done);
 | |
|           });
 | |
|         });
 | |
| 
 | |
|         ws.on('open', () => {
 | |
|           duplex.destroy(error);
 | |
|         });
 | |
|       });
 | |
|     });
 | |
| 
 | |
|     it('can be destroyed (2/2)', (done) => {
 | |
|       const wss = new WebSocket.Server({ port: 0 }, () => {
 | |
|         const ws = new WebSocket(`ws://localhost:${wss.address().port}`);
 | |
|         const duplex = createWebSocketStream(ws);
 | |
| 
 | |
|         duplex.on('close', () => {
 | |
|           wss.close(done);
 | |
|         });
 | |
| 
 | |
|         ws.on('open', () => {
 | |
|           duplex.destroy();
 | |
|         });
 | |
|       });
 | |
|     });
 | |
| 
 | |
|     it('converts text messages to strings in readable object mode', (done) => {
 | |
|       const wss = new WebSocket.Server({ port: 0 }, () => {
 | |
|         const events = [];
 | |
|         const ws = new WebSocket(`ws://localhost:${wss.address().port}`);
 | |
|         const duplex = createWebSocketStream(ws, { readableObjectMode: true });
 | |
| 
 | |
|         duplex.on('data', (data) => {
 | |
|           events.push('data');
 | |
|           assert.strictEqual(data, 'foo');
 | |
|         });
 | |
| 
 | |
|         duplex.on('end', () => {
 | |
|           events.push('end');
 | |
|           duplex.end();
 | |
|         });
 | |
| 
 | |
|         duplex.on('close', () => {
 | |
|           assert.deepStrictEqual(events, ['data', 'end']);
 | |
|           wss.close(done);
 | |
|         });
 | |
|       });
 | |
| 
 | |
|       wss.on('connection', (ws) => {
 | |
|         ws.send('foo');
 | |
|         ws.close();
 | |
|       });
 | |
|     });
 | |
| 
 | |
|     it('resumes the socket if `readyState` is `CLOSING`', (done) => {
 | |
|       const wss = new WebSocket.Server({ port: 0 }, () => {
 | |
|         const ws = new WebSocket(`ws://localhost:${wss.address().port}`);
 | |
|         const duplex = createWebSocketStream(ws);
 | |
| 
 | |
|         ws.on('message', () => {
 | |
|           assert.ok(ws._socket.isPaused());
 | |
| 
 | |
|           duplex.on('close', () => {
 | |
|             wss.close(done);
 | |
|           });
 | |
| 
 | |
|           duplex.end();
 | |
| 
 | |
|           process.nextTick(() => {
 | |
|             assert.strictEqual(ws.readyState, WebSocket.CLOSING);
 | |
|             duplex.resume();
 | |
|           });
 | |
|         });
 | |
|       });
 | |
| 
 | |
|       wss.on('connection', (ws) => {
 | |
|         ws.send(randomBytes(16 * 1024));
 | |
|       });
 | |
|     });
 | |
|   });
 | |
| });
 | 
