aurora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ma...@apache.org
Subject [2/2] git commit: Implementing parallel updater.
Date Wed, 23 Jul 2014 17:08:12 GMT
Implementing parallel updater.

The updater now spawns upto batch_size threads to process
one instance per thread.

All scheduler calls are multiplexed by the SchedulerMux to do
batch kill/add/restart/status calls.

Bugs closed: AURORA-350

Reviewed at https://reviews.apache.org/r/21440/


Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/e1c0ade2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/e1c0ade2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/e1c0ade2

Branch: refs/heads/master
Commit: e1c0ade29d232282a82550949f5f5370ce2666ab
Parents: 2b78aff
Author: Maxim Khutornenko <maxim@apache.org>
Authored: Wed Jul 23 10:07:41 2014 -0700
Committer: Maxim Khutornenko <maxim@apache.org>
Committed: Wed Jul 23 10:07:41 2014 -0700

----------------------------------------------------------------------
 src/main/python/apache/aurora/client/api/BUILD  |  34 ++
 .../aurora/client/api/error_handling_thread.py  |  75 +++
 .../aurora/client/api/instance_watcher.py       |  36 +-
 .../apache/aurora/client/api/job_monitor.py     |  38 +-
 .../aurora/client/api/scheduler_client.py       |  51 +-
 .../apache/aurora/client/api/scheduler_mux.py   | 121 +++++
 src/main/python/apache/aurora/client/api/sla.py |   6 +-
 .../apache/aurora/client/api/task_util.py       | 101 ++++
 .../python/apache/aurora/client/api/updater.py  | 495 +++++++++++++------
 .../apache/aurora/client/api/updater_util.py    |   4 +-
 src/main/python/apache/aurora/client/base.py    |   8 +-
 src/test/python/apache/aurora/client/api/BUILD  |  17 +
 .../aurora/client/api/test_instance_watcher.py  |  27 +-
 .../aurora/client/api/test_job_monitor.py       |  22 +-
 .../aurora/client/api/test_scheduler_mux.py     |  72 +++
 .../apache/aurora/client/api/test_task_util.py  |  83 ++++
 .../apache/aurora/client/api/test_updater.py    | 299 +++++------
 .../apache/aurora/client/cli/test_create.py     |   6 +-
 .../apache/aurora/client/cli/test_kill.py       |  20 +-
 .../apache/aurora/client/cli/test_restart.py    |  10 +-
 .../apache/aurora/client/cli/test_update.py     |  42 +-
 .../python/apache/aurora/client/cli/util.py     |   2 +-
 .../aurora/client/commands/test_create.py       |   4 +-
 .../apache/aurora/client/commands/test_kill.py  |   8 +-
 .../aurora/client/commands/test_restart.py      |   8 +-
 .../aurora/client/commands/test_update.py       |  46 +-
 .../apache/aurora/client/commands/util.py       |   2 +-
 .../aurora/client/fake_scheduler_proxy.py       |   7 +
 .../apache/aurora/e2e/http/http_example.aurora  |   4 +-
 .../aurora/e2e/http/http_example_updated.aurora |  13 +-
 .../sh/org/apache/aurora/e2e/test_end_to_end.sh |   2 +-
 .../org/apache/aurora/e2e/test_end_to_end_v2.sh |   2 +-
 32 files changed, 1182 insertions(+), 483 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e1c0ade2/src/main/python/apache/aurora/client/api/BUILD
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/api/BUILD b/src/main/python/apache/aurora/client/api/BUILD
index c205a7d..70ad38e 100644
--- a/src/main/python/apache/aurora/client/api/BUILD
+++ b/src/main/python/apache/aurora/client/api/BUILD
@@ -55,6 +55,7 @@ python_library(
   name = 'job_monitor',
   sources = ['job_monitor.py'],
   dependencies = [
+    pants(':task_util'),
     pants('3rdparty/python:twitter.common.log'),
     pants('3rdparty/python:twitter.common.quantity'),
     pants('src/main/thrift/org/apache/aurora/gen:py-thrift'),
@@ -88,10 +89,43 @@ python_library(
 )
 
 python_library(
+  name = 'error_handling_thread',
+  sources = ['error_handling_thread.py'],
+  dependencies = [
+    pants('3rdparty/python:twitter.common.decorators'),
+    pants('3rdparty/python:twitter.common.quantity'),
+    pants('3rdparty/python:twitter.common.log'),
+  ]
+)
+
+python_library(
+  name = 'scheduler_mux',
+  sources = ['scheduler_mux.py'],
+  dependencies = [
+    pants(':error_handling_thread'),
+    pants('3rdparty/python:twitter.common.quantity'),
+    pants('3rdparty/python:twitter.common.log'),
+  ]
+)
+
+python_library(
+  name = 'task_util',
+  sources = ['task_util.py'],
+  dependencies = [
+    pants(':scheduler_mux'),
+    pants('3rdparty/python:twitter.common.log'),
+    pants('src/main/python/apache/aurora/client:base'),
+    pants('src/main/thrift/org/apache/aurora/gen:py-thrift'),
+  ]
+)
+
+python_library(
   name = 'instance_watcher',
   sources = ['instance_watcher.py', 'health_check.py'],
   dependencies = [
     pants(':scheduler_client'),
+    pants(':scheduler_mux'),
+    pants(':task_util'),
     pants('3rdparty/python:twitter.common.lang'),
     pants('3rdparty/python:twitter.common.log'),
     pants('src/main/python/apache/aurora/common:http_signaler'),

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e1c0ade2/src/main/python/apache/aurora/client/api/error_handling_thread.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/api/error_handling_thread.py b/src/main/python/apache/aurora/client/api/error_handling_thread.py
new file mode 100644
index 0000000..530715a
--- /dev/null
+++ b/src/main/python/apache/aurora/client/api/error_handling_thread.py
@@ -0,0 +1,75 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import sys
+import traceback
+from threading import Thread
+
+from twitter.common.decorators import identify_thread
+
+try:
+  from Queue import Queue
+except ImportError:
+  from queue import Queue
+
+
+class ExecutionError(Exception):
+  """Unhandled thread error wrapper. Raised on the calling thread."""
+  pass
+
+
+class ErrorHandlingThread(Thread):
+  """A thread that helps with unhandled exceptions by re-raising errors
+  with the parent thread upon completion."""
+
+  def __init__(self, *args, **kw):
+    super(ErrorHandlingThread, self).__init__(*args, **kw)
+    self.__real_run, self.run = self.run, self._excepting_run
+    self.__errors = Queue()
+
+  @identify_thread
+  def _excepting_run(self, *args, **kw):
+    try:
+      self.__real_run(*args, **kw)
+      self.__errors.put(None)
+    except Exception:
+      try:
+        e_type, e_val, e_tb = sys.exc_info()
+        self.__errors.put(ExecutionError(
+            'Unhandled error while running worker thread. '
+            'Original error details: %s' % traceback.format_exception(e_type, e_val, e_tb)))
+      except: # noqa
+        # This appears to be the only way to avoid nasty "interpreter shutdown" errors when
+        # dealing with daemon threads. While not ideal, there is nothing else we could do here
+        # if the sys.exc_info() call fails.
+        pass
+
+  def join_and_raise(self):
+    """Waits for completion and re-raises any exception on a caller thread."""
+    error = self.__errors.get(timeout=sys.maxint)  # Timeout for interruptibility.
+    if error is not None:
+      raise error
+
+
+def spawn_worker(target, *args, **kwargs):
+  """Creates and starts a new daemon worker thread.
+
+  Arguments:
+  target -- target method.
+
+  Returns thread handle.
+  """
+  thread = ErrorHandlingThread(target=target, *args, **kwargs)
+  thread.daemon = True
+  thread.start()
+  return thread

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e1c0ade2/src/main/python/apache/aurora/client/api/instance_watcher.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/api/instance_watcher.py b/src/main/python/apache/aurora/client/api/instance_watcher.py
index d2ad6fd..fe2f551 100644
--- a/src/main/python/apache/aurora/client/api/instance_watcher.py
+++ b/src/main/python/apache/aurora/client/api/instance_watcher.py
@@ -13,10 +13,12 @@
 #
 
 import time
+from threading import Event
 
 from twitter.common import log
 
 from .health_check import StatusHealthCheck
+from .task_util import StatusMuxHelper
 
 from gen.apache.aurora.api.ttypes import Identity, ResponseCode, ScheduleStatus, TaskQuery
 
@@ -42,7 +44,9 @@ class InstanceWatcher(object):
                restart_threshold,
                watch_secs,
                health_check_interval_seconds,
-               clock=time):
+               clock=time,
+               terminating_event=None,
+               scheduler_mux=None):
 
     self._scheduler = scheduler
     self._job_key = job_key
@@ -50,6 +54,8 @@ class InstanceWatcher(object):
     self._watch_secs = watch_secs
     self._health_check_interval_seconds = health_check_interval_seconds
     self._clock = clock
+    self._terminating = terminating_event or Event()
+    self._status_helper = StatusMuxHelper(self._scheduler, self._create_query, scheduler_mux)
 
   def watch(self, instance_ids, health_check=None):
     """Watches a set of instances and detects failures based on a delegated health check.
@@ -91,8 +97,8 @@ class InstanceWatcher(object):
           instance_id, self._restart_threshold))
         instance_states[instance_id] = Instance(finished=True)
 
-    while True:
-      running_tasks = self._get_tasks_by_instance_id(instance_ids)
+    while not self._terminating.is_set():
+      running_tasks = self._status_helper.get_tasks(instance_ids)
       now = self._clock.time()
       tasks_by_instance = dict((task.assignedTask.instanceId, task) for task in running_tasks)
       for instance_id in instance_ids:
@@ -119,27 +125,17 @@ class InstanceWatcher(object):
         return set([s_id for s_id in instance_ids if s_id not in instance_states
                                              or not instance_states[s_id].healthy])
 
-      self._clock.sleep(self._health_check_interval_seconds)
+      self._terminating.wait(self._health_check_interval_seconds)
 
-  def _get_tasks_by_instance_id(self, instance_ids):
-    log.debug('Querying instance statuses.')
+  def terminate(self):
+    """Requests immediate termination of the watch cycle."""
+    self._terminating.set()
+
+  def _create_query(self, instance_ids):
     query = TaskQuery()
     query.owner = Identity(role=self._job_key.role)
     query.environment = self._job_key.environment
     query.jobName = self._job_key.name
     query.statuses = set([ScheduleStatus.RUNNING])
-
     query.instanceIds = instance_ids
-    try:
-      resp = self._scheduler.getTasksWithoutConfigs(query)
-    except IOError as e:
-      log.error('IO Exception during scheduler call: %s' % e)
-      return []
-
-    tasks = []
-    if resp.responseCode == ResponseCode.OK:
-      tasks = resp.result.scheduleStatusResult.tasks
-
-    log.debug('Response from scheduler: %s (message: %s)'
-        % (ResponseCode._VALUES_TO_NAMES[resp.responseCode], resp.messageDEPRECATED))
-    return tasks
+    return query

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e1c0ade2/src/main/python/apache/aurora/client/api/job_monitor.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/api/job_monitor.py b/src/main/python/apache/aurora/client/api/job_monitor.py
index 6c26cb9..756093d 100644
--- a/src/main/python/apache/aurora/client/api/job_monitor.py
+++ b/src/main/python/apache/aurora/client/api/job_monitor.py
@@ -12,12 +12,14 @@
 # limitations under the License.
 #
 
-import time
+from threading import Event
 
 from thrift.transport import TTransport
 from twitter.common import log
 from twitter.common.quantity import Amount, Time
 
+from .task_util import StatusMuxHelper
+
 from gen.apache.aurora.api.constants import LIVE_STATES, TERMINAL_STATES
 from gen.apache.aurora.api.ttypes import Identity, TaskQuery
 
@@ -34,28 +36,24 @@ class JobMonitor(object):
   def terminal(cls, status):
     return status in TERMINAL_STATES
 
-  def __init__(self, scheduler, job_key, clock=time,
-               min_poll_interval=MIN_POLL_INTERVAL, max_poll_interval=MAX_POLL_INTERVAL):
+  def __init__(self, scheduler, job_key, terminating_event=None,
+               min_poll_interval=MIN_POLL_INTERVAL, max_poll_interval=MAX_POLL_INTERVAL,
+               scheduler_mux=None):
     self._scheduler = scheduler
     self._job_key = job_key
-    self._clock = clock
     self._min_poll_interval = min_poll_interval
     self._max_poll_interval = max_poll_interval
+    self._terminating = terminating_event or Event()
+    self._status_helper = StatusMuxHelper(self._scheduler, self.create_query, scheduler_mux)
 
-  def iter_query(self, query):
-    try:
-      res = self._scheduler.getTasksWithoutConfigs(query)
-    except TTransport.TTransportException as e:
-      log.error('Failed to query tasks from scheduler: %s' % e)
-      return
-    if res is None or res.result is None:
-      return
-    for task in res.result.scheduleStatusResult.tasks:
+  def iter_tasks(self, instances):
+    tasks = self._status_helper.get_tasks(instances)
+    for task in tasks:
       yield task
 
-  def states(self, query):
+  def states(self, instance_ids):
     states = {}
-    for task in self.iter_query(query):
+    for task in self.iter_tasks(instance_ids):
       status, instance_id = task.status, task.assignedTask.instanceId
       first_timestamp = task.taskEvents[0].timestamp
       if instance_id not in states or first_timestamp > states[instance_id][0]:
@@ -81,11 +79,17 @@ class JobMonitor(object):
     Returns: True if predicate is met or False if timeout has expired.
     """
     poll_interval = self._min_poll_interval
-    while not all(predicate(state) for state in self.states(self.create_query(instances)).values()):
+    while not self._terminating.is_set() and not all(predicate(state) for state
+        in self.states(instances).values()):
+
       if with_timeout and poll_interval >= self._max_poll_interval:
         return False
 
-      self._clock.sleep(poll_interval.as_(Time.SECONDS))
+      self._terminating.wait(poll_interval.as_(Time.SECONDS))
       poll_interval = min(self._max_poll_interval, 2 * poll_interval)
 
     return True
+
+  def terminate(self):
+    """Requests immediate termination of the wait cycle."""
+    self._terminating.set()

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e1c0ade2/src/main/python/apache/aurora/client/api/scheduler_client.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/api/scheduler_client.py b/src/main/python/apache/aurora/client/api/scheduler_client.py
index e911135..4b20efa 100644
--- a/src/main/python/apache/aurora/client/api/scheduler_client.py
+++ b/src/main/python/apache/aurora/client/api/scheduler_client.py
@@ -204,6 +204,8 @@ class SchedulerProxy(object):
     self._session_key_factory = session_key_factory
     self._client = self._scheduler_client = None
     self.verbose = verbose
+    self._lock = threading.RLock()
+    self._terminating = threading.Event()
 
   def with_scheduler(method):
     """Decorator magic to make sure a connection is made to the scheduler"""
@@ -216,6 +218,11 @@ class SchedulerProxy(object):
   def invalidate(self):
     self._client = self._scheduler_client = None
 
+  def terminate(self):
+    """Requests immediate termination of any retry attempts and invalidates client."""
+    self._terminating.set()
+    self.invalidate()
+
   @with_scheduler
   def client(self):
     return self._client
@@ -268,24 +275,30 @@ class SchedulerProxy(object):
 
     @functools.wraps(method)
     def method_wrapper(*args):
-      start = time.time()
-      while (time.time() - start) < self.RPC_MAXIMUM_WAIT.as_(Time.SECONDS):
-        auth_args = () if method_name in self.UNAUTHENTICATED_RPCS else (self.session_key(),)
-        try:
-          method = getattr(self.client(), method_name)
-          if not callable(method):
-            return method
-          return method(*(args + auth_args))
-        except (TTransport.TTransportException, self.TimeoutError) as e:
-          log.warning('Connection error with scheduler: %s, reconnecting...' % e)
-          self.invalidate()
-          time.sleep(self.RPC_RETRY_INTERVAL.as_(Time.SECONDS))
-        except Exception as e:
-          # Take any error that occurs during the RPC call, and transform it
-          # into something clients can handle.
-          raise self.ThriftInternalError("Error during thrift call %s to %s: %s" %
-              (method_name, self.cluster.name, e))
-      raise self.TimeoutError('Timed out attempting to issue %s to %s' % (
-          method_name, self.cluster.name))
+      with self._lock:
+        start = time.time()
+        while not self._terminating.is_set() and (
+            time.time() - start) < self.RPC_MAXIMUM_WAIT.as_(Time.SECONDS):
+
+          auth_args = () if method_name in self.UNAUTHENTICATED_RPCS else (self.session_key(),)
+          try:
+            method = getattr(self.client(), method_name)
+            if not callable(method):
+              return method
+            return method(*(args + auth_args))
+          except (TTransport.TTransportException, self.TimeoutError) as e:
+            if not self._terminating:
+              log.warning('Connection error with scheduler: %s, reconnecting...' % e)
+              self.invalidate()
+              self._terminating.wait(self.RPC_RETRY_INTERVAL.as_(Time.SECONDS))
+          except Exception as e:
+            # Take any error that occurs during the RPC call, and transform it
+            # into something clients can handle.
+            if not self._terminating:
+              raise self.ThriftInternalError("Error during thrift call %s to %s: %s" %
+                                            (method_name, self.cluster.name, e))
+        if not self._terminating:
+          raise self.TimeoutError('Timed out attempting to issue %s to %s' % (
+              method_name, self.cluster.name))
 
     return method_wrapper

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e1c0ade2/src/main/python/apache/aurora/client/api/scheduler_mux.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/api/scheduler_mux.py b/src/main/python/apache/aurora/client/api/scheduler_mux.py
new file mode 100644
index 0000000..0832a13
--- /dev/null
+++ b/src/main/python/apache/aurora/client/api/scheduler_mux.py
@@ -0,0 +1,121 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import threading
+from collections import defaultdict, namedtuple
+
+from twitter.common.quantity import Amount, Time
+
+from .error_handling_thread import spawn_worker
+
+try:
+  from Queue import Queue, Empty
+except ImportError:
+  from queue import Queue, Empty
+
+
+class SchedulerMux(object):
+  """Multiplexes scheduler RPC requests on a dedicated worker thread."""
+
+  class Error(Exception):
+    """Call error wrapper."""
+    pass
+
+  OK_RESULT = 1
+  DEFAULT_WAIT_TIMEOUT = Amount(1, Time.SECONDS)
+  DEFAULT_JOIN_TIMEOUT = Amount(5, Time.SECONDS)
+  DEFAULT_RPC_TIMEOUT = Amount(120, Time.SECONDS)
+  WORK_ITEM = namedtuple('WorkItem', ['completion_queue', 'command', 'data', 'aggregator'])
+
+  def __init__(self, wait_timeout=DEFAULT_WAIT_TIMEOUT):
+    self.__queue = Queue()
+    self.__terminating = threading.Event()
+    self.__wait_timeout = wait_timeout
+    self.__worker = spawn_worker(self.__monitor)
+
+  def __monitor(self):
+    """Main body of the multiplexer thread.
+
+    This method repeatedly polls the worker queue for new calls, and then
+    dispatches them in batches to the scheduler.
+    Callers are notified when their requests complete."""
+
+    requests_by_command = defaultdict(list)
+    while not self.__terminating.is_set():
+      try:
+        work_item = self.__queue.get(timeout=self.__wait_timeout.as_(Time.SECONDS))
+        requests_by_command[work_item.command].append(work_item)
+      except Empty:
+        self.__call_and_notify(requests_by_command)
+        requests_by_command = defaultdict(list)
+
+  def __call_and_notify(self, requests_by_command):
+    """Batch executes scheduler requests and notifies on completion.
+
+    Takes a set of RPC requests grouped by command type, dispatches them to the scheduler,
+    and then waits for the batched calls to complete. When a call is completed, its callers
+    will be notified via the completion queue."""
+
+    for command, work_items in requests_by_command.items():
+      request = [item.data for item in work_items]
+      request = work_items[0].aggregator(request) if work_items[0].aggregator else request
+      result_status = self.OK_RESULT
+      result_data = None
+      try:
+        result_data = command(request)
+      except (self.Error, Exception) as e:
+        result_status = e
+
+      for work_item in work_items:
+        work_item.completion_queue.put((result_status, result_data))
+
+  def _enqueue(self, completion_queue, command, data, aggregator):
+    """Queues up a scheduler call for a delayed (batched) completion.
+
+    Arguments:
+    completion_queue -- completion queue to notify caller on completion.
+    command -- callback signature accepting a list of data.
+    data -- single request data object to be batched with other similar requests.
+    aggregator -- callback function for data aggregation.
+    """
+    self.__queue.put(self.WORK_ITEM(completion_queue, command, data, aggregator))
+
+  def terminate(self):
+    """Requests the SchedulerMux to terminate."""
+    self.__terminating.set()
+    self.__worker.join(timeout=self.DEFAULT_JOIN_TIMEOUT.as_(Time.SECONDS))
+
+  def enqueue_and_wait(self, command, data, aggregator=None, timeout=DEFAULT_RPC_TIMEOUT):
+    """Queues up the scheduler call and waits for completion.
+
+    Arguments:
+    command -- scheduler command to run.
+    data -- data to query scheduler for.
+    aggregator -- callback function for data aggregation.
+    timeout -- amount of time to wait for completion.
+
+    Returns the aggregated command call response. Response data decomposition is up to the caller.
+    """
+    try:
+      completion_queue = Queue()
+      self._enqueue(completion_queue, command, data, aggregator)
+      result = completion_queue.get(timeout=timeout.as_(Time.SECONDS))
+      result_status = result[0]
+      if result_status != self.OK_RESULT and not self.__terminating.is_set():
+        if isinstance(result_status, self.Error):
+          raise result_status
+        else:
+          raise self.Error('Unknown error: %s' % result_status)
+      return result[1]
+    except Empty:
+      raise self.Error('Failed to complete operation within %s' % timeout)

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e1c0ade2/src/main/python/apache/aurora/client/api/sla.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/api/sla.py b/src/main/python/apache/aurora/client/api/sla.py
index 50befea..b9b6468 100644
--- a/src/main/python/apache/aurora/client/api/sla.py
+++ b/src/main/python/apache/aurora/client/api/sla.py
@@ -16,7 +16,9 @@ import math
 import time
 from collections import defaultdict, namedtuple
 
-from apache.aurora.client.base import DEFAULT_GROUPING, group_hosts, log_response
+from twitter.common import log
+
+from apache.aurora.client.base import DEFAULT_GROUPING, format_response, group_hosts
 from apache.aurora.common.aurora_job_key import AuroraJobKey
 
 from gen.apache.aurora.api.constants import LIVE_STATES
@@ -323,7 +325,7 @@ class Sla(object):
 
   def _get_tasks(self, task_query):
     resp = self._scheduler.getTasksWithoutConfigs(task_query)
-    log_response(resp)
+    log.info(format_response(resp))
     if resp.responseCode != ResponseCode.OK:
       return []
     return resp.result.scheduleStatusResult.tasks

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e1c0ade2/src/main/python/apache/aurora/client/api/task_util.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/api/task_util.py b/src/main/python/apache/aurora/client/api/task_util.py
new file mode 100644
index 0000000..b5244ee
--- /dev/null
+++ b/src/main/python/apache/aurora/client/api/task_util.py
@@ -0,0 +1,101 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from itertools import chain
+
+from twitter.common import log
+
+from apache.aurora.client.base import format_response
+
+from .scheduler_mux import SchedulerMux
+
+from gen.apache.aurora.api.ttypes import ResponseCode
+
+
+class StatusMuxHelper(object):
+  """Handles mux/demux logic of the getTasksWithoutConfigs RPC."""
+
+  def __init__(self, scheduler, query_factory, scheduler_mux=None):
+    self._scheduler = scheduler
+    self._query_factory = query_factory
+    self._scheduler_mux = scheduler_mux
+
+  def get_tasks(self, instance_ids=None):
+    """Routes call to either immediate direct or multiplexed threaded execution.
+
+    Arguments:
+    instance_ids -- optional list of instance IDs to query for.
+
+    Returns a list of tasks.
+    """
+    log.debug('Querying instance statuses: %s' % instance_ids)
+
+    if self._scheduler_mux is not None:
+      return self._get_tasks_multiplexed(instance_ids)
+    else:
+      return self._get_tasks(self._query_factory(instance_ids))
+
+  def _get_tasks_multiplexed(self, instance_ids=None):
+    """Gets tasks via SchedulerMux.
+
+    Arguments:
+    instance_ids -- optional list of instance IDs to query for.
+
+    Returns a list of tasks.
+    """
+    tasks = []
+    include_ids = lambda id: id in instance_ids if instance_ids is not None else True
+
+    log.debug('Batch getting task status: %s' % instance_ids)
+    try:
+      unfiltered_tasks = self._scheduler_mux.enqueue_and_wait(
+        self._get_tasks,
+        instance_ids if instance_ids else [],
+        self._create_aggregated_query)
+      tasks = [task for task in unfiltered_tasks if include_ids(task.assignedTask.instanceId)]
+    except SchedulerMux.Error as e:
+      log.error('Failed to query status for instances %s. Reason: %s' % (instance_ids, e))
+
+    log.debug('Done batch getting task status: %s' % instance_ids)
+    return tasks
+
+  def _get_tasks(self, query):
+    """Gets tasks directly via SchedulerProxy.
+
+    Arguments:
+    query -- TaskQuery instance.
+
+    Returns a list of tasks.
+    """
+    try:
+      resp = self._scheduler.getTasksWithoutConfigs(query)
+    except IOError as e:
+      log.error('IO Exception during scheduler call: %s' % e)
+      return []
+
+    tasks = []
+    if resp.responseCode == ResponseCode.OK:
+      tasks = resp.result.scheduleStatusResult.tasks
+
+    log.debug(format_response(resp))
+    return tasks
+
+  def _create_aggregated_query(self, instance_id_lists):
+    """Aggregates multiple instance_id lists into a single list.
+
+    Arguments:
+    instance_id_lists -- list of lists of int.
+    """
+    instance_ids = list(chain.from_iterable(instance_id_lists))
+    log.debug('Aggregated instance ids to query status: %s' % instance_ids)
+    return self._query_factory(instance_ids)

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e1c0ade2/src/main/python/apache/aurora/client/api/updater.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/api/updater.py b/src/main/python/apache/aurora/client/api/updater.py
index c592651..05b4c0c 100644
--- a/src/main/python/apache/aurora/client/api/updater.py
+++ b/src/main/python/apache/aurora/client/api/updater.py
@@ -13,17 +13,22 @@
 #
 
 import json
+import signal
 from collections import namedtuple
 from difflib import unified_diff
+from threading import Lock as threading_lock
 
 from thrift.protocol import TJSONProtocol
 from thrift.TSerialization import serialize
 from twitter.common import log
+from twitter.common.quantity import Amount, Time
 
+from .error_handling_thread import ExecutionError, spawn_worker
 from .instance_watcher import InstanceWatcher
 from .job_monitor import JobMonitor
 from .quota_check import CapacityRequest, QuotaCheck
 from .scheduler_client import SchedulerProxy
+from .scheduler_mux import SchedulerMux
 from .updater_util import FailureThreshold, UpdaterConfig
 
 from gen.apache.aurora.api.constants import ACTIVE_STATES
@@ -39,22 +44,29 @@ from gen.apache.aurora.api.ttypes import (
     TaskQuery
 )
 
-InstanceState = namedtuple('InstanceState', ['instance_id', 'is_updated'])
+try:
+  from Queue import Queue, Empty
+except ImportError:
+  from queue import Queue, Empty
 
 
-OperationConfigs = namedtuple('OperationConfigs', ['from_config', 'to_config'])
-
+class Updater(object):
+  """Performs an update command using a collection of parallel threads.
+  The number of parallel threads used is determined by the UpdateConfig.batch_size."""
 
-InstanceConfigs = namedtuple(
-    'InstanceConfigs',
-    ['remote_config_map', 'local_config_map', 'instances_to_process']
-)
+  class Error(Exception):
+    """Updater error wrapper."""
+    pass
 
+  RPC_COMPLETION_TIMEOUT_SECS = Amount(120, Time.SECONDS)
 
-class Updater(object):
-  """Update the instances of a job in batches."""
+  OPERATION_CONFIGS = namedtuple('OperationConfigs', ['from_config', 'to_config'])
+  INSTANCE_CONFIGS = namedtuple(
+      'InstanceConfigs',
+      ['remote_config_map', 'local_config_map', 'instances_to_process']
+  )
 
-  class Error(Exception): pass
+  INSTANCE_DATA = namedtuple('InstanceData', ['instance_id', 'operation_configs'])
 
   def __init__(self,
                config,
@@ -62,30 +74,43 @@ class Updater(object):
                scheduler=None,
                instance_watcher=None,
                quota_check=None,
-               job_monitor=None):
+               job_monitor=None,
+               scheduler_mux=None,
+               rpc_completion_timeout=RPC_COMPLETION_TIMEOUT_SECS):
     self._config = config
     self._job_key = JobKey(role=config.role(), environment=config.environment(), name=config.name())
     self._health_check_interval_seconds = health_check_interval_seconds
     self._scheduler = scheduler or SchedulerProxy(config.cluster())
     self._quota_check = quota_check or QuotaCheck(self._scheduler)
-    self._job_monitor = job_monitor or JobMonitor(self._scheduler, self._config.job_key())
+    self._scheduler_mux = scheduler_mux or SchedulerMux()
+    self._job_monitor = job_monitor or JobMonitor(
+        self._scheduler,
+        self._config.job_key(),
+        scheduler_mux=self._scheduler_mux)
+    self._rpc_completion_timeout = rpc_completion_timeout
     try:
       self._update_config = UpdaterConfig(**config.update_config().get())
     except ValueError as e:
       raise self.Error(str(e))
     self._lock = None
+    self._thread_lock = threading_lock()
+    self.failure_threshold = FailureThreshold(
+        self._update_config.max_per_instance_failures,
+        self._update_config.max_total_failures
+    )
     self._watcher = instance_watcher or InstanceWatcher(
         self._scheduler,
         self._job_key,
         self._update_config.restart_threshold,
         self._update_config.watch_secs,
-        self._health_check_interval_seconds)
+        self._health_check_interval_seconds,
+        scheduler_mux=self._scheduler_mux)
+    self._terminating = False
 
   def _start(self):
     """Starts an update by applying an exclusive lock on a job being updated.
 
-       Returns:
-         Response instance from the scheduler call.
+    Returns Response instance from the scheduler call.
     """
     resp = self._scheduler.acquireLock(LockKey(job=self._job_key))
     if resp.responseCode == ResponseCode.OK:
