avro-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From b...@apache.org
Subject [03/43] avro git commit: AVRO-1778. JavaScript: Add IPC/RPC support.
Date Sat, 14 May 2016 23:43:36 GMT
AVRO-1778. JavaScript: Add IPC/RPC support.

This commit adds protocols to the JavaScript implementation.

The API was designed to:

+ Be simple and idiomatic. The `Protocol` class added here is heavily
  inspired by node.js' core `EventEmitter` to keep things as familiar as
  possible. Getting a client and server working is straightforward
  and requires very few lines of code.
+ Support arbitrary transports, both stateful and stateless. Built-in
  node.js streams are supported out of the box (e.g. TCP/UNIX sockets,
  or even stdin/stdout). Exchanging messages over a custom transport
  requires implementing a single simple function.
+ Work both server-side and in the browser!


Project: http://git-wip-us.apache.org/repos/asf/avro/repo
Commit: http://git-wip-us.apache.org/repos/asf/avro/commit/133fafac
Tree: http://git-wip-us.apache.org/repos/asf/avro/tree/133fafac
Diff: http://git-wip-us.apache.org/repos/asf/avro/diff/133fafac

Branch: refs/heads/branch-1.8
Commit: 133fafacdd6e39d68d14e2f903e2a49c6d6aad16
Parents: 8a3960a
Author: Matthieu Monsch <mtth@apache.org>
Authored: Sat Feb 6 10:48:34 2016 -0800
Committer: Ryan Blue <blue@apache.org>
Committed: Sat May 14 16:42:29 2016 -0700

----------------------------------------------------------------------
 CHANGES.txt                    |    2 +
 lang/js/build.sh               |    2 +-
 lang/js/doc/API.md             |  189 +++++
 lang/js/doc/Advanced-usage.md  |  124 ++++
 lang/js/lib/files.js           |    8 +-
 lang/js/lib/index.js           |    3 +
 lang/js/lib/protocols.js       | 1271 ++++++++++++++++++++++++++++++++
 lang/js/lib/schemas.js         |   62 +-
 lang/js/lib/utils.js           |   24 +
 lang/js/package.json           |    5 +-
 lang/js/test/mocha.opts        |    2 +
 lang/js/test/test_files.js     |    8 +-
 lang/js/test/test_protocols.js | 1392 +++++++++++++++++++++++++++++++++++
 lang/js/test/test_schemas.js   |   11 +
 14 files changed, 3073 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/avro/blob/133fafac/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index fa0a9fa..aa84b9e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -13,6 +13,8 @@ Trunk (not yet released)
     AVRO-1793. Python2: Retain stack trace and original exception when failing
     to parse schemas. Contributed by Jakob Homan (jghoman).
 
+    AVRO-1778. JavaScript: Add IPC/RPC support. (mtth)
+
   BUG FIXES
 
 Avro 1.8.0 (22 January 2016)

http://git-wip-us.apache.org/repos/asf/avro/blob/133fafac/lang/js/build.sh
----------------------------------------------------------------------
diff --git a/lang/js/build.sh b/lang/js/build.sh
index c551f9d..08823fc 100755
--- a/lang/js/build.sh
+++ b/lang/js/build.sh
@@ -22,7 +22,7 @@ cd `dirname "$0"`
 case "$1" in
   test)
     npm install
-    npm test
+    npm run cover
     ;;
   dist)
     npm pack

