aurora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wick...@apache.org
Subject git commit: Implement a TRequestsClient as a prelude to kerberization.
Date Mon, 09 Jun 2014 23:54:14 GMT
Repository: incubator-aurora
Updated Branches:
  refs/heads/master 0cdeba5f2 -> 5665d4ce8


Implement a TRequestsClient as a prelude to kerberization.

This is the first bit of AURORA-515.  To add kerberos support, we just need
to add a dependency on requests_kerberos and inject KerberosAuth() as the
'auth=' parameter to TRequestsClient with the proper service principal
specified.

Testing Done:
Updated client/api tests for the new transport.

Reviewed at https://reviews.apache.org/r/22280/


Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/5665d4ce
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/5665d4ce
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/5665d4ce

Branch: refs/heads/master
Commit: 5665d4ce80a42ac9a971a6f641de5109acabc967
Parents: 0cdeba5
Author: Brian Wickman <wickman@apache.org>
Authored: Mon Jun 9 16:54:06 2014 -0700
Committer: Brian Wickman <wickman@apache.org>
Committed: Mon Jun 9 16:54:06 2014 -0700

----------------------------------------------------------------------
 3rdparty/python/BUILD                           |   2 +-
 src/main/python/apache/aurora/client/api/BUILD  |   1 +
 .../aurora/client/api/scheduler_client.py       |  57 +++---
 src/main/python/apache/aurora/common/BUILD      |   9 +
 .../python/apache/aurora/common/transport.py    | 110 ++++++++++++
 .../aurora/client/api/test_scheduler_client.py  | 177 +++++++------------
 src/test/python/apache/aurora/common/BUILD      |  11 ++
 .../apache/aurora/common/test_transport.py      |  84 +++++++++
 8 files changed, 310 insertions(+), 141 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5665d4ce/3rdparty/python/BUILD
----------------------------------------------------------------------
diff --git a/3rdparty/python/BUILD b/3rdparty/python/BUILD
index f2e94d2..c9f63cf 100644
--- a/3rdparty/python/BUILD
+++ b/3rdparty/python/BUILD
@@ -31,7 +31,7 @@ python_requirement('mox==0.5.3')
 python_requirement('psutil==1.1.2')
 python_requirement('pystachio==0.7.2')
 python_requirement('pyyaml==3.10')
-python_requirement('requests==2.0.0')
+python_requirement('requests==2.3.0')
 python_requirement('thrift==0.9.1')
 python_requirement('twitter.common.app==%s' % COMMONS_VERSION)
 python_requirement('twitter.common.collections==%s' % COMMONS_VERSION)

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5665d4ce/src/main/python/apache/aurora/client/api/BUILD
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/api/BUILD b/src/main/python/apache/aurora/client/api/BUILD
index 6968d62..c205a7d 100644
--- a/src/main/python/apache/aurora/client/api/BUILD
+++ b/src/main/python/apache/aurora/client/api/BUILD
@@ -82,6 +82,7 @@ python_library(
     pants('3rdparty/python:twitter.common.zookeeper'),
     pants('src/main/python/apache/aurora/common/auth'),
     pants('src/main/python/apache/aurora/common:cluster'),
+    pants('src/main/python/apache/aurora/common:transport'),
     pants('src/main/thrift/org/apache/aurora/gen:py-thrift'),
   ]
 )

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5665d4ce/src/main/python/apache/aurora/client/api/scheduler_client.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/api/scheduler_client.py b/src/main/python/apache/aurora/client/api/scheduler_client.py
index 7c5fba6..10a956c 100644
--- a/src/main/python/apache/aurora/client/api/scheduler_client.py
+++ b/src/main/python/apache/aurora/client/api/scheduler_client.py
@@ -27,10 +27,16 @@ from twitter.common.zookeeper.serverset import ServerSet
 
 from apache.aurora.common.auth import make_session_key, SessionKeyError
 from apache.aurora.common.cluster import Cluster
+from apache.aurora.common.transport import TRequestsTransport
 
 from gen.apache.aurora.api import AuroraAdmin
 from gen.apache.aurora.api.constants import CURRENT_API_VERSION
 
+try:
+  from urlparse import urljoin
+except ImportError:
+  from urllib.parse import urljoin
+
 
 class SchedulerClientTrait(Cluster.Trait):
   zk                = String  # noqa