@@ -95,8 +120,7 @@ class Updater(object):
   def _finish(self):
     """Finishes an update by removing an exclusive lock on an updated job.
 
-       Returns:
-         Response instance from the scheduler call.
+    Returns Response instance from the scheduler call.
     """
     resp = self._scheduler.releaseLock(self._lock, LockValidation.CHECKED)
 
@@ -106,96 +130,261 @@ class Updater(object):
       log.error('There was an error finalizing the update: %s' % resp.messageDEPRECATED)
     return resp
 
+  def int_handler(self, *args):
+    """Ensures keyboard interrupt exception is raised on a main thread."""
+    raise KeyboardInterrupt()
+
   def _update(self, instance_configs):
-    """Drives execution of the update logic. Performs a batched update/rollback for all instances
-    affected by the current update request.
+    """Drives execution of the update logic.
+
+    Performs instance updates in parallel using a number of threads bound by
+    the batch_size config option.
 
     Arguments:
     instance_configs -- list of instance update configurations to go through.
 
     Returns the set of instances that failed to update.
     """
-    failure_threshold = FailureThreshold(
-        self._update_config.max_per_instance_failures,
-        self._update_config.max_total_failures
-    )
+    # Register signal handler to ensure KeyboardInterrupt is received by a main thread.
+    signal.signal(signal.SIGINT, self.int_handler)
+
+    instances_to_update = [
+      self.INSTANCE_DATA(
+        instance_id,
+        self.OPERATION_CONFIGS(
+          from_config=instance_configs.remote_config_map,
+          to_config=instance_configs.local_config_map))
+      for instance_id in instance_configs.instances_to_process
+    ]
 
-    instance_operation = OperationConfigs(
-      from_config=instance_configs.remote_config_map,
-      to_config=instance_configs.local_config_map
-    )
+    log.info('Instances to update: %s' % instance_configs.instances_to_process)
+    update_queue = self._update_instances_in_parallel(self._update_instance, instances_to_update)
+
+    if self._is_failed_update(quiet=False):
+      if not self._update_config.rollback_on_failure:
+        log.info('Rollback on failure is disabled in config. Aborting rollback')
+        return
+
+      rollback_ids = self._get_rollback_ids(instance_configs.instances_to_process, update_queue)
+      instances_to_revert = [
+          self.INSTANCE_DATA(
+              instance_id,
+              self.OPERATION_CONFIGS(
+                  from_config=instance_configs.local_config_map,
+                  to_config=instance_configs.remote_config_map))
+          for instance_id in rollback_ids
+      ]
 
-    remaining_instances = [
-        InstanceState(instance_id, is_updated=False)
-        for instance_id in instance_configs.instances_to_process
-    ]
+      log.info('Reverting update for: %s' % rollback_ids)
+      self._update_instances_in_parallel(self._revert_instance, instances_to_revert)
+
+    return not self._is_failed_update()
+
+  def _update_instances_in_parallel(self, target, instances_to_update):
+    """Processes instance updates in parallel and waits for completion.
+
+    Arguments:
+    target -- target method to handle instance update.
+    instances_to_update -- list of InstanceData with update details.
+
+    Returns Queue with non-updated instance data.
+    """
+    log.info('Processing in parallel with %s worker thread(s)' % self._update_config.batch_size)
+    instance_queue = Queue()
+    for instance_to_update in instances_to_update:
+      instance_queue.put(instance_to_update)
+
+    try:
+      threads = []
+      for _ in range(self._update_config.batch_size):
+        threads.append(spawn_worker(target, kwargs={'instance_queue': instance_queue}))
+
+      for thread in threads:
+        thread.join_and_raise()
+    except Exception:
+      self._terminate()
+      raise
+
+    return instance_queue
+
+  def _terminate(self):
+    """Attempts to terminate all outstanding activities."""
+    if not self._terminating:
+      log.info('Cleaning up')
+      self._terminating = True
+      self._scheduler.terminate()
+      self._job_monitor.terminate()
+      self._scheduler_mux.terminate()
+      self._watcher.terminate()
+
+  def _update_instance(self, instance_queue):
+    """Works through the instance_queue and performs instance updates (one at a time).
+
+    Arguments:
+    instance_queue -- Queue of InstanceData to update.
+    """
+    while not self._terminating and not self._is_failed_update():
+      try:
+        instance_data = instance_queue.get_nowait()
+      except Empty:
+        return
+
+      update = True
+      restart = False
+      while update or restart and not self._terminating and not self._is_failed_update():
+        instances_to_watch = []
+        if update:
+          instances_to_watch += self._kill_and_add_instance(instance_data)
+          update = False
+        else:
+          instances_to_watch += self._request_restart_instance(instance_data)
+
+        if instances_to_watch:
+          failed_instances = self._watcher.watch(instances_to_watch)
+          restart = self._is_restart_needed(failed_instances)
+
+  def _revert_instance(self, instance_queue):
+    """Works through the instance_queue and performs instance rollbacks (one at a time).
+
+    Arguments:
+    instance_queue -- Queue of InstanceData to revert.
+    """
+    while not self._terminating:
+      try:
+        instance_data = instance_queue.get_nowait()
+      except Empty:
+        return
+
+      log.info('Reverting instance: %s' % instance_data.instance_id)
+      instances_to_watch = self._kill_and_add_instance(instance_data)
+      if instances_to_watch and self._watcher.watch(instances_to_watch):
+        log.error('Rollback failed for instance: %s' % instance_data.instance_id)
+
+  def _kill_and_add_instance(self, instance_data):
+    """Acquires update instructions and performs required kill/add/kill+add sequence.
+
+    Arguments:
+    instance_data -- InstanceData to update.
+
+    Returns added instance ID.
+    """
+    log.info('Examining instance: %s' % instance_data.instance_id)
+    to_kill, to_add = self._create_kill_add_lists(
+        [instance_data.instance_id],
+        instance_data.operation_configs)
+    if not to_kill and not to_add:
+      log.info('Skipping unchanged instance: %s' % instance_data.instance_id)
+      return to_add
+
+    if to_kill:
+      self._request_kill_instance(instance_data)
+    if to_add:
+      self._request_add_instance(instance_data)
+
+    return to_add
+
+  def _request_kill_instance(self, instance_data):
+    """Instructs the scheduler to kill instance and waits for completion.
 
