aurora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ma...@apache.org
Subject git commit: Ignore THROTTLED tasks when looking for tasks associated with a slave.
Date Fri, 07 Feb 2014 00:21:19 GMT
Updated Branches:
  refs/heads/master 1590d5b0e -> 4dcddab2f


Ignore THROTTLED tasks when looking for tasks associated with a slave.

Bugs closed: AURORA-194

Reviewed at https://reviews.apache.org/r/17785/


Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/4dcddab2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/4dcddab2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/4dcddab2

Branch: refs/heads/master
Commit: 4dcddab2fca13553c20965ff8d5ae185d421afcc
Parents: 1590d5b
Author: Bill Farner <wfarner@apache.org>
Authored: Thu Feb 6 16:15:06 2014 -0800
Committer: Maxim Khutornenko <maxim@apache.org>
Committed: Thu Feb 6 16:15:06 2014 -0800

----------------------------------------------------------------------
 .../aurora/scheduler/async/Preemptor.java       |  2 +-
 .../aurora/scheduler/async/TaskScheduler.java   |  8 +--
 .../org/apache/aurora/scheduler/base/Tasks.java |  6 ++
 .../thrift/org/apache/aurora/gen/api.thrift     | 28 ++++++---
 .../scheduler/async/PreemptorImplTest.java      | 45 +++++++++++++++
 .../scheduler/async/TaskSchedulerImplTest.java  | 60 ++++++++++++++++----
 .../org/apache/aurora/gen/api.thrift.md5        |  2 +-
 7 files changed, 123 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/4dcddab2/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java b/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java
