aurora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wfar...@apache.org
Subject [15/51] [partial] Rename twitter* and com.twitter to apache and org.apache directories to preserve all file history before the refactor.
Date Tue, 31 Dec 2013 21:20:08 GMT
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/aurora/admin/mesos_maintenance.py
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/aurora/admin/mesos_maintenance.py b/src/main/python/twitter/aurora/admin/mesos_maintenance.py
deleted file mode 100644
index 4e14436..0000000
--- a/src/main/python/twitter/aurora/admin/mesos_maintenance.py
+++ /dev/null
@@ -1,113 +0,0 @@
-from collections import defaultdict
-import time
-
-from twitter.common import log
-from twitter.common.quantity import Amount, Time
-
-from twitter.aurora.client.api import AuroraClientAPI
-from twitter.aurora.client.base import check_and_log_response
-
-from gen.twitter.aurora.ttypes import Hosts, MaintenanceMode
-
-
-def group_by_host(hostname):
-  return hostname
-
-
-class MesosMaintenance(object):
-  """This class provides more methods to interact with the mesos cluster and perform
-  maintenance.
-  """
-
-  DEFAULT_GROUPING = 'by_host'
-  GROUPING_FUNCTIONS = {
-    'by_host': group_by_host,
-  }
-  START_MAINTENANCE_DELAY = Amount(30, Time.SECONDS)
-
-  @classmethod
-  def group_hosts(cls, hostnames, grouping_function=DEFAULT_GROUPING):
-    try:
-      grouping_function = cls.GROUPING_FUNCTIONS[grouping_function]
-    except KeyError:
-      raise ValueError('Unknown grouping function %s!' % grouping_function)
-    groups = defaultdict(set)
-    for hostname in hostnames:
-      groups[grouping_function(hostname)].add(hostname)
-    return groups
-
-  @classmethod
-  def iter_batches(cls, hostnames, batch_size, grouping_function=DEFAULT_GROUPING):
-    if batch_size <= 0:
-      raise ValueError('Batch size must be > 0!')
-    groups = cls.group_hosts(hostnames, grouping_function)
-    groups = sorted(groups.items(), key=lambda v: v[0])
-    for k in range(0, len(groups), batch_size):
-      yield Hosts(set.union(*(hostset for (key, hostset) in groups[k:k+batch_size])))
-
-  def __init__(self, cluster, verbosity):
-    self._client = AuroraClientAPI(cluster, verbosity == 'verbose')
-
-  def _drain_hosts(self, drainable_hosts, clock=time):
-    """This will actively turn down tasks running on hosts."""
-    check_and_log_response(self._client.drain_hosts(drainable_hosts))
-    not_ready_hosts = [hostname for hostname in drainable_hosts.hostNames]
-    while not_ready_hosts:
-      log.info("Sleeping for %s." % self.START_MAINTENANCE_DELAY)
-      clock.sleep(self.START_MAINTENANCE_DELAY.as_(Time.SECONDS))
-      resp = self._client.maintenance_status(Hosts(not_ready_hosts))
-      #TODO(jsmith): Workaround until scheduler responds with unknown slaves in MESOS-3454
-      if not resp.result.maintenanceStatusResult.statuses:
-        not_ready_hosts = None
-      for host_status in resp.result.maintenanceStatusResult.statuses:
-        if host_status.mode != MaintenanceMode.DRAINED:
-          log.warning('%s is currently in status %s' %
-              (host_status.host, MaintenanceMode._VALUES_TO_NAMES[host_status.mode]))
-        else:
-          not_ready_hosts.remove(host_status.host)
-
-  def _complete_maintenance(self, drained_hosts):
-  """End the maintenance status for a given set of hosts."""
-    check_and_log_response(self._client.end_maintenance(drained_hosts))
-    resp = self._client.maintenance_status(drained_hosts)
-    for host_status in resp.result.maintenanceStatusResult.statuses:
-      if host_status.mode != MaintenanceMode.NONE:
-        log.warning('%s is DRAINING or in DRAINED' % host_status.host)
-
-  def _operate_on_hosts(self, drained_hosts, callback):
-    """Perform a given operation on a list of hosts that are ready for maintenance."""
-    for host in drained_hosts.hostNames:
-      callback(host)
-
-  def end_maintenance(self, hosts):
-    """Pull a list of hosts out of maintenance mode."""
-    self._complete_maintenance(Hosts(set(hosts)))
-
-  def start_maintenance(self, hosts):
-    """Put a list of hosts into maintenance mode, to de-prioritize scheduling."""
-    check_and_log_response(self._client.start_maintenance(Hosts(set(hosts))))
-
-  def perform_maintenance(self, hosts, batch_size=1, grouping_function=DEFAULT_GROUPING,
-                          callback=None):
-    """Wrap a callback in between sending hosts into maintenance mode and back.
-
-    Walk through the process of putting hosts into maintenance, draining them of tasks,
-    performing an action on them once drained, then removing them from maintenance mode
-    so tasks can schedule.
-    """
-    self._complete_maintenance(Hosts(set(hosts)))
-    self.start_maintenance(hosts)
-
-    for hosts in self.iter_batches(hosts, batch_size, grouping_function):
-      self._drain_hosts(hosts)
-      if callback:
-        self._operate_on_hosts(hosts, callback)
-      self._complete_maintenance(hosts)
-
-  def check_status(self, hosts):
-    resp = self._client.maintenance_status(Hosts(set(hosts)))
-    check_and_log_response(resp)
-    statuses = []
-    for host_status in resp.result.maintenanceStatusResult.statuses:
-      statuses.append((host_status.host, MaintenanceMode._VALUES_TO_NAMES[host_status.mode]))
-    return statuses

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/aurora/client/BUILD
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/aurora/client/BUILD b/src/main/python/twitter/aurora/client/BUILD
deleted file mode 100644
index 70eef2e..0000000
--- a/src/main/python/twitter/aurora/client/BUILD
+++ /dev/null
@@ -1,78 +0,0 @@
-import os
-
-# Create an alias for the previous target
-python_library(
-  name = 'api',
-  dependencies = [
-    pants('src/main/python/twitter/aurora/client/api')
-  ]
-)
-
-python_library(
-  name = 'base',
-  sources = ['base.py'],
-  dependencies = [
-    pants('aurora/twitterdeps/src/python/twitter/common/app'),
-    pants('aurora/twitterdeps/src/python/twitter/common/log'),
-    pants('src/main/thrift/com/twitter/aurora/gen:py-thrift'),
-  ]
-)
-
-python_library(
-  name = 'binding_helper',
-  sources = ['binding_helper.py'],
-  dependencies = [
-    pants('aurora/twitterdeps/src/python/twitter/common/lang'),
-  ]
-)
-
-python_library(
-  name = 'config',
-  sources = ['config.py'],
-  dependencies = [
-    pants('src/main/python/twitter/aurora/BUILD.thirdparty:pystachio'),
-    pants(':base'),
-    pants(':binding_helper'),
-    pants('aurora/twitterdeps/src/python/twitter/common/app'),
-    pants('aurora/twitterdeps/src/python/twitter/common/log'),
-    pants('src/main/python/twitter/aurora/config'),
-    pants('src/main/thrift/com/twitter/aurora/gen:py-thrift'),
-  ]
-)
-
-python_library(
-  name = 'factory',
-  sources = ['factory.py'],
-  dependencies = [
-    pants(':base'),
-    pants('aurora/twitterdeps/src/python/twitter/common/app'),
-    pants('src/main/python/twitter/aurora/client/hooks'),
-    pants('src/main/python/twitter/aurora/common:cluster'),
-    pants('src/main/python/twitter/aurora/common:clusters'),
-  ]
-)
-
-python_library(
-  name = 'options',
-  sources = ['options.py'],
-  dependencies = [
-    pants('src/main/python/twitter/thermos/common:options'),
-    pants('src/main/python/twitter/aurora/common:aurora_job_key'),
-  ]
-)
-
-python_library(
-  name = 'client-packaged',
-  dependencies = [
-    pants('src/main/python/twitter/aurora/common'),
-    pants('src/main/python/twitter/aurora/config'),
-    pants('src/main/python/twitter/thermos/common'),
-  ],
-  provides = setup_py(
-    name = 'twitter.aurora.client',
-    version = open(os.path.join(get_buildroot(), '.auroraversion')).read().strip().lower(),
-  ).with_binaries(
-    aurora_admin = pants('src/main/python/twitter/aurora/client/bin:aurora_admin'),
-    aurora_client = pants('src/main/python/twitter/aurora/client/bin:aurora_client'),
-  )
-)

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/aurora/client/__init__.py
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/aurora/client/__init__.py b/src/main/python/twitter/aurora/client/__init__.py
deleted file mode 100644
index b0d6433..0000000
--- a/src/main/python/twitter/aurora/client/__init__.py
+++ /dev/null
@@ -1 +0,0 @@
-__import__('pkg_resources').declare_namespace(__name__)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/aurora/client/api/BUILD
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/aurora/client/api/BUILD b/src/main/python/twitter/aurora/client/api/BUILD
deleted file mode 100644
index 4935b8a..0000000
--- a/src/main/python/twitter/aurora/client/api/BUILD
+++ /dev/null
@@ -1,104 +0,0 @@
-python_library(
-  name = 'api',
-  sources = ['__init__.py'],
-  dependencies = [
-    pants(':restarter'),
-    pants(':scheduler_client'),
-    pants(':updater'),
-    pants('aurora/twitterdeps/src/python/twitter/common/lang'),
-    pants('aurora/twitterdeps/src/python/twitter/common/log'),
-    pants('src/main/python/twitter/aurora/common'),
-    pants('src/main/thrift/com/twitter/aurora/gen:py-thrift'),
-  ]
-)
-
-python_library(
-  name = 'command_runner',
-  sources = ['command_runner.py'],
-  dependencies = [
-    pants('src/main/python/twitter/aurora/BUILD.thirdparty:pystachio'),
-    pants(':api'),
-    pants('src/main/python/twitter/thermos/config:schema'),
-    pants('src/main/python/twitter/aurora/common:cluster'),
-    pants('src/main/python/twitter/aurora/config:schema'),
-    pants('src/main/thrift/com/twitter/aurora/gen:py-thrift'),
-  ]
-)
-
-python_library(
-  name = 'disambiguator',
-  sources = ['disambiguator.py'],
-  dependencies = [
-    pants(':api'),
-    pants('aurora/twitterdeps/src/python/twitter/common/log'),
-    pants('src/main/python/twitter/aurora/client:base'),
-    pants('src/main/python/twitter/aurora/common'),
-  ]
-)
-
-python_library(
-  name = 'job_monitor',
-  sources = ['job_monitor.py'],
-  dependencies = [
-    pants('aurora/twitterdeps/src/python/twitter/common/quantity'),
-    pants('src/main/thrift/com/twitter/aurora/gen:py-thrift'),
-  ]
-)
-
-python_library(
-  name = 'restarter',
-  sources = ['restarter.py'],
-  dependencies = [
-    pants(':instance_watcher'),
-    pants(':updater_util'),
-    pants('aurora/twitterdeps/src/python/twitter/common/log'),
-    pants('src/main/thrift/com/twitter/aurora/gen:py-thrift'),
-  ]
-)
-
-python_library(
-  name = 'scheduler_client',
-  sources = ['scheduler_client.py'],
-  dependencies = [
-    pants('aurora/twitterdeps/src/python/twitter/common/log'),
-    pants('aurora/twitterdeps/src/python/twitter/common/quantity'),
-    pants('aurora/twitterdeps/src/python/twitter/common/rpc/transports:tsslsocket'),
-    pants('aurora/twitterdeps/src/python/twitter/common/zookeeper/serverset:kazoo_serverset'),
-    pants('aurora/twitterdeps/src/python/twitter/common/zookeeper:kazoo_client'),
-    pants('src/main/python/twitter/aurora/common/auth'),
-    pants('src/main/python/twitter/aurora/common:cluster'),
-    pants('src/main/thrift/com/twitter/aurora/gen:py-thrift'),
-  ]
-)
-
-python_library(
-  name = 'instance_watcher',
-  sources = ['instance_watcher.py', 'health_check.py'],
-  dependencies = [
-    pants(':scheduler_client'),
-    pants('aurora/twitterdeps/src/python/twitter/common/lang'),
-    pants('aurora/twitterdeps/src/python/twitter/common/log'),
-    pants('src/main/python/twitter/aurora/common:http_signaler'),
-    pants('src/main/thrift/com/twitter/aurora/gen:py-thrift'),
-  ]
-)
-
-python_library(
-  name = 'updater',
-  sources = ['updater.py'],
-  dependencies = [
-    pants(':scheduler_client'),
-    pants(':instance_watcher'),
-    pants(':updater_util'),
-    pants('aurora/twitterdeps/src/python/twitter/common/log'),
-    pants('src/main/thrift/com/twitter/aurora/gen:py-thrift'),
-  ]
-)
-
-python_library(
-  name = 'updater_util',
-  sources = ['updater_util.py'],
-  dependencies = [
-    pants('aurora/twitterdeps/src/python/twitter/common/log'),
-  ]
-)

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/aurora/client/api/__init__.py
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/aurora/client/api/__init__.py b/src/main/python/twitter/aurora/client/api/__init__.py
deleted file mode 100644
index 60f4011..0000000
--- a/src/main/python/twitter/aurora/client/api/__init__.py
+++ /dev/null
@@ -1,190 +0,0 @@
-from twitter.common import log
-
-from twitter.aurora.common.aurora_job_key import AuroraJobKey
-from twitter.aurora.common.auth import make_session_key
-from twitter.aurora.common.cluster import Cluster
-
-from gen.twitter.aurora.constants import LIVE_STATES
-from gen.twitter.aurora.ttypes import (
-    Response,
-    Identity,
-    Quota,
-    ResponseCode,
-    TaskQuery)
-
-from .restarter import Restarter
-from .scheduler_client import SchedulerProxy
-from .updater import Updater
-
-
-class AuroraClientAPI(object):
-  """This class provides the API to talk to the twitter scheduler"""
-
-  class Error(Exception): pass
-  class TypeError(Error, TypeError): pass
-  class ClusterMismatch(Error, ValueError): pass
-
-  def __init__(self, cluster, verbose=False, session_key_factory=make_session_key):
-    if not isinstance(cluster, Cluster):
-      raise TypeError('AuroraClientAPI expects instance of Cluster for "cluster", got %s' %
-          type(cluster))
-    self._scheduler = SchedulerProxy(
-        cluster, verbose=verbose, session_key_factory=session_key_factory)
-    self._cluster = cluster
-
-  @property
-  def cluster(self):
-    return self._cluster
-
-  @property
-  def scheduler(self):
-    return self._scheduler
-
-  def create_job(self, config, lock=None):
-    log.info('Creating job %s' % config.name())
-    log.debug('Full configuration: %s' % config.job())
-    log.debug('Lock %s' % lock)
-    return self._scheduler.createJob(config.job(), lock)
-
-  def populate_job_config(self, config, validation=None):
-    return self._scheduler.populateJobConfig(config.job(), validation)
-
-  def start_cronjob(self, job_key):
-    self._assert_valid_job_key(job_key)
-
-    log.info("Starting cron job: %s" % job_key)
-    return self._scheduler.startCronJob(job_key.to_thrift())
-
-  def get_jobs(self, role):
-    log.info("Retrieving jobs for role %s" % role)
-    return self._scheduler.getJobs(role)
-
-  def kill_job(self, job_key, instances=None, lock=None):
-    log.info("Killing tasks for job: %s" % job_key)
-    if not isinstance(job_key, AuroraJobKey):
-      raise TypeError('Expected type of job_key %r to be %s but got %s instead'
-          % (job_key, AuroraJobKey.__name__, job_key.__class__.__name__))
-
-    # Leave query.owner.user unset so the query doesn't filter jobs only submitted by a particular
-    # user.
-    # TODO(wfarner): Refactor this when Identity is removed from TaskQuery.
-    query = job_key.to_thrift_query()
-    if instances is not None:
-      log.info("Instances to be killed: %s" % instances)
-      query.instanceIds = frozenset([int(s) for s in instances])
-    return self._scheduler.killTasks(query, lock)
-
-  def check_status(self, job_key):
-    self._assert_valid_job_key(job_key)
-
-    log.info("Checking status of %s" % job_key)
-    return self.query(job_key.to_thrift_query())
-
-  @classmethod
-  def build_query(cls, role, job, instances=None, statuses=LIVE_STATES, env=None):
-    return TaskQuery(owner=Identity(role=role),
-                     jobName=job,
-                     statuses=statuses,
-                     instanceIds=instances,
-                     environment=env)
-
-  def query(self, query):
-    return self._scheduler.getTasksStatus(query)
-
-  def update_job(self, config, health_check_interval_seconds=3, instances=None):
-    """Run a job update for a given config, for the specified instances.  If
-       instances is left unspecified, update all instances.  Returns whether or not
-       the update was successful."""
-
-    log.info("Updating job: %s" % config.name())
-    updater = Updater(config, health_check_interval_seconds, self._scheduler)
-
-    return updater.update(instances)
-
-  def cancel_update(self, job_key):
-    """Cancel the update represented by job_key. Returns whether or not the cancellation was
-       successful."""
-    self._assert_valid_job_key(job_key)
-
-    log.info("Canceling update on job %s" % job_key)
-    resp = Updater.cancel_update(self._scheduler, job_key)
-    if resp.responseCode != ResponseCode.OK:
-      log.error('Error cancelling the update: %s' % resp.message)
-    return resp
-
-  def restart(self, job_key, instances, updater_config, health_check_interval_seconds):
-    """Perform a rolling restart of the job. If instances is None or [], restart all instances. Returns
-       the scheduler response for the last restarted batch of instances (which allows the client to
-       show the job URL), or the status check response if no tasks were active.
-    """
-    self._assert_valid_job_key(job_key)
-
-    return Restarter(job_key, updater_config, health_check_interval_seconds, self._scheduler
-    ).restart(instances)
-
-  def start_maintenance(self, hosts):
-    log.info("Starting maintenance for: %s" % hosts.hostNames)
-    return self._scheduler.startMaintenance(hosts)
-
-  def drain_hosts(self, hosts):
-    log.info("Draining tasks on: %s" % hosts.hostNames)
-    return self._scheduler.drainHosts(hosts)
-
-  def maintenance_status(self, hosts):
-    log.info("Maintenance status for: %s" % hosts.hostNames)
-    return self._scheduler.maintenanceStatus(hosts)
-
-  def end_maintenance(self, hosts):
-    log.info("Ending maintenance for: %s" % hosts.hostNames)
-    return self._scheduler.endMaintenance(hosts)
-
-  def get_quota(self, role):
-    log.info("Getting quota for: %s" % role)
-    return self._scheduler.getQuota(role)
-
-  def set_quota(self, role, cpu, ram_mb, disk_mb):
-    log.info("Setting quota for user:%s cpu:%f ram_mb:%d disk_mb: %d"
-              % (role, cpu, ram_mb, disk_mb))
-    return self._scheduler.setQuota(role, Quota(cpu, ram_mb, disk_mb))
-
-  def force_task_state(self, task_id, status):
-    log.info("Requesting that task %s transition to state %s" % (task_id, status))
-    return self._scheduler.forceTaskState(task_id, status)
-
-  def perform_backup(self):
-    return self._scheduler.performBackup()
-
-  def list_backups(self):
-    return self._scheduler.listBackups()
-
-  def stage_recovery(self, backup_id):
-    return self._scheduler.stageRecovery(backup_id)
-
-  def query_recovery(self, query):
-    return self._scheduler.queryRecovery(query)
-
-  def delete_recovery_tasks(self, query):
-    return self._scheduler.deleteRecoveryTasks(query)
-
-  def commit_recovery(self):
-    return self._scheduler.commitRecovery()
-
-  def unload_recovery(self):
-    return self._scheduler.unloadRecovery()
-
-  def get_job_updates(self):
-    return self._scheduler.getJobUpdates()
-
-  def snapshot(self):
-    return self._scheduler.snapshot()
-
-  def unsafe_rewrite_config(self, rewrite_request):
-    return self._scheduler.rewriteConfigs(rewrite_request)
-
-  def _assert_valid_job_key(self, job_key):
-    if not isinstance(job_key, AuroraJobKey):
-      raise self.TypeError('Invalid job_key %r: expected %s but got %s'
-          % (job_key, AuroraJobKey.__name__, job_key.__class__.__name__))
-    if job_key.cluster != self.cluster.name:
-      raise self.ClusterMismatch('job %s does not belong to cluster %s' % (job_key,
-          self.cluster.name))

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/aurora/client/api/command_runner.py
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/aurora/client/api/command_runner.py b/src/main/python/twitter/aurora/client/api/command_runner.py
deleted file mode 100644
index 0a8ad33..0000000
--- a/src/main/python/twitter/aurora/client/api/command_runner.py
+++ /dev/null
@@ -1,120 +0,0 @@
-from multiprocessing.pool import ThreadPool
-import posixpath
-import subprocess
-
-from twitter.common import log
-
-from twitter.aurora.client.api import AuroraClientAPI
-from twitter.aurora.config.schema.base import MesosContext
-from twitter.aurora.common.cluster import Cluster
-from twitter.thermos.config.schema import ThermosContext
-
-from gen.twitter.aurora.constants import LIVE_STATES
-from gen.twitter.aurora.ttypes import (
-  Identity,
-  ResponseCode,
-  TaskQuery)
-
-from pystachio import Environment, Required, String
-
-
-class CommandRunnerTrait(Cluster.Trait):
-  slave_root          = Required(String)
-  slave_run_directory = Required(String)
-
-
-class DistributedCommandRunner(object):
-  @staticmethod
-  def execute(args):
-    hostname, role, command = args
-    ssh_command = ['ssh', '-n', '-q', '%s@%s' % (role, hostname), command]
-    po = subprocess.Popen(ssh_command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
-    output = po.communicate()
-    return '\n'.join('%s:  %s' % (hostname, line) for line in output[0].splitlines())
-
-  @classmethod
-  def make_executor_path(cls, cluster, executor_name):
-    parameters = cls.sandbox_args(cluster)
-    parameters.update(executor_name=executor_name)
-    return posixpath.join(
-        '%(slave_root)s',
-        'slaves/*/frameworks/*/executors/%(executor_name)s/runs',
-        '%(slave_run_directory)s'
-    ) % parameters
-
-  @classmethod
-  def thermos_sandbox(cls, cluster, executor_sandbox=False):
-    sandbox = cls.make_executor_path(cluster, 'thermos-{{thermos.task_id}}')
-    return sandbox if executor_sandbox else posixpath.join(sandbox, 'sandbox')
-
-  @classmethod
-  def sandbox_args(cls, cluster):
-    cluster = cluster.with_trait(CommandRunnerTrait)
-    return {'slave_root': cluster.slave_root, 'slave_run_directory': cluster.slave_run_directory}
-
-  @classmethod
-  def substitute_thermos(cls, command, task, cluster, **kw):
-    prefix_command = 'cd %s;' % cls.thermos_sandbox(cluster, **kw)
-    thermos_namespace = ThermosContext(
-        task_id=task.assignedTask.taskId,
-        ports=task.assignedTask.assignedPorts)
-    mesos_namespace = MesosContext(instance=task.assignedTask.instanceId)
-    command = String(prefix_command + command) % Environment(
-        thermos=thermos_namespace,
-        mesos=mesos_namespace)
-    return command.get()
-
-  @classmethod
-  def aurora_sandbox(cls, cluster, executor_sandbox=False):
-    if executor_sandbox:
-      return cls.make_executor_path(cluster, 'twitter')
-    else:
-      return '/var/run/nexus/%task_id%/sandbox'
-
-  @classmethod
-  def substitute_aurora(cls, command, task, cluster, **kw):
-    command = ('cd %s;' % cls.aurora_sandbox(cluster, **kw)) + command
-    command = command.replace('%shard_id%', str(task.assignedTask.instanceId))
-    command = command.replace('%task_id%', task.assignedTask.taskId)
-    for name, port in task.assignedTask.assignedPorts.items():
-      command = command.replace('%port:' + name + '%', str(port))
-    return command
-
-  @classmethod
-  def substitute(cls, command, task, cluster, **kw):
-    if task.assignedTask.task.executorConfig:
-      return cls.substitute_thermos(command, task, cluster, **kw)
-    else:
-      return cls.substitute_aurora(command, task, cluster, **kw)
-
-  @classmethod
-  def query_from(cls, role, env, job):
-    return TaskQuery(statuses=LIVE_STATES, owner=Identity(role), jobName=job, environment=env)
-
-  def __init__(self, cluster, role, env, jobs, ssh_user=None):
-    self._cluster = cluster
-    self._api = AuroraClientAPI(cluster=cluster)
-    self._role = role
-    self._env = env
-    self._jobs = jobs
-    self._ssh_user = ssh_user if ssh_user else self._role
-
-  def resolve(self):
-    for job in self._jobs:
-      resp = self._api.query(self.query_from(self._role, self._env, job))
-      if resp.responseCode != ResponseCode.OK:
-        log.error('Failed to query job: %s' % job)
-        continue
-      for task in resp.result.scheduleStatusResult.tasks:
-        yield task
-
-  def process_arguments(self, command, **kw):
-    for task in self.resolve():
-      host = task.assignedTask.slaveHost
-      role = task.assignedTask.task.owner.role
-      yield (host, self._ssh_user, self.substitute(command, task, self._cluster, **kw))
-
-  def run(self, command, parallelism=1, **kw):
-    threadpool = ThreadPool(processes=parallelism)
-    for result in threadpool.imap_unordered(self.execute, self.process_arguments(command, **kw)):
-      print result

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/aurora/client/api/disambiguator.py
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/aurora/client/api/disambiguator.py b/src/main/python/twitter/aurora/client/api/disambiguator.py
deleted file mode 100644
index 4693574..0000000
--- a/src/main/python/twitter/aurora/client/api/disambiguator.py
+++ /dev/null
@@ -1,89 +0,0 @@
-from twitter.common import log
-
-from twitter.aurora.client.api import AuroraClientAPI
-from twitter.aurora.client.base import check_and_log_response, deprecation_warning, die
-from twitter.aurora.common.aurora_job_key import AuroraJobKey
-
-
-class LiveJobDisambiguator(object):
-  """
-  Disambiguates a job-specification into concrete AuroraJobKeys by querying the scheduler API.
-  """
-
-  def __init__(self, client, role, env, name):
-    if not isinstance(client, AuroraClientAPI):
-      raise TypeError("client must be a AuroraClientAPI")
-    self._client = client
-
-    if not role:
-      raise ValueError("role is required")
-    self._role = role
-    if not name:
-      raise ValueError("name is required")
-    self._name = name
-    self._env = env
-
-  @property
-  def ambiguous(self):
-    return not all((self._role, self._env, self._name))
-
-  def query_matches(self):
-    resp = self._client.get_jobs(self._role)
-    check_and_log_response(resp)
-    return set(AuroraJobKey(self._client.cluster.name, j.key.role, j.key.environment, j.key.name)
-        for j in resp.result.getJobsResult.configs if j.key.name == self._name)
-
-  @classmethod
-  def _disambiguate_or_die(cls, client, role, env, name):
-    # Returns a single AuroraJobKey if one can be found given the args, potentially
-    # querying the scheduler. Calls die() with an appropriate error message otherwise.
-    try:
-      disambiguator = cls(client, role, env, name)
-    except ValueError as e:
-      die(e)
-
-    if not disambiguator.ambiguous:
-      return AuroraJobKey(client.cluster.name, role, env, name)
-
-    deprecation_warning("Job ambiguously specified - querying the scheduler to disambiguate")
-    matches = disambiguator.query_matches()
-    if len(matches) == 1:
-      (match,) = matches
-      log.info("Found job %s" % match)
-      return match
-    elif len(matches) == 0:
-      die("No jobs found")
-    else:
-      die("Multiple jobs match (%s) - disambiguate by using the CLUSTER/ROLE/ENV/NAME form"
-          % ",".join(str(m) for m in matches))
-
-  @classmethod
-  def disambiguate_args_or_die(cls, args, options, client_factory=AuroraClientAPI):
-    """
-    Returns a (AuroraClientAPI, AuroraJobKey, AuroraConfigFile:str) tuple
-    if one can be found given the args, potentially querying the scheduler with the returned client.
-    Calls die() with an appropriate error message otherwise.
-
-    Arguments:
-      args: args from app command invocation.
-      options: options from app command invocation. must have env and cluster attributes.
-      client_factory: a callable (cluster) -> AuroraClientAPI.
-    """
-    if not len(args) > 0:
-      die('job path is required')
-    try:
-      job_key = AuroraJobKey.from_path(args[0])
-      client = client_factory(job_key.cluster)
-      config_file = args[1] if len(args) > 1 else None  # the config for hooks
-      return client, job_key, config_file
-    except AuroraJobKey.Error:
-      log.warning("Failed to parse job path, falling back to compatibility mode")
-      role = args[0] if len(args) > 0 else None
-      name = args[1] if len(args) > 1 else None
-      env = None
-      config_file = None  # deprecated form does not support hooks functionality
-      cluster = options.cluster
-      if not cluster:
-        die('cluster is required')
-      client = client_factory(cluster)
-      return client, cls._disambiguate_or_die(client, role, env, name), config_file

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/aurora/client/api/health_check.py
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/aurora/client/api/health_check.py b/src/main/python/twitter/aurora/client/api/health_check.py
deleted file mode 100644
index b75ee6e..0000000
--- a/src/main/python/twitter/aurora/client/api/health_check.py
+++ /dev/null
@@ -1,123 +0,0 @@
-from abc import abstractmethod
-
-from twitter.common import log
-from twitter.common.lang import Interface
-
-from twitter.aurora.common.http_signaler import HttpSignaler
-
-from gen.twitter.aurora.ttypes import ScheduleStatus
-
-
-class HealthCheck(Interface):
-  @abstractmethod
-  def health(self, task):
-    """Checks health of the task and returns a (healthy, retriable) pair."""
-
-
-class HealthStatus(object):
-  @classmethod
-  def alive(cls):
-    return cls(True).health()
-
-  @classmethod
-  def dead(cls):
-    return cls(False).health()
-
-  def __init__(self, retry, health):
-    self._retry = retry
-    self._health = health
-
-  def health(self):
-    return (self._health, self._retry)
-
-
-class NotRetriable(HealthStatus):
-  def __init__(self, health):
-    super(NotRetriable, self).__init__(False, health)
-
-
-class Retriable(HealthStatus):
-  def __init__(self, health):
-    super(Retriable, self).__init__(True, health)
-
-
-class StatusHealthCheck(HealthCheck):
-  """Verifies the health of a task based on the task status. A task is healthy iff,
-    1. A task is in state RUNNING
-    2. A task that satisfies (1) and is already known has the same task id.
-  """
-  def __init__(self):
-    self._task_ids = {}
-
-  def health(self, task):
-    task_id = task.assignedTask.taskId
-    instance_id = task.assignedTask.instanceId
-    status = task.status
-
-    if status == ScheduleStatus.RUNNING:
-      if instance_id in self._task_ids:
-        return Retriable.alive() if task_id == self._task_ids.get(instance_id) else NotRetriable.dead()
-      else:
-        log.info('Detected RUNNING instance %s' % instance_id)
-        self._task_ids[instance_id] = task_id
-        return Retriable.alive()
-    else:
-      return Retriable.dead()
-
-
-class HttpHealthCheck(HealthCheck):
-  """Verifies the health of a task based on http health checks. A new http signaler is created for a
-  task iff,
-    1. The instance id of the task is unknown.
-    2. The instance id is known but the (host, port) is different for the task.
-  """
-  def __init__(self, http_signaler_factory=HttpSignaler):
-    self._http_signalers = {}
-    self._http_signaler_factory = http_signaler_factory
-
-  def health(self, task):
-    assigned_task = task.assignedTask
-    instance_id = assigned_task.instanceId
-    host_port = (assigned_task.slaveHost, assigned_task.assignedPorts['health'])
-    http_signaler = None
-    if instance_id in self._http_signalers:
-      checker_host_port, signaler = self._http_signalers.get(instance_id)
-      # Only reuse the health checker if it is for the same destination.
-      if checker_host_port == host_port:
-        http_signaler = signaler
-    if not http_signaler:
-      http_signaler = self._http_signaler_factory(host_port[1], host_port[0])
-      self._http_signalers[instance_id] = (host_port, http_signaler)
-    return Retriable.alive() if http_signaler.health()[0] else Retriable.dead()
-
-
-class ChainedHealthCheck(HealthCheck):
-  """Delegates health checks to configured health checkers."""
-  def __init__(self, *health_checkers):
-    self._health_checkers = health_checkers
-
-  def health(self, task):
-    for checker in self._health_checkers:
-      healthy, retriable = checker.health(task)
-      if not healthy:
-        return (healthy, retriable)
-    return Retriable.alive()
-
-
-class InstanceWatcherHealthCheck(HealthCheck):
-  """Makes the decision: if a task has health port, then use Status+HTTP, else use only status.
-     Caveat: Only works if either ALL tasks have a health port or none of them have a health port.
-  """
-  # TODO(atollenaere) Refactor the code to use the executor StatusChecker/HealthChecker instead
-
-  def __init__(self, http_signaler_factory=HttpSignaler):
-    self._has_health_port = False
-    self._health_checker = StatusHealthCheck()
-    self._http_signaler_factory = http_signaler_factory
-
-  def health(self, task):
-    if not self._has_health_port and 'health' in task.assignedTask.assignedPorts:
-      log.debug('Health port detected, enabling HTTP checks')
-      self._health_checker = ChainedHealthCheck(self._health_checker, HttpHealthCheck(self._http_signaler_factory))
-      self._has_health_port = True
-    return self._health_checker.health(task)

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/aurora/client/api/instance_watcher.py
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/aurora/client/api/instance_watcher.py b/src/main/python/twitter/aurora/client/api/instance_watcher.py
deleted file mode 100644
index 5c69487..0000000
--- a/src/main/python/twitter/aurora/client/api/instance_watcher.py
+++ /dev/null
@@ -1,134 +0,0 @@
-import time
-
-from twitter.common import log
-from .health_check import InstanceWatcherHealthCheck
-
-from gen.twitter.aurora.ttypes import (
-  Identity,
-  ResponseCode,
-  ScheduleStatus,
-  TaskQuery,
-)
-
-class Instance(object):
-  def __init__(self, birthday=None, finished=False):
-    self.birthday = birthday
-    self.finished = finished
-    self.healthy = False
-
-  def set_healthy(self, value):
-    self.healthy = value
-    self.finished = True
-
-  def __str__(self):
-    return ('[birthday=%s, healthy=%s, finished=%s]' % (self.birthday, self.healthy, self.finished))
-
-
-class InstanceWatcher(object):
-  def __init__(self,
-               scheduler,
-               job_key,
-               restart_threshold,
-               watch_secs,
-               health_check_interval_seconds,
-               clock=time):
-
-    self._scheduler = scheduler
-    self._job_key = job_key
-    self._restart_threshold = restart_threshold
-    self._watch_secs = watch_secs
-    self._health_check_interval_seconds = health_check_interval_seconds
-    self._clock = clock
-
-  def watch(self, instance_ids, health_check=None):
-    """Watches a set of instances and detects failures based on a delegated health check.
-
-    Arguments:
-    instance_ids -- set of instances to watch.
-
-    Returns a set of instances that are considered failed.
-    """
-    log.info('Watching instances: %s' % instance_ids)
-    instance_ids = set(instance_ids)
-    health_check = health_check or InstanceWatcherHealthCheck()
-    now = self._clock.time()
-    expected_healthy_by = now + self._restart_threshold
-    max_time = now + self._restart_threshold + self._watch_secs
-
-    instance_states = {}
-
-    def finished_instances():
-      return dict((s_id, s) for s_id, s in instance_states.items() if s.finished)
-
-    def set_instance_healthy(instance_id, now):
-      if instance_id not in instance_states:
-        instance_states[instance_id] = Instance(now)
-      instance = instance_states.get(instance_id)
-      if now > (instance.birthday + self._watch_secs):
-        log.info('Instance %s has been up and healthy for at least %d seconds' % (
-          instance_id, self._watch_secs))
-        instance.set_healthy(True)
-
-    def maybe_set_instance_unhealthy(instance_id, retriable):
-      # An instance that was previously healthy and currently unhealthy has failed.
-      if instance_id in instance_states:
-        log.info('Instance %s is unhealthy' % instance_id)
-        instance_states[instance_id].set_healthy(False)
-      # If the restart threshold has expired or if the instance cannot be retried it is unhealthy.
-      elif now > expected_healthy_by or not retriable:
-        log.info('Instance %s was not reported healthy within %d seconds' % (
-          instance_id, self._restart_threshold))
-        instance_states[instance_id] = Instance(finished=True)
-
-    while True:
-      running_tasks = self._get_tasks_by_instance_id(instance_ids)
-      now = self._clock.time()
-      tasks_by_instance = dict((task.assignedTask.instanceId, task) for task in running_tasks)
-      for instance_id in instance_ids:
-        if instance_id not in finished_instances():
-          running_task = tasks_by_instance.get(instance_id)
-          if running_task is not None:
-            task_healthy, retriable = health_check.health(running_task)
-            if task_healthy:
-              set_instance_healthy(instance_id, now)
-            else:
-              maybe_set_instance_unhealthy(instance_id, retriable)
-          else:
-            # Set retriable=True since an instance should be retried if it has not been healthy.
-            maybe_set_instance_unhealthy(instance_id, retriable=True)
-
-      log.debug('Instances health: %s' % ['%s: %s' % val for val in instance_states.items()])
-
-      # Return if all tasks are finished.
-      if set(finished_instances().keys()) == instance_ids:
-        return set([s_id for s_id, s in instance_states.items() if not s.healthy])
-
-      # Return if time is up.
-      if now > max_time:
-        return set([s_id for s_id in instance_ids if s_id not in instance_states
-                                             or not instance_states[s_id].healthy])
-
-      self._clock.sleep(self._health_check_interval_seconds)
-
-  def _get_tasks_by_instance_id(self, instance_ids):
-    log.debug('Querying instance statuses.')
-    query = TaskQuery()
-    query.owner = Identity(role=self._job_key.role)
-    query.environment = self._job_key.environment
-    query.jobName = self._job_key.name
-    query.statuses = set([ScheduleStatus.RUNNING])
-
-    query.instanceIds = instance_ids
-    try:
-      resp = self._scheduler.getTasksStatus(query)
-    except IOError as e:
-      log.error('IO Exception during scheduler call: %s' % e)
-      return []
-
-    tasks = []
-    if resp.responseCode == ResponseCode.OK:
-      tasks = resp.result.scheduleStatusResult.tasks
-
-    log.debug('Response from scheduler: %s (message: %s)'
-        % (ResponseCode._VALUES_TO_NAMES[resp.responseCode], resp.message))
-    return tasks

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/aurora/client/api/job_monitor.py
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/aurora/client/api/job_monitor.py b/src/main/python/twitter/aurora/client/api/job_monitor.py
deleted file mode 100644
index 45edd1a..0000000
--- a/src/main/python/twitter/aurora/client/api/job_monitor.py
+++ /dev/null
@@ -1,63 +0,0 @@
-import time
-
-from twitter.common.quantity import Amount, Time
-
-from gen.twitter.aurora.constants import (
-    LIVE_STATES,
-    TERMINAL_STATES
-)
-from gen.twitter.aurora.ttypes import (
-    Identity,
-    TaskQuery
-)
-
-from thrift.transport import TTransport
-
-
-class JobMonitor(object):
-  MIN_POLL_INTERVAL = Amount(10, Time.SECONDS)
-  MAX_POLL_INTERVAL = Amount(2, Time.MINUTES)
-
-  @classmethod
-  def running_or_finished(cls, status):
-    return status in (LIVE_STATES | TERMINAL_STATES)
-
-  @classmethod
-  def terminal(cls, status):
-    return status in TERMINAL_STATES
-
-  # TODO(ksweeney): Make this use the AuroraJobKey
-  def __init__(self, client, role, env, jobname):
-    self._client = client
-    self._query = TaskQuery(owner=Identity(role=role), environment=env, jobName=jobname)
-    self._initial_tasks = set()
-    self._initial_tasks = set(task.assignedTask.taskId for task in self.iter_query())
-
-  def iter_query(self):
-    try:
-      res = self._client.scheduler.getTasksStatus(self._query)
-    except TTransport.TTransportException as e:
-      print('Failed to query slaves from scheduler: %s' % e)
-      return
-    if res is None or res.result is None:
-      return
-    for task in res.result.scheduleStatusResult.tasks:
-      if task.assignedTask.taskId not in self._initial_tasks:
-        yield task
-
-  def states(self):
-    states = {}
-    for task in self.iter_query():
-      status, instance_id = task.status, task.assignedTask.instanceId
-      first_timestamp = task.taskEvents[0].timestamp
-      if instance_id not in states or first_timestamp > states[instance_id][0]:
-        states[instance_id] = (first_timestamp, status)
-    return dict((instance_id, status[1]) for (instance_id, status) in states.items())
-
-  def wait_until(self, predicate):
-    """Given a predicate (from ScheduleStatus => Boolean), return once all tasks
-       return true for that predicate."""
-    poll_interval = self.MIN_POLL_INTERVAL
-    while not all(predicate(state) for state in self.states().values()):
-      time.sleep(poll_interval.as_(Time.SECONDS))
-      poll_interval = min(self.MAX_POLL_INTERVAL, 2 * poll_interval)

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/aurora/client/api/restarter.py
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/aurora/client/api/restarter.py b/src/main/python/twitter/aurora/client/api/restarter.py
deleted file mode 100644
index bf196e1..0000000
--- a/src/main/python/twitter/aurora/client/api/restarter.py
+++ /dev/null
@@ -1,73 +0,0 @@
-from twitter.common import log
-
-from gen.twitter.aurora.constants import ACTIVE_STATES
-from gen.twitter.aurora.ttypes import ResponseCode
-
-from .instance_watcher import InstanceWatcher
-from .updater_util import FailureThreshold
-
-
-class Restarter(object):
-  def __init__(self,
-               job_key,
-               update_config,
-               health_check_interval_seconds,
-               scheduler,
-               instance_watcher=None,
-               lock=None):
-    self._job_key = job_key
-    self._update_config = update_config
-    self.health_check_interval_seconds = health_check_interval_seconds
-    self._scheduler = scheduler
-    self._lock = lock
-    self._instance_watcher = instance_watcher or InstanceWatcher(
-        scheduler,
-        job_key.to_thrift(),
-        update_config.restart_threshold,
-        update_config.watch_secs,
-        health_check_interval_seconds)
-
-  def restart(self, instances):
-    failure_threshold = FailureThreshold(
-        self._update_config.max_per_instance_failures,
-        self._update_config.max_total_failures)
-
-    if not instances:
-      query = self._job_key.to_thrift_query()
-      query.statuses = ACTIVE_STATES
-      status = self._scheduler.getTasksStatus(query)
-
-      if status.responseCode != ResponseCode.OK:
-        return status
-
-      tasks = status.result.scheduleStatusResult.tasks
-
-      instances = sorted(task.assignedTask.instanceId for task in tasks)
-      if not instances:
-        log.info("No instances specified, and no active instances found in job %s" % self._job_key)
-        log.info("Nothing to do.")
-        return status
-
-    log.info("Performing rolling restart of job %s (instances: %s)" % (self._job_key, instances))
-
-    while instances and not failure_threshold.is_failed_update():
-      batch = instances[:self._update_config.batch_size]
-      instances = instances[self._update_config.batch_size:]
-
-      log.info("Restarting instances: %s", batch)
-
-      resp = self._scheduler.restartShards(self._job_key.to_thrift(), batch, self._lock)
-      if resp.responseCode != ResponseCode.OK:
-        log.error('Error restarting instances: %s', resp.message)
-        return resp
-
-      failed_instances = self._instance_watcher.watch(batch)
-      instances += failed_instances
-      failure_threshold.update_failure_counts(failed_instances)
-
-    if failure_threshold.is_failed_update():
-      log.info("Restart failures threshold reached. Aborting")
-    else:
-      log.info("All instances were restarted successfully")
-
-    return resp

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/aurora/client/api/scheduler_client.py
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/aurora/client/api/scheduler_client.py b/src/main/python/twitter/aurora/client/api/scheduler_client.py
deleted file mode 100644
index ffec604..0000000
--- a/src/main/python/twitter/aurora/client/api/scheduler_client.py
+++ /dev/null
@@ -1,257 +0,0 @@
-import functools
-import threading
-import time
-
-from twitter.aurora.common.auth import make_session_key, SessionKeyError
-from twitter.aurora.common.cluster import Cluster
-from twitter.common import log
-from twitter.common.quantity import Amount, Time
-from twitter.common.rpc.transports.tsslsocket import DelayedHandshakeTSSLSocket
-from twitter.common.zookeeper.kazoo_client import TwitterKazooClient
-from twitter.common.zookeeper.serverset import ServerSet
-
-from gen.twitter.aurora import AuroraAdmin
-from gen.twitter.aurora.constants import CURRENT_API_VERSION
-
-from thrift.protocol import TBinaryProtocol
-from thrift.transport import TSocket, TTransport
-from pystachio import Boolean, Default, Integer, String
-
-
-class SchedulerClientTrait(Cluster.Trait):
-  zk                = String
-  zk_port           = Default(Integer, 2181)
-  scheduler_zk_path = String
-  scheduler_uri     = String
-  proxy_url         = String
-  auth_mechanism    = Default(String, 'UNAUTHENTICATED')
-  use_thrift_ssl    = Default(Boolean, False)
-
-
-class SchedulerClient(object):
-  THRIFT_RETRIES = 5
-  RETRY_TIMEOUT = Amount(1, Time.SECONDS)
-
-  class CouldNotConnect(Exception): pass
-
-  # TODO(wickman) Refactor per MESOS-3005 into two separate classes with separate traits:
-  #   ZookeeperClientTrait
-  #   DirectClientTrait
-  @classmethod
-  def get(cls, cluster, **kwargs):
-    if not isinstance(cluster, Cluster):
-      raise TypeError('"cluster" must be an instance of Cluster, got %s' % type(cluster))
-    cluster = cluster.with_trait(SchedulerClientTrait)
-    if cluster.zk:
-      return ZookeeperSchedulerClient(
-          cluster, port=cluster.zk_port, ssl=cluster.use_thrift_ssl, **kwargs)
-    elif cluster.scheduler_uri:
-      try:
-        host, port = cluster.scheduler_uri.split(':', 2)
-        port = int(port)
-      except ValueError:
-        raise ValueError('Malformed Cluster scheduler_uri: %s' % cluster.scheduler_uri)
-      return DirectSchedulerClient(host, port, ssl=cluster.use_thrift_ssl)
-    else:
-      raise ValueError('"cluster" does not specify zk or scheduler_uri')
-
-  def __init__(self, verbose=False, ssl=False):
-    self._client = None
-    self._verbose = verbose
-    self._ssl = ssl
-
-  def get_thrift_client(self):
-    if self._client is None:
-      self._client = self._connect()
-    return self._client
-
-  # per-class implementation -- mostly meant to set up a valid host/port
-  # pair and then delegate the opening to SchedulerClient._connect_scheduler
-  def _connect(self):
-    return None
-
-  @staticmethod
-  def _connect_scheduler(host, port, with_ssl=False):
-    if with_ssl:
-      socket = DelayedHandshakeTSSLSocket(host, port, delay_handshake=True, validate=False)
-    else:
-      socket = TSocket.TSocket(host, port)
-    transport = TTransport.TBufferedTransport(socket)
-    protocol = TBinaryProtocol.TBinaryProtocol(transport)
-    schedulerClient = AuroraAdmin.Client(protocol)
-    for _ in range(SchedulerClient.THRIFT_RETRIES):
-      try:
-        transport.open()
-        return schedulerClient
-      except TTransport.TTransportException:
-        time.sleep(SchedulerClient.RETRY_TIMEOUT.as_(Time.SECONDS))
-        continue
-      except Exception as e:
-        # Monkey-patched proxies, like socks, can generate a proxy error here;
-        # without adding a dependency, we can't catch those in a more specific way.
-        raise SchedulerClient.CouldNotConnect('Connection to scheduler failed: %s' % e)
-    raise SchedulerClient.CouldNotConnect('Could not connect to %s:%s' % (host, port))
-
-
-class ZookeeperSchedulerClient(SchedulerClient):
-  SERVERSET_TIMEOUT = Amount(10, Time.SECONDS)
-
-  @classmethod
-  def get_scheduler_serverset(cls, cluster, port=2181, verbose=False, **kw):
-    if cluster.zk is None:
-      raise ValueError('Cluster has no associated zookeeper ensemble!')
-    if cluster.scheduler_zk_path is None:
-      raise ValueError('Cluster has no defined scheduler path, must specify scheduler_zk_path '
-                       'in your cluster config!')
-    zk = TwitterKazooClient.make(str('%s:%s' % (cluster.zk, port)), verbose=verbose)
-    return zk, ServerSet(zk, cluster.scheduler_zk_path, **kw)
-
-  def __init__(self, cluster, port=2181, ssl=False, verbose=False):
-    SchedulerClient.__init__(self, verbose=verbose, ssl=ssl)
-    self._cluster = cluster
-    self._zkport = port
-    self._endpoint = None
-
-  def _connect(self):
-    joined = threading.Event()
-    def on_join(elements):
-      joined.set()
-    zk, serverset = self.get_scheduler_serverset(self._cluster, verbose=self._verbose,
-        port=self._zkport, on_join=on_join)
-    joined.wait(timeout=self.SERVERSET_TIMEOUT.as_(Time.SECONDS))
-    serverset_endpoints = list(serverset)
-    if len(serverset_endpoints) == 0:
-      raise self.CouldNotConnect('No schedulers detected in %s!' % self._cluster.name)
-    instance = serverset_endpoints[0]
-    self._endpoint = instance.service_endpoint
-    self._http = instance.additional_endpoints.get('http')
-    zk.stop()
-    return self._connect_scheduler(self._endpoint.host, self._endpoint.port, self._ssl)
-
-  @property
-  def url(self):
-    proxy_url = self._cluster.proxy_url
-    if proxy_url:
-      return proxy_url
-    if self._http:
-      return 'http://%s:%s' % (self._http.host, self._http.port)
-
-
-class DirectSchedulerClient(SchedulerClient):
-  def __init__(self, host, port, ssl=False):
-    SchedulerClient.__init__(self, verbose=True, ssl=ssl)
-    self._host = host
-    self._port = port
-
-  def _connect(self):
-    return self._connect_scheduler(self._host, self._port, with_ssl=self._ssl)
-
-  @property
-  def url(self):
-    # TODO(wickman) This is broken -- make this tunable in MESOS-3005
-    return 'http://%s:8081' % self._host
-
-
-class SchedulerProxy(object):
-  """
-    This class is responsible for creating a reliable thrift client to the
-    twitter scheduler.  Basically all the dirty work needed by the
-    AuroraClientAPI.
-  """
-  CONNECT_MAXIMUM_WAIT = Amount(1, Time.MINUTES)
-  RPC_RETRY_INTERVAL = Amount(5, Time.SECONDS)
-  RPC_MAXIMUM_WAIT = Amount(10, Time.MINUTES)
-  UNAUTHENTICATED_RPCS = frozenset([
-    'populateJobConfig',
-    'getTasksStatus',
-    'getJobs',
-    'getQuota',
-    'getVersion',
-  ])
-
-  class Error(Exception): pass
-  class TimeoutError(Error): pass
-  class AuthenticationError(Error): pass
-  class APIVersionError(Error): pass
-
-  def __init__(self, cluster, verbose=False, session_key_factory=make_session_key):
-    """A callable session_key_factory should be provided for authentication"""
-    self.cluster = cluster
-    # TODO(Sathya): Make this a part of cluster trait when authentication is pushed to the transport
-    # layer.
-    self._session_key_factory = session_key_factory
-    self._client = self._scheduler = None
-    self.verbose = verbose
-
-  def with_scheduler(method):
-    """Decorator magic to make sure a connection is made to the scheduler"""
-    def _wrapper(self, *args, **kwargs):
-      if not self._scheduler:
-        self._construct_scheduler()
-      return method(self, *args, **kwargs)
-    return _wrapper
-
-  def invalidate(self):
-    self._client = self._scheduler = None
-
-  @with_scheduler
-  def client(self):
-    return self._client
-
-  @with_scheduler
-  def scheduler(self):
-    return self._scheduler
-
-  def session_key(self):
-    try:
-      return self._session_key_factory(self.cluster.auth_mechanism)
-    except SessionKeyError as e:
-      raise self.AuthenticationError('Unable to create session key %s' % e)
-
-  def _construct_scheduler(self):
-    """
-      Populates:
-        self._scheduler
-        self._client
-    """
-    self._scheduler = SchedulerClient.get(self.cluster, verbose=self.verbose)
-    assert self._scheduler, "Could not find scheduler (cluster = %s)" % self.cluster.name
-    start = time.time()
-    while (time.time() - start) < self.CONNECT_MAXIMUM_WAIT.as_(Time.SECONDS):
-      try:
-        self._client = self._scheduler.get_thrift_client()
-        break
-      except SchedulerClient.CouldNotConnect as e:
-        log.warning('Could not connect to scheduler: %s' % e)
-    if not self._client:
-      raise self.TimeoutError('Timed out trying to connect to scheduler at %s' % self.cluster.name)
-
-    server_version = self._client.getVersion().result.getVersionResult
-    if server_version != CURRENT_API_VERSION:
-      raise self.APIVersionError("Client Version: %s, Server Version: %s" %
-                                 (CURRENT_API_VERSION, server_version))
-
-  def __getattr__(self, method_name):
-    # If the method does not exist, getattr will return AttributeError for us.
-    method = getattr(AuroraAdmin.Client, method_name)
-    if not callable(method):
-      return method
-
-    @functools.wraps(method)
-    def method_wrapper(*args):
-      start = time.time()
-      while (time.time() - start) < self.RPC_MAXIMUM_WAIT.as_(Time.SECONDS):
-        auth_args = () if method_name in self.UNAUTHENTICATED_RPCS else (self.session_key(),)
-        try:
-          method = getattr(self.client(), method_name)
-          if not callable(method):
-            return method
-          return method(*(args + auth_args))
-        except (TTransport.TTransportException, self.TimeoutError) as e:
-          log.warning('Connection error with scheduler: %s, reconnecting...' % e)
-          self.invalidate()
-          time.sleep(self.RPC_RETRY_INTERVAL.as_(Time.SECONDS))
-      raise self.TimeoutError('Timed out attempting to issue %s to %s' % (
-          method_name, self.cluster.name))
-
-    return method_wrapper

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/aurora/client/api/updater.py
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/aurora/client/api/updater.py b/src/main/python/twitter/aurora/client/api/updater.py
deleted file mode 100644
index 6e8e8a9..0000000
--- a/src/main/python/twitter/aurora/client/api/updater.py
+++ /dev/null
@@ -1,410 +0,0 @@
-from collections import namedtuple
-from difflib import unified_diff
-
-from twitter.common import log
-
-from gen.twitter.aurora.constants import ACTIVE_STATES
-
-from gen.twitter.aurora.ttypes import (
-    AddInstancesConfig,
-    JobConfigValidation,
-    JobKey,
-    Identity,
-    Lock,
-    LockKey,
-    LockValidation,
-    Response,
-    ResponseCode,
-    TaskQuery,
-)
-from .updater_util import FailureThreshold, UpdaterConfig
-from .instance_watcher import InstanceWatcher
-from .scheduler_client import SchedulerProxy
-
-
-class Updater(object):
-  """Update the instances of a job in batches."""
-
-  class Error(Exception): pass
-
-  InstanceState = namedtuple('InstanceState', ['instance_id', 'is_updated'])
-  OperationConfigs = namedtuple('OperationConfigs', ['from_config', 'to_config'])
-  InstanceConfigs = namedtuple(
-      'InstanceConfigs',
-      ['remote_config_map', 'local_config_map', 'instances_to_process']
-  )
-
-  def __init__(self, config, health_check_interval_seconds, scheduler=None, instance_watcher=None):
-    self._config = config
-    self._job_key = JobKey(role=config.role(), environment=config.environment(), name=config.name())
-    self._health_check_interval_seconds = health_check_interval_seconds
-    self._scheduler = scheduler or SchedulerProxy(config.cluster())
-    try:
-      self._update_config = UpdaterConfig(**config.update_config().get())
-    except ValueError as e:
-      raise self.Error(str(e))
-    self._lock = None
-    self._watcher = instance_watcher or InstanceWatcher(
-        self._scheduler,
-        self._job_key,
-        self._update_config.restart_threshold,
-        self._update_config.watch_secs,
-        self._health_check_interval_seconds)
-
-  def _start(self):
-    """Starts an update by applying an exclusive lock on a job being updated.
-
-       Returns:
-         Response instance from the scheduler call.
-    """
-    resp = self._scheduler.acquireLock(LockKey(job=self._job_key))
-    if resp.responseCode == ResponseCode.OK:
-      self._lock = resp.result.acquireLockResult.lock
-    return resp
-
-  def _finish(self):
-    """Finishes an update by removing an exclusive lock on an updated job.
-
-       Returns:
-         Response instance from the scheduler call.
-    """
-    resp = self._scheduler.releaseLock(self._lock, LockValidation.CHECKED)
-
-    if resp.responseCode == ResponseCode.OK:
-      self._lock = None
-    else:
-      log.error('There was an error finalizing the update: %s' % resp.message)
-    return resp
-
-  def _update(self, instance_configs):
-    """Drives execution of the update logic. Performs a batched update/rollback for all instances
-    affected by the current update request.
-
-    Arguments:
-    instance_configs -- list of instance update configurations to go through.
-
-    Returns the set of instances that failed to update.
-    """
-    failure_threshold = FailureThreshold(
-        self._update_config.max_per_instance_failures,
-        self._update_config.max_total_failures
-    )
-
-    instance_operation = self.OperationConfigs(
-      from_config=instance_configs.remote_config_map,
-      to_config=instance_configs.local_config_map
-    )
-
-    remaining_instances = [
-        self.InstanceState(instance_id, is_updated=False)
-        for instance_id in instance_configs.instances_to_process
-    ]
-
-    log.info('Starting job update.')
-    while remaining_instances and not failure_threshold.is_failed_update():
-      batch_instances = remaining_instances[0 : self._update_config.batch_size]
-      remaining_instances = list(set(remaining_instances) - set(batch_instances))
-      instances_to_restart = [s.instance_id for s in batch_instances if s.is_updated]
-      instances_to_update = [s.instance_id for s in batch_instances if not s.is_updated]
-
-      instances_to_watch = []
-      if instances_to_restart:
-        instances_to_watch += self._restart_instances(instances_to_restart)
-
-      if instances_to_update:
-        instances_to_watch += self._update_instances(instances_to_update, instance_operation)
-
-      failed_instances = self._watcher.watch(instances_to_watch) if instances_to_watch else set()
-
-      if failed_instances:
-        log.error('Failed instances: %s' % failed_instances)
-
-      unretryable_instances = failure_threshold.update_failure_counts(failed_instances)
-      if unretryable_instances:
-        log.warn('Not restarting failed instances %s, which exceeded '
-                 'maximum allowed instance failure limit of %s' %
-                 (unretryable_instances, self._update_config.max_per_instance_failures))
-      retryable_instances = list(set(failed_instances) - set(unretryable_instances))
-      remaining_instances += [
-          self.InstanceState(instance_id, is_updated=True) for instance_id in retryable_instances
-      ]
-      remaining_instances.sort(key=lambda tup: tup.instance_id)
-
-    if failure_threshold.is_failed_update():
-      untouched_instances = [s.instance_id for s in remaining_instances if not s.is_updated]
-      instances_to_rollback = list(
-          set(instance_configs.instances_to_process) - set(untouched_instances)
-      )
-      self._rollback(instances_to_rollback, instance_configs)
-
-    return not failure_threshold.is_failed_update()
-
-  def _rollback(self, instances_to_rollback, instance_configs):
-    """Performs a rollback operation for the failed instances.
-
-    Arguments:
-    instances_to_rollback -- instance ids to rollback.
-    instance_configs -- instance configuration to use for rollback.
-    """
-    log.info('Reverting update for %s' % instances_to_rollback)
-    instance_operation = self.OperationConfigs(
-        from_config=instance_configs.local_config_map,
-        to_config=instance_configs.remote_config_map
-    )
-    instances_to_rollback.sort(reverse=True)
-    failed_instances = []
-    while instances_to_rollback:
-      batch_instances = instances_to_rollback[0 : self._update_config.batch_size]
-      instances_to_rollback = list(set(instances_to_rollback) - set(batch_instances))
-      instances_to_rollback.sort(reverse=True)
-      instances_to_watch = self._update_instances(batch_instances, instance_operation)
-      failed_instances += self._watcher.watch(instances_to_watch)
-
-    if failed_instances:
-      log.error('Rollback failed for instances: %s' % failed_instances)
-
-  def _create_kill_add_lists(self, instance_ids, operation_configs):
-    """Determines a particular action (kill or add) to use for every instance in instance_ids.
-
-    Arguments:
-    instance_ids -- current batch of IDs to process.
-    operation_configs -- OperationConfigs with update details.
-
-    Returns lists of instances to kill and to add.
-    """
-    to_kill = []
-    to_add = []
-    for instance_id in instance_ids:
-      from_config = operation_configs.from_config.get(instance_id)
-      to_config = operation_configs.to_config.get(instance_id)
-
-      if from_config and to_config:
-        # Sort internal dicts before comparing to rule out differences due to hashing.
-        diff_output = ''.join(unified_diff(
-          str(sorted(from_config.__dict__.items(), key=lambda x: x[0])),
-          str(sorted(to_config.__dict__.items(), key=lambda x: x[0]))))
-        if diff_output:
-          log.debug('Task configuration changed for instance [%s]:\n%s' % (instance_id, diff_output))
-          to_kill.append(instance_id)
-          to_add.append(instance_id)
-      elif from_config and not to_config:
-        to_kill.append(instance_id)
-      elif not from_config and to_config:
-        to_add.append(instance_id)
-      else:
-        raise self.Error('Instance %s is outside of supported range' % instance_id)
-
-    return to_kill, to_add
-
-  def _update_instances(self, instance_ids, operation_configs):
-    """Applies kill/add actions for the specified batch instances.
-
-    Arguments:
-    instance_ids -- current batch of IDs to process.
-    operation_configs -- OperationConfigs with update details.
-
-    Returns a list of added instances.
-    """
-    log.info('Examining instances: %s' % instance_ids)
-
-    to_kill, to_add = self._create_kill_add_lists(instance_ids, operation_configs)
-
-    unchanged = list(set(instance_ids) - set(to_kill + to_add))
-    if unchanged:
-      log.info('Skipping unchanged instances: %s' % unchanged)
-
-    # Kill is a blocking call in scheduler -> no need to watch it yet.
-    self._kill_instances(to_kill)
-    self._add_instances(to_add, operation_configs.to_config)
-    return to_add
-
-  def _kill_instances(self, instance_ids):
-    """Instructs the scheduler to kill instances and waits for completion.
-
-    Arguments:
-    instance_ids -- list of IDs to kill.
-    """
-    if instance_ids:
-      log.info('Killing instances: %s' % instance_ids)
-      query = self._create_task_query(instanceIds=frozenset(int(s) for s in instance_ids))
-      self._check_and_log_response(self._scheduler.killTasks(query, self._lock))
-      log.info('Instances killed')
-
-  def _add_instances(self, instance_ids, to_config):
-    """Instructs the scheduler to add instances.
-
-    Arguments:
-    instance_ids -- list of IDs to add.
-    to_config -- OperationConfigs with update details.
-    """
-    if instance_ids:
-      log.info('Adding instances: %s' % instance_ids)
-      add_config = AddInstancesConfig(
-          key=self._job_key,
-          taskConfig=to_config[instance_ids[0]],  # instance_ids will always have at least 1 item.
-          instanceIds=frozenset(int(s) for s in instance_ids))
-      self._check_and_log_response(self._scheduler.addInstances(add_config, self._lock))
-      log.info('Instances added')
-
-  def _restart_instances(self, instance_ids):
-    """Instructs the scheduler to restart instances.
-
-    Arguments:
-    instance_ids -- set of instances to be restarted by the scheduler.
-    """
-    log.info('Restarting instances: %s' % instance_ids)
-    resp = self._scheduler.restartShards(self._job_key, instance_ids, self._lock)
-    self._check_and_log_response(resp)
-    return instance_ids
-
-  def _get_update_instructions(self, instances=None):
-    """Loads, validates and populates update working set.
-
-    Arguments:
-    instances -- (optional) set of instances to update.
-
-    Returns:
-    InstanceConfigs with the following data:
-      remote_config_map -- dictionary of {key:instance_id, value:task_config} from scheduler.
-      local_config_map  -- dictionary of {key:instance_id, value:task_config} with local
-                           task configs validated and populated with default values.
-      instances_to_process -- list of instance IDs to go over in update.
-    """
-    # Load existing tasks and populate remote config map and instance list.
-    assigned_tasks = self._get_existing_tasks()
-    remote_config_map = {}
-    remote_instances = []
-    for assigned_task in assigned_tasks:
-      remote_config_map[assigned_task.instanceId] = assigned_task.task
-      remote_instances.append(assigned_task.instanceId)
-
-    # Validate local job config and populate local task config.
-    local_task_config = self._validate_and_populate_local_config()
-
-    # Union of local and remote instance IDs.
-    job_config_instances = list(range(self._config.instances()))
-    instance_superset = sorted(list(set(remote_instances) | set(job_config_instances)))
-
-    # Calculate the update working set.
-    if instances is None:
-      # Full job update -> union of remote and local instances
-      instances_to_process = instance_superset
-    else:
-      # Partial job update -> validate all instances are recognized
-      instances_to_process = instances
-      unrecognized = list(set(instances) - set(instance_superset))
-      if unrecognized:
-        raise self.Error('Instances %s are outside of supported range' % unrecognized)
-
-    # Populate local config map
-    local_config_map = dict.fromkeys(job_config_instances, local_task_config)
-
-    return self.InstanceConfigs(remote_config_map, local_config_map, instances_to_process)
-
-  def _get_existing_tasks(self):
-    """Loads all existing tasks from the scheduler.
-
-    Returns a list of AssignedTasks.
-    """
-    resp = self._scheduler.getTasksStatus(self._create_task_query())
-    self._check_and_log_response(resp)
-    return [t.assignedTask for t in resp.result.scheduleStatusResult.tasks]
-
-  def _validate_and_populate_local_config(self):
-    """Validates local job configuration and populates local task config with default values.
-
-    Returns a TaskConfig populated with default values.
-    """
-    resp = self._scheduler.populateJobConfig(self._config.job(), JobConfigValidation.RUN_FILTERS)
-    self._check_and_log_response(resp)
-
-    # Safe to take the first element as Scheduler would throw in case zero instances provided.
-    return list(resp.result.populateJobResult.populated)[0]
-
-  def _replace_template_if_cron(self):
-    """Checks if the provided job config represents a cron job and if so, replaces it.
-
-    Returns True if job is cron and False otherwise.
-    """
-    if self._config.job().cronSchedule:
-      resp = self._scheduler.replaceCronTemplate(self._config.job(), self._lock)
-      self._check_and_log_response(resp)
-      return True
-    else:
-      return False
-
-  def _create_task_query(self, instanceIds=None):
-    return TaskQuery(
-        owner=Identity(role=self._job_key.role),
-        environment=self._job_key.environment,
-        jobName=self._job_key.name,
-        statuses=ACTIVE_STATES,
-        instanceIds=instanceIds)
-
-  def _failed_response(self, message):
-    return Response(responseCode=ResponseCode.ERROR, message=message)
-
-  def update(self, instances=None):
-    """Performs the job update, blocking until it completes.
-    A rollback will be performed if the update was considered a failure based on the
-    update configuration.
-
-    Arguments:
-    instances -- (optional) instances to update. If not specified, all instances will be updated.
-
-    Returns a response object with update result status.
-    """
-    resp = self._start()
-    if resp.responseCode != ResponseCode.OK:
-      return resp
-
-    try:
-      # Handle cron jobs separately from other jobs.
-      if self._replace_template_if_cron():
-        log.info('Cron template updated, next run will reflect changes')
-        return self._finish()
-      else:
-        try:
-          instance_configs = self._get_update_instructions(instances)
-        except self.Error as e:
-          # Safe to release the lock acquired above as no job mutation has happened yet.
-          self._finish()
-          return self._failed_response('Unable to start job update: %s' % e)
-
-        if not self._update(instance_configs):
-          log.warn('Update failures threshold reached')
-          self._finish()
-          return self._failed_response('Update reverted')
-        else:
-          log.info('Update successful')
-          return self._finish()
-    except self.Error as e:
-      return self._failed_response('Aborting update without rollback! Fatal error: %s' % e)
-
-  @classmethod
-  def cancel_update(cls, scheduler, job_key):
-    """Cancels an update process by removing an exclusive lock on a provided job.
-
-    Arguments:
-    scheduler -- scheduler instance to use.
-    job_key -- job key to cancel update for.
-
-    Returns a response object with cancel update result status.
-    """
-    return scheduler.releaseLock(
-        Lock(key=LockKey(job=job_key.to_thrift())),
-        LockValidation.UNCHECKED)
-
-  def _check_and_log_response(self, resp):
-    """Checks scheduler return status, raises Error in case of unexpected response.
-
-    Arguments:
-    resp -- scheduler response object.
-
-    Raises Error in case of unexpected response status.
-    """
-    name, message = ResponseCode._VALUES_TO_NAMES[resp.responseCode], resp.message
-    if resp.responseCode == ResponseCode.OK:
-      log.debug('Response from scheduler: %s (message: %s)' % (name, message))
-    else:
-      raise self.Error(message)

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/aurora/client/api/updater_util.py
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/aurora/client/api/updater_util.py b/src/main/python/twitter/aurora/client/api/updater_util.py
deleted file mode 100644
index db9f053..0000000
--- a/src/main/python/twitter/aurora/client/api/updater_util.py
+++ /dev/null
@@ -1,77 +0,0 @@
-import collections
-
-from twitter.common import log
-
class UpdaterConfig(object):
  r"""Validated settings that control the pace and failure tolerance of a job update.

  For updates involving a health check,

  UPDATE INSTANCE                         HEALTHY              REMAIN HEALTHY
  ----------------------------------------|-----------------------|
  \--------------------------------------/ \----------------------/
           restart_threshold                      watch_secs

  When an update is initiated, an instance is expected to be "healthy" before restart_threshold.
  An instance is also expected to remain healthy for at least watch_secs. If these conditions are
  not satisfied, the instance is deemed unhealthy.
  """
  def __init__(self,
               batch_size,
               restart_threshold,
               watch_secs,
               max_per_shard_failures,
               max_total_failures):
    """Creates an update configuration.

    Arguments:
    batch_size -- number of instances to update at a time; must be > 0.
    restart_threshold -- seconds an instance has to become healthy; must be > 0.
    watch_secs -- seconds an instance must remain healthy; must be > 0.
    max_per_shard_failures -- failures allowed per instance (stored as
                              max_per_instance_failures).
    max_total_failures -- failed instances allowed before the update fails.

    Raises ValueError when any of the first three settings is not positive.
    """
    if batch_size <= 0:
      raise ValueError('Batch size should be greater than 0')
    if restart_threshold <= 0:
      raise ValueError('Restart Threshold should be greater than 0')
    if watch_secs <= 0:
      raise ValueError('Watch seconds should be greater than 0')
    self.batch_size = batch_size
    self.restart_threshold = restart_threshold
    self.watch_secs = watch_secs
    self.max_total_failures = max_total_failures
    self.max_per_instance_failures = max_per_shard_failures
-
-
class FailureThreshold(object):
  """Tracks per-instance failure counts and decides when an update has failed."""

  def __init__(self, max_per_instance_failures, max_total_failures):
    """Arguments:
    max_per_instance_failures -- failures allowed per instance before it counts as failed.
    max_total_failures -- failed instances allowed before the whole update is failed.
    """
    self._max_per_instance_failures = max_per_instance_failures
    self._max_total_failures = max_total_failures
    # Running failure tally keyed by instance; missing keys default to 0.
    self._failures_by_instance = collections.defaultdict(int)

  def update_failure_counts(self, failed_instances):
    """Update the failure counts metrics based upon a batch of failed instances.

    Arguments:
    failed_instances -- list of failed instances.

    Returns a list of instances with failure counts exceeding _max_per_instance_failures.
    """
    exceeded_failure_count_instances = []
    for instance in failed_instances:
      self._failures_by_instance[instance] += 1
      if self._failures_by_instance[instance] > self._max_per_instance_failures:
        exceeded_failure_count_instances.append(instance)
    return exceeded_failure_count_instances

  def is_failed_update(self):
    """Returns True (and logs details) when more instances than allowed have failed."""
    total_failed_instances = self._exceeded_instance_fail_count()
    is_failed = total_failed_instances > self._max_total_failures

    if is_failed:
      log.error('%s failed instances observed, maximum allowed is %s' % (total_failed_instances,
          self._max_total_failures))
      for instance, failure_count in self._failures_by_instance.items():
        if failure_count > self._max_per_instance_failures:
          log.error('%s instance failures for instance %s, maximum allowed is %s' %
              (failure_count, instance, self._max_per_instance_failures))
    return is_failed

  def _exceeded_instance_fail_count(self):
    """Returns the number of instances whose failures exceed the per-instance limit."""
    return sum(count > self._max_per_instance_failures
               for count in self._failures_by_instance.values())

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/aurora/client/base.py
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/aurora/client/base.py b/src/main/python/twitter/aurora/client/base.py
deleted file mode 100644
index 0936743..0000000
--- a/src/main/python/twitter/aurora/client/base.py
+++ /dev/null
@@ -1,151 +0,0 @@
-from collections import defaultdict
-import functools
-import sys
-from urlparse import urljoin
-
-from twitter.common import app, log
-
-from gen.twitter.aurora.ttypes import ResponseCode
-
-LOCKED_WARNING = """
-Note: if the scheduler detects that a job update is in progress (or was not
-properly completed) it will reject subsequent updates.  This is because your
-job is likely in a partially-updated state.  You should only begin another
-update if you are confident that nobody is updating this job, and that
-the job is in a state suitable for an update.
-
-After checking on the above, you may release the update lock on the job by
-invoking cancel_update.
-"""
-
def die(msg):
  """Logs msg at fatal level and terminates the process with exit status 1."""
  log.fatal(msg)
  sys.exit(1)
-
def check_and_log_response(resp):
  """Logs the scheduler response; on a non-OK code prints lock guidance and exits."""
  code_name = ResponseCode._VALUES_TO_NAMES[resp.responseCode]
  log.info('Response from scheduler: %s (message: %s)' % (code_name, resp.message))
  if resp.responseCode == ResponseCode.OK:
    return
  check_and_log_locked_response(resp)
  sys.exit(1)
-
def check_and_log_locked_response(resp):
  """Prints guidance about stuck update locks when resp is a LOCK_ERROR."""
  if resp.responseCode == ResponseCode.LOCK_ERROR:
    log.info(LOCKED_WARNING)
-
def deprecation_warning(text):
  """Logs text as a prominent multi-line deprecation banner."""
  banner = '*' * 80
  log.warning('')
  log.warning(banner)
  log.warning('* The command you ran is deprecated and will soon break!')
  for line in text.split('\n'):
    log.warning('* %s' % line)
  log.warning(banner)
  log.warning('')
-
-
class requires(object):
  """Decorators validating the number of CLI arguments a command receives."""

  @staticmethod
  def wrap_function(fn, fnargs, comparator):
    """Wraps fn so it dies with usage text unless comparator(args, fnargs) holds."""
    @functools.wraps(fn)
    def wrapped_function(args):
      if not comparator(args, fnargs):
        # Avoid shadowing the builtin 'help'; prefer the first docstring line.
        usage = 'Incorrect parameters for %s' % fn.__name__
        if fn.__doc__:
          usage = '%s\n\nsee the help subcommand for more details.' % fn.__doc__.split('\n')[0]
        die(usage)
      return fn(*args)
    return wrapped_function

  @staticmethod
  def exactly(*args):
    """Requires exactly len(args) command arguments."""
    def wrap(fn):
      return requires.wrap_function(fn, args, (lambda want, got: len(want) == len(got)))
    return wrap

  @staticmethod
  def at_least(*args):
    """Requires at least len(args) command arguments."""
    def wrap(fn):
      return requires.wrap_function(fn, args, (lambda want, got: len(want) >= len(got)))
    return wrap

  @staticmethod
  def nothing(fn):
    """Passes all received arguments straight through without validation."""
    @functools.wraps(fn)
    def real_fn(line):
      return fn(*line)
    return real_fn
-
-
def synthesize_url(scheduler_url, role=None, env=None, job=None):
  """Builds a scheduler web UI URL, optionally narrowed to role/env/job.

  Returns None (with a warning) when no scheduler URL is known; dies when the
  role/env/job combination is inconsistent.
  """
  if not scheduler_url:
    log.warning("Unable to find scheduler web UI!")
    return None

  if env and not role:
    die('If env specified, must specify role')
  if job and not (role and env):
    die('If job specified, must specify role and env')

  url = urljoin(scheduler_url, 'scheduler')
  # Validation above guarantees the segments form a prefix: role, role/env,
  # or role/env/job — so stop at the first missing one.
  for segment in (role, env, job):
    if not segment:
      break
    url += '/' + segment
  return url
-
-
def handle_open(scheduler_url, role, env, job):
  """Logs the job's web UI URL and opens it in a browser when requested."""
  url = synthesize_url(scheduler_url, role, env, job)
  if not url:
    return
  log.info('Job url: %s' % url)
  if app.get_options().open_browser:
    import webbrowser
    webbrowser.open_new_tab(url)
-
-
def make_commands_str(command_aliases):
  """Format a string representation of a number of command aliases.

  Arguments:
  command_aliases -- list of alias names; left unmodified (a sorted copy is used).

  Returns a human-readable summary such as 'a', 'a (or b)' or 'a (or any of: b c)'.
  """
  commands = sorted(command_aliases)
  if len(commands) == 1:
    return str(commands[0])
  elif len(commands) == 2:
    return '%s (or %s)' % (commands[0], commands[1])
  else:
    return '%s (or any of: %s)' % (commands[0], ' '.join(map(str, commands[1:])))
-
-
# TODO(wickman) This likely belongs in twitter.common.app (or split out as
# part of a possible twitter.common.cli)
def generate_full_usage():
  """Generate verbose application usage from all registered
     twitter.common.app commands and return as a string."""
  docs_to_commands = defaultdict(list)
  for command, doc in app.get_commands_and_docstrings():
    docs_to_commands[doc].append(command)

  def make_docstring(item):
    doc_text, commands = item
    indented = ''.join('    %s\n' % line.lstrip() for line in doc_text.splitlines())
    return '%s\n%s' % (make_commands_str(commands), indented)

  entries = sorted(make_docstring(item) for item in docs_to_commands.items())
  return 'Available commands:\n\n' + '\n'.join(entries)
-
-
def generate_terse_usage():
  """Generate minimal application usage from all registered
     twitter.common.app commands and return as a string."""
  docs_to_commands = defaultdict(list)
  for command, doc in app.get_commands_and_docstrings():
    docs_to_commands[doc].append(command)
  command_strs = sorted(make_commands_str(commands) for commands in docs_to_commands.values())
  usage = '\n    '.join(command_strs)
  return """
Available commands:
    %s

For more help on an individual command:
    %s help <command>
""" % (usage, app.name())

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/aurora/client/bin/BUILD
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/aurora/client/bin/BUILD b/src/main/python/twitter/aurora/client/bin/BUILD
deleted file mode 100644
index 7802245..0000000
--- a/src/main/python/twitter/aurora/client/bin/BUILD
+++ /dev/null
@@ -1,25 +0,0 @@
# End-user client binary; its command modules register themselves on import.
python_binary(
  name = 'aurora_client',
  source = 'aurora_client.py',
  entry_point = 'twitter.aurora.client.bin.aurora_client:proxy_main',
  dependencies = [
    pants('aurora/twitterdeps/src/python/twitter/common/app'),
    pants('aurora/twitterdeps/src/python/twitter/common/log'),
    pants('src/main/python/twitter/aurora/client/commands:all'),
    pants('src/main/python/twitter/aurora/client:base'),
  ]
)

# Administrative binary exposing only the admin and help command sets.
python_binary(
  name = 'aurora_admin',
  source = 'aurora_admin.py',
  entry_point = 'twitter.aurora.client.bin.aurora_admin:proxy_main',
  dependencies = [
    pants('aurora/twitterdeps/src/python/twitter/common/app'),
    pants('aurora/twitterdeps/src/python/twitter/common/log'),
    pants('src/main/python/twitter/aurora/client/commands:admin'),
    pants('src/main/python/twitter/aurora/client/commands:help'),
    pants('src/main/python/twitter/aurora/client:base'),
    pants('src/main/python/twitter/aurora/client:options'),
  ]
)

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/aurora/client/bin/__init__.py
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/aurora/client/bin/__init__.py b/src/main/python/twitter/aurora/client/bin/__init__.py
deleted file mode 100644
index e69de29..0000000

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/aurora/client/bin/aurora_admin.py
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/aurora/client/bin/aurora_admin.py b/src/main/python/twitter/aurora/client/bin/aurora_admin.py
deleted file mode 100644
index 53fd882..0000000
--- a/src/main/python/twitter/aurora/client/bin/aurora_admin.py
+++ /dev/null
@@ -1,23 +0,0 @@
-from twitter.aurora.client.base import generate_terse_usage
-from twitter.aurora.client.commands import admin, help
-from twitter.aurora.client.options import add_verbosity_options
-from twitter.common import app
-from twitter.common.log.options import LogOptions
-
-
# Register admin and help subcommands and install the shared verbosity flags.
# Both must happen before app.set_usage() below so that the generated usage
# text reflects every registered command.
app.register_commands_from(admin, help)
add_verbosity_options()


def main():
  # Invoked by app.main() when no subcommand is given: print help and exit.
  app.help()


LogOptions.set_stderr_log_level('INFO')
LogOptions.disable_disk_logging()
app.set_name('aurora-admin')
app.set_usage(generate_terse_usage())


def proxy_main():
  # Entry point referenced by the BUILD file's python_binary target.
  app.main()

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/aurora/client/bin/aurora_client.py
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/aurora/client/bin/aurora_client.py b/src/main/python/twitter/aurora/client/bin/aurora_client.py
deleted file mode 100644
index 8228b88..0000000
--- a/src/main/python/twitter/aurora/client/bin/aurora_client.py
+++ /dev/null
@@ -1,33 +0,0 @@
-from twitter.aurora.client.base import generate_terse_usage
-from twitter.common import app
-from twitter.common.log.options import LogOptions
-
-# These are are side-effecting imports in that they register commands via
-# app.command.  This is a poor code practice and should be fixed long-term
-# with the creation of twitter.common.cli that allows for argparse-style CLI
-# composition.
-from twitter.aurora.client.commands import (
-    core,
-    help,
-    run,
-    ssh,
-)
-from twitter.aurora.client.options import add_verbosity_options
-
# Register subcommands and install the shared verbosity flags; both must
# precede app.set_usage() below so the usage text reflects every command.
app.register_commands_from(core, run, ssh)
app.register_commands_from(help)
add_verbosity_options()


def main():
  # Invoked by app.main() when no subcommand is given: print help and exit.
  app.help()


LogOptions.set_stderr_log_level('INFO')
LogOptions.disable_disk_logging()
app.set_name('aurora-client')
app.set_usage(generate_terse_usage())


def proxy_main():
  # Entry point referenced by the BUILD file's python_binary target.
  app.main()

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/aurora/client/binding_helper.py
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/aurora/client/binding_helper.py b/src/main/python/twitter/aurora/client/binding_helper.py
deleted file mode 100644
index fa03b17..0000000
--- a/src/main/python/twitter/aurora/client/binding_helper.py
+++ /dev/null
@@ -1,115 +0,0 @@
-from abc import abstractmethod, abstractproperty
-import inspect
-import os
-import sys
-
-from twitter.common.lang import Interface
-
# Public API of this module.
__all__ = (
  'BindingHelper',
  'CachingBindingHelper',
  'apply_all',
  'clear_binding_caches',
  'unregister_all',
)


# The registry for binding helpers.
_BINDING_HELPERS = []
-
-
# TODO(wickman) Update the pydocs to remove references to common_internal components.
class BindingHelper(Interface):
  """A component which resolves some set of pseudo-bindings in a config.

  Many bindings are too complex to resolve with bindings using the standard mechanisms,
  because they require some python computation to determine how to bind them. For example,
  for references like {{packer[role][pkg][version]}}, we need to talk to the packer to figure
  out the correct packer call for the desired cluster.

  A BindingHelper is responsible for resolving one of these types of pseudo-bindings.
  PackerBindingHelper will resolve "packer" bindings; BuildBindingHelper will resolve "build"
  bindings, JenkinsBindingHelper will resolve "jenkins" bindings, etc.

  A BindingHelper can be registered by calling "BindingHelper.register(Helper)". Instead of
  explicitly calling "inject" methods in populate_namespaces, it will compute the set of open
  bindings, and then call the appropriate helpers for each.

  The bindings can be computed either from scratch, or from a binding dictionary. A binding
  dictionary can be computed from live data, and then passed over an RPC connection, so that
  the bindings can be recomputed on the server.

  Each helper is responsible for computing its own binding dict. The data in the dict should
  meet two requirements: it should be enough data to allow it to produce exactly the same
  result as the scratch binding, and the data should provide information that makes the
  binding comprehensible for a human debugging a job.

  For example, a packer helper's binding dict should provide enough information to identify
  the HDFS file that should be used, but also the version number of the binary in packer
  (because a human reader wants to know the version of the package, not the meaningless
  HDFS URL).
  """
  @classmethod
  def register(cls):
    """Instantiates this helper class and adds it to the global helper registry."""
    _BINDING_HELPERS.append(cls())

  def apply(self, config, env=None, binding_dict=None):
    """Binds every ref in config that this helper's matcher recognizes.

    :param config: the config whose matching refs are bound in place.
    :param env: the python environment where the configuration was evaluated.
    :param binding_dict: optional precomputed binding data; defaults to the
        config's own binding dict for this helper's name.
    """
    for match in self.matcher.match(config.raw()):
      self.bind(config, match, env, binding_dict or config.binding_dicts[self.name])

  @abstractproperty
  def name(self):
    """Returns the name of this BindingHelper.  Typically it is the first component of
       the matcher, e.g. if the matcher matches {{git[sha]}}, return "git"."""

  @abstractproperty
  def matcher(self):
    """Returns the pystachio matcher for refs that this binding helper binds."""

  @abstractmethod
  def bind(self, config, match, env, binding_dict):
    """Resolves a ref, adding a binding to the config."""
-
-
class CachingBindingHelper(BindingHelper):
  """A binding helper implementation that caches binding results"""

  def __init__(self):
    self.cache = {}

  def flush_cache(self):
    """Drops all cached binding results."""
    self.cache = {}

  def bind(self, config, match, env, binding_dict):
    """Binds a ref, computing the binding at most once per distinct match."""
    try:
      bound = self.cache[match]
    except KeyError:
      bound = self.cache[match] = self.uncached_bind(config, match, env, binding_dict)
    config.bind(bound)

  @abstractmethod
  def uncached_bind(self, config, match, env, binding_dict):
    """Compute the binding for a ref that hasn't been seen before."""
-
-
def unregister_all():
  """Empties the global binding helper registry in place."""
  del _BINDING_HELPERS[:]
-
-
def apply_all(config, env=None, binding_dict=None):
  """Computes a set of bindings and applies them to the config.

  :param config: the config whose bindings need to be computed.
  :param env: the python environment where the configuration was evaluated.
  :param binding_dict: an optional dictionary containing data to be used to compute the
      bindings. If this is provided, then data from the dictionary should be used in
      preference over live data.
  :return: a binding dictionary with data that can be used to recompute the bindings. The
      config is updated in-place.
  """
  for registered_helper in _BINDING_HELPERS:
    registered_helper.apply(
        config, env, binding_dict or config.binding_dicts[registered_helper.name])
-
-
def clear_binding_caches():
  """Clear the binding helper's caches for testing."""
  caching_helpers = (h for h in _BINDING_HELPERS if isinstance(h, CachingBindingHelper))
  for caching_helper in caching_helpers:
    caching_helper.flush_cache()


Mime
View raw message