-    log.info('Starting job update.')
-    while remaining_instances and not failure_threshold.is_failed_update():
-      batch_instances = remaining_instances[0:self._update_config.batch_size]
-      remaining_instances = list(set(remaining_instances) - set(batch_instances))
-      instances_to_restart = [s.instance_id for s in batch_instances if s.is_updated]
-      instances_to_update = [s.instance_id for s in batch_instances if not s.is_updated]
+    Arguments:
+    instance_data -- InstanceData to kill.
+    """
+    log.info('Killing instance: %s' % instance_data.instance_id)
+    self._enqueue_and_wait(instance_data, self._kill_instances)
+    result = self._job_monitor.wait_until(
+        JobMonitor.terminal,
+        [instance_data.instance_id],
+        with_timeout=True)
+
+    if not result:
+      raise self.Error('Instance %s was not killed in time' % instance_data.instance_id)
+    log.info('Killed: %s' % instance_data.instance_id)
+
+  def _request_add_instance(self, instance_data):
+    """Instructs the scheduler to add instance.
+
+    Arguments:
+    instance_data -- InstanceData to add.
+    """
+    log.info('Adding instance: %s' % instance_data.instance_id)
+    self._enqueue_and_wait(instance_data, self._add_instances)
+    log.info('Added: %s' % instance_data.instance_id)
+
+  def _request_restart_instance(self, instance_data):
+    """Instructs the scheduler to restart instance.
+
+    Arguments:
+    instance_data -- InstanceData to restart.
+
+    Returns restarted instance ID.
+    """
+    log.info('Restarting instance: %s' % instance_data.instance_id)
+    self._enqueue_and_wait(instance_data, self._restart_instances)
+    log.info('Restarted: %s' % instance_data.instance_id)
+    return [instance_data.instance_id]
 
-      instances_to_watch = []
-      if instances_to_restart:
-        instances_to_watch += self._restart_instances(instances_to_restart)
+  def _enqueue_and_wait(self, instance_data, command):
+    """Queues up the scheduler call and waits for completion.
 
