aurora-commits mailing list archives

From wfar...@apache.org
Subject [27/51] [partial] Rename twitter* and com.twitter to apache and org.apache directories to preserve all file history before the refactor.
Date Tue, 31 Dec 2013 21:20:20 GMT
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/thrift/auth/ThriftAuthModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/thrift/auth/ThriftAuthModule.java b/src/main/java/org/apache/aurora/scheduler/thrift/auth/ThriftAuthModule.java
new file mode 100644
index 0000000..66f9033
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/thrift/auth/ThriftAuthModule.java
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.thrift.auth;
+
+import java.util.Map;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.google.inject.AbstractModule;
+import com.google.inject.TypeLiteral;
+
+import com.twitter.aurora.auth.CapabilityValidator;
+import com.twitter.aurora.auth.CapabilityValidator.Capability;
+import com.twitter.aurora.auth.SessionValidator;
+import com.twitter.common.args.Arg;
+import com.twitter.common.args.CmdLine;
+import com.twitter.common.args.constraints.NotEmpty;
+
+/**
+ * Binding module for authentication of users with special capabilities for admin functions.
+ */
+public class ThriftAuthModule extends AbstractModule {
+
+  private static final Map<Capability, String> DEFAULT_CAPABILITIES =
+      ImmutableMap.of(Capability.ROOT, "mesos");
+
+  @NotEmpty
+  @CmdLine(name = "user_capabilities",
+      help = "Concrete name mappings for administration capabilities.")
+  private static final Arg<Map<Capability, String>> USER_CAPABILITIES =
+      Arg.create(DEFAULT_CAPABILITIES);
+
+  private Map<Capability, String> capabilities;
+
+  public ThriftAuthModule() {
+    this(USER_CAPABILITIES.get());
+  }
+
+  @VisibleForTesting
+  public ThriftAuthModule(Map<Capability, String> capabilities) {
+    this.capabilities = Preconditions.checkNotNull(capabilities);
+  }
+
+  @Override
+  protected void configure() {
+    Preconditions.checkArgument(
+        capabilities.containsKey(Capability.ROOT),
+        "A ROOT capability must be provided with --user_capabilities");
+    bind(new TypeLiteral<Map<Capability, String>>() { }).toInstance(capabilities);
+
+    requireBinding(SessionValidator.class);
+    requireBinding(CapabilityValidator.class);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/__init__.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/__init__.py b/src/main/python/apache/__init__.py
new file mode 100644
index 0000000..de40ea7
--- /dev/null
+++ b/src/main/python/apache/__init__.py
@@ -0,0 +1 @@
+__import__('pkg_resources').declare_namespace(__name__)

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/aurora/BUILD
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/BUILD b/src/main/python/apache/aurora/BUILD
new file mode 100644
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/aurora/BUILD.thirdparty
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/BUILD.thirdparty b/src/main/python/apache/aurora/BUILD.thirdparty
new file mode 100644
index 0000000..7a43aca
--- /dev/null
+++ b/src/main/python/apache/aurora/BUILD.thirdparty
@@ -0,0 +1,19 @@
+def make_dep(name, version, dependency_name=None):
+  """Build a target from a specified dependency tuple.
+
+    name is the target name, specified in other BUILD files.
+    version is a hardcoded version string
+    dependency_name is used to identify the specific binary to resolve
+  """
+  dependency_name = dependency_name or name
+  versioned_name = "%s==%s" % (dependency_name, version)
+  python_requirement(requirement=versioned_name, name=name)
+
+make_dep('argparse', '1.2.1')
+make_dep('mesos-core', '0.15.0-rc4', 'mesos')
+make_dep('mock', '1.0.1')
+make_dep('mox', '0.5.3')
+make_dep('psutil', '1.1.2')
+make_dep('pystachio', '0.7.2')
+make_dep('pyyaml', '3.10', 'PyYAML')
+make_dep('thrift', '0.9.1')

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/aurora/__init__.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/__init__.py b/src/main/python/apache/aurora/__init__.py
new file mode 100644
index 0000000..b0d6433
--- /dev/null
+++ b/src/main/python/apache/aurora/__init__.py
@@ -0,0 +1 @@
+__import__('pkg_resources').declare_namespace(__name__)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/aurora/admin/BUILD
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/admin/BUILD b/src/main/python/apache/aurora/admin/BUILD
new file mode 100644
index 0000000..c8089b4
--- /dev/null
+++ b/src/main/python/apache/aurora/admin/BUILD
@@ -0,0 +1,11 @@
+python_library(
+  name = 'mesos_maintenance',
+  sources = 'mesos_maintenance.py',
+  dependencies = [
+    pants('aurora/twitterdeps/src/python/twitter/common/log'),
+    pants('aurora/twitterdeps/src/python/twitter/common/quantity'),
+    pants('src/main/python/twitter/aurora/client:api'),
+    pants('src/main/python/twitter/aurora/client:base'),
+    pants('src/main/thrift/com/twitter/aurora/gen:py-thrift'),
+  ]
+)

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/aurora/admin/__init__.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/admin/__init__.py b/src/main/python/apache/aurora/admin/__init__.py
new file mode 100644
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/aurora/admin/mesos_maintenance.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/admin/mesos_maintenance.py b/src/main/python/apache/aurora/admin/mesos_maintenance.py
new file mode 100644
index 0000000..4e14436
--- /dev/null
+++ b/src/main/python/apache/aurora/admin/mesos_maintenance.py
@@ -0,0 +1,113 @@
+from collections import defaultdict
+import time
+
+from twitter.common import log
+from twitter.common.quantity import Amount, Time
+
+from twitter.aurora.client.api import AuroraClientAPI
+from twitter.aurora.client.base import check_and_log_response
+
+from gen.twitter.aurora.ttypes import Hosts, MaintenanceMode
+
+
+def group_by_host(hostname):
+  return hostname
+
+
+class MesosMaintenance(object):
+  """This class provides more methods to interact with the mesos cluster and perform
+  maintenance.
+  """
+
+  DEFAULT_GROUPING = 'by_host'
+  GROUPING_FUNCTIONS = {
+    'by_host': group_by_host,
+  }
+  START_MAINTENANCE_DELAY = Amount(30, Time.SECONDS)
+
+  @classmethod
+  def group_hosts(cls, hostnames, grouping_function=DEFAULT_GROUPING):
+    try:
+      grouping_function = cls.GROUPING_FUNCTIONS[grouping_function]
+    except KeyError:
+      raise ValueError('Unknown grouping function %s!' % grouping_function)
+    groups = defaultdict(set)
+    for hostname in hostnames:
+      groups[grouping_function(hostname)].add(hostname)
+    return groups
+
+  @classmethod
+  def iter_batches(cls, hostnames, batch_size, grouping_function=DEFAULT_GROUPING):
+    if batch_size <= 0:
+      raise ValueError('Batch size must be > 0!')
+    groups = cls.group_hosts(hostnames, grouping_function)
+    groups = sorted(groups.items(), key=lambda v: v[0])
+    for k in range(0, len(groups), batch_size):
+      yield Hosts(set.union(*(hostset for (key, hostset) in groups[k:k+batch_size])))
+
+  def __init__(self, cluster, verbosity):
+    self._client = AuroraClientAPI(cluster, verbosity == 'verbose')
+
+  def _drain_hosts(self, drainable_hosts, clock=time):
+    """This will actively turn down tasks running on hosts."""
+    check_and_log_response(self._client.drain_hosts(drainable_hosts))
+    not_ready_hosts = [hostname for hostname in drainable_hosts.hostNames]
+    while not_ready_hosts:
+      log.info("Sleeping for %s." % self.START_MAINTENANCE_DELAY)
+      clock.sleep(self.START_MAINTENANCE_DELAY.as_(Time.SECONDS))
+      resp = self._client.maintenance_status(Hosts(not_ready_hosts))
+      #TODO(jsmith): Workaround until scheduler responds with unknown slaves in MESOS-3454
+      if not resp.result.maintenanceStatusResult.statuses:
+        not_ready_hosts = None
+      for host_status in resp.result.maintenanceStatusResult.statuses:
+        if host_status.mode != MaintenanceMode.DRAINED:
+          log.warning('%s is currently in status %s' %
+              (host_status.host, MaintenanceMode._VALUES_TO_NAMES[host_status.mode]))
+        else:
+          not_ready_hosts.remove(host_status.host)
+
+  def _complete_maintenance(self, drained_hosts):
+    """End the maintenance status for a given set of hosts."""
+    check_and_log_response(self._client.end_maintenance(drained_hosts))
+    resp = self._client.maintenance_status(drained_hosts)
+    for host_status in resp.result.maintenanceStatusResult.statuses:
+      if host_status.mode != MaintenanceMode.NONE:
+        log.warning('%s is DRAINING or in DRAINED' % host_status.host)
+
+  def _operate_on_hosts(self, drained_hosts, callback):
+    """Perform a given operation on a list of hosts that are ready for maintenance."""
+    for host in drained_hosts.hostNames:
+      callback(host)
+
+  def end_maintenance(self, hosts):
+    """Pull a list of hosts out of maintenance mode."""
+    self._complete_maintenance(Hosts(set(hosts)))
+
+  def start_maintenance(self, hosts):
+    """Put a list of hosts into maintenance mode, to de-prioritize scheduling."""
+    check_and_log_response(self._client.start_maintenance(Hosts(set(hosts))))
+
+  def perform_maintenance(self, hosts, batch_size=1, grouping_function=DEFAULT_GROUPING,
+                          callback=None):
+    """Wrap a callback in between sending hosts into maintenance mode and back.
+
+    Walk through the process of putting hosts into maintenance, draining them of tasks,
+    performing an action on them once drained, then removing them from maintenance mode
+    so tasks can schedule.
+    """
+    self._complete_maintenance(Hosts(set(hosts)))
+    self.start_maintenance(hosts)
+
+    for hosts in self.iter_batches(hosts, batch_size, grouping_function):
+      self._drain_hosts(hosts)
+      if callback:
+        self._operate_on_hosts(hosts, callback)
+      self._complete_maintenance(hosts)
+
+  def check_status(self, hosts):
+    resp = self._client.maintenance_status(Hosts(set(hosts)))
+    check_and_log_response(resp)
+    statuses = []
+    for host_status in resp.result.maintenanceStatusResult.statuses:
+      statuses.append((host_status.host, MaintenanceMode._VALUES_TO_NAMES[host_status.mode]))
+    return statuses
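
A minimal usage sketch of the batching helpers above. The import path and host names are assumptions (the package rename is still in progress and the code above still imports from the twitter.* namespace); neither call needs a live cluster:

from apache.aurora.admin.mesos_maintenance import MesosMaintenance  # path as of this commit; hypothetical mid-rename

hostnames = ['a1.example.com', 'a2.example.com', 'b1.example.com']

# group_hosts buckets hosts by the chosen grouping function; the default
# 'by_host' grouping gives every host its own singleton group.
groups = MesosMaintenance.group_hosts(hostnames)
assert sorted(groups) == sorted(hostnames)

# iter_batches walks those groups in sorted order and yields thrift Hosts
# objects covering at most batch_size groups per batch.
for batch in MesosMaintenance.iter_batches(hostnames, batch_size=2):
    print(batch.hostNames)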

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/aurora/client/BUILD
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/BUILD b/src/main/python/apache/aurora/client/BUILD
new file mode 100644
index 0000000..70eef2e
--- /dev/null
+++ b/src/main/python/apache/aurora/client/BUILD
@@ -0,0 +1,78 @@
+import os
+
+# Create an alias for the previous target
+python_library(
+  name = 'api',
+  dependencies = [
+    pants('src/main/python/twitter/aurora/client/api')
+  ]
+)
+
+python_library(
+  name = 'base',
+  sources = ['base.py'],
+  dependencies = [
+    pants('aurora/twitterdeps/src/python/twitter/common/app'),
+    pants('aurora/twitterdeps/src/python/twitter/common/log'),
+    pants('src/main/thrift/com/twitter/aurora/gen:py-thrift'),
+  ]
+)
+
+python_library(
+  name = 'binding_helper',
+  sources = ['binding_helper.py'],
+  dependencies = [
+    pants('aurora/twitterdeps/src/python/twitter/common/lang'),
+  ]
+)
+
+python_library(
+  name = 'config',
+  sources = ['config.py'],
+  dependencies = [
+    pants('src/main/python/twitter/aurora/BUILD.thirdparty:pystachio'),
+    pants(':base'),
+    pants(':binding_helper'),
+    pants('aurora/twitterdeps/src/python/twitter/common/app'),
+    pants('aurora/twitterdeps/src/python/twitter/common/log'),
+    pants('src/main/python/twitter/aurora/config'),
+    pants('src/main/thrift/com/twitter/aurora/gen:py-thrift'),
+  ]
+)
+
+python_library(
+  name = 'factory',
+  sources = ['factory.py'],
+  dependencies = [
+    pants(':base'),
+    pants('aurora/twitterdeps/src/python/twitter/common/app'),
+    pants('src/main/python/twitter/aurora/client/hooks'),
+    pants('src/main/python/twitter/aurora/common:cluster'),
+    pants('src/main/python/twitter/aurora/common:clusters'),
+  ]
+)
+
+python_library(
+  name = 'options',
+  sources = ['options.py'],
+  dependencies = [
+    pants('src/main/python/twitter/thermos/common:options'),
+    pants('src/main/python/twitter/aurora/common:aurora_job_key'),
+  ]
+)
+
+python_library(
+  name = 'client-packaged',
+  dependencies = [
+    pants('src/main/python/twitter/aurora/common'),
+    pants('src/main/python/twitter/aurora/config'),
+    pants('src/main/python/twitter/thermos/common'),
+  ],
+  provides = setup_py(
+    name = 'twitter.aurora.client',
+    version = open(os.path.join(get_buildroot(), '.auroraversion')).read().strip().lower(),
+  ).with_binaries(
+    aurora_admin = pants('src/main/python/twitter/aurora/client/bin:aurora_admin'),
+    aurora_client = pants('src/main/python/twitter/aurora/client/bin:aurora_client'),
+  )
+)

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/aurora/client/__init__.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/__init__.py b/src/main/python/apache/aurora/client/__init__.py
new file mode 100644
index 0000000..b0d6433
--- /dev/null
+++ b/src/main/python/apache/aurora/client/__init__.py
@@ -0,0 +1 @@
+__import__('pkg_resources').declare_namespace(__name__)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/aurora/client/api/BUILD
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/api/BUILD b/src/main/python/apache/aurora/client/api/BUILD
new file mode 100644
index 0000000..4935b8a
--- /dev/null
+++ b/src/main/python/apache/aurora/client/api/BUILD
@@ -0,0 +1,104 @@
+python_library(
+  name = 'api',
+  sources = ['__init__.py'],
+  dependencies = [
+    pants(':restarter'),
+    pants(':scheduler_client'),
+    pants(':updater'),
+    pants('aurora/twitterdeps/src/python/twitter/common/lang'),
+    pants('aurora/twitterdeps/src/python/twitter/common/log'),
+    pants('src/main/python/twitter/aurora/common'),
+    pants('src/main/thrift/com/twitter/aurora/gen:py-thrift'),
+  ]
+)
+
+python_library(
+  name = 'command_runner',
+  sources = ['command_runner.py'],
+  dependencies = [
+    pants('src/main/python/twitter/aurora/BUILD.thirdparty:pystachio'),
+    pants(':api'),
+    pants('src/main/python/twitter/thermos/config:schema'),
+    pants('src/main/python/twitter/aurora/common:cluster'),
+    pants('src/main/python/twitter/aurora/config:schema'),
+    pants('src/main/thrift/com/twitter/aurora/gen:py-thrift'),
+  ]
+)
+
+python_library(
+  name = 'disambiguator',
+  sources = ['disambiguator.py'],
+  dependencies = [
+    pants(':api'),
+    pants('aurora/twitterdeps/src/python/twitter/common/log'),
+    pants('src/main/python/twitter/aurora/client:base'),
+    pants('src/main/python/twitter/aurora/common'),
+  ]
+)
+
+python_library(
+  name = 'job_monitor',
+  sources = ['job_monitor.py'],
+  dependencies = [
+    pants('aurora/twitterdeps/src/python/twitter/common/quantity'),
+    pants('src/main/thrift/com/twitter/aurora/gen:py-thrift'),
+  ]
+)
+
+python_library(
+  name = 'restarter',
+  sources = ['restarter.py'],
+  dependencies = [
+    pants(':instance_watcher'),
+    pants(':updater_util'),
+    pants('aurora/twitterdeps/src/python/twitter/common/log'),
+    pants('src/main/thrift/com/twitter/aurora/gen:py-thrift'),
+  ]
+)
+
+python_library(
+  name = 'scheduler_client',
+  sources = ['scheduler_client.py'],
+  dependencies = [
+    pants('aurora/twitterdeps/src/python/twitter/common/log'),
+    pants('aurora/twitterdeps/src/python/twitter/common/quantity'),
+    pants('aurora/twitterdeps/src/python/twitter/common/rpc/transports:tsslsocket'),
+    pants('aurora/twitterdeps/src/python/twitter/common/zookeeper/serverset:kazoo_serverset'),
+    pants('aurora/twitterdeps/src/python/twitter/common/zookeeper:kazoo_client'),
+    pants('src/main/python/twitter/aurora/common/auth'),
+    pants('src/main/python/twitter/aurora/common:cluster'),
+    pants('src/main/thrift/com/twitter/aurora/gen:py-thrift'),
+  ]
+)
+
+python_library(
+  name = 'instance_watcher',
+  sources = ['instance_watcher.py', 'health_check.py'],
+  dependencies = [
+    pants(':scheduler_client'),
+    pants('aurora/twitterdeps/src/python/twitter/common/lang'),
+    pants('aurora/twitterdeps/src/python/twitter/common/log'),
+    pants('src/main/python/twitter/aurora/common:http_signaler'),
+    pants('src/main/thrift/com/twitter/aurora/gen:py-thrift'),
+  ]
+)
+
+python_library(
+  name = 'updater',
+  sources = ['updater.py'],
+  dependencies = [
+    pants(':scheduler_client'),
+    pants(':instance_watcher'),
+    pants(':updater_util'),
+    pants('aurora/twitterdeps/src/python/twitter/common/log'),
+    pants('src/main/thrift/com/twitter/aurora/gen:py-thrift'),
+  ]
+)
+
+python_library(
+  name = 'updater_util',
+  sources = ['updater_util.py'],
+  dependencies = [
+    pants('aurora/twitterdeps/src/python/twitter/common/log'),
+  ]
+)

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/aurora/client/api/__init__.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/api/__init__.py b/src/main/python/apache/aurora/client/api/__init__.py
new file mode 100644
index 0000000..60f4011
--- /dev/null
+++ b/src/main/python/apache/aurora/client/api/__init__.py
@@ -0,0 +1,190 @@
+from twitter.common import log
+
+from twitter.aurora.common.aurora_job_key import AuroraJobKey
+from twitter.aurora.common.auth import make_session_key
+from twitter.aurora.common.cluster import Cluster
+
+from gen.twitter.aurora.constants import LIVE_STATES
+from gen.twitter.aurora.ttypes import (
+    Response,
+    Identity,
+    Quota,
+    ResponseCode,
+    TaskQuery)
+
+from .restarter import Restarter
+from .scheduler_client import SchedulerProxy
+from .updater import Updater
+
+
+class AuroraClientAPI(object):
+  """This class provides the API to talk to the twitter scheduler"""
+
+  class Error(Exception): pass
+  class TypeError(Error, TypeError): pass
+  class ClusterMismatch(Error, ValueError): pass
+
+  def __init__(self, cluster, verbose=False, session_key_factory=make_session_key):
+    if not isinstance(cluster, Cluster):
+      raise TypeError('AuroraClientAPI expects instance of Cluster for "cluster", got %s' %
+          type(cluster))
+    self._scheduler = SchedulerProxy(
+        cluster, verbose=verbose, session_key_factory=session_key_factory)
+    self._cluster = cluster
+
+  @property
+  def cluster(self):
+    return self._cluster
+
+  @property
+  def scheduler(self):
+    return self._scheduler
+
+  def create_job(self, config, lock=None):
+    log.info('Creating job %s' % config.name())
+    log.debug('Full configuration: %s' % config.job())
+    log.debug('Lock %s' % lock)
+    return self._scheduler.createJob(config.job(), lock)
+
+  def populate_job_config(self, config, validation=None):
+    return self._scheduler.populateJobConfig(config.job(), validation)
+
+  def start_cronjob(self, job_key):
+    self._assert_valid_job_key(job_key)
+
+    log.info("Starting cron job: %s" % job_key)
+    return self._scheduler.startCronJob(job_key.to_thrift())
+
+  def get_jobs(self, role):
+    log.info("Retrieving jobs for role %s" % role)
+    return self._scheduler.getJobs(role)
+
+  def kill_job(self, job_key, instances=None, lock=None):
+    log.info("Killing tasks for job: %s" % job_key)
+    if not isinstance(job_key, AuroraJobKey):
+      raise TypeError('Expected type of job_key %r to be %s but got %s instead'
+          % (job_key, AuroraJobKey.__name__, job_key.__class__.__name__))
+
+    # Leave query.owner.user unset so the query doesn't filter jobs only submitted by a particular
+    # user.
+    # TODO(wfarner): Refactor this when Identity is removed from TaskQuery.
+    query = job_key.to_thrift_query()
+    if instances is not None:
+      log.info("Instances to be killed: %s" % instances)
+      query.instanceIds = frozenset([int(s) for s in instances])
+    return self._scheduler.killTasks(query, lock)
+
+  def check_status(self, job_key):
+    self._assert_valid_job_key(job_key)
+
+    log.info("Checking status of %s" % job_key)
+    return self.query(job_key.to_thrift_query())
+
+  @classmethod
+  def build_query(cls, role, job, instances=None, statuses=LIVE_STATES, env=None):
+    return TaskQuery(owner=Identity(role=role),
+                     jobName=job,
+                     statuses=statuses,
+                     instanceIds=instances,
+                     environment=env)
+
+  def query(self, query):
+    return self._scheduler.getTasksStatus(query)
+
+  def update_job(self, config, health_check_interval_seconds=3, instances=None):
+    """Run a job update for a given config, for the specified instances.  If
+       instances is left unspecified, update all instances.  Returns whether or not
+       the update was successful."""
+
+    log.info("Updating job: %s" % config.name())
+    updater = Updater(config, health_check_interval_seconds, self._scheduler)
+
+    return updater.update(instances)
+
+  def cancel_update(self, job_key):
+    """Cancel the update represented by job_key. Returns whether or not the cancellation was
+       successful."""
+    self._assert_valid_job_key(job_key)
+
+    log.info("Canceling update on job %s" % job_key)
+    resp = Updater.cancel_update(self._scheduler, job_key)
+    if resp.responseCode != ResponseCode.OK:
+      log.error('Error cancelling the update: %s' % resp.message)
+    return resp
+
+  def restart(self, job_key, instances, updater_config, health_check_interval_seconds):
+    """Perform a rolling restart of the job. If instances is None or [], restart all instances. Returns
+       the scheduler response for the last restarted batch of instances (which allows the client to
+       show the job URL), or the status check response if no tasks were active.
+    """
+    self._assert_valid_job_key(job_key)
+
+    return Restarter(job_key, updater_config, health_check_interval_seconds, self._scheduler
+    ).restart(instances)
+
+  def start_maintenance(self, hosts):
+    log.info("Starting maintenance for: %s" % hosts.hostNames)
+    return self._scheduler.startMaintenance(hosts)
+
+  def drain_hosts(self, hosts):
+    log.info("Draining tasks on: %s" % hosts.hostNames)
+    return self._scheduler.drainHosts(hosts)
+
+  def maintenance_status(self, hosts):
+    log.info("Maintenance status for: %s" % hosts.hostNames)
+    return self._scheduler.maintenanceStatus(hosts)
+
+  def end_maintenance(self, hosts):
+    log.info("Ending maintenance for: %s" % hosts.hostNames)
+    return self._scheduler.endMaintenance(hosts)
+
+  def get_quota(self, role):
+    log.info("Getting quota for: %s" % role)
+    return self._scheduler.getQuota(role)
+
+  def set_quota(self, role, cpu, ram_mb, disk_mb):
+    log.info("Setting quota for user:%s cpu:%f ram_mb:%d disk_mb: %d"
+              % (role, cpu, ram_mb, disk_mb))
+    return self._scheduler.setQuota(role, Quota(cpu, ram_mb, disk_mb))
+
+  def force_task_state(self, task_id, status):
+    log.info("Requesting that task %s transition to state %s" % (task_id, status))
+    return self._scheduler.forceTaskState(task_id, status)
+
+  def perform_backup(self):
+    return self._scheduler.performBackup()
+
+  def list_backups(self):
+    return self._scheduler.listBackups()
+
+  def stage_recovery(self, backup_id):
+    return self._scheduler.stageRecovery(backup_id)
+
+  def query_recovery(self, query):
+    return self._scheduler.queryRecovery(query)
+
+  def delete_recovery_tasks(self, query):
+    return self._scheduler.deleteRecoveryTasks(query)
+
+  def commit_recovery(self):
+    return self._scheduler.commitRecovery()
+
+  def unload_recovery(self):
+    return self._scheduler.unloadRecovery()
+
+  def get_job_updates(self):
+    return self._scheduler.getJobUpdates()
+
+  def snapshot(self):
+    return self._scheduler.snapshot()
+
+  def unsafe_rewrite_config(self, rewrite_request):
+    return self._scheduler.rewriteConfigs(rewrite_request)
+
+  def _assert_valid_job_key(self, job_key):
+    if not isinstance(job_key, AuroraJobKey):
+      raise self.TypeError('Invalid job_key %r: expected %s but got %s'
+          % (job_key, AuroraJobKey.__name__, job_key.__class__.__name__))
+    if job_key.cluster != self.cluster.name:
+      raise self.ClusterMismatch('job %s does not belong to cluster %s' % (job_key,
+          self.cluster.name))
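
A rough sketch of driving this API (the cluster definition and job coordinates below are hypothetical, and the import paths follow the twitter.* names the code still uses mid-rename):

from twitter.aurora.client.api import AuroraClientAPI
from twitter.aurora.common.aurora_job_key import AuroraJobKey
from twitter.aurora.common.cluster import Cluster

# Hypothetical cluster definition; real ones normally come from the clusters config.
cluster = Cluster(name='example', scheduler_uri='scheduler.example.com:8081')
api = AuroraClientAPI(cluster, verbose=True)

# check_status validates the job key, then issues a getTasksStatus query
# against the scheduler named in the key's cluster component.
job_key = AuroraJobKey.from_path('example/www-data/prod/hello_world')
resp = api.check_status(job_key)
print('%s: %s' % (resp.responseCode, resp.message))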

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/aurora/client/api/command_runner.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/api/command_runner.py b/src/main/python/apache/aurora/client/api/command_runner.py
new file mode 100644
index 0000000..0a8ad33
--- /dev/null
+++ b/src/main/python/apache/aurora/client/api/command_runner.py
@@ -0,0 +1,120 @@
+from multiprocessing.pool import ThreadPool
+import posixpath
+import subprocess
+
+from twitter.common import log
+
+from twitter.aurora.client.api import AuroraClientAPI
+from twitter.aurora.config.schema.base import MesosContext
+from twitter.aurora.common.cluster import Cluster
+from twitter.thermos.config.schema import ThermosContext
+
+from gen.twitter.aurora.constants import LIVE_STATES
+from gen.twitter.aurora.ttypes import (
+  Identity,
+  ResponseCode,
+  TaskQuery)
+
+from pystachio import Environment, Required, String
+
+
+class CommandRunnerTrait(Cluster.Trait):
+  slave_root          = Required(String)
+  slave_run_directory = Required(String)
+
+
+class DistributedCommandRunner(object):
+  @staticmethod
+  def execute(args):
+    hostname, role, command = args
+    ssh_command = ['ssh', '-n', '-q', '%s@%s' % (role, hostname), command]
+    po = subprocess.Popen(ssh_command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
+    output = po.communicate()
+    return '\n'.join('%s:  %s' % (hostname, line) for line in output[0].splitlines())
+
+  @classmethod
+  def make_executor_path(cls, cluster, executor_name):
+    parameters = cls.sandbox_args(cluster)
+    parameters.update(executor_name=executor_name)
+    return posixpath.join(
+        '%(slave_root)s',
+        'slaves/*/frameworks/*/executors/%(executor_name)s/runs',
+        '%(slave_run_directory)s'
+    ) % parameters
+
+  @classmethod
+  def thermos_sandbox(cls, cluster, executor_sandbox=False):
+    sandbox = cls.make_executor_path(cluster, 'thermos-{{thermos.task_id}}')
+    return sandbox if executor_sandbox else posixpath.join(sandbox, 'sandbox')
+
+  @classmethod
+  def sandbox_args(cls, cluster):
+    cluster = cluster.with_trait(CommandRunnerTrait)
+    return {'slave_root': cluster.slave_root, 'slave_run_directory': cluster.slave_run_directory}
+
+  @classmethod
+  def substitute_thermos(cls, command, task, cluster, **kw):
+    prefix_command = 'cd %s;' % cls.thermos_sandbox(cluster, **kw)
+    thermos_namespace = ThermosContext(
+        task_id=task.assignedTask.taskId,
+        ports=task.assignedTask.assignedPorts)
+    mesos_namespace = MesosContext(instance=task.assignedTask.instanceId)
+    command = String(prefix_command + command) % Environment(
+        thermos=thermos_namespace,
+        mesos=mesos_namespace)
+    return command.get()
+
+  @classmethod
+  def aurora_sandbox(cls, cluster, executor_sandbox=False):
+    if executor_sandbox:
+      return cls.make_executor_path(cluster, 'twitter')
+    else:
+      return '/var/run/nexus/%task_id%/sandbox'
+
+  @classmethod
+  def substitute_aurora(cls, command, task, cluster, **kw):
+    command = ('cd %s;' % cls.aurora_sandbox(cluster, **kw)) + command
+    command = command.replace('%shard_id%', str(task.assignedTask.instanceId))
+    command = command.replace('%task_id%', task.assignedTask.taskId)
+    for name, port in task.assignedTask.assignedPorts.items():
+      command = command.replace('%port:' + name + '%', str(port))
+    return command
+
+  @classmethod
+  def substitute(cls, command, task, cluster, **kw):
+    if task.assignedTask.task.executorConfig:
+      return cls.substitute_thermos(command, task, cluster, **kw)
+    else:
+      return cls.substitute_aurora(command, task, cluster, **kw)
+
+  @classmethod
+  def query_from(cls, role, env, job):
+    return TaskQuery(statuses=LIVE_STATES, owner=Identity(role), jobName=job, environment=env)
+
+  def __init__(self, cluster, role, env, jobs, ssh_user=None):
+    self._cluster = cluster
+    self._api = AuroraClientAPI(cluster=cluster)
+    self._role = role
+    self._env = env
+    self._jobs = jobs
+    self._ssh_user = ssh_user if ssh_user else self._role
+
+  def resolve(self):
+    for job in self._jobs:
+      resp = self._api.query(self.query_from(self._role, self._env, job))
+      if resp.responseCode != ResponseCode.OK:
+        log.error('Failed to query job: %s' % job)
+        continue
+      for task in resp.result.scheduleStatusResult.tasks:
+        yield task
+
+  def process_arguments(self, command, **kw):
+    for task in self.resolve():
+      host = task.assignedTask.slaveHost
+      role = task.assignedTask.task.owner.role
+      yield (host, self._ssh_user, self.substitute(command, task, self._cluster, **kw))
+
+  def run(self, command, parallelism=1, **kw):
+    threadpool = ThreadPool(processes=parallelism)
+    for result in threadpool.imap_unordered(self.execute, self.process_arguments(command, **kw)):
+      print result
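
To make the legacy placeholder substitution above concrete, a small sketch (the stub classes are hypothetical stand-ins for the generated thrift ScheduledTask/AssignedTask structs, and the import path is an assumption while the rename is in progress):

from collections import namedtuple

from twitter.aurora.client.api.command_runner import DistributedCommandRunner

AssignedStub = namedtuple('AssignedStub', ['taskId', 'instanceId', 'assignedPorts'])
TaskStub = namedtuple('TaskStub', ['assignedTask'])

task = TaskStub(AssignedStub('task-1234', 3, {'http': 31337}))

# For non-thermos tasks the runner rewrites %task_id%, %shard_id% and %port:NAME%
# placeholders in place after prefixing a cd into the task sandbox.
cmd = DistributedCommandRunner.substitute_aurora(
    'curl localhost:%port:http%/health && echo %shard_id%', task, cluster=None)
print(cmd)
# cd /var/run/nexus/task-1234/sandbox;curl localhost:31337/health && echo 3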

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/aurora/client/api/disambiguator.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/api/disambiguator.py b/src/main/python/apache/aurora/client/api/disambiguator.py
new file mode 100644
index 0000000..4693574
--- /dev/null
+++ b/src/main/python/apache/aurora/client/api/disambiguator.py
@@ -0,0 +1,89 @@
+from twitter.common import log
+
+from twitter.aurora.client.api import AuroraClientAPI
+from twitter.aurora.client.base import check_and_log_response, deprecation_warning, die
+from twitter.aurora.common.aurora_job_key import AuroraJobKey
+
+
+class LiveJobDisambiguator(object):
+  """
+  Disambiguates a job-specification into concrete AuroraJobKeys by querying the scheduler API.
+  """
+
+  def __init__(self, client, role, env, name):
+    if not isinstance(client, AuroraClientAPI):
+      raise TypeError("client must be a AuroraClientAPI")
+    self._client = client
+
+    if not role:
+      raise ValueError("role is required")
+    self._role = role
+    if not name:
+      raise ValueError("name is required")
+    self._name = name
+    self._env = env
+
+  @property
+  def ambiguous(self):
+    return not all((self._role, self._env, self._name))
+
+  def query_matches(self):
+    resp = self._client.get_jobs(self._role)
+    check_and_log_response(resp)
+    return set(AuroraJobKey(self._client.cluster.name, j.key.role, j.key.environment, j.key.name)
+        for j in resp.result.getJobsResult.configs if j.key.name == self._name)
+
+  @classmethod
+  def _disambiguate_or_die(cls, client, role, env, name):
+    # Returns a single AuroraJobKey if one can be found given the args, potentially
+    # querying the scheduler. Calls die() with an appropriate error message otherwise.
+    try:
+      disambiguator = cls(client, role, env, name)
+    except ValueError as e:
+      die(e)
+
+    if not disambiguator.ambiguous:
+      return AuroraJobKey(client.cluster.name, role, env, name)
+
+    deprecation_warning("Job ambiguously specified - querying the scheduler to disambiguate")
+    matches = disambiguator.query_matches()
+    if len(matches) == 1:
+      (match,) = matches
+      log.info("Found job %s" % match)
+      return match
+    elif len(matches) == 0:
+      die("No jobs found")
+    else:
+      die("Multiple jobs match (%s) - disambiguate by using the CLUSTER/ROLE/ENV/NAME form"
+          % ",".join(str(m) for m in matches))
+
+  @classmethod
+  def disambiguate_args_or_die(cls, args, options, client_factory=AuroraClientAPI):
+    """
+    Returns a (AuroraClientAPI, AuroraJobKey, AuroraConfigFile:str) tuple
+    if one can be found given the args, potentially querying the scheduler with the returned client.
+    Calls die() with an appropriate error message otherwise.
+
+    Arguments:
+      args: args from app command invocation.
+      options: options from app command invocation. must have env and cluster attributes.
+      client_factory: a callable (cluster) -> AuroraClientAPI.
+    """
+    if not len(args) > 0:
+      die('job path is required')
+    try:
+      job_key = AuroraJobKey.from_path(args[0])
+      client = client_factory(job_key.cluster)
+      config_file = args[1] if len(args) > 1 else None  # the config for hooks
+      return client, job_key, config_file
+    except AuroraJobKey.Error:
+      log.warning("Failed to parse job path, falling back to compatibility mode")
+      role = args[0] if len(args) > 0 else None
+      name = args[1] if len(args) > 1 else None
+      env = None
+      config_file = None  # deprecated form does not support hooks functionality
+      cluster = options.cluster
+      if not cluster:
+        die('cluster is required')
+      client = client_factory(cluster)
+      return client, cls._disambiguate_or_die(client, role, env, name), config_file

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/aurora/client/api/health_check.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/api/health_check.py b/src/main/python/apache/aurora/client/api/health_check.py
new file mode 100644
index 0000000..b75ee6e
--- /dev/null
+++ b/src/main/python/apache/aurora/client/api/health_check.py
@@ -0,0 +1,123 @@
+from abc import abstractmethod
+
+from twitter.common import log
+from twitter.common.lang import Interface
+
+from twitter.aurora.common.http_signaler import HttpSignaler
+
+from gen.twitter.aurora.ttypes import ScheduleStatus
+
+
+class HealthCheck(Interface):
+  @abstractmethod
+  def health(self, task):
+    """Checks health of the task and returns a (healthy, retriable) pair."""
+
+
+class HealthStatus(object):
+  @classmethod
+  def alive(cls):
+    return cls(True).health()
+
+  @classmethod
+  def dead(cls):
+    return cls(False).health()
+
+  def __init__(self, retry, health):
+    self._retry = retry
+    self._health = health
+
+  def health(self):
+    return (self._health, self._retry)
+
+
+class NotRetriable(HealthStatus):
+  def __init__(self, health):
+    super(NotRetriable, self).__init__(False, health)
+
+
+class Retriable(HealthStatus):
+  def __init__(self, health):
+    super(Retriable, self).__init__(True, health)
+
+
+class StatusHealthCheck(HealthCheck):
+  """Verifies the health of a task based on the task status. A task is healthy iff,
+    1. A task is in state RUNNING
+    2. A task that satisfies (1) and is already known has the same task id.
+  """
+  def __init__(self):
+    self._task_ids = {}
+
+  def health(self, task):
+    task_id = task.assignedTask.taskId
+    instance_id = task.assignedTask.instanceId
+    status = task.status
+
+    if status == ScheduleStatus.RUNNING:
+      if instance_id in self._task_ids:
+        return Retriable.alive() if task_id == self._task_ids.get(instance_id) else NotRetriable.dead()
+      else:
+        log.info('Detected RUNNING instance %s' % instance_id)
+        self._task_ids[instance_id] = task_id
+        return Retriable.alive()
+    else:
+      return Retriable.dead()
+
+
+class HttpHealthCheck(HealthCheck):
+  """Verifies the health of a task based on http health checks. A new http signaler is created for a
+  task iff,
+    1. The instance id of the task is unknown.
+    2. The instance id is known but the (host, port) is different for the task.
+  """
+  def __init__(self, http_signaler_factory=HttpSignaler):
+    self._http_signalers = {}
+    self._http_signaler_factory = http_signaler_factory
+
+  def health(self, task):
+    assigned_task = task.assignedTask
+    instance_id = assigned_task.instanceId
+    host_port = (assigned_task.slaveHost, assigned_task.assignedPorts['health'])
+    http_signaler = None
+    if instance_id in self._http_signalers:
+      checker_host_port, signaler = self._http_signalers.get(instance_id)
+      # Only reuse the health checker if it is for the same destination.
+      if checker_host_port == host_port:
+        http_signaler = signaler
+    if not http_signaler:
+      http_signaler = self._http_signaler_factory(host_port[1], host_port[0])
+      self._http_signalers[instance_id] = (host_port, http_signaler)
+    return Retriable.alive() if http_signaler.health()[0] else Retriable.dead()
+
+
+class ChainedHealthCheck(HealthCheck):
+  """Delegates health checks to configured health checkers."""
+  def __init__(self, *health_checkers):
+    self._health_checkers = health_checkers
+
+  def health(self, task):
+    for checker in self._health_checkers:
+      healthy, retriable = checker.health(task)
+      if not healthy:
+        return (healthy, retriable)
+    return Retriable.alive()
+
+
+class InstanceWatcherHealthCheck(HealthCheck):
+  """Makes the decision: if a task has health port, then use Status+HTTP, else use only status.
+     Caveat: Only works if either ALL tasks have a health port or none of them have a health port.
+  """
+  # TODO(atollenaere) Refactor the code to use the executor StatusChecker/HealthChecker instead
+
+  def __init__(self, http_signaler_factory=HttpSignaler):
+    self._has_health_port = False
+    self._health_checker = StatusHealthCheck()
+    self._http_signaler_factory = http_signaler_factory
+
+  def health(self, task):
+    if not self._has_health_port and 'health' in task.assignedTask.assignedPorts:
+      log.debug('Health port detected, enabling HTTP checks')
+      self._health_checker = ChainedHealthCheck(self._health_checker, HttpHealthCheck(self._http_signaler_factory))
+      self._has_health_port = True
+    return self._health_checker.health(task)
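
A small sketch of how StatusHealthCheck above reacts to task churn. The stub classes are hypothetical stand-ins for the generated thrift structs, and the import path is an assumption while the rename is in progress:

from collections import namedtuple

from twitter.aurora.client.api.health_check import NotRetriable, Retriable, StatusHealthCheck
from gen.twitter.aurora.ttypes import ScheduleStatus

AssignedStub = namedtuple('AssignedStub', ['taskId', 'instanceId'])
TaskStub = namedtuple('TaskStub', ['assignedTask', 'status'])

checker = StatusHealthCheck()

# The first RUNNING sighting of an instance registers its task id and is healthy.
task = TaskStub(AssignedStub('task-0', 0), ScheduleStatus.RUNNING)
assert checker.health(task) == Retriable.alive()

# The same instance id reappearing under a different task id means the task was
# rescheduled, which StatusHealthCheck reports as an unretriable failure.
replaced = TaskStub(AssignedStub('task-1', 0), ScheduleStatus.RUNNING)
assert checker.health(replaced) == NotRetriable.dead()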

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/aurora/client/api/instance_watcher.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/api/instance_watcher.py b/src/main/python/apache/aurora/client/api/instance_watcher.py
new file mode 100644
index 0000000..5c69487
--- /dev/null
+++ b/src/main/python/apache/aurora/client/api/instance_watcher.py
@@ -0,0 +1,134 @@
+import time
+
+from twitter.common import log
+from .health_check import InstanceWatcherHealthCheck
+
+from gen.twitter.aurora.ttypes import (
+  Identity,
+  ResponseCode,
+  ScheduleStatus,
+  TaskQuery,
+)
+
+class Instance(object):
+  def __init__(self, birthday=None, finished=False):
+    self.birthday = birthday
+    self.finished = finished
+    self.healthy = False
+
+  def set_healthy(self, value):
+    self.healthy = value
+    self.finished = True
+
+  def __str__(self):
+    return ('[birthday=%s, healthy=%s, finished=%s]' % (self.birthday, self.healthy, self.finished))
+
+
+class InstanceWatcher(object):
+  def __init__(self,
+               scheduler,
+               job_key,
+               restart_threshold,
+               watch_secs,
+               health_check_interval_seconds,
+               clock=time):
+
+    self._scheduler = scheduler
+    self._job_key = job_key
+    self._restart_threshold = restart_threshold
+    self._watch_secs = watch_secs
+    self._health_check_interval_seconds = health_check_interval_seconds
+    self._clock = clock
+
+  def watch(self, instance_ids, health_check=None):
+    """Watches a set of instances and detects failures based on a delegated health check.
+
+    Arguments:
+    instance_ids -- set of instances to watch.
+
+    Returns a set of instances that are considered failed.
+    """
+    log.info('Watching instances: %s' % instance_ids)
+    instance_ids = set(instance_ids)
+    health_check = health_check or InstanceWatcherHealthCheck()
+    now = self._clock.time()
+    expected_healthy_by = now + self._restart_threshold
+    max_time = now + self._restart_threshold + self._watch_secs
+
+    instance_states = {}
+
+    def finished_instances():
+      return dict((s_id, s) for s_id, s in instance_states.items() if s.finished)
+
+    def set_instance_healthy(instance_id, now):
+      if instance_id not in instance_states:
+        instance_states[instance_id] = Instance(now)
+      instance = instance_states.get(instance_id)
+      if now > (instance.birthday + self._watch_secs):
+        log.info('Instance %s has been up and healthy for at least %d seconds' % (
+          instance_id, self._watch_secs))
+        instance.set_healthy(True)
+
+    def maybe_set_instance_unhealthy(instance_id, retriable):
+      # An instance that was previously healthy and currently unhealthy has failed.
+      if instance_id in instance_states:
+        log.info('Instance %s is unhealthy' % instance_id)
+        instance_states[instance_id].set_healthy(False)
+      # If the restart threshold has expired or if the instance cannot be retried it is unhealthy.
+      elif now > expected_healthy_by or not retriable:
+        log.info('Instance %s was not reported healthy within %d seconds' % (
+          instance_id, self._restart_threshold))
+        instance_states[instance_id] = Instance(finished=True)
+
+    while True:
+      running_tasks = self._get_tasks_by_instance_id(instance_ids)
+      now = self._clock.time()
+      tasks_by_instance = dict((task.assignedTask.instanceId, task) for task in running_tasks)
+      for instance_id in instance_ids:
+        if instance_id not in finished_instances():
+          running_task = tasks_by_instance.get(instance_id)
+          if running_task is not None:
+            task_healthy, retriable = health_check.health(running_task)
+            if task_healthy:
+              set_instance_healthy(instance_id, now)
+            else:
+              maybe_set_instance_unhealthy(instance_id, retriable)
+          else:
+            # Set retriable=True since an instance should be retried if it has not been healthy.
+            maybe_set_instance_unhealthy(instance_id, retriable=True)
+
+      log.debug('Instances health: %s' % ['%s: %s' % val for val in instance_states.items()])
+
+      # Return if all tasks are finished.
+      if set(finished_instances().keys()) == instance_ids:
+        return set([s_id for s_id, s in instance_states.items() if not s.healthy])
+
+      # Return if time is up.
+      if now > max_time:
+        return set([s_id for s_id in instance_ids if s_id not in instance_states
+                                             or not instance_states[s_id].healthy])
+
+      self._clock.sleep(self._health_check_interval_seconds)
+
+  def _get_tasks_by_instance_id(self, instance_ids):
+    log.debug('Querying instance statuses.')
+    query = TaskQuery()
+    query.owner = Identity(role=self._job_key.role)
+    query.environment = self._job_key.environment
+    query.jobName = self._job_key.name
+    query.statuses = set([ScheduleStatus.RUNNING])
+
+    query.instanceIds = instance_ids
+    try:
+      resp = self._scheduler.getTasksStatus(query)
+    except IOError as e:
+      log.error('IO Exception during scheduler call: %s' % e)
+      return []
+
+    tasks = []
+    if resp.responseCode == ResponseCode.OK:
+      tasks = resp.result.scheduleStatusResult.tasks
+
+    log.debug('Response from scheduler: %s (message: %s)'
+        % (ResponseCode._VALUES_TO_NAMES[resp.responseCode], resp.message))
+    return tasks

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/aurora/client/api/job_monitor.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/api/job_monitor.py b/src/main/python/apache/aurora/client/api/job_monitor.py
new file mode 100644
index 0000000..45edd1a
--- /dev/null
+++ b/src/main/python/apache/aurora/client/api/job_monitor.py
@@ -0,0 +1,63 @@
+import time
+
+from twitter.common.quantity import Amount, Time
+
+from gen.twitter.aurora.constants import (
+    LIVE_STATES,
+    TERMINAL_STATES
+)
+from gen.twitter.aurora.ttypes import (
+    Identity,
+    TaskQuery
+)
+
+from thrift.transport import TTransport
+
+
+class JobMonitor(object):
+  MIN_POLL_INTERVAL = Amount(10, Time.SECONDS)
+  MAX_POLL_INTERVAL = Amount(2, Time.MINUTES)
+
+  @classmethod
+  def running_or_finished(cls, status):
+    return status in (LIVE_STATES | TERMINAL_STATES)
+
+  @classmethod
+  def terminal(cls, status):
+    return status in TERMINAL_STATES
+
+  # TODO(ksweeney): Make this use the AuroraJobKey
+  def __init__(self, client, role, env, jobname):
+    self._client = client
+    self._query = TaskQuery(owner=Identity(role=role), environment=env, jobName=jobname)
+    self._initial_tasks = set()
+    self._initial_tasks = set(task.assignedTask.taskId for task in self.iter_query())
+
+  def iter_query(self):
+    try:
+      res = self._client.scheduler.getTasksStatus(self._query)
+    except TTransport.TTransportException as e:
+      print('Failed to query slaves from scheduler: %s' % e)
+      return
+    if res is None or res.result is None:
+      return
+    for task in res.result.scheduleStatusResult.tasks:
+      if task.assignedTask.taskId not in self._initial_tasks:
+        yield task
+
+  def states(self):
+    states = {}
+    for task in self.iter_query():
+      status, instance_id = task.status, task.assignedTask.instanceId
+      first_timestamp = task.taskEvents[0].timestamp
+      if instance_id not in states or first_timestamp > states[instance_id][0]:
+        states[instance_id] = (first_timestamp, status)
+    return dict((instance_id, status[1]) for (instance_id, status) in states.items())
+
+  def wait_until(self, predicate):
+    """Given a predicate (from ScheduleStatus => Boolean), return once all tasks
+       return true for that predicate."""
+    poll_interval = self.MIN_POLL_INTERVAL
+    while not all(predicate(state) for state in self.states().values()):
+      time.sleep(poll_interval.as_(Time.SECONDS))
+      poll_interval = min(self.MAX_POLL_INTERVAL, 2 * poll_interval)
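
A short sketch of driving JobMonitor above (cluster and job coordinates are hypothetical, import paths as in the earlier sketches):

from twitter.aurora.client.api import AuroraClientAPI
from twitter.aurora.client.api.job_monitor import JobMonitor
from twitter.aurora.common.cluster import Cluster

api = AuroraClientAPI(Cluster(name='example', scheduler_uri='scheduler.example.com:8081'))
monitor = JobMonitor(api, role='www-data', env='prod', jobname='hello_world')

# Poll with exponential backoff (10s doubling up to 2 minutes) until every task
# of the job reaches a terminal state; pass JobMonitor.running_or_finished
# instead to return as soon as each task is either live or already finished.
monitor.wait_until(JobMonitor.terminal)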

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/aurora/client/api/restarter.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/api/restarter.py b/src/main/python/apache/aurora/client/api/restarter.py
new file mode 100644
index 0000000..bf196e1
--- /dev/null
+++ b/src/main/python/apache/aurora/client/api/restarter.py
@@ -0,0 +1,73 @@
+from twitter.common import log
+
+from gen.twitter.aurora.constants import ACTIVE_STATES
+from gen.twitter.aurora.ttypes import ResponseCode
+
+from .instance_watcher import InstanceWatcher
+from .updater_util import FailureThreshold
+
+
+class Restarter(object):
+  def __init__(self,
+               job_key,
+               update_config,
+               health_check_interval_seconds,
+               scheduler,
+               instance_watcher=None,
+               lock=None):
+    self._job_key = job_key
+    self._update_config = update_config
+    self.health_check_interval_seconds = health_check_interval_seconds
+    self._scheduler = scheduler
+    self._lock = lock
+    self._instance_watcher = instance_watcher or InstanceWatcher(
+        scheduler,
+        job_key.to_thrift(),
+        update_config.restart_threshold,
+        update_config.watch_secs,
+        health_check_interval_seconds)
+
+  def restart(self, instances):
+    failure_threshold = FailureThreshold(
+        self._update_config.max_per_instance_failures,
+        self._update_config.max_total_failures)
+
+    if not instances:
+      query = self._job_key.to_thrift_query()
+      query.statuses = ACTIVE_STATES
+      status = self._scheduler.getTasksStatus(query)
+
+      if status.responseCode != ResponseCode.OK:
+        return status
+
+      tasks = status.result.scheduleStatusResult.tasks
+
+      instances = sorted(task.assignedTask.instanceId for task in tasks)
+      if not instances:
+        log.info("No instances specified, and no active instances found in job %s" % self._job_key)
+        log.info("Nothing to do.")
+        return status
+
+    log.info("Performing rolling restart of job %s (instances: %s)" % (self._job_key, instances))
+
+    while instances and not failure_threshold.is_failed_update():
+      batch = instances[:self._update_config.batch_size]
+      instances = instances[self._update_config.batch_size:]
+
+      log.info("Restarting instances: %s", batch)
+
+      resp = self._scheduler.restartShards(self._job_key.to_thrift(), batch, self._lock)
+      if resp.responseCode != ResponseCode.OK:
+        log.error('Error restarting instances: %s', resp.message)
+        return resp
+
+      failed_instances = self._instance_watcher.watch(batch)
+      instances += failed_instances
+      failure_threshold.update_failure_counts(failed_instances)
+
+    if failure_threshold.is_failed_update():
+      log.info("Restart failures threshold reached. Aborting")
+    else:
+      log.info("All instances were restarted successfully")
+
+    return resp

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/aurora/client/api/scheduler_client.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/api/scheduler_client.py b/src/main/python/apache/aurora/client/api/scheduler_client.py
new file mode 100644
index 0000000..ffec604
--- /dev/null
+++ b/src/main/python/apache/aurora/client/api/scheduler_client.py
@@ -0,0 +1,257 @@
+import functools
+import threading
+import time
+
+from twitter.aurora.common.auth import make_session_key, SessionKeyError
+from twitter.aurora.common.cluster import Cluster
+from twitter.common import log
+from twitter.common.quantity import Amount, Time
+from twitter.common.rpc.transports.tsslsocket import DelayedHandshakeTSSLSocket
+from twitter.common.zookeeper.kazoo_client import TwitterKazooClient
+from twitter.common.zookeeper.serverset import ServerSet
+
+from gen.twitter.aurora import AuroraAdmin
+from gen.twitter.aurora.constants import CURRENT_API_VERSION
+
+from thrift.protocol import TBinaryProtocol
+from thrift.transport import TSocket, TTransport
+from pystachio import Boolean, Default, Integer, String
+
+
+class SchedulerClientTrait(Cluster.Trait):
+  zk                = String
+  zk_port           = Default(Integer, 2181)
+  scheduler_zk_path = String
+  scheduler_uri     = String
+  proxy_url         = String
+  auth_mechanism    = Default(String, 'UNAUTHENTICATED')
+  use_thrift_ssl    = Default(Boolean, False)
+
+
+class SchedulerClient(object):
+  THRIFT_RETRIES = 5
+  RETRY_TIMEOUT = Amount(1, Time.SECONDS)
+
+  class CouldNotConnect(Exception): pass
+
+  # TODO(wickman) Refactor per MESOS-3005 into two separate classes with separate traits:
+  #   ZookeeperClientTrait
+  #   DirectClientTrait
+  @classmethod
+  def get(cls, cluster, **kwargs):
+    if not isinstance(cluster, Cluster):
+      raise TypeError('"cluster" must be an instance of Cluster, got %s' % type(cluster))
+    cluster = cluster.with_trait(SchedulerClientTrait)
+    if cluster.zk:
+      return ZookeeperSchedulerClient(
+          cluster, port=cluster.zk_port, ssl=cluster.use_thrift_ssl, **kwargs)
+    elif cluster.scheduler_uri:
+      try:
+        host, port = cluster.scheduler_uri.split(':', 2)
+        port = int(port)
+      except ValueError:
+        raise ValueError('Malformed Cluster scheduler_uri: %s' % cluster.scheduler_uri)
+      return DirectSchedulerClient(host, port, ssl=cluster.use_thrift_ssl)
+    else:
+      raise ValueError('"cluster" does not specify zk or scheduler_uri')
+
+  def __init__(self, verbose=False, ssl=False):
+    self._client = None
+    self._verbose = verbose
+    self._ssl = ssl
+
+  def get_thrift_client(self):
+    if self._client is None:
+      self._client = self._connect()
+    return self._client
+
+  # per-class implementation -- mostly meant to set up a valid host/port
+  # pair and then delegate the opening to SchedulerClient._connect_scheduler
+  def _connect(self):
+    return None
+
+  @staticmethod
+  def _connect_scheduler(host, port, with_ssl=False):
+    if with_ssl:
+      socket = DelayedHandshakeTSSLSocket(host, port, delay_handshake=True, validate=False)
+    else:
+      socket = TSocket.TSocket(host, port)
+    transport = TTransport.TBufferedTransport(socket)
+    protocol = TBinaryProtocol.TBinaryProtocol(transport)
+    schedulerClient = AuroraAdmin.Client(protocol)
+    for _ in range(SchedulerClient.THRIFT_RETRIES):
+      try:
+        transport.open()
+        return schedulerClient
+      except TTransport.TTransportException:
+        time.sleep(SchedulerClient.RETRY_TIMEOUT.as_(Time.SECONDS))
+        continue
+      except Exception as e:
+        # Monkey-patched proxies, like socks, can generate a proxy error here.
+        # without adding a dependency, we can't catch those in a more specific way.
+        raise SchedulerClient.CouldNotConnect('Connection to scheduler failed: %s' % e)
+    raise SchedulerClient.CouldNotConnect('Could not connect to %s:%s' % (host, port))
+
+
+class ZookeeperSchedulerClient(SchedulerClient):
+  SERVERSET_TIMEOUT = Amount(10, Time.SECONDS)
+
+  @classmethod
+  def get_scheduler_serverset(cls, cluster, port=2181, verbose=False, **kw):
+    if cluster.zk is None:
+      raise ValueError('Cluster has no associated zookeeper ensemble!')
+    if cluster.scheduler_zk_path is None:
+      raise ValueError('Cluster has no defined scheduler path, must specify scheduler_zk_path '
+                       'in your cluster config!')
+    zk = TwitterKazooClient.make(str('%s:%s' % (cluster.zk, port)), verbose=verbose)
+    return zk, ServerSet(zk, cluster.scheduler_zk_path, **kw)
+
+  def __init__(self, cluster, port=2181, ssl=False, verbose=False):
+    SchedulerClient.__init__(self, verbose=verbose, ssl=ssl)
+    self._cluster = cluster
+    self._zkport = port
+    self._endpoint = None
+
+  def _connect(self):
+    joined = threading.Event()
+    def on_join(elements):
+      joined.set()
+    zk, serverset = self.get_scheduler_serverset(self._cluster, verbose=self._verbose,
+        port=self._zkport, on_join=on_join)
+    joined.wait(timeout=self.SERVERSET_TIMEOUT.as_(Time.SECONDS))
+    serverset_endpoints = list(serverset)
+    if len(serverset_endpoints) == 0:
+      raise self.CouldNotConnect('No schedulers detected in %s!' % self._cluster.name)
+    instance = serverset_endpoints[0]
+    self._endpoint = instance.service_endpoint
+    self._http = instance.additional_endpoints.get('http')
+    zk.stop()
+    return self._connect_scheduler(self._endpoint.host, self._endpoint.port, self._ssl)
+
+  @property
+  def url(self):
+    proxy_url = self._cluster.proxy_url
+    if proxy_url:
+      return proxy_url
+    if self._http:
+      return 'http://%s:%s' % (self._http.host, self._http.port)
+
+
+class DirectSchedulerClient(SchedulerClient):
+  def __init__(self, host, port, ssl=False):
+    SchedulerClient.__init__(self, verbose=True, ssl=ssl)
+    self._host = host
+    self._port = port
+
+  def _connect(self):
+    return self._connect_scheduler(self._host, self._port, with_ssl=self._ssl)
+
+  @property
+  def url(self):
+    # TODO(wickman) This is broken -- make this tunable in MESOS-3005
+    return 'http://%s:8081' % self._host
+
+
+class SchedulerProxy(object):
+  """
+    This class is responsible for creating a reliable thrift client to the
+    Aurora scheduler.  It encapsulates all of the dirty work needed by the
+    AuroraClientAPI.
+  """
+  CONNECT_MAXIMUM_WAIT = Amount(1, Time.MINUTES)
+  RPC_RETRY_INTERVAL = Amount(5, Time.SECONDS)
+  RPC_MAXIMUM_WAIT = Amount(10, Time.MINUTES)
+  UNAUTHENTICATED_RPCS = frozenset([
+    'populateJobConfig',
+    'getTasksStatus',
+    'getJobs',
+    'getQuota',
+    'getVersion',
+  ])
+
+  class Error(Exception): pass
+  class TimeoutError(Error): pass
+  class AuthenticationError(Error): pass
+  class APIVersionError(Error): pass
+
+  def __init__(self, cluster, verbose=False, session_key_factory=make_session_key):
+    """A callable session_key_factory should be provided for authentication"""
+    self.cluster = cluster
+    # TODO(Sathya): Make this a part of cluster trait when authentication is pushed to the transport
+    # layer.
+    self._session_key_factory = session_key_factory
+    self._client = self._scheduler = None
+    self.verbose = verbose
+
+  def with_scheduler(method):
+    """Decorator magic to make sure a connection is made to the scheduler"""
+    def _wrapper(self, *args, **kwargs):
+      if not self._scheduler:
+        self._construct_scheduler()
+      return method(self, *args, **kwargs)
+    return _wrapper
+
+  def invalidate(self):
+    self._client = self._scheduler = None
+
+  @with_scheduler
+  def client(self):
+    return self._client
+
+  @with_scheduler
+  def scheduler(self):
+    return self._scheduler
+
+  def session_key(self):
+    try:
+      return self._session_key_factory(self.cluster.auth_mechanism)
+    except SessionKeyError as e:
+      raise self.AuthenticationError('Unable to create session key %s' % e)
+
+  def _construct_scheduler(self):
+    """
+      Populates:
+        self._scheduler
+        self._client
+    """
+    self._scheduler = SchedulerClient.get(self.cluster, verbose=self.verbose)
+    assert self._scheduler, "Could not find scheduler (cluster = %s)" % self.cluster.name
+    start = time.time()
+    while (time.time() - start) < self.CONNECT_MAXIMUM_WAIT.as_(Time.SECONDS):
+      try:
+        self._client = self._scheduler.get_thrift_client()
+        break
+      except SchedulerClient.CouldNotConnect as e:
+        log.warning('Could not connect to scheduler: %s' % e)
+    if not self._client:
+      raise self.TimeoutError('Timed out trying to connect to scheduler at %s' % self.cluster.name)
+
+    server_version = self._client.getVersion().result.getVersionResult
+    if server_version != CURRENT_API_VERSION:
+      raise self.APIVersionError("Client Version: %s, Server Version: %s" %
+                                 (CURRENT_API_VERSION, server_version))
+
+  def __getattr__(self, method_name):
+    # If the method does not exist, getattr will raise AttributeError for us.
+    method = getattr(AuroraAdmin.Client, method_name)
+    if not callable(method):
+      return method
+
+    @functools.wraps(method)
+    def method_wrapper(*args):
+      start = time.time()
+      while (time.time() - start) < self.RPC_MAXIMUM_WAIT.as_(Time.SECONDS):
+        auth_args = () if method_name in self.UNAUTHENTICATED_RPCS else (self.session_key(),)
+        try:
+          method = getattr(self.client(), method_name)
+          if not callable(method):
+            return method
+          return method(*(args + auth_args))
+        except (TTransport.TTransportException, self.TimeoutError) as e:
+          log.warning('Connection error with scheduler: %s, reconnecting...' % e)
+          self.invalidate()
+          time.sleep(self.RPC_RETRY_INTERVAL.as_(Time.SECONDS))
+      raise self.TimeoutError('Timed out attempting to issue %s to %s' % (
+          method_name, self.cluster.name))
+
+    return method_wrapper
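
For orientation, a hedged sketch of how these pieces are typically wired together; the Cluster import path and field values below are illustrative assumptions, not part of this change:

    # Hypothetical usage; the cluster fields mirror the SchedulerClientTrait
    # attributes referenced above (scheduler_uri, zk, use_thrift_ssl, ...).
    from twitter.aurora.common.cluster import Cluster  # import path assumed

    cluster = Cluster(name='example', scheduler_uri='scheduler.example.com:8081')

    # Returns a DirectSchedulerClient here; a zk-bearing cluster would get the
    # ZookeeperSchedulerClient instead.
    client = SchedulerClient.get(cluster)
    thrift_client = client.get_thrift_client()  # retries inside _connect_scheduler

    # SchedulerProxy layers session keys, reconnects and RPC retries on top.
    proxy = SchedulerProxy(cluster, verbose=True)
    resp = proxy.getVersion()  # dispatched through __getattr__ with retry logic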

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/aurora/client/api/updater.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/api/updater.py b/src/main/python/apache/aurora/client/api/updater.py
new file mode 100644
index 0000000..6e8e8a9
--- /dev/null
+++ b/src/main/python/apache/aurora/client/api/updater.py
@@ -0,0 +1,410 @@
+from collections import namedtuple
+from difflib import unified_diff
+
+from twitter.common import log
+
+from gen.twitter.aurora.constants import ACTIVE_STATES
+
+from gen.twitter.aurora.ttypes import (
+    AddInstancesConfig,
+    JobConfigValidation,
+    JobKey,
+    Identity,
+    Lock,
+    LockKey,
+    LockValidation,
+    Response,
+    ResponseCode,
+    TaskQuery,
+)
+from .updater_util import FailureThreshold, UpdaterConfig
+from .instance_watcher import InstanceWatcher
+from .scheduler_client import SchedulerProxy
+
+
+class Updater(object):
+  """Update the instances of a job in batches."""
+
+  class Error(Exception): pass
+
+  InstanceState = namedtuple('InstanceState', ['instance_id', 'is_updated'])
+  OperationConfigs = namedtuple('OperationConfigs', ['from_config', 'to_config'])
+  InstanceConfigs = namedtuple(
+      'InstanceConfigs',
+      ['remote_config_map', 'local_config_map', 'instances_to_process']
+  )
+
+  def __init__(self, config, health_check_interval_seconds, scheduler=None, instance_watcher=None):
+    self._config = config
+    self._job_key = JobKey(role=config.role(), environment=config.environment(), name=config.name())
+    self._health_check_interval_seconds = health_check_interval_seconds
+    self._scheduler = scheduler or SchedulerProxy(config.cluster())
+    try:
+      self._update_config = UpdaterConfig(**config.update_config().get())
+    except ValueError as e:
+      raise self.Error(str(e))
+    self._lock = None
+    self._watcher = instance_watcher or InstanceWatcher(
+        self._scheduler,
+        self._job_key,
+        self._update_config.restart_threshold,
+        self._update_config.watch_secs,
+        self._health_check_interval_seconds)
+
+  def _start(self):
+    """Starts an update by applying an exclusive lock on a job being updated.
+
+       Returns:
+         Response instance from the scheduler call.
+    """
+    resp = self._scheduler.acquireLock(LockKey(job=self._job_key))
+    if resp.responseCode == ResponseCode.OK:
+      self._lock = resp.result.acquireLockResult.lock
+    return resp
+
+  def _finish(self):
+    """Finishes an update by removing an exclusive lock on an updated job.
+
+       Returns:
+         Response instance from the scheduler call.
+    """
+    resp = self._scheduler.releaseLock(self._lock, LockValidation.CHECKED)
+
+    if resp.responseCode == ResponseCode.OK:
+      self._lock = None
+    else:
+      log.error('There was an error finalizing the update: %s' % resp.message)
+    return resp
+
+  def _update(self, instance_configs):
+    """Drives execution of the update logic. Performs a batched update/rollback for all instances
+    affected by the current update request.
+
+    Arguments:
+    instance_configs -- list of instance update configurations to go through.
+
+    Returns True if the update finished without exceeding failure thresholds, False otherwise.
+    """
+    failure_threshold = FailureThreshold(
+        self._update_config.max_per_instance_failures,
+        self._update_config.max_total_failures
+    )
+
+    instance_operation = self.OperationConfigs(
+      from_config=instance_configs.remote_config_map,
+      to_config=instance_configs.local_config_map
+    )
+
+    remaining_instances = [
+        self.InstanceState(instance_id, is_updated=False)
+        for instance_id in instance_configs.instances_to_process
+    ]
+
+    log.info('Starting job update.')
+    while remaining_instances and not failure_threshold.is_failed_update():
+      batch_instances = remaining_instances[0 : self._update_config.batch_size]
+      remaining_instances = list(set(remaining_instances) - set(batch_instances))
+      instances_to_restart = [s.instance_id for s in batch_instances if s.is_updated]
+      instances_to_update = [s.instance_id for s in batch_instances if not s.is_updated]
+
+      instances_to_watch = []
+      if instances_to_restart:
+        instances_to_watch += self._restart_instances(instances_to_restart)
+
+      if instances_to_update:
+        instances_to_watch += self._update_instances(instances_to_update, instance_operation)
+
+      failed_instances = self._watcher.watch(instances_to_watch) if instances_to_watch else set()
+
+      if failed_instances:
+        log.error('Failed instances: %s' % failed_instances)
+
+      unretryable_instances = failure_threshold.update_failure_counts(failed_instances)
+      if unretryable_instances:
+        log.warn('Not restarting failed instances %s, which exceeded '
+                 'maximum allowed instance failure limit of %s' %
+                 (unretryable_instances, self._update_config.max_per_instance_failures))
+      retryable_instances = list(set(failed_instances) - set(unretryable_instances))
+      remaining_instances += [
+          self.InstanceState(instance_id, is_updated=True) for instance_id in retryable_instances
+      ]
+      remaining_instances.sort(key=lambda tup: tup.instance_id)
+
+    if failure_threshold.is_failed_update():
+      untouched_instances = [s.instance_id for s in remaining_instances if not s.is_updated]
+      instances_to_rollback = list(
+          set(instance_configs.instances_to_process) - set(untouched_instances)
+      )
+      self._rollback(instances_to_rollback, instance_configs)
+
+    return not failure_threshold.is_failed_update()
+
+  def _rollback(self, instances_to_rollback, instance_configs):
+    """Performs a rollback operation for the failed instances.
+
+    Arguments:
+    instances_to_rollback -- instance ids to rollback.
+    instance_configs -- instance configuration to use for rollback.
+    """
+    log.info('Reverting update for %s' % instances_to_rollback)
+    instance_operation = self.OperationConfigs(
+        from_config=instance_configs.local_config_map,
+        to_config=instance_configs.remote_config_map
+    )
+    instances_to_rollback.sort(reverse=True)
+    failed_instances = []
+    while instances_to_rollback:
+      batch_instances = instances_to_rollback[0 : self._update_config.batch_size]
+      instances_to_rollback = list(set(instances_to_rollback) - set(batch_instances))
+      instances_to_rollback.sort(reverse=True)
+      instances_to_watch = self._update_instances(batch_instances, instance_operation)
+      failed_instances += self._watcher.watch(instances_to_watch)
+
+    if failed_instances:
+      log.error('Rollback failed for instances: %s' % failed_instances)
+
+  def _create_kill_add_lists(self, instance_ids, operation_configs):
+    """Determines a particular action (kill or add) to use for every instance in instance_ids.
+
+    Arguments:
+    instance_ids -- current batch of IDs to process.
+    operation_configs -- OperationConfigs with update details.
+
+    Returns lists of instances to kill and to add.
+    """
+    to_kill = []
+    to_add = []
+    for instance_id in instance_ids:
+      from_config = operation_configs.from_config.get(instance_id)
+      to_config = operation_configs.to_config.get(instance_id)
+
+      if from_config and to_config:
+        # Sort internal dicts before comparing to rule out differences due to hashing.
+        diff_output = ''.join(unified_diff(
+          str(sorted(from_config.__dict__.items(), key=lambda x: x[0])),
+          str(sorted(to_config.__dict__.items(), key=lambda x: x[0]))))
+        if diff_output:
+          log.debug('Task configuration changed for instance [%s]:\n%s' % (instance_id, diff_output))
+          to_kill.append(instance_id)
+          to_add.append(instance_id)
+      elif from_config and not to_config:
+        to_kill.append(instance_id)
+      elif not from_config and to_config:
+        to_add.append(instance_id)
+      else:
+        raise self.Error('Instance %s is outside of supported range' % instance_id)
+
+    return to_kill, to_add
+
+  def _update_instances(self, instance_ids, operation_configs):
+    """Applies kill/add actions for the specified batch instances.
+
+    Arguments:
+    instance_ids -- current batch of IDs to process.
+    operation_configs -- OperationConfigs with update details.
+
+    Returns a list of added instances.
+    """
+    log.info('Examining instances: %s' % instance_ids)
+
+    to_kill, to_add = self._create_kill_add_lists(instance_ids, operation_configs)
+
+    unchanged = list(set(instance_ids) - set(to_kill + to_add))
+    if unchanged:
+      log.info('Skipping unchanged instances: %s' % unchanged)
+
+    # Kill is a blocking call in scheduler -> no need to watch it yet.
+    self._kill_instances(to_kill)
+    self._add_instances(to_add, operation_configs.to_config)
+    return to_add
+
+  def _kill_instances(self, instance_ids):
+    """Instructs the scheduler to kill instances and waits for completion.
+
+    Arguments:
+    instance_ids -- list of IDs to kill.
+    """
+    if instance_ids:
+      log.info('Killing instances: %s' % instance_ids)
+      query = self._create_task_query(instanceIds=frozenset(int(s) for s in instance_ids))
+      self._check_and_log_response(self._scheduler.killTasks(query, self._lock))
+      log.info('Instances killed')
+
+  def _add_instances(self, instance_ids, to_config):
+    """Instructs the scheduler to add instances.
+
+    Arguments:
+    instance_ids -- list of IDs to add.
+    to_config -- map from instance ID to the task config used for the added instances.
+    """
+    if instance_ids:
+      log.info('Adding instances: %s' % instance_ids)
+      add_config = AddInstancesConfig(
+          key=self._job_key,
+          taskConfig=to_config[instance_ids[0]],  # instance_ids will always have at least 1 item.
+          instanceIds=frozenset(int(s) for s in instance_ids))
+      self._check_and_log_response(self._scheduler.addInstances(add_config, self._lock))
+      log.info('Instances added')
+
+  def _restart_instances(self, instance_ids):
+    """Instructs the scheduler to restart instances.
+
+    Arguments:
+    instance_ids -- set of instances to be restarted by the scheduler.
+
+    Returns the instance IDs to watch for health after the restart.
+    """
+    log.info('Restarting instances: %s' % instance_ids)
+    resp = self._scheduler.restartShards(self._job_key, instance_ids, self._lock)
+    self._check_and_log_response(resp)
+    return instance_ids
+
+  def _get_update_instructions(self, instances=None):
+    """Loads, validates and populates update working set.
+
+    Arguments:
+    instances -- (optional) set of instances to update.
+
+    Returns:
+    InstanceConfigs with the following data:
+      remote_config_map -- dictionary of {key:instance_id, value:task_config} from scheduler.
+      local_config_map  -- dictionary of {key:instance_id, value:task_config} with local
+                           task configs validated and populated with default values.
+      instances_to_process -- list of instance IDs to go over in update.
+    """
+    # Load existing tasks and populate remote config map and instance list.
+    assigned_tasks = self._get_existing_tasks()
+    remote_config_map = {}
+    remote_instances = []
+    for assigned_task in assigned_tasks:
+      remote_config_map[assigned_task.instanceId] = assigned_task.task
+      remote_instances.append(assigned_task.instanceId)
+
+    # Validate local job config and populate local task config.
+    local_task_config = self._validate_and_populate_local_config()
+
+    # Union of local and remote instance IDs.
+    job_config_instances = list(range(self._config.instances()))
+    instance_superset = sorted(list(set(remote_instances) | set(job_config_instances)))
+
+    # Calculate the update working set.
+    if instances is None:
+      # Full job update -> union of remote and local instances
+      instances_to_process = instance_superset
+    else:
+      # Partial job update -> validate all instances are recognized
+      instances_to_process = instances
+      unrecognized = list(set(instances) - set(instance_superset))
+      if unrecognized:
+        raise self.Error('Instances %s are outside of supported range' % unrecognized)
+
+    # Populate local config map
+    local_config_map = dict.fromkeys(job_config_instances, local_task_config)
+
+    return self.InstanceConfigs(remote_config_map, local_config_map, instances_to_process)
+
+  def _get_existing_tasks(self):
+    """Loads all existing tasks from the scheduler.
+
+    Returns a list of AssignedTasks.
+    """
+    resp = self._scheduler.getTasksStatus(self._create_task_query())
+    self._check_and_log_response(resp)
+    return [t.assignedTask for t in resp.result.scheduleStatusResult.tasks]
+
+  def _validate_and_populate_local_config(self):
+    """Validates local job configuration and populates local task config with default values.
+
+    Returns a TaskConfig populated with default values.
+    """
+    resp = self._scheduler.populateJobConfig(self._config.job(), JobConfigValidation.RUN_FILTERS)
+    self._check_and_log_response(resp)
+
+    # Safe to take the first element, as the scheduler would throw if zero instances were provided.
+    return list(resp.result.populateJobResult.populated)[0]
+
+  def _replace_template_if_cron(self):
+    """Checks if the provided job config represents a cron job and if so, replaces it.
+
+    Returns True if job is cron and False otherwise.
+    """
+    if self._config.job().cronSchedule:
+      resp = self._scheduler.replaceCronTemplate(self._config.job(), self._lock)
+      self._check_and_log_response(resp)
+      return True
+    else:
+      return False
+
+  def _create_task_query(self, instanceIds=None):
+    return TaskQuery(
+        owner=Identity(role=self._job_key.role),
+        environment=self._job_key.environment,
+        jobName=self._job_key.name,
+        statuses=ACTIVE_STATES,
+        instanceIds=instanceIds)
+
+  def _failed_response(self, message):
+    return Response(responseCode=ResponseCode.ERROR, message=message)
+
+  def update(self, instances=None):
+    """Performs the job update, blocking until it completes.
+    A rollback will be performed if the update was considered a failure based on the
+    update configuration.
+
+    Arguments:
+    instances -- (optional) instances to update. If not specified, all instances will be updated.
+
+    Returns a response object with update result status.
+    """
+    resp = self._start()
+    if resp.responseCode != ResponseCode.OK:
+      return resp
+
+    try:
+      # Handle cron jobs separately from other jobs.
+      if self._replace_template_if_cron():
+        log.info('Cron template updated, next run will reflect changes')
+        return self._finish()
+      else:
+        try:
+          instance_configs = self._get_update_instructions(instances)
+        except self.Error as e:
+          # Safe to release the lock acquired above as no job mutation has happened yet.
+          self._finish()
+          return self._failed_response('Unable to start job update: %s' % e)
+
+        if not self._update(instance_configs):
+          log.warn('Update failures threshold reached')
+          self._finish()
+          return self._failed_response('Update reverted')
+        else:
+          log.info('Update successful')
+          return self._finish()
+    except self.Error as e:
+      return self._failed_response('Aborting update without rollback! Fatal error: %s' % e)
+
+  @classmethod
+  def cancel_update(cls, scheduler, job_key):
+    """Cancels an update process by removing an exclusive lock on a provided job.
+
+    Arguments:
+    scheduler -- scheduler instance to use.
+    job_key -- job key to cancel update for.
+
+    Returns a response object with cancel update result status.
+    """
+    return scheduler.releaseLock(
+        Lock(key=LockKey(job=job_key.to_thrift())),
+        LockValidation.UNCHECKED)
+
+  def _check_and_log_response(self, resp):
+    """Checks scheduler return status, raises Error in case of unexpected response.
+
+    Arguments:
+    resp -- scheduler response object.
+
+    Raises Error in case of unexpected response status.
+    """
+    name, message = ResponseCode._VALUES_TO_NAMES[resp.responseCode], resp.message
+    if resp.responseCode == ResponseCode.OK:
+      log.debug('Response from scheduler: %s (message: %s)' % (name, message))
+    else:
+      raise self.Error(message)
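
As a rough, hypothetical driver for the Updater above -- 'config' stands in for the AuroraConfig-like object whose role()/environment()/name()/cluster()/job()/update_config()/instances() accessors are used in this file:

    # Hypothetical usage; 'config' must provide the accessors referenced above.
    updater = Updater(config, health_check_interval_seconds=10)

    # update() acquires the job lock, diffs remote vs. local task configs, and walks
    # the affected instances in batches of update_config.batch_size, rolling back if
    # the failure thresholds are exceeded.
    resp = updater.update()            # full job update
    # resp = updater.update([0, 1, 2]) # or a partial update of specific instance IDs

    if resp.responseCode != ResponseCode.OK:
      log.error('Update failed: %s' % resp.message)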

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/aurora/client/api/updater_util.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/api/updater_util.py b/src/main/python/apache/aurora/client/api/updater_util.py
new file mode 100644
index 0000000..db9f053
--- /dev/null
+++ b/src/main/python/apache/aurora/client/api/updater_util.py
@@ -0,0 +1,77 @@
+import collections
+
+from twitter.common import log
+
+class UpdaterConfig(object):
+  """
+  For updates involving a health check,
+
+  UPDATE INSTANCE                         HEALTHY              REMAIN HEALTHY
+  ----------------------------------------|-----------------------|
+  \--------------------------------------/ \----------------------/
+            restart_threshold                     watch_secs
+
+  When an update is initiated, an instance is expected to become "healthy" within
+  restart_threshold seconds.  It is then expected to remain healthy for at least watch_secs
+  seconds.  If either condition is not satisfied, the instance is deemed unhealthy.
+  """
+  def __init__(self,
+               batch_size,
+               restart_threshold,
+               watch_secs,
+               max_per_shard_failures,
+               max_total_failures):
+
+    if batch_size <= 0:
+      raise ValueError('Batch size should be greater than 0')
+    if restart_threshold <= 0:
+      raise ValueError('Restart threshold should be greater than 0')
+    if watch_secs <= 0:
+      raise ValueError('Watch seconds should be greater than 0')
+    self.batch_size = batch_size
+    self.restart_threshold = restart_threshold
+    self.watch_secs = watch_secs
+    self.max_total_failures = max_total_failures
+    self.max_per_instance_failures = max_per_shard_failures
+
+
+class FailureThreshold(object):
+  def __init__(self, max_per_instance_failures, max_total_failures):
+    self._max_per_instance_failures = max_per_instance_failures
+    self._max_total_failures = max_total_failures
+    self._failures_by_instance = collections.defaultdict(int)
+
+
+  def update_failure_counts(self, failed_instances):
+    """Update the failure counts metrics based upon a batch of failed instances.
+
+    Arguments:
+    failed_instances -- list of failed instances.
+
+    Returns a list of instances with failure counts exceeding _max_per_instance_failures.
+    """
+    exceeded_failure_count_instances = []
+    for instance in failed_instances:
+      self._failures_by_instance[instance] += 1
+      if self._failures_by_instance[instance] > self._max_per_instance_failures:
+        exceeded_failure_count_instances.append(instance)
+
+    return exceeded_failure_count_instances
+
+  def is_failed_update(self):
+    total_failed_instances = self._exceeded_instance_fail_count()
+    is_failed = total_failed_instances > self._max_total_failures
+
+    if is_failed:
+      log.error('%s failed instances observed, maximum allowed is %s' % (total_failed_instances,
+          self._max_total_failures))
+      for instance, failure_count in self._failures_by_instance.items():
+        if failure_count > self._max_per_instance_failures:
+          log.error('%s instance failures for instance %s, maximum allowed is %s' %
+              (failure_count, instance, self._max_per_instance_failures))
+    return is_failed
+
+  def _exceeded_instance_fail_count(self):
+    """Checks if the per instance failure is greater than a threshold."""
+    return sum(count > self._max_per_instance_failures
+               for count in self._failures_by_instance.values())
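
To make the threshold bookkeeping above concrete, a small worked example (the numbers are illustrative only):

    # Allow up to 1 failure per instance and at most 1 instance over that limit.
    threshold = FailureThreshold(max_per_instance_failures=1, max_total_failures=1)

    threshold.update_failure_counts([3, 5])   # first failure each -> returns []
    threshold.update_failure_counts([3])      # instance 3 now at 2 > 1 -> returns [3]
    threshold.is_failed_update()              # False: only 1 instance over the per-instance limit

    threshold.update_failure_counts([5, 5])   # instance 5 now at 3 > 1 -> returns [5, 5]
    threshold.is_failed_update()              # True: 2 instances over the limit exceeds max_total_failures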

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/aurora/client/base.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/base.py b/src/main/python/apache/aurora/client/base.py
new file mode 100644
index 0000000..0936743
--- /dev/null
+++ b/src/main/python/apache/aurora/client/base.py
@@ -0,0 +1,151 @@
+from collections import defaultdict
+import functools
+import sys
+from urlparse import urljoin
+
+from twitter.common import app, log
+
+from gen.twitter.aurora.ttypes import ResponseCode
+
+LOCKED_WARNING = """
+Note: if the scheduler detects that a job update is in progress (or was not
+properly completed) it will reject subsequent updates.  This is because your
+job is likely in a partially-updated state.  You should only begin another
+update if you are confident that nobody is updating this job, and that
+the job is in a state suitable for an update.
+
+After checking on the above, you may release the update lock on the job by
+invoking cancel_update.
+"""
+
+def die(msg):
+  log.fatal(msg)
+  sys.exit(1)
+
+def check_and_log_response(resp):
+  log.info('Response from scheduler: %s (message: %s)'
+      % (ResponseCode._VALUES_TO_NAMES[resp.responseCode], resp.message))
+  if resp.responseCode != ResponseCode.OK:
+    check_and_log_locked_response(resp)
+    sys.exit(1)
+
+def check_and_log_locked_response(resp):
+  if resp.responseCode == ResponseCode.LOCK_ERROR:
+    log.info(LOCKED_WARNING)
+
+def deprecation_warning(text):
+  log.warning('')
+  log.warning('*' * 80)
+  log.warning('* The command you ran is deprecated and will soon break!')
+  for line in text.split('\n'):
+    log.warning('* %s' % line)
+  log.warning('*' * 80)
+  log.warning('')
+
+
+class requires(object):
+  @staticmethod
+  def wrap_function(fn, fnargs, comparator):
+    @functools.wraps(fn)
+    def wrapped_function(args):
+      if not comparator(args, fnargs):
+        help = 'Incorrect parameters for %s' % fn.__name__
+        if fn.__doc__:
+          help = '%s\n\nSee the help subcommand for more details.' % fn.__doc__.split('\n')[0]
+        die(help)
+      return fn(*args)
+    return wrapped_function
+
+  @staticmethod
+  def exactly(*args):
+    def wrap(fn):
+      return requires.wrap_function(fn, args, (lambda want, got: len(want) == len(got)))
+    return wrap
+
+  @staticmethod
+  def at_least(*args):
+    def wrap(fn):
+      return requires.wrap_function(fn, args, (lambda want, got: len(want) >= len(got)))
+    return wrap
+
+  @staticmethod
+  def nothing(fn):
+    @functools.wraps(fn)
+    def real_fn(line):
+      return fn(*line)
+    return real_fn
+
+
+def synthesize_url(scheduler_url, role=None, env=None, job=None):
+  if not scheduler_url:
+    log.warning("Unable to find scheduler web UI!")
+    return None
+
+  if env and not role:
+    die('If env specified, must specify role')
+  if job and not (role and env):
+    die('If job specified, must specify role and env')
+
+  scheduler_url = urljoin(scheduler_url, 'scheduler')
+  if role:
+    scheduler_url += '/' + role
+    if env:
+      scheduler_url += '/' + env
+      if job:
+        scheduler_url += '/' + job
+  return scheduler_url
+
+
+def handle_open(scheduler_url, role, env, job):
+  url = synthesize_url(scheduler_url, role, env, job)
+  if url:
+    log.info('Job url: %s' % url)
+    if app.get_options().open_browser:
+      import webbrowser
+      webbrowser.open_new_tab(url)
+
+
+def make_commands_str(command_aliases):
+  """Format a string representation of a number of command aliases."""
+  commands = command_aliases[:]
+  commands.sort()
+  if len(commands) == 1:
+    return str(commands[0])
+  elif len(commands) == 2:
+    return '%s (or %s)' % (str(commands[0]), str(commands[1]))
+  else:
+    return '%s (or any of: %s)' % (str(commands[0]), ' '.join(map(str, commands[1:])))
+
+
+# TODO(wickman) This likely belongs in twitter.common.app (or split out as
+# part of a possible twitter.common.cli)
+def generate_full_usage():
+  """Generate verbose application usage from all registered
+     twitter.common.app commands and return as a string."""
+  docs_to_commands = defaultdict(list)
+  for (command, doc) in app.get_commands_and_docstrings():
+    docs_to_commands[doc].append(command)
+  def make_docstring(item):
+    (doc_text, commands) = item
+    def format_line(line):
+      return '    %s\n' % line.lstrip()
+    stripped = ''.join(map(format_line, doc_text.splitlines()))
+    return '%s\n%s' % (make_commands_str(commands), stripped)
+  usage = sorted(map(make_docstring, docs_to_commands.items()))
+  return 'Available commands:\n\n' + '\n'.join(usage)
+
+
+def generate_terse_usage():
+  """Generate minimal application usage from all registered
+     twitter.common.app commands and return as a string."""
+  docs_to_commands = defaultdict(list)
+  for (command, doc) in app.get_commands_and_docstrings():
+    docs_to_commands[doc].append(command)
+  usage = '\n    '.join(sorted(map(make_commands_str, docs_to_commands.values())))
+  return """
+Available commands:
+    %s
+
+For more help on an individual command:
+    %s help <command>
+""" % (usage, app.name())

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/aurora/client/bin/BUILD
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/bin/BUILD b/src/main/python/apache/aurora/client/bin/BUILD
new file mode 100644
index 0000000..7802245
--- /dev/null
+++ b/src/main/python/apache/aurora/client/bin/BUILD
@@ -0,0 +1,25 @@
+python_binary(
+  name = 'aurora_client',
+  source = 'aurora_client.py',
+  entry_point = 'twitter.aurora.client.bin.aurora_client:proxy_main',
+  dependencies = [
+    pants('aurora/twitterdeps/src/python/twitter/common/app'),
+    pants('aurora/twitterdeps/src/python/twitter/common/log'),
+    pants('src/main/python/twitter/aurora/client/commands:all'),
+    pants('src/main/python/twitter/aurora/client:base'),
+  ]
+)
+
+python_binary(
+  name = 'aurora_admin',
+  source = 'aurora_admin.py',
+  entry_point = 'twitter.aurora.client.bin.aurora_admin:proxy_main',
+  dependencies = [
+    pants('aurora/twitterdeps/src/python/twitter/common/app'),
+    pants('aurora/twitterdeps/src/python/twitter/common/log'),
+    pants('src/main/python/twitter/aurora/client/commands:admin'),
+    pants('src/main/python/twitter/aurora/client/commands:help'),
+    pants('src/main/python/twitter/aurora/client:base'),
+    pants('src/main/python/twitter/aurora/client:options'),
+  ]
+)

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/aurora/client/bin/__init__.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/bin/__init__.py b/src/main/python/apache/aurora/client/bin/__init__.py
new file mode 100644
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/aurora/client/bin/aurora_admin.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/bin/aurora_admin.py b/src/main/python/apache/aurora/client/bin/aurora_admin.py
new file mode 100644
index 0000000..53fd882
--- /dev/null
+++ b/src/main/python/apache/aurora/client/bin/aurora_admin.py
@@ -0,0 +1,23 @@
+from twitter.aurora.client.base import generate_terse_usage
+from twitter.aurora.client.commands import admin, help
+from twitter.aurora.client.options import add_verbosity_options
+from twitter.common import app
+from twitter.common.log.options import LogOptions
+
+
+app.register_commands_from(admin, help)
+add_verbosity_options()
+
+
+def main():
+  app.help()
+
+
+LogOptions.set_stderr_log_level('INFO')
+LogOptions.disable_disk_logging()
+app.set_name('aurora-admin')
+app.set_usage(generate_terse_usage())
+
+
+def proxy_main():
+  app.main()
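
For reference, the python_binary targets in the BUILD file above point at proxy_main; a generated pex effectively behaves like this hypothetical shim:

    # Hypothetical equivalent of the 'aurora_admin' pex entry point declared in BUILD
    # (entry_point = 'twitter.aurora.client.bin.aurora_admin:proxy_main').
    from twitter.aurora.client.bin.aurora_admin import proxy_main

    if __name__ == '__main__':
      proxy_main()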