@@ -59,12 +65,7 @@ class SchedulerClient(object):
     if cluster.zk:
       return ZookeeperSchedulerClient(cluster, port=cluster.zk_port, **kwargs)
     elif cluster.scheduler_uri:
-      try:
-        host, port = cluster.scheduler_uri.split(':', 2)
-        port = int(port)
-      except ValueError:
-        raise ValueError('Malformed Cluster scheduler_uri: %s' % cluster.scheduler_uri)
-      return DirectSchedulerClient(host, port)
+      return DirectSchedulerClient(cluster.scheduler_uri)
     else:
       raise ValueError('"cluster" does not specify zk or scheduler_uri')
 
@@ -83,8 +84,8 @@ class SchedulerClient(object):
     return None
 
   @classmethod
-  def _connect_scheduler(cls, host, port, clock=time):
-    transport = THttpClient.THttpClient('http://%s:%s/api' % (host, port))
+  def _connect_scheduler(cls, uri, clock=time):
+    transport = TRequestsTransport(uri)
     protocol = TJSONProtocol.TJSONProtocol(transport)
     schedulerClient = AuroraAdmin.Client(protocol)
     for _ in range(cls.THRIFT_RETRIES):
@@ -98,7 +99,7 @@ class SchedulerClient(object):
         # Monkey-patched proxies, like socks, can generate a proxy error here.
         # without adding a dependency, we can't catch those in a more specific way.
         raise cls.CouldNotConnect('Connection to scheduler failed: %s' % e)
-    raise cls.CouldNotConnect('Could not connect to %s:%s' % (host, port))
+    raise cls.CouldNotConnect('Could not connect to %s' % uri)
 
 
 class ZookeeperSchedulerClient(SchedulerClient):
@@ -118,9 +119,11 @@ class ZookeeperSchedulerClient(SchedulerClient):
     SchedulerClient.__init__(self, verbose=verbose)
     self._cluster = cluster
     self._zkport = port
-    self._http = None
+    self._endpoint = None
+    self._uri = None
 
-  def _connect(self):
+  def _resolve(self):
+    """Resolve the uri associated with this scheduler from zookeeper."""
     joined = threading.Event()
     def on_join(elements):
       joined.set()
@@ -131,34 +134,42 @@ class ZookeeperSchedulerClient(SchedulerClient):
     if len(serverset_endpoints) == 0:
       raise self.CouldNotConnect('No schedulers detected in %s!' % self._cluster.name)
     instance = serverset_endpoints[0]
-    self._http = instance.additional_endpoints.get('http')
+    if 'https' in instance.additional_endpoints:
+      endpoint = instance.additional_endpoints['https']
+      self._uri = 'https://%s:%s' % (endpoint.host, endpoint.port)
+    elif 'http' in instance.additional_endpoints:
+      endpoint = instance.additional_endpoints['http']
+      self._uri = 'http://%s:%s' % (endpoint.host, endpoint.port)
     zk.stop()
-    return self._connect_scheduler(self._http.host, self._http.port)
+
+  def _connect(self):
+    if self._uri is None:
+      self._resolve()
+    if self._uri is not None:
+      return self._connect_scheduler(urljoin(self._uri, 'api'))
 
   @property
   def url(self):
     proxy_url = self._cluster.proxy_url
     if proxy_url:
       return proxy_url
-    if self._http is None:
-      self._connect()
-    if self._http:
-      return 'http://%s:%s' % (self._http.host, self._http.port)
+    if self._uri is None:
+      self._resolve()
+    if self._uri:
+      return self._uri
 
 
 class DirectSchedulerClient(SchedulerClient):
-  def __init__(self, host, port):
+  def __init__(self, uri):
     SchedulerClient.__init__(self, verbose=True)
-    self._host = host
-    self._port = port
+    self._uri = uri
 
   def _connect(self):
-    return self._connect_scheduler(self._host, self._port)
+    return self._connect_scheduler(urljoin(self._uri, 'api'))
 
   @property
   def url(self):