-      if instances_to_update:
-        instances_to_watch += self._update_instances(instances_to_update, instance_operation)
+    Arguments:
+    instance_data -- InstanceData to query scheduler for.
+    command -- scheduler command to run.
+    """
+    try:
+      self._scheduler_mux.enqueue_and_wait(
+          command,
+          instance_data,
+          timeout=self._rpc_completion_timeout)
+    except SchedulerMux.Error as e:
+      raise self.Error('Failed to complete instance %s operation. Reason: %s'
+          % (instance_data.instance_id, e))
+
+  def _is_failed_update(self, quiet=True):
+    """Verifies the update status in a thread-safe manner.
+
+    Arguments:
+    quiet -- Whether the logging should be suppressed in case of a failed update. Default True.
 
-      failed_instances = self._watcher.watch(instances_to_watch) if instances_to_watch else set()
+    Returns True if update failed, False otherwise.
+    """
+    with self._thread_lock:
+      return self.failure_threshold.is_failed_update(log_errors=not quiet)
+
+  def _is_restart_needed(self, failed_instances):
+    """Checks if there are any failed instances recoverable via restart.
+
+    Arguments:
+    failed_instances -- Failed instance IDs.
 
-      if failed_instances:
-        log.error('Failed instances: %s' % failed_instances)
+    Returns True if restart is allowed, False otherwise (i.e. update failed).
+    """
+    if not failed_instances:
+      return False
 
-      unretryable_instances = failure_threshold.update_failure_counts(failed_instances)
+    log.info('Failed instances: %s' % failed_instances)
+
+    with self._thread_lock:
+      unretryable_instances = self.failure_threshold.update_failure_counts(failed_instances)
       if unretryable_instances:
         log.warn('Not restarting failed instances %s, which exceeded '
                  'maximum allowed instance failure limit of %s' %
                  (unretryable_instances, self._update_config.max_per_instance_failures))
-      retryable_instances = list(set(failed_instances) - set(unretryable_instances))
-      remaining_instances += [
-          InstanceState(instance_id, is_updated=True) for instance_id in retryable_instances
-      ]
-      remaining_instances.sort(key=lambda tup: tup.instance_id)
-
-    if failure_threshold.is_failed_update():
-      untouched_instances = [s.instance_id for s in remaining_instances if not s.is_updated]
-      instances_to_rollback = list(
-          set(instance_configs.instances_to_process) - set(untouched_instances)
-      )
-      self._rollback(instances_to_rollback, instance_configs)
-
-    return not failure_threshold.is_failed_update()
+      return False if unretryable_instances else True
 
-  def _rollback(self, instances_to_rollback, instance_configs):
-    """Performs a rollback operation for the failed instances.
+  def _get_rollback_ids(self, update_list, update_queue):
+    """Gets a list of instance ids to rollback.
 
     Arguments:
-    instances_to_rollback -- instance ids to rollback.
-    instance_configs -- instance configuration to use for rollback.
+    update_list -- original list of instances intended for update.
+    update_queue -- untouched instances not processed during update.
+
+    Returns sorted list of instance IDs to rollback.
     """
-    if not self._update_config.rollback_on_failure:
-      log.info('Rollback on failure is disabled in config. Aborting rollback')
-      return
-
-    log.info('Reverting update for %s' % instances_to_rollback)
-    instance_operation = OperationConfigs(
-        from_config=instance_configs.local_config_map,
-        to_config=instance_configs.remote_config_map
-    )
-    instances_to_rollback.sort(reverse=True)
-    failed_instances = []
-    while instances_to_rollback:
-      batch_instances = instances_to_rollback[0:self._update_config.batch_size]
-      instances_to_rollback = list(set(instances_to_rollback) - set(batch_instances))
-      instances_to_rollback.sort(reverse=True)
-      instances_to_watch = self._update_instances(batch_instances, instance_operation)
-      failed_instances += self._watcher.watch(instances_to_watch)
-
-    if failed_instances:
-      log.error('Rollback failed for instances: %s' % failed_instances)
+    untouched_ids = []
+    while not update_queue.empty():
+      untouched_ids.append(update_queue.get_nowait().instance_id)
+
+    return sorted(list(set(update_list) - set(untouched_ids)), reverse=True)
 
   def _hashable(self, element):
     if isinstance(element, (list, set)):
