forked from mirrors/gecko-dev
		
	
		
			
				
	
	
		
			262 lines
		
	
	
	
		
			9.4 KiB
		
	
	
	
		
			JavaScript
		
	
	
	
	
	
			
		
		
	
	
			262 lines
		
	
	
	
		
			9.4 KiB
		
	
	
	
		
			JavaScript
		
	
	
	
	
	
| var assert = require('assert');
 | |
| 
 | |
| var Serializer   = require('./framer').Serializer;
 | |
| var Deserializer = require('./framer').Deserializer;
 | |
| var Compressor   = require('./compressor').Compressor;
 | |
| var Decompressor = require('./compressor').Decompressor;
 | |
| var Connection   = require('./connection').Connection;
 | |
| var Duplex       = require('stream').Duplex;
 | |
| var Transform    = require('stream').Transform;
 | |
| 
 | |
| exports.Endpoint = Endpoint;
 | |
| 
 | |
| // The Endpoint class
 | |
| // ==================
 | |
| 
 | |
| // Public API
 | |
| // ----------
 | |
| 
 | |
| // - **new Endpoint(log, role, settings, filters)**: create a new Endpoint.
 | |
| //
 | |
| //   - `log`: bunyan logger of the parent
 | |
| //   - `role`: 'CLIENT' or 'SERVER'
 | |
| //   - `settings`: initial HTTP/2 settings
 | |
| //   - `filters`: a map of functions that filter the traffic between components (for debugging or
 | |
| //     intentional failure injection).
 | |
| //
 | |
| //     Filter functions get three arguments:
 | |
| //     1. `frame`: the current frame
 | |
| //     2. `forward(frame)`: function that can be used to forward a frame to the next component
 | |
| //     3. `done()`: callback to signal the end of the filter process
 | |
| //
 | |
| //     Valid filter names and their position in the stack:
 | |
| //     - `beforeSerialization`: after compression, before serialization
 | |
| //     - `beforeCompression`: after multiplexing, before compression
 | |
| //     - `afterDeserialization`: after deserialization, before decompression
 | |
| //     - `afterDecompression`: after decompression, before multiplexing
 | |
| //
 | |
| // * **Event: 'stream' (Stream)**: 'stream' event forwarded from the underlying Connection
 | |
| //
 | |
| // * **Event: 'error' (type)**: signals an error
 | |
| //
 | |
| // * **createStream(): Stream**: initiate a new stream (forwarded to the underlying Connection)
 | |
| //
 | |
| // * **close([error])**: close the connection with an error code
 | |
| 
 | |
| // Constructor
 | |
| // -----------
 | |
| 
 | |
| // The process of initialization:
 | |
| function Endpoint(log, role, settings, filters) {
 | |
|   Duplex.call(this);
 | |
| 
 | |
|   // * Initializing logging infrastructure
 | |
|   this._log = log.child({ component: 'endpoint', e: this });
 | |
| 
 | |
|   // * First part of the handshake process: sending and receiving the client connection header
 | |
|   //   prelude.
 | |
|   assert((role === 'CLIENT') || role === 'SERVER');
 | |
|   if (role === 'CLIENT') {
 | |
|     this._writePrelude();
 | |
|   } else {
 | |
|     this._readPrelude();
 | |
|   }
 | |
| 
 | |
|   // * Initialization of component. This includes the second part of the handshake process:
 | |
|   //   sending the first SETTINGS frame. This is done by the connection class right after
 | |
|   //   initialization.
 | |
|   this._initializeDataFlow(role, settings, filters || {});
 | |
| 
 | |
|   // * Initialization of management code.
 | |
|   this._initializeManagement();
 | |
| 
 | |
|   // * Initializing error handling.
 | |
|   this._initializeErrorHandling();
 | |
| }
 | |
| Endpoint.prototype = Object.create(Duplex.prototype, { constructor: { value: Endpoint } });
 | |
| 
 | |
| // Handshake
 | |
| // ---------
 | |
| 
 | |
| var CLIENT_PRELUDE = new Buffer('PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n');
 | |
| 
 | |
| // Writing the client header is simple and synchronous.
 | |
| Endpoint.prototype._writePrelude = function _writePrelude() {
 | |
|   this._log.debug('Sending the client connection header prelude.');
 | |
|   this.push(CLIENT_PRELUDE);
 | |
| };
 | |