-    # TODO(wickman) This is broken -- make this tunable in MESOS-3005
-    return 'http://%s:8081' % self._host
+    return self._uri
 
 
 class SchedulerProxy(object):

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5665d4ce/src/main/python/apache/aurora/common/BUILD
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/common/BUILD b/src/main/python/apache/aurora/common/BUILD
index 0de0cf7..b2ff1ff 100644
--- a/src/main/python/apache/aurora/common/BUILD
+++ b/src/main/python/apache/aurora/common/BUILD
@@ -67,6 +67,15 @@ python_library(
 )
 
 python_library(
+  name = 'transport',
+  sources = ['transport.py'],
+  dependencies = [
+    pants('3rdparty/python:requests'),
+    pants('3rdparty/python:thrift'),
+  ],
+)
+
+python_library(
   name = 'common',
   dependencies = [
     pants(':aurora_job_key'),

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5665d4ce/src/main/python/apache/aurora/common/transport.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/common/transport.py b/src/main/python/apache/aurora/common/transport.py
new file mode 100644
index 0000000..6f7c355
--- /dev/null
+++ b/src/main/python/apache/aurora/common/transport.py
@@ -0,0 +1,110 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from io import BytesIO
+
+import requests
+from requests import exceptions as request_exceptions
+from thrift.transport.TTransport import TTransportBase, TTransportException
+
+try:
+  from urlparse import urlparse
+except ImportError:
+  from urllib.parse import urlparse
+
+
+def default_requests_session_factory():
+  session = requests.session()
+  session.headers['User-Agent'] = 'Python TRequestsTransport v1.0'
+  return session
+
+
+class TRequestsTransport(TTransportBase):
+  """A Thrift HTTP client based upon the requests module."""
+
+  def __init__(self, uri, auth=None, session_factory=default_requests_session_factory):
+    """Construct a TRequestsTransport.
+
+    Construct a Thrift transport based upon the requests module.  URI is the
+    HTTP endpoint that the server should be listening on.  The 'auth'
+    keyword is passed directly to the requests client and can be used to
+    provide different authentication contexts such as Kerberos
+    authentication via the requests-kerberos module.
+
+    :param uri: The endpoint uri
+    :type uri: str
+    :keyword auth: The requests authentication context.
+    """
+    self.__session = None
+    self.__session_factory = session_factory
+    if not callable(session_factory):
+      raise TypeError('session_factory should be a callable that produces a requests.Session!')
+    self.__wbuf = BytesIO()
+    self.__rbuf = BytesIO()
+    self.__uri = uri
+    try:
+      self.__urlparse = urlparse(uri)
+    except ValueError:
+      raise TTransportException('Failed to parse uri %r' % (uri,))
+    self.__timeout = None
+    self.__auth = auth
+
+  def isOpen(self):
+    return self.__session is not None
+
+  def open(self):
+    self.__session = self.__session_factory()
+
+  def close(self):
+    session, self.__session = self.__session, None
+    session.close()
+
+  def setTimeout(self, ms):
+    self.__timeout = ms / 1000.0
+
+  def read(self, size):
+    return self.__rbuf.read(size)
+
+  def write(self, buf):
+    self.__wbuf.write(buf)
+
+  def flush(self):
+    if self.isOpen():
+      self.close()
+
+    self.open()
+
+    data = self.__wbuf.getvalue()
+    self.__wbuf = BytesIO()
+
+    self.__session.headers['Content-Type'] = 'application/x-thrift'
+    self.__session.headers['Content-Length'] = str(len(data))
+    self.__session.headers['Host'] = self.__urlparse.hostname
+
+    try:
+      response = self.__session.post(
+          self.__uri,
+          data=data,
+          timeout=self.__timeout,
+          auth=self.__auth)
+    except request_exceptions.Timeout:
+      raise TTransportException(
+          type=TTransportException.TIMED_OUT,
+          message='Timed out talking to %s' % self.__uri)
+    except request_exceptions.RequestException as e:
+      raise TTransportException(
+          type=TTransportException.UNKNOWN,
+          message='Unknown error talking to %s: %s' % (self.__uri, e))
+
+    self.__rbuf = BytesIO(response.content)

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5665d4ce/src/test/python/apache/aurora/client/api/test_scheduler_client.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/client/api/test_scheduler_client.py b/src/test/python/apache/aurora/client/api/test_scheduler_client.py
index 6b23a4a..dd16fe2 100644
--- a/src/test/python/apache/aurora/client/api/test_scheduler_client.py
+++ b/src/test/python/apache/aurora/client/api/test_scheduler_client.py
@@ -17,6 +17,7 @@ import time
 import unittest
 
 import mock
+import pytest
 from mox import IgnoreArg, IsA, Mox
 from thrift.transport import THttpClient, TTransport
 from twitter.common.quantity import Amount, Time
@@ -25,6 +26,7 @@ from twitter.common.zookeeper.serverset.endpoint import ServiceInstance
 
 import apache.aurora.client.api.scheduler_client as scheduler_client
 from apache.aurora.common.cluster import Cluster
+from apache.aurora.common.transport import TRequestsTransport
 
 import gen.apache.aurora.api.AuroraAdmin as AuroraAdmin
 import gen.apache.aurora.api.AuroraSchedulerManager as AuroraSchedulerManager
@@ -98,23 +100,17 @@ class TestSchedulerProxyInjection(unittest.TestCase):
 
   def test_startCronJob(self):
     self.mock_thrift_client.startCronJob(IsA(JobKey), IsA(SessionKey))
-
     self.mox.ReplayAll()
-
     self.make_scheduler_proxy().startCronJob(JOB_KEY)
 
   def test_createJob(self):
     self.mock_thrift_client.createJob(IsA(JobConfiguration), IsA(SessionKey))
-
     self.mox.ReplayAll()
-
     self.make_scheduler_proxy().createJob(JobConfiguration())
 
   def test_replaceCronTemplate(self):
     self.mock_thrift_client.replaceCronTemplate(IsA(JobConfiguration), IsA(Lock), IsA(SessionKey))
-
     self.mox.ReplayAll()
-
     self.make_scheduler_proxy().replaceCronTemplate(JobConfiguration(), Lock())
 
   def test_scheduleCronJob(self):
@@ -129,240 +125,187 @@ class TestSchedulerProxyInjection(unittest.TestCase):
 
   def test_populateJobConfig(self):
     self.mock_thrift_client.populateJobConfig(IsA(JobConfiguration))
-
     self.mox.ReplayAll()
-
     self.make_scheduler_proxy().populateJobConfig(JobConfiguration())
 
   def test_restartShards(self):
     self.mock_thrift_client.restartShards(IsA(JobKey), IgnoreArg(), IsA(SessionKey))
-
     self.mox.ReplayAll()
-
     self.make_scheduler_proxy().restartShards(JOB_KEY, set([0]))
 
   def test_getTasksStatus(self):
     self.mock_thrift_client.getTasksStatus(IsA(TaskQuery))
-
     self.mox.ReplayAll()
-
     self.make_scheduler_proxy().getTasksStatus(TaskQuery())
 
   def test_getJobs(self):
     self.mock_thrift_client.getJobs(IgnoreArg())
-
     self.mox.ReplayAll()
-
     self.make_scheduler_proxy().getJobs(ROLE)
 
   def test_killTasks(self):
     self.mock_thrift_client.killTasks(IsA(TaskQuery), IsA(SessionKey))
-
     self.mox.ReplayAll()
-
     self.make_scheduler_proxy().killTasks(TaskQuery())
 
   def test_getQuota(self):
     self.mock_thrift_client.getQuota(IgnoreArg())
-
     self.mox.ReplayAll()
-
     self.make_scheduler_proxy().getQuota(ROLE)
 
   def test_startMaintenance(self):
     self.mock_thrift_client.startMaintenance(IsA(Hosts), IsA(SessionKey))
-
     self.mox.ReplayAll()
-
     self.make_scheduler_proxy().startMaintenance(Hosts())
 
   def test_drainHosts(self):
     self.mock_thrift_client.drainHosts(IsA(Hosts), IsA(SessionKey))
-
     self.mox.ReplayAll()
-
     self.make_scheduler_proxy().drainHosts(Hosts())
 
   def test_maintenanceStatus(self):
     self.mock_thrift_client.maintenanceStatus(IsA(Hosts), IsA(SessionKey))
-
     self.mox.ReplayAll()
-
     self.make_scheduler_proxy().maintenanceStatus(Hosts())
 
   def test_endMaintenance(self):
     self.mock_thrift_client.endMaintenance(IsA(Hosts), IsA(SessionKey))
-
     self.mox.ReplayAll()
-
     self.make_scheduler_proxy().endMaintenance(Hosts())
 
   def test_getVersion(self):
     self.mock_thrift_client.getVersion()
-
     self.mox.ReplayAll()
-
     self.make_scheduler_proxy().getVersion()
 
   def test_addInstances(self):
     self.mock_thrift_client.addInstances(IsA(JobKey), IgnoreArg(), IsA(Lock), IsA(SessionKey))
-
     self.mox.ReplayAll()
-
     self.make_scheduler_proxy().addInstances(JobKey(), {}, Lock())
 
   def test_acquireLock(self):
     self.mock_thrift_client.acquireLock(IsA(Lock), IsA(SessionKey))
-
     self.mox.ReplayAll()
-
     self.make_scheduler_proxy().acquireLock(Lock())
 
   def test_releaseLock(self):
     self.mock_thrift_client.releaseLock(IsA(Lock), IsA(LockValidation), IsA(SessionKey))
-
     self.mox.ReplayAll()
-
     self.make_scheduler_proxy().releaseLock(Lock(), LockValidation())
 
 
 class TestSchedulerProxyAdminInjection(TestSchedulerProxyInjection):
   def test_setQuota(self):
     self.mock_thrift_client.setQuota(IgnoreArg(), IsA(ResourceAggregate), IsA(SessionKey))
-
     self.mox.ReplayAll()
-
     self.make_scheduler_proxy().setQuota(ROLE, ResourceAggregate())
 
   def test_forceTaskState(self):
     self.mock_thrift_client.forceTaskState(IgnoreArg(), IgnoreArg(), IsA(SessionKey))
-
     self.mox.ReplayAll()
-
     self.make_scheduler_proxy().forceTaskState('taskid', ScheduleStatus.LOST)
 
   def test_performBackup(self):
     self.mock_thrift_client.performBackup(IsA(SessionKey))
-
     self.mox.ReplayAll()
-
     self.make_scheduler_proxy().performBackup()
 
   def test_listBackups(self):
     self.mock_thrift_client.listBackups(IsA(SessionKey))
-
     self.mox.ReplayAll()
-
     self.make_scheduler_proxy().listBackups()
 
   def test_stageRecovery(self):
     self.mock_thrift_client.stageRecovery(IsA(TaskQuery), IsA(SessionKey))
-
     self.mox.ReplayAll()
-
     self.make_scheduler_proxy().stageRecovery(TaskQuery())
 
   def test_queryRecovery(self):
     self.mock_thrift_client.queryRecovery(IsA(TaskQuery), IsA(SessionKey))
-
     self.mox.ReplayAll()
-
     self.make_scheduler_proxy().queryRecovery(TaskQuery())
 
   def test_deleteRecoveryTasks(self):
     self.mock_thrift_client.deleteRecoveryTasks(IsA(TaskQuery), IsA(SessionKey))
-
     self.mox.ReplayAll()
-
     self.make_scheduler_proxy().deleteRecoveryTasks(TaskQuery())
 
   def test_commitRecovery(self):
     self.mock_thrift_client.commitRecovery(IsA(SessionKey))
-
     self.mox.ReplayAll()
-
     self.make_scheduler_proxy().commitRecovery()
 
   def test_unloadRecovery(self):
     self.mock_thrift_client.unloadRecovery(IsA(SessionKey))
-
     self.mox.ReplayAll()
-
     self.make_scheduler_proxy().unloadRecovery()
 
   def test_snapshot(self):
     self.mock_thrift_client.snapshot(IsA(SessionKey))
-
     self.mox.ReplayAll()
-
     self.make_scheduler_proxy().snapshot()
 
   def test_rewriteConfigs(self):
     self.mock_thrift_client.rewriteConfigs(IsA(RewriteConfigsRequest), IsA(SessionKey))
-
     self.mox.ReplayAll()
-
     self.make_scheduler_proxy().rewriteConfigs(RewriteConfigsRequest())
 
 
-class TestZookeeperSchedulerClient(unittest.TestCase):
-  def setUp(self):
-    self.mox = Mox()
-
-  def tearDown(self):
-    self.mox.UnsetStubs()
-    self.mox.VerifyAll()
-
-  def test_url_when_not_connected_and_cluster_has_no_proxy_url(self):
-    host = 'some-host.example.com'
-    port = 31181
-
-    mock_zk = self.mox.CreateMock(TwitterKazooClient)
-
-    def mock_get_serverset(*args, **kwargs):
-      service_json = '''{
-        "additionalEndpoints": {
-            "http": {
-                "host": "%s",
-                "port": %d
-            }
-        },
-        "serviceEndpoint": {
-            "host": "%s",
-            "port": %d
-        },
-        "shard": 0,
-        "status": "ALIVE"
-    }''' % (host, port, host, port)
-
-      return mock_zk, [ServiceInstance.unpack(service_json)]
-
-    class ZookeeperSchedulerClientTestImpl(scheduler_client.ZookeeperSchedulerClient):
-      SERVERSET_TIMEOUT = Amount(10, Time.MILLISECONDS)
-
-    original_method = ZookeeperSchedulerClientTestImpl.get_scheduler_serverset
-
-    try:
-      ZookeeperSchedulerClientTestImpl.get_scheduler_serverset = mock_get_serverset
-
-      zk_scheduler_client = ZookeeperSchedulerClientTestImpl(Cluster(proxy_url=None))
-      self.mox.StubOutWithMock(zk_scheduler_client, '_connect_scheduler')
-      mock_zk.stop()
-      zk_scheduler_client._connect_scheduler(host, port)
-
-      self.mox.ReplayAll()
-
-      assert zk_scheduler_client.url == 'http://%s:%d' % (host, port)
-    finally:
-      ZookeeperSchedulerClientTestImpl.get_scheduler_serverset = original_method
-
-
-class TestSchedulerClient(unittest.TestCase):
-  @mock.patch('thrift.transport.THttpClient.THttpClient', spec=THttpClient.THttpClient)
-  def test_connect_scheduler(self, MockTHttpClient):
-    MockTHttpClient.return_value.open.side_effect = [TTransport.TTransportException, True]
-    mock_time = mock.Mock(spec=time)
-    scheduler_client.SchedulerClient._connect_scheduler('scheduler.example.com', 1337, mock_time)
-    assert MockTHttpClient.return_value.open.call_count is 2
-    mock_time.sleep.assert_called_once_with(
-        scheduler_client.SchedulerClient.RETRY_TIMEOUT.as_(Time.SECONDS))
+@pytest.mark.parametrize('scheme', ('http', 'https'))
+def test_url_when_not_connected_and_cluster_has_no_proxy_url(scheme):
+  host = 'some-host.example.com'
+  port = 31181
+
+  mock_zk = mock.MagicMock(spec=TwitterKazooClient)
+
+  service_json = '''{
+    "additionalEndpoints": {
+        "%(scheme)s": {
+            "host": "%(host)s",
+            "port": %(port)d
+        }
+    },
+    "serviceEndpoint": {
+        "host": "%(host)s",
+        "port": %(port)d
+    },
+    "shard": 0,
+    "status": "ALIVE"
+  }''' % dict(host=host, port=port, scheme=scheme)
+
+  service_endpoints = [ServiceInstance.unpack(service_json)]
+
+  def make_mock_client(proxy_url):
+    client = scheduler_client.ZookeeperSchedulerClient(Cluster(proxy_url=proxy_url))
+    client.get_scheduler_serverset = mock.MagicMock(return_value=(mock_zk, service_endpoints))
+    client.SERVERSET_TIMEOUT = Amount(0, Time.SECONDS)
+    client._connect_scheduler = mock.MagicMock()
+    return client
+
+  client = make_mock_client(proxy_url=None)
+  assert client.url == '%s://%s:%d' % (scheme, host, port)
+  client._connect_scheduler.assert_has_calls([])
+
+  client = make_mock_client(proxy_url='https://scheduler.proxy')
+  assert client.url == 'https://scheduler.proxy'
+  client._connect_scheduler.assert_has_calls([])
+
+  client = make_mock_client(proxy_url=None)
+  client.get_thrift_client()
+  assert client.url == '%s://%s:%d' % (scheme, host, port)
+  client._connect_scheduler.assert_has_calls([mock.call('%s://%s:%d/api' % (scheme, host,
port))])
+  client._connect_scheduler.reset_mock()
+  client.get_thrift_client()
+  client._connect_scheduler.assert_has_calls([])
+
+
+@mock.patch('apache.aurora.client.api.scheduler_client.TRequestsTransport', spec=TRequestsTransport)
+def test_connect_scheduler(mock_client):
+  mock_client.return_value.open.side_effect = [TTransport.TTransportException, True]
+  mock_time = mock.Mock(spec=time)
+  scheduler_client.SchedulerClient._connect_scheduler(
+      'https://scheduler.example.com:1337',
+      mock_time)
+  assert mock_client.return_value.open.call_count == 2
+  mock_time.sleep.assert_called_once_with(
+      scheduler_client.SchedulerClient.RETRY_TIMEOUT.as_(Time.SECONDS))

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5665d4ce/src/test/python/apache/aurora/common/BUILD
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/common/BUILD b/src/test/python/apache/aurora/common/BUILD
index 4fc7788..e949ba8 100644
--- a/src/test/python/apache/aurora/common/BUILD
+++ b/src/test/python/apache/aurora/common/BUILD
@@ -21,6 +21,7 @@ python_test_suite(
     pants(':test_cluster_option'),
     pants(':test_http_signaler'),
     pants(':test_shellify'),
+    pants(':test_transport'),
   ]
 )
 
@@ -76,3 +77,13 @@ python_tests(
     pants('src/main/python/apache/aurora/common:shellify'),
   ]
 )