@@ -240,8 +429,8 @@ class Updater(object):
       if from_config and to_config:
         diff_output = self._diff_configs(from_config, to_config)
         if diff_output:
-          log.debug('Task configuration changed for instance [%s]:\n%s' % (
-              instance_id, diff_output))
+          log.debug('Task configuration changed for instance [%s]:\n%s'
+                    % (instance_id, diff_output))
           to_kill.append(instance_id)
           to_add.append(instance_id)
       elif from_config and not to_config:
@@ -253,68 +442,46 @@ class Updater(object):
 
     return to_kill, to_add
 
-  def _update_instances(self, instance_ids, operation_configs):
-    """Applies kill/add actions for the specified batch instances.
+  def _kill_instances(self, instance_data):
+    """Instructs the scheduler to batch-kill instances and waits for completion.
 
     Arguments:
-    instance_ids -- current batch of IDs to process.
-    operation_configs -- OperationConfigs with update details.
-
-    Returns a list of added instances.
+    instance_data -- list of InstanceData to kill.
     """
-    log.info('Examining instances: %s' % instance_ids)
-
-    to_kill, to_add = self._create_kill_add_lists(instance_ids, operation_configs)
+    instance_ids = [data.instance_id for data in instance_data]
+    log.debug('Batch killing instances: %s' % instance_ids)
+    query = self._create_task_query(instanceIds=frozenset(int(s) for s in instance_ids))
+    self._check_and_log_response(self._scheduler.killTasks(query, self._lock))
+    log.debug('Done batch killing instances: %s' % instance_ids)
 
-    unchanged = list(set(instance_ids) - set(to_kill + to_add))
-    if unchanged:
-      log.info('Skipping unchanged instances: %s' % unchanged)
-
-    self._kill_instances(to_kill)
-    self._add_instances(to_add, operation_configs.to_config)
-    return to_add
-
-  def _kill_instances(self, instance_ids):
-    """Instructs the scheduler to kill instances and waits for completion.
+  def _add_instances(self, instance_data):
+    """Instructs the scheduler to batch-add instances.
 
     Arguments:
-    instance_ids -- list of IDs to kill.
+    instance_data -- list of InstanceData to add.
     """
-    if instance_ids:
-      log.info('Killing instances: %s' % instance_ids)
-      query = self._create_task_query(instanceIds=frozenset(int(s) for s in instance_ids))
-      self._check_and_log_response(self._scheduler.killTasks(query, self._lock))
-      res = self._job_monitor.wait_until(JobMonitor.terminal, instance_ids, with_timeout=True)
-      if not res:
-        raise self.Error('Tasks were not killed in time.')
-      log.info('Instances killed')
-
-  def _add_instances(self, instance_ids, to_config):
-    """Instructs the scheduler to add instances.
+    instance_ids = [data.instance_id for data in instance_data]
+    to_config = instance_data[0].operation_configs.to_config
 
-    Arguments:
-    instance_ids -- list of IDs to add.
-    to_config -- OperationConfigs with update details.
-    """
-    if instance_ids:
-      log.info('Adding instances: %s' % instance_ids)
-      add_config = AddInstancesConfig(
-          key=self._job_key,
-          taskConfig=to_config[instance_ids[0]],  # instance_ids will always have at least 1 item.
-          instanceIds=frozenset(int(s) for s in instance_ids))
-      self._check_and_log_response(self._scheduler.addInstances(add_config, self._lock))
-      log.info('Instances added')
-
-  def _restart_instances(self, instance_ids):
-    """Instructs the scheduler to restart instances.
+    log.debug('Batch adding instances: %s' % instance_ids)
+    add_config = AddInstancesConfig(
+        key=self._job_key,
+        taskConfig=to_config[instance_ids[0]],  # instance_ids will always have at least 1 item.
+        instanceIds=frozenset(int(s) for s in instance_ids))
+    self._check_and_log_response(self._scheduler.addInstances(add_config, self._lock))
+    log.debug('Done batch adding instances: %s' % instance_ids)
+
+  def _restart_instances(self, instance_data):
+    """Instructs the scheduler to batch-restart instances.
 
     Arguments:
-    instance_ids -- set of instances to be restarted by the scheduler.
+    instance_data -- list of InstanceData to restart.
     """
-    log.info('Restarting instances: %s' % instance_ids)
+    instance_ids = [data.instance_id for data in instance_data]
+    log.debug('Batch restarting instances: %s' % instance_ids)
     resp = self._scheduler.restartShards(self._job_key, instance_ids, self._lock)
     self._check_and_log_response(resp)
-    return instance_ids
+    log.debug('Done batch restarting instances: %s' % instance_ids)
 
   def _validate_quota(self, instance_configs):
     """Validates job update will not exceed quota for production tasks.
@@ -323,7 +490,7 @@ class Updater(object):
 
     Returns Response.OK if quota check was successful.
     """
-    instance_operation = OperationConfigs(
+    instance_operation = self.OPERATION_CONFIGS(
       from_config=instance_configs.remote_config_map,
       to_config=instance_configs.local_config_map
     )
@@ -389,7 +556,7 @@ class Updater(object):
     # Populate local config map
     local_config_map = dict.fromkeys(job_config_instances, local_task_config)
 
-    return InstanceConfigs(remote_config_map, local_config_map, instances_to_process)
+    return self.INSTANCE_CONFIGS(remote_config_map, local_config_map, instances_to_process)
 
   def _get_existing_tasks(self):
     """Loads all existing tasks from the scheduler.
@@ -436,6 +603,7 @@ class Updater(object):
 
   def update(self, instances=None):
     """Performs the job update, blocking until it completes.
+
     A rollback will be performed if the update was considered a failure based on the
     update configuration.
 
@@ -444,33 +612,36 @@ class Updater(object):
 
     Returns a response object with update result status.
     """
-    resp = self._start()
-    if resp.responseCode != ResponseCode.OK:
-      return resp
-
     try:
-      # Handle cron jobs separately from other jobs.
-      if self._replace_template_if_cron():
-        log.info('Cron template updated, next run will reflect changes')
-        return self._finish()
-      else:
-        try:
-          instance_configs = self._get_update_instructions(instances)
-          self._check_and_log_response(self._validate_quota(instance_configs))
-        except self.Error as e:
-          # Safe to release the lock acquired above as no job mutation has happened yet.
-          self._finish()
-          return self._failed_response('Unable to start job update: %s' % e)
-
-        if not self._update(instance_configs):
-          log.warn('Update failures threshold reached')
-          self._finish()
-          return self._failed_response('Update reverted')
-        else:
-          log.info('Update successful')
+      resp = self._start()
+      if resp.responseCode != ResponseCode.OK:
+        return resp
+
+      try:
+        # Handle cron jobs separately from other jobs.
+        if self._replace_template_if_cron():
+          log.info('Cron template updated, next run will reflect changes')
           return self._finish()
-    except self.Error as e:
-      return self._failed_response('Aborting update without rollback! Fatal error: %s' % e)
+        else:
+          try:
+            instance_configs = self._get_update_instructions(instances)
+            self._check_and_log_response(self._validate_quota(instance_configs))
+          except self.Error as e:
+            # Safe to release the lock acquired above as no job mutation has happened yet.
+            self._finish()
+            return self._failed_response('Unable to start job update: %s' % e)
+
+          if not self._update(instance_configs):
+            log.warn('Update failures threshold reached')
+            self._finish()
+            return self._failed_response('Update reverted')
+          else:
+            log.info('Update successful')
+            return self._finish()
+      except (self.Error, ExecutionError, Exception) as e:
+        return self._failed_response('Aborting update without rollback! Fatal error: %s' % e)
+    finally:
+      self._scheduler_mux.terminate()
 
   @classmethod
   def cancel_update(cls, scheduler, job_key):

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e1c0ade2/src/main/python/apache/aurora/client/api/updater_util.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/api/updater_util.py b/src/main/python/apache/aurora/client/api/updater_util.py
index 6b689c1..c5f8f23 100644
--- a/src/main/python/apache/aurora/client/api/updater_util.py
+++ b/src/main/python/apache/aurora/client/api/updater_util.py
@@ -75,11 +75,11 @@ class FailureThreshold(object):
 
     return exceeded_failure_count_instances
 
-  def is_failed_update(self):
+  def is_failed_update(self, log_errors=True):
     total_failed_instances = self._exceeded_instance_fail_count()
     is_failed = total_failed_instances > self._max_total_failures
 
-    if is_failed:
+    if is_failed and log_errors:
       log.error('%s failed instances observed, maximum allowed is %s' % (total_failed_instances,
           self._max_total_failures))
       for instance, failure_count in self._failures_by_instance.items():

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e1c0ade2/src/main/python/apache/aurora/client/base.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/base.py b/src/main/python/apache/aurora/client/base.py
index 663a247..0c8e97e 100644
--- a/src/main/python/apache/aurora/client/base.py
+++ b/src/main/python/apache/aurora/client/base.py
@@ -40,13 +40,13 @@ def die(msg):
   sys.exit(1)
 
 
-def log_response(resp):
-  log.info('Response from scheduler: %s (message: %s)'
-      % (ResponseCode._VALUES_TO_NAMES[resp.responseCode], resp.messageDEPRECATED))
+def format_response(resp):
+  return 'Response from scheduler: %s (message: %s)' % (
+    ResponseCode._VALUES_TO_NAMES[resp.responseCode], resp.messageDEPRECATED)
 
 
 def check_and_log_response(resp):
-  log_response(resp)
+  log.info(format_response(resp))
   if resp.responseCode != ResponseCode.OK:
     if resp.responseCode == ResponseCode.LOCK_ERROR:
       log.info(LOCKED_WARNING)

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e1c0ade2/src/test/python/apache/aurora/client/api/BUILD
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/client/api/BUILD b/src/test/python/apache/aurora/client/api/BUILD
index 804195b..db5c223 100644
--- a/src/test/python/apache/aurora/client/api/BUILD
+++ b/src/test/python/apache/aurora/client/api/BUILD
@@ -22,6 +22,7 @@ python_test_suite(name = 'all',
     pants(':updater'),
     pants(':quota_check'),
     pants(':sla'),
+    pants(':mux')
   ],
 )
 
@@ -94,6 +95,22 @@ python_tests(name = 'sla',
   ]
 )
 
