aurora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ma...@apache.org
Subject aurora git commit: Deprecating --restart-threshold option in 'aurora job restart'
Date Mon, 25 Apr 2016 23:22:42 GMT
Repository: aurora
Updated Branches:
  refs/heads/master ff6e05f05 -> 534c4694c


Deprecating --restart-threshold option in 'aurora job restart'

Bugs closed: AURORA-1631

Reviewed at https://reviews.apache.org/r/46587/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/534c4694
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/534c4694
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/534c4694

Branch: refs/heads/master
Commit: 534c4694c052baa687b3cc938024802593c3e49b
Parents: ff6e05f
Author: Maxim Khutornenko <maxim@apache.org>
Authored: Mon Apr 25 16:22:16 2016 -0700
Committer: Maxim Khutornenko <maxim@apache.org>
Committed: Mon Apr 25 16:22:16 2016 -0700

----------------------------------------------------------------------
 RELEASE-NOTES.md                                |   5 +
 .../aurora/client/api/error_handling_thread.py  |  75 ------------
 .../apache/aurora/client/api/health_check.py    |  25 ++--
 .../aurora/client/api/instance_watcher.py       |  36 ++----
 .../apache/aurora/client/api/job_monitor.py     |   7 +-
 .../apache/aurora/client/api/restarter.py       |   3 -
 .../apache/aurora/client/api/scheduler_mux.py   | 121 -------------------
 .../apache/aurora/client/api/task_util.py       |  63 +---------
 .../python/apache/aurora/client/cli/jobs.py     |  11 +-
 .../aurora/client/api/test_health_check.py      |  26 ++--
 .../aurora/client/api/test_instance_watcher.py  |  55 ++-------
 .../apache/aurora/client/api/test_restarter.py  |   1 -
 .../aurora/client/api/test_scheduler_mux.py     |  72 -----------
 .../apache/aurora/client/api/test_task_util.py  |  33 ++---
 .../apache/aurora/client/cli/test_restart.py    |  58 ++-------
 15 files changed, 74 insertions(+), 517 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/534c4694/RELEASE-NOTES.md
----------------------------------------------------------------------
diff --git a/RELEASE-NOTES.md b/RELEASE-NOTES.md
index 4b810f2..3f2c54c 100644
--- a/RELEASE-NOTES.md
+++ b/RELEASE-NOTES.md
@@ -9,6 +9,11 @@
   role's user.
 - Upgraded Mesos to 0.27.2
 
+### Deprecations and removals:
+
+- Deprecated `--restart-threshold` option in the `aurora job restart` command to match the job
+  updater behavior. This option has no effect now and will be removed in the future release.
+
 0.13.0
 ------
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/534c4694/src/main/python/apache/aurora/client/api/error_handling_thread.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/api/error_handling_thread.py b/src/main/python/apache/aurora/client/api/error_handling_thread.py
deleted file mode 100644
index 530715a..0000000
--- a/src/main/python/apache/aurora/client/api/error_handling_thread.py
+++ /dev/null
@@ -1,75 +0,0 @@
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-import sys
-import traceback
-from threading import Thread
-
-from twitter.common.decorators import identify_thread
-
-try:
-  from Queue import Queue
-except ImportError:
-  from queue import Queue
-
-
-class ExecutionError(Exception):
-  """Unhandled thread error wrapper. Raised on the calling thread."""
-  pass
-
-
-class ErrorHandlingThread(Thread):
-  """A thread that helps with unhandled exceptions by re-raising errors
-  with the parent thread upon completion."""
-
-  def __init__(self, *args, **kw):
-    super(ErrorHandlingThread, self).__init__(*args, **kw)
-    self.__real_run, self.run = self.run, self._excepting_run
-    self.__errors = Queue()
-
-  @identify_thread
-  def _excepting_run(self, *args, **kw):
-    try:
-      self.__real_run(*args, **kw)
-      self.__errors.put(None)
-    except Exception:
-      try:
-        e_type, e_val, e_tb = sys.exc_info()
-        self.__errors.put(ExecutionError(
-            'Unhandled error while running worker thread. '
-            'Original error details: %s' % traceback.format_exception(e_type, e_val, e_tb)))
-      except: # noqa
-        # This appears to be the only way to avoid nasty "interpreter shutdown" errors when
-        # dealing with daemon threads. While not ideal, there is nothing else we could do here
-        # if the sys.exc_info() call fails.
-        pass
-
-  def join_and_raise(self):
-    """Waits for completion and re-raises any exception on a caller thread."""
-    error = self.__errors.get(timeout=sys.maxint)  # Timeout for interruptibility.
-    if error is not None:
-      raise error
-
-
-def spawn_worker(target, *args, **kwargs):
-  """Creates and starts a new daemon worker thread.
-
-  Arguments:
-  target -- target method.
-
-  Returns thread handle.
-  """
-  thread = ErrorHandlingThread(target=target, *args, **kwargs)
-  thread.daemon = True
-  thread.start()
-  return thread

http://git-wip-us.apache.org/repos/asf/aurora/blob/534c4694/src/main/python/apache/aurora/client/api/health_check.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/api/health_check.py b/src/main/python/apache/aurora/client/api/health_check.py
index 0503c0b..b3d8331 100644
--- a/src/main/python/apache/aurora/client/api/health_check.py
+++ b/src/main/python/apache/aurora/client/api/health_check.py
@@ -23,7 +23,7 @@ from gen.apache.aurora.api.ttypes import ScheduleStatus
 class HealthCheck(Interface):
   @abstractmethod
   def health(self, task):
-    """Checks health of the task and returns a (healthy, retriable) pair."""
+    """Checks health of the task and returns a True or False."""
 
 
 class HealthStatus(object):
@@ -35,22 +35,11 @@ class HealthStatus(object):
   def dead(cls):
     return cls(False).health()
 
