tinkerpop-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davebs...@apache.org
Subject [04/14] tinkerpop git commit: added code for new driver, updated driver tests
Date Tue, 28 Feb 2017 15:48:06 GMT
added code for new driver, updated driver tests


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/0e0d4666
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/0e0d4666
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/0e0d4666

Branch: refs/heads/master
Commit: 0e0d4666f7a7c3683fc10e032809ad5de7fb3727
Parents: 3b3a1da
Author: davebshow <davebshow@gmail.com>
Authored: Mon Jan 30 17:22:36 2017 -0500
Committer: davebshow <davebshow@gmail.com>
Committed: Tue Feb 28 10:30:48 2017 -0500

----------------------------------------------------------------------
 gremlin-python/pom.xml                          |   1 +
 .../python/TraversalSourceGenerator.groovy      |  11 +-
 .../main/jython/gremlin_python/driver/client.py | 114 ++++++++
 .../jython/gremlin_python/driver/connection.py  |  78 ++++++
 .../driver/driver_remote_connection.py          | 260 +++----------------
 .../jython/gremlin_python/driver/protocol.py    | 103 ++++++++
 .../gremlin_python/driver/remote_connection.py  |  98 ++++---
 .../jython/gremlin_python/driver/request.py     |  25 ++
 .../jython/gremlin_python/driver/resultset.py   |  91 +++++++
 .../jython/gremlin_python/driver/serializer.py  | 117 +++++++++
 .../gremlin_python/driver/tornado/__init__.py   |  18 ++
 .../gremlin_python/driver/tornado/transport.py  |  48 ++++
 .../jython/gremlin_python/driver/transport.py   |  46 ++++
 .../jython/gremlin_python/process/traversal.py  |  11 +-
 gremlin-python/src/main/jython/setup.py         |  18 +-
 .../src/main/jython/tests/conftest.py           |  72 +++++
 .../src/main/jython/tests/driver/test_client.py |  98 +++++++
 .../driver/test_driver_remote_connection.py     | 150 ++++-------
 .../test_driver_remote_connection_threaded.py   |  77 +++---
 .../jython/tests/structure/io/test_graphson.py  |  21 +-
 20 files changed, 1039 insertions(+), 418 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/0e0d4666/gremlin-python/pom.xml
----------------------------------------------------------------------
diff --git a/gremlin-python/pom.xml b/gremlin-python/pom.xml
index c9d6c65..2b4a80e 100644
--- a/gremlin-python/pom.xml
+++ b/gremlin-python/pom.xml
@@ -314,6 +314,7 @@
                                         <param>aenum==1.4.5</param>
                                         <param>tornado==4.4.1</param>
                                         <param>six==1.10.0</param>
+                                        <param>futures==3.0.5</param>
                                     </libraries>
                                 </configuration>
                             </execution>

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/0e0d4666/gremlin-python/src/main/groovy/org/apache/tinkerpop/gremlin/python/TraversalSourceGenerator.groovy
----------------------------------------------------------------------
diff --git a/gremlin-python/src/main/groovy/org/apache/tinkerpop/gremlin/python/TraversalSourceGenerator.groovy b/gremlin-python/src/main/groovy/org/apache/tinkerpop/gremlin/python/TraversalSourceGenerator.groovy
index fc76b71..995fe80 100644
--- a/gremlin-python/src/main/groovy/org/apache/tinkerpop/gremlin/python/TraversalSourceGenerator.groovy
+++ b/gremlin-python/src/main/groovy/org/apache/tinkerpop/gremlin/python/TraversalSourceGenerator.groovy
@@ -116,15 +116,16 @@ class Traversal(object):
             return tempList
     def promise(self, cb=None):
         self.traversal_strategies.apply_async_strategies(self)
-        future_traversers = self.traversers
-        future = type(future_traversers)()
+        future_traversal = self.remote_results
+        future = type(future_traversal)()
         def process(f):
             try:
-                traversers = f.result()
+                traversal = f.result()
             except Exception as e:
                 future.set_exception(e)
             else:
-                self.traversers = iter(traversers)
+                self.traversers = iter(traversal.traversers)
+                self.side_effects = traversal.side_effects
                 if cb:
                     try:
                         result = cb(self)
@@ -134,7 +135,7 @@ class Traversal(object):
                         future.set_result(result)
                 else:
                     future.set_result(self)