+python_tests(name = 'mux',
+  sources = ['test_scheduler_mux.py'],
+  dependencies = [
+    pants('src/main/python/apache/aurora/client/api:scheduler_mux'),
+  ]
+)
+
+python_tests(name = 'task_util',
+  sources = ['test_task_util.py'],
+  dependencies = [
+    pants('3rdparty/python:mock'),
+    pants('src/main/python/apache/aurora/client/api:task_util'),
+    pants('src/main/thrift/org/apache/aurora/gen:py-thrift'),
+  ]
+)
+
 python_tests(name = 'updater',
   sources = ['test_updater.py'],
   dependencies = [

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e1c0ade2/src/test/python/apache/aurora/client/api/test_instance_watcher.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/client/api/test_instance_watcher.py b/src/test/python/apache/aurora/client/api/test_instance_watcher.py
index 93b43e8..ae1b24b 100644
--- a/src/test/python/apache/aurora/client/api/test_instance_watcher.py
+++ b/src/test/python/apache/aurora/client/api/test_instance_watcher.py
@@ -47,6 +47,21 @@ class FakeClock(object):
     self._now_seconds += seconds
 
 
+class FakeEvent(object):
+  def __init__(self, clock):
+    self._clock = clock
+    self._is_set = False
+
+  def wait(self, seconds):
+    self._clock.sleep(seconds)
+
+  def is_set(self):
+    return self._is_set
+
+  def set(self):
+    self._is_set = True
+
+
 def find_expected_cycles(period, sleep_secs):
   return ceil(period / sleep_secs) + 1
 
@@ -61,6 +76,7 @@ class InstanceWatcherTest(unittest.TestCase):
     self._env = 'test'
     self._name = 'jimbob'
     self._clock = FakeClock()
+    self._event = FakeEvent(self._clock)
     self._scheduler = mox.MockObject(scheduler_client)
     job_key = JobKey(name=self._name, environment=self._env, role=self._role)
     self._health_check = mox.MockObject(HealthCheck)
@@ -69,7 +85,8 @@ class InstanceWatcherTest(unittest.TestCase):
                                  self.RESTART_THRESHOLD,
                                  self.WATCH_SECS,
                                  health_check_interval_seconds=3,
-                                 clock=self._clock)
+                                 clock=self._clock,
+                                 terminating_event=self._event)
 
   def get_tasks_status_query(self, instance_ids):
     query = TaskQuery()
@@ -212,3 +229,11 @@ class InstanceWatcherTest(unittest.TestCase):
     self.replay_mocks()
     self.assert_watch_result([2])
     self.verify_mocks()
+
+  def test_terminated_exits_immediately(self):
+    """Terminated instance watched should bail out immediately."""
+    self.replay_mocks()
+    self._watcher.terminate()
+    result = self._watcher.watch([], self._health_check)
+    assert result is None, ('Expected instances None : Returned instances (%s)' % result)
+    self.verify_mocks()

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e1c0ade2/src/test/python/apache/aurora/client/api/test_job_monitor.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/client/api/test_job_monitor.py b/src/test/python/apache/aurora/client/api/test_job_monitor.py
index 3cc876f..5b26539 100644
--- a/src/test/python/apache/aurora/client/api/test_job_monitor.py
+++ b/src/test/python/apache/aurora/client/api/test_job_monitor.py
@@ -32,17 +32,26 @@ from gen.apache.aurora.api.ttypes import (
 )
 
 
-class FakeClock(object):
-  def sleep(self, seconds):
+class FakeEvent(object):
+  def __init__(self):
+    self._is_set = False
+
+  def wait(self, seconds):
     pass
 
+  def is_set(self):
+    return self._is_set
+
+  def set(self):
+    self._is_set = True
+
 
 class JobMonitorTest(unittest.TestCase):
 
   def setUp(self):
     self._scheduler = Mock()
     self._job_key = AuroraJobKey('cl', 'johndoe', 'test', 'test_job')
-    self._clock = FakeClock()
+    self._event = FakeEvent()
 
   def create_task(self, status, id):
     return ScheduledTask(
@@ -108,6 +117,11 @@ class JobMonitorTest(unittest.TestCase):
         self.create_task(ScheduleStatus.RUNNING, '3'),
     ])
 
-    monitor = JobMonitor(self._scheduler, self._job_key, clock=self._clock)
+    monitor = JobMonitor(self._scheduler, self._job_key, terminating_event=self._event)
     assert not monitor.wait_until(monitor.terminal, with_timeout=True)
     self.expect_task_status()
+
+  def test_terminated_exits_immediately(self):
+    self._event.set()
+    monitor = JobMonitor(self._scheduler, self._job_key, terminating_event=self._event)
+    assert monitor.wait_until(monitor.terminal)

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e1c0ade2/src/test/python/apache/aurora/client/api/test_scheduler_mux.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/client/api/test_scheduler_mux.py b/src/test/python/apache/aurora/client/api/test_scheduler_mux.py
new file mode 100644
index 0000000..021175c
--- /dev/null
+++ b/src/test/python/apache/aurora/client/api/test_scheduler_mux.py
@@ -0,0 +1,72 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import time
+import unittest
+
+from twitter.common.quantity import Amount, Time
+
+from apache.aurora.client.api.scheduler_mux import SchedulerMux
+
+
+class SchedulerMuxTest(unittest.TestCase):
+
+  DATA = [1, 2, 3]
+  MUX = None
+
+  @classmethod
+  def setUpClass(cls):
+    cls.MUX = SchedulerMux(wait_timeout=Amount(10, Time.MILLISECONDS))
+
+  @classmethod
+  def tearDownClass(cls):
+    cls.MUX.terminate()
+
+  @classmethod
+  def error_command(cls, data):
+    raise SchedulerMux.Error('expected')
+
+  @classmethod
+  def unknown_error_command(cls, data):
+    raise Exception('expected')
+
+  @classmethod
+  def timeout_command(cls, data):
+    time.sleep(2)
+
+  def test_success(self):
+    assert [self.DATA] == self.MUX.enqueue_and_wait(lambda d: d, self.DATA)
+
+  def test_failure(self):
+    try:
+      self.MUX.enqueue_and_wait(self.error_command, self.DATA)
+    except SchedulerMux.Error as e:
+      assert 'expected' in e.message
+    else:
+      self.fail()
+
+  def test_unknown_failure(self):
+    try:
+      self.MUX.enqueue_and_wait(self.unknown_error_command, self.DATA)
+    except SchedulerMux.Error as e:
+      assert 'Unknown error' in e.message
+    else:
+      self.fail()
+
+  def test_timeout(self):
+    try:
+      self.MUX.enqueue_and_wait(self.timeout_command, self.DATA, timeout=Amount(1, Time.SECONDS))
+    except SchedulerMux.Error as e:
+      'Failed to complete operation' in e.message
+    else:
+      self.fail()

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e1c0ade2/src/test/python/apache/aurora/client/api/test_task_util.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/client/api/test_task_util.py b/src/test/python/apache/aurora/client/api/test_task_util.py
new file mode 100644
index 0000000..582c708
--- /dev/null
+++ b/src/test/python/apache/aurora/client/api/test_task_util.py
@@ -0,0 +1,83 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import unittest
+
+from mock import Mock
+
+from apache.aurora.client.api.scheduler_mux import SchedulerMux
+from apache.aurora.client.api.task_util import StatusMuxHelper
+
+from gen.apache.aurora.api.ttypes import (
+    AssignedTask,
+    Response,
+    ResponseCode,
+    Result,
+    ScheduledTask,
+    ScheduleStatusResult,
+    TaskQuery
+)
+
+
+class TaskUtilTest(unittest.TestCase):
+  INSTANCES = [1]
+
+  @classmethod
+  def create_query(cls, instances):
+    query = TaskQuery()
+    query.instanceIds = set(instances)
+    return query
+
+  @classmethod
+  def create_mux_helper(cls, scheduler, query, scheduler_mux=None):
+    return StatusMuxHelper(scheduler, query, scheduler_mux=scheduler_mux)
+
+  @classmethod
+  def create_tasks(cls):
+    return [ScheduledTask(assignedTask=AssignedTask(instanceId=index)) for index in cls.INSTANCES]
+
+  @classmethod
+  def mock_mux(cls, tasks):
+    mux = Mock(spec=SchedulerMux)
+    mux.enqueue_and_wait.return_value = tasks
+    return mux
+
+  @classmethod
+  def mock_scheduler(cls, response_code=None):
+    scheduler = Mock()
+    response_code = ResponseCode.OK if response_code is None else response_code
+    resp = Response(responseCode=response_code, messageDEPRECATED='test')
+    resp.result = Result(scheduleStatusResult=ScheduleStatusResult(tasks=cls.create_tasks()))
+    scheduler.getTasksWithoutConfigs.return_value = resp
+    return scheduler
+
+  def test_no_mux_run(self):
+    scheduler = self.mock_scheduler()
+    helper = self.create_mux_helper(scheduler, self.create_query)
+    tasks = helper.get_tasks(self.INSTANCES)
+
+    scheduler.getTasksWithoutConfigs.assert_called_once_with(self.create_query(self.INSTANCES))
+    assert 1 == len(tasks)
+
+  def test_mux_run(self):
+    expected_tasks = self.create_tasks()
+    mux = self.mock_mux(expected_tasks)
+    helper = self.create_mux_helper(None, self.create_query, scheduler_mux=mux)
+    tasks = helper.get_tasks(self.INSTANCES)
+
+    mux.enqueue_and_wait.assert_called_once_with(
+        helper._get_tasks,
+        self.INSTANCES,
+        helper._create_aggregated_query)
+    assert 1 == len(tasks)

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e1c0ade2/src/test/python/apache/aurora/client/api/test_updater.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/client/api/test_updater.py b/src/test/python/apache/aurora/client/api/test_updater.py
index 48f82c8..7020712 100644
--- a/src/test/python/apache/aurora/client/api/test_updater.py
+++ b/src/test/python/apache/aurora/client/api/test_updater.py
@@ -22,6 +22,7 @@ from pytest import raises
 from apache.aurora.client.api.instance_watcher import InstanceWatcher
 from apache.aurora.client.api.job_monitor import JobMonitor
 from apache.aurora.client.api.quota_check import CapacityRequest, QuotaCheck
+from apache.aurora.client.api.scheduler_mux import SchedulerMux
 from apache.aurora.client.api.updater import Updater
 from apache.aurora.client.fake_scheduler_proxy import FakeSchedulerProxy
 from apache.aurora.common.aurora_job_key import AuroraJobKey