-  def __init__(self, retry, health):
-    self._retry = retry
+  def __init__(self, health):
     self._health = health
 
   def health(self):
-    return (self._health, self._retry)
-
-
-class NotRetriable(HealthStatus):
-  def __init__(self, health):
-    super(NotRetriable, self).__init__(False, health)
-
-
-class Retriable(HealthStatus):
-  def __init__(self, health):
-    super(Retriable, self).__init__(True, health)
+    return self._health
 
 
 class StatusHealthCheck(HealthCheck):
@@ -70,12 +59,12 @@ class StatusHealthCheck(HealthCheck):
     if status == ScheduleStatus.RUNNING:
       if instance_id in self._task_ids:
         if task_id == self._task_ids.get(instance_id):
-          return Retriable.alive()
+          return HealthStatus.alive()
         else:
-          return NotRetriable.dead()
+          return HealthStatus.dead()
       else:
         log.info('Detected RUNNING instance %s' % instance_id)
         self._task_ids[instance_id] = task_id
-        return Retriable.alive()
+        return HealthStatus.alive()
     else:
-      return Retriable.dead()
+      return HealthStatus.dead()

http://git-wip-us.apache.org/repos/asf/aurora/blob/534c4694/src/main/python/apache/aurora/client/api/instance_watcher.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/api/instance_watcher.py b/src/main/python/apache/aurora/client/api/instance_watcher.py
index 6ed8154..a35fb22 100644
--- a/src/main/python/apache/aurora/client/api/instance_watcher.py
+++ b/src/main/python/apache/aurora/client/api/instance_watcher.py
@@ -18,7 +18,7 @@ from threading import Event
 from twitter.common import log
 
 from .health_check import StatusHealthCheck
-from .task_util import StatusMuxHelper
+from .task_util import StatusHelper
 
 from gen.apache.aurora.api.ttypes import ScheduleStatus, TaskQuery
 
@@ -41,21 +41,18 @@ class InstanceWatcher(object):
   def __init__(self,
                scheduler,
                job_key,
-               restart_threshold,
                watch_secs,
                health_check_interval_seconds,
                clock=time,
-               terminating_event=None,
-               scheduler_mux=None):
+               terminating_event=None):
 
     self._scheduler = scheduler
     self._job_key = job_key
-    self._restart_threshold = restart_threshold
     self._watch_secs = watch_secs
     self._health_check_interval_seconds = health_check_interval_seconds
     self._clock = clock
     self._terminating = terminating_event or Event()