+
+python_tests(
+  name = 'test_transport',
+  sources = ['test_transport.py'],
+  dependencies = [
+    pants('src/main/thrift/org/apache/aurora/gen:py-thrift'),
+    pants('src/main/python/apache/aurora/common:transport'),
+    pants('3rdparty/python:mock'),
+  ]
+)

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5665d4ce/src/test/python/apache/aurora/common/test_transport.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/common/test_transport.py b/src/test/python/apache/aurora/common/test_transport.py
new file mode 100644
index 0000000..c9a4141
--- /dev/null
+++ b/src/test/python/apache/aurora/common/test_transport.py
@@ -0,0 +1,84 @@
+from threading import Thread
+
+import mock
+import requests
+from requests import exceptions as request_exceptions
+from thrift.protocol import TJSONProtocol
+from thrift.server import THttpServer
+from thrift.transport import TTransport
+
+from apache.aurora.common.transport import TRequestsTransport
+
+from gen.apache.aurora.api import ReadOnlyScheduler
+from gen.apache.aurora.api.ttypes import Response, ResponseCode, ServerInfo
+
+
+class ReadOnlySchedulerHandler(object):
+  def getRoleSummary(self):  # noqa
+    server_info = ServerInfo(clusterName='west', thriftAPIVersion=3)
+    return Response(responseCode=ResponseCode.OK, serverInfo=server_info)
+
+
+def test_request_transport_integration():
+  handler = ReadOnlySchedulerHandler()
+  processor = ReadOnlyScheduler.Processor(handler)
+  pfactory = TJSONProtocol.TJSONProtocolFactory()
+  server = THttpServer.THttpServer(processor, ('localhost', 0), pfactory)
+  server_thread = Thread(target=server.serve)
+  server_thread.start()
+  _, server_port = server.httpd.socket.getsockname()
+
+  response = None
+
+  try:
+    transport = TRequestsTransport('http://localhost:%d' % server_port)
+    protocol = TJSONProtocol.TJSONProtocol(transport)
+    client = ReadOnlyScheduler.Client(protocol)
+    response = client.getRoleSummary()
+  finally:
+    server.httpd.shutdown()
+
+  assert response is not None
+  assert response.responseCode == ResponseCode.OK
+  assert response.serverInfo.clusterName == 'west'
+  assert response.serverInfo.thriftAPIVersion == 3
+
+  transport.close()
+
+
+def test_request_transport_timeout():
+  session = mock.MagicMock(spec=requests.Session)
+  session.headers = {}
+  session.post = mock.Mock(side_effect=request_exceptions.Timeout())
+  transport = TRequestsTransport('http://localhost:12345', session_factory=lambda: session)
+  protocol = TJSONProtocol.TJSONProtocol(transport)
+  client = ReadOnlyScheduler.Client(protocol)
+
+  try:
+    client.getRoleSummary()
+    assert False, 'getRoleSummary should not succeed'
+  except TTransport.TTransportException as e:
+    assert e.message == 'Timed out talking to http://localhost:12345'
+  except Exception as e:
+    assert False, 'Only expected TTransportException, got %s' % e
+
+  transport.close()
+
+
+def test_request_any_other_exception():
+  session = mock.MagicMock(spec=requests.Session)
+  session.headers = {}
+  session.post = mock.Mock(side_effect=request_exceptions.ConnectionError())
+  transport = TRequestsTransport('http://localhost:12345', session_factory=lambda: session)
+  protocol = TJSONProtocol.TJSONProtocol(transport)
+  client = ReadOnlyScheduler.Client(protocol)
+
+  try:
+    client.getRoleSummary()
+    assert False, 'getRoleSummary should not succeed'
+  except TTransport.TTransportException:
+    pass
+  except Exception as e:
+    assert False, 'Only expected TTransportException, got %s' % e
+
+  transport.close()


Mime
View raw message