aurora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wfar...@apache.org
Subject incubator-aurora git commit: Remove LiveClusterState, make CachedClusterState (now ClusterStateImpl) default.
Date Wed, 25 Feb 2015 21:01:58 GMT
Repository: incubator-aurora
Updated Branches:
  refs/heads/master 895d03ba6 -> 9442e0836


Remove LiveClusterState, make CachedClusterState (now ClusterStateImpl) default.

Bugs closed: AURORA-1055

Reviewed at https://reviews.apache.org/r/31389/


Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/9442e083
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/9442e083
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/9442e083

Branch: refs/heads/master
Commit: 9442e08360157235ff8d7e64e125d2473ca7ec47
Parents: 895d03b
Author: Bill Farner <wfarner@apache.org>
Authored: Wed Feb 25 12:58:45 2015 -0800
Committer: Bill Farner <wfarner@apache.org>
Committed: Wed Feb 25 12:58:45 2015 -0800

----------------------------------------------------------------------
 .../aurora/benchmark/SchedulingBenchmarks.java  |   8 +-
 .../async/preemptor/CachedClusterState.java     |  50 -------
 .../async/preemptor/ClusterStateImpl.java       |  50 +++++++
 .../async/preemptor/LiveClusterState.java       |  66 ---------
 .../async/preemptor/PreemptorModule.java        |  20 +--
 .../async/preemptor/CachedClusterStateTest.java | 134 -------------------
 .../async/preemptor/ClusterStateImplTest.java   | 134 +++++++++++++++++++
 .../async/preemptor/LiveClusterStateTest.java   |  87 ------------
 .../async/preemptor/PreemptorImplTest.java      | 105 ++++++---------
 9 files changed, 231 insertions(+), 423 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/9442e083/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java
----------------------------------------------------------------------
diff --git a/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java b/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java
index 0351b10..3239eaa 100644
--- a/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java
+++ b/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java
@@ -45,8 +45,8 @@ import org.apache.aurora.scheduler.async.OfferManager;
 import org.apache.aurora.scheduler.async.RescheduleCalculator;
 import org.apache.aurora.scheduler.async.TaskScheduler;
 import org.apache.aurora.scheduler.async.TaskScheduler.TaskSchedulerImpl.ReservationDuration;
-import org.apache.aurora.scheduler.async.preemptor.CachedClusterState;
 import org.apache.aurora.scheduler.async.preemptor.ClusterState;
+import org.apache.aurora.scheduler.async.preemptor.ClusterStateImpl;
 import org.apache.aurora.scheduler.async.preemptor.Preemptor;
 import org.apache.aurora.scheduler.async.preemptor.PreemptorImpl;
 import org.apache.aurora.scheduler.configuration.Resources;