-    self._status_helper = StatusMuxHelper(self._scheduler, self._create_query, scheduler_mux)
+    self._status_helper = StatusHelper(self._scheduler, self._create_query)
 
   def watch(self, instance_ids, health_check=None):
     """Watches a set of instances and detects failures based on a delegated health check.
@@ -68,9 +65,6 @@ class InstanceWatcher(object):
     log.info('Watching instances: %s' % instance_ids)
     instance_ids = set(instance_ids)
     health_check = health_check or StatusHealthCheck()
-    now = self._clock.time()
-    expected_healthy_by = now + self._restart_threshold
-    max_time = now + self._restart_threshold + self._watch_secs
 
     instance_states = {}
 
@@ -86,15 +80,13 @@ class InstanceWatcher(object):
           instance_id, self._watch_secs))
         instance.set_healthy(True)
 
-    def maybe_set_instance_unhealthy(instance_id, retriable):
-      # An instance that was previously healthy and currently unhealthy has failed.
+    def set_instance_unhealthy(instance_id):
+      log.info('Instance %s is unhealthy' % instance_id)
       if instance_id in instance_states:
-        log.info('Instance %s is unhealthy' % instance_id)
+        # An instance that was previously healthy and currently unhealthy has failed.
         instance_states[instance_id].set_healthy(False)
-      # If the restart threshold has expired or if the instance cannot be retried it is unhealthy.
-      elif now > expected_healthy_by or not retriable:
-        log.info('Instance %s was not reported healthy within %d seconds' % (
-          instance_id, self._restart_threshold))
+      else:
+        # An instance never passed a health check (e.g.: failed before the first health check).
         instance_states[instance_id] = Instance(finished=True)
 
     while not self._terminating.is_set():
@@ -105,14 +97,11 @@ class InstanceWatcher(object):
         if instance_id not in finished_instances():
           running_task = tasks_by_instance.get(instance_id)
           if running_task is not None:
-            task_healthy, retriable = health_check.health(running_task)
+            task_healthy = health_check.health(running_task)
             if task_healthy:
               set_instance_healthy(instance_id, now)
             else:
-              maybe_set_instance_unhealthy(instance_id, retriable)
-          else:
-            # Set retriable=True since an instance should be retried if it has not been healthy.
-            maybe_set_instance_unhealthy(instance_id, retriable=True)
+              set_instance_unhealthy(instance_id)
 
       log.debug('Instances health: %s' % ['%s: %s' % val for val in instance_states.items()])
 
@@ -120,11 +109,6 @@ class InstanceWatcher(object):
       if set(finished_instances().keys()) == instance_ids:
         return set([s_id for s_id, s in instance_states.items() if not s.healthy])
 
-      # Return if time is up.
-      if now > max_time:
-        return set([s_id for s_id in instance_ids if s_id not in instance_states
-                                             or not instance_states[s_id].healthy])
-
       self._terminating.wait(self._health_check_interval_seconds)
 
   def terminate(self):

http://git-wip-us.apache.org/repos/asf/aurora/blob/534c4694/src/main/python/apache/aurora/client/api/job_monitor.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/api/job_monitor.py b/src/main/python/apache/aurora/client/api/job_monitor.py
index aa7e976..d420a96 100644
--- a/src/main/python/apache/aurora/client/api/job_monitor.py
+++ b/src/main/python/apache/aurora/client/api/job_monitor.py
@@ -16,7 +16,7 @@ from threading import Event
 
 from twitter.common.quantity import Amount, Time
 
-from .task_util import StatusMuxHelper
+from .task_util import StatusHelper
 
 from gen.apache.aurora.api.constants import LIVE_STATES, TERMINAL_STATES
 from gen.apache.aurora.api.ttypes import JobKey, TaskQuery
@@ -35,14 +35,13 @@ class JobMonitor(object):
     return status in TERMINAL_STATES
 
   def __init__(self, scheduler, job_key, terminating_event=None,
-               min_poll_interval=MIN_POLL_INTERVAL, max_poll_interval=MAX_POLL_INTERVAL,
-               scheduler_mux=None):
+               min_poll_interval=MIN_POLL_INTERVAL, max_poll_interval=MAX_POLL_INTERVAL):
     self._scheduler = scheduler
     self._job_key = job_key
     self._min_poll_interval = min_poll_interval
     self._max_poll_interval = max_poll_interval
     self._terminating = terminating_event or Event()
-    self._status_helper = StatusMuxHelper(self._scheduler, self.create_query, scheduler_mux)
+    self._status_helper = StatusHelper(self._scheduler, self.create_query)
 
   def iter_tasks(self, instances):
     tasks = self._status_helper.get_tasks(instances)

http://git-wip-us.apache.org/repos/asf/aurora/blob/534c4694/src/main/python/apache/aurora/client/api/restarter.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/api/restarter.py b/src/main/python/apache/aurora/client/api/restarter.py
index fbe8f20..6600c6b 100644
--- a/src/main/python/apache/aurora/client/api/restarter.py
+++ b/src/main/python/apache/aurora/client/api/restarter.py
@@ -26,14 +26,12 @@ from gen.apache.aurora.api.ttypes import ResponseCode
 class RestartSettings(object):
   def __init__(self,
                batch_size,
-               restart_threshold,
                max_per_instance_failures,
                max_total_failures,
                watch_secs,
                health_check_interval_seconds):
 
     self.batch_size = batch_size
-    self.restart_threshold = restart_threshold
     self.max_per_instance_failures = max_per_instance_failures
     self.max_total_failures = max_total_failures
     self.watch_secs = watch_secs
@@ -55,7 +53,6 @@ class Restarter(object):
     self._instance_watcher = instance_watcher or InstanceWatcher(
         scheduler,
         job_key.to_thrift(),
-        restart_settings.restart_threshold,
         restart_settings.watch_secs,
         restart_settings.health_check_interval_seconds)
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/534c4694/src/main/python/apache/aurora/client/api/scheduler_mux.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/api/scheduler_mux.py b/src/main/python/apache/aurora/client/api/scheduler_mux.py
deleted file mode 100644
index 0832a13..0000000
--- a/src/main/python/apache/aurora/client/api/scheduler_mux.py
+++ /dev/null
@@ -1,121 +0,0 @@
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-import threading
-from collections import defaultdict, namedtuple
-
-from twitter.common.quantity import Amount, Time
-
-from .error_handling_thread import spawn_worker
-
-try:
-  from Queue import Queue, Empty
-except ImportError:
-  from queue import Queue, Empty
-
-
-class SchedulerMux(object):
-  """Multiplexes scheduler RPC requests on a dedicated worker thread."""
-
-  class Error(Exception):
-    """Call error wrapper."""
-    pass
-
-  OK_RESULT = 1
-  DEFAULT_WAIT_TIMEOUT = Amount(1, Time.SECONDS)
-  DEFAULT_JOIN_TIMEOUT = Amount(5, Time.SECONDS)
-  DEFAULT_RPC_TIMEOUT = Amount(120, Time.SECONDS)
-  WORK_ITEM = namedtuple('WorkItem', ['completion_queue', 'command', 'data', 'aggregator'])
-
-  def __init__(self, wait_timeout=DEFAULT_WAIT_TIMEOUT):
-    self.__queue = Queue()
-    self.__terminating = threading.Event()
-    self.__wait_timeout = wait_timeout
-    self.__worker = spawn_worker(self.__monitor)
-
-  def __monitor(self):
-    """Main body of the multiplexer thread.
-
-    This method repeatedly polls the worker queue for new calls, and then
-    dispatches them in batches to the scheduler.
-    Callers are notified when their requests complete."""
-
-    requests_by_command = defaultdict(list)
-    while not self.__terminating.is_set():
-      try:
-        work_item = self.__queue.get(timeout=self.__wait_timeout.as_(Time.SECONDS))
-        requests_by_command[work_item.command].append(work_item)
-      except Empty:
-        self.__call_and_notify(requests_by_command)
-        requests_by_command = defaultdict(list)
-
-  def __call_and_notify(self, requests_by_command):
-    """Batch executes scheduler requests and notifies on completion.
-
-    Takes a set of RPC requests grouped by command type, dispatches them to the scheduler,
-    and then waits for the batched calls to complete. When a call is completed, its callers
-    will be notified via the completion queue."""
-
-    for command, work_items in requests_by_command.items():
-      request = [item.data for item in work_items]
-      request = work_items[0].aggregator(request) if work_items[0].aggregator else request
-      result_status = self.OK_RESULT
-      result_data = None
-      try:
-        result_data = command(request)
-      except (self.Error, Exception) as e:
-        result_status = e
-
-      for work_item in work_items:
-        work_item.completion_queue.put((result_status, result_data))
-
-  def _enqueue(self, completion_queue, command, data, aggregator):
-    """Queues up a scheduler call for a delayed (batched) completion.
-
-    Arguments:
-    completion_queue -- completion queue to notify caller on completion.
-    command -- callback signature accepting a list of data.
-    data -- single request data object to be batched with other similar requests.
-    aggregator -- callback function for data aggregation.
-    """
-    self.__queue.put(self.WORK_ITEM(completion_queue, command, data, aggregator))
-
-  def terminate(self):
-    """Requests the SchedulerMux to terminate."""
-    self.__terminating.set()
-    self.__worker.join(timeout=self.DEFAULT_JOIN_TIMEOUT.as_(Time.SECONDS))
-
-  def enqueue_and_wait(self, command, data, aggregator=None, timeout=DEFAULT_RPC_TIMEOUT):
-    """Queues up the scheduler call and waits for completion.
-
-    Arguments:
-    command -- scheduler command to run.
-    data -- data to query scheduler for.
-    aggregator -- callback function for data aggregation.
-    timeout -- amount of time to wait for completion.
-
-    Returns the aggregated command call response. Response data decomposition is up to the caller.
-    """
-    try:
-      completion_queue = Queue()
-      self._enqueue(completion_queue, command, data, aggregator)
-      result = completion_queue.get(timeout=timeout.as_(Time.SECONDS))
-      result_status = result[0]
-      if result_status != self.OK_RESULT and not self.__terminating.is_set():
-        if isinstance(result_status, self.Error):
-          raise result_status
-        else:
-          raise self.Error('Unknown error: %s' % result_status)
-      return result[1]
-    except Empty:
-      raise self.Error('Failed to complete operation within %s' % timeout)

http://git-wip-us.apache.org/repos/asf/aurora/blob/534c4694/src/main/python/apache/aurora/client/api/task_util.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/api/task_util.py b/src/main/python/apache/aurora/client/api/task_util.py
index b5244ee..fb7c76f 100644
--- a/src/main/python/apache/aurora/client/api/task_util.py
+++ b/src/main/python/apache/aurora/client/api/task_util.py
@@ -11,27 +11,22 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-from itertools import chain
-
 from twitter.common import log
 
 from apache.aurora.client.base import format_response
 
-from .scheduler_mux import SchedulerMux
-
 from gen.apache.aurora.api.ttypes import ResponseCode
 
 
-class StatusMuxHelper(object):
-  """Handles mux/demux logic of the getTasksWithoutConfigs RPC."""
+class StatusHelper(object):
+  """Simple wrapper around getTasksWithoutConfigs RPC call."""
 
-  def __init__(self, scheduler, query_factory, scheduler_mux=None):
+  def __init__(self, scheduler, query_factory):
     self._scheduler = scheduler
     self._query_factory = query_factory
-    self._scheduler_mux = scheduler_mux
 
   def get_tasks(self, instance_ids=None):
-    """Routes call to either immediate direct or multiplexed threaded execution.
+    """Gets tasks from the scheduler.
 
     Arguments:
     instance_ids -- optional list of instance IDs to query for.
@@ -39,46 +34,8 @@ class StatusMuxHelper(object):
     Returns a list of tasks.
     """
     log.debug('Querying instance statuses: %s' % instance_ids)