-        future_traversers.add_done_callback(process)
+        future_traversal.add_done_callback(process)
         return future
 
 

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/0e0d4666/gremlin-python/src/main/jython/gremlin_python/driver/client.py
----------------------------------------------------------------------
diff --git a/gremlin-python/src/main/jython/gremlin_python/driver/client.py b/gremlin-python/src/main/jython/gremlin_python/driver/client.py
new file mode 100644
index 0000000..dec39bf
--- /dev/null
+++ b/gremlin-python/src/main/jython/gremlin_python/driver/client.py
@@ -0,0 +1,114 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+"""
+import collections
+import functools
+
+from concurrent.futures import ThreadPoolExecutor
+
+from six.moves import queue
+
+from gremlin_python.driver import connection, protocol, request
+from gremlin_python.process import traversal
+
+# This is until concurrent.futures backport 3.1.0 release
+try:
+    from multiprocessing import cpu_count
+except ImportError:
+    # some platforms don't have multiprocessing
+    def cpu_count():
+        return None
+
+__author__ = 'David M. Brown (davebshow@gmail.com)'
+
+
+class Client:
+
+    def __init__(self, url, traversal_source, protocol_factory=None,
+                 transport_factory=None, pool_size=None, max_workers=None,
+                 message_serializer=None, username="", password=""):
+        self._url = url
+        self._traversal_source = traversal_source
+        self._message_serializer = message_serializer
+        self._username = username
+        self._password = password
+        if transport_factory is None:
+            try:
+                from gremlin_python.driver.tornado.transport import (
+                    TornadoTransport)
+            except ImportError:
+                raise Exception("Please install Tornado or pass"
+                                "custom transport factory")
+            else:
+                transport_factory = lambda: TornadoTransport()
+        self._transport_factory = transport_factory
+        if protocol_factory is None:
+            protocol_factory = lambda: protocol.GremlinServerWSProtocol(
+                message_serializer=self._message_serializer,
+                username=self._username,
+                password=self._password)
+        self._protocol_factory = protocol_factory
+        if pool_size is None:
+            pool_size = 4
+        self._pool_size = pool_size
+        # This is until concurrent.futures backport 3.1.0 release
+        if max_workers is None:
+            # Use this number because ThreadPoolExecutor is often
+            # used to overlap I/O instead of CPU work.
+            max_workers = (cpu_count() or 1) * 5
+        self._executor = ThreadPoolExecutor(max_workers=max_workers)
+        # Threadsafe queue
+        self._pool = queue.Queue()
+        self._fill_pool()
+
+    @property
+    def executor(self):
+        return self._executor
+
+    @property
+    def traversal_source(self):
+        return self._traversal_source
+
+    def _fill_pool(self):
+        for i in range(self._pool_size):
+            conn = self._get_connection()
+            self._pool.put_nowait(conn)
+
+    def close(self):
+        while not self._pool.empty():
+            conn = self._pool.get(True)
+            conn.close()
+        self._executor.shutdown()
+
+    def _get_connection(self):
+        protocol = self._protocol_factory()
+        return connection.Connection(
+            self._url, self._traversal_source, protocol,
+            self._transport_factory, self._executor, self._pool)
+
+    def submit(self, message):
+        return self.submitAsync(message).result()
+
+    def submitAsync(self, message):
+        if isinstance(message, traversal.Bytecode):
+            message = request.RequestMessage(
+                processor='traversal', op='bytecode',
+                args={'gremlin': message,
+                      'aliases': {'g': self._traversal_source}})
+        conn = self._pool.get(True)
+        return conn.write(message)

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/0e0d4666/gremlin-python/src/main/jython/gremlin_python/driver/connection.py
----------------------------------------------------------------------
diff --git a/gremlin-python/src/main/jython/gremlin_python/driver/connection.py b/gremlin-python/src/main/jython/gremlin_python/driver/connection.py
new file mode 100644
index 0000000..abc4545
--- /dev/null
+++ b/gremlin-python/src/main/jython/gremlin_python/driver/connection.py
@@ -0,0 +1,78 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+"""
+import uuid
+from concurrent.futures import Future
+from six.moves import queue
+
+from gremlin_python.driver import resultset
+
+__author__ = 'David M. Brown (davebshow@gmail.com)'
+
+
+class Connection:
+
+    def __init__(self, url, traversal_source, protocol, transport_factory,
+                 executor, pool):
+        self._url = url
+        self._traversal_source = traversal_source
+        self._protocol = protocol
+        self._transport_factory = transport_factory
+        self._executor = executor
+        self._transport = None
+        self._pool = pool
+        self._results = {}
+        self.connect()
+
+    def connect(self):
+        if self._transport:
+            self._transport.close()
+        self._transport = self._transport_factory()
+        self._transport.connect(self._url)
+        self._protocol.connection_made(self._transport)
+
+    def close(self):
+        self._transport.close()
+
+    def write(self, request_message):
+        request_id = str(uuid.uuid4())
+        result_set = resultset.ResultSet(queue.Queue(), request_id)
+        self._results[request_id] = result_set
+        # Create write task
+        future = Future()
+        future_write = self._executor.submit(
+            self._protocol.write, request_id, request_message)
+
+        def cb(f):
+            try:
+                f.result()
+            except Exception as e:
+                future.set_exception(e)
+            else:
+                # Start receive task
+                done = self._executor.submit(self._receive)
+                result_set.done = done
+                future.set_result(result_set)
+
+        future_write.add_done_callback(cb)
+        return future
+
+    def _receive(self):
+        data = self._transport.read()
+        self._protocol.data_received(data, self._results)
+        self._pool.put_nowait(self)

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/0e0d4666/gremlin-python/src/main/jython/gremlin_python/driver/driver_remote_connection.py
----------------------------------------------------------------------
diff --git a/gremlin-python/src/main/jython/gremlin_python/driver/driver_remote_connection.py b/gremlin-python/src/main/jython/gremlin_python/driver/driver_remote_connection.py
index 2cbe0e7..fb0e4ba 100644
--- a/gremlin-python/src/main/jython/gremlin_python/driver/driver_remote_connection.py
+++ b/gremlin-python/src/main/jython/gremlin_python/driver/driver_remote_connection.py
@@ -16,236 +16,50 @@ KIND, either express or implied.  See the License for the
 specific language governing permissions and limitations
 under the License.
 """
-import base64
-import functools
-import json
-import uuid
-from tornado import gen
-from tornado import concurrent
-from tornado import ioloop
-from tornado import websocket
+from concurrent.futures import Future
 
-from gremlin_python.structure.io.graphson import GraphSONReader, GraphSONWriter
-from .remote_connection import RemoteConnection
-from .remote_connection import RemoteTraversal
-from .remote_connection import RemoteTraversalSideEffects
+from gremlin_python.driver import client
+from gremlin_python.driver.remote_connection import (
+    RemoteConnection, RemoteTraversal, RemoteTraversalSideEffects)
 
-
-class GremlinServerError(Exception):
-    pass
+__author__ = 'David M. Brown (davebshow@gmail.com)'
 
 
 class DriverRemoteConnection(RemoteConnection):
-    def __init__(self, url, traversal_source, username="", password="", loop=None, graphson_reader=None, graphson_writer=None):
-        super(DriverRemoteConnection, self).__init__(url, traversal_source)
-        self._url = url
-        self._username = username
-        self._password = password
-        if loop is None:
-            loop = ioloop.IOLoop.current()
-        self._loop = loop
-        self._websocket = self._loop.run_sync(lambda: websocket.websocket_connect(self.url))
-        self._graphson_reader = graphson_reader or GraphSONReader()
-        self._graphson_writer = graphson_writer or GraphSONWriter()
-
-    def submit(self, bytecode):
-        '''
-        :param bytecode: the bytecode of a traversal to submit to the RemoteConnection
-        :return: a RemoteTraversal with RemoteTraversalSideEffects
-        '''
-        request_id = str(uuid.uuid4())
-        traversers = self._loop.run_sync(lambda: self.submit_traversal_bytecode(request_id, bytecode))
-        keys, value, close = self._get_side_effect_lambdas(request_id)
-        return RemoteTraversal(iter(traversers), RemoteTraversalSideEffects(keys, value, close, self._loop))
-
-    def submit_async(self, bytecode):
-        request_id = str(uuid.uuid4())
-        future_traversers = self.submit_traversal_bytecode(request_id, bytecode)
-        keys, value, close = self._get_side_effect_lambdas(request_id)
-        side_effects = RemoteTraversalSideEffects(keys, value, close, self._loop)
-        return RemoteTraversal(future_traversers, side_effects)
-
-    @gen.coroutine
-    def submit_traversal_bytecode(self, request_id, bytecode):
-        message = {
-            "requestId": {
-                "@type": "g:UUID",
-                "@value": request_id
-            },
-            "op": "bytecode",
-            "processor": "traversal",
-            "args": {
-                "gremlin": self._graphson_writer.toDict(bytecode),
-                "aliases": {"g": self.traversal_source}
-            }
-        }
-        traversers = yield self._execute_message(message)
-        raise gen.Return(traversers)
-
-    @gen.coroutine
-    def submit_sideEffect_keys(self, request_id):
-        message = {
-            "requestId": {
-                "@type": "g:UUID",
-                "@value": str(uuid.uuid4())
-            },
-            "op": "keys",
-            "processor": "traversal",
-            "args": {
-                "sideEffect": {
-                    "@type": "g:UUID",
-                    "@value": request_id
-                }
-            }
-        }
-        keys = yield self._execute_message(message)
-        raise gen.Return(set(keys))
-
-    @gen.coroutine
-    def submit_sideEffect_value(self, request_id, key):
-        message = {
-            "requestId": {
-                "@type": "g:UUID",
-                "@value": str(uuid.uuid4())
-            },
-            "op": "gather",
-            "processor": "traversal",
-            "args": {
-                "sideEffect": {
-                    "@type": "g:UUID",
-                    "@value": request_id
-                },
-                "sideEffectKey": key,
-                "aliases": {"g": self.traversal_source}
-            }
-        }
-        try:
-            value = yield self._execute_message(message)
-        except:
-            raise KeyError(key)
-        raise gen.Return(value)
-
-    @gen.coroutine
-    def submit_sideEffect_close(self, request_id):
-        message = {
-            "requestId": {
-                "@type": "g:UUID",
-                "@value": str(uuid.uuid4())
-            },
-            "op": "close",
-            "processor": "traversal",
-            "args": {
-                "sideEffect": {
-                    "@type": "g:UUID",
-                    "@value": request_id
-                }
-            }
-        }
-        result = yield self._execute_message(message)
-        raise gen.Return(result)
 
-    def _get_side_effect_lambdas(self, request_id):
-        side_effect_keys = lambda: self._loop.run_sync(lambda: self.submit_sideEffect_keys(request_id))
-        side_effect_value = lambda key: self._loop.run_sync(lambda: self.submit_sideEffect_value(request_id, key))
-        side_effect_close = lambda: self._loop.run_sync(lambda: self.submit_sideEffect_close(request_id))
-        return side_effect_keys, side_effect_value, side_effect_close
-
-    @gen.coroutine
-    def _execute_message(self, send_message):
-        send_message = b"".join([b"\x21",
-                                 b"application/vnd.gremlin-v2.0+json",
-                                 json.dumps(send_message, separators=(',', ':')).encode("utf-8")])
-        if self._websocket.protocol is None:
-            self._websocket = yield websocket.websocket_connect(self.url)
-        self._websocket.write_message(send_message, binary=True)
-        response = Response(self._websocket, self._username, self._password, self._graphson_reader)
-        results = None
-        while True:
-            recv_message = yield response.receive()
-            if recv_message is None:
-                break
-            aggregateTo = recv_message[0]
-            # on first message, get the right result data structure
-            if None == results:
-                if "list" == aggregateTo:
-                    results = []
-                elif "set" == aggregateTo:
-                    results = set()
-                elif aggregateTo in ["map", "bulkset"]:
-                    results = {}
-                elif "none" == aggregateTo:
-                    results = None
-                else:
-                    results = []
-
-            # if there is no update to a structure, then the item is the result
-            if results is None:
-                results = recv_message[1][0]
-            # updating a map is different than a list or a set
-            elif isinstance(results, dict):
-                if "map" == aggregateTo:
-                    for item in recv_message[1]:
-                        results.update(item)
-                else:
-                    for item in recv_message[1]:
-                        results[item.object] = item.bulk
-            # flat add list to result list
-            else:
-                results += recv_message[1]
-        raise gen.Return([] if None == results else results)
+    def __init__(self, url, traversal_source, protocol_factory=None,
+                 transport_factory=None, pool_size=None, max_workers=None,
+                 username="", password=""):
+        self._client = client.Client(url, traversal_source, protocol_factory,
+                                     transport_factory, pool_size, max_workers,
+                                     None, username, password)
+        self._url = self._client._url
+        self._traversal_source = self._client._traversal_source
 
     def close(self):
-        self._websocket.close()
-
+        self._client.close()
 
-class Response:
-    def __init__(self, websocket, username, password, graphson_reader):
-        self._websocket = websocket
-        self._username = username
-        self._password = password
-        self._closed = False
-        self._graphson_reader = graphson_reader
-
-    @gen.coroutine
-    def receive(self):
-        if self._closed:
-            return
-        recv_message = yield self._websocket.read_message()
-        recv_message = json.loads(recv_message.decode('utf-8'))
-        status_code = recv_message["status"]["code"]
-        aggregateTo = recv_message["result"]["meta"].get("aggregateTo", "list")
+    def submit(self, bytecode):
+        result_set = self._client.submit(bytecode)
+        results = result_set.all().result()
+        side_effects = RemoteTraversalSideEffects(result_set.request_id,
+                                                  self._client)
+        return RemoteTraversal(iter(results), side_effects)
+
+    def submitAsync(self, bytecode):
+        future = Future()
+        future_result_set = self._client.submitAsync(bytecode)
+
+        def cb(f):
+            try:
+                result_set = f.result()
+            except Exception as e:
+                future.set_exception(e)
+            else:
+                results = result_set.all().result()
+                side_effects = RemoteTraversalSideEffects(result_set.request_id,
+                                                          self._client)
+                future.set_result(RemoteTraversal(iter(results), side_effects))
 
-        # authentification required then
-        if status_code == 407:
-            self._websocket.write_message(
-                b"".join([b"\x21",
-                          b"application/vnd.gremlin-v2.0+json",
-                          json.dumps({
-                              "requestId": {
-                                  "@type": "g:UUID",
-                                  "@value": str(uuid.uuid4())
-                              },
-                              "op": "authentication",
-                              "processor": "traversal",
-                              "args": {
-                                  "sasl": base64.b64encode(
-                                      b"".join([b"\x00", self._username.encode("utf-8"),
-                                                b"\x00", self._password.encode("utf-8")])).decode()
-                              }
-                          }, separators=(',', ':')).encode("utf-8")]), binary=True)
-            results = yield self.receive()
-            raise gen.Return(results)
-        elif status_code == 204:
-            self._closed = True
-            return
-        elif status_code in [200, 206]:
-            results = []
-            for item in recv_message["result"]["data"]:
-                results.append(self._graphson_reader.toObject(item))
-            if status_code == 200:
-                self._closed = True
-            raise gen.Return((aggregateTo, results))
-        else:
-            self._closed = True
-            raise GremlinServerError(
-                "{0}: {1}".format(status_code, recv_message["status"]["message"]))
+        future_result_set.add_done_callback(cb)
+        return future

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/0e0d4666/gremlin-python/src/main/jython/gremlin_python/driver/protocol.py
----------------------------------------------------------------------
diff --git a/gremlin-python/src/main/jython/gremlin_python/driver/protocol.py b/gremlin-python/src/main/jython/gremlin_python/driver/protocol.py
new file mode 100644
index 0000000..2ace35e
--- /dev/null
+++ b/gremlin-python/src/main/jython/gremlin_python/driver/protocol.py
@@ -0,0 +1,103 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+"""
+import abc
+import base64
+import collections
+import json
+import uuid
+
+import six
+
+from gremlin_python.driver import serializer, request
+
+__author__ = 'David M. Brown (davebshow@gmail.com)'
+
+
+class GremlinServerError(Exception):
+    pass
+
+
+@six.add_metaclass(abc.ABCMeta)
+class AbstractBaseProtocol:
+
+    @abc.abstractmethod
+    def connection_made(self, transport):
+        self._transport = transport
+
+    @abc.abstractmethod
+    def data_received(self, message):
+        pass
+
+    @abc.abstractmethod
+    def write(self, request_id, request_message):
+        pass
+
+
+class GremlinServerWSProtocol(AbstractBaseProtocol):
+
+    def __init__(self, message_serializer=None, username='', password=''):
+        if message_serializer is None:
+            message_serializer = serializer.GraphSONMessageSerializer()
+        self._message_serializer = message_serializer
+        self._username = username
+        self._password = password
+
+    def connection_made(self, transport):
+        super(GremlinServerWSProtocol, self).connection_made(transport)
+
+    def write(self, request_id, request_message):
+        message = self._message_serializer.serialize_message(
+            request_id, request_message)
+        self._transport.write(message)
+
+    def data_received(self, data, results_dict):
+        data = json.loads(data.decode('utf-8'))
+        request_id = data['requestId']
+        result_set = results_dict[request_id]
+        status_code = data['status']['code']
+        aggregate_to = data['result']['meta'].get('aggregateTo', 'list')
+        result_set.aggregate_to = aggregate_to
+        if status_code == 407:
+            auth = b''.join([b'\x00', self._username.encode('utf-8'),
+                             b'\x00', self._password.encode('utf-8')])
+            request_message = request.RequestMessage(
+                'traversal', 'authentication',
+                {'sasl': base64.b64encode(auth).decode()})
+            self.write(request_id, request_message)
+            data = self._transport.read()
+            self.data_received(data, results_dict)
+        elif status_code == 204:
+            result_set.stream.put_nowait([])
+            del results_dict[request_id]
+        elif status_code in [200, 206]:
+            results = []
+            for msg in data["result"]["data"]:
+                results.append(
+                    self._message_serializer.deserialize_message(msg))
+            result_set.stream.put_nowait(results)
+            if status_code == 206:
+                data = self._transport.read()
+                self.data_received(data, results_dict)
+            else:
+                # result_set.done.set_result(None)
+                del results_dict[request_id]
+        else:
+            del results_dict[request_id]
+            raise GremlinServerError(
+                "{0}: {1}".format(status_code, data["status"]["message"]))

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/0e0d4666/gremlin-python/src/main/jython/gremlin_python/driver/remote_connection.py
----------------------------------------------------------------------
diff --git a/gremlin-python/src/main/jython/gremlin_python/driver/remote_connection.py b/gremlin-python/src/main/jython/gremlin_python/driver/remote_connection.py
index f7ed48e..c48cfe3 100644
--- a/gremlin-python/src/main/jython/gremlin_python/driver/remote_connection.py
+++ b/gremlin-python/src/main/jython/gremlin_python/driver/remote_connection.py
@@ -17,11 +17,11 @@ specific language governing permissions and limitations
 under the License.
 '''
 import abc
+import collections
 import six
 
-from ..process.traversal import Traversal
-from ..process.traversal import TraversalStrategy
-from ..process.traversal import TraversalSideEffects
+from gremlin_python.driver import request
+from gremlin_python.process import traversal
 
 __author__ = 'Marko A. Rodriguez (http://markorodriguez.com)'
 
@@ -42,45 +42,53 @@ class RemoteConnection(object):
 
     @abc.abstractmethod
     def submit(self, bytecode):
-        print("sending " + bytecode + " to GremlinServer...")
-        return RemoteTraversal(iter([]), TraversalSideEffects())
+        pass
 
     def __repr__(self):
         return "remoteconnection[" + self._url + "," + self._traversal_source + "]"
 
 
-class RemoteTraversal(Traversal):
+class RemoteTraversal(traversal.Traversal):
     def __init__(self, traversers, side_effects):
-        Traversal.__init__(self, None, None, None)
+        super(RemoteTraversal, self).__init__(None, None, None)
         self.traversers = traversers
-        self.side_effects = side_effects
+        self._side_effects = side_effects
+
+    @property
+    def side_effects(self):
+        return self._side_effects
+
+    @side_effects.setter
+    def side_effects(self, val):
+        self._side_effects = val
 
 
-class RemoteTraversalSideEffects(TraversalSideEffects):
-    def __init__(self, keys_lambda, value_lambda, close_lambda, loop):
-        self._keys_lambda = keys_lambda
-        self._value_lambda = value_lambda
-        self._close_lambda = close_lambda
-        self._loop = loop
+class RemoteTraversalSideEffects(traversal.TraversalSideEffects):
+    def __init__(self, side_effect, client):
+        self._side_effect = side_effect
+        self._client = client
         self._keys = set()
         self._side_effects = {}
         self._closed = False
 
     def keys(self):
-        if self._loop._running:
-            raise RuntimeError("Cannot call side effect methods"
-                               "while event loop is running")
         if not self._closed:
-            self._keys = self._keys_lambda()
+            message = request.RequestMessage(
+                'traversal', 'keys',
+                {'sideEffect': self._side_effect,
+                'aliases': {'g': self._client.traversal_source}})
+            self._keys = set(self._client.submit(message).all().result())
         return self._keys
 
     def get(self, key):
-        if self._loop._running:
-            raise RuntimeError("Cannot call side effect methods"
-                               "while event loop is running")
+
         if not self._side_effects.get(key):
             if not self._closed:
-                results = self._value_lambda(key)
+                message = request.RequestMessage(
+                    'traversal', 'gather',
+                    {'sideEffect': self._side_effect, 'sideEffectKey': key,
+                     'aliases': {'g': self._client.traversal_source}})
+                results = self._aggregate_results(self._client.submit(message))
                 self._side_effects[key] = results
                 self._keys.add(key)
             else:
@@ -88,27 +96,57 @@ class RemoteTraversalSideEffects(TraversalSideEffects):
         return self._side_effects[key]
 
     def close(self):
-        if self._loop._running:
-            raise RuntimeError("Cannot call side effect methods"
-                               "while event loop is running")
-        results = self._close_lambda()
+        if not self._closed:
+            message = request.RequestMessage(
+                'traversal', 'close',
+                {'sideEffect': self._side_effect,
+                 'aliases': {'g': self._client._traversal_source}})
+            results = self._client.submit(message).all().result()
         self._closed = True
         return results
 
+    def _aggregate_results(self, result_set):
+        aggregates = {'list': [], 'set': set(), 'map': {}, 'bulkset': {},
+                      'none': None}
+        results = None
+        for msg in result_set:
+            if results is None:
+                aggregate_to = result_set.aggregate_to
+                results = aggregates.get(aggregate_to, [])
+            # on first message, get the right result data structure
+            # if there is no update to a structure, then the item is the result
+            if results is None:
+                results = msg[0]
+            # updating a map is different than a list or a set
+            elif isinstance(results, dict):
+                if aggregate_to == "map":
+                    for item in msg:
+                        results.update(item)
+                else:
+                    for item in msg:
+                        results[item.object] = item.bulk
+            elif isinstance(results, set):
+                results.update(msg)
+            # flat add list to result list
+            else:
+                results += msg
+        if results is None:
+            results = []
+        return results
+
 
-class RemoteStrategy(TraversalStrategy):
+class RemoteStrategy(traversal.TraversalStrategy):
     def __init__(self, remote_connection):
         self.remote_connection = remote_connection
 
     def apply(self, traversal):
         if traversal.traversers is None:
             remote_traversal = self.remote_connection.submit(traversal.bytecode)
+            traversal.remote_results = remote_traversal
             traversal.side_effects = remote_traversal.side_effects
             traversal.traversers = remote_traversal.traversers
 
     def apply_async(self, traversal):
         if traversal.traversers is None:
-            remote_traversal = self.remote_connection.submit_async(
+            traversal.remote_results = self.remote_connection.submitAsync(
                 traversal.bytecode)
-            traversal.side_effects = remote_traversal.side_effects
-            traversal.traversers = remote_traversal.traversers

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/0e0d4666/gremlin-python/src/main/jython/gremlin_python/driver/request.py
----------------------------------------------------------------------
diff --git a/gremlin-python/src/main/jython/gremlin_python/driver/request.py b/gremlin-python/src/main/jython/gremlin_python/driver/request.py
new file mode 100644
index 0000000..ac7b845
--- /dev/null
+++ b/gremlin-python/src/main/jython/gremlin_python/driver/request.py
@@ -0,0 +1,25 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+"""
+import collections
+
+__author__ = 'David M. Brown (davebshow@gmail.com)'
+
+
+RequestMessage = collections.namedtuple(
+    'RequestMessage', ['processor', 'op', 'args'])

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/0e0d4666/gremlin-python/src/main/jython/gremlin_python/driver/resultset.py
----------------------------------------------------------------------
diff --git a/gremlin-python/src/main/jython/gremlin_python/driver/resultset.py b/gremlin-python/src/main/jython/gremlin_python/driver/resultset.py
new file mode 100644
index 0000000..cfdca5b
--- /dev/null
+++ b/gremlin-python/src/main/jython/gremlin_python/driver/resultset.py
@@ -0,0 +1,91 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+"""
+from concurrent.futures import Future
+
+__author__ = 'David M. Brown (davebshow@gmail.com)'
+
+
+class ResultSet:
+
+    def __init__(self, stream, request_id):
+        self._stream = stream
+        self._request_id = request_id
+        self._done = None
+        self._aggregate_to = None
+
+    @property
+    def aggregate_to(self):
+        return self._aggregate_to
+
+    @aggregate_to.setter
+    def aggregate_to(self, val):
+        self._aggregate_to = val
+
+    @property
+    def request_id(self):
+        return self._request_id
+
+    @property
+    def stream(self):
+        return self._stream
+
+    def __iter__(self):
+        return self
+
+    def __next__(self):
+        result = self.one()
+        if not result:
+            raise StopIteration
+        return result
+
+    def next(self):
+        return self.__next__()
+
+    @property
+    def done(self):
+        return self._done
+
+    @done.setter
+    def done(self, future):
+        self._done = future
+
+    def one(self):
+        while not self.done.done():
+            if not self.stream.empty():
+                return self.stream.get_nowait()
+        if not self.stream.empty():
+            return self.stream.get_nowait()
+        return self.done.result()
+
+    def all(self):
+        future = Future()
+
+        def cb(f):
+            try:
+                f.result()
+            except Exception as e:
+                future.set_exception(e)
+            else:
+                results = []
+                while not self.stream.empty():
+                    results += self.stream.get_nowait()
+                future.set_result(results)
+
+        self.done.add_done_callback(cb)
+        return future

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/0e0d4666/gremlin-python/src/main/jython/gremlin_python/driver/serializer.py
----------------------------------------------------------------------
diff --git a/gremlin-python/src/main/jython/gremlin_python/driver/serializer.py b/gremlin-python/src/main/jython/gremlin_python/driver/serializer.py
new file mode 100644
index 0000000..10dcfd3
--- /dev/null
+++ b/gremlin-python/src/main/jython/gremlin_python/driver/serializer.py
@@ -0,0 +1,117 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+"""
+try:
+    import ujson as json
+except ImportError:
+    import json
+
+from gremlin_python.structure.io import graphson
+
+__author__ = 'David M. Brown (davebshow@gmail.com)'
+
+
+class Processor:
+    """Base class for OpProcessor serialization system."""
+
+    _graphson_writer = graphson.GraphSONWriter()
+
+    def __init__(self, default_args):
+        self._default_args = default_args
+
+    def get_op_args(self, op, args):
+        op_method = getattr(self, op, None)
+        if not op_method:
+            raise Exception("Processor does not support op: {}".format(op))
+        args_ = self._default_args.get(op, dict()).copy()
+        args_.update(args)
+        return op_method(args_)
+
+
+class GraphSONMessageSerializer:
+    """Message serializer for GraphSON"""
+
+    _graphson_reader = graphson.GraphSONReader()
+
+    class traversal(Processor):
+
+        def authentication(self, args):
+            return args
+
+        def bytecode(self, args):
+            gremlin = args['gremlin']
+            args['gremlin'] = self._graphson_writer.toDict(gremlin)
+            aliases = args.get('aliases', '')
+            if not aliases:
+                aliases = {'g': 'g'}
+            args['aliases'] = aliases
+            return args
+
+        def close(self, args):
+            return self.keys(args)
+
+        def gather(self, args):
+            side_effect = args['sideEffect']
+            args['sideEffect'] = {'@type': 'g:UUID', '@value': side_effect}
+            aliases = args.get('aliases', '')
+            if not aliases:
+                aliases = {'g': 'g'}
+            args['aliases'] = aliases
+            return args
+
+        def keys(self, args):
+            side_effect = args['sideEffect']
+            args['sideEffect'] = {'@type': 'g:UUID', '@value': side_effect}
+            return args
+
+
+    def get_processor(self, processor):
+        processor = getattr(self, processor, None)
+        if not processor:
+            raise Exception("Unknown processor")
+        return processor({})
+
+    def serialize_message(self, request_id, request_message):
+        processor = request_message.processor
+        op = request_message.op
+        args = request_message.args
+        if not processor:
+            processor_obj = self.get_processor('standard')
+        else:
+            processor_obj = self.get_processor(processor)
+        args = processor_obj.get_op_args(op, args)
+        message = self.build_message(request_id, processor, op, args)
+        return message
+
+    def build_message(self, request_id, processor, op, args):
+        message = {
+            'requestId': {'@type': 'g:UUID', '@value': request_id},
+            'processor': processor,
+            'op': op,
+            'args': args
+        }
+        return self.finalize_message(message, b"\x21",
+                                     b"application/vnd.gremlin-v2.0+json")
+
+    def finalize_message(self, message, mime_len, mime_type):
+        message = json.dumps(message)
+        message = b''.join([mime_len, mime_type, message.encode('utf-8')])
+        return message
+
+    def deserialize_message(self, message):
+        return self._graphson_reader.toObject(message)

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/0e0d4666/gremlin-python/src/main/jython/gremlin_python/driver/tornado/__init__.py
----------------------------------------------------------------------
diff --git a/gremlin-python/src/main/jython/gremlin_python/driver/tornado/__init__.py b/gremlin-python/src/main/jython/gremlin_python/driver/tornado/__init__.py
new file mode 100644
index 0000000..17b49a5
--- /dev/null
+++ b/gremlin-python/src/main/jython/gremlin_python/driver/tornado/__init__.py
@@ -0,0 +1,18 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+"""

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/0e0d4666/gremlin-python/src/main/jython/gremlin_python/driver/tornado/transport.py
----------------------------------------------------------------------
diff --git a/gremlin-python/src/main/jython/gremlin_python/driver/tornado/transport.py b/gremlin-python/src/main/jython/gremlin_python/driver/tornado/transport.py
new file mode 100644
index 0000000..cc218e9
--- /dev/null
+++ b/gremlin-python/src/main/jython/gremlin_python/driver/tornado/transport.py
@@ -0,0 +1,48 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+"""
+from tornado import ioloop, gen
+from tornado import websocket
+
+from gremlin_python.driver.transport import AbstractBaseTransport
+
+__author__ = 'David M. Brown (davebshow@gmail.com)'
+
+
+class TornadoTransport(AbstractBaseTransport):
+
+    def __init__(self):
+        self._loop = ioloop.IOLoop(make_current=False)
+
+    def connect(self, url):
+        self._ws = self._loop.run_sync(
+            lambda: websocket.websocket_connect(url))
+
+    def write(self, message):
+        self._loop.run_sync(
+            lambda: self._ws.write_message(message, binary=True))
+
+    def read(self):
+        return self._loop.run_sync(lambda: self._ws.read_message())
+
+    def close(self):
+        self._ws.close()
+        self._loop.close()
+
+    def closed(self):
+        return not self._ws.protocol

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/0e0d4666/gremlin-python/src/main/jython/gremlin_python/driver/transport.py
----------------------------------------------------------------------
diff --git a/gremlin-python/src/main/jython/gremlin_python/driver/transport.py b/gremlin-python/src/main/jython/gremlin_python/driver/transport.py
new file mode 100644
index 0000000..9181956
--- /dev/null
+++ b/gremlin-python/src/main/jython/gremlin_python/driver/transport.py
@@ -0,0 +1,46 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+"""
+import abc
+import six
+
+__author__ = 'David M. Brown (davebshow@gmail.com)'
+
+
+@six.add_metaclass(abc.ABCMeta)
+class AbstractBaseTransport:
+
+    @abc.abstractmethod
+    def connect(self, url):
+        pass
+
+    @abc.abstractmethod
+    def write(self, message):
+        pass
+
+    @abc.abstractmethod
+    def read(self):
+        pass
+
+    @abc.abstractmethod
+    def close(self):
+        pass
+
+    @abc.abstractproperty
+    def closed(self):
+        pass

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/0e0d4666/gremlin-python/src/main/jython/gremlin_python/process/traversal.py
----------------------------------------------------------------------
diff --git a/gremlin-python/src/main/jython/gremlin_python/process/traversal.py b/gremlin-python/src/main/jython/gremlin_python/process/traversal.py
index bb227f3..6996b23 100644
--- a/gremlin-python/src/main/jython/gremlin_python/process/traversal.py
+++ b/gremlin-python/src/main/jython/gremlin_python/process/traversal.py
@@ -79,15 +79,16 @@ class Traversal(object):
             return tempList
     def promise(self, cb=None):
         self.traversal_strategies.apply_async_strategies(self)
-        future_traversers = self.traversers
-        future = type(future_traversers)()
+        future_traversal = self.remote_results
+        future = type(future_traversal)()
         def process(f):
             try:
-                traversers = f.result()
+                traversal = f.result()
             except Exception as e:
                 future.set_exception(e)
             else:
-                self.traversers = iter(traversers)
+                self.traversers = iter(traversal.traversers)
+                self.side_effects = traversal.side_effects
                 if cb:
                     try:
                         result = cb(self)
@@ -97,7 +98,7 @@ class Traversal(object):
                         future.set_result(result)
                 else:
                     future.set_result(self)
-        future_traversers.add_done_callback(process)
+        future_traversal.add_done_callback(process)
         return future
 
 

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/0e0d4666/gremlin-python/src/main/jython/setup.py
----------------------------------------------------------------------
diff --git a/gremlin-python/src/main/jython/setup.py b/gremlin-python/src/main/jython/setup.py
index 6878c34..8b30b9f 100644
--- a/gremlin-python/src/main/jython/setup.py
+++ b/gremlin-python/src/main/jython/setup.py
@@ -18,8 +18,9 @@ under the License.
 '''
 import codecs
 import os
