Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 4BA2E200B6B for ; Thu, 25 Aug 2016 16:47:51 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 4A1AC160AA5; Thu, 25 Aug 2016 14:47:51 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 4041E160AA4 for ; Thu, 25 Aug 2016 16:47:50 +0200 (CEST) Received: (qmail 96823 invoked by uid 500); 25 Aug 2016 14:47:49 -0000 Mailing-List: contact commits-help@tinkerpop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tinkerpop.apache.org Delivered-To: mailing list commits@tinkerpop.apache.org Received: (qmail 96813 invoked by uid 99); 25 Aug 2016 14:47:49 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 25 Aug 2016 14:47:49 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 3FEA0DFE2C; Thu, 25 Aug 2016 14:47:49 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: okram@apache.org To: commits@tinkerpop.apache.org Message-Id: <2372a755a63b4a8d8ac3635f25d86704@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: tinkerpop git commit: driver_remote_connection has been simplified again and authentification fixed and working. ta-to-the-da. Date: Thu, 25 Aug 2016 14:47:49 +0000 (UTC) archived-at: Thu, 25 Aug 2016 14:47:51 -0000 Repository: tinkerpop Updated Branches: refs/heads/TINKERPOP-1278 ba8bb6097 -> d299cbcd7 driver_remote_connection has been simplified again and authentification fixed and working. ta-to-the-da. Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/d299cbcd Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/d299cbcd Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/d299cbcd Branch: refs/heads/TINKERPOP-1278 Commit: d299cbcd73db12fae9bafaf9714c47fff51876b2 Parents: ba8bb60 Author: Marko A. Rodriguez Authored: Thu Aug 25 08:47:27 2016 -0600 Committer: Marko A. Rodriguez Committed: Thu Aug 25 08:47:44 2016 -0600 ---------------------------------------------------------------------- .../driver/driver_remote_connection.py | 199 +++++++++---------- 1 file changed, 95 insertions(+), 104 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/d299cbcd/gremlin-python/src/main/jython/gremlin_python/driver/driver_remote_connection.py ---------------------------------------------------------------------- diff --git a/gremlin-python/src/main/jython/gremlin_python/driver/driver_remote_connection.py b/gremlin-python/src/main/jython/gremlin_python/driver/driver_remote_connection.py index 50a9944..165eed6 100644 --- a/gremlin-python/src/main/jython/gremlin_python/driver/driver_remote_connection.py +++ b/gremlin-python/src/main/jython/gremlin_python/driver/driver_remote_connection.py @@ -16,6 +16,7 @@ KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. """ +import base64 import json import uuid from tornado import gen @@ -34,75 +35,26 @@ class GremlinServerError(Exception): class DriverRemoteConnection(RemoteConnection): - def __init__(self, url, traversal_source, loop=None, username="", password=""): + def __init__(self, url, traversal_source, username="", password="", loop=None): super(DriverRemoteConnection, self).__init__(url, traversal_source) - if loop is None: - self._loop = ioloop.IOLoop.current() - self._ws = self._loop.run_sync(lambda: websocket.websocket_connect(self.url)) self._username = username self._password = password - - def submit(self, - bytecode, - op="bytecode", - processor="traversal"): + if loop is None: self._loop = ioloop.IOLoop.current() + self._websocket = self._loop.run_sync(lambda: websocket.websocket_connect(self.url)) + + def submit(self, bytecode): + ''' + :param bytecode: the bytecode of a traversal to submit to the RemoteConnection + :return: a RemoteTraversal with RemoteTraversalSideEffects + ''' request_id = str(uuid.uuid4()) - traversers = self._loop.run_sync(lambda: self.submit_traversal_bytecode(bytecode, request_id)) - keys_lambda = lambda: self._loop.run_sync(lambda: self.submit_sideEffect_keys(request_id)) - value_lambda = lambda key: self._loop.run_sync(lambda: self.submit_sideEffect_value(request_id, key)) - return RemoteTraversal(iter(traversers), RemoteTraversalSideEffects(keys_lambda, value_lambda)) - - @gen.coroutine - def submit_traversal_bytecode(self, bytecode, request_id): - message = self._get_traversal_bytecode_message(bytecode, request_id) - traversers = yield self._execute_message(message) - raise gen.Return(traversers) - - @gen.coroutine - def submit_sideEffect_keys(self, request_id): - message = self._get_sideEffect_keys_message(request_id) - keys = yield self._execute_message(message) - raise gen.Return(set(keys)) + traversers = self._loop.run_sync(lambda: self.submit_traversal_bytecode(request_id, bytecode)) + side_effect_keys = lambda: self._loop.run_sync(lambda: self.submit_sideEffect_keys(request_id)) + side_effect_value = lambda key: self._loop.run_sync(lambda: self.submit_sideEffect_value(request_id, key)) + return RemoteTraversal(iter(traversers), RemoteTraversalSideEffects(side_effect_keys, side_effect_value)) @gen.coroutine - def submit_sideEffect_value(self, request_id, key): - message = self._get_sideEffect_value_message(request_id, key) - side_effects = yield self._execute_message(message) - raise gen.Return(side_effects) - - @gen.coroutine - def _execute_message(self, message): - if self._ws.protocol is None: - self._ws = yield websocket.websocket_connect(self.url) - self._ws.write_message(message, binary=True) - resp = Response(self._ws, self._username, self._password) - results = None - while True: - msg = yield resp.receive() - if msg is None: - break - # on first message, get the right result data structure - if None == results: - if "list" == msg[0]: - results = [] - elif "set" == msg[0]: - results = set() - elif "map" == msg[0]: - results = {} - else: - results = [] - # updating a map is different than a list or a set - if isinstance(results, dict): - for item in msg[1]: - results.update(item) - else: - results += msg[1] - raise gen.Return([] if None == results else results) - - def close(self): - self._ws.close() - - def _get_traversal_bytecode_message(self, bytecode, request_id): + def submit_traversal_bytecode(self, request_id, bytecode): message = { "requestId": { "@type": "g:UUID", @@ -115,9 +67,11 @@ class DriverRemoteConnection(RemoteConnection): "aliases": {"g": self.traversal_source} } } - return DriverRemoteConnection._finalize_message(message) + traversers = yield self._execute_message(message) + raise gen.Return(traversers) - def _get_sideEffect_keys_message(self, request_id): + @gen.coroutine + def submit_sideEffect_keys(self, request_id): message = { "requestId": { "@type": "g:UUID", @@ -132,9 +86,11 @@ class DriverRemoteConnection(RemoteConnection): } } } - return DriverRemoteConnection._finalize_message(message) + keys = yield self._execute_message(message) + raise gen.Return(set(keys)) - def _get_sideEffect_value_message(self, request_id, key): + @gen.coroutine + def submit_sideEffect_value(self, request_id, key): message = { "requestId": { "@type": "g:UUID", @@ -151,60 +107,95 @@ class DriverRemoteConnection(RemoteConnection): "aliases": {"g": self.traversal_source} } } - return DriverRemoteConnection._finalize_message(message) + value = yield self._execute_message(message) + raise gen.Return(value) + + @gen.coroutine + def _execute_message(self, send_message): + send_message = b"".join([b"\x21", + b"application/vnd.gremlin-v2.0+json", + json.dumps(send_message).encode("utf-8")]) + if self._websocket.protocol is None: + self._websocket = yield websocket.websocket_connect(self.url) + self._websocket.write_message(send_message, binary=True) + response = Response(self._websocket, self._username, self._password) + results = None + while True: + recv_message = yield response.receive() + if recv_message is None: + break + + # on first message, get the right result data structure + if None == results: + if "list" == recv_message[0]: + results = [] + elif "set" == recv_message[0]: + results = set() + elif "map" == recv_message[0]: + results = {} + elif "none" == recv_message[0]: + results = None + else: + results = [] + + # if there is no update to a structure, then the item is the result + if results is None: + results = recv_message[1][0] + # updating a map is different than a list or a set + elif isinstance(results, dict): + for item in recv_message[1]: + results.update(item) + # flat add list to result list + else: + results += recv_message[1] + raise gen.Return([] if None == results else results) - @staticmethod - def _finalize_message(message): - message = json.dumps(message) - mime_type = b"application/vnd.gremlin-v2.0+json" - mime_len = b"\x21" - return b"".join([mime_len, mime_type, message.encode("utf-8")]) + def close(self): + self._websocket.close() class Response: - def __init__(self, ws, username, password): - self._ws = ws - self._closed = False + def __init__(self, websocket, username, password): + self._websocket = websocket self._username = username self._password = password - - def _authenticate(self, username, password): - auth = b"".join([b"\x00", username.encode("utf-8"), - b"\x00", password.encode("utf-8")]) - message = { - "requestId": { - "@type": "g:UUID", - "@value": str(uuid.uuid4()) - }, - "op": "authentication", - "processor": "traversal", - "args": { - "sasl": base64.b64encode(auth).decode() - } - } - self._ws.send_message(DriverRemoteConnection._finalize_message(message), binary=True) + self._closed = False @gen.coroutine def receive(self): if self._closed: return - data = yield self._ws.read_message() - message = json.loads(data) - status_code = message["status"]["code"] - data = message["result"]["data"] - msg = message["status"]["message"] - meta = message["result"]["meta"] - aggregateTo = "list" if "aggregateTo" not in meta else meta["aggregateTo"] + recv_message = yield self._websocket.read_message() + recv_message = json.loads(recv_message) + status_code = recv_message["status"]["code"] + aggregateTo = recv_message["result"]["meta"].get("aggregateTo", "list") + # authentification required then if status_code == 407: - self._authenticate(self._username, self._password) - yield self.receive() + self._websocket.write_message( + b"".join([b"\x21", + b"application/vnd.gremlin-v2.0+json", + json.dumps({ + "requestId": { + "@type": "g:UUID", + "@value": str(uuid.uuid4()) + }, + "op": "authentication", + "processor": "traversal", + "args": { + "sasl": base64.b64encode( + b"".join([b"\x00", self._username.encode("utf-8"), + b"\x00", self._password.encode("utf-8")])).decode() + } + }).encode("utf-8")]), binary=True) + results = yield self.receive() + raise gen.Return(results) elif status_code == 204: self._closed = True return elif status_code in [200, 206]: results = [] - for item in data: + for item in recv_message["result"]["data"]: results.append(GraphSONReader._objectify(item)) if status_code == 200: self._closed = True @@ -212,4 +203,4 @@ class Response: else: self._closed = True raise GremlinServerError( - "{0}: {1}".format(status_code, msg)) + "{0}: {1}".format(status_code, recv_message["status"]["message"]))