@@ -136,8 +136,8 @@ public class SchedulingBenchmarks {
               bind(new TypeLiteral<Amount<Long, Time>>() { })
                   .annotatedWith(PreemptorImpl.PreemptionDelay.class)
                   .toInstance(Amount.of(0L, Time.MILLISECONDS));
-              bind(ClusterState.class).to(CachedClusterState.class);
-              bind(CachedClusterState.class).in(Singleton.class);
+              bind(ClusterState.class).to(ClusterStateImpl.class);
+              bind(ClusterStateImpl.class).in(Singleton.class);
 
               bind(Storage.class).toInstance(storage);
               bind(Driver.class).toInstance(new FakeDriver());
@@ -156,7 +156,7 @@ public class SchedulingBenchmarks {
 
       taskScheduler = injector.getInstance(TaskScheduler.class);
       offerManager = injector.getInstance(OfferManager.class);
-      eventBus.register(injector.getInstance(CachedClusterState.class));
+      eventBus.register(injector.getInstance(ClusterStateImpl.class));
 
       settings = getSettings();
       saveHostAttributes(settings.getHostAttributes());

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/9442e083/src/main/java/org/apache/aurora/scheduler/async/preemptor/CachedClusterState.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/preemptor/CachedClusterState.java b/src/main/java/org/apache/aurora/scheduler/async/preemptor/CachedClusterState.java
deleted file mode 100644
index 2831103..0000000
--- a/src/main/java/org/apache/aurora/scheduler/async/preemptor/CachedClusterState.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.async.preemptor;
-
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.Multimaps;
-import com.google.common.eventbus.Subscribe;
-
-import org.apache.aurora.scheduler.base.Tasks;
-import org.apache.aurora.scheduler.events.PubsubEvent;
-import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
-
-/**
- * A cached view of cluster state, kept up to date by pubsub notifications.
- */
-public class CachedClusterState implements ClusterState, PubsubEvent.EventSubscriber {
-
-  private final Multimap<String, PreemptionVictim> victims =
-      Multimaps.synchronizedMultimap(HashMultimap.<String, PreemptionVictim>create());
-
-  @Override
-  public Multimap<String, PreemptionVictim> getSlavesToActiveTasks() {
-    return Multimaps.unmodifiableMultimap(victims);
-  }
-
-  @Subscribe
-  public void taskChangedState(TaskStateChange stateChange) {
-    synchronized (victims) {
-      String slaveId = stateChange.getTask().getAssignedTask().getSlaveId();
-      PreemptionVictim victim = PreemptionVictim.fromTask(stateChange.getTask().getAssignedTask());
-      if (Tasks.SLAVE_ASSIGNED_STATES.contains(stateChange.getNewState())) {
-        victims.put(slaveId, victim);
-      } else {
-        victims.remove(slaveId, victim);
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/9442e083/src/main/java/org/apache/aurora/scheduler/async/preemptor/ClusterStateImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/preemptor/ClusterStateImpl.java b/src/main/java/org/apache/aurora/scheduler/async/preemptor/ClusterStateImpl.java
new file mode 100644
index 0000000..cd016af
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/async/preemptor/ClusterStateImpl.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.async.preemptor;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multimaps;
+import com.google.common.eventbus.Subscribe;
+
+import org.apache.aurora.scheduler.base.Tasks;
+import org.apache.aurora.scheduler.events.PubsubEvent;
+import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
+
+/**
+ * A cached view of cluster state, kept up to date by pubsub notifications.
+ */
+public class ClusterStateImpl implements ClusterState, PubsubEvent.EventSubscriber {
+
+  private final Multimap<String, PreemptionVictim> victims =
+      Multimaps.synchronizedMultimap(HashMultimap.<String, PreemptionVictim>create());
+
+  @Override
+  public Multimap<String, PreemptionVictim> getSlavesToActiveTasks() {
+    return Multimaps.unmodifiableMultimap(victims);
+  }
+
+  @Subscribe
+  public void taskChangedState(TaskStateChange stateChange) {
+    synchronized (victims) {
+      String slaveId = stateChange.getTask().getAssignedTask().getSlaveId();
+      PreemptionVictim victim = PreemptionVictim.fromTask(stateChange.getTask().getAssignedTask());
+      if (Tasks.SLAVE_ASSIGNED_STATES.contains(stateChange.getNewState())) {
+        victims.put(slaveId, victim);
+      } else {
+        victims.remove(slaveId, victim);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/9442e083/src/main/java/org/apache/aurora/scheduler/async/preemptor/LiveClusterState.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/preemptor/LiveClusterState.java b/src/main/java/org/apache/aurora/scheduler/async/preemptor/LiveClusterState.java
deleted file mode 100644
index 38cb173..0000000
--- a/src/main/java/org/apache/aurora/scheduler/async/preemptor/LiveClusterState.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.async.preemptor;
-
-import java.util.EnumSet;
-
-import javax.inject.Inject;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableMultimap;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.Sets;
-
-import org.apache.aurora.scheduler.base.Query;
-import org.apache.aurora.scheduler.base.Tasks;
-import org.apache.aurora.scheduler.storage.Storage;
-import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
-
-import static java.util.Objects.requireNonNull;
-
-import static org.apache.aurora.gen.ScheduleStatus.PREEMPTING;
-import static org.apache.aurora.scheduler.base.Tasks.SCHEDULED_TO_ASSIGNED;
-
-// TODO(wfarner): Remove this in favor of CachedClusterState.
-class LiveClusterState implements ClusterState {
-
-  @VisibleForTesting
-  static final Query.Builder CANDIDATE_QUERY = Query.statusScoped(
-      EnumSet.copyOf(Sets.difference(Tasks.SLAVE_ASSIGNED_STATES, EnumSet.of(PREEMPTING))));
-
-  private final Storage storage;
-
-  @Inject
-  LiveClusterState(Storage storage) {
-    this.storage = requireNonNull(storage);
-  }
-
-  @Override
-  public Multimap<String, PreemptionVictim> getSlavesToActiveTasks() {
-    // Only non-pending active tasks may be preempted.
-    Iterable<IAssignedTask> activeTasks = Iterables.transform(
-        Storage.Util.fetchTasks(storage, CANDIDATE_QUERY),
-        SCHEDULED_TO_ASSIGNED);
-
-    // Group the tasks by slave id so they can be paired with offers from the same slave.
-    // Choosing to do this iteratively instead of using Multimaps.index/transform to avoid
-    // generating a very large intermediate map.
-    ImmutableMultimap.Builder<String, PreemptionVictim> tasksBySlave = ImmutableMultimap.builder();
-    for (IAssignedTask task : activeTasks) {
-      tasksBySlave.put(task.getSlaveId(), PreemptionVictim.fromTask(task));
-    }
-    return tasksBySlave.build();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/9442e083/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModule.java b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModule.java
index 2030a94..85b3874 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModule.java
@@ -39,11 +39,6 @@ public class PreemptorModule extends AbstractModule {
       help = "Enable the preemptor and preemption")
   private static final Arg<Boolean> ENABLE_PREEMPTOR = Arg.create(true);
 
-  @CmdLine(name = "enable_preemptor_caching",
-      help = "Cache the state consumed by the preemptor to improve scheduling throughput at the "
-          + "cost of higher memory consumption.")
-  private static final Arg<Boolean> ENABLE_PREEMPTOR_CACHING = Arg.create(true);
-
   @CmdLine(name = "preemption_delay",
       help = "Time interval after which a pending task becomes eligible to preempt other tasks")
   private static final Arg<Amount<Long, Time>> PREEMPTION_DELAY =
@@ -71,14 +66,9 @@ public class PreemptorModule extends AbstractModule {
           bind(PreemptorImpl.class).in(Singleton.class);
           bind(new TypeLiteral<Amount<Long, Time>>() { }).annotatedWith(PreemptionDelay.class)
               .toInstance(PREEMPTION_DELAY.get());
-          if (ENABLE_PREEMPTOR_CACHING.get()) {
-            bind(ClusterState.class).to(CachedClusterState.class);
-            bind(CachedClusterState.class).in(Singleton.class);
-            expose(CachedClusterState.class);
-          } else {
-            bind(ClusterState.class).to(LiveClusterState.class);
-            bind(LiveClusterState.class).in(Singleton.class);
-          }
+          bind(ClusterState.class).to(ClusterStateImpl.class);
+          bind(ClusterStateImpl.class).in(Singleton.class);
+          expose(ClusterStateImpl.class);
         } else {
           bind(Preemptor.class).toInstance(NULL_PREEMPTOR);
           LOG.warning("Preemptor Disabled.");
@@ -91,9 +81,7 @@ public class PreemptorModule extends AbstractModule {
     // We can't do this in the private module due to the known conflict between multibindings
     // and private modules due to multiple injectors.  We accept the added complexity here to keep
     // the other bindings private.
-    if (enablePreemptor && ENABLE_PREEMPTOR_CACHING.get()) {
-      PubsubEventModule.bindSubscriber(binder(), CachedClusterState.class);
-    }
+    PubsubEventModule.bindSubscriber(binder(), ClusterStateImpl.class);
   }
 
   private static final Preemptor NULL_PREEMPTOR = new Preemptor() {

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/9442e083/src/test/java/org/apache/aurora/scheduler/async/preemptor/CachedClusterStateTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/preemptor/CachedClusterStateTest.java b/src/test/java/org/apache/aurora/scheduler/async/preemptor/CachedClusterStateTest.java
deleted file mode 100644
index 7cc04dd..0000000
--- a/src/test/java/org/apache/aurora/scheduler/async/preemptor/CachedClusterStateTest.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.async.preemptor;
-
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.ImmutableMultimap;
-
-import org.apache.aurora.gen.AssignedTask;
-import org.apache.aurora.gen.Identity;
-import org.apache.aurora.gen.ScheduleStatus;
-import org.apache.aurora.gen.ScheduledTask;
-import org.apache.aurora.gen.TaskConfig;
-import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
-import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
-import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.apache.aurora.gen.ScheduleStatus.ASSIGNED;
-import static org.apache.aurora.gen.ScheduleStatus.FAILED;
-import static org.apache.aurora.gen.ScheduleStatus.FINISHED;
-import static org.apache.aurora.gen.ScheduleStatus.KILLED;
-import static org.apache.aurora.gen.ScheduleStatus.KILLING;
-import static org.apache.aurora.gen.ScheduleStatus.PENDING;
-import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
-import static org.apache.aurora.gen.ScheduleStatus.THROTTLED;
-import static org.junit.Assert.assertEquals;
-
-public class CachedClusterStateTest {
-
-  private CachedClusterState state;
-
-  @Before
-  public void setUp() {
-    state = new CachedClusterState();
-  }
-
-  @Test(expected = UnsupportedOperationException.class)
-  public void testImmutable() {
-    state.getSlavesToActiveTasks().clear();
-  }
-
-  @Test
-  public void testTaskLifecycle() {
-    IAssignedTask a = makeTask("a", "s1");
-
-    assertVictims();
-    changeState(a, THROTTLED);
-    assertVictims();
-    changeState(a, PENDING);
-    assertVictims();
-    changeState(a, ASSIGNED);
-    assertVictims(a);
-    changeState(a, RUNNING);
-    assertVictims(a);
-    changeState(a, KILLING);
-    assertVictims(a);
-    changeState(a, FINISHED);
-    assertVictims();
-  }
-
-  @Test
-  public void testTaskChangesSlaves() {
-    // We do not intend to handle the case of an external failure leading to the same task ID
-    // on a different slave.
-    IAssignedTask a = makeTask("a", "s1");
-    IAssignedTask a1 = makeTask("a", "s2");
-    changeState(a, RUNNING);
-    changeState(a1, RUNNING);
-    assertVictims(a, a1);
-  }
-
-  @Test
-  public void testMultipleTasks() {
-    IAssignedTask a = makeTask("a", "s1");
-    IAssignedTask b = makeTask("b", "s1");
-    IAssignedTask c = makeTask("c", "s2");
-    IAssignedTask d = makeTask("d", "s3");
-    IAssignedTask e = makeTask("e", "s3");
-    IAssignedTask f = makeTask("f", "s1");
-    changeState(a, RUNNING);
-    assertVictims(a);
-    changeState(b, RUNNING);
-    assertVictims(a, b);
-    changeState(c, RUNNING);
-    assertVictims(a, b, c);
-    changeState(d, RUNNING);
-    assertVictims(a, b, c, d);
-    changeState(e, RUNNING);
-    assertVictims(a, b, c, d, e);
-    changeState(c, FINISHED);
-    assertVictims(a, b, d, e);
-    changeState(a, FAILED);
-    changeState(e, KILLED);
-    assertVictims(b, d);
-    changeState(f, RUNNING);
-    assertVictims(b, d, f);
-  }
-
-  private void assertVictims(IAssignedTask... tasks) {
-    ImmutableMultimap.Builder<String, PreemptionVictim> victims = ImmutableMultimap.builder();
-    for (IAssignedTask task : tasks) {
-      victims.put(task.getSlaveId(), PreemptionVictim.fromTask(task));
-    }
-    assertEquals(HashMultimap.create(victims.build()), state.getSlavesToActiveTasks());
-  }
-
-  private IAssignedTask makeTask(String taskId, String slaveId) {
-    return IAssignedTask.build(new AssignedTask()
-        .setTaskId(taskId)
-        .setSlaveId(slaveId)
-        .setSlaveHost(slaveId + "host")
-        .setTask(new TaskConfig()
-            .setOwner(new Identity().setRole("role"))));
-  }
-
-  private void changeState(IAssignedTask assignedTask, ScheduleStatus status) {
-    IScheduledTask task = IScheduledTask.build(new ScheduledTask()
-        .setStatus(status)
-        .setAssignedTask(assignedTask.newBuilder()));
-    state.taskChangedState(TaskStateChange.transition(task, ScheduleStatus.INIT));
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/9442e083/src/test/java/org/apache/aurora/scheduler/async/preemptor/ClusterStateImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/preemptor/ClusterStateImplTest.java b/src/test/java/org/apache/aurora/scheduler/async/preemptor/ClusterStateImplTest.java
new file mode 100644
index 0000000..7207867
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/async/preemptor/ClusterStateImplTest.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.async.preemptor;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableMultimap;
+
+import org.apache.aurora.gen.AssignedTask;
+import org.apache.aurora.gen.Identity;
+import org.apache.aurora.gen.ScheduleStatus;
+import org.apache.aurora.gen.ScheduledTask;
+import org.apache.aurora.gen.TaskConfig;
+import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
+import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.aurora.gen.ScheduleStatus.ASSIGNED;
+import static org.apache.aurora.gen.ScheduleStatus.FAILED;
+import static org.apache.aurora.gen.ScheduleStatus.FINISHED;
+import static org.apache.aurora.gen.ScheduleStatus.KILLED;
+import static org.apache.aurora.gen.ScheduleStatus.KILLING;
+import static org.apache.aurora.gen.ScheduleStatus.PENDING;
+import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
+import static org.apache.aurora.gen.ScheduleStatus.THROTTLED;
+import static org.junit.Assert.assertEquals;
+
+public class ClusterStateImplTest {
+
+  private ClusterStateImpl state;
+
+  @Before
+  public void setUp() {
+    state = new ClusterStateImpl();
+  }
+
+  @Test(expected = UnsupportedOperationException.class)
+  public void testImmutable() {
+    state.getSlavesToActiveTasks().clear();
+  }
+
+  @Test
+  public void testTaskLifecycle() {
+    IAssignedTask a = makeTask("a", "s1");
+
+    assertVictims();
+    changeState(a, THROTTLED);
+    assertVictims();
+    changeState(a, PENDING);
+    assertVictims();
+    changeState(a, ASSIGNED);
+    assertVictims(a);
+    changeState(a, RUNNING);
+    assertVictims(a);
+    changeState(a, KILLING);
+    assertVictims(a);
+    changeState(a, FINISHED);
+    assertVictims();
+  }
+
+  @Test
+  public void testTaskChangesSlaves() {
+    // We do not intend to handle the case of an external failure leading to the same task ID
+    // on a different slave.
+    IAssignedTask a = makeTask("a", "s1");
+    IAssignedTask a1 = makeTask("a", "s2");
+    changeState(a, RUNNING);
+    changeState(a1, RUNNING);
+    assertVictims(a, a1);
+  }
+
+  @Test
+  public void testMultipleTasks() {
+    IAssignedTask a = makeTask("a", "s1");
+    IAssignedTask b = makeTask("b", "s1");
+    IAssignedTask c = makeTask("c", "s2");
+    IAssignedTask d = makeTask("d", "s3");
+    IAssignedTask e = makeTask("e", "s3");
+    IAssignedTask f = makeTask("f", "s1");
+    changeState(a, RUNNING);
+    assertVictims(a);
+    changeState(b, RUNNING);
+    assertVictims(a, b);
+    changeState(c, RUNNING);
+    assertVictims(a, b, c);
+    changeState(d, RUNNING);
+    assertVictims(a, b, c, d);
+    changeState(e, RUNNING);
+    assertVictims(a, b, c, d, e);
+    changeState(c, FINISHED);
+    assertVictims(a, b, d, e);
+    changeState(a, FAILED);
+    changeState(e, KILLED);
+    assertVictims(b, d);
+    changeState(f, RUNNING);
+    assertVictims(b, d, f);
+  }
+
+  private void assertVictims(IAssignedTask... tasks) {
+    ImmutableMultimap.Builder<String, PreemptionVictim> victims = ImmutableMultimap.builder();
+    for (IAssignedTask task : tasks) {
+      victims.put(task.getSlaveId(), PreemptionVictim.fromTask(task));
+    }
+    assertEquals(HashMultimap.create(victims.build()), state.getSlavesToActiveTasks());
+  }
+
+  private IAssignedTask makeTask(String taskId, String slaveId) {
+    return IAssignedTask.build(new AssignedTask()
+        .setTaskId(taskId)
+        .setSlaveId(slaveId)
+        .setSlaveHost(slaveId + "host")
+        .setTask(new TaskConfig()
+            .setOwner(new Identity().setRole("role"))));
+  }
+
+  private void changeState(IAssignedTask assignedTask, ScheduleStatus status) {
+    IScheduledTask task = IScheduledTask.build(new ScheduledTask()
+        .setStatus(status)
+        .setAssignedTask(assignedTask.newBuilder()));
+    state.taskChangedState(TaskStateChange.transition(task, ScheduleStatus.INIT));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/9442e083/src/test/java/org/apache/aurora/scheduler/async/preemptor/LiveClusterStateTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/preemptor/LiveClusterStateTest.java b/src/test/java/org/apache/aurora/scheduler/async/preemptor/LiveClusterStateTest.java
deleted file mode 100644
index 763f8b5..0000000
--- a/src/test/java/org/apache/aurora/scheduler/async/preemptor/LiveClusterStateTest.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.async.preemptor;
-
-import com.google.common.collect.ImmutableMultimap;
-import com.twitter.common.testing.easymock.EasyMockTest;
-
-import org.apache.aurora.gen.AssignedTask;
-import org.apache.aurora.gen.Identity;
-import org.apache.aurora.gen.ScheduledTask;
-import org.apache.aurora.gen.TaskConfig;
-import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-
-public class LiveClusterStateTest extends EasyMockTest {
-
-  private StorageTestUtil storageUtil;
-  private ClusterState clusterState;
-
-  @Before
-  public void setUp() {
-    storageUtil = new StorageTestUtil(this);
-    storageUtil.expectOperations();
-    clusterState = new LiveClusterState(storageUtil.storage);
-  }
-
-  @Test
-  public void testEmptyStorage() {
-    storageUtil.expectTaskFetch(LiveClusterState.CANDIDATE_QUERY);
-
-    control.replay();
-
-    assertEquals(
-        ImmutableMultimap.<String, PreemptionVictim>of(),
-        clusterState.getSlavesToActiveTasks());
-  }
-
-  private IScheduledTask makeTask(String taskId, String slaveId) {
-    return IScheduledTask.build(new ScheduledTask()
-        .setAssignedTask(new AssignedTask()
-            .setTaskId(taskId)
-            .setSlaveId(slaveId)
-            .setSlaveHost(slaveId + "-host")
-            .setTask(new TaskConfig()
-                .setOwner(new Identity("owner", "role"))
-                .setNumCpus(1)
-                .setRamMb(1)
-                .setDiskMb(1))));
-  }
-
-  private static PreemptionVictim fromTask(IScheduledTask task) {
-    return PreemptionVictim.fromTask(task.getAssignedTask());
-  }
-
-  @Test
-  public void testGetActiveTasks() {
-    IScheduledTask a = makeTask("a", "1");
-    IScheduledTask b = makeTask("b", "1");
-    IScheduledTask c = makeTask("c", "2");
-
-    storageUtil.expectTaskFetch(LiveClusterState.CANDIDATE_QUERY, a, b, c);
-
-    control.replay();
-
-    assertEquals(
-        ImmutableMultimap.<String, PreemptionVictim>builder()
-            .putAll("1", fromTask(a), fromTask(b))
-            .putAll("2", fromTask(c))
-            .build(),
-        clusterState.getSlavesToActiveTasks());
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/9442e083/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImplTest.java b/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImplTest.java
index 83cf118..44cd8f7 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImplTest.java
@@ -25,6 +25,8 @@ import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multimaps;
 import com.twitter.common.quantity.Amount;
 import com.twitter.common.quantity.Data;
 import com.twitter.common.quantity.Time;
@@ -56,12 +58,9 @@ import org.apache.aurora.scheduler.filter.SchedulingFilterImpl;
 import org.apache.aurora.scheduler.mesos.TaskExecutors;
 import org.apache.aurora.scheduler.state.StateManager;
 import org.apache.aurora.scheduler.storage.AttributeStore;
-import org.apache.aurora.scheduler.storage.Storage;
-import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
-import org.apache.aurora.scheduler.storage.Storage.MutateWork;
+import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
 import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-import org.apache.aurora.scheduler.storage.mem.MemStorage;
 import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
 import org.apache.aurora.scheduler.testing.FakeStatsProvider;
 import org.easymock.EasyMock;
@@ -73,13 +72,11 @@ import org.junit.Test;
 import static org.apache.aurora.gen.MaintenanceMode.NONE;
 import static org.apache.aurora.gen.ScheduleStatus.PENDING;
 import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
-import static org.apache.aurora.gen.ScheduleStatus.THROTTLED;
 import static org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
 import static org.apache.mesos.Protos.Offer;
 import static org.apache.mesos.Protos.Resource;
 import static org.easymock.EasyMock.eq;
 import static org.easymock.EasyMock.expect;
-import static org.junit.Assert.assertEquals;
 
 public class PreemptorImplTest extends EasyMockTest {
 
@@ -106,6 +103,7 @@ public class PreemptorImplTest extends EasyMockTest {
   private SchedulingFilter schedulingFilter;
   private FakeClock clock;
   private StatsProvider statsProvider;
+  private ClusterState clusterState;
   private OfferManager offerManager;
   private AttributeAggregate emptyJob;
 
@@ -116,6 +114,7 @@ public class PreemptorImplTest extends EasyMockTest {
     stateManager = createMock(StateManager.class);
     clock = new FakeClock();
     statsProvider = new FakeStatsProvider();
+    clusterState = createMock(ClusterState.class);
     offerManager = createMock(OfferManager.class);
     emptyJob = new AttributeAggregate(
         Suppliers.ofInstance(ImmutableSet.<IScheduledTask>of()),
@@ -131,7 +130,7 @@ public class PreemptorImplTest extends EasyMockTest {
         PREEMPTION_DELAY,
         clock,
         statsProvider,
-        new LiveClusterState(storageUtil.storage),
+        clusterState,
         TaskExecutors.NO_OVERHEAD_EXECUTOR);
 
     preemptor.findPreemptionSlotFor(pendingTask.getAssignedTask().getTaskId(), emptyJob);
@@ -148,10 +147,26 @@ public class PreemptorImplTest extends EasyMockTest {
         IScheduledTask.setFromBuilders(Arrays.asList(returnedTasks)));
   }
 
-  private void expectGetActiveTasks(ScheduledTask... returnedTasks) {
-    storageUtil.expectTaskFetch(
-        LiveClusterState.CANDIDATE_QUERY,
-        IScheduledTask.setFromBuilders(Arrays.asList(returnedTasks)));
+  private static final Function<ScheduledTask, String> GET_SLAVE_ID =
+      new Function<ScheduledTask, String>() {
+        @Override
+        public String apply(ScheduledTask task) {
+          return task.getAssignedTask().getSlaveId();
+        }
+      };
+
+  private void expectGetClusterState(ScheduledTask... returnedTasks) {
+    Multimap<String, PreemptionVictim> state = Multimaps.transformValues(
+        Multimaps.index(Arrays.asList(returnedTasks), GET_SLAVE_ID),
+        new Function<ScheduledTask, PreemptionVictim>() {
+          @Override
+          public PreemptionVictim apply(ScheduledTask task) {
+            return PreemptionVictim.fromTask(IAssignedTask.build(task.getAssignedTask()));
+          }
+        }
+    );
+
+    expect(clusterState.getSlavesToActiveTasks()).andReturn(state);
   }
 
   @Test
@@ -168,7 +183,7 @@ public class PreemptorImplTest extends EasyMockTest {
     expectNoOffers();
 
     expectGetPendingTasks(highPriority);
-    expectGetActiveTasks(lowPriority);
+    expectGetClusterState(lowPriority);
 
     expectFiltering();
     expectPreempted(lowPriority);
@@ -194,7 +209,7 @@ public class PreemptorImplTest extends EasyMockTest {
     expectNoOffers();
 
     expectGetPendingTasks(highPriority);
-    expectGetActiveTasks(lowerPriority, lowerPriority);
+    expectGetClusterState(lowerPriority, lowerPriority);
 
     expectFiltering();
     expectPreempted(lowerPriority);
@@ -223,7 +238,7 @@ public class PreemptorImplTest extends EasyMockTest {
     expectNoOffers();
 
     expectGetPendingTasks(pendingPriority);
-    expectGetActiveTasks(highPriority, lowerPriority, lowestPriority);
+    expectGetClusterState(highPriority, lowerPriority, lowestPriority);
 
     expectFiltering();
     expectPreempted(lowestPriority);
@@ -244,7 +259,7 @@ public class PreemptorImplTest extends EasyMockTest {
     expectNoOffers();
 
     expectGetPendingTasks(task);
-    expectGetActiveTasks(highPriority);
+    expectGetClusterState(highPriority);
 
     control.replay();
     runPreemptor(task);
@@ -265,7 +280,7 @@ public class PreemptorImplTest extends EasyMockTest {
     expectNoOffers();
 
     expectGetPendingTasks(p1);
-    expectGetActiveTasks(a1);
+    expectGetClusterState(a1);
 
     expectFiltering();
     expectPreempted(a1);
@@ -289,7 +304,7 @@ public class PreemptorImplTest extends EasyMockTest {
     expectNoOffers();
 
     expectGetPendingTasks(p1);
-    expectGetActiveTasks(a1);
+    expectGetClusterState(a1);
 
     expectFiltering();
     expectPreempted(a1);
@@ -310,7 +325,7 @@ public class PreemptorImplTest extends EasyMockTest {
     expectNoOffers();
 
     expectGetPendingTasks(p1);
-    expectGetActiveTasks(a1);
+    expectGetClusterState(a1);
 
     control.replay();
     runPreemptor(p1);
@@ -339,7 +354,7 @@ public class PreemptorImplTest extends EasyMockTest {
     expectNoOffers();
 
     expectGetPendingTasks(p1);
-    expectGetActiveTasks(a1, b1);
+    expectGetClusterState(a1, b1);
 
     expectPreempted(a1);
     expectPreempted(b1);
@@ -375,7 +390,7 @@ public class PreemptorImplTest extends EasyMockTest {
     expectNoOffers();
 
     expectGetPendingTasks(p1);
-    expectGetActiveTasks(b1, b2, a1);
+    expectGetClusterState(b1, b2, a1);
 
     expectPreempted(a1);
 
@@ -401,7 +416,7 @@ public class PreemptorImplTest extends EasyMockTest {
 
     expectNoOffers();
 
-    expectGetActiveTasks(p1);
+    expectGetClusterState(p1);
     expectGetPendingTasks(p2);
 
     control.replay();
@@ -427,7 +442,7 @@ public class PreemptorImplTest extends EasyMockTest {
 
     clock.advance(PREEMPTION_DELAY);
 
-    expectGetActiveTasks(a1);
+    expectGetClusterState(a1);
     expectGetPendingTasks(p1);
 
     expectPreempted(a1);
@@ -459,7 +474,7 @@ public class PreemptorImplTest extends EasyMockTest {
 
     clock.advance(PREEMPTION_DELAY);
 
-    expectGetActiveTasks(a1, a2);
+    expectGetClusterState(a1, a2);
     expectGetPendingTasks(p1);
 
     expectPreempted(a1);
@@ -488,55 +503,13 @@ public class PreemptorImplTest extends EasyMockTest {
 
     clock.advance(PREEMPTION_DELAY);
 
-    expectGetActiveTasks(a1);
+    expectGetClusterState(a1);
     expectGetPendingTasks(p1);
 
     control.replay();
     runPreemptor(p1);
   }
 
-  @Test
-  public void testIgnoresThrottledTasks() throws Exception {
-    // Ensures that the preemptor does not consider a throttled task to be a preemption candidate.
-    schedulingFilter = createMock(SchedulingFilter.class);
-
-    Storage storage = MemStorage.newEmptyStorage();
-
-    final ScheduledTask throttled = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1").setStatus(THROTTLED);
-    throttled.getAssignedTask().getTask().setNumCpus(1).setRamMb(512);
-
-    final ScheduledTask pending = makeProductionTask(USER_B, JOB_B, TASK_ID_B + "_p1");
-    pending.getAssignedTask().getTask().setNumCpus(1).setRamMb(1024);
-
-    storage.write(new MutateWork.NoResult.Quiet() {
-      @Override
-      protected void execute(MutableStoreProvider store) {
-        store.getUnsafeTaskStore().saveTasks(ImmutableSet.of(
-            IScheduledTask.build(pending),
-            IScheduledTask.build(throttled)));
-      }
-    });
-
-    clock.advance(PREEMPTION_DELAY);
-
-    control.replay();
-
-    PreemptorImpl preemptor = new PreemptorImpl(
-        storage,
-        stateManager,
-        offerManager,
-        schedulingFilter,
-        PREEMPTION_DELAY,
-        clock,
-        statsProvider,
-        new LiveClusterState(storage),
-        TaskExecutors.NO_OVERHEAD_EXECUTOR);
-
-    assertEquals(
-        Optional.<String>absent(),
-        preemptor.findPreemptionSlotFor(pending.getAssignedTask().getTaskId(), emptyJob));
-  }
-
   // TODO(zmanji) spread tasks across slave ids on the same host and see if preemption fails.
 
   private Offer makeOffer(String offerId,


Mime
View raw message