+import sys
 import time
-from setuptools import setup, Command
+from setuptools import setup
 
 # Folder containing the setup.py
 root = os.path.dirname(os.path.abspath(__file__))
@@ -43,6 +44,15 @@ from gremlin_python import __version__
 
 version = __version__.version
 
+install_requires = [
+    'aenum==1.4.5',
+    'tornado==4.4.1',
+    'six==1.10.0'
+]
+
+if sys.version_info < (3,2):
+    install_requires += ['futures==3.0.5']
+
 setup(
     name='gremlinpython',
     version=version,
@@ -60,11 +70,7 @@ setup(
         'pytest',
         'mock'
     ],
-    install_requires=[
-        'aenum==1.4.5',
-        'tornado==4.4.1',
-        'six==1.10.0'
-    ],
+    install_requires=install_requires,
     classifiers=[
         "Intended Audience :: Developers",
         "License :: OSI Approved :: Apache Software License",

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/0e0d4666/gremlin-python/src/main/jython/tests/conftest.py
----------------------------------------------------------------------
diff --git a/gremlin-python/src/main/jython/tests/conftest.py b/gremlin-python/src/main/jython/tests/conftest.py
new file mode 100644
index 0000000..3ab64a8
--- /dev/null
+++ b/gremlin-python/src/main/jython/tests/conftest.py
@@ -0,0 +1,72 @@
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+'''
+import concurrent.futures
+import pytest
+
+from six.moves import queue
+
+from gremlin_python.driver.client import Client
+from gremlin_python.driver.connection import Connection
+from gremlin_python.driver.driver_remote_connection import (
+    DriverRemoteConnection)
+from gremlin_python.driver.protocol import GremlinServerWSProtocol
+from gremlin_python.driver.tornado.transport import TornadoTransport
+
+
+@pytest.fixture
+def connection(request):
+    try:
+        protocol = GremlinServerWSProtocol(
+            username='stephen', password='password')
+        executor = concurrent.futures.ThreadPoolExecutor(5)
+        pool = queue.Queue()
+        conn = Connection('ws://localhost:45940/gremlin', 'g', protocol,
+                          lambda: TornadoTransport(), executor, pool)
+    except:
+        pytest.skip('Gremlin Server is not running')
+    else:
+        def fin():
+            executor.shutdown()
+            conn.close()
+        request.addfinalizer(fin)
+        return conn
+
+@pytest.fixture
+def client(request):
+    try:
+        client = Client('ws://localhost:45940/gremlin', 'g')
+    except:
+        pytest.skip('Gremlin Server is not running')
+    else:
+        def fin():
+            client.close()
+        request.addfinalizer(fin)
+        return client
+
+@pytest.fixture
+def remote_connection(request):
+    try:
+        remote_conn = DriverRemoteConnection('ws://localhost:45940/gremlin', 'g')
+    except:
+        pytest.skip('Gremlin Server is not running')
+    else:
+        def fin():
+            remote_conn.close()
+        request.addfinalizer(fin)
+        return remote_conn

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/0e0d4666/gremlin-python/src/main/jython/tests/driver/test_client.py
----------------------------------------------------------------------
diff --git a/gremlin-python/src/main/jython/tests/driver/test_client.py b/gremlin-python/src/main/jython/tests/driver/test_client.py
new file mode 100644
index 0000000..6395d7b
--- /dev/null
+++ b/gremlin-python/src/main/jython/tests/driver/test_client.py
@@ -0,0 +1,98 @@
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+'''
+import pytest
+
+from gremlin_python.driver.client import Client
+from gremlin_python.driver.request import RequestMessage
+from gremlin_python.structure.graph import Graph
+
+__author__ = 'David M. Brown (davebshow@gmail.com)'
+
+
+def test_connection(connection):
+    g = Graph().traversal()
+    t = g.V()
+    message = RequestMessage('traversal', 'bytecode', {'gremlin': t.bytecode})
+    results_set = connection.write(message).result()
+    future = results_set.all()
+    results = future.result()
+    assert len(results) == 6
+    assert isinstance(results, list)
+
+def test_client(client):
+    g = Graph().traversal()
+    t = g.V()
+    message = RequestMessage('traversal', 'bytecode', {'gremlin': t.bytecode})
+    result_set = client.submit(message)
+    assert len(result_set.all().result()) == 6
+    client.close()
+
+def test_iterate_result_set(client):
+    g = Graph().traversal()
+    t = g.V()
+    message = RequestMessage('traversal', 'bytecode', {'gremlin': t.bytecode})
+    result_set = client.submit(message)
+    results = []
+    for result in result_set:
+        results += result
+    assert len(results) == 6
+    client.close()
+
+def test_client_async(client):
+    g = Graph().traversal()
+    t = g.V()
+    message = RequestMessage('traversal', 'bytecode', {'gremlin': t.bytecode})
+    future = client.submitAsync(message)
+    assert not future.done()
+    result_set = future.result()
+    assert len(result_set.all().result()) == 6
+    client.close()
+
+def test_connection_share(client):
+    # Overwrite fixture with pool_size=1 client
+    client = Client('ws://localhost:45940/gremlin', 'g', pool_size=1)
+    g = Graph().traversal()
+    t = g.V()
+    message = RequestMessage('traversal', 'bytecode', {'gremlin': t.bytecode})
+    future = client.submitAsync(message)
+    future2 = client.submitAsync(message)
+
+    result_set2 = future2.result()
+    assert len(result_set2.all().result()) == 6
+
+    # This future has to finish for the second to yield result - pool_size=1
+    assert future.done()
+    result_set = future.result()
+    assert len(result_set.all().result()) == 6
+    client.close()
+
+def test_multi_conn_pool(client):
+    g = Graph().traversal()
+    t = g.V()
+    message = RequestMessage('traversal', 'bytecode', {'gremlin': t.bytecode})
+    future = client.submitAsync(message)
+    future2 = client.submitAsync(message)
+
+    result_set2 = future2.result()
+    assert len(result_set2.all().result()) == 6
+
+    # with connection pool `future` may or may not be done here
+    result_set = future.result()
+    assert len(result_set.all().result()) == 6
+    client.close()

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/0e0d4666/gremlin-python/src/main/jython/tests/driver/test_driver_remote_connection.py
----------------------------------------------------------------------
diff --git a/gremlin-python/src/main/jython/tests/driver/test_driver_remote_connection.py b/gremlin-python/src/main/jython/tests/driver/test_driver_remote_connection.py
index 783cf7e..cd64e2b 100644
--- a/gremlin-python/src/main/jython/tests/driver/test_driver_remote_connection.py
+++ b/gremlin-python/src/main/jython/tests/driver/test_driver_remote_connection.py
@@ -16,19 +16,14 @@ KIND, either express or implied.  See the License for the
 specific language governing permissions and limitations
 under the License.
 '''
-
-__author__ = 'Marko A. Rodriguez (http://markorodriguez.com)'
-
-import unittest
-from unittest import TestCase
-
 import pytest
 
 from tornado import ioloop, gen
 
 from gremlin_python import statics
 from gremlin_python.statics import long
-from gremlin_python.driver.driver_remote_connection import DriverRemoteConnection
+from gremlin_python.driver.driver_remote_connection import (
+    DriverRemoteConnection)
 from gremlin_python.process.traversal import Traverser
 from gremlin_python.process.traversal import TraversalStrategy
 from gremlin_python.process.graph_traversal import __
@@ -36,16 +31,17 @@ from gremlin_python.structure.graph import Graph
 from gremlin_python.structure.graph import Vertex
 from gremlin_python.process.strategies import SubgraphStrategy
 
+__author__ = 'Marko A. Rodriguez (http://markorodriguez.com)'
+
 
-class TestDriverRemoteConnection(TestCase):
-    def test_traversals(self):
+class TestDriverRemoteConnection(object):
+    def test_traversals(self, remote_connection):
         statics.load_statics(globals())
-        connection = DriverRemoteConnection('ws://localhost:45940/gremlin', 'g')
-        assert "remoteconnection[ws://localhost:45940/gremlin,g]" == str(connection)
-        g = Graph().traversal().withRemote(connection)
+        assert "remoteconnection[ws://localhost:45940/gremlin,g]" == str(remote_connection)
+        g = Graph().traversal().withRemote(remote_connection)
 
         assert long(6) == g.V().count().toList()[0]
-        #
+        # #
         assert Vertex(1) == g.V(1).next()
         assert 1 == g.V(1).id().next()
         assert Traverser(Vertex(1)) == g.V(1).nextTraverser()
@@ -56,26 +52,24 @@ class TestDriverRemoteConnection(TestCase):
         assert 2 == len(results)
         assert "lop" in results
         assert "ripple" in results
-        #
+        # #
         assert 10 == g.V().repeat(both()).times(5)[0:10].count().next()
-        assert 1 == g.V().repeat(both()).times(5)[0].count().next()
+        assert 1 == g.V().repeat(both()).times(5)[0:1].count().next()
         assert 0 == g.V().repeat(both()).times(5)[0:0].count().next()
         assert 4 == g.V()[2:].count().next()
         assert 2 == g.V()[:2].count().next()
-        #
+        # #
         results = g.withSideEffect('a',['josh','peter']).V(1).out('created').in_('created').values('name').where(within('a')).toList()
         assert 2 == len(results)
         assert 'josh' in results
         assert 'peter' in results
-        # todo: need a traversal metrics deserializer
+        # # todo: need a traversal metrics deserializer
         g.V().out().profile().next()
-        connection.close()
 
-    def test_strategies(self):
+    def test_strategies(self, remote_connection):
         statics.load_statics(globals())
-        connection = DriverRemoteConnection('ws://localhost:45940/gremlin', 'g')
         #
-        g = Graph().traversal().withRemote(connection). \
+        g = Graph().traversal().withRemote(remote_connection). \
             withStrategies(TraversalStrategy("SubgraphStrategy",
                                              {"vertices": __.hasLabel("person"),
                                               "edges": __.hasLabel("created")}))
@@ -84,7 +78,7 @@ class TestDriverRemoteConnection(TestCase):
         assert 1 == g.V().label().dedup().count().next()
         assert "person" == g.V().label().dedup().next()
         #
-        g = Graph().traversal().withRemote(connection). \
+        g = Graph().traversal().withRemote(remote_connection). \
             withStrategies(SubgraphStrategy(vertices=__.hasLabel("person"), edges=__.hasLabel("created")))
         assert 4 == g.V().count().next()
         assert 0 == g.E().count().next()
@@ -98,24 +92,19 @@ class TestDriverRemoteConnection(TestCase):
         assert "person" == g.V().label().next()
         assert "marko" == g.V().name.next()
         #
-        g = Graph().traversal().withRemote(connection).withComputer()
+        g = Graph().traversal().withRemote(remote_connection).withComputer()
         assert 6 == g.V().count().next()
         assert 6 == g.E().count().next()
-        connection.close()
 
-    def test_side_effects(self):
+    def test_side_effects(self, remote_connection):
         statics.load_statics(globals())
-        connection = DriverRemoteConnection('ws://localhost:45940/gremlin', 'g')
         #
-        g = Graph().traversal().withRemote(connection)
+        g = Graph().traversal().withRemote(remote_connection)
         ###
         t = g.V().hasLabel("project").name.iterate()
         assert 0 == len(t.side_effects.keys())
-        try:
+        with pytest.raises(Exception):
             m = t.side_effects["m"]
-            raise Exception("Accessing a non-existent key should throw an error")
-        except KeyError:
-            pass
         ###
         t = g.V().out("created").groupCount("m").by("name")
         results = t.toSet()
@@ -131,7 +120,7 @@ class TestDriverRemoteConnection(TestCase):
         assert 1 == m["ripple"]
         assert isinstance(m["lop"], long)
         assert isinstance(m["ripple"], long)
-        ###
+        ##
         t = g.V().out("created").groupCount("m").by("name").name.aggregate("n")
         results = t.toSet()
         assert 2 == len(results)
@@ -154,16 +143,11 @@ class TestDriverRemoteConnection(TestCase):
         assert 32 == list(results)[0]
         assert 32 == t.side_effects['m']
         assert 1 == len(t.side_effects.keys())
-        try:
+        with pytest.raises(Exception):
             x = t.side_effects["x"]
-            raise Exception("Accessing a non-existent key should throw an error")
-        except KeyError:
-            pass
-        connection.close()
 
-    def test_side_effect_close(self):
-        connection = DriverRemoteConnection('ws://localhost:45940/gremlin', 'g')
-        g = Graph().traversal().withRemote(connection)
+    def test_side_effect_close(self, remote_connection):
+        g = Graph().traversal().withRemote(remote_connection)
         t = g.V().aggregate('a').aggregate('b')
         t.toList()
 
@@ -193,76 +177,30 @@ class TestDriverRemoteConnection(TestCase):
         # Try to get 'b' directly from server, should throw error
         with pytest.raises(Exception):
             t.side_effects.value_lambda('b')
-        connection.close()
 
-    def test_promise(self):
-        loop = ioloop.IOLoop.current()
-        connection = DriverRemoteConnection('ws://localhost:45940/gremlin', 'g')
-        g = Graph().traversal().withRemote(connection)
-
-        @gen.coroutine
-        def go():
-            future_traversal = g.V().promise(lambda x: x.toList())
-            assert not future_traversal.done()
-            resp = yield future_traversal
-            assert future_traversal.done()
-            assert len(resp) == 6
-            count = yield g.V().count().promise(lambda x: x.next())
-            assert count == 6
-
-        loop.run_sync(go)
-        connection.close()
-
-    def test_promise_side_effects(self):
-        loop = ioloop.IOLoop.current()
-        connection = DriverRemoteConnection('ws://localhost:45940/gremlin', 'g')
-        g = Graph().traversal().withRemote(connection)
-
-        # Side effects are problematic in coroutines.
-        # Because they are designed to be synchronous (calling `run_sync`)
-        # they result in an error if called from a coroutine because
-        # the event loop is already running
-        @gen.coroutine
-        def go():
-            traversal = yield g.V().aggregate('a').promise()
-            # Calling synchronous side effect methods from coroutine raises.
-            with pytest.raises(RuntimeError):
-                keys = traversal.side_effects.keys()
-
-            with pytest.raises(RuntimeError):
-                keys = traversal.side_effects.get('a')
-
-            with pytest.raises(RuntimeError):
-                keys = traversal.side_effects.close()
-
-        loop.run_sync(go)
-
-        # If we return the traversal though, we can use side effects per usual.
-        @gen.coroutine
-        def go():
-            traversal = yield g.V().aggregate('a').promise()
-            raise gen.Return(traversal)  # Weird legacy Python compatible idiom
-
-        # See, normal side effects.
-        traversal = loop.run_sync(go)
-        a, = traversal.side_effects.keys()
+    def test_promise(self, remote_connection):
+        g = Graph().traversal().withRemote(remote_connection)
+        future = g.V().aggregate('a').promise()
+        t = future.result()
+        assert len(t.toList()) == 6
+        a, = t.side_effects.keys()
         assert  a == 'a'
-        results = traversal.side_effects.get('a')
+        results = t.side_effects.get('a')
         assert results
-        results = traversal.side_effects.close()
+        results = t.side_effects.close()
         assert not results
 
-        connection.close()
-
 
-if __name__ == '__main__':
-    test = False
-    try:
-        connection = DriverRemoteConnection('ws://localhost:45940/gremlin', 'g')
-        test = True
-        connection.close()
-    except:
-        print("GremlinServer is not running and this test case will not execute: " + __file__)
+def test_in_tornado_app(remote_connection):
+    # Make sure nothing weird with loops
+    @gen.coroutine
+    def go():
+        conn = DriverRemoteConnection(
+            'ws://localhost:45940/gremlin', 'g', pool_size=4)
+        g = Graph().traversal().withRemote(conn)
+        yield gen.sleep(0)
+        assert len(g.V().toList()) == 6
+        conn.close()
 
-    if test:
-        unittest.main()
+    io_loop = ioloop.IOLoop.current()
+    io_loop.run_sync(go)

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/0e0d4666/gremlin-python/src/main/jython/tests/driver/test_driver_remote_connection_threaded.py
----------------------------------------------------------------------
diff --git a/gremlin-python/src/main/jython/tests/driver/test_driver_remote_connection_threaded.py b/gremlin-python/src/main/jython/tests/driver/test_driver_remote_connection_threaded.py
index 0c18651..d5efc0d 100644
--- a/gremlin-python/src/main/jython/tests/driver/test_driver_remote_connection_threaded.py
+++ b/gremlin-python/src/main/jython/tests/driver/test_driver_remote_connection_threaded.py
@@ -16,58 +16,53 @@ KIND, either express or implied.  See the License for the
 specific language governing permissions and limitations
 under the License.
 '''
-
-
-__author__ = 'David M. Brown (davebshow@gmail.com)'
-
-
 import sys
 from threading import Thread
 
 import pytest
-
 from six.moves import queue
-from tornado import ioloop
 
 from gremlin_python.driver.driver_remote_connection import (
     DriverRemoteConnection)
 from gremlin_python.structure.graph import Graph
 
-
-skip = False
-try:
-    connection = DriverRemoteConnection('ws://localhost:45940/gremlin', 'g')
-    connection.close()
-except:
-    skip = True
+__author__ = 'David M. Brown (davebshow@gmail.com)'
 
 
-@pytest.mark.skipif(skip, reason='Gremlin Server is not running')
-class TestDriverRemoteConnectionThreaded:
+def test_conns_in_threads(remote_connection):
+    q = queue.Queue()
+    child = Thread(target=_executor, args=(q, None))
+    child2 = Thread(target=_executor, args=(q, None))
+    child.start()
+    child2.start()
+    for x in range(2):
+        success = q.get()
+        assert success == 'success!'
+    child.join()
+    child2.join()
 
-    def test_threaded_client(self):
-        q = queue.Queue()
-        # Here if we give each thread its own loop there is no problem.
-        loop1 = ioloop.IOLoop()
-        loop2 = ioloop.IOLoop()
-        child = Thread(target=self._executor, args=(q, loop1))
-        child2 = Thread(target=self._executor, args=(q, loop2))
-        child.start()
-        child2.start()
-        for x in range(2):
-            success = q.get()
-            assert success == 'success!'
-        child.join()
-        child2.join()
+def test_conn_in_threads(remote_connection):
+    q = queue.Queue()
+    child = Thread(target=_executor, args=(q, remote_connection))
+    child2 = Thread(target=_executor, args=(q, remote_connection))
+    child.start()
+    child2.start()
+    for x in range(2):
+        success = q.get()
+        assert success == 'success!'
+    child.join()
+    child2.join()
 
-    def _executor(self, q, loop):
-        try:
-            connection = DriverRemoteConnection(
-                'ws://localhost:45940/gremlin', 'g', loop=loop)
-            g = Graph().traversal().withRemote(connection)
-            assert len(g.V().toList()) == 6
-        except:
-            q.put(sys.exc_info()[0])
-        else:
-            q.put('success!')
-            connection.close()
+def _executor(q, conn):
+    if not conn:
+        conn = DriverRemoteConnection(
+            'ws://localhost:45940/gremlin', 'g', pool_size=4)
+    try:
+        g = Graph().traversal().withRemote(conn)
+        future = g.V().promise()
+        t = future.result()
+        assert len(t.toList()) == 6
+    except:
+        q.put(sys.exc_info()[0])
+    else:
+        q.put('success!')

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/0e0d4666/gremlin-python/src/main/jython/tests/structure/io/test_graphson.py
----------------------------------------------------------------------
diff --git a/gremlin-python/src/main/jython/tests/structure/io/test_graphson.py b/gremlin-python/src/main/jython/tests/structure/io/test_graphson.py
index 2437cfd..7bca769 100644
--- a/gremlin-python/src/main/jython/tests/structure/io/test_graphson.py
+++ b/gremlin-python/src/main/jython/tests/structure/io/test_graphson.py
@@ -141,8 +141,25 @@ class TestGraphSONWriter(TestCase):
         assert """true""" == self.graphson_writer.writeObject(True)
 
     def test_P(self):
-        assert """{"@type":"g:P","@value":{"predicate":"and","value":[{"@type":"g:P","@value":{"predicate":"or","value":[{"@type":"g:P","@value":{"predicate":"lt","value":"b"}},{"@type":"g:P","@value":{"predicate":"gt","value":"c"}}]}},{"@type":"g:P","@value":{"predicate":"neq","value":"d"}}]}}""" == self.graphson_writer.writeObject(
-            P.lt("b").or_(P.gt("c")).and_(P.neq("d")))
+        result = {'@type': 'g:P',
+                  '@value': {
+                     'predicate': 'and',
+                     'value': [{
+                        '@type': 'g:P',
+                        '@value': {
+                            'predicate': 'or',
+                            'value': [{
+                                '@type': 'g:P',
+                                '@value': {'predicate': 'lt', 'value': 'b'}
+                            },
+                            {'@type': 'g:P', '@value': {'predicate': 'gt', 'value': 'c'}}
+                            ]
+                        }
+                    },
+                    {'@type': 'g:P', '@value': {'predicate': 'neq', 'value': 'd'}}]}}
+
+        assert  result == json.loads(
+            self.graphson_writer.writeObject(P.lt("b").or_(P.gt("c")).and_(P.neq("d"))))
 
     def test_strategies(self):
         # we have a proxy model for now given that we don't want to have to have g:XXX all registered on the Gremlin traversal machine (yet)


Mime
View raw message