aurora-commits mailing list archives

From mchucarr...@apache.org
Subject git commit: Add task noun, supporting "run" and "ssh" verbs.
Date Tue, 18 Feb 2014 19:33:21 GMT
Repository: incubator-aurora
Updated Branches:
  refs/heads/master 6aa025de6 -> 14c21fd7a


Add task noun, supporting "run" and "ssh" verbs.

Bugs closed: aurora-124

Reviewed at https://reviews.apache.org/r/17752/
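
For context, a minimal sketch of how the new verbs are exercised (mirroring the client invocations in the tests added below; the job key 'west/bozo/test/hello' is a placeholder taken from those tests, not a real job):

    from apache.aurora.client.cli.client import AuroraCommandLine

    cmd = AuroraCommandLine()
    # Run 'ls' on every machine currently hosting an instance of the job.
    cmd.execute(['task', 'run', 'west/bozo/test/hello', 'ls'])
    # Open an ssh session (here running 'ls') against instance 1 of the job.
    cmd.execute(['task', 'ssh', 'west/bozo/test/hello/1', '--command=ls'])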


Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/14c21fd7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/14c21fd7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/14c21fd7

Branch: refs/heads/master
Commit: 14c21fd7aace4338b8d201daa3bc2bccacb78d1e
Parents: 6aa025d
Author: Mark Chu-Carroll <mchucarroll@twopensource.com>
Authored: Tue Feb 18 14:31:04 2014 -0500
Committer: Mark Chu-Carroll <mchucarroll@twitter.com>
Committed: Tue Feb 18 14:31:04 2014 -0500

----------------------------------------------------------------------
 src/main/python/apache/aurora/client/cli/BUILD  |  10 +-
 .../python/apache/aurora/client/cli/client.py   |   2 +
 .../python/apache/aurora/client/cli/jobs.py     |  20 +-
 .../python/apache/aurora/client/cli/options.py  |  31 ++-
 .../python/apache/aurora/client/cli/task.py     | 177 +++++++++++++++++
 src/test/python/apache/aurora/client/cli/BUILD  |  21 +-
 .../apache/aurora/client/cli/test_task_run.py   | 194 +++++++++++++++++++
 7 files changed, 442 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/14c21fd7/src/main/python/apache/aurora/client/cli/BUILD
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/cli/BUILD b/src/main/python/apache/aurora/client/cli/BUILD
index 7df9dee..02e061c 100644
--- a/src/main/python/apache/aurora/client/cli/BUILD
+++ b/src/main/python/apache/aurora/client/cli/BUILD
@@ -39,7 +39,15 @@ python_library(
 
 python_library(
   name='cli',
-  sources = [ '__init__.py', 'jobs.py', 'quota.py', 'context.py', 'options.py', 'sla.py' ],
+  sources = [
+    '__init__.py',
+    'context.py',
+    'jobs.py',
+    'options.py',
+    'quota.py',
+    'sla.py',
+    'task.py'
+  ],
   dependencies = [
     pants('3rdparty/python:argparse'),
     pants('3rdparty/python:twitter.common.python'),

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/14c21fd7/src/main/python/apache/aurora/client/cli/client.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/cli/client.py b/src/main/python/apache/aurora/client/cli/client.py
index 3c07c94..604eb44 100644
--- a/src/main/python/apache/aurora/client/cli/client.py
+++ b/src/main/python/apache/aurora/client/cli/client.py
@@ -19,6 +19,8 @@ class AuroraCommandLine(CommandLine):
     self.register_noun(Quota())
     from apache.aurora.client.cli.sla import Sla
     self.register_noun(Sla())
+    from apache.aurora.client.cli.task import Task
+    self.register_noun(Task())
 
 
 class AuroraClientV2CommandProcessor(CommandProcessor):

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/14c21fd7/src/main/python/apache/aurora/client/cli/jobs.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/cli/jobs.py b/src/main/python/apache/aurora/client/cli/jobs.py
index 632c539..199d27f 100644
--- a/src/main/python/apache/aurora/client/cli/jobs.py
+++ b/src/main/python/apache/aurora/client/cli/jobs.py
@@ -355,14 +355,14 @@ class RestartCommand(Verb):
     self.add_option(parser, INSTANCES_OPTION)
     self.add_option(parser, JSON_READ_OPTION)
     self.add_option(parser, WATCH_OPTION)
-    parser.add_argument('--max_per_shard_failures', type=int, default=0,
-        help='Maximum number of restarts per shard during restart. Increments total failure '
+    parser.add_argument('--max_per_instance_failures', type=int, default=0,
+        help='Maximum number of restarts per instance during restart. Increments total failure '
             'count when this limit is exceeded.')
     parser.add_argument('--restart_threshold', type=int, default=60,
-        help='Maximum number of seconds before a shard must move into the RUNNING state before '
+        help='Maximum number of seconds before an instance must move into the RUNNING state before '
              'considered a failure.')
     parser.add_argument('--max_total_failures', type=int, default=0,
-        help='Maximum number of shard failures to be tolerated in total during restart.')
+        help='Maximum number of instance failures to be tolerated in total during restart.')
     parser.add_argument('--rollback_on_failure', type=bool, default=True,
         help='If false, prevent update from performing a rollback.')
     self.add_option(parser, JOBSPEC_ARGUMENT)
@@ -371,16 +371,16 @@ class RestartCommand(Verb):
   @property
   def help(self):
     return """Usage: restart cluster/role/env/job
-    [--shards=SHARDS]
+    [--instances=INSTANCES]
     [--batch_size=INT]
     [--updater_health_check_interval_seconds=SECONDS]
-    [--max_per_shard_failures=INT]
+    [--max_per_instance_failures=INT]
     [--max_total_failures=INT]
     [--restart_threshold=INT]
     [--watch_secs=SECONDS]
     [--open_browser]
 
-  Performs a rolling restart of shards within a job.
+  Performs a rolling restart of running task instances within a job.
 
   Restarts are fully controlled client-side, so aborting halts the restart.
   """
@@ -393,7 +393,7 @@ class RestartCommand(Verb):
         context.options.batch_size,
         context.options.restart_threshold,
         context.options.watch_secs,
-        context.options.max_per_shard_failures,
+        context.options.max_per_instance_failures,
         context.options.max_total_failures,
         context.options.rollback_on_failure)
     resp = api.restart(context.options.jobspec, context.options.instances, updater_config,
@@ -518,8 +518,8 @@ class UpdateCommand(Verb):
   Subsequent update attempts will fail until the update is 'unlocked' using the
   'cancel_update' command.
 
-  The updater only takes action on shards in a job that have changed, meaning
-  that changing a single shard will only induce a restart on the changed shard.
+  The updater only takes action on instances in a job that have changed, meaning
+  that changing a single instance will only induce a restart on the changed task instance.
 
   You may want to consider using the 'diff' subcommand before updating,
   to preview what changes will take effect.

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/14c21fd7/src/main/python/apache/aurora/client/cli/options.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/cli/options.py b/src/main/python/apache/aurora/client/cli/options.py
index 017f141..37f416d 100644
--- a/src/main/python/apache/aurora/client/cli/options.py
+++ b/src/main/python/apache/aurora/client/cli/options.py
@@ -14,6 +14,8 @@
 # limitations under the License.
 #
 
+from collections import namedtuple
+
 from apache.aurora.client.cli import CommandOption
 from apache.aurora.common.aurora_job_key import AuroraJobKey
 
@@ -55,6 +57,21 @@ def parse_time_values(time_values):
   return sorted(map(parse_time, time_values.split(',')))
 
 
+TaskInstanceKey = namedtuple('TaskInstanceKey', [ 'jobkey', 'instance' ])
+
+def parse_task_instance_key(key):
+  pieces = key.split('/')
+  if len(pieces) != 5:
+    raise ValueError('Task instance specifier %s is not in the form '
+        'CLUSTER/ROLE/ENV/NAME/INSTANCE' % key)
+  (cluster, role, env, name, instance_str) = pieces
+  try:
+    instance = int(instance_str)
+  except ValueError:
+    raise ValueError('Instance must be an integer, but got %s' % instance_str)
+  return TaskInstanceKey(AuroraJobKey(cluster, role, env, name), instance)
+
+
 BATCH_OPTION = CommandOption('--batch_size', type=int, default=5,
         help='Number of instances to be operate on in one iteration')
 
@@ -74,6 +91,10 @@ CONFIG_ARGUMENT = CommandOption('config_file', type=str,
     help='pathname of the aurora configuration file contain the job specification')
 
 
+EXECUTOR_SANDBOX_OPTION = CommandOption('--executor_sandbox', action='store_true',
+     default=False, help='Run the command in the executor sandbox instead of the task sandbox')
+
+
 FORCE_OPTION = CommandOption('--force', default=False, action='store_true',
     help='Force execution of the command even if there is a warning')
 
@@ -108,8 +129,16 @@ ROLE_ARGUMENT = CommandOption('role', type=parse_qualified_role,
     help='Rolename to retrieve information about, in CLUSTER/NAME format')
 
 
+SSH_USER_OPTION = CommandOption('--ssh_user', '-l', default=None,
+    help='ssh as this username instead of the job\'s role')
+
+
+TASK_INSTANCE_ARGUMENT = CommandOption('task_instance', type=parse_task_instance_key,
+    help='A task instance specifier, in the form CLUSTER/ROLE/ENV/NAME/INSTANCE')
+
+
 WATCH_OPTION = CommandOption('--watch_secs', type=int, default=30,
-    help='Minimum number of seconds a shard must remain in RUNNING state before considered a '
+    help='Minimum number of seconds an instance must remain in RUNNING state before considered a '
          'success.')
 
 

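As a usage note, a minimal sketch of the task-instance parsing added above (the key value is a hypothetical example):

    from apache.aurora.client.cli.options import parse_task_instance_key

    # CLUSTER/ROLE/ENV/NAME/INSTANCE is split into an AuroraJobKey plus an int instance id.
    key = parse_task_instance_key('west/bozo/test/hello/1')
    assert key.instance == 1
    # key.jobkey is an AuroraJobKey for cluster 'west', role 'bozo', env 'test', job 'hello'.
    # A key with the wrong number of pieces, or a non-integer instance, raises ValueError.
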
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/14c21fd7/src/main/python/apache/aurora/client/cli/task.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/cli/task.py b/src/main/python/apache/aurora/client/cli/task.py
new file mode 100644
index 0000000..d7bf5cd
--- /dev/null
+++ b/src/main/python/apache/aurora/client/cli/task.py
@@ -0,0 +1,177 @@
+#
+# Copyright 2013 Apache Software Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Implementation of the Task noun for the Apache Aurora client.
+"""
+
+from __future__ import print_function
+from datetime import datetime
+import json
+import os
+import pprint
+import subprocess
+import sys
+import time
+
+from apache.aurora.client.api.command_runner import DistributedCommandRunner
+from apache.aurora.client.api.job_monitor import JobMonitor
+from apache.aurora.client.api.updater_util import UpdaterConfig
+from apache.aurora.client.cli import (
+    EXIT_COMMAND_FAILURE,
+    EXIT_INVALID_CONFIGURATION,
+    EXIT_INVALID_PARAMETER,
+    EXIT_OK,
+    Noun,
+    Verb,
+)
+from apache.aurora.client.cli.context import AuroraCommandContext
+from apache.aurora.client.cli.options import (
+    BATCH_OPTION,
+    BIND_OPTION,
+    BROWSER_OPTION,
+    CONFIG_ARGUMENT,
+    EXECUTOR_SANDBOX_OPTION,
+    FORCE_OPTION,
+    HEALTHCHECK_OPTION,
+    INSTANCES_OPTION,
+    JOBSPEC_ARGUMENT,
+    JSON_READ_OPTION,
+    JSON_WRITE_OPTION,
+    SSH_USER_OPTION,
+    TASK_INSTANCE_ARGUMENT,
+    WATCH_OPTION,
+)
+from apache.aurora.common.aurora_job_key import AuroraJobKey
+from apache.aurora.common.clusters import CLUSTERS
+
+from gen.apache.aurora.constants import ACTIVE_STATES, AURORA_EXECUTOR_NAME
+from gen.apache.aurora.ttypes import (
+    ExecutorConfig,
+    ResponseCode,
+    ScheduleStatus,
+)
+
+from pystachio.config import Config
+from thrift.TSerialization import serialize
+from thrift.protocol import TJSONProtocol
+
+
+
+class RunCommand(Verb):
+  @property
+  def name(self):
+    return 'run'
+
+  @property
+  def help(self):
+    return """Usage: aurora task run cluster/role/env/job cmd
+
+  Runs a shell command on all machines currently hosting instances of a single job.
+
+  This feature supports the same command line wildcards that are used to
+  populate a job's commands.
+
+  This means anything in the {{mesos.*}} and {{thermos.*}} namespaces.
+  """
+
+  def setup_options_parser(self, parser):
+    parser.add_argument('--threads', '-t', type=int, default=1, dest='num_threads',
+        help='Number of threads to use')
+    self.add_option(parser, SSH_USER_OPTION)
+    self.add_option(parser, EXECUTOR_SANDBOX_OPTION)
+    self.add_option(parser, JOBSPEC_ARGUMENT)
+    parser.add_argument('cmd', type=str)
+
+  def execute(self, context):
+    # TODO(mchucarroll): add options to specify which instances to run on (AURORA-198)
+    cluster_name, role, env, name = context.options.jobspec
+    cluster = CLUSTERS[cluster_name]
+    dcr = DistributedCommandRunner(cluster, role, env, [name], context.options.ssh_user)
+    dcr.run(context.options.cmd, parallelism=context.options.num_threads,
+        executor_sandbox=context.options.executor_sandbox)
+
+
+class SshCommand(Verb):
+  @property
+  def name(self):
+    return 'ssh'
+
+  @property
+  def help(self):
+    return """usage: aurora task ssh cluster/role/env/job/instance [args...]
+
+  Initiate an SSH session on the machine that a task instance is running on.
+  """
+
+  def setup_options_parser(self, parser):
+    self.add_option(parser, SSH_USER_OPTION)
+    self.add_option(parser, EXECUTOR_SANDBOX_OPTION)
+    parser.add_argument('--tunnels', '-L', dest='tunnels', action='append', metavar='PORT:NAME',
+        default=[],
+        help="Add tunnel from local port PART to remote named port NAME")
+    parser.add_argument('--command', '-c', dest='command', type=str, default=None,
+        help="Command to execute through the ssh connection.")
+    self.add_option(parser, TASK_INSTANCE_ARGUMENT)
+
+  def execute(self, context):
+    (cluster, role, env, name) = context.options.task_instance.jobkey
+    instance = context.options.task_instance.instance
+
+    api = context.get_api(cluster)
+    resp = api.query(api.build_query(role, name, set([int(instance)]), env=env))
+    if resp.responseCode != ResponseCode.OK:
+      raise context.CommandError('Unable to get information about instance: %s' % resp.message)
+    first_task = resp.result.scheduleStatusResult.tasks[0]
+    remote_cmd = context.options.command or 'bash'
+    command = DistributedCommandRunner.substitute(remote_cmd, first_task,
+        api.cluster, executor_sandbox=context.options.executor_sandbox)
+
+    ssh_command = ['ssh', '-t']
+    role = first_task.assignedTask.task.owner.role
+    slave_host = first_task.assignedTask.slaveHost
+
+    for tunnel in context.options.tunnels:
+      try:
+        port, name = tunnel.split(':')
+        port = int(port)
+      except ValueError:
+        die('Could not parse tunnel: %s.  Must be of form PORT:NAME' % tunnel)
+      if name not in first_task.assignedTask.assignedPorts:
+        die('Task %s has no port named %s' % (first_task.assignedTask.taskId, name))
+      ssh_command += [
+          '-L', '%d:%s:%d' % (port, slave_host, first_task.assignedTask.assignedPorts[name])]
+
+    ssh_command += ['%s@%s' % (context.options.ssh_user or role, slave_host), command]
+    return subprocess.call(ssh_command)
+
+
+class Task(Noun):
+  @property
+  def name(self):
+    return 'task'
+
+  @property
+  def help(self):
+    return "Work with a task running in an Apache Aurora cluster"
+
+  @classmethod
+  def create_context(cls):
+    return AuroraCommandContext()
+
+  def __init__(self):
+    super(Task, self).__init__()
+    self.register_verb(RunCommand())
+    self.register_verb(SshCommand())
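
To make the command assembly above concrete, a rough sketch of the argv SshCommand builds for a tunneled session (host name, port numbers, and user are hypothetical values):

    import subprocess

    # With --tunnels=8080:http, a task on 'slavehost' whose named port 'http' was
    # assigned 31337, and ssh user 'bozo', the verb ends up invoking roughly:
    ssh_command = ['ssh', '-t',
                   '-L', '8080:slavehost:31337',  # local 8080 -> the task's 'http' port
                   'bozo@slavehost',
                   'bash']                        # or the substituted --command value
    subprocess.call(ssh_command)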

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/14c21fd7/src/test/python/apache/aurora/client/cli/BUILD
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/client/cli/BUILD b/src/test/python/apache/aurora/client/cli/BUILD
index 636f407..13a3d09 100644
--- a/src/test/python/apache/aurora/client/cli/BUILD
+++ b/src/test/python/apache/aurora/client/cli/BUILD
@@ -16,7 +16,13 @@
 
 python_test_suite(
   name = 'all',
-  dependencies = [ pants(':bridge'), pants(':job'), pants(':quota'), pants(':sla') ]
+  dependencies = [
+    pants(':bridge'),
+    pants(':job'),
+    pants(':quota'),
+    pants(':sla'),
+    pants(':task')
+  ]
 )
 
 python_library(
@@ -82,3 +88,16 @@ python_tests(
     pants('src/test/python/apache/aurora/client/commands:util')
   ]
 )
+
+python_tests(
+  name='task',
+  sources = [ 'test_task_run.py'],
+  dependencies = [
+    pants(':util'),
+    pants('3rdparty/python:mock'),
+    pants('3rdparty/python:twitter.common.contextutil'),
+    pants('src/main/python/apache/aurora/client/cli'),
+    pants('src/main/python/apache/aurora/client/cli:client'),
+    pants('src/test/python/apache/aurora/client/commands:util')
+  ]
+)

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/14c21fd7/src/test/python/apache/aurora/client/cli/test_task_run.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/client/cli/test_task_run.py b/src/test/python/apache/aurora/client/cli/test_task_run.py
new file mode 100644
index 0000000..56eed95
--- /dev/null
+++ b/src/test/python/apache/aurora/client/cli/test_task_run.py
@@ -0,0 +1,194 @@
+#
+# Copyright 2013 Apache Software Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import contextlib
+from mock import Mock, patch
+
+from apache.aurora.client.cli import EXIT_INVALID_PARAMETER
+from apache.aurora.client.cli.client import AuroraCommandLine
+from apache.aurora.common.aurora_job_key import AuroraJobKey
+from apache.aurora.client.cli.util import AuroraClientCommandTest, FakeAuroraCommandContext
+
+from gen.apache.aurora.ttypes import (
+    AssignedTask,
+    Identity,
+    JobKey,
+    ResponseCode,
+    ScheduleStatus,
+    ScheduleStatusResult,
+    TaskConfig,
+    TaskEvent,
+    TaskQuery,
+)
+
+
+class TestRunCommand(AuroraClientCommandTest):
+
+  @classmethod
+  def create_mock_scheduled_tasks(cls):
+    jobs = []
+    for name in ['foo', 'bar', 'baz']:
+      job = Mock()
+      job.key = JobKey(role=cls.TEST_ROLE, environment=cls.TEST_ENV, name=name)
+      job.failure_count = 0
+      job.assignedTask = Mock(spec=AssignedTask)
+      job.assignedTask.taskId = 1287391823
+      job.assignedTask.slaveHost = 'slavehost'
+      job.assignedTask.task = Mock(spec=TaskConfig)
+      job.assignedTask.task.executorConfig = Mock()
+      job.assignedTask.task.maxTaskFailures = 1
+      job.assignedTask.task.packages = []
+      job.assignedTask.task.owner = Identity(role='bozo')
+      job.assignedTask.task.environment = 'test'
+      job.assignedTask.task.jobName = 'woops'
+      job.assignedTask.task.numCpus = 2
+      job.assignedTask.task.ramMb = 2
+      job.assignedTask.task.diskMb = 2
+      job.assignedTask.instanceId = 4237894
+      job.assignedTask.assignedPorts = {}
+      job.status = ScheduleStatus.RUNNING
+      mockEvent = Mock(spec=TaskEvent)
+      mockEvent.timestamp = 28234726395
+      mockEvent.status = ScheduleStatus.RUNNING
+      mockEvent.message = "Hi there"
+      job.taskEvents = [mockEvent]
+      jobs.append(job)
+    return jobs
+
+  @classmethod
+  def create_status_response(cls):
+    resp = cls.create_simple_success_response()
+    resp.result.scheduleStatusResult = Mock(spec=ScheduleStatusResult)
+    resp.result.scheduleStatusResult.tasks = cls.create_mock_scheduled_tasks()
+    return resp
+
+  @classmethod
+  def create_failed_status_response(cls):
+    return cls.create_blank_response(ResponseCode.INVALID_REQUEST, 'No tasks found for query')
+
+  @classmethod
+  def create_mock_process(cls):
+    process = Mock()
+    process.communicate.return_value = ["hello", "world"]
+    return process
+
+  def test_successful_run(self):
+    """Test the run command."""
+    # Calls api.check_status, which calls scheduler_proxy.getJobs
+    (mock_api, mock_scheduler_proxy) = self.create_mock_api()
+    mock_scheduler_proxy.getTasksStatus.return_value = self.create_status_response()
+    sandbox_args = {'slave_root': '/slaveroot', 'slave_run_directory': 'slaverun'}
+    with contextlib.nested(
+        patch('apache.aurora.client.api.SchedulerProxy', return_value=mock_scheduler_proxy),
+        patch('apache.aurora.client.factory.CLUSTERS', new=self.TEST_CLUSTERS),
+        patch('apache.aurora.client.cli.task.CLUSTERS', new=self.TEST_CLUSTERS),
+        patch('apache.aurora.client.api.command_runner.DistributedCommandRunner.sandbox_args',
+            return_value=sandbox_args),
+        patch('subprocess.Popen', return_value=self.create_mock_process())) as (
+            mock_scheduler_proxy_class,
+            mock_clusters,
+            mock_clusters_cli,
+            mock_runner_args_patch,
+            mock_subprocess):
+      cmd = AuroraCommandLine()
+      cmd.execute(['task', 'run', 'west/bozo/test/hello', 'ls'])
+      # The status command sends a getTasksStatus query to the scheduler,
+      # and then prints the result.
+      mock_scheduler_proxy.getTasksStatus.assert_called_with(TaskQuery(jobName='hello',
+          environment='test', owner=Identity(role='bozo'),
+          statuses=set([ScheduleStatus.RUNNING, ScheduleStatus.KILLING, ScheduleStatus.RESTARTING,
+              ScheduleStatus.PREEMPTING, ScheduleStatus.DRAINING])))
+
+      # The mock status call returns three ScheduledTasks, so three commands should have been run
+      assert mock_subprocess.call_count == 3
+      mock_subprocess.assert_called_with(['ssh', '-n', '-q', 'bozo@slavehost',
+          'cd /slaveroot/slaves/*/frameworks/*/executors/thermos-1287391823/runs/'
+          'slaverun/sandbox;ls'],
+          stderr=-2, stdout=-1)
+
+
+class TestSshCommand(AuroraClientCommandTest):
+  @classmethod
+  def create_mock_scheduled_tasks(cls):
+    jobs = []
+    for name in ['foo', 'bar', 'baz']:
+      job = Mock()
+      job.key = JobKey(role=cls.TEST_ROLE, environment=cls.TEST_ENV, name=name)
+      job.failure_count = 0
+      job.assignedTask = Mock(spec=AssignedTask)
+      job.assignedTask.taskId = 1287391823
+      job.assignedTask.slaveHost = 'slavehost'
+      job.assignedTask.task = Mock(spec=TaskConfig)
+      job.assignedTask.task.executorConfig = Mock()
+      job.assignedTask.task.maxTaskFailures = 1
+      job.assignedTask.task.packages = []
+      job.assignedTask.task.owner = Identity(role='bozo')
+      job.assignedTask.task.environment = 'test'
+      job.assignedTask.task.jobName = 'woops'
+      job.assignedTask.task.numCpus = 2
+      job.assignedTask.task.ramMb = 2
+      job.assignedTask.task.diskMb = 2
+      job.assignedTask.instanceId = 4237894
+      job.assignedTask.assignedPorts = {}
+      job.status = ScheduleStatus.RUNNING
+      mockEvent = Mock(spec=TaskEvent)
+      mockEvent.timestamp = 28234726395
+      mockEvent.status = ScheduleStatus.RUNNING
+      mockEvent.message = "Hi there"
+      job.taskEvents = [mockEvent]
+      jobs.append(job)
+    return jobs
+
+  @classmethod
+  def create_status_response(cls):
+    resp = cls.create_simple_success_response()
+    resp.result.scheduleStatusResult = Mock(spec=ScheduleStatusResult)
+    resp.result.scheduleStatusResult.tasks = cls.create_mock_scheduled_tasks()
+    return resp
+
+  @classmethod
+  def create_failed_status_response(cls):
+    return cls.create_blank_response(ResponseCode.INVALID_REQUEST, 'No tasks found for query')
+
+  def test_successful_ssh(self):
+    """Test the ssh command."""
+    (mock_api, mock_scheduler_proxy) = self.create_mock_api()
+    mock_scheduler_proxy.getTasksStatus.return_value = self.create_status_response()
+    sandbox_args = {'slave_root': '/slaveroot', 'slave_run_directory': 'slaverun'}
+    with contextlib.nested(
+        patch('apache.aurora.client.api.SchedulerProxy', return_value=mock_scheduler_proxy),
+        patch('apache.aurora.client.factory.CLUSTERS', new=self.TEST_CLUSTERS),
+        patch('apache.aurora.client.api.command_runner.DistributedCommandRunner.sandbox_args',
+            return_value=sandbox_args),
+        patch('subprocess.call', return_value=0)) as (
+            mock_scheduler_proxy_class,
+            mock_clusters,
+            mock_runner_args_patch,
+            mock_subprocess):
+      cmd = AuroraCommandLine()
+      cmd.execute(['task', 'ssh', 'west/bozo/test/hello/1', '--command=ls'])
+
+      # The status command sends a getTasksStatus query to the scheduler,
+      # and then prints the result.
+      mock_scheduler_proxy.getTasksStatus.assert_called_with(TaskQuery(jobName='hello',
+          environment='test', owner=Identity(role='bozo'), instanceIds=set([1]),
+          statuses=set([ScheduleStatus.RUNNING, ScheduleStatus.KILLING, ScheduleStatus.RESTARTING,
+              ScheduleStatus.PREEMPTING, ScheduleStatus.DRAINING
+              ])))
+      mock_subprocess.assert_called_with(['ssh', '-t', 'bozo@slavehost',
+          'cd /slaveroot/slaves/*/frameworks/*/executors/thermos-1287391823/runs/'
+          'slaverun/sandbox;ls'])
+