| 
 | |
| // The asynchronous process of reading the client header:
 | |
| Endpoint.prototype._readPrelude = function _readPrelude() {
 | |
|   // * progress in the header is tracker using a `cursor`
 | |
|   var cursor = 0;
 | |
| 
 | |
|   // * `_write` is temporarily replaced by the comparator function
 | |
|   this._write = function _temporalWrite(chunk, encoding, done) {
 | |
|     // * which compares the stored header with the current `chunk` byte by byte and emits the
 | |
|     //   'error' event if there's a byte that doesn't match
 | |
|     var offset = cursor;
 | |
|     while(cursor < CLIENT_PRELUDE.length && (cursor - offset) < chunk.length) {
 | |
|       if (CLIENT_PRELUDE[cursor] !== chunk[cursor - offset]) {
 | |
|         this._log.fatal({ cursor: cursor, offset: offset, chunk: chunk },
 | |
|                         'Client connection header prelude does not match.');
 | |
|         this._error('handshake', 'PROTOCOL_ERROR');
 | |
|         return;
 | |
|       }
 | |
|       cursor += 1;
 | |
|     }
 | |
| 
 | |
|     // * if the whole header is over, and there were no error then restore the original `_write`
 | |
|     //   and call it with the remaining part of the current chunk
 | |
|     if (cursor === CLIENT_PRELUDE.length) {
 | |
|       this._log.debug('Successfully received the client connection header prelude.');
 | |
|       delete this._write;
 | |
|       chunk = chunk.slice(cursor - offset);
 | |
|       this._write(chunk, encoding, done);
 | |
|     }
 | |
|   };
 | |
| };
 | |
| 
 | |
| // Data flow
 | |
| // ---------
 | |
| 
 | |
| //     +---------------------------------------------+
 | |
| //     |                                             |
 | |
| //     |   +-------------------------------------+   |
 | |
| //     |   | +---------+ +---------+ +---------+ |   |
 | |
| //     |   | | stream1 | | stream2 | |   ...   | |   |
 | |
| //     |   | +---------+ +---------+ +---------+ |   |
 | |
| //     |   |             connection              |   |
 | |
| //     |   +-------------------------------------+   |
 | |
| //     |             |                 ^             |
 | |
| //     |        pipe |                 | pipe        |
 | |
| //     |             v                 |             |
 | |
| //     |   +------------------+------------------+   |
 | |
| //     |   |    compressor    |   decompressor   |   |
 | |
| //     |   +------------------+------------------+   |
 | |
| //     |             |                 ^             |
 | |
| //     |        pipe |                 | pipe        |
 | |
| //     |             v                 |             |
 | |
| //     |   +------------------+------------------+   |
 | |
| //     |   |    serializer    |   deserializer   |   |
 | |
| //     |   +------------------+------------------+   |
 | |
| //     |             |                 ^             |
 | |
| //     |     _read() |                 | _write()    |
 | |
| //     |             v                 |             |
 | |
| //     |      +------------+     +-----------+       |
 | |
| //     |      |output queue|     |input queue|       |
 | |
| //     +------+------------+-----+-----------+-------+
 | |
| //                   |                 ^
 | |
| //            read() |                 | write()
 | |
| //                   v                 |
 | |
| 
 | |
| function createTransformStream(filter) {
 | |
|   var transform = new Transform({ objectMode: true });
 | |
|   var push = transform.push.bind(transform);
 | |
|   transform._transform = function(frame, encoding, done) {
 | |
|     filter(frame, push, done);
 | |
|   };
 | |
|   return transform;
 | |
| }
 | |
| 
 | |
| function pipeAndFilter(stream1, stream2, filter) {
 | |
|   if (filter) {
 | |
|     stream1.pipe(createTransformStream(filter)).pipe(stream2);
 | |
|   } else {
 | |
|     stream1.pipe(stream2);
 | |
|   }
 | |
| }
 | |
| 
 | |