-
-    if self._scheduler_mux is not None:
-      return self._get_tasks_multiplexed(instance_ids)
-    else:
-      return self._get_tasks(self._query_factory(instance_ids))
-
-  def _get_tasks_multiplexed(self, instance_ids=None):
-    """Gets tasks via SchedulerMux.
-
-    Arguments:
-    instance_ids -- optional list of instance IDs to query for.
-
-    Returns a list of tasks.
-    """
-    tasks = []
-    include_ids = lambda id: id in instance_ids if instance_ids is not None else True
-
-    log.debug('Batch getting task status: %s' % instance_ids)
     try:
-      unfiltered_tasks = self._scheduler_mux.enqueue_and_wait(
-        self._get_tasks,
-        instance_ids if instance_ids else [],
-        self._create_aggregated_query)
-      tasks = [task for task in unfiltered_tasks if include_ids(task.assignedTask.instanceId)]
-    except SchedulerMux.Error as e:
-      log.error('Failed to query status for instances %s. Reason: %s' % (instance_ids, e))
-
-    log.debug('Done batch getting task status: %s' % instance_ids)
-    return tasks
-
-  def _get_tasks(self, query):
-    """Gets tasks directly via SchedulerProxy.
-
-    Arguments:
-    query -- TaskQuery instance.
-
-    Returns a list of tasks.
-    """
-    try:
-      resp = self._scheduler.getTasksWithoutConfigs(query)
+      resp = self._scheduler.getTasksWithoutConfigs(self._query_factory(instance_ids))
     except IOError as e:
       log.error('IO Exception during scheduler call: %s' % e)
       return []
@@ -89,13 +46,3 @@ class StatusMuxHelper(object):
 
     log.debug(format_response(resp))
     return tasks
-
-  def _create_aggregated_query(self, instance_id_lists):
-    """Aggregates multiple instance_id lists into a single list.
-
-    Arguments:
-    instance_id_lists -- list of lists of int.
-    """
-    instance_ids = list(chain.from_iterable(instance_id_lists))
-    log.debug('Aggregated instance ids to query status: %s' % instance_ids)
-    return self._query_factory(instance_ids)

http://git-wip-us.apache.org/repos/asf/aurora/blob/534c4694/src/main/python/apache/aurora/client/cli/jobs.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/cli/jobs.py b/src/main/python/apache/aurora/client/cli/jobs.py
index 3cbd607..e8bc38a 100644
--- a/src/main/python/apache/aurora/client/cli/jobs.py
+++ b/src/main/python/apache/aurora/client/cli/jobs.py
@@ -615,9 +615,9 @@ class RestartCommand(Verb):
         CommandOption("--max-per-instance-failures", type=int, default=0,
              help="Maximum number of restarts per instance during restart. Increments total "
                   "failure count when this limit is exceeded."),
