avro-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From b...@apache.org
Subject [02/43] avro git commit: AVRO-1778. JavaScript: Add IPC/RPC support.
Date Sat, 14 May 2016 23:43:35 GMT
http://git-wip-us.apache.org/repos/asf/avro/blob/133fafac/lang/js/test/test_protocols.js
----------------------------------------------------------------------
diff --git a/lang/js/test/test_protocols.js b/lang/js/test/test_protocols.js
new file mode 100644
index 0000000..08ef784
--- /dev/null
+++ b/lang/js/test/test_protocols.js
@@ -0,0 +1,1392 @@
+/* jshint node: true, mocha: true */
+
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+'use strict';
+
+var protocols = require('../lib/protocols'),
+    utils = require('../lib/utils'),
+    assert = require('assert'),
+    stream = require('stream'),
+    util = require('util');
+
+
+var HANDSHAKE_REQUEST_TYPE = protocols.HANDSHAKE_REQUEST_TYPE;
+var HANDSHAKE_RESPONSE_TYPE = protocols.HANDSHAKE_RESPONSE_TYPE;
+var createProtocol = protocols.createProtocol;
+
+
+suite('protocols', function () {
+
+  suite('Protocol', function () {
+
+    test('get name and types', function () {
+      var p = createProtocol({
+        namespace: 'foo',
+        protocol: 'HelloWorld',
+        types: [
+          {
+            name: 'Greeting',
+            type: 'record',
+            fields: [{name: 'message', type: 'string'}]
+          },
+          {
+            name: 'Curse',
+            type: 'error',
+            fields: [{name: 'message', type: 'string'}]
+          }
+        ],
+        messages: {
+          hello: {
+            request: [{name: 'greeting', type: 'Greeting'}],
+            response: 'Greeting',
+            errors: ['Curse']
+          },
+          hi: {
+          request: [{name: 'hey', type: 'string'}],
+          response: 'null',
+          'one-way': true
+          }
+        }
+      });
+      assert.equal(p.getName(), 'foo.HelloWorld');
+      assert.equal(p.getType('foo.Greeting').getName(true), 'record');
+    });
+
+    test('missing message', function () {
+      var ptcl = createProtocol({namespace: 'com.acme', protocol: 'Hello'});
+      assert.throws(function () {
+        ptcl.on('add', function () {});
+      }, /unknown/);
+    });
+
+    test('missing name', function () {
+      assert.throws(function () {
+        createProtocol({namespace: 'com.acme', messages: {}});
+      });
+    });
+
+    test('missing type', function () {
+      assert.throws(function () {
+        createProtocol({
+          namespace: 'com.acme',
+          protocol: 'HelloWorld',
+          messages: {
+            hello: {
+              request: [{name: 'greeting', type: 'Greeting'}],
+              response: 'Greeting'
+            }
+          }
+        });
+      });
+    });
+
+    test('get messages', function () {
+      var ptcl;
+      ptcl = createProtocol({protocol: 'Empty'});
+      assert.deepEqual(ptcl.getMessages(), {});
+      ptcl = createProtocol({
+        protocol: 'Ping',
+        messages: {
+          ping: {
+            request: [],
+            response: 'string'
+          }
+        }
+      });
+      var messages = ptcl.getMessages();
+      assert.equal(Object.keys(messages).length, 1);
+      assert(messages.ping !== undefined);
+    });
+
+    test('create listener', function (done) {
+      var ptcl = createProtocol({protocol: 'Empty'});
+      var transport = new stream.PassThrough();
+      var ee = ptcl.createListener(transport, function (pending) {
+        assert.equal(pending, 0);
+        done();
+      });
+      ee.destroy();
+    });
+
+    test('subprotocol', function () {
+      var ptcl = createProtocol({namespace: 'com.acme', protocol: 'Hello'});
+      var subptcl = ptcl.subprotocol();
+      assert.strictEqual(subptcl._emitterResolvers, ptcl._emitterResolvers);
+      assert.strictEqual(subptcl._listenerResolvers, ptcl._listenerResolvers);
+    });
+
+    test('invalid emitter', function (done) {
+      var ptcl = createProtocol({protocol: 'Empty'});
+      ptcl.emit('hi', {}, null, function (err) {
+        assert(/invalid emitter/.test(err.string));
+        done();
+      });
+    });
+
+    test('inspect', function () {
+      var p = createProtocol({
+        namespace: 'hello',
+        protocol: 'World',
+      });
+      assert.equal(p.inspect(), '<Protocol "hello.World">');
+    });
+
+  });
+
+  suite('Message', function () {
+
+    var Message = protocols.Message;
+
+    test('empty errors', function () {
+      var m = new Message('Hi', {
+        request: [{name: 'greeting', type: 'string'}],
+        response: 'int'
+      });
+      assert.deepEqual(m.errorType.toString(), '["string"]');
+    });
+
+    test('missing response', function () {
+      assert.throws(function () {
+        new Message('Hi', {
+          request: [{name: 'greeting', type: 'string'}]
+        });
+      });
+    });
+
+    test('invalid one-way', function () {
+      // Non-null response.
+      assert.throws(function () {
+        new Message('Hi', {
+          request: [{name: 'greeting', type: 'string'}],
+          response: 'string',
+          'one-way': true
+        });
+      });
+      // Non-empty errors.
+      assert.throws(function () {
+        new Message('Hi', {
+          request: [{name: 'greeting', type: 'string'}],
+          response: 'null',
+          errors: ['int'],
+          'one-way': true
+        });
+      });
+    });
+
+  });
+
+  suite('MessageDecoder', function () {
+
+    var MessageDecoder = protocols.streams.MessageDecoder;
+
+    test('ok', function (done) {
+      var parts = [
+        new Buffer([0, 1]),
+        new Buffer([2]),
+        new Buffer([]),
+        new Buffer([3, 4, 5]),
+        new Buffer([])
+      ];
+      var messages = [];
+      var readable = createReadableStream(parts.map(frame), true);
+      var writable = createWritableStream(messages, true)
+        .on('finish', function () {
+          assert.deepEqual(
+            messages,
+            [new Buffer([0, 1, 2]), new Buffer([3, 4, 5])]
+          );
+          done();
+        });
+      readable.pipe(new MessageDecoder()).pipe(writable);
+    });
+
+    test('trailing data', function (done) {
+      var parts = [
+        new Buffer([0, 1]),
+        new Buffer([2]),
+        new Buffer([]),
+        new Buffer([3])
+      ];
+      var messages = [];
+      var readable = createReadableStream(parts.map(frame), true);
+      var writable = createWritableStream(messages, true);
+      readable
+        .pipe(new MessageDecoder())
+        .on('error', function () {
+          assert.deepEqual(messages, [new Buffer([0, 1, 2])]);
+          done();
+        })
+        .pipe(writable);
+    });
+
+    test('empty', function (done) {
+      var readable = createReadableStream([], true);
+      readable
+        .pipe(new MessageDecoder(true))
+        .on('error', function () { done(); });
+    });
+
+  });
+
+  suite('MessageEncoder', function () {
+
+    var MessageEncoder = protocols.streams.MessageEncoder;
+
+    test('invalid frame size', function () {
+      assert.throws(function () { new MessageEncoder(); });
+    });
+
+    test('ok', function (done) {
+      var messages = [
+        new Buffer([0, 1]),
+        new Buffer([2])
+      ];
+      var frames = [];
+      var readable = createReadableStream(messages, true);
+      var writable = createWritableStream(frames, true);
+      readable
+        .pipe(new MessageEncoder(64))
+        .pipe(writable)
+        .on('finish', function () {
+          assert.deepEqual(
+            frames,
+            [
+              new Buffer([0, 0, 0, 2, 0, 1, 0, 0, 0, 0]),
+              new Buffer([0, 0, 0, 1, 2, 0, 0, 0, 0])
+            ]
+          );
+          done();
+        });
+    });
+
+    test('all zeros', function (done) {
+      var messages = [new Buffer([0, 0, 0, 0])];
+      var frames = [];
+      var readable = createReadableStream(messages, true);
+      var writable = createWritableStream(frames, true);
+      readable
+        .pipe(new MessageEncoder(64))
+        .pipe(writable)
+        .on('finish', function () {
+          assert.deepEqual(
+            frames,
+            [new Buffer([0, 0, 0, 4, 0, 0, 0, 0, 0, 0, 0, 0])]
+          );
+          done();
+        });
+    });
+
+    test('short frame size', function (done) {
+      var messages = [
+        new Buffer([0, 1, 2]),
+        new Buffer([2])
+      ];
+      var frames = [];
+      var readable = createReadableStream(messages, true);
+      var writable = createWritableStream(frames, true);
+      readable
+        .pipe(new MessageEncoder(2))
+        .pipe(writable)
+        .on('finish', function () {
+          assert.deepEqual(
+            frames,
+            [
+              new Buffer([0, 0, 0, 2, 0, 1, 0, 0, 0, 1, 2, 0, 0, 0, 0]),
+              new Buffer([0, 0, 0, 1, 2, 0, 0, 0, 0])
+            ]
+          );
+          done();
+        });
+    });
+
+  });
+
+  suite('StatefulEmitter', function () {
+
+    test('ok handshake', function (done) {
+      var buf = HANDSHAKE_RESPONSE_TYPE.toBuffer({match: 'BOTH'});
+      var bufs = [];
+      var ptcl = createProtocol({protocol: 'Empty'});
+      var handshake = false;
+      ptcl.createEmitter(createTransport([buf], bufs))
+        .on('handshake', function (req, res) {
+            handshake = true;
+            assert(res.match === 'BOTH');
+            assert.deepEqual(
+              Buffer.concat(bufs),
+              HANDSHAKE_REQUEST_TYPE.toBuffer({
+                clientHash: new Buffer(ptcl._hashString, 'binary'),
+                serverHash: new Buffer(ptcl._hashString, 'binary')
+              })
+            );
+            this.destroy();
+        })
+        .on('eot', function () {
+          assert(handshake);
+          done();
+        });
+    });
+
+    test('no server match handshake', function (done) {
+      var ptcl = createProtocol({protocol: 'Empty'});
+      var resBufs = [
+        {
+          match: 'NONE',
+          serverHash: {'org.apache.avro.ipc.MD5': new Buffer(16)},
+          serverProtocol: {string: ptcl.toString()},
+        },
+        {match: 'BOTH'}
+      ].map(function (val) { return HANDSHAKE_RESPONSE_TYPE.toBuffer(val); });
+      var reqBufs = [];
+      var handshakes = 0;
+      ptcl.createEmitter(createTransport(resBufs, reqBufs))
+        .on('handshake', function (req, res) {
+          if (handshakes++) {
+            assert(res.match === 'BOTH');
+            this.destroy();
+          } else {
+            assert(res.match === 'NONE');
+          }
+        })
+        .on('eot', function () {
+          assert.equal(handshakes, 2);
+          done();
+        });
+    });
+
+    test('incompatible protocol', function (done) {
+      var ptcl = createProtocol({protocol: 'Empty'});
+      var hash = new Buffer(16); // Pretend the hash was different.
+      var resBufs = [
+        {
+          match: 'NONE',
+          serverHash: {'org.apache.avro.ipc.MD5': hash},
+          serverProtocol: {string: ptcl.toString()},
+        },
+        {
+          match: 'NONE',
+          serverHash: {'org.apache.avro.ipc.MD5': hash},
+          serverProtocol: {string: ptcl.toString()},
+          meta: {map: {error: new Buffer('abcd')}}
+        }
+      ].map(function (val) { return HANDSHAKE_RESPONSE_TYPE.toBuffer(val); });
+      var error = false;
+      ptcl.createEmitter(createTransport(resBufs, []))
+        .on('error', function (err) {
+          error = true;
+          assert.equal(err.message, 'abcd');
+        })
+        .on('eot', function () {
+          assert(error);
+          done();
+        });
+    });
+
+    test('handshake error', function (done) {
+      var resBufs = [
+        new Buffer([4, 0, 0]), // Invalid handshakes.
+        new Buffer([4, 0, 0])
+      ];
+      var ptcl = createProtocol({protocol: 'Empty'});
+      var error = false;
+      ptcl.createEmitter(createTransport(resBufs, []))
+        .on('error', function (err) {
+          error = true;
+          assert.equal(err.message, 'handshake error');
+        })
+        .on('eot', function () {
+          assert(error);
+          done();
+        });
+    });
+
+    test('orphan response', function (done) {
+      var ptcl = createProtocol({protocol: 'Empty'});
+      var idType = protocols.IdType.createMetadataType();
+      var resBufs = [
+        new Buffer([0, 0, 0]), // OK handshake.
+        idType.toBuffer(23)
+      ];
+      var error = false;
+      ptcl.createEmitter(createTransport(resBufs, []))
+        .on('error', function (err) {
+          error = true;
+          assert(/orphan response:/.test(err.message));
+        })
+        .on('eot', function () {
+          assert(error);
+          done();
+        });
+    });
+
+    test('ended readable', function (done) {
+      var bufs = [];
+      var ptcl = createProtocol({protocol: 'Empty'});
+      ptcl.createEmitter(createTransport([], bufs))
+        .on('eot', function () {
+          assert.equal(bufs.length, 1); // A single handshake was sent.
+          done();
+        });
+    });
+
+    test('interrupted', function (done) {
+      var ptcl = createProtocol({
+        protocol: 'Empty',
+        messages: {
+          id: {request: [{name: 'id', type: 'int'}], response: 'int'}
+        }
+      });
+      var resBufs = [
+        new Buffer([0, 0, 0]), // OK handshake.
+      ];
+      var interrupted = 0;
+      var transport = createTransport(resBufs, []);
+      var ee = ptcl.createEmitter(transport, function () {
+        assert.equal(interrupted, 2);
+        done();
+      });
+
+      ptcl.emit('id', {id: 123}, ee, cb);
+      ptcl.emit('id', {id: 123}, ee, cb);
+
+      function cb(err) {
+        assert.deepEqual(err, {string: 'interrupted'});
+        interrupted++;
+      }
+    });
+
+    test('missing client message', function (done) {
+      var ptcl1 = createProtocol({
+        protocol: 'Ping',
+        messages: {
+          ping: {request: [], response: 'string'}
+        }
+      });
+      var ptcl2 = createProtocol({
+        protocol: 'Ping',
+        messages: {
+          ping: {request: [], response: 'string'},
+          pong: {request: [], response: 'string'}
+        }
+      }).on('ping', function (req, ee, cb) { cb(null, 'ok'); });
+      var transports = createPassthroughTransports();
+      ptcl2.createListener(transports[1]);
+      var ee = ptcl1.createEmitter(transports[0]);
+      ptcl1.emit('ping', {}, ee, function (err, res) {
+        assert.equal(res, 'ok');
+        done();
+      });
+    });
+
+    test('missing server message', function (done) {
+      var ptcl1 = createProtocol({
+        protocol: 'Ping',
+        messages: {
+          ping: {request: [], response: 'string'}
+        }
+      });
+      var ptcl2 = createProtocol({protocol: 'Empty'});
+      var transports = createPassthroughTransports();
+      ptcl2.createListener(transports[1]);
+      ptcl1.createEmitter(transports[0])
+        .on('error', function (err) {
+          assert(/missing server message: ping/.test(err.message));
+          done();
+        });
+    });
+
+    test('trailing data', function (done) {
+      var ptcl = createProtocol({
+        protocol: 'Ping',
+        messages: {
+          ping: {request: [], response: 'string'}
+        }
+      });
+      var transports = createPassthroughTransports();
+      ptcl.createEmitter(transports[0])
+        .on('error', function (err) {
+          assert(/trailing data/.test(err.message));
+          done();
+        });
+      transports[0].readable.end(new Buffer([2, 3]));
+    });
+
+    test('invalid metadata', function (done) {
+      var ptcl = createProtocol({
+        protocol: 'Ping',
+        messages: {
+          ping: {request: [], response: 'string'}
+        }
+      });
+      var transports = createPassthroughTransports();
+      ptcl.createListener(transports[1]);
+      ptcl.createEmitter(transports[0])
+        .on('error', function (err) {
+          assert(/invalid metadata:/.test(err.message));
+          done();
+        })
+        .on('handshake', function () {
+          transports[0].readable.write(frame(new Buffer([2, 3])));
+          transports[0].readable.write(frame(new Buffer(0)));
+        });
+    });
+
+    test('invalid response', function (done) {
+      var ptcl = createProtocol({
+        protocol: 'Ping',
+        messages: {
+          ping: {request: [], response: 'string'}
+        }
+      });
+      var transports = createPassthroughTransports();
+      var ml = ptcl.createListener(transports[1]);
+      var me = ptcl.createEmitter(transports[0])
+        .on('handshake', function () {
+          ml.destroy();
+
+          ptcl.emit('ping', {}, me, function (err) {
+            assert(/invalid response:/.test(err.string));
+            done();
+          });
+
+          var idType = protocols.IdType.createMetadataType();
+          var bufs = [
+              idType.toBuffer(1), // Metadata.
+              new Buffer([3]) // Invalid response.
+          ];
+          transports[0].readable.write(frame(Buffer.concat(bufs)));
+          transports[0].readable.write(frame(new Buffer(0)));
+        });
+    });
+
+    test('one way', function (done) {
+      var beats = 0;
+      var ptcl = createProtocol({
+        protocol: 'Heartbeat',
+        messages: {
+          beat: {request: [], response: 'null', 'one-way': true}
+        }
+      }).on('beat', function (req, ee, cb) {
+        assert.strictEqual(cb, undefined);
+        if (++beats === 2) {
+          done();
+        }
+      });
+      var transports = createPassthroughTransports();
+      ptcl.createListener(transports[1]);
+      var ee = ptcl.createEmitter(transports[0]);
+      ptcl.emit('beat', {}, ee);
+      ptcl.emit('beat', {}, ee);
+    });
+
+  });
+
+  suite('StatelessEmitter', function () {
+
+    test('interrupted before response data', function (done) {
+      var ptcl = createProtocol({
+        protocol: 'Ping',
+        messages: {ping: {request: [], response: 'boolean'}}
+      });
+      var readable = stream.PassThrough()
+        .on('end', done);
+      var writable = createWritableStream([]);
+      var ee = ptcl.createEmitter(function (cb) {
+        cb(readable);
+        return writable;
+      });
+      ptcl.emit('ping', {}, ee, function (err) {
+        assert(/interrupted/.test(err.string));
+        readable.write(frame(new Buffer(2)));
+        readable.end(frame(new Buffer(0)));
+      });
+      ee.destroy(true);
+    });
+
+  });
+
+  suite('StatefulListener', function () {
+
+    test('end readable', function (done) {
+      var ptcl = createProtocol({protocol: 'Empty'});
+      var transports = createPassthroughTransports();
+      ptcl.createListener(transports[0])
+        .on('eot', function (pending) {
+          assert.equal(pending, 0);
+          done();
+        });
+      transports[0].readable.end();
+    });
+
+    test('finish writable', function (done) {
+      var ptcl = createProtocol({protocol: 'Empty'});
+      var transports = createPassthroughTransports();
+      ptcl.createListener(transports[0])
+        .on('eot', function (pending) {
+          assert.equal(pending, 0);
+          done();
+        });
+      transports[0].writable.end();
+    });
+
+    test('invalid handshake', function (done) {
+      var ptcl = createProtocol({protocol: 'Empty'});
+      var transport = createTransport(
+        [new Buffer([4])], // Invalid handshake.
+        []
+      );
+      ptcl.createListener(transport)
+        .on('handshake', function (req, res) {
+          assert(!req.$isValid());
+          assert.equal(res.match, 'NONE');
+          done();
+        });
+    });
+
+    test('missing server message', function (done) {
+      var ptcl1 = createProtocol({protocol: 'Empty'});
+      var ptcl2 = createProtocol({
+        protocol: 'Heartbeat',
+        messages: {beat: {request: [], response: 'boolean'}}
+      });
+      var hash = new Buffer(ptcl2._hashString, 'binary');
+      var req = {
+        clientHash: hash,
+        clientProtocol: {string: ptcl2.toString()},
+        serverHash: hash
+      };
+      var transport = createTransport(
+        [HANDSHAKE_REQUEST_TYPE.toBuffer(req)],
+        []
+      );
+      ptcl1.createListener(transport)
+        .on('handshake', function (req, res) {
+          assert(req.$isValid());
+          assert.equal(res.match, 'NONE');
+          var msg = res.meta.map.error.toString();
+          assert(/missing server message/.test(msg));
+          done();
+        });
+    });
+
+    test('invalid metadata', function (done) {
+      var ptcl = createProtocol({
+        protocol: 'Heartbeat',
+        messages: {beat: {request: [], response: 'boolean'}}
+      });
+      var transports = createPassthroughTransports();
+      ptcl.createListener(transports[1])
+        .on('error', function (err) {
+          assert(/invalid metadata/.test(err.message));
+          done();
+        });
+      ptcl.createEmitter(transports[0])
+        .on('handshake', function () {
+          // Handshake is complete now.
+          var writable = transports[0].writable;
+          writable.write(frame(new Buffer([0]))); // Empty metadata.
+          writable.write(frame(new Buffer(0)));
+        });
+    });
+
+    test('unknown message', function (done) {
+      var ptcl = createProtocol({
+        protocol: 'Heartbeat',
+        messages: {beat: {request: [], response: 'boolean'}}
+      });
+      var transports = createPassthroughTransports();
+      var ee = ptcl.createListener(transports[1])
+        .on('eot', function () {
+          transports[1].writable.end();
+        });
+      ptcl.createEmitter(transports[0])
+        .on('handshake', function () {
+          // Handshake is complete now.
+          this.destroy();
+          var idType = ee._idType;
+          var bufs = [];
+          transports[0].readable
+            .pipe(new protocols.streams.MessageDecoder())
+            .on('data', function (buf) { bufs.push(buf); })
+            .on('end', function () {
+              assert.equal(bufs.length, 1);
+              var tap = new utils.Tap(bufs[0]);
+              idType._read(tap);
+              assert(tap.buf[tap.pos++]); // Error byte.
+              tap.pos++; // Union marker.
+              assert(/unknown message/.test(tap.readString()));
+              done();
+            });
+          [
+            idType.toBuffer(-1),
+            new Buffer([4, 104, 105]), // `hi` message.
+            new Buffer(0) // End of frame.
+          ].forEach(function (buf) {
+            transports[0].writable.write(frame(buf));
+          });
+          transports[0].writable.end();
+        });
+    });
+
+    test('invalid request', function (done) {
+      var ptcl = createProtocol({
+        protocol: 'Heartbeat',
+        messages: {beat: {
+          request: [{name: 'id', type: 'string'}],
+          response: 'boolean'
+        }}
+      });
+      var transports = createPassthroughTransports();
+      var ee = ptcl.createListener(transports[1])
+        .on('eot', function () { transports[1].writable.end(); });
+      ptcl.createEmitter(transports[0])
+        .on('handshake', function () {
+          // Handshake is complete now.
+          this.destroy();
+          var idType = ee._idType;
+          var bufs = [];
+          transports[0].readable
+            .pipe(new protocols.streams.MessageDecoder())
+            .on('data', function (buf) { bufs.push(buf); })
+            .on('end', function () {
+              assert.equal(bufs.length, 1);
+              var tap = new utils.Tap(bufs[0]);
+              idType._read(tap);
+              assert.equal(tap.buf[tap.pos++], 1); // Error byte.
+              assert.equal(tap.buf[tap.pos++], 0); // Union marker.
+              assert(/invalid request/.test(tap.readString()));
+              done();
+            });
+          [
+            idType.toBuffer(-1),
+            new Buffer([8, 98, 101, 97, 116]), // `beat` message.
+            new Buffer([8]), // Invalid Avro string encoding.
+            new Buffer(0) // End of frame.
+          ].forEach(function (buf) {
+            transports[0].writable.write(frame(buf));
+          });
+          transports[0].writable.end();
+        });
+    });
+
+    test('destroy', function (done) {
+      var ptcl = createProtocol({
+        protocol: 'Heartbeat',
+        messages: {beat: {request: [], response: 'boolean'}}
+      }).on('beat', function (req, ee, cb) {
+        ee.destroy();
+        setTimeout(function () { cb(null, true); }, 10);
+      });
+      var transports = createPassthroughTransports();
+      var responded = false;
+      ptcl.createListener(transports[1])
+        .on('eot', function () {
+          assert(responded); // Works because the transport is sync.
+          done();
+        });
+      ptcl.emit('beat', {}, ptcl.createEmitter(transports[0]), function () {
+        responded = true;
+      });
+    });
+
+  });
+
+  suite('StatelessListener', function () {
+
+    test('unknown message', function (done) {
+      var ptcl = createProtocol({
+        protocol: 'Heartbeat',
+        messages: {beat: {request: [], response: 'boolean'}}
+      });
+      var readable = new stream.PassThrough();
+      var writable = new stream.PassThrough();
+      var ee = ptcl.createListener(function (cb) {
+        cb(writable);
+        return readable;
+      });
+      var bufs = [];
+      writable.pipe(new protocols.streams.MessageDecoder())
+        .on('data', function (buf) { bufs.push(buf); })
+        .on('end', function () {
+          assert.equal(bufs.length, 1);
+          var tap = new utils.Tap(bufs[0]);
+          tap.pos = 4; // Skip handshake response.
+          ee._idType._read(tap); // Skip metadata.
+          assert.equal(tap.buf[tap.pos++], 1); // Error.
+          assert.equal(tap.buf[tap.pos++], 0); // Union flag.
+          assert(/unknown message/.test(tap.readString()));
+          done();
+        });
+      var hash = new Buffer(ptcl._hashString, 'binary');
+      var req = {
+        clientHash: hash,
+        clientProtocol: null,
+        serverHash: hash
+      };
+      var encoder = new protocols.streams.MessageEncoder(64);
+      encoder.pipe(readable);
+      encoder.end(Buffer.concat([
+        HANDSHAKE_REQUEST_TYPE.toBuffer(req),
+        new Buffer([0]), // Empty metadata.
+        new Buffer([4, 104, 105]) // `id` message.
+      ]));
+    });
+
+    test('late writable', function (done) {
+      var ptcl = createProtocol({
+        protocol: 'Heartbeat',
+        messages: {beat: {request: [], response: 'boolean'}}
+      }).on('beat', function (req, ee, cb) {
+        cb(null, true);
+      });
+      var readable = new stream.PassThrough();
+      var writable = new stream.PassThrough();
+      ptcl.createListener(function (cb) {
+        setTimeout(function () { cb(readable); }, 10);
+        return writable;
+      });
+      var ee = ptcl.createEmitter(function (cb) {
+        cb(readable);
+        return writable;
+      });
+      ptcl.emit('beat', {}, ee, function (err, res) {
+        assert.strictEqual(err, null);
+        assert.equal(res, true);
+        done();
+      });
+    });
+
+  });
+
+  suite('emit', function () {
+
+    suite('stateful', function () {
+
+      run(function (emitterPtcl, listenerPtcl, cb) {
+        var pt1 = new stream.PassThrough();
+        var pt2 = new stream.PassThrough();
+        var opts = {bufferSize: 48};
+        cb(
+          emitterPtcl.createEmitter({readable: pt1, writable: pt2}, opts),
+          listenerPtcl.createListener({readable: pt2, writable: pt1}, opts)
+        );
+      });
+
+    });
+
+    suite('stateless', function () {
+
+      run(function (emitterPtcl, listenerPtcl, cb) {
+        cb(emitterPtcl.createEmitter(writableFactory));
+
+        function writableFactory(emitterCb) {
+          var reqPt = new stream.PassThrough()
+            .on('finish', function () {
+              listenerPtcl.createListener(function (listenerCb) {
+                var resPt = new stream.PassThrough()
+                  .on('finish', function () { emitterCb(resPt); });
+                listenerCb(resPt);
+                return reqPt;
+              });
+            });
+          return reqPt;
+        }
+      });
+
+    });
+
+    function run(setupFn) {
+
+      test('single', function (done) {
+        var ptcl = createProtocol({
+          protocol: 'Math',
+          messages: {
+            negate: {
+              request: [{name: 'n', type: 'int'}],
+              response: 'int'
+            }
+          }
+        });
+        setupFn(ptcl, ptcl, function (ee) {
+          ee.on('eot', function () { done(); });
+          ptcl.on('negate', function (req, ee, cb) { cb(null, -req.n); });
+          ptcl.emit('negate', {n: 20}, ee, function (err, res) {
+            assert.equal(this, ptcl);
+            assert.strictEqual(err, null);
+            assert.equal(res, -20);
+            this.emit('negate', {n: 'hi'}, ee, function (err) {
+              assert(/invalid "int"/.test(err.string));
+              ee.destroy();
+            });
+          });
+        });
+      });
+
+      test('invalid request', function (done) {
+        var ptcl = createProtocol({
+          protocol: 'Math',
+          messages: {
+            negate: {
+              request: [{name: 'n', type: 'int'}],
+              response: 'int'
+            }
+          }
+        }).on('negate', function () { assert(false); });
+        setupFn(ptcl, ptcl, function (ee) {
+          ee.on('eot', function () { done(); });
+          ptcl.emit('negate', {n: 'a'}, ee, function (err) {
+            assert(/invalid "int"/.test(err.string), null);
+            ee.destroy();
+          });
+        });
+      });
+
+      test('error response', function (done) {
+        var msg = 'must be non-negative';
+        var ptcl = createProtocol({
+          protocol: 'Math',
+          messages: {
+            sqrt: {
+              request: [{name: 'n', type: 'float'}],
+              response: 'float'
+            }
+          }
+        }).on('sqrt', function (req, ee, cb) {
+          var n = req.n;
+          if (n < 0) {
+            cb({string: msg});
+          } else {
+            cb(null, Math.sqrt(n));
+          }
+        });
+        setupFn(ptcl, ptcl, function (ee) {
+          ptcl.emit('sqrt', {n: 100}, ee, function (err, res) {
+            assert(Math.abs(res - 10) < 1e-5);
+            ptcl.emit('sqrt', {n: - 10}, ee, function (err) {
+              assert.equal(this, ptcl);
+              assert.equal(err.string, msg);
+              done();
+            });
+          });
+        });
+      });
+
+      test('invalid response', function (done) {
+        var ptcl = createProtocol({
+          protocol: 'Math',
+          messages: {
+            sqrt: {
+              request: [{name: 'n', type: 'float'}],
+              response: 'float'
+            }
+          }
+        }).on('sqrt', function (req, ee, cb) {
+          var n = req.n;
+          if (n < 0) {
+            cb(null, 'complex'); // Invalid response.
+          } else {
+            cb(null, Math.sqrt(n));
+          }
+        });
+        setupFn(ptcl, ptcl, function (ee) {
+          ptcl.emit('sqrt', {n: - 10}, ee, function (err) {
+            // The server error message is propagated to the client.
+            assert(/invalid "float"/.test(err.string));
+            ptcl.emit('sqrt', {n: 100}, ee, function (err, res) {
+              // And the server doesn't die (we can make a new request).
+              assert(Math.abs(res - 10) < 1e-5);
+              done();
+            });
+          });
+        });
+      });
+
+      test('invalid error', function (done) {
+        var ptcl = createProtocol({
+          protocol: 'Math',
+          messages: {
+            sqrt: {
+              request: [{name: 'n', type: 'float'}],
+              response: 'float'
+            }
+          }
+        }).on('sqrt', function (req, ee, cb) {
+          var n = req.n;
+          if (n < 0) {
+            cb({error: 'complex'}); // Invalid error.
+          } else {
+            cb(null, Math.sqrt(n));
+          }
+        });
+        setupFn(ptcl, ptcl, function (ee) {
+          ptcl.emit('sqrt', {n: - 10}, ee, function (err) {
+            assert(/invalid \["string"\]/.test(err.string));
+            ptcl.emit('sqrt', {n: 100}, ee, function (err, res) {
+              // The server still doesn't die (we can make a new request).
+              assert(Math.abs(res - 10) < 1e-5);
+              done();
+            });
+          });
+        });
+      });
+
+      test('out of order', function (done) {
+        var ptcl = createProtocol({
+          protocol: 'Delay',
+          messages: {
+            wait: {
+              request: [
+                {name: 'ms', type: 'float'},
+                {name: 'id', type: 'string'}
+              ],
+              response: 'string'
+            }
+          }
+        }).on('wait', function (req, ee, cb) {
+          var delay = req.ms;
+          if (delay < 0) {
+            cb(new Error('delay must be non-negative'));
+            return;
+          }
+          setTimeout(function () { cb(null, req.id); }, delay);
+        });
+        var ids = [];
+        setupFn(ptcl, ptcl, function (ee) {
+          ee.on('eot', function (pending) {
+            assert.equal(pending, 0);
+            assert.deepEqual(ids, [null, 'b', 'a']);
+            done();
+          });
+          ptcl.emit('wait', {ms: 100, id: 'a'}, ee, function (err, res) {
+            assert.strictEqual(err, null);
+            ids.push(res);
+          });
+          ptcl.emit('wait', {ms: 10, id: 'b'}, ee, function (err, res) {
+            assert.strictEqual(err, null);
+            ids.push(res);
+            ee.destroy();
+          });
+          ptcl.emit('wait', {ms: -100, id: 'c'}, ee, function (err, res) {
+            assert(/non-negative/.test(err.string));
+            ids.push(res);
+          });
+        });
+      });
+
+      test('compatible protocols', function (done) {
+        var emitterPtcl = createProtocol({
+          protocol: 'emitterProtocol',
+          messages: {
+            age: {
+              request: [{name: 'name', type: 'string'}],
+              response: 'long'
+            }
+          }
+        });
+        var listenerPtcl = createProtocol({
+          protocol: 'serverProtocol',
+          messages: {
+            age: {
+              request: [
+                {name: 'name', type: 'string'},
+                {name: 'address', type: ['null', 'string'], 'default': null}
+              ],
+              response: 'int'
+            },
+            id: {
+              request: [{name: 'name', type: 'string'}],
+              response: 'long'
+            }
+          }
+        });
+        setupFn(
+          emitterPtcl,
+          listenerPtcl,
+          function (ee) {
+            listenerPtcl.on('age', function (req, ee, cb) {
+              assert.equal(req.name, 'Ann');
+              cb(null, 23);
+            });
+            emitterPtcl.emit('age', {name: 'Ann'}, ee, function (err, res) {
+              assert.strictEqual(err, null);
+              assert.equal(res, 23);
+              done();
+            });
+          }
+        );
+      });
+
+      test('cached compatible protocols', function (done) {
+        var ptcl1 = createProtocol({
+          protocol: 'emitterProtocol',
+          messages: {
+            age: {
+              request: [{name: 'name', type: 'string'}],
+              response: 'long'
+            }
+          }
+        });
+        var ptcl2 = createProtocol({
+          protocol: 'serverProtocol',
+          messages: {
+            age: {
+              request: [
+                {name: 'name', type: 'string'},
+                {name: 'address', type: ['null', 'string'], 'default': null}
+              ],
+              response: 'int'
+            },
+            id: {
+              request: [{name: 'name', type: 'string'}],
+              response: 'long'
+            }
+          }
+        }).on('age', function (req, ee, cb) { cb(null, 48); });
+        setupFn(
+          ptcl1,
+          ptcl2,
+          function (ee1) {
+            ptcl1.emit('age', {name: 'Ann'}, ee1, function (err, res) {
+              assert.equal(res, 48);
+              setupFn(
+                ptcl1,
+                ptcl2,
+                function (ee2) { // ee2 has the server's protocol.
+                  ptcl1.emit('age', {name: 'Bob'}, ee2, function (err, res) {
+                    assert.equal(res, 48);
+                    done();
+                  });
+                }
+              );
+            });
+          }
+        );
+      });
+
+      test('incompatible protocols', function (done) {
+        var emitterPtcl = createProtocol({
+          protocol: 'emitterProtocol',
+          messages: {
+            age: {request: [{name: 'name', type: 'string'}], response: 'long'}
+          }
+        });
+        var listenerPtcl = createProtocol({
+          protocol: 'serverProtocol',
+          messages: {
+            age: {request: [{name: 'name', type: 'int'}], response: 'long'}
+          }
+        }).on('age', function (req, ee, cb) { cb(null, 0); });
+        setupFn(
+          emitterPtcl,
+          listenerPtcl,
+          function (ee) {
+            ee.on('error', function () {}); // For stateful protocols.
+            emitterPtcl.emit('age', {name: 'Ann'}, ee, function (err) {
+              assert(err);
+              done();
+            });
+          }
+        );
+      });
+
+      test('unknown message', function (done) {
+        var ptcl = createProtocol({protocol: 'Empty'});
+        setupFn(ptcl, ptcl, function (ee) {
+          ptcl.emit('echo', {}, ee, function (err) {
+            assert(/unknown/.test(err.string));
+            done();
+          });
+        });
+      });
+
+      test('unsupported message', function (done) {
+        var ptcl = createProtocol({
+          protocol: 'Echo',
+          messages: {
+            echo: {
+              request: [{name: 'id', type: 'string'}],
+              response: 'string'
+            }
+          }
+        });
+        setupFn(ptcl, ptcl, function (ee) {
+          ptcl.emit('echo', {id: ''}, ee, function (err) {
+            assert(/unsupported/.test(err.string));
+            done();
+          });
+        });
+      });
+
+      test('destroy emitter noWait', function (done) {
+        var ptcl = createProtocol({
+          protocol: 'Delay',
+          messages: {
+            wait: {
+              request: [{name: 'ms', type: 'int'}],
+              response: 'string'
+            }
+          }
+        }).on('wait', function (req, ee, cb) {
+            setTimeout(function () { cb(null, 'ok'); }, req.ms);
+          });
+        var interrupted = 0;
+        var eoted = false;
+        setupFn(ptcl, ptcl, function (ee) {
+          ee.on('eot', function (pending) {
+            eoted = true;
+            assert.equal(interrupted, 2);
+            assert.equal(pending, 2);
+            done();
+          });
+          ptcl.emit('wait', {ms: 75}, ee, interruptedCb);
+          ptcl.emit('wait', {ms: 50}, ee, interruptedCb);
+          ptcl.emit('wait', {ms: 10}, ee, function (err, res) {
+            assert.equal(res, 'ok');
+            ee.destroy(true);
+          });
+
+          function interruptedCb(err) {
+            assert(/interrupted/.test(err.string));
+            interrupted++;
+          }
+        });
+      });
+
+      test('destroy emitter', function (done) {
+        var ptcl = createProtocol({
+          protocol: 'Math',
+          messages: {
+            negate: {
+              request: [{name: 'n', type: 'int'}],
+              response: 'int'
+            }
+          }
+        });
+        setupFn(ptcl, ptcl, function (ee) {
+          ptcl.on('negate', function (req, ee, cb) { cb(null, -req.n); });
+          ptcl.emit('negate', {n: 20}, ee, function (err, res) {
+            assert.strictEqual(err, null);
+            assert.equal(res, -20);
+            ee.destroy();
+            this.emit('negate', {n: 'hi'}, ee, function (err) {
+              assert(/destroyed/.test(err.string));
+              done();
+            });
+          });
+        });
+      });
+
+    }
+
+  });
+
+  test('throw error', function () {
+    assert(!tryCatch(null));
+    assert.equal(tryCatch(new Error('hi')), 'hi');
+    assert.equal(tryCatch('hi'), 'hi');
+    assert.equal(tryCatch({string: 'hi'}), 'hi');
+
+    function tryCatch(err) {
+      try {
+        protocols.throwError(err);
+      } catch (err_) {
+        return err_.message;
+      }
+    }
+  });
+
+});
+
+// Helpers.
+
+// Message framing.
+function frame(buf) {
+  var framed = new Buffer(buf.length + 4);
+  framed.writeInt32BE(buf.length);
+  buf.copy(framed, 4);
+  return framed;
+}
+
+function createReadableTransport(bufs, frameSize) {
+  return createReadableStream(bufs)
+    .pipe(new protocols.streams.MessageEncoder(frameSize || 64));
+}
+
+function createWritableTransport(bufs) {
+  var decoder = new protocols.streams.MessageDecoder();
+  decoder.pipe(createWritableStream(bufs));
+  return decoder;
+}
+
+function createTransport(readBufs, writeBufs) {
+  return toDuplex(
+    createReadableTransport(readBufs),
+    createWritableTransport(writeBufs)
+  );
+}
+
+function createPassthroughTransports() {
+  var pt1 = stream.PassThrough();
+  var pt2 = stream.PassThrough();
+  return [{readable: pt1, writable: pt2}, {readable: pt2, writable: pt1}];
+}
+
+// Simplified stream constructor API isn't available in earlier node versions.
+
+function createReadableStream(bufs) {
+  var n = 0;
+  function Stream() { stream.Readable.call(this); }
+  util.inherits(Stream, stream.Readable);
+  Stream.prototype._read = function () {
+    this.push(bufs[n++] || null);
+  };
+  var readable = new Stream();
+  return readable;
+}
+
+function createWritableStream(bufs) {
+  function Stream() { stream.Writable.call(this); }
+  util.inherits(Stream, stream.Writable);
+  Stream.prototype._write = function (buf, encoding, cb) {
+    bufs.push(buf);
+    cb();
+  };
+  return new Stream();
+}
+
+// Combine two (binary) streams into a single duplex one. This is very basic
+// and doesn't handle a lot of cases (e.g. where `_read` doesn't return
+// something).
+function toDuplex(readable, writable) {
+  function Stream() {
+    stream.Duplex.call(this);
+    this.on('finish', function () { writable.end(); });
+  }
+  util.inherits(Stream, stream.Duplex);
+  Stream.prototype._read = function () {
+    this.push(readable.read());
+  };
+  Stream.prototype._write = function (buf, encoding, cb) {
+    writable.write(buf);
+    cb();
+  };
+  return new Stream();
+}

http://git-wip-us.apache.org/repos/asf/avro/blob/133fafac/lang/js/test/test_schemas.js
----------------------------------------------------------------------
diff --git a/lang/js/test/test_schemas.js b/lang/js/test/test_schemas.js
index cd9a727..b6d197b 100644
--- a/lang/js/test/test_schemas.js
+++ b/lang/js/test/test_schemas.js
@@ -1926,6 +1926,17 @@ suite('types', function () {
       assert.deepEqual(person.age, 12);
       assert.deepEqual(person.time, date);
       assert.throws(function () { derived.toBuffer({age: -1, date: date}); });
+
+      var invalid = {age: -1, time: date};
+      assert.throws(function () { derived.toBuffer(invalid); });
+      var hasError = false;
+      derived.isValid(invalid, {errorHook: function (path, any, type) {
+        hasError = true;
+        assert.deepEqual(path, ['age']);
+        assert.equal(any, -1);
+        assert(type instanceof AgeType);
+      }});
+      assert(hasError);
     });
 
     test('recursive', function () {


Mime
View raw message