| Endpoint.prototype._initializeDataFlow = function _initializeDataFlow(role, settings, filters) {
 | |
|   var firstStreamId, compressorRole, decompressorRole;
 | |
|   if (role === 'CLIENT') {
 | |
|     firstStreamId = 1;
 | |
|     compressorRole = 'REQUEST';
 | |
|     decompressorRole = 'RESPONSE';
 | |
|   } else {
 | |
|     firstStreamId = 2;
 | |
|     compressorRole = 'RESPONSE';
 | |
|     decompressorRole = 'REQUEST';
 | |
|   }
 | |
| 
 | |
|   this._serializer   = new Serializer(this._log);
 | |
|   this._deserializer = new Deserializer(this._log);
 | |
|   this._compressor   = new Compressor(this._log, compressorRole);
 | |
|   this._decompressor = new Decompressor(this._log, decompressorRole);
 | |
|   this._connection   = new Connection(this._log, firstStreamId, settings);
 | |
| 
 | |
|   pipeAndFilter(this._connection, this._compressor, filters.beforeCompression);
 | |
|   pipeAndFilter(this._compressor, this._serializer, filters.beforeSerialization);
 | |
|   pipeAndFilter(this._deserializer, this._decompressor, filters.afterDeserialization);
 | |
|   pipeAndFilter(this._decompressor, this._connection, filters.afterDecompression);
 | |
| 
 | |
|   this._connection.on('ACKNOWLEDGED_SETTINGS_HEADER_TABLE_SIZE',
 | |
|                       this._decompressor.setTableSizeLimit.bind(this._decompressor));
 | |
|   this._connection.on('RECEIVING_SETTINGS_HEADER_TABLE_SIZE',
 | |
|                       this._compressor.setTableSizeLimit.bind(this._compressor));
 | |
| };
 | |
| 
 | |
| var noread = {};
 | |
| Endpoint.prototype._read = function _read() {
 | |
|   this._readableState.sync = true;
 | |
|   var moreNeeded = noread, chunk;
 | |
|   while (moreNeeded && (chunk = this._serializer.read())) {
 | |
|     moreNeeded = this.push(chunk);
 | |
|   }
 | |
|   if (moreNeeded === noread) {
 | |
|     this._serializer.once('readable', this._read.bind(this));
 | |
|   }
 | |
|   this._readableState.sync = false;
 | |
| };
 | |
| 
 | |
| Endpoint.prototype._write = function _write(chunk, encoding, done) {
 | |
|   this._deserializer.write(chunk, encoding, done);
 | |
| };
 | |
| 
 | |
| // Management
 | |
| // --------------
 | |
| 
 | |
| Endpoint.prototype._initializeManagement = function _initializeManagement() {
 | |
|   this._connection.on('stream', this.emit.bind(this, 'stream'));
 | |
| };
 | |
| 
 | |
| Endpoint.prototype.createStream = function createStream() {
 | |
|   return this._connection.createStream();
 | |
| };
 | |
| 
 | |
| // Error handling
 | |
| // --------------
 | |
| 
 | |
| Endpoint.prototype._initializeErrorHandling = function _initializeErrorHandling() {
 | |
|   this._serializer.on('error', this._error.bind(this, 'serializer'));
 | |
|   this._deserializer.on('error', this._error.bind(this, 'deserializer'));
 | |
|   this._compressor.on('error', this._error.bind(this, 'compressor'));
 | |
|   this._decompressor.on('error', this._error.bind(this, 'decompressor'));
 | |
|   this._connection.on('error', this._error.bind(this, 'connection'));
 | |
| 
 | |
|   this._connection.on('peerError', this.emit.bind(this, 'peerError'));
 | |
| };
 | |
| 
 | |
| Endpoint.prototype._error = function _error(component, error) {
 | |
|   this._log.fatal({ source: component, message: error }, 'Fatal error, closing connection');
 | |
|   this.close(error);
 | |
|   setImmediate(this.emit.bind(this, 'error', error));
 | |
| };
 | |
| 
 | |
| Endpoint.prototype.close = function close(error) {
 | |
|   this._connection.close(error);
 | |
| };
 | |
| 
 | |
| // Bunyan serializers
 | |
| // ------------------
 | |
| 
 | |
| exports.serializers = {};
 | |
| 
 | |
| var nextId = 0;
 | |
| exports.serializers.e = function(endpoint) {
 | |
|   if (!('id' in endpoint)) {
 | |
|     endpoint.id = nextId;
 | |
|     nextId += 1;
 | |
|   }
 | |
|   return endpoint.id;
 | |
| };
 | 
