aurora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ma...@apache.org
Subject incubator-aurora git commit: Pending reason for all tasks. Part 1 - Extracting GroupKey.
Date Thu, 05 Mar 2015 18:37:33 GMT
Repository: incubator-aurora
Updated Branches:
  refs/heads/master cb3749147 -> f64b3f85c


Pending reason for all tasks. Part 1 - Extracting GroupKey.

Bugs closed: AURORA-911

Reviewed at https://reviews.apache.org/r/31646/


Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/f64b3f85
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/f64b3f85
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/f64b3f85

Branch: refs/heads/master
Commit: f64b3f85c01f82ecf430a900df3e371e81d6565b
Parents: cb37491
Author: Maxim Khutornenko <maxim@apache.org>
Authored: Thu Mar 5 10:37:15 2015 -0800
Committer: Maxim Khutornenko <maxim@apache.org>
Committed: Thu Mar 5 10:37:15 2015 -0800

----------------------------------------------------------------------
 .../aurora/scheduler/async/OfferManager.java    | 14 ++---
 .../aurora/scheduler/async/TaskGroup.java       |  8 +--
 .../aurora/scheduler/async/TaskGroups.java      | 39 ++-----------
 .../aurora/scheduler/async/TaskScheduler.java   |  4 +-
 .../aurora/scheduler/base/TaskGroupKey.java     | 61 ++++++++++++++++++++
 .../scheduler/async/OfferManagerImplTest.java   |  6 +-
 .../scheduler/async/TaskSchedulerImplTest.java  | 10 ++--
 .../scheduler/async/TaskSchedulerTest.java      |  4 +-
 8 files changed, 90 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/f64b3f85/src/main/java/org/apache/aurora/scheduler/async/OfferManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/OfferManager.java b/src/main/java/org/apache/aurora/scheduler/async/OfferManager.java
index 7d37139..7d2cb46 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/OfferManager.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/OfferManager.java
@@ -42,7 +42,7 @@ import com.twitter.common.stats.Stats;
 
 import org.apache.aurora.gen.MaintenanceMode;
 import org.apache.aurora.scheduler.HostOffer;
-import org.apache.aurora.scheduler.async.TaskGroups.GroupKey;
+import org.apache.aurora.scheduler.base.TaskGroupKey;
 import org.apache.aurora.scheduler.events.PubsubEvent.DriverDisconnected;
 import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
 import org.apache.aurora.scheduler.mesos.Driver;