-        CommandOption("--restart-threshold", type=int, default=60,
-             help="Maximum number of seconds before an instance must move into the RUNNING state "
-                  "before considered a failure.")]
+        CommandOption("--restart-threshold", type=int, default=0,
+             help="This setting is DEPRECATED, will not have any effect if provided and will be "
+                  "removed in the next release.")]
 
   @property
   def help(self):
@@ -633,6 +633,10 @@ class RestartCommand(Verb):
           context.options.max_total_failures)
       return EXIT_INVALID_PARAMETER
 
+    if context.options.restart_threshold:
+      context.print_out("WARNING: '--restart-threshold' option is no longer supported and will be "
+                        "removed in the next release.")
+
     job = context.options.instance_spec.jobkey
     instances = (None if context.options.instance_spec.instance == ALL_INSTANCES else
         context.options.instance_spec.instance)
@@ -642,7 +646,6 @@ class RestartCommand(Verb):
     config = context.get_job_config_optional(job, context.options.config)
     restart_settings = RestartSettings(
         batch_size=context.options.batch_size,
-        restart_threshold=context.options.restart_threshold,
         watch_secs=context.options.watch_secs,
         max_per_instance_failures=context.options.max_per_instance_failures,
         max_total_failures=context.options.max_total_failures,

http://git-wip-us.apache.org/repos/asf/aurora/blob/534c4694/src/test/python/apache/aurora/client/api/test_health_check.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/client/api/test_health_check.py b/src/test/python/apache/aurora/client/api/test_health_check.py
index af005be..f1be827 100644
--- a/src/test/python/apache/aurora/client/api/test_health_check.py
+++ b/src/test/python/apache/aurora/client/api/test_health_check.py
@@ -16,12 +16,7 @@ import unittest
 
 import mox
 
-from apache.aurora.client.api.health_check import (
-    HealthCheck,
-    NotRetriable,
-    Retriable,
-    StatusHealthCheck
-)
+from apache.aurora.client.api.health_check import HealthCheck, StatusHealthCheck
 
 from gen.apache.aurora.api.ttypes import AssignedTask, ScheduledTask, ScheduleStatus, TaskConfig
 
@@ -55,26 +50,19 @@ class HealthCheckTest(unittest.TestCase):
     """Verify that running instances are reported healthy"""
     task_a = self.create_task(0, 'a')
     task_b = self.create_task(1, 'b')
-    assert self._status_health_check.health(task_a) == Retriable.alive()
-    assert self._status_health_check.health(task_b) == Retriable.alive()
+    assert self._status_health_check.health(task_a)
+    assert self._status_health_check.health(task_b)
 
   def test_failed_status_health_check(self):
     """Verify that the health check fails for tasks in a state other than RUNNING"""
     pending_task = self.create_task(0, 'a', status=PENDING)
     failed_task = self.create_task(1, 'b', status=FAILED)
-    assert self._status_health_check.health(pending_task) == Retriable.dead()
-    assert self._status_health_check.health(failed_task) == Retriable.dead()
+    assert not self._status_health_check.health(pending_task)
+    assert not self._status_health_check.health(failed_task)
 
   def test_changed_task_id(self):
     """Verifes that an instance with a different task id causes the health check to fail"""
     task_a = self.create_task(0, 'a')
     task_b = self.create_task(0, 'b')
-    assert self._status_health_check.health(task_a) == Retriable.alive()
-    assert self._status_health_check.health(task_b) == NotRetriable.dead()
-
-  def test_health_statuses(self):
-    """Verfies that the health status tuple (health, retry_status) are as expected"""
-    assert Retriable.alive() == (True, True)
-    assert Retriable.dead() == (False, True)
-    assert NotRetriable.alive() == (True, False)
-    assert NotRetriable.dead() == (False, False)
+    assert self._status_health_check.health(task_a)
+    assert not self._status_health_check.health(task_b)

http://git-wip-us.apache.org/repos/asf/aurora/blob/534c4694/src/test/python/apache/aurora/client/api/test_instance_watcher.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/client/api/test_instance_watcher.py b/src/test/python/apache/aurora/client/api/test_instance_watcher.py
index 9efe1d4..8fd419f 100644
--- a/src/test/python/apache/aurora/client/api/test_instance_watcher.py
+++ b/src/test/python/apache/aurora/client/api/test_instance_watcher.py
@@ -68,7 +68,7 @@ def find_expected_cycles(period, sleep_secs):
 
 class InstanceWatcherTest(unittest.TestCase):
   WATCH_INSTANCES = range(3)
-  RESTART_THRESHOLD = WATCH_SECS = 50
+  WATCH_SECS = 50
   EXPECTED_CYCLES = find_expected_cycles(WATCH_SECS, 3.0)
 
   def setUp(self):
@@ -79,7 +79,6 @@ class InstanceWatcherTest(unittest.TestCase):
     self._health_check = mox.MockObject(HealthCheck)
     self._watcher = InstanceWatcher(self._scheduler,
                                  self._job_key,
-                                 self.RESTART_THRESHOLD,
                                  self.WATCH_SECS,
                                  health_check_interval_seconds=3,
                                  clock=self._clock,
@@ -114,12 +113,13 @@ class InstanceWatcherTest(unittest.TestCase):
     for _ in range(int(num_calls)):
       self._scheduler.getTasksWithoutConfigs(query).AndRaise(IOError('oops'))
 
-  def mock_health_check(self, task, status, retry):
-    self._health_check.health(task).InAnyOrder().AndReturn((status, retry))
+  def mock_health_check(self, task, status):
+    self._health_check.health(task).InAnyOrder().AndReturn(status)
 
-  def expect_health_check(self, instance, status, retry=True, num_calls=EXPECTED_CYCLES):
+  def expect_health_check(self, instance, status, num_calls=EXPECTED_CYCLES):
+    num_calls = num_calls if status else 1
     for _ in range(int(num_calls)):
-      self.mock_health_check(self.create_task(instance), status, retry)
+      self.mock_health_check(self.create_task(instance), status)
 
   def assert_watch_result(self, expected_failed_instances, instances_to_watch=WATCH_INSTANCES):
     instances_returned = self._watcher.watch(instances_to_watch, self._health_check)
@@ -155,18 +155,9 @@ class InstanceWatcherTest(unittest.TestCase):
     self.assert_watch_result([0])
     self.verify_mocks()
 
-  def test_io_failure(self):
-    """Check that IO errors (socket errors) communicating with the scheduler get handled
-     correctly"""
-
-    self.expect_io_error_in_get_statuses()
-    self.replay_mocks()
-    self.assert_watch_result([0, 1, 2])
-    self.verify_mocks()
-
   def test_all_instance_failure(self):
     """All failed instance in a batch of instances"""
-    self.expect_get_statuses()
+    self.expect_get_statuses(num_calls=1)
     self.expect_health_check(0, False)
     self.expect_health_check(1, False)
     self.expect_health_check(2, False)
@@ -174,39 +165,15 @@ class InstanceWatcherTest(unittest.TestCase):
     self.assert_watch_result([0, 1, 2])
     self.verify_mocks()
 
-  def test_restart_threshold_fail_fast(self):
-    """Instances are reported unhealthy with retry set to False"""
-    self.expect_get_statuses(num_calls=1)
-    self.expect_health_check(0, False, retry=False, num_calls=1)
-    self.expect_health_check(1, False, retry=False, num_calls=1)
-    self.expect_health_check(2, False, retry=False, num_calls=1)
-    self.replay_mocks()
-    self.assert_watch_result([0, 1, 2])
-    self.verify_mocks()
-
-  def test_restart_threshold(self):
-    """Instances are reported healthy at the end of the restart_threshold"""
-    self.expect_get_statuses(num_calls=self.EXPECTED_CYCLES - 1)
-    self.expect_health_check(0, False, num_calls=self.EXPECTED_CYCLES - 1)
-    self.expect_health_check(1, False, num_calls=self.EXPECTED_CYCLES - 1)
-    self.expect_health_check(2, False, num_calls=self.EXPECTED_CYCLES - 1)
-    self.expect_get_statuses()
-    self.expect_health_check(0, True)
-    self.expect_health_check(1, True)
-    self.expect_health_check(2, True)
-    self.replay_mocks()
-    self.assert_watch_result([])
-    self.verify_mocks()
-
   def test_watch_period_failure(self):
     """Instances are reported unhealthy before watch_secs expires"""
     self.expect_get_statuses()
     self.expect_health_check(0, True, num_calls=self.EXPECTED_CYCLES - 1)
     self.expect_health_check(1, True, num_calls=self.EXPECTED_CYCLES - 1)
     self.expect_health_check(2, True, num_calls=self.EXPECTED_CYCLES - 1)
-    self.expect_health_check(0, False, num_calls=1)
-    self.expect_health_check(1, False, num_calls=1)
-    self.expect_health_check(2, False, num_calls=1)
+    self.expect_health_check(0, False)
+    self.expect_health_check(1, False)
+    self.expect_health_check(2, False)
     self.replay_mocks()
     self.assert_watch_result([0, 1, 2])
     self.verify_mocks()
@@ -217,7 +184,7 @@ class InstanceWatcherTest(unittest.TestCase):
     self.expect_health_check(0, True)
     self.expect_health_check(1, True)
     self.expect_health_check(2, True, num_calls=self.EXPECTED_CYCLES - 1)
-    self.expect_health_check(2, False, num_calls=1)
+    self.expect_health_check(2, False)
     self.replay_mocks()
     self.assert_watch_result([2])
     self.verify_mocks()

http://git-wip-us.apache.org/repos/asf/aurora/blob/534c4694/src/test/python/apache/aurora/client/api/test_restarter.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/client/api/test_restarter.py b/src/test/python/apache/aurora/client/api/test_restarter.py
index ff2002e..a81003e 100644
--- a/src/test/python/apache/aurora/client/api/test_restarter.py
+++ b/src/test/python/apache/aurora/client/api/test_restarter.py
@@ -39,7 +39,6 @@ CLUSTER = 'east'
 JOB = AuroraJobKey(CLUSTER, 'johndoe', 'test', 'test_job')
 RESTART_SETTINGS = RestartSettings(
     batch_size=2,
-    restart_threshold=23,
     watch_secs=45,
     max_per_instance_failures=0,
     max_total_failures=0,

http://git-wip-us.apache.org/repos/asf/aurora/blob/534c4694/src/test/python/apache/aurora/client/api/test_scheduler_mux.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/client/api/test_scheduler_mux.py b/src/test/python/apache/aurora/client/api/test_scheduler_mux.py
deleted file mode 100644
index 021175c..0000000
--- a/src/test/python/apache/aurora/client/api/test_scheduler_mux.py
+++ /dev/null
@@ -1,72 +0,0 @@
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-import time
-import unittest
-
-from twitter.common.quantity import Amount, Time
-
-from apache.aurora.client.api.scheduler_mux import SchedulerMux
-
-
-class SchedulerMuxTest(unittest.TestCase):
-
-  DATA = [1, 2, 3]
-  MUX = None
-
-  @classmethod
-  def setUpClass(cls):
-    cls.MUX = SchedulerMux(wait_timeout=Amount(10, Time.MILLISECONDS))
-
-  @classmethod
-  def tearDownClass(cls):
-    cls.MUX.terminate()
-
-  @classmethod
-  def error_command(cls, data):
-    raise SchedulerMux.Error('expected')
-
-  @classmethod
-  def unknown_error_command(cls, data):
-    raise Exception('expected')
-
-  @classmethod
-  def timeout_command(cls, data):
-    time.sleep(2)
-
-  def test_success(self):
-    assert [self.DATA] == self.MUX.enqueue_and_wait(lambda d: d, self.DATA)
-
-  def test_failure(self):
-    try:
-      self.MUX.enqueue_and_wait(self.error_command, self.DATA)
-    except SchedulerMux.Error as e:
-      assert 'expected' in e.message
-    else:
-      self.fail()
-
-  def test_unknown_failure(self):
-    try:
-      self.MUX.enqueue_and_wait(self.unknown_error_command, self.DATA)
-    except SchedulerMux.Error as e:
-      assert 'Unknown error' in e.message
-    else:
-      self.fail()
-
-  def test_timeout(self):
-    try:
-      self.MUX.enqueue_and_wait(self.timeout_command, self.DATA, timeout=Amount(1, Time.SECONDS))
-    except SchedulerMux.Error as e:
-      'Failed to complete operation' in e.message
-    else:
-      self.fail()

http://git-wip-us.apache.org/repos/asf/aurora/blob/534c4694/src/test/python/apache/aurora/client/api/test_task_util.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/client/api/test_task_util.py b/src/test/python/apache/aurora/client/api/test_task_util.py
index eda326d..365ef59 100644
--- a/src/test/python/apache/aurora/client/api/test_task_util.py
+++ b/src/test/python/apache/aurora/client/api/test_task_util.py
@@ -14,10 +14,9 @@
 
 import unittest
 
-from mock import create_autospec
+from mock import call, create_autospec
 
-from apache.aurora.client.api.scheduler_mux import SchedulerMux
-from apache.aurora.client.api.task_util import StatusMuxHelper
+from apache.aurora.client.api.task_util import StatusHelper
 
 from ...api_util import SchedulerThriftApiSpec
 
@@ -43,20 +42,14 @@ class TaskUtilTest(unittest.TestCase):
     return query
 
   @classmethod
-  def create_mux_helper(cls, scheduler, query, scheduler_mux=None):
-    return StatusMuxHelper(scheduler, query, scheduler_mux=scheduler_mux)
+  def create_helper(cls, scheduler, query):
+    return StatusHelper(scheduler, query)
 
   @classmethod
   def create_tasks(cls):
     return [ScheduledTask(assignedTask=AssignedTask(instanceId=index)) for index in cls.INSTANCES]
 
   @classmethod
-  def mock_mux(cls, tasks):
-    mux = create_autospec(spec=SchedulerMux, instance=True)
-    mux.enqueue_and_wait.return_value = tasks
-    return mux
-
-  @classmethod
   def mock_scheduler(cls, response_code=None):
     scheduler = create_autospec(spec=SchedulerThriftApiSpec, instance=True)
     response_code = ResponseCode.OK if response_code is None else response_code
@@ -65,22 +58,10 @@ class TaskUtilTest(unittest.TestCase):
     scheduler.getTasksWithoutConfigs.return_value = resp
     return scheduler
 
-  def test_no_mux_run(self):
+  def test_run(self):
     scheduler = self.mock_scheduler()
-    helper = self.create_mux_helper(scheduler, self.create_query)
-    tasks = helper.get_tasks(self.INSTANCES)
-
-    scheduler.getTasksWithoutConfigs.assert_called_once_with(self.create_query(self.INSTANCES))
-    assert 1 == len(tasks)
-
-  def test_mux_run(self):
-    expected_tasks = self.create_tasks()
-    mux = self.mock_mux(expected_tasks)
-    helper = self.create_mux_helper(None, self.create_query, scheduler_mux=mux)
+    helper = self.create_helper(scheduler, self.create_query)
     tasks = helper.get_tasks(self.INSTANCES)
 
-    mux.enqueue_and_wait.assert_called_once_with(
-        helper._get_tasks,
-        self.INSTANCES,
-        helper._create_aggregated_query)
+    assert scheduler.getTasksWithoutConfigs.mock_calls == [call(self.create_query(self.INSTANCES))]
     assert 1 == len(tasks)

http://git-wip-us.apache.org/repos/asf/aurora/blob/534c4694/src/test/python/apache/aurora/client/cli/test_restart.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/client/cli/test_restart.py b/src/test/python/apache/aurora/client/cli/test_restart.py
index 54de6cf..967d560 100644
--- a/src/test/python/apache/aurora/client/cli/test_restart.py
+++ b/src/test/python/apache/aurora/client/cli/test_restart.py
@@ -18,8 +18,7 @@ import pytest
 from mock import call, create_autospec, patch
 from twitter.common.contextutil import temporary_file
 
-from apache.aurora.client.api.health_check import Retriable, StatusHealthCheck
-from apache.aurora.client.api.restarter import RestartSettings
+from apache.aurora.client.api.health_check import StatusHealthCheck
 from apache.aurora.client.cli import EXIT_API_ERROR, EXIT_INVALID_PARAMETER, Context
 from apache.aurora.client.cli.client import AuroraCommandLine
 from apache.aurora.client.cli.jobs import RestartCommand
@@ -28,43 +27,10 @@ from apache.aurora.common.aurora_job_key import AuroraJobKey
 
 from .util import AuroraClientCommandTest, FakeAuroraCommandContext, IOMock, mock_verb_options
 
-from gen.apache.aurora.api.ttypes import JobKey, PopulateJobResult, ResponseCode, Result, TaskConfig
+from gen.apache.aurora.api.ttypes import JobKey, PopulateJobResult, Result, TaskConfig
 
 
 class TestRestartJobCommand(AuroraClientCommandTest):
-
-  def test_restart_with_lock(self):
-    command = RestartCommand()
-
-    jobkey = AuroraJobKey("cluster", "role", "env", "job")
-    mock_options = mock_verb_options(command)
-    mock_options.instance_spec = TaskInstanceKey(jobkey, [])
-
-    fake_context = FakeAuroraCommandContext()
-    fake_context.set_options(mock_options)
-
-    mock_api = fake_context.get_api("test")
-    mock_api.restart.return_value = AuroraClientCommandTest.create_blank_response(
-      ResponseCode.LOCK_ERROR, "Error.")
-
-    with pytest.raises(Context.CommandError):
-      command.execute(fake_context)
-
-    restart_settings = RestartSettings(
-        batch_size=mock_options.batch_size,
-        restart_threshold=mock_options.restart_threshold,
-        max_per_instance_failures=mock_options.max_per_instance_failures,
-        max_total_failures=mock_options.max_total_failures,
-        watch_secs=mock_options.watch_secs,
-        health_check_interval_seconds=mock_options.healthcheck_interval_seconds)
-
-    mock_api.restart.assert_called_once_with(
-        jobkey,
-        mock_options.instance_spec.instance,
-        restart_settings,
-        config=None)
-    self.assert_lock_message(fake_context)
-
   def test_restart_inactive_instance_spec(self):
     command = RestartCommand()
 
@@ -122,15 +88,15 @@ class TestRestartCommand(AuroraClientCommandTest):
     return populate
 
   @classmethod
-  def setup_health_checks(cls, mock_api):
+  def setup_health_checks(cls):
     mock_health_check = create_autospec(spec=StatusHealthCheck, instance=True)
-    mock_health_check.health.return_value = Retriable.alive()
+    mock_health_check.health.return_value = True
     return mock_health_check
 
   def test_restart_simple(self):
     # Test the client-side restart logic in its simplest case: everything succeeds
     (mock_api, mock_scheduler_proxy) = self.create_mock_api()
-    mock_health_check = self.setup_health_checks(mock_api)
+    mock_health_check = self.setup_health_checks()
     self.setup_mock_scheduler_for_simple_restart(mock_api)
     with contextlib.nested(
         patch('apache.aurora.client.api.SchedulerProxy', return_value=mock_scheduler_proxy),
@@ -159,7 +125,7 @@ class TestRestartCommand(AuroraClientCommandTest):
   def test_restart_simple_no_config(self):
     # Test the client-side restart logic in its simplest case: everything succeeds
     (mock_api, mock_scheduler_proxy) = self.create_mock_api()
-    mock_health_check = self.setup_health_checks(mock_api)
+    mock_health_check = self.setup_health_checks()
     self.setup_mock_scheduler_for_simple_restart(mock_api)
     with contextlib.nested(
         patch('apache.aurora.client.api.SchedulerProxy', return_value=mock_scheduler_proxy),
@@ -180,7 +146,7 @@ class TestRestartCommand(AuroraClientCommandTest):
     # Test the client-side restart when a shard argument is too large, and it's
     # using strict mode.
     (mock_api, mock_scheduler_proxy) = self.create_mock_api()
-    mock_health_check = self.setup_health_checks(mock_api)
+    mock_health_check = self.setup_health_checks()
     self.setup_mock_scheduler_for_simple_restart(mock_api)
     with contextlib.nested(
         patch('apache.aurora.client.api.SchedulerProxy', return_value=mock_scheduler_proxy),
@@ -201,7 +167,7 @@ class TestRestartCommand(AuroraClientCommandTest):
 
   def test_restart_failed_status(self):
     (mock_api, mock_scheduler_proxy) = self.create_mock_api()
-    mock_health_check = self.setup_health_checks(mock_api)
+    mock_health_check = self.setup_health_checks()
     self.setup_mock_scheduler_for_simple_restart(mock_api)
     mock_scheduler_proxy.getTasksWithoutConfigs.return_value = self.create_error_response()
     with contextlib.nested(
@@ -222,7 +188,7 @@ class TestRestartCommand(AuroraClientCommandTest):
 
   def test_restart_no_such_job_with_instances(self):
     (mock_api, mock_scheduler_proxy) = self.create_mock_api()
-    mock_health_check = self.setup_health_checks(mock_api)
+    mock_health_check = self.setup_health_checks()
     mock_io = IOMock()
     self.setup_mock_scheduler_for_simple_restart(mock_api)
     # Make getTasksWithoutConfigs return an error, which is what happens when a job is not found.
@@ -254,7 +220,7 @@ class TestRestartCommand(AuroraClientCommandTest):
 
   def test_restart_failed_restart(self):
     (mock_api, mock_scheduler_proxy) = self.create_mock_api()
-    mock_health_check = self.setup_health_checks(mock_api)
+    mock_health_check = self.setup_health_checks()
     self.setup_mock_scheduler_for_simple_restart(mock_api)
     mock_scheduler_proxy.restartShards.return_value = self.create_error_response()
     with contextlib.nested(
@@ -297,7 +263,7 @@ class TestRestartCommand(AuroraClientCommandTest):
     self.reset_mock_io()
     # Test the client-side restart logic in its simplest case: everything succeeds
     (mock_api, mock_scheduler_proxy) = self.create_mock_api()
-    mock_health_check = self.setup_health_checks(mock_api)
+    mock_health_check = self.setup_health_checks()
     self.setup_mock_scheduler_for_simple_restart(mock_api)
     with contextlib.nested(
         patch('apache.aurora.client.api.SchedulerProxy', return_value=mock_scheduler_proxy),
@@ -322,7 +288,7 @@ class TestRestartCommand(AuroraClientCommandTest):
   def test_restart_failed_restart_output(self):
     self.reset_mock_io()
     (mock_api, mock_scheduler_proxy) = self.create_mock_api()
-    mock_health_check = self.setup_health_checks(mock_api)
+    mock_health_check = self.setup_health_checks()
     self.setup_mock_scheduler_for_simple_restart(mock_api)
     mock_scheduler_proxy.restartShards.return_value = self.create_error_response()
     with contextlib.nested(


Mime
View raw message