@@ -38,6 +39,7 @@ from gen.apache.aurora.api.ttypes import (
     JobConfiguration,
     JobKey,
     LimitConstraint,
+    Lock,
     LockKey,
     LockValidation,
     Metadata,
@@ -102,9 +104,25 @@ class FakeConfig(object):
     return self.job_config.instanceCount
 
 
+class FakeSchedulerMux(object):
+  def __init__(self):
+    self._raise_error = False
+
+  def enqueue_and_wait(self, command, data, timeout=None):
+    command([data])
+    if self._raise_error:
+      raise SchedulerMux.Error("expected")
+
+  def terminate(self):
+    pass
+
+  def raise_error(self):
+    self._raise_error = True
+
+
 class UpdaterTest(TestCase):
   UPDATE_CONFIG = {
-    'batch_size': 3,
+    'batch_size': 1,
     'restart_threshold': 50,
     'watch_secs': 50,
     'max_per_shard_failures': 0,
@@ -112,6 +130,7 @@ class UpdaterTest(TestCase):
     'rollback_on_failure': True,
   }
 
+
   def setUp(self):
     self._role = 'mesos'
     self._name = 'jimbob'
@@ -121,6 +140,7 @@ class UpdaterTest(TestCase):
     self._lock = 'test_lock'
     self._instance_watcher = MockObject(InstanceWatcher)
     self._job_monitor = MockObject(JobMonitor)
+    self._scheduler_mux = FakeSchedulerMux()
     self._scheduler = MockObject(scheduler_client)
     self._scheduler_proxy = FakeSchedulerProxy('test-cluster', self._scheduler, self._session_key)
     self._quota_check = MockObject(QuotaCheck)
@@ -149,10 +169,17 @@ class UpdaterTest(TestCase):
         self._scheduler_proxy,
         self._instance_watcher,
         self._quota_check,
-        self._job_monitor)
+        self._job_monitor,
+        self._scheduler_mux)
+
+  def expect_terminate(self):
+    self._job_monitor.terminate()
+    self._instance_watcher.terminate()
 
   def expect_watch_instances(self, instance_ids, failed_instances=[]):
-    self._instance_watcher.watch(instance_ids).AndReturn(set(failed_instances))
+    for i in instance_ids:
+      failed = [i] if i in failed_instances else []
+      self._instance_watcher.watch(instance_ids).AndReturn(set(failed))
 
   def expect_populate(self, job_config, response_code=None):
     response_code = ResponseCode.OK if response_code is None else response_code
@@ -182,38 +209,60 @@ class UpdaterTest(TestCase):
     self._scheduler.replaceCronTemplate(job_config, self._lock, self._session_key).AndReturn(resp)
 
   def expect_restart(self, instance_ids, response_code=None):
-    response_code = ResponseCode.OK if response_code is None else response_code
-    response = Response(responseCode=response_code, messageDEPRECATED='test')
-    self._scheduler.restartShards(
-        self._job_key,
-        instance_ids,
-        self._lock,
-        self._session_key).AndReturn(response)
-
-  def expect_kill(self, instance_ids, response_code=None, monitor_result=True):
-    response_code = ResponseCode.OK if response_code is None else response_code
-    response = Response(responseCode=response_code, messageDEPRECATED='test')
-    query = TaskQuery(
-        owner=Identity(role=self._job_key.role),
-        environment=self._job_key.environment,
-        jobName=self._job_key.name,
-        statuses=ACTIVE_STATES,
-        instanceIds=frozenset([int(s) for s in instance_ids]))
-    self._scheduler.killTasks(query, self._lock, self._session_key).AndReturn(response)
-    if response_code != ResponseCode.OK:
+    for i in instance_ids:
+      response_code = ResponseCode.OK if response_code is None else response_code
+      response = Response(responseCode=response_code, messageDEPRECATED='test')
+      self._scheduler.restartShards(
+          self._job_key,
+          [i],
+          self._lock,
+          self._session_key).AndReturn(response)
+
+  def expect_kill(self, instance_ids, response_code=None, monitor_result=True, skip_monitor=False):
+    for i in instance_ids:
+      response_code = ResponseCode.OK if response_code is None else response_code
+      response = Response(responseCode=response_code, messageDEPRECATED='test')
+      query = TaskQuery(
+          owner=Identity(role=self._job_key.role),
+          environment=self._job_key.environment,
+          jobName=self._job_key.name,
+          statuses=ACTIVE_STATES,
+          instanceIds=frozenset([int(i)]))
+      self._scheduler.killTasks(query, self._lock, self._session_key).AndReturn(response)
+
+    self.expect_job_monitor(response_code, instance_ids, monitor_result, skip_monitor)
+
+  def expect_job_monitor(self, response_code, instance_ids, monitor_result=True, skip=False):
+    if skip or response_code != ResponseCode.OK:
       return
 
     self._job_monitor.wait_until(JobMonitor.terminal, instance_ids, with_timeout=True).AndReturn(
-        monitor_result)
+      monitor_result)
 
   def expect_add(self, instance_ids, task_config, response_code=None):
-    response_code = ResponseCode.OK if response_code is None else response_code
-    response = Response(responseCode=response_code, messageDEPRECATED='test')
-    add_config = AddInstancesConfig(
-        key=self._job_key,
-        taskConfig=task_config,
-        instanceIds=frozenset([int(s) for s in instance_ids]))
-    self._scheduler.addInstances(add_config, self._lock, self._session_key).AndReturn(response)
+    for i in instance_ids:
+      response_code = ResponseCode.OK if response_code is None else response_code
+      response = Response(responseCode=response_code, messageDEPRECATED='test')
+      add_config = AddInstancesConfig(
+          key=self._job_key,
+          taskConfig=task_config,
+          instanceIds=frozenset([int(i)]))
+      self._scheduler.addInstances(add_config, self._lock, self._session_key).AndReturn(response)
+
+  def expect_update_instances(self, instance_ids, task_config):
+    for i in instance_ids:
+      self.expect_kill([i])
+      self.expect_add([i], task_config)
+      self.expect_watch_instances([i])
+
+  def expect_add_instances(self, instance_ids, task_config):
+    for i in instance_ids:
+      self.expect_add([i], task_config)
+      self.expect_watch_instances([i])
+
+  def expect_kill_instances(self, instance_ids):
+    for i in instance_ids:
+      self.expect_kill([i])
 
   def expect_start(self, response_code=None):
     response_code = ResponseCode.OK if response_code is None else response_code
@@ -288,10 +337,7 @@ class UpdaterTest(TestCase):
     self.expect_get_tasks(old_configs)
     self.expect_populate(job_config)
     self.expect_quota_check(0, 4)
-    self.expect_add([3, 4, 5], new_config)
-    self.expect_watch_instances([3, 4, 5])
-    self.expect_add([6], new_config)
-    self.expect_watch_instances([6])
+    self.expect_add_instances([3, 4, 5, 6], new_config)
     self.expect_finish()
     self.replay_mocks()
 
@@ -343,8 +389,7 @@ class UpdaterTest(TestCase):
     self.expect_populate(job_config)
     self.expect_quota_check(1, 0, prod=False)
     self.expect_kill([0])
-    self.expect_add([0, 1, 2], new_config)
-    self.expect_watch_instances([0, 1, 2])
+    self.expect_add_instances([0, 1, 2], new_config)
     self.expect_finish()
     self.replay_mocks()
 
@@ -361,9 +406,7 @@ class UpdaterTest(TestCase):
     self.expect_get_tasks(old_configs)
     self.expect_populate(job_config)
     self.expect_quota_check(7, 0)
-    self.expect_kill([3, 4, 5])
-    self.expect_kill([6, 7, 8])
-    self.expect_kill([9])
+    self.expect_kill_instances([3, 4, 5, 6, 7, 8, 9])
     self.expect_finish()
     self.replay_mocks()
 
@@ -381,13 +424,8 @@ class UpdaterTest(TestCase):
     self.expect_get_tasks(old_configs)
     self.expect_populate(job_config)
     self.expect_quota_check(3, 7)
-    self.expect_kill([0, 1, 2])
-    self.expect_add([0, 1, 2], new_config)
-    self.expect_watch_instances([0, 1, 2])
-    self.expect_add([3, 4, 5], new_config)
-    self.expect_watch_instances([3, 4, 5])
-    self.expect_add([6], new_config)
-    self.expect_watch_instances([6])
+    self.expect_update_instances([0, 1, 2], new_config)
+    self.expect_add_instances([3, 4, 5, 6], new_config)
     self.expect_finish()
     self.replay_mocks()
 
@@ -405,12 +443,8 @@ class UpdaterTest(TestCase):
     self.expect_get_tasks(old_configs)
     self.expect_populate(job_config)
     self.expect_quota_check(10, 1)
-    self.expect_kill([0, 1, 2])
-    self.expect_add([0], new_config)
-    self.expect_watch_instances([0])
-    self.expect_kill([3, 4, 5])
-    self.expect_kill([6, 7, 8])
-    self.expect_kill([9])
+    self.expect_update_instances([0], new_config)
+    self.expect_kill_instances([1, 2, 3, 4, 5, 6, 7, 8, 9])
     self.expect_finish()
     self.replay_mocks()
 
@@ -428,12 +462,7 @@ class UpdaterTest(TestCase):
     self.expect_get_tasks(old_configs)
     self.expect_populate(job_config)
     self.expect_quota_check(5, 5)
-    self.expect_kill([0, 1, 2])
-    self.expect_add([0, 1, 2], new_config)
-    self.expect_watch_instances([0, 1, 2])
-    self.expect_kill([3, 4])
-    self.expect_add([3, 4], new_config)
-    self.expect_watch_instances([3, 4])
+    self.expect_update_instances([0, 1, 2, 3, 4], new_config)
     self.expect_finish()
     self.replay_mocks()
 
@@ -450,8 +479,7 @@ class UpdaterTest(TestCase):
     self.expect_get_tasks(old_configs)
     self.expect_populate(job_config)
     self.expect_quota_check(0, 2)
-    self.expect_add([3, 4], new_config)
-    self.expect_watch_instances([3, 4])
+    self.expect_add_instances([3, 4], new_config)
     self.expect_finish()
     self.replay_mocks()
 
@@ -468,8 +496,7 @@ class UpdaterTest(TestCase):
     self.expect_get_tasks(old_configs)
     self.expect_populate(job_config)
     self.expect_quota_check(6, 0)
-    self.expect_kill([4, 5, 6])
-    self.expect_kill([7, 8, 9])
+    self.expect_kill_instances([4, 5, 6, 7, 8, 9])
     self.expect_finish()
     self.replay_mocks()
 
@@ -487,15 +514,14 @@ class UpdaterTest(TestCase):
     self.expect_get_tasks(old_configs)
     self.expect_populate(job_config)
     self.expect_quota_check(3, 3)
-    self.expect_kill([2, 3, 4])
-    self.expect_add([2, 3, 4], new_config)
-    self.expect_watch_instances([2, 3, 4])
+    self.expect_update_instances([2, 3, 4], new_config)
     self.expect_finish()
     self.replay_mocks()
 
     self.update_and_expect_ok(instances=[2, 3, 4])
     self.verify_mocks()
 
+
   def test_patch_hole_with_instance_option(self):
     """Patching an instance ID gap created by a terminated update."""
     old_configs = self.make_task_configs(8)
@@ -506,8 +532,7 @@ class UpdaterTest(TestCase):
     self.expect_get_tasks(old_configs, [2, 3])
     self.expect_populate(job_config)
     self.expect_quota_check(0, 2)