http://git-wip-us.apache.org/repos/asf/avro/blob/133fafac/lang/js/doc/API.md
----------------------------------------------------------------------
diff --git a/lang/js/doc/API.md b/lang/js/doc/API.md
index a6dd63e..2639ea1 100644
--- a/lang/js/doc/API.md
+++ b/lang/js/doc/API.md
@@ -21,6 +21,7 @@ limitations under the License.
 + [Avro types](#avro-types)
 + [Records](#records)
 + [Files and streams](#files-and-streams)
++ [IPC & RPC](#ipc--rpc)
 
 
 ## Parsing schemas
@@ -758,9 +759,197 @@ The encoding equivalent of `RawDecoder`.
 + `data` {Buffer} Serialized bytes.
 
 
+## IPC & RPC
+
+Avro also defines a way of executing remote procedure calls. We expose this via
+an API modeled after node.js' core [`EventEmitter`][event-emitter].
+
+#### Class `Protocol`
+
+`Protocol` instances are obtained by [`parse`](#parseschema-opts)-ing a
+[protocol declaration][protocol-declaration] and provide a way of sending
+remote messages (for example to another machine, or another process on the same
+machine). For this reason, instances of this class are very similar to
+`EventEmitter`s, exposing both [`emit`](#protocolemitname-req-emitter-cb) and
+[`on`](#protocolonname-handler) methods.
+
+Being able to send remote messages (and to do so efficiently) introduces a few
+differences however:
+
++ The types used in each event (for both the emitted message and its response)
+  must be defined upfront in the protocol's declaration.
++ The arguments emitted with each event must match the ones defined in the
+  protocol. Similarly, handlers are guaranteed to be called only with these
+  matching arguments.
++ Events are one-to-one: they have exactly one response (unless they are
+  declared as one-way, in which case they have none).
+
+##### `protocol.emit(name, req, emitter, cb)`
+
++ `name` {String} Name of the message to emit. If this message is sent to a
+  `Protocol` instance with no handler defined for this name, an "unsupported
+  message" error will be returned.
++ `req` {Object} Request value, must correspond to the message's declared
+  request type.
++ `emitter` {MessageEmitter} Emitter used to send the message. See
+  [`createEmitter`](#protocolcreateemittertransport-opts-cb) for how to obtain
+  one.
++ `cb(err, res)` {Function} Function called with the remote call's response
+  (or error, if any) when available. This can be omitted when the message is
+  one-way.
+
+Send a message. This is always done asynchronously.
+
+##### `protocol.on(name, handler)`
+
++ `name` {String} Message name to add the handler for. An error will be thrown
+  if this name isn't defined in the protocol. At most one handler can exist for
+  a given name (any previously defined handler will be overwritten).
++ `handler(req, listener, cb)` {Function} Handler, called each time a message
+  with matching name is received. The `listener` argument will be the
+  corresponding `MessageListener` instance. The final callback argument
+  `cb(err, res)` should be called to send the response back to the emitter
+  (except when the message is one way, in which case `cb` will be `undefined`).
+
+Add a handler for a given message.
+
+##### `protocol.createEmitter(transport, [opts,] [cb])`
+
++ `transport` {Duplex|Object|Function} The transport used to communicate with
+  the remote listener. Multiple argument types are supported, see below.
++ `opts` {Object} Options.
+  + `IdType` {LogicalType} Metadata logical type.
+  + `bufferSize` {Number} Internal serialization buffer size (in bytes).
+    Defaults to 2048.
+  + `frameSize` {Number} Size used when [framing messages][framing-messages].
+    Defaults to 2048.
++ `cb(pending)` {Function} End of transmission callback.
+
+Generate a [`MessageEmitter`](#class-messageemitter) for this protocol. This
+emitter can then be used to communicate with a remote server of compatible
+protocol.
+
+There are two major types of transports:
+
++ Stateful
+
+  A pair of binary streams `{readable, writable}`.
+
+  As a convenience passing a single duplex stream is also supported and
+  equivalent to passing `{readable: duplex, writable: duplex}`.
+
++ Stateless
+
+  Stream factory `fn(cb)` which should return a writable stream and call its
+  callback argument with a readable stream (when available).
+
+##### `protocol.createListener(transport, [opts,] [cb])`
+
++ `transport` {Duplex|Object|Function} Similar to
+  [`createEmitter`](#protocolcreateemittertransport-opts-cb)'s argument, except
+  that readable and writable roles are reversed for stateless transports.
++ `opts` {Object} Identical to `createEmitter`'s options.
+  + `IdType` {LogicalType} Metadata logical type.
+  + `bufferSize` {Number} Internal serialization buffer size (in bytes).
+    Defaults to 2048.
+  + `frameSize` {Number} Size used when [framing messages][framing-messages].
+    Defaults to 2048.
++ `cb(pending)` {Function} End of transmission callback.
+
+Generate a [`MessageListener`](#class-messagelistener) for this protocol. This
+listener can be used to respond to messages emitted from compatible protocols.
+
+##### `protocol.subprotocol()`
+
+Returns a copy of the original protocol, which inherits all its handlers.
+
+##### `protocol.getMessages()`
+
+Retrieve all the messages defined in the protocol. Each message is an object
+with the following (read-only) properties:
+
++ `name` {String}
++ `requestType` {Type}
++ `responseType` {Type}
++ `errorType` {Type}
++ `oneWay` {Boolean}
+
+##### `protocol.getName()`
+
+Returns the protocol's fully qualified name.
+
+##### `protocol.getType(name)`
+
++ `name` {String} A type's fully qualified name.
+
+Convenience function to retrieve a type defined inside this protocol. Returns
+`undefined` if no type exists for the given name.
+
+
+#### Class `MessageEmitter`
+
+Instances of this class are [`EventEmitter`s][event-emitter], with the following
+events:
+
+##### Event `'handshake'`
+
++ `request` {Object} Handshake request.
++ `response` {Object} Handshake response.
+
+Emitted when the server's handshake response is received.
+
+##### Event `'eot'`
+
++ `pending` {Number} Number of interrupted requests. This will always be zero,
+  unless the emitter was destroyed with `noWait` set.
+
+End of transmission event, emitted after the client is destroyed and there are
+no more pending requests.
+
+##### `emitter.destroy([noWait])`
+
++ `noWait` {Boolean} Cancel any pending requests. By default pending requests
+  will still be honored.
+
+Disable the emitter.
+
+
+#### Class `MessageListener`
+
+Listeners are the receiving-side equivalent of `MessageEmitter`s and are also
+[`EventEmitter`s][event-emitter], with the following events:
+
+##### Event `'handshake'`
+
++ `request` {Object} Handshake request.
++ `response` {Object} Handshake response.
+
+Emitted right before the server sends a handshake response.
+
+##### Event `'eot'`
+
++ `pending` {Number} Number of cancelled pending responses. This will always be
+  zero, unless the listener was destroyed with `noWait` set.
+
+End of transmission event, emitted after the listener is destroyed and there are
+no more responses to send.
+
+##### `listener.destroy([noWait])`
+
++ `noWait` {Boolean} Don't wait for all pending responses to have been sent.
+
+Disable this listener and release underlying streams. In general you shouldn't
+need to call this: stateless listeners will be destroyed automatically when a
+response is sent, and stateful listeners are best destroyed from the client's
+side.
+
+
 [canonical-schema]: https://avro.apache.org/docs/current/spec.html#Parsing+Canonical+Form+for+Schemas
 [schema-resolution]: https://avro.apache.org/docs/current/spec.html#Schema+Resolution
 [sort-order]: https://avro.apache.org/docs/current/spec.html#order
 [fingerprint]: https://avro.apache.org/docs/current/spec.html#Schema+Fingerprints
 [custom-long]: Advanced-usage#custom-long-types
 [logical-types]: Advanced-usage#logical-types
+[framing-messages]: https://avro.apache.org/docs/current/spec.html#Message+Framing
+[event-emitter]: https://nodejs.org/api/events.html#events_class_events_eventemitter
+[protocol-declaration]: https://avro.apache.org/docs/current/spec.html#Protocol+Declaration

http://git-wip-us.apache.org/repos/asf/avro/blob/133fafac/lang/js/doc/Advanced-usage.md
----------------------------------------------------------------------
diff --git a/lang/js/doc/Advanced-usage.md b/lang/js/doc/Advanced-usage.md
index 23cab5f..4344bb4 100644
--- a/lang/js/doc/Advanced-usage.md
+++ b/lang/js/doc/Advanced-usage.md
@@ -20,6 +20,7 @@ limitations under the License.
 + [Schema evolution](#schema-evolution)
 + [Logical types](#logical-types)
 + [Custom long types](#custom-long-types)
++ [Remote procedure calls](#remote-procedure-calls)
 
 
 ## Schema evolution
@@ -352,8 +353,131 @@ and unpacking routine (for example when using a native C++ addon), we can
 disable this behavior by setting `LongType.using`'s `noUnpack` argument to
 `true`.
 
+
+# Remote procedure calls
+
+`avro-js` provides an efficient and "type-safe" API for communicating with
+remote node processes via [`Protocol`s](API#class-protocol).
+
+To enable this, we first declare the types involved inside an [Avro
+protocol][protocol-declaration]. For example, consider the following simple
+protocol which supports two calls (saved as `./math.avpr`):
+
+```json
+{
+  "protocol": "Math",
+  "doc": "A sample interface for performing math.",
+  "messages": {
+    "multiply": {
+      "doc": "A call for multiplying doubles.",
+      "request": [
+        {"name": "numbers", "type": {"type": "array", "items": "double"}}
+      ],
+      "response": "double"
+    },
+    "add": {
+      "doc": "A call which adds integers, optionally after some delay.",
+      "request": [
+        {"name": "numbers", "type": {"type": "array", "items": "int"}},
+        {"name": "delay", "type": "float", "default": 0}
+      ],
+      "response": "int"
+    }
+  }
+}
+```
+
+Servers and clients then share the same protocol and respectively:
+
++ Implement interface calls (servers):
+
+  ```javascript
+  var protocol = avro.parse('./math.avpr')
+    .on('add', function (req, ee, cb) {
+      var sum = req.numbers.reduce(function (agg, el) { return agg + el; }, 0);
+      setTimeout(function () { cb(null, sum); }, 1000 * req.delay);
+    })
+    .on('multiply', function (req, ee, cb) {
+      var prod = req.numbers.reduce(function (agg, el) { return agg * el; }, 1);
+      cb(null, prod);
+    });
+  ```
+
++ Call the interface (clients):
+
+  ```javascript
+  var protocol = avro.parse('./math.avpr');
+  var ee; // Message emitter, see below for various instantiation examples.
+
+  protocol.emit('add', {numbers: [1, 3, 5], delay: 2}, ee, function (err, res) {
+    console.log(res); // 9!
+  });
+  protocol.emit('multiply', {numbers: [4, 2]}, ee, function (err, res) {
+    console.log(res); // 8!
+  });
+  ```
+
+`avro-js` supports communication between any two node processes connected by
+binary streams. See below for a few different common use-cases.
+
+## Persistent streams
+
+E.g. UNIX sockets, TCP sockets, WebSockets (and even stdin/stdout).
+
+### Client
+
+```javascript
+var net = require('net');
+
+var ee = protocol.createEmitter(net.createConnection({port: 8000}));
+```
+
+### Server
+
+```javascript
+var net = require('net');
+
+net.createServer()
+  .on('connection', function (con) { protocol.createListener(con); })
+  .listen(8000);
+```
+
+## Transient streams
+
+For example HTTP requests/responses.
+
+### Client
+
+```javascript
+var http = require('http');
+
+var ee = protocol.createEmitter(function (cb) {
+  return http.request({
+    port: 3000,
+    headers: {'content-type': 'avro/binary'},
+    method: 'POST'
+  }).on('response', function (res) { cb(res); });
+});
+```
+
+### Server
+
+Using [express][] for example:
+
+```javascript
+var app = require('express')();
+
+app.post('/', function (req, res) {
+  protocol.createListener(function (cb) { cb(res); return req; });
+});
+
+app.listen(3000);
+```
+
+
 [parse-api]: API#parseschema-opts
 [create-resolver-api]: API#typecreateresolverwritertype
 [logical-type-api]: API#class-logicaltypeattrs-opts-types
 [decimal-type]: https://avro.apache.org/docs/current/spec.html#Decimal
 [schema-resolution]: https://avro.apache.org/docs/current/spec.html#Schema+Resolution
+[protocol-declaration]: https://avro.apache.org/docs/current/spec.html#Protocol+Declaration

http://git-wip-us.apache.org/repos/asf/avro/blob/133fafac/lang/js/lib/files.js
----------------------------------------------------------------------
diff --git a/lang/js/lib/files.js b/lang/js/lib/files.js
index b9c6da0..c96da68 100644
--- a/lang/js/lib/files.js
+++ b/lang/js/lib/files.js
@@ -21,7 +21,8 @@
 
 'use strict';
 
-var schemas = require('./schemas'),
+var protocols = require('./protocols'),
+    schemas = require('./schemas'),
     utils = require('./utils'),
     fs = require('fs'),
     stream = require('stream'),
@@ -66,7 +67,10 @@ var Tap = utils.Tap;
  *
  */
 function parse(schema, opts) {
-  return schemas.createType(loadSchema(schema), opts);
+  var attrs = loadSchema(schema);
+  return attrs.protocol ?
+    protocols.createProtocol(attrs, opts) :
+    schemas.createType(attrs, opts);
 }
 
 

http://git-wip-us.apache.org/repos/asf/avro/blob/133fafac/lang/js/lib/index.js
----------------------------------------------------------------------
diff --git a/lang/js/lib/index.js b/lang/js/lib/index.js
index 0eab2ad..666dd56 100644
--- a/lang/js/lib/index.js
+++ b/lang/js/lib/index.js
@@ -29,11 +29,14 @@
  */
 
 var files = require('./files'),
+    protocols = require('./protocols'),
     schemas = require('./schemas'),
     deprecated = require('../etc/deprecated/validator');
 
 
 module.exports = {
+  Type: schemas.Type,
+  Protocol: protocols.Protocol,
   parse: files.parse,
   createFileDecoder: files.createFileDecoder,
   createFileEncoder: files.createFileEncoder,

http://git-wip-us.apache.org/repos/asf/avro/blob/133fafac/lang/js/lib/protocols.js
----------------------------------------------------------------------
diff --git a/lang/js/lib/protocols.js b/lang/js/lib/protocols.js
new file mode 100644
index 0000000..6344f47
--- /dev/null
+++ b/lang/js/lib/protocols.js
@@ -0,0 +1,1271 @@
+/* jshint node: true */
+
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+'use strict';
+
+/**
+ * This module implements Avro's IPC/RPC logic.
+ *
+ * This is done the Node.js way, mimicking the `EventEmitter` class.
+ *
+ */
+
+var schemas = require('./schemas'),
+    utils = require('./utils'),
+    events = require('events'),
+    stream = require('stream'),
+    util = require('util');
+
+
+var BOOLEAN_TYPE = schemas.createType('boolean');
+var STRING_TYPE = schemas.createType('string');
+var SYSTEM_ERROR_TYPE = schemas.createType(['string']);
+
+var HANDSHAKE_REQUEST_TYPE = schemas.createType({
+  namespace: 'org.apache.avro.ipc',
+  name: 'HandshakeRequest',
+  type: 'record',
+  fields: [
+    {name: 'clientHash', type: {name: 'MD5', type: 'fixed', size: 16}},
+    {name: 'clientProtocol', type: ['null', 'string'], 'default': null},
+    {name: 'serverHash', type: 'org.apache.avro.ipc.MD5'},
+    {
+      name: 'meta',
+      type: ['null', {type: 'map', values: 'bytes'}],
+      'default': null
+    }
+  ]
+});
+
+var HANDSHAKE_RESPONSE_TYPE = schemas.createType({
+  namespace: 'org.apache.avro.ipc',
+  name: 'HandshakeResponse',
+  type: 'record',
+  fields: [
+    {
+      name: 'match',
+      type: {
+        name: 'HandshakeMatch',
+        type: 'enum',
+        symbols: ['BOTH', 'CLIENT', 'NONE']
+      }
+    },
+    {name: 'serverProtocol', type: ['null', 'string'], 'default': null},
+    {
+      name: 'serverHash',
+      type: ['null', {name: 'MD5', type: 'fixed', size: 16}],
+      'default': null
+    },
+    {
+      name: 'meta',
+      type: ['null', {type: 'map', values: 'bytes'}],
+      'default': null
+    }
+  ]
+});
+
+var HandshakeRequest = HANDSHAKE_REQUEST_TYPE.getRecordConstructor();
+var HandshakeResponse = HANDSHAKE_RESPONSE_TYPE.getRecordConstructor();
+var Tap = utils.Tap;
+var f = util.format;
+
+
+/**
+ * Protocol generation function.
+ *
+ * This should be used instead of the protocol constructor. The protocol's
+ * constructor performs no logic to better support efficient protocol copy.
+ *
+ */
+function createProtocol(attrs, opts) {
+  opts = opts || {};
+
+  var name = attrs.protocol;
+  if (!name) {
+    throw new Error('missing protocol name');
+  }
+  opts.namespace = attrs.namespace;
+  if (opts.namespace && !~name.indexOf('.')) {
+    name = f('%s.%s', opts.namespace, name);
+  }
+
+  if (attrs.types) {
+    attrs.types.forEach(function (obj) { schemas.createType(obj, opts); });
+  }
+  var messages = {};
+  if (attrs.messages) {
+    Object.keys(attrs.messages).forEach(function (key) {
+      messages[key] = new Message(key, attrs.messages[key], opts);
+    });
+  }
+
+  return new Protocol(name, messages, opts.registry || {});
+}
+
+/**
+ * An Avro protocol.
+ *
+ * It contains a cache for all remote protocols encountered by its emitters and
+ * listeners. Note that a protocol can be listening to multiple listeners at a
+ * given time. This can be a mix of stateful or stateless listeners.
+ *
+ */
+function Protocol(name, messages, types, ptcl) {
+  this._name = name;
+  this._messages = messages;
+  this._types = types;
+  this._parent = ptcl;
+
+  // Cache a string instead of the buffer to avoid retaining an entire slab.
+  this._hashString = utils.getHash(this.toString()).toString('binary');
+
+  // Listener callbacks. Note the prototype used for handlers when this is a
+  // subprotocol. This lets us easily implement the desired fallback behavior.
+  var self = this;
+  this._handlers = Object.create(ptcl ? ptcl._handlers : null);
+  this._onListenerCall = function (name, req, cb) {
+    var handler = self._handlers[name];
+    if (!handler) {
+      cb(new Error(f('unsupported message: %s', name)));
+    } else {
+      handler.call(self, req, this, cb);
+    }
+  };
+
+  // Resolvers are split since we want emitters to still be able to talk to
+  // servers with more messages (which would be incompatible the other way).
+  this._emitterResolvers = ptcl ? ptcl._emitterResolvers : {};
+  this._listenerResolvers = ptcl ? ptcl._listenerResolvers : {};
+}
+
+Protocol.prototype.subprotocol = function () {
+  return new Protocol(this._name, this._messages, this._types, this);
+};
+
+Protocol.prototype.emit = function (name, req, emitter, cb) {
+  cb = cb || throwError; // To provide a more helpful error message.
+
+  if (
+    !(emitter instanceof MessageEmitter) ||
+    emitter._ptcl._hashString !== this._hashString
+  ) {
+    asyncAvroCb(this, cb, 'invalid emitter');
+    return;
+  }
+
+  var message = this._messages[name];
+  if (!message) {
+    asyncAvroCb(this, cb, f('unknown message: %s', name));
+    return;
+  }
+
+  emitter._emit(message, req, cb);
+};
+
+Protocol.prototype.createEmitter = function (transport, opts, cb) {
+  if (!cb && typeof opts == 'function') {
+    cb = opts;
+    opts = undefined;
+  }
+
+  var emitter;
+  if (typeof transport == 'function') {
+    emitter = new StatelessEmitter(this, transport, opts);
+  } else {
+    var readable, writable;
+    if (isStream(transport)) {
+      readable = writable = transport;
+    } else {
+      readable = transport.readable;
+      writable = transport.writable;
+    }
+    emitter = new StatefulEmitter(this, readable, writable, opts);
+  }
+  if (cb) {
+    emitter.once('eot', cb);
+  }
+  return emitter;
+};
+
+Protocol.prototype.on = function (name, handler) {
+  if (!this._messages[name]) {
+    throw new Error(f('unknown message: %s', name));
+  }
+  this._handlers[name] = handler;
+  return this;
+};
+
+Protocol.prototype.createListener = function (transport, opts, cb) {
+  if (!cb && typeof opts == 'function') {
+    cb = opts;
+    opts = undefined;
+  }
+
+  var listener;
+  if (typeof transport == 'function') {
+    listener = new StatelessListener(this, transport, opts);
+  } else {
+    var readable, writable;
+    if (isStream(transport)) {
+      readable = writable = transport;
+    } else {
+      readable = transport.readable;
+      writable = transport.writable;
+    }
+    listener = new StatefulListener(this, readable, writable, opts);
+  }
+  if (cb) {
+    listener.once('eot', cb);
+  }
+  return listener.on('_call', this._onListenerCall);
+};
+
+Protocol.prototype.getType = function (name) { return this._types[name]; };
+
+Protocol.prototype.getName = function () { return this._name; };
+
+Protocol.prototype.getMessages = function () { return this._messages; };
+
+Protocol.prototype.toString = function () {
+  var namedTypes = [];
+  Object.keys(this._types).forEach(function (name) {
+    var type = this._types[name];
+    if (type.getName()) {
+      namedTypes.push(type);
+    }
+  }, this);
+
+  return schemas.stringify({
+    protocol: this._name,
+    types: namedTypes.length ? namedTypes : undefined,
+    messages: this._messages
+  });
+};
+
+Protocol.prototype.inspect = function () {
+  return f('<Protocol %j>', this._name);
+};
+
+/**
+ * Base message emitter class.
+ *
+ * See below for the two available variants.
+ *
+ */
+function MessageEmitter(ptcl, opts) {
+  events.EventEmitter.call(this);
+
+  this._ptcl = ptcl;
+  this._resolvers = ptcl._emitterResolvers;
+  this._serverHashString = ptcl._hashString;
+  this._idType = IdType.createMetadataType(opts.IdType);
+  this._bufferSize = opts.bufferSize || 2048;
+  this._frameSize = opts.frameSize || 2048;
+
+  this.once('_eot', function (pending) { this.emit('eot', pending); });
+}
+util.inherits(MessageEmitter, events.EventEmitter);
+
+MessageEmitter.prototype._generateResolvers = function (
+  hashString, serverPtcl
+) {
+  var resolvers = {};
+  var emitterMessages = this._ptcl._messages;
+  var serverMessages = serverPtcl._messages;
+  Object.keys(emitterMessages).forEach(function (name) {
+    var cm = emitterMessages[name];
+    var sm = serverMessages[name];
+    if (!sm) {
+      throw new Error(f('missing server message: %s', name));
+    }
+    resolvers[name] = {
+      responseType: cm.responseType.createResolver(sm.responseType),
+      errorType: cm.errorType.createResolver(sm.errorType)
+    };
+  });
+  this._resolvers[hashString] = resolvers;
+};
+
+MessageEmitter.prototype._createHandshakeRequest = function (
+  hashString, noPtcl
+) {
+  return new HandshakeRequest(
+    getHash(this._ptcl),
+    noPtcl ? null : {string: this._ptcl.toString()},
+    new Buffer(hashString, 'binary')
+  );
+};
+
+MessageEmitter.prototype._finalizeHandshake = function (tap, handshakeReq) {
+  var res = HANDSHAKE_RESPONSE_TYPE._read(tap);
+  this.emit('handshake', handshakeReq, res);
+
+  if (handshakeReq.clientProtocol && res.match === 'NONE') {
+    // If the emitter's protocol was included in the original request, this is
+    // not a failure which a retry will fix.
+    var buf = res.meta && res.meta.map.error;
+    throw new Error(buf ? buf.toString() : 'handshake error');
+  }
+
+  var hashString;
+  if (res.serverHash && res.serverProtocol) {
+    // This means the request didn't include the correct server hash. Note that
+    // we use the handshake response's hash rather than our computed one in
+    // case the server computes it differently.
+    hashString = res.serverHash['org.apache.avro.ipc.MD5'].toString('binary');
+    if (!canResolve(this, hashString)) {
+      this._generateResolvers(
+        hashString,
+        createProtocol(JSON.parse(res.serverProtocol.string))
+      );
+    }
+    // Make this hash the new default.
+    this._serverHashString = hashString;
+  } else {
+    hashString = handshakeReq.serverHash.toString('binary');
+  }
+
+  // We return the server's hash for stateless emitters. It might be that the
+  // default hash changes in between requests, in which case using the default
+  // one will fail.
+  return {match: res.match, serverHashString: hashString};
+};
+
+MessageEmitter.prototype._encodeRequest = function (tap, message, req) {
+  safeWrite(tap, STRING_TYPE, message.name);
+  safeWrite(tap, message.requestType, req);
+};
+
+MessageEmitter.prototype._decodeArguments = function (
+  tap, hashString, message
+) {
+  var resolvers = getResolvers(this, hashString, message);
+  var args = [null, null];
+  if (tap.readBoolean()) {
+    args[0] = resolvers.errorType._read(tap);
+  } else {
+    args[1] = resolvers.responseType._read(tap);
+  }
+  if (!tap.isValid()) {
+    throw new Error('truncated message');
+  }
+  return args;
+};
+
+/**
+ * Factory-based emitter.
+ *
+ * This emitter doesn't keep a persistent connection to the server and requires
+ * prepending a handshake to each message emitted. Usage examples include
+ * talking to an HTTP server (where the factory returns an HTTP request).
+ *
+ * Since each message will use its own writable/readable stream pair, the
+ * advantage of this emitter is that it is able to keep track of which response
+ * corresponds to each request without relying on messages' metadata. In
+ * particular, this means these emitters are compatible with any server
+ * implementation.
+ *
+ */
+function StatelessEmitter(ptcl, writableFactory, opts) {
+  opts = opts || {};
+  MessageEmitter.call(this, ptcl, opts);
+
+  this._writableFactory = writableFactory;
+  this._id = 1;
+  this._pending = {};
+  this._destroyed = false;
+  this._interrupted = false;
+}
+util.inherits(StatelessEmitter, MessageEmitter);
+
+StatelessEmitter.prototype._emit = function (message, req, cb) {
+  // We enclose the server's hash inside this message's closure since the
+  // emitter might be emitting several messages concurrently and the hash might
+  // change before the response returns (unlikely but possible if the emitter
+  // talks to multiple servers at once or the server changes protocol).
+  var serverHashString = this._serverHashString;
+  var id = this._id++;
+  var self = this;
+
+  this._pending[id] = cb;
+  if (this._destroyed) {
+    asyncAvroCb(undefined, done, 'emitter destroyed');
+    return;
+  }
+  emit(false);
+
+  function emit(retry) {
+    var tap = new Tap(new Buffer(self._bufferSize));
+
+    var handshakeReq = self._createHandshakeRequest(serverHashString, !retry);
+    safeWrite(tap, HANDSHAKE_REQUEST_TYPE, handshakeReq);
+    try {
+      safeWrite(tap, self._idType, id);
+      self._encodeRequest(tap, message, req);
+    } catch (err) {
+      asyncAvroCb(undefined, done, err);
+      return;
+    }
+
+    var writable = self._writableFactory(function onReadable(readable) {
+      if (self._interrupted) {
+        // In case this function is called asynchronously (e.g. when sending
+        // HTTP requests), it might be that we have ended since.
+        return;
+      }
+
+      readable
+        .pipe(new MessageDecoder(true))
+        .on('error', done)
+        // This will happen when the readable stream ends before a single
+        // message has been decoded (e.g. on invalid response).
+        .on('data', function (buf) {
+          readable.unpipe(this); // Single message per readable stream.
+          if (self._interrupted) {
+            return;
+          }
+
+          var tap = new Tap(buf);
+          try {
+            var info = self._finalizeHandshake(tap, handshakeReq);
+            serverHashString = info.serverHashString;
+            if (info.match === 'NONE') {
+              emit(true); // Retry, attaching emitter protocol this time.
+              return;
+            }
+            self._idType._read(tap); // Skip metadata.
+            var args = self._decodeArguments(tap, serverHashString, message);
+          } catch (err) {
+            done(err);
+            return;
+          }
+          done.apply(undefined, args);
+        });
+    });
+
+    var encoder = new MessageEncoder(self._frameSize);
+    encoder.pipe(writable);
+    encoder.end(tap.getValue());
+  }
+
+  function done(err, res) {
+    var cb = self._pending[id];
+    delete self._pending[id];
+    cb.call(self._ptcl, err, res);
+    if (self._destroyed) {
+      self.destroy();
+    }
+  }
+};
+
+StatelessEmitter.prototype.destroy = function (noWait) {
+  this._destroyed = true;
+
+  var pendingIds = Object.keys(this._pending);
+  if (noWait) {
+    this._interrupted = true;
+    pendingIds.forEach(function (id) {
+      this._pending[id]({string: 'interrupted'});
+      delete this._pending[id];
+    }, this);
+  }
+
+  if (noWait || !pendingIds.length) {
+    this.emit('_eot', pendingIds.length);
+  }
+};
+
+/**
+ * Multiplexing emitter.
+ *
+ * These emitters reuse the same streams (both readable and writable) for all
+ * messages. This avoids a lot of overhead (e.g. creating new connections,
+ * re-issuing handshakes) but requires the server to include compatible
+ * metadata in each response (namely forwarding each request's ID into its
+ * response).
+ *
+ * A custom metadata format can be specified via the `idType` option. The
+ * default is compatible with this package's default server (i.e. listener)
+ * implementation.
+ *
+ */
+function StatefulEmitter(ptcl, readable, writable, opts) {
+  opts = opts || {};
+  MessageEmitter.call(this, ptcl, opts);
+
+  this._readable = readable;
+  this._writable = writable;
+  this._id = 1;
+  this._pending = {};
+  this._started = false;
+  this._destroyed = false;
+  this._ended = false; // Readable input ended.
+  this._decoder = new MessageDecoder();
+  this._encoder = new MessageEncoder(this._frameSize);
+
+  var handshakeReq = null;
+  var self = this;
+
+  process.nextTick(function () {
+    self._readable.pipe(self._decoder)
+      .on('error', function (err) { self.emit('error', err); })
+      .on('data', onHandshakeData)
+      .on('end', function () {
+        self._ended = true;
+        self.destroy();
+      });
+
+    self._encoder.pipe(self._writable);
+    emitHandshake(true);
+  });
+
+  function emitHandshake(noPtcl) {
+    handshakeReq = self._createHandshakeRequest(
+      self._serverHashString,
+      noPtcl
+    );
+    self._encoder.write(handshakeReq.$toBuffer());
+  }
+
+  function onHandshakeData(buf) {
+    var tap = new Tap(buf);
+    try {
+      var info = self._finalizeHandshake(tap, handshakeReq);
+    } catch (err) {
+      self.emit('error', err);
+      self.destroy(); // This isn't a recoverable error.
+      return;
+    }
+
+    if (info.match !== 'NONE') {
+      self._decoder
+        .removeListener('data', onHandshakeData)
+        .on('data', onMessageData);
+      self._started = true;
+      self.emit('_start'); // Send any pending messages.
+    } else {
+      emitHandshake(false);
+    }
+  }
+
+  function onMessageData(buf) {
+    var tap = new Tap(buf);
+    try {
+      var id = self._idType._read(tap);
+      if (!id) {
+        throw new Error('missing ID');
+      }
+    } catch (err) {
+      self.emit('error', new Error('invalid metadata: ' + err.message));
+      return;
+    }
+
+    var info = self._pending[id];
+    if (info === undefined) {
+      self.emit('error', new Error('orphan response: ' + id));
+      return;
+    }
+
+    try {
+      var args = self._decodeArguments(
+        tap,
+        self._serverHashString,
+        info.message
+      );
+    } catch (err) {
+      info.cb({string: 'invalid response: ' + err.message});
+      return;
+    }
+    delete self._pending[id];
+    info.cb.apply(self._ptcl, args);
+    if (self._destroyed) {
+      self.destroy();
+    }
+  }
+}
+util.inherits(StatefulEmitter, MessageEmitter);
+
+StatefulEmitter.prototype._emit = function (message, req, cb) {
+  if (this._destroyed) {
+    asyncAvroCb(this._ptcl, cb, 'emitter destroyed');
+    return;
+  }
+
+  var self = this;
+  if (!this._started) {
+    this.once('_start', function () { self._emit(message, req, cb); });
+    return;
+  }
+
+  var tap = new Tap(new Buffer(this._bufferSize));
+  var id = this._id++;
+  try {
+    safeWrite(tap, this._idType, -id);
+    this._encodeRequest(tap, message, req);
+  } catch (err) {
+    asyncAvroCb(this._ptcl, cb, err);
+    return;
+  }
+
+  if (!message.oneWay) {
+    this._pending[id] = {message: message, cb: cb};
+  }
+  this._encoder.write(tap.getValue());
+};
+
+StatefulEmitter.prototype.destroy = function (noWait) {
+  this._destroyed = true;
+  if (!this._started) {
+    this.emit('_start'); // Error out any pending calls.
+  }
+
+  var pendingIds = Object.keys(this._pending);
+  if (pendingIds.length && !(noWait || this._ended)) {
+    return; // Wait for pending requests.
+  }
+  pendingIds.forEach(function (id) {
+    var cb = this._pending[id].cb;
+    delete this._pending[id];
+    cb({string: 'interrupted'});
+  }, this);
+
+  this._readable.unpipe(this._decoder);
+  this._encoder.unpipe(this._writable);
+  this.emit('_eot', pendingIds.length);
+};
+
+/**
+ * The server-side emitter equivalent.
+ *
+ * In particular it is responsible for handling handshakes appropriately.
+ *
+ */
+function MessageListener(ptcl, opts) {
+  events.EventEmitter.call(this);
+  opts = opts || {};
+
+  this._ptcl = ptcl;
+  this._resolvers = ptcl._listenerResolvers;
+  this._emitterHashString = null;
+  this._idType = IdType.createMetadataType(opts.IdType);
+  this._bufferSize = opts.bufferSize || 2048;
+  this._frameSize = opts.frameSize || 2048;
+  this._decoder = new MessageDecoder();
+  this._encoder = new MessageEncoder(this._frameSize);
+  this._destroyed = false;
+  this._pending = 0;
+
+  this.once('_eot', function (pending) { this.emit('eot', pending); });
+}
+util.inherits(MessageListener, events.EventEmitter);
+
+MessageListener.prototype._generateResolvers = function (
+  hashString, emitterPtcl
+) {
+  var resolvers = {};
+  var clientMessages = emitterPtcl._messages;
+  var serverMessages = this._ptcl._messages;
+  Object.keys(clientMessages).forEach(function (name) {
+    var sm = serverMessages[name];
+    if (!sm) {
+      throw new Error(f('missing server message: %s', name));
+    }
+    var cm = clientMessages[name];
+    resolvers[name] = {
+      requestType: sm.requestType.createResolver(cm.requestType)
+    };
+  });
+  this._resolvers[hashString] = resolvers;
+};
+
+MessageListener.prototype._validateHandshake = function (reqTap, resTap) {
+  // Reads handshake request and write corresponding response out. If an error
+  // occurs when parsing the request, a response with match NONE will be sent.
+  // Also emits 'handshake' event with both the request and the response.
+  var validationErr = null;
+  try {
+    var handshakeReq = HANDSHAKE_REQUEST_TYPE._read(reqTap);
+    var serverHashString = handshakeReq.serverHash.toString('binary');
+  } catch (err) {
+    validationErr = err;
+  }
+
+  if (!validationErr) {
+    this._emitterHashString = handshakeReq.clientHash.toString('binary');
+    if (!canResolve(this, this._emitterHashString)) {
+      var emitterPtclString = handshakeReq.clientProtocol;
+      if (emitterPtclString) {
+        try {
+          this._generateResolvers(
+            this._emitterHashString,
+            createProtocol(JSON.parse(emitterPtclString.string))
+          );
+        } catch (err) {
+          validationErr = err;
+        }
+      } else {
+        validationErr = new Error('unknown client protocol hash');
+      }
+    }
+  }
+
+  // We use the handshake response's meta field to transmit an eventual error
+  // to the client. This will let us display a more useful message later on.
+  var serverMatch = serverHashString === this._ptcl._hashString;
+  var handshakeRes = new HandshakeResponse(
+    validationErr ? 'NONE' : serverMatch ? 'BOTH' : 'CLIENT',
+    serverMatch ? null : {string: this._ptcl.toString()},
+    serverMatch ? null : {'org.apache.avro.ipc.MD5': getHash(this._ptcl)},
+    validationErr ? {map: {error: new Buffer(validationErr.message)}} : null
+  );
+
+  this.emit('handshake', handshakeReq, handshakeRes);
+  safeWrite(resTap, HANDSHAKE_RESPONSE_TYPE, handshakeRes);
+  return validationErr === null;
+};
+
+MessageListener.prototype._decodeRequest = function (tap, message) {
+  var resolvers = getResolvers(this, this._emitterHashString, message);
+  var val = resolvers.requestType._read(tap);
+  if (!tap.isValid()) {
+    throw new Error('invalid request');
+  }
+  return val;
+};
+
+MessageListener.prototype._encodeSystemError = function (tap, err) {
+  safeWrite(tap, BOOLEAN_TYPE, true);
+  safeWrite(tap, SYSTEM_ERROR_TYPE, avroError(err));
+};
+
+MessageListener.prototype._encodeArguments = function (
+  tap, message, err, res
+) {
+  var noError = err === null;
+  var pos = tap.pos;
+  safeWrite(tap, BOOLEAN_TYPE, !noError);
+  try {
+    if (noError) {
+      safeWrite(tap, message.responseType, res);
+    } else {
+      if (err instanceof Error) {
+        // Convenience to allow emitter to use JS errors inside handlers.
+        err = avroError(err);
+      }
+      safeWrite(tap, message.errorType, err);
+    }
+  } catch (err) {
+    tap.pos = pos;
+    this._encodeSystemError(tap, err);
+  }
+};
+
+MessageListener.prototype.destroy = function (noWait) {
+  if (!this._destroyed) {
+    // Stop listening. This will also correctly push back any unused bytes into
+    // the readable stream (via `MessageDecoder`'s `unpipe` handler).
+    this._readable.unpipe(this._decoder);
+  }
+
+  this._destroyed = true;
+  if (noWait || !this._pending) {
+    this._encoder.unpipe(this._writable);
+    this.emit('_eot', this._pending);
+  }
+};
+
+/**
+ * Listener for stateless transport.
+ *
+ * This listener expects a handshake to precede each message, and handles a
+ * single message: it stops listening as soon as a first request arrives.
+ *
+ * @param ptcl {Protocol} The listener's protocol.
+ * @param readableFactory {Function} Called with a callback and expected to
+ * return the readable stream requests are read from. The callback should be
+ * called with the writable stream the response is to be written to.
+ * @param opts {Object} Options, forwarded to `MessageListener`.
+ *
+ */
+function StatelessListener(ptcl, readableFactory, opts) {
+  MessageListener.call(this, ptcl, opts);
+
+  // Single tap holding the whole response (handshake included).
+  this._tap = new Tap(new Buffer(this._bufferSize));
+  // Message being handled, `undefined` until the request's name is decoded.
+  this._message = undefined;
+
+  var self = this;
+  this._readable = readableFactory(function (writable) {
+    // The encoder will buffer writes that happen before this function is
+    // called, so we don't need to do any special handling.
+    self._writable = self._encoder
+      .pipe(writable)
+      .on('finish', onEnd);
+  });
+
+  this._readable.pipe(this._decoder)
+    .on('data', onRequestData)
+    .on('end', onEnd);
+
+  // Handle the (single) incoming request: handshake, then message.
+  function onRequestData(buf) {
+    self._pending++;
+    self.destroy(); // Only one message per stateless listener.
+
+    var reqTap = new Tap(buf);
+    if (!self._validateHandshake(reqTap, self._tap)) {
+      // `_message` isn't set yet, so this is sent back as a system error.
+      onResponse(new Error('invalid handshake'));
+      return;
+    }
+
+    try {
+      self._idType._read(reqTap); // Skip metadata.
+      var name = STRING_TYPE._read(reqTap);
+      self._message = self._ptcl._messages[name];
+      if (!self._message) {
+        throw new Error(f('unknown message: %s', name));
+      }
+      var req = self._decodeRequest(reqTap, self._message);
+    } catch (err) {
+      onResponse(err);
+      return;
+    }
+
+    self.emit('_call', name, req, onResponse);
+  }
+
+  // Write the response after the handshake response and end the encoder.
+  function onResponse(err, res) {
+    safeWrite(self._tap, self._idType, 0);
+    if (!self._message) {
+      // No message was resolved, its error type can't be used.
+      self._encodeSystemError(self._tap, err);
+    } else {
+      self._encodeArguments(self._tap, self._message, err, res);
+    }
+    self._pending--;
+    self._encoder.end(self._tap.getValue());
+  }
+
+  function onEnd() { self.destroy(); }
+}
+util.inherits(StatelessListener, MessageListener);
+
+/**
+ * Stateful transport listener.
+ *
+ * A handshake is done when the listener is first opened, then all messages
+ * are sent without one.
+ *
+ * @param ptcl {Protocol} The listener's protocol.
+ * @param readable {Stream} Stream requests are read from.
+ * @param writable {Stream} Stream responses are written to.
+ * @param opts {Object} Options, forwarded to `MessageListener`.
+ *
+ */
+function StatefulListener(ptcl, readable, writable, opts) {
+  MessageListener.call(this, ptcl, opts);
+
+  this._readable = readable;
+  this._writable = writable;
+
+  var self = this;
+
+  this._readable
+    .pipe(this._decoder)
+    .on('data', onHandshakeData)
+    .on('end', function () { self.destroy(); });
+
+  this._encoder
+    .pipe(this._writable)
+    .on('finish', function () { self.destroy(); });
+
+  // Answer handshake requests. Requests are only handled once one succeeds.
+  function onHandshakeData(buf) {
+    var reqTap = new Tap(buf);
+    var resTap = new Tap(new Buffer(self._bufferSize));
+    if (self._validateHandshake(reqTap, resTap)) {
+      self._decoder
+        .removeListener('data', onHandshakeData)
+        .on('data', onRequestData);
+    }
+    // The response is sent in both cases (on failure, its match is `NONE`).
+    self._encoder.write(resTap.getValue());
+  }
+
+  // Decode a request and forward it to the handlers via a `'_call'` event.
+  function onRequestData(buf) {
+    var reqTap = new Tap(buf);
+    var resTap = new Tap(new Buffer(self._bufferSize));
+    var id = 0;
+    try {
+      // The ID read from the request is negated before being sent back.
+      id = -self._idType._read(reqTap) | 0;
+      if (!id) {
+        throw new Error('missing ID');
+      }
+    } catch (err) {
+      self.emit('error', new Error('invalid metadata: ' + err.message));
+      return;
+    }
+
+    self._pending++;
+    try {
+      var name = STRING_TYPE._read(reqTap);
+      var message = self._ptcl._messages[name];
+      if (!message) {
+        throw new Error('unknown message: ' + name);
+      }
+      var req = self._decodeRequest(reqTap, message);
+    } catch (err) {
+      onResponse(err);
+      return;
+    }
+
+    if (message.oneWay) {
+      // No response is sent for one-way messages.
+      self.emit('_call', name, req);
+      self._pending--;
+    } else {
+      self.emit('_call', name, req, onResponse);
+    }
+
+    // Write the response, prefixed by the request's ID.
+    function onResponse(err, res) {
+      self._pending--;
+      safeWrite(resTap, self._idType, id);
+      if (!message) {
+        // Unknown message, its error type can't be used.
+        self._encodeSystemError(resTap, err);
+      } else {
+        self._encodeArguments(resTap, message, err, res);
+      }
+      self._encoder.write(resTap.getValue(), undefined, function () {
+        if (!self._pending && self._destroyed) {
+          self.destroy(); // For real this time.
+        }
+      });
+    }
+  }
+}
+util.inherits(StatefulListener, MessageListener);
+
+// Helpers.
+
+/**
+ * An Avro message.
+ *
+ */
+function Message(name, attrs, opts) {
+  this.name = name;
+
+  this.requestType = schemas.createType({
+    name: name,
+    type: 'request',
+    fields: attrs.request
+  }, opts);
+
+  if (!attrs.response) {
+    throw new Error('missing response');
+  }
+  this.responseType = schemas.createType(attrs.response, opts);
+
+  var errors = attrs.errors || [];
+  errors.unshift('string');
+  this.errorType = schemas.createType(errors, opts);
+
+  this.oneWay = !!attrs['one-way'];
+  if (this.oneWay) {
+    if (
+      !(this.responseType instanceof schemas.types.NullType) ||
+      errors.length > 1
+    ) {
+      throw new Error('unapplicable one-way parameter');
+    }
+  }
+}
+
+Message.prototype.toJSON = function () {
+  var obj = {
+    request: this.requestType.getFields(),
+    response: this.responseType
+  };
+  var errorTypes = this.errorType.getTypes();
+  if (errorTypes.length > 1) {
+    obj.errors = schemas.createType(errorTypes.slice(1));
+  }
+  return obj;
+};
+
+/**
+ * "Framing" stream.
+ *
+ * @param frameSize {Number} (Maximum) size in bytes of each frame. The last
+ * frame might be shorter.
+ *
+ */
+function MessageEncoder(frameSize) {
+  stream.Transform.call(this);
+  this._frameSize = frameSize | 0;
+  if (this._frameSize <= 0) {
+    throw new Error('invalid frame size');
+  }
+}
+util.inherits(MessageEncoder, stream.Transform);
+
+MessageEncoder.prototype._transform = function (buf, encoding, cb) {
+  var frames = [];
+  var length = buf.length;
+  var start = 0;
+  var end;
+  do {
+    end = start + this._frameSize;
+    if (end > length) {
+      end = length;
+    }
+    frames.push(intBuffer(end - start));
+    frames.push(buf.slice(start, end));
+  } while ((start = end) < length);
+  frames.push(intBuffer(0));
+  cb(null, Buffer.concat(frames));
+};
+
+/**
+ * "Un-framing" stream.
+ *
+ * @param noEmpty {Boolean} Emit an error if the decoder ends before emitting a
+ * single frame.
+ *
+ * This stream should only be used by being piped/unpiped to. Otherwise there
+ * is a risk that too many bytes get consumed from the source stream (i.e.
+ * data corresponding to a partial message might be lost).
+ *
+ */
+function MessageDecoder(noEmpty) {
+  stream.Transform.call(this);
+  this._buf = new Buffer(0); // Bytes not yet forming a complete frame.
+  this._bufs = []; // Frames of the message currently being assembled.
+  this._length = 0; // Total size of `_bufs`, or -1 after a flush error.
+  this._empty = !!noEmpty; // Reset as soon as a message is emitted.
+
+  this
+    // `_flush` never calls back, so the readable side is ended from here.
+    .on('finish', function () { this.push(null); })
+    .on('unpipe', function (src) {
+      // `~this._length` is only zero when `_length` is -1, i.e. when
+      // `_flush` already reported the remaining data as trailing.
+      if (~this._length && !src._readableState.ended) {
+        // Not ideal to rely on this to check whether we can unshift, but the
+        // official documentation mentions it (in the context of the read
+        // buffers) so it should be stable. Alternatives are more complex,
+        // costly (e.g. attaching a handler on pipe), and not as fool-proof
+        // (the stream might have ended earlier).
+        this._bufs.push(this._buf);
+        src.unshift(Buffer.concat(this._bufs));
+      }
+    });
+}
+util.inherits(MessageDecoder, stream.Transform);
+
+MessageDecoder.prototype._transform = function (buf, encoding, cb) {
+  buf = Buffer.concat([this._buf, buf]);
+  var frameLength;
+  while (
+    buf.length >= 4 &&
+    buf.length >= (frameLength = buf.readInt32BE(0)) + 4
+  ) {
+    if (frameLength) {
+      this._bufs.push(buf.slice(4, frameLength + 4));
+      this._length += frameLength;
+    } else {
+      var frame = Buffer.concat(this._bufs, this._length);
+      this._empty = false;
+      this._length = 0;
+      this._bufs = [];
+      this.push(frame);
+    }
+    buf = buf.slice(frameLength + 4);
+  }
+  this._buf = buf;
+  cb();
+};
+
+MessageDecoder.prototype._flush = function () {
+  if (this._length || this._buf.length) {
+    this._length = -1; // Don't unshift data on incoming unpipe.
+    this.emit('error', new Error('trailing data'));
+  } else if (this._empty) {
+    this.emit('error', new Error('no message decoded'));
+  }
+};
+
+/**
+ * Default ID generator, using Avro messages' metadata field.
+ *
+ * This is required for stateful emitters to work and can be overridden to read
+ * or write arbitrary metadata. Note that the message contents are
+ * (intentionally) not available when updating this metadata.
+ *
+ */
+function IdType(attrs, opts) {
+  schemas.types.LogicalType.call(this, attrs, opts);
+}
+util.inherits(IdType, schemas.types.LogicalType);
+
+IdType.prototype._fromValue = function (val) {
+  var buf = val.id;
+  return buf && buf.length === 4 ? buf.readInt32BE(0) : 0;
+};
+
+IdType.prototype._toValue = function (any) {
+  return {id: intBuffer(any | 0)};
+};
+
+IdType.createMetadataType = function (Type) {
+  Type = Type || IdType;
+  return new Type({type: 'map', values: 'bytes'});
+};
+
+/**
+ * Returns a buffer containing an integer's big-endian representation.
+ *
+ * @param n {Number} Integer.
+ *
+ */
+function intBuffer(n) {
+  var buf = new Buffer(4);
+  buf.writeInt32BE(n);
+  return buf;
+}
+
+/**
+ * Write and maybe resize.
+ *
+ * @param tap {Tap} Tap written to.
+ * @param type {Type} Avro type.
+ * @param val {...} Corresponding Avro value.
+ *
+ */
+function safeWrite(tap, type, val) {
+  var pos = tap.pos;
+  type._write(tap, val);
+
+  if (!tap.isValid()) {
+    var buf = new Buffer(tap.pos);
+    tap.buf.copy(buf, 0, 0, pos);
+    tap.buf = buf;
+    tap.pos = pos;
+    type._write(tap, val);
+  }
+}
+
+/**
+ * Default callback when not provided.
+ *
+ */
+function throwError(err) {
+  if (!err) {
+    return;
+  }
+  if (typeof err == 'object' && err.string) {
+    err = err.string;
+  }
+  if (typeof err == 'string') {
+    err = new Error(err);
+  }
+  throw err;
+}
+
+/**
+ * Convert an error message into a format suitable for RPC.
+ *
+ * @param err {Error|String} Error message. It will be converted into valid
+ * format for Avro.
+ *
+ */
+function avroError(err) {
+  if (err instanceof Error) {
+    err = err.message;
+  }
+  return {string: err};
+}
+
+/**
+ * Asynchronous error handling.
+ *
+ * @param cb {Function} Callback.
+ * @param err {...} Error, passed as first argument to `cb.` If an `Error`
+ * instance or a string, it will be converted into valid format for Avro.
+ * @param res {...} Response. Passed as second argument to `cb`.
+ *
+ */
+function asyncAvroCb(ctx, cb, err, res) {
+  process.nextTick(function () { cb.call(ctx, avroError(err), res); });
+}
+
+/**
+ * Convenience function to get a protocol's hash.
+ *
+ * @param ptcl {Protocol} Any protocol.
+ *
+ */
+function getHash(ptcl) {
+  return new Buffer(ptcl._hashString, 'binary');
+}
+
+/**
+ * Whether a emitter or listener can resolve messages from a hash string.
+ *
+ * @param emitter {MessageEmitter|MessageListener}
+ * @param hashString {String}
+ *
+ */
+function canResolve(emitter, hashString) {
+  var resolvers = emitter._resolvers[hashString];
+  return !!resolvers || hashString === emitter._ptcl._hashString;
+}
+
+/**
+ * Retrieve resolvers for a given hash string.
+ *
+ * @param emitter {MessageEmitter|MessageListener}
+ * @param hashString {String}
+ * @param message {Message}
+ *
+ */
+function getResolvers(emitter, hashString, message) {
+  if (hashString === emitter._ptcl._hashString) {
+    return message;
+  }
+  var resolvers = emitter._resolvers[hashString];
+  return resolvers && resolvers[message.name];
+}
+
+/**
+ * Check whether something is a stream.
+ *
+ * @param any {Object} Any object.
+ *
+ */
+function isStream(any) {
+  // This is a hacky way of checking that the transport is a stream-like
+  // object. We unfortunately can't use `instanceof Stream` checks since
+  // some libraries (e.g. websocket-stream) return streams which don't
+  // inherit from it.
+  return !!any.pipe;
+}
+
+
+// Exports: handshake types, protocol classes and factory, emitters and
+// listeners (both stateful and stateless), framing streams, and the default
+// callback.
+module.exports = {
+  HANDSHAKE_REQUEST_TYPE: HANDSHAKE_REQUEST_TYPE,
+  HANDSHAKE_RESPONSE_TYPE: HANDSHAKE_RESPONSE_TYPE,
+  IdType: IdType,
+  Message: Message,
+  Protocol: Protocol,
+  createProtocol: createProtocol,
+  emitters: {
+    StatefulEmitter: StatefulEmitter,
+    StatelessEmitter: StatelessEmitter
+  },
+  listeners: {
+    StatefulListener: StatefulListener,
+    StatelessListener: StatelessListener
+  },
+  streams: {
+    MessageDecoder: MessageDecoder,
+    MessageEncoder: MessageEncoder
+  },
+  throwError: throwError
+};

http://git-wip-us.apache.org/repos/asf/avro/blob/133fafac/lang/js/lib/schemas.js
----------------------------------------------------------------------
diff --git a/lang/js/lib/schemas.js b/lang/js/lib/schemas.js
index 325d77a..429ace4 100644
--- a/lang/js/lib/schemas.js
+++ b/lang/js/lib/schemas.js
@@ -23,7 +23,6 @@
 
 var utils = require('./utils'),
     buffer = require('buffer'), // For `SlowBuffer`.
-    crypto = require('crypto'),
     util = require('util');
 
 // Convenience imports.
@@ -45,6 +44,7 @@ var TYPES = {
   'map': MapType,
   'null': NullType,
   'record': RecordType,
+  'request': RecordType,
   'string': StringType,
   'union': UnionType
 };
@@ -307,31 +307,11 @@ Type.prototype.getName = function (noRef) {
 };
 
 Type.prototype.getSchema = function (noDeref) {
-  // Since JS objects are unordered, this implementation (unfortunately)
-  // relies on engines returning properties in the same order that they are
-  // inserted in. This is not in the JS spec, but can be "somewhat" safely
-  // assumed (more here: http://stackoverflow.com/q/5525795/1062617).
-  return (function (type, registry) {
-    return JSON.stringify(type, function (key, value) {
-      if (value instanceof Field) {
-        return {name: value._name, type: value._type};
-      } else if (value && value.name) {
-        var name = value.name;
-        if (noDeref || registry[name]) {
-          return name;
-        }
-        registry[name] = true;
-      }
-      return value;
-    });
-  })(this, {});
+  return stringify(this, noDeref);
 };
 
 Type.prototype.getFingerprint = function (algorithm) {
-  algorithm = algorithm || 'md5';
-  var hash = crypto.createHash(algorithm);
-  hash.end(this.getSchema());
-  return hash.read();
+  return utils.getHash(this.getSchema(), algorithm);
 };
 
 Type.prototype.inspect = function () {
@@ -1434,7 +1414,9 @@ function RecordType(attrs, opts) {
   var resolutions = resolveNames(attrs, opts.namespace);
   this._name = resolutions.name;
   this._aliases = resolutions.aliases;
-  Type.call(this, opts.registry);
+  this._type = attrs.type;
+  // Requests shouldn't be registered since their name is only a placeholder.
+  Type.call(this, this._type === 'request' ? undefined : opts.registry);
 
   if (!(attrs.fields instanceof Array)) {
     throw new Error(f('non-array %s fields', this._name));
@@ -2156,6 +2138,37 @@ function readArraySize(tap) {
 }
 
 /**
+ * Correctly stringify an object which contains types.
+ *
+ * @param obj {Object} The object to stringify. Typically, a type itself or an
+ * object containing types. Any types inside will be expanded only once then
+ * referenced by name.
+ * @param noDeref {Boolean} Always reference types by name when possible,
+ * rather than expand it the first time it is encountered.
+ *
+ */
+function stringify(obj, noDeref) {
+  // Since JS objects are unordered, this implementation (unfortunately)
+  // relies on engines returning properties in the same order that they are
+  // inserted in. This is not in the JS spec, but can be "somewhat" safely
+  // assumed (more here: http://stackoverflow.com/q/5525795/1062617).
+  return (function (registry) {
+    return JSON.stringify(obj, function (key, value) {
+      if (value instanceof Field) {
+        return {name: value._name, type: value._type};
+      } else if (value && value.name) {
+        var name = value.name;
+        if (noDeref || registry[name]) {
+          return name;
+        }
+        registry[name] = true;
+      }
+      return value;
+    });
+  })({});
+}
+
+/**
  * Check whether a long can be represented without precision loss.
  *
  * @param n {Number} The number.
@@ -2192,6 +2205,7 @@ function throwInvalidError(path, val, type) {
 module.exports = {
   createType: createType,
   resolveNames: resolveNames, // Protocols use the same name resolution logic.
+  stringify: stringify,
   types: (function () {
     // Export the base types along with all concrete implementations.
     var obj = {Type: Type, LogicalType: LogicalType};

http://git-wip-us.apache.org/repos/asf/avro/blob/133fafac/lang/js/lib/utils.js
----------------------------------------------------------------------
diff --git a/lang/js/lib/utils.js b/lang/js/lib/utils.js
index 2627c1a..c453bdd 100644
--- a/lang/js/lib/utils.js
+++ b/lang/js/lib/utils.js
@@ -21,6 +21,9 @@
 
 'use strict';
 
+var crypto = require('crypto');
+
+
 /**
  * Uppercase the first letter of a string.
  *
@@ -39,6 +42,20 @@ function capitalize(s) { return s.charAt(0).toUpperCase() + s.slice(1); }
 function compare(n1, n2) { return n1 === n2 ? 0 : (n1 < n2 ? -1 : 1); }
 
 /**
+ * Compute a string's hash.
+ *
+ * @param str {String} The string to hash.
+ * @param algorithm {String} The algorithm used. Defaults to MD5.
+ *
+ */
+function getHash(str, algorithm) {
+  algorithm = algorithm || 'md5';
+  var hash = crypto.createHash(algorithm);
+  hash.end(str);
+  return hash.read();
+}
+
+/**
  * Find index of value in array.
  *
  * @param arr {Array} Can also be a false-ish value.
@@ -278,6 +295,12 @@ function Tap(buf, pos) {
  */
 Tap.prototype.isValid = function () { return this.pos <= this.buf.length; };
 
+/**
+ * Returns the contents of the tap up to the current position.
+ *
+ */
+Tap.prototype.getValue = function () { return this.buf.slice(0, this.pos); };
+
 // Read, skip, write methods.
 //
 // These should fail silently when the buffer overflows. Note this is only
@@ -623,6 +646,7 @@ module.exports = {
   abstractFunction: abstractFunction,
   capitalize: capitalize,
   compare: compare,
+  getHash: getHash,
   toMap: toMap,
   singleIndexOf: singleIndexOf,
   hasDuplicates: hasDuplicates,

http://git-wip-us.apache.org/repos/asf/avro/blob/133fafac/lang/js/package.json
----------------------------------------------------------------------
diff --git a/lang/js/package.json b/lang/js/package.json
index fda5325..672faff 100644
--- a/lang/js/package.json
+++ b/lang/js/package.json
@@ -34,8 +34,9 @@
     "crypto": "./etc/browser/crypto.js"
   },
   "scripts": {
-    "test": "mocha --ui tdd --reporter dot",
-    "clean": "rm -rf node_modules"
+    "cover": "istanbul cover _mocha",
+    "test": "mocha",
+    "clean": "rm -rf coverage node_modules"
   },
   "dependencies": {
     "underscore": "*"

http://git-wip-us.apache.org/repos/asf/avro/blob/133fafac/lang/js/test/mocha.opts
----------------------------------------------------------------------
diff --git a/lang/js/test/mocha.opts b/lang/js/test/mocha.opts
new file mode 100644
index 0000000..1ed06f8
--- /dev/null
+++ b/lang/js/test/mocha.opts
@@ -0,0 +1,2 @@
+--ui tdd
+--reporter dot

http://git-wip-us.apache.org/repos/asf/avro/blob/133fafac/lang/js/test/test_files.js
----------------------------------------------------------------------
diff --git a/lang/js/test/test_files.js b/lang/js/test/test_files.js
index c0a334b..4f7c547 100644
--- a/lang/js/test/test_files.js
+++ b/lang/js/test/test_files.js
@@ -22,6 +22,7 @@
 'use strict';
 
 var files = require('../lib/files'),
+    protocols = require('../lib/protocols'),
     schemas = require('../lib/schemas'),
     assert = require('assert'),
     fs = require('fs'),
@@ -43,7 +44,7 @@ suite('files', function () {
 
     var parse = files.parse;
 
-    test('object', function () {
+    test('type object', function () {
       var obj = {
         type: 'record',
         name: 'Person',
@@ -52,6 +53,11 @@ suite('files', function () {
       assert(parse(obj) instanceof types.RecordType);
     });
 
+    test('protocol object', function () {
+      var obj = {protocol: 'Foo'};
+      assert(parse(obj) instanceof protocols.Protocol);
+    });
+
     test('schema instance', function () {
       var type = parse({
         type: 'record',


Mime
View raw message