index 9d00315..7893e55 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java
@@ -101,7 +101,7 @@ public interface Preemptor {
 
     @VisibleForTesting
     static final Query.Builder CANDIDATE_QUERY = Query.statusScoped(
-        EnumSet.copyOf(Sets.difference(Tasks.ACTIVE_STATES, EnumSet.of(PENDING, PREEMPTING))));
+        EnumSet.copyOf(Sets.difference(Tasks.SLAVE_ASSIGNED_STATES, EnumSet.of(PREEMPTING))));
 
     private static final Function<IAssignedTask, Integer> GET_PRIORITY =
         new Function<IAssignedTask, Integer>() {

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/4dcddab2/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java b/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
index 5357481..61385c6 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
@@ -17,7 +17,6 @@ package org.apache.aurora.scheduler.async;
 
 import java.lang.annotation.Retention;
 import java.lang.annotation.Target;
-import java.util.EnumSet;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.logging.Level;
@@ -35,7 +34,6 @@ import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
-import com.google.common.collect.Sets;
 import com.google.common.eventbus.Subscribe;
 import com.google.inject.BindingAnnotation;
 import com.twitter.common.inject.TimedInterceptor.Timed;
@@ -45,7 +43,6 @@ import com.twitter.common.stats.StatImpl;
 import com.twitter.common.stats.Stats;
 import com.twitter.common.util.Clock;
 
-import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
@@ -165,12 +162,9 @@ interface TaskScheduler extends EventSubscriber {
     static final Optional<String> LAUNCH_FAILED_MSG =
         Optional.of("Unknown exception attempting to schedule task.");
 
-    private static final Iterable<ScheduleStatus> ACTIVE_NOT_PENDING_STATES =
-        EnumSet.copyOf(Sets.difference(Tasks.ACTIVE_STATES, EnumSet.of(ScheduleStatus.PENDING)));
-
     @VisibleForTesting
     static Query.Builder activeJobStateQuery(IJobKey jobKey) {
-      return Query.jobScoped(jobKey).byStatus(ACTIVE_NOT_PENDING_STATES);
+      return Query.jobScoped(jobKey).byStatus(Tasks.SLAVE_ASSIGNED_STATES);
     }
 
     private CachedJobState getJobState(final TaskStore store, final IJobKey jobKey) {

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/4dcddab2/src/main/java/org/apache/aurora/scheduler/base/Tasks.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/base/Tasks.java b/src/main/java/org/apache/aurora/scheduler/base/Tasks.java
index 0375b96..d9cb886 100644
--- a/src/main/java/org/apache/aurora/scheduler/base/Tasks.java
+++ b/src/main/java/org/apache/aurora/scheduler/base/Tasks.java
@@ -122,6 +122,12 @@ public final class Tasks {
   public static final Set<ScheduleStatus> TERMINAL_STATES =
       EnumSet.copyOf(apiConstants.TERMINAL_STATES);
 
+  /**
+   * States a task can be in when associated with a slave machine.
+   */
+  public static final Set<ScheduleStatus> SLAVE_ASSIGNED_STATES =
+      EnumSet.copyOf(apiConstants.SLAVE_ASSIGNED_STATES);
+
   public static final Predicate<ITaskConfig> IS_PRODUCTION =
       new Predicate<ITaskConfig>() {
         @Override

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/4dcddab2/src/main/thrift/org/apache/aurora/gen/api.thrift
----------------------------------------------------------------------
diff --git a/src/main/thrift/org/apache/aurora/gen/api.thrift b/src/main/thrift/org/apache/aurora/gen/api.thrift
index e5a3675..23883cf 100644
--- a/src/main/thrift/org/apache/aurora/gen/api.thrift
+++ b/src/main/thrift/org/apache/aurora/gen/api.thrift
@@ -262,20 +262,30 @@ enum ScheduleStatus {
 }
 
 // States that a task may be in while still considered active.
-const set<ScheduleStatus> ACTIVE_STATES = [ScheduleStatus.THROTTLED,
+const set<ScheduleStatus> ACTIVE_STATES = [ScheduleStatus.ASSIGNED,
+                                           ScheduleStatus.DRAINING,
+                                           ScheduleStatus.KILLING,
                                            ScheduleStatus.PENDING,
-                                           ScheduleStatus.ASSIGNED,
-                                           ScheduleStatus.STARTING,
+                                           ScheduleStatus.PREEMPTING,
+                                           ScheduleStatus.RESTARTING
                                            ScheduleStatus.RUNNING,
-                                           ScheduleStatus.KILLING,
-                                           ScheduleStatus.RESTARTING,
-                                           ScheduleStatus.PREEMPTING]
+                                           ScheduleStatus.STARTING,
+                                           ScheduleStatus.THROTTLED]
+
+// States that a task may be in while associated with a slave machine and non-terminal.
+const set<ScheduleStatus> SLAVE_ASSIGNED_STATES = [ScheduleStatus.ASSIGNED,
+                                                   ScheduleStatus.DRAINING,
+                                                   ScheduleStatus.KILLING,
+                                                   ScheduleStatus.PREEMPTING,
+                                                   ScheduleStatus.RESTARTING,
+                                                   ScheduleStatus.RUNNING,
+                                                   ScheduleStatus.STARTING]
 
 // States that a task may be in while in an active sandbox.
-const set<ScheduleStatus> LIVE_STATES = [ScheduleStatus.RUNNING,
-                                         ScheduleStatus.KILLING,
+const set<ScheduleStatus> LIVE_STATES = [ScheduleStatus.KILLING,
+                                         ScheduleStatus.PREEMPTING,
                                          ScheduleStatus.RESTARTING,
-                                         ScheduleStatus.PREEMPTING]
+                                         ScheduleStatus.RUNNING]
 
 // States a completed task may be in.
 const set<ScheduleStatus> TERMINAL_STATES = [ScheduleStatus.FAILED,

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/4dcddab2/src/test/java/org/apache/aurora/scheduler/async/PreemptorImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/PreemptorImplTest.java b/src/test/java/org/apache/aurora/scheduler/async/PreemptorImplTest.java
index bf097d7..dc371e0 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/PreemptorImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/PreemptorImplTest.java
@@ -50,8 +50,12 @@ import org.apache.aurora.scheduler.filter.SchedulingFilter;
 import org.apache.aurora.scheduler.filter.SchedulingFilterImpl;
 import org.apache.aurora.scheduler.state.MaintenanceController;
 import org.apache.aurora.scheduler.state.StateManager;
+import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
+import org.apache.aurora.scheduler.storage.Storage.MutateWork;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
+import org.apache.aurora.scheduler.storage.mem.MemStorage;
 import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
 import org.easymock.EasyMock;
 import org.easymock.IAnswer;
@@ -62,12 +66,14 @@ import org.junit.Test;
 import static org.apache.aurora.gen.MaintenanceMode.NONE;
 import static org.apache.aurora.gen.ScheduleStatus.PENDING;
 import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
+import static org.apache.aurora.gen.ScheduleStatus.THROTTLED;
 import static org.apache.aurora.scheduler.async.Preemptor.PreemptorImpl;
 import static org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
 import static org.apache.mesos.Protos.Offer;
 import static org.apache.mesos.Protos.Resource;
 import static org.easymock.EasyMock.eq;
 import static org.easymock.EasyMock.expect;
+import static org.junit.Assert.assertEquals;
 
 public class PreemptorImplTest extends EasyMockTest {
 
@@ -469,6 +475,45 @@ public class PreemptorImplTest extends EasyMockTest {
     runPreemptor(p1);
   }
 
+  @Test
+  public void testIgnoresThrottledTasks() throws Exception {
+    // Ensures that the preemptor does not consider a throttled task to be a preemption candidate.
+    schedulingFilter = createMock(SchedulingFilter.class);
+
+    Storage storage = MemStorage.newEmptyStorage();
+
+    final ScheduledTask throttled = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1").setStatus(THROTTLED);
+    throttled.getAssignedTask().getTask().setNumCpus(1).setRamMb(512);
+
+    final ScheduledTask pending = makeProductionTask(USER_B, JOB_B, TASK_ID_B + "_p1");
+    pending.getAssignedTask().getTask().setNumCpus(1).setRamMb(1024);
+
+    storage.write(new MutateWork.NoResult.Quiet() {
+      @Override
+      protected void execute(MutableStoreProvider store) {
+        store.getUnsafeTaskStore().saveTasks(ImmutableSet.of(
+            IScheduledTask.build(pending),
+            IScheduledTask.build(throttled)));
+      }
+    });
+
+    clock.advance(PREEMPTION_DELAY);
+
+    control.replay();
+
+    PreemptorImpl preemptor = new PreemptorImpl(
+        storage,
+        stateManager,
+        offerQueue,
+        schedulingFilter,
+        PREEMPTION_DELAY,
+        clock);
+
+    assertEquals(
+        Optional.<String>absent(),
+        preemptor.findPreemptionSlotFor(pending.getAssignedTask().getTaskId(), EMPTY_JOB));
+  }
+
   // TODO(zmanji) spread tasks across slave ids on the same host and see if preemption fails.
 
   private Offer makeOffer(String offerId,

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/4dcddab2/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
index 4c86d8a..9464783 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
@@ -42,7 +42,10 @@ import org.apache.aurora.scheduler.state.PubsubTestUtil;
 import org.apache.aurora.scheduler.state.StateManager;
 import org.apache.aurora.scheduler.state.TaskAssigner;
 import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
+import org.apache.aurora.scheduler.storage.Storage.MutateWork;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.aurora.scheduler.storage.mem.MemStorage;
 import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
 import org.apache.mesos.Protos.Offer;
 import org.apache.mesos.Protos.TaskInfo;
@@ -51,6 +54,7 @@ import org.junit.Before;
 import org.junit.Test;
 
 import static org.apache.aurora.gen.ScheduleStatus.PENDING;
+import static org.apache.aurora.gen.ScheduleStatus.THROTTLED;
 import static org.apache.aurora.scheduler.async.TaskScheduler.TaskSchedulerResult.SUCCESS;
 import static org.apache.aurora.scheduler.async.TaskScheduler.TaskSchedulerResult.TRY_AGAIN;
 import static org.easymock.EasyMock.capture;
@@ -89,7 +93,13 @@ public class TaskSchedulerImplTest extends EasyMockTest {
     clock.setNowMillis(0);
     preemptor = createMock(Preemptor.class);
 
-    Injector injector = Guice.createInjector(new AbstractModule() {
+    Injector injector = getInjector(storageUtil.storage);
+    scheduler = injector.getInstance(TaskScheduler.class);
+    eventSink = PubsubTestUtil.startPubsub(injector);
+  }
+
+  private Injector getInjector(final Storage storageImpl) {
+    return Guice.createInjector(new AbstractModule() {
       @Override
       protected void configure() {
         PubsubTestUtil.installPubsub(binder());
@@ -99,12 +109,9 @@ public class TaskSchedulerImplTest extends EasyMockTest {
         bind(StateManager.class).toInstance(stateManager);
         bind(TaskAssigner.class).toInstance(assigner);
         bind(Clock.class).toInstance(clock);
-        bind(Storage.class).toInstance(storageUtil.storage);
+        bind(Storage.class).toInstance(storageImpl);
       }
     });
-
-    scheduler = injector.getInstance(TaskScheduler.class);
-    eventSink = PubsubTestUtil.startPubsub(injector);
   }
 
   private void expectTaskStillPendingQuery(IScheduledTask task) {
@@ -119,7 +126,7 @@ public class TaskSchedulerImplTest extends EasyMockTest {
   }
 
   @Test
-  public void testReservationsDeniesTasksForTimePeriod() throws OfferQueue.LaunchException {
+  public void testReservationsDeniesTasksForTimePeriod() throws Exception {
     storageUtil.expectOperations();
 
     expectTaskStillPendingQuery(TASK_A);
@@ -154,7 +161,7 @@ public class TaskSchedulerImplTest extends EasyMockTest {
   }
 
   @Test
-  public void testReservationsExpireAfterAccepted() throws OfferQueue.LaunchException {
+  public void testReservationsExpireAfterAccepted() throws Exception {
     storageUtil.expectOperations();
 
     expectTaskStillPendingQuery(TASK_A);
@@ -188,7 +195,7 @@ public class TaskSchedulerImplTest extends EasyMockTest {
   }
 
   @Test
-  public void testReservationsAcceptsWithInTimePeriod() throws OfferQueue.LaunchException {
+  public void testReservationsAcceptsWithInTimePeriod() throws Exception {
     storageUtil.expectOperations();
     expectTaskStillPendingQuery(TASK_A);
     expectActiveJobFetch(TASK_A);
@@ -211,7 +218,7 @@ public class TaskSchedulerImplTest extends EasyMockTest {
   }
 
   @Test
-  public void testReservationsCancellation() throws OfferQueue.LaunchException {
+  public void testReservationsCancellation() throws Exception {
     storageUtil.expectOperations();
 
     expectTaskStillPendingQuery(TASK_A);
@@ -237,7 +244,7 @@ public class TaskSchedulerImplTest extends EasyMockTest {
   }
 
   @Test
-  public void testReservationsExpire() throws OfferQueue.LaunchException {
+  public void testReservationsExpire() throws Exception {
     storageUtil.expectOperations();
 
     expectTaskStillPendingQuery(TASK_B);
@@ -260,6 +267,39 @@ public class TaskSchedulerImplTest extends EasyMockTest {
     firstAssignment.getValue().apply(OFFER);
   }
 
+  @Test
+  public void testIgnoresThrottledTasks() throws Exception {
+    // Ensures that tasks in THROTTLED state are not considered part of the active job state passed
+    // to the assigner function.
+
+    Storage memStorage = MemStorage.newEmptyStorage();
+
+    Injector injector = getInjector(memStorage);
+    scheduler = injector.getInstance(TaskScheduler.class);
+    eventSink = PubsubTestUtil.startPubsub(injector);
+
+    ScheduledTask builder = TASK_A.newBuilder();
+    final IScheduledTask taskA = IScheduledTask.build(builder.setStatus(PENDING));
+    builder.getAssignedTask().setTaskId("b");
+    final IScheduledTask taskB = IScheduledTask.build(builder.setStatus(THROTTLED));
+
+    memStorage.write(new MutateWork.NoResult.Quiet() {
+      @Override
+      protected void execute(MutableStoreProvider store) {
+        store.getUnsafeTaskStore().saveTasks(ImmutableSet.of(taskA, taskB));
+      }
+    });
+
+    Capture<Function<Offer, Optional<TaskInfo>>> assignment = expectLaunchAttempt(true);
+    expect(assigner.maybeAssign(OFFER, taskA, EMPTY_JOB))
+        .andReturn(Optional.of(TaskInfo.getDefaultInstance()));
+
+    control.replay();
+
+    assertEquals(SUCCESS, scheduler.schedule(Tasks.id(taskA)));
+    assignment.getValue().apply(OFFER);
+  }
+
   private static IScheduledTask makeTask(String taskId) {
     return IScheduledTask.build(new ScheduledTask()
         .setAssignedTask(new AssignedTask()

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/4dcddab2/src/test/resources/org/apache/aurora/gen/api.thrift.md5
----------------------------------------------------------------------
diff --git a/src/test/resources/org/apache/aurora/gen/api.thrift.md5 b/src/test/resources/org/apache/aurora/gen/api.thrift.md5
index 22042a4..2bdd038 100644
--- a/src/test/resources/org/apache/aurora/gen/api.thrift.md5
+++ b/src/test/resources/org/apache/aurora/gen/api.thrift.md5
@@ -1 +1 @@
-63ed50d02363dadde8422b08a163add4
+049d4b884c2d698e27c627796bd46e65


Mime
View raw message