-    self.expect_add([2, 3], new_config)
-    self.expect_watch_instances([2, 3])
+    self.expect_add_instances([2, 3], new_config)
     self.expect_finish()
     self.replay_mocks()
 
@@ -533,7 +558,7 @@ class UpdaterTest(TestCase):
   def test_update_rollback(self):
     """Update process failures exceed total allowable count and update is rolled back."""
     update_config = self.UPDATE_CONFIG.copy()
-    update_config.update(max_total_failures=2, max_per_shard_failures=1)
+    update_config.update(max_per_shard_failures=1)
     self.init_updater(update_config)
 
     old_configs = self.make_task_configs(10)
@@ -545,55 +570,13 @@ class UpdaterTest(TestCase):
     self.expect_get_tasks(old_configs)
     self.expect_populate(job_config)
     self.expect_quota_check(10, 10)
-    self.expect_kill([0, 1, 2])
-    self.expect_add([0, 1, 2], new_config)
-    self.expect_watch_instances([0, 1, 2], failed_instances=[0, 1, 2])
-    self.expect_restart([0, 1, 2])
-    self.expect_watch_instances([0, 1, 2], failed_instances=[0, 1, 2])
-    self.expect_kill([2, 1, 0])
-    self.expect_add([2, 1, 0], old_configs[0])
-    self.expect_watch_instances([2, 1, 0])
-    self.expect_finish()
-    self.replay_mocks()
-
-    self.update_and_expect_response(ResponseCode.ERROR)
-    self.verify_mocks()
-
-  def test_update_rollback_sorted(self):
-    """Rolling back with a batch of 1 should still be correctly sorted in reverse"""
-    update_config = self.UPDATE_CONFIG.copy()
-    update_config.update(max_total_failures=0, max_per_shard_failures=1, batch_size=1)
-    self.init_updater(update_config)
-
-    old_configs = self.make_task_configs(5)
-    new_config = deepcopy(old_configs[0])
-    new_config.priority = 5
-    job_config = self.make_job_config(new_config, 5)
-    self._config.job_config = job_config
-    self.expect_start()
-    self.expect_get_tasks(old_configs)
-    self.expect_populate(job_config)
-    self.expect_quota_check(5, 5)
-    self.expect_kill([0])
-    self.expect_add([0], new_config)
-    self.expect_watch_instances([0])
-    self.expect_kill([1])
-    self.expect_add([1], new_config)
-    self.expect_watch_instances([1])
+    self.expect_update_instances([0, 1], new_config)
     self.expect_kill([2])
     self.expect_add([2], new_config)
     self.expect_watch_instances([2], failed_instances=[2])
     self.expect_restart([2])
     self.expect_watch_instances([2], failed_instances=[2])
-    self.expect_kill([2])
-    self.expect_add([2], old_configs[0])
-    self.expect_watch_instances([2])
-    self.expect_kill([1])
-    self.expect_add([1], old_configs[0])
-    self.expect_watch_instances([1])
-    self.expect_kill([0])
-    self.expect_add([0], old_configs[0])
-    self.expect_watch_instances([0])
+    self.expect_update_instances([2, 1, 0], old_configs[0])
     self.expect_finish()
     self.replay_mocks()
 
@@ -615,14 +598,13 @@ class UpdaterTest(TestCase):
     self.expect_get_tasks(old_configs)
     self.expect_populate(job_config)
     self.expect_quota_check(6, 6)
-    self.expect_kill([0, 1, 2])
-    self.expect_add([0, 1, 2], new_config)
-    self.expect_watch_instances([0, 1, 2], failed_instances=[0, 1, 2])
-    self.expect_restart([0, 1, 2])
-    self.expect_watch_instances([0, 1, 2])
-    self.expect_kill([3, 4, 5])
-    self.expect_add([3, 4, 5], new_config)
-    self.expect_watch_instances([3, 4, 5])
+    self.expect_update_instances([0, 1], new_config)
+    self.expect_kill([2])
+    self.expect_add([2], new_config)
+    self.expect_watch_instances([2], failed_instances=[2])
+    self.expect_restart([2])
+    self.expect_watch_instances([2])
+    self.expect_update_instances([3, 4, 5], new_config)
     self.expect_finish()
     self.replay_mocks()
 
@@ -695,7 +677,9 @@ class UpdaterTest(TestCase):
     self.expect_get_tasks(old_configs)
     self.expect_populate(job_config)
     self.expect_quota_check(5, 5)
-    self.expect_kill([0, 1, 2], response_code=ResponseCode.INVALID_REQUEST)
+    self._scheduler_mux.raise_error()
+    self.expect_kill([0], skip_monitor=True)
+    self.expect_terminate()
     self.replay_mocks()
 
     self.update_and_expect_response(ResponseCode.ERROR)
@@ -712,7 +696,8 @@ class UpdaterTest(TestCase):
     self.expect_get_tasks(old_configs)
     self.expect_populate(job_config)
     self.expect_quota_check(5, 5)
-    self.expect_kill([0, 1, 2], monitor_result=False)
+    self.expect_kill([0], monitor_result=False)
+    self.expect_terminate()
     self.replay_mocks()
 
     self.update_and_expect_response(ResponseCode.ERROR)
@@ -763,50 +748,15 @@ class UpdaterTest(TestCase):
     self.expect_get_tasks(old_configs)
     self.expect_populate(job_config)
     self.expect_quota_check(10, 10)
-    self.expect_kill([0, 1, 2])
-    self.expect_add([0, 1, 2], new_config)
-    self.expect_watch_instances([0, 1, 2], failed_instances=[0])
-    self.expect_restart([0])
-    self.expect_kill([3, 4])
-    self.expect_add([3, 4], new_config)
-    self.expect_watch_instances([0, 3, 4], failed_instances=[0])
-    self.expect_restart([0])
-    self.expect_kill([5, 6])
-    self.expect_add([5, 6], new_config)
-    self.expect_watch_instances([0, 5, 6], failed_instances=[0])
-    self.expect_kill([7, 8, 9])
-    self.expect_add([7, 8, 9], new_config)
-    self.expect_watch_instances([7, 8, 9])
-    self.expect_finish()
-    self.replay_mocks()
-
-    self.update_and_expect_ok()
-    self.verify_mocks()
-
-  def test_failed_unretryable_do_not_cause_rollback(self):
-    """Update process still succeeds if failed instances in last batch are within allowed limit."""
-    update_config = self.UPDATE_CONFIG.copy()
-    update_config.update(max_total_failures=1, max_per_shard_failures=2)
-    self.init_updater(update_config)
-
-    old_configs = self.make_task_configs(5)
-    new_config = deepcopy(old_configs[0])
-    new_config.priority = 5
-    job_config = self.make_job_config(new_config, 5)
-    self._config.job_config = job_config
-    self.expect_start()
-    self.expect_get_tasks(old_configs)
-    self.expect_populate(job_config)
-    self.expect_quota_check(5, 5)
-    self.expect_kill([0, 1, 2])
-    self.expect_add([0, 1, 2], new_config)
-    self.expect_watch_instances([0, 1, 2], failed_instances=[0])
-    self.expect_restart([0])
-    self.expect_kill([3, 4])
-    self.expect_add([3, 4], new_config)
-    self.expect_watch_instances([0, 3, 4], failed_instances=[0])
-    self.expect_restart([0])
-    self.expect_watch_instances([0], failed_instances=[0])
+    self.expect_update_instances([0, 1], new_config)
+    self.expect_kill([2])
+    self.expect_add([2], new_config)
+    self.expect_watch_instances([2], failed_instances=[2])
+    self.expect_restart([2])
+    self.expect_watch_instances([2], failed_instances=[2])
+    self.expect_restart([2])
+    self.expect_watch_instances([2], failed_instances=[2])
+    self.expect_update_instances([3, 4, 5, 6, 7, 8, 9], new_config)
     self.expect_finish()
     self.replay_mocks()
 
@@ -839,7 +789,7 @@ class UpdaterTest(TestCase):
   def test_update_no_rollback(self):
     """Update process failures exceed total allowable count and update is not rolled back."""
     update_config = self.UPDATE_CONFIG.copy()
-    update_config.update(max_total_failures=2, max_per_shard_failures=1, rollback_on_failure=False)
+    update_config.update(max_total_failures=1, max_per_shard_failures=1, rollback_on_failure=False)
     self.init_updater(update_config)
 
     old_configs = self.make_task_configs(10)
@@ -851,11 +801,16 @@ class UpdaterTest(TestCase):
     self.expect_get_tasks(old_configs)
     self.expect_populate(job_config)
     self.expect_quota_check(10, 10)
-    self.expect_kill([0, 1, 2])
-    self.expect_add([0, 1, 2], new_config)
-    self.expect_watch_instances([0, 1, 2], failed_instances=[0, 1, 2])
-    self.expect_restart([0, 1, 2])
-    self.expect_watch_instances([0, 1, 2], failed_instances=[0, 1, 2])
+    self.expect_kill([0])
+    self.expect_add([0], new_config)
+    self.expect_watch_instances([0], failed_instances=[0])
+    self.expect_restart([0])
+    self.expect_watch_instances([0], failed_instances=[0])
+    self.expect_kill([1])
+    self.expect_add([1], new_config)
+    self.expect_watch_instances([1], failed_instances=[1])
+    self.expect_restart([1])
+    self.expect_watch_instances([1], failed_instances=[1])
     self.expect_finish()
     self.replay_mocks()
 

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e1c0ade2/src/test/python/apache/aurora/client/cli/test_create.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/client/cli/test_create.py b/src/test/python/apache/aurora/client/cli/test_create.py
index af548ae..ca635bd 100644
--- a/src/test/python/apache/aurora/client/cli/test_create.py
+++ b/src/test/python/apache/aurora/client/cli/test_create.py
@@ -95,7 +95,9 @@ class TestClientCreateCommand(AuroraClientCommandTest):
     # object, and everything can be stubbed through that.
     mock_context = FakeAuroraCommandContext()
     with contextlib.nested(
-        patch('time.sleep'),
+        # TODO(maxim): Patching threading.Event with all possible namespace/patch/mock
+        #              combinations did not produce the desired effect. Investigate why (AURORA-510)
+        patch('threading._Event.wait'),
         patch('apache.aurora.client.cli.jobs.Job.create_context', return_value=mock_context)):
       # After making the client, create sets up a job monitor.
       # The monitor uses TaskQuery to get the tasks. It's called at least twice:once before
@@ -127,7 +129,7 @@ class TestClientCreateCommand(AuroraClientCommandTest):
     """
     mock_context = FakeAuroraCommandContext()
     with contextlib.nested(
-        patch('time.sleep'),
+        patch('threading._Event.wait'),
         patch('apache.aurora.client.cli.jobs.Job.create_context', return_value=mock_context)):
       mock_query = self.create_mock_query()
       for result in [ScheduleStatus.PENDING, ScheduleStatus.PENDING, ScheduleStatus.RUNNING]:


Mime
View raw message