@@ -89,7 +89,7 @@ public interface OfferManager extends EventSubscriber {
    * @throws LaunchException If the acceptor accepted an offer, but there was an error launching the
    *                         task.
    */
-  boolean launchFirst(Function<HostOffer, Assignment> acceptor, GroupKey groupKey)
+  boolean launchFirst(Function<HostOffer, Assignment> acceptor, TaskGroupKey groupKey)
       throws LaunchException;
 
   /**
@@ -254,7 +254,7 @@ public interface OfferManager extends EventSubscriber {
       // TODO(maxim): Expose via a debug endpoint. AURORA-1136.
       // Keep track of offer->groupKey mappings that will never be matched to avoid redundant
       // scheduling attempts. See Assignment.Result for more details on static ban.
-      private final Multimap<OfferID, GroupKey> staticallyBannedOffers = HashMultimap.create();
+      private final Multimap<OfferID, TaskGroupKey> staticallyBannedOffers = HashMultimap.create();
 
       HostOffers() {
         // Potential gotcha - since this is a ConcurrentSkipListSet, size() is more expensive.
@@ -297,7 +297,7 @@ public interface OfferManager extends EventSubscriber {
         return Iterables.unmodifiableIterable(offers);
       }
 
-      synchronized boolean isStaticallyBanned(HostOffer offer, GroupKey groupKey) {
+      synchronized boolean isStaticallyBanned(HostOffer offer, TaskGroupKey groupKey) {
         boolean result = staticallyBannedOffers.containsEntry(offer.getOffer().getId(), groupKey);
         if (LOG.isLoggable(Level.FINE)) {
           LOG.fine(String.format(
@@ -309,7 +309,7 @@ public interface OfferManager extends EventSubscriber {
         return result;
       }
 
-      synchronized void addStaticGroupBan(HostOffer offer, GroupKey groupKey) {
+      synchronized void addStaticGroupBan(HostOffer offer, TaskGroupKey groupKey) {
         OfferID offerId = offer.getOffer().getId();
         if (offersById.containsKey(offerId)) {
           staticallyBannedOffers.put(offerId, groupKey);
@@ -332,7 +332,7 @@ public interface OfferManager extends EventSubscriber {
 
     @Timed("offer_queue_launch_first")
     @Override
-    public boolean launchFirst(Function<HostOffer, Assignment> acceptor, GroupKey groupKey)
+    public boolean launchFirst(Function<HostOffer, Assignment> acceptor, TaskGroupKey groupKey)
         throws LaunchException {
 
      // It's important that this method is not called concurrently - doing so would open up the
@@ -352,7 +352,7 @@ public interface OfferManager extends EventSubscriber {
     protected boolean acceptOffer(
         HostOffer offer,
         Function<HostOffer, Assignment> acceptor,
-        GroupKey groupKey) throws LaunchException {
+        TaskGroupKey groupKey) throws LaunchException {
 
       Assignment assignment = acceptor.apply(offer);
       switch (assignment.getResult()) {

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/f64b3f85/src/main/java/org/apache/aurora/scheduler/async/TaskGroup.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/TaskGroup.java b/src/main/java/org/apache/aurora/scheduler/async/TaskGroup.java
index e5067e1..635419b 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/TaskGroup.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/TaskGroup.java
@@ -20,24 +20,24 @@ import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
 
-import org.apache.aurora.scheduler.async.TaskGroups.GroupKey;
+import org.apache.aurora.scheduler.base.TaskGroupKey;
 
 /**
  * A group of task IDs that are eligible for scheduling, but may be waiting for a backoff to expire.
  */
 class TaskGroup {
-  private final GroupKey key;
+  private final TaskGroupKey key;
   private long penaltyMs;
   private final Queue<String> tasks;
 
-  TaskGroup(GroupKey key, String initialTaskId) {
+  TaskGroup(TaskGroupKey key, String initialTaskId) {
     this.key = key;
     this.penaltyMs = 0;
     this.tasks = Lists.newLinkedList();
     this.tasks.add(initialTaskId);
   }
 
-  synchronized GroupKey getKey() {
+  synchronized TaskGroupKey getKey() {
     return key;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/f64b3f85/src/main/java/org/apache/aurora/scheduler/async/TaskGroups.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/TaskGroups.java b/src/main/java/org/apache/aurora/scheduler/async/TaskGroups.java
index 8cd6c96..1580404 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/TaskGroups.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/TaskGroups.java
@@ -13,7 +13,6 @@
  */
 package org.apache.aurora.scheduler.async;
 
-import java.util.Objects;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
@@ -39,14 +38,13 @@ import com.twitter.common.util.BackoffStrategy;
 import com.twitter.common.util.concurrent.ExecutorServiceShutdown;
 
 import org.apache.aurora.scheduler.base.AsyncUtil;
-import org.apache.aurora.scheduler.base.JobKeys;
+import org.apache.aurora.scheduler.base.TaskGroupKey;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
 import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
 import org.apache.aurora.scheduler.events.PubsubEvent.TasksDeleted;
 import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
 
 import static java.util.Objects.requireNonNull;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
@@ -56,7 +54,8 @@ import static org.apache.aurora.gen.ScheduleStatus.PENDING;
 /**
  * A collection of task groups, where a task group is a collection of tasks that are known to be
  * equal in the way they schedule. This is expected to be tasks associated with the same job key,
- * who also have {@code equal()} {@link ITaskConfig} values.
+ * who also have {@code equal()} {@link org.apache.aurora.scheduler.storage.entities.ITaskConfig}
+ * values.
  * <p>
  * This is used to prevent redundant work in trying to schedule tasks as well as to provide
  * nearly-equal responsiveness when scheduling across jobs.  In other words, a 1000 instance job
@@ -66,7 +65,7 @@ public class TaskGroups implements EventSubscriber {
 
   private static final Logger LOG = Logger.getLogger(TaskGroups.class.getName());
 
-  private final ConcurrentMap<GroupKey, TaskGroup> groups = Maps.newConcurrentMap();
+  private final ConcurrentMap<TaskGroupKey, TaskGroup> groups = Maps.newConcurrentMap();
   private final ScheduledExecutorService executor;
   private final TaskScheduler taskScheduler;
   private final long firstScheduleDelay;
@@ -199,7 +198,7 @@ public class TaskGroups implements EventSubscriber {
   public synchronized void taskChangedState(TaskStateChange stateChange) {
     if (stateChange.getNewState() == PENDING) {
       IScheduledTask task = stateChange.getTask();
-      GroupKey key = new GroupKey(task.getAssignedTask().getTask());
+      TaskGroupKey key = TaskGroupKey.from(task.getAssignedTask().getTask());
       TaskGroup newGroup = new TaskGroup(key, Tasks.id(task));
       TaskGroup existing = groups.putIfAbsent(key, newGroup);
       if (existing == null) {
@@ -226,7 +225,7 @@ public class TaskGroups implements EventSubscriber {
   public synchronized void tasksDeleted(TasksDeleted deleted) {
     for (IAssignedTask task
         : Iterables.transform(deleted.getTasks(), Tasks.SCHEDULED_TO_ASSIGNED)) {
-      TaskGroup group = groups.get(new GroupKey(task.getTask()));
+      TaskGroup group = groups.get(TaskGroupKey.from(task.getTask()));
       if (group != null) {
         group.remove(task.getTaskId());
       }
@@ -237,30 +236,4 @@ public class TaskGroups implements EventSubscriber {
     return ImmutableSet.copyOf(groups.values());
   }
 
-  static class GroupKey {
-    private final ITaskConfig canonicalTask;
-
-    GroupKey(ITaskConfig task) {
-      this.canonicalTask = task;
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hash(canonicalTask);
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (!(o instanceof GroupKey)) {
-        return false;
-      }
-      GroupKey other = (GroupKey) o;
-      return Objects.equals(canonicalTask, other.canonicalTask);
-    }
-
-    @Override
-    public String toString() {
-      return JobKeys.canonicalString(Tasks.INFO_TO_JOB_KEY.apply(canonicalTask));
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/f64b3f85/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java b/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
index d0fe3e1..e093ca5 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
@@ -42,9 +42,9 @@ import com.twitter.common.stats.StatsProvider;
 import com.twitter.common.util.Clock;
 
 import org.apache.aurora.scheduler.HostOffer;
-import org.apache.aurora.scheduler.async.TaskGroups.GroupKey;
 import org.apache.aurora.scheduler.async.preemptor.Preemptor;
 import org.apache.aurora.scheduler.base.Query;
+import org.apache.aurora.scheduler.base.TaskGroupKey;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
 import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
@@ -218,7 +218,7 @@ public interface TaskScheduler extends EventSubscriber {
         try {
           boolean launched = offerManager.launchFirst(
               getAssignerFunction(store, new ResourceRequest(task, taskId, aggregate)),
-              new GroupKey(task));
+              TaskGroupKey.from(task));
 
           if (!launched) {
             // Task could not be scheduled.

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/f64b3f85/src/main/java/org/apache/aurora/scheduler/base/TaskGroupKey.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/base/TaskGroupKey.java b/src/main/java/org/apache/aurora/scheduler/base/TaskGroupKey.java
new file mode 100644
index 0000000..6af3949
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/base/TaskGroupKey.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.base;
+
+import java.util.Objects;
+
+import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Identifier for a group of identical {@link ITaskConfig} instances. Serves as a separation layer
+ * between a task configuration and its scheduling purpose representation.
+ */
+public final class TaskGroupKey {
+  private final ITaskConfig canonicalTask;
+
+  private TaskGroupKey(ITaskConfig task) {
+    this.canonicalTask = requireNonNull(task);
+  }
+
+  /**
+   * Creates a {@code TaskGroupKey} from {@link ITaskConfig}.
+   *
+   * @param task Task to create a {@code TaskGroupKey} from.
+   * @return An instance of {@code TaskGroupKey}.
+   */
+  public static TaskGroupKey from(ITaskConfig task) {
+    return new TaskGroupKey(task);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(canonicalTask);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (!(o instanceof TaskGroupKey)) {
+      return false;
+    }
+    TaskGroupKey other = (TaskGroupKey) o;
+    return Objects.equals(canonicalTask, other.canonicalTask);
+  }
+
+  @Override
+  public String toString() {
+    return JobKeys.canonicalString(Tasks.INFO_TO_JOB_KEY.apply(canonicalTask));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/f64b3f85/src/test/java/org/apache/aurora/scheduler/async/OfferManagerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/OfferManagerImplTest.java b/src/test/java/org/apache/aurora/scheduler/async/OfferManagerImplTest.java
index 0cbc71d..874a124 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/OfferManagerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/OfferManagerImplTest.java
@@ -31,7 +31,7 @@ import org.apache.aurora.gen.TaskConfig;
 import org.apache.aurora.scheduler.HostOffer;
 import org.apache.aurora.scheduler.async.OfferManager.OfferManagerImpl;
 import org.apache.aurora.scheduler.async.OfferManager.OfferReturnDelay;
-import org.apache.aurora.scheduler.async.TaskGroups.GroupKey;
+import org.apache.aurora.scheduler.base.TaskGroupKey;
 import org.apache.aurora.scheduler.events.PubsubEvent.DriverDisconnected;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
 import org.apache.aurora.scheduler.mesos.Driver;
@@ -65,8 +65,8 @@ public class OfferManagerImplTest extends EasyMockTest {
   private static final HostOffer OFFER_C = new HostOffer(
       Offers.makeOffer("OFFER_C", HOST_C),
       IHostAttributes.build(new HostAttributes().setMode(NONE)));
-  private static final GroupKey GROUP_KEY =
-      new GroupKey(ITaskConfig.build(new TaskConfig().setJob(new JobKey("role", "env", "name"))));
+  private static final TaskGroupKey GROUP_KEY = TaskGroupKey.from(
+      ITaskConfig.build(new TaskConfig().setJob(new JobKey("role", "env", "name"))));
 
   private Driver driver;
   private FakeScheduledExecutor clock;

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/f64b3f85/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
index 4ee13c8..58733bd 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
@@ -36,10 +36,10 @@ import org.apache.aurora.gen.MaintenanceMode;
 import org.apache.aurora.gen.ScheduledTask;
 import org.apache.aurora.gen.TaskConfig;
 import org.apache.aurora.scheduler.HostOffer;
-import org.apache.aurora.scheduler.async.TaskGroups.GroupKey;
 import org.apache.aurora.scheduler.async.TaskScheduler.TaskSchedulerImpl;
 import org.apache.aurora.scheduler.async.preemptor.Preemptor;
 import org.apache.aurora.scheduler.base.Query;
+import org.apache.aurora.scheduler.base.TaskGroupKey;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.events.EventSink;
 import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
@@ -82,8 +82,8 @@ public class TaskSchedulerImplTest extends EasyMockTest {
       Offers.makeOffer("OFFER_A", "HOST_A"),
       IHostAttributes.build(new HostAttributes().setMode(MaintenanceMode.NONE)));
 
-  private static final GroupKey GROUP_A = new GroupKey(TASK_A.getAssignedTask().getTask());
-  private static final GroupKey GROUP_B = new GroupKey(TASK_B.getAssignedTask().getTask());
+  private static final TaskGroupKey GROUP_A = TaskGroupKey.from(TASK_A.getAssignedTask().getTask());
+  private static final TaskGroupKey GROUP_B = TaskGroupKey.from(TASK_B.getAssignedTask().getTask());
 
   private StorageTestUtil storageUtil;
   private StateManager stateManager;
@@ -344,7 +344,7 @@ public class TaskSchedulerImplTest extends EasyMockTest {
 
   private static class AssignmentCapture {
     public Capture<Function<HostOffer, Assignment>> assigner = createCapture();
-    public Capture<GroupKey> groupKey = createCapture();
+    public Capture<TaskGroupKey> groupKey = createCapture();
   }
 
   private AssignmentCapture expectLaunchAttempt(boolean taskLaunched)
@@ -358,7 +358,7 @@ public class TaskSchedulerImplTest extends EasyMockTest {
 
   private void assignAndAssert(
       Result result,
-      GroupKey groupKey,
+      TaskGroupKey groupKey,
       HostOffer offer,
       AssignmentCapture capture) {
 

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/f64b3f85/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
index 87bc531..891cc09 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
@@ -44,10 +44,10 @@ import org.apache.aurora.gen.TaskConfig;
 import org.apache.aurora.scheduler.HostOffer;
 import org.apache.aurora.scheduler.async.OfferManager.OfferManagerImpl;
 import org.apache.aurora.scheduler.async.OfferManager.OfferReturnDelay;
-import org.apache.aurora.scheduler.async.TaskGroups.GroupKey;
 import org.apache.aurora.scheduler.async.TaskScheduler.TaskSchedulerImpl;
 import org.apache.aurora.scheduler.async.preemptor.Preemptor;
 import org.apache.aurora.scheduler.base.Query;
+import org.apache.aurora.scheduler.base.TaskGroupKey;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.events.PubsubEvent;
 import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
@@ -451,7 +451,7 @@ public class TaskSchedulerTest extends EasyMockTest {
     replayAndCreateScheduler();
 
     offerManager.addOffer(OFFER_A);
-    offerManager.launchFirst(offerAcceptor, new GroupKey(ITaskConfig.build(new TaskConfig())));
+    offerManager.launchFirst(offerAcceptor, TaskGroupKey.from(ITaskConfig.build(new TaskConfig())));
     offerExpirationCapture.getValue().run();
   }
 


Mime
View raw message