aurora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ma...@apache.org
Subject aurora git commit: Adding TierManager initial implementation.
Date Tue, 18 Aug 2015 22:43:46 GMT
Repository: aurora
Updated Branches:
  refs/heads/master 9b8868fb7 -> f5025f3c6


Adding TierManager initial implementation.

Bugs closed: AURORA-1437

Reviewed at https://reviews.apache.org/r/37560/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/f5025f3c
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/f5025f3c
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/f5025f3c

Branch: refs/heads/master
Commit: f5025f3c607d81333d4a498ad3bb8fe3417ba392
Parents: 9b8868f
Author: Maxim Khutornenko <maxim@apache.org>
Authored: Tue Aug 18 15:40:58 2015 -0700
Committer: Maxim Khutornenko <maxim@apache.org>
Committed: Tue Aug 18 15:40:58 2015 -0700

----------------------------------------------------------------------
 .../org/apache/aurora/scheduler/Resources.java  | 26 ++++++++-
 .../aurora/scheduler/SchedulerModule.java       |  4 ++
 .../org/apache/aurora/scheduler/TierInfo.java   | 61 ++++++++++++++++++++
 .../apache/aurora/scheduler/TierManager.java    | 41 +++++++++++++
 .../aurora/scheduler/state/TaskAssigner.java    | 14 ++++-
 .../apache/aurora/scheduler/ResourcesTest.java  | 38 +++++++++---
 .../aurora/scheduler/TierManagerTest.java       | 30 ++++++++++
 .../scheduler/state/TaskAssignerImplTest.java   | 13 ++++-
 8 files changed, 215 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/f5025f3c/src/main/java/org/apache/aurora/scheduler/Resources.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/Resources.java b/src/main/java/org/apache/aurora/scheduler/Resources.java
index 40df262..3b4fbb6 100644
--- a/src/main/java/org/apache/aurora/scheduler/Resources.java
+++ b/src/main/java/org/apache/aurora/scheduler/Resources.java
@@ -17,8 +17,10 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
 import com.google.common.collect.ContiguousSet;
 import com.google.common.collect.DiscreteDomain;
 import com.google.common.collect.ImmutableList;
@@ -48,7 +50,19 @@ public final class Resources {
   /**
    * CPU resource filter.
    */
-  public static final Predicate<Resource> CPU = e -> e.getName().equals(CPUS.getName());
+  private static final Predicate<Resource> CPU = e -> e.getName().equals(CPUS.getName());
+
+  /**
+   * Revocable resource filter.
+   */
+  @VisibleForTesting
+  static final Predicate<Resource> REVOCABLE =
+      Predicates.or(Predicates.not(CPU), Predicates.and(CPU, Resource::hasRevocable));
+
+  /**
+   * Non-revocable resource filter.
+   */
+  private static final Predicate<Resource> NON_REVOCABLE = Predicates.not(Resource::hasRevocable);
 
   private final Iterable<Resource> mesosResources;
 
@@ -77,6 +91,16 @@ public final class Resources {
   }
 
   /**
+   * Filters resources using the provided {@code tierInfo} instance.
+   *
+   * @param tierInfo Tier info.
+   * @return A new {@code Resources} object containing only filtered Mesos resources.
+   */
+  public Resources filter(TierInfo tierInfo) {
+    return filter(tierInfo.isRevocable() ? REVOCABLE : NON_REVOCABLE);
+  }
+
+  /**
    * Gets generalized aggregated resource view.
    *
    * @return {@code ResourceSlot} instance.

http://git-wip-us.apache.org/repos/asf/aurora/blob/f5025f3c/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java b/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java
index 45265ea..e8d53c7 100644
--- a/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java
@@ -31,6 +31,7 @@ import com.twitter.common.quantity.Time;
 
 import org.apache.aurora.scheduler.SchedulerLifecycle.LeadingOptions;
 import org.apache.aurora.scheduler.TaskIdGenerator.TaskIdGeneratorImpl;
+import org.apache.aurora.scheduler.TierManager.TierManagerImpl;
 import org.apache.aurora.scheduler.base.AsyncUtil;
 import org.apache.aurora.scheduler.events.PubsubEventModule;
 import org.apache.mesos.Protos;
@@ -92,6 +93,9 @@ public class SchedulerModule extends AbstractModule {
 
     bind(TaskStatusHandler.class).to(TaskStatusHandlerImpl.class);
     bind(TaskStatusHandlerImpl.class).in(Singleton.class);
+
+    bind(TierManager.class).to(TierManagerImpl.class);
+    bind(TierManagerImpl.class).in(Singleton.class);
     addSchedulerActiveServiceBinding(binder()).to(TaskStatusHandlerImpl.class);
   }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/f5025f3c/src/main/java/org/apache/aurora/scheduler/TierInfo.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/TierInfo.java b/src/main/java/org/apache/aurora/scheduler/TierInfo.java
new file mode 100644
index 0000000..61bf30a
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/TierInfo.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler;
+
+import java.util.Objects;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Defines common task tier traits and behaviors.
+ */
+public class TierInfo {
+  private final boolean revocable;
+
+  @VisibleForTesting
+  public TierInfo(boolean revocable) {
+    this.revocable = revocable;
+  }
+
+  /**
+   * Checks if this tier intends to run with Mesos revocable resource offers.
+   *
+   * @return {@code true} if this tier requires revocable resource offers, {@code false} otherwise.
+   */
+  public boolean isRevocable() {
+    return revocable;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(revocable);
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (!(obj instanceof TierInfo)) {
+      return false;
+    }
+
+    TierInfo other = (TierInfo) obj;
+    return Objects.equals(revocable, other.revocable);
+  }
+
+  @Override
+  public String toString() {
+    return com.google.common.base.Objects.toStringHelper(this)
+        .add("revocable", revocable)
+        .toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/f5025f3c/src/main/java/org/apache/aurora/scheduler/TierManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/TierManager.java b/src/main/java/org/apache/aurora/scheduler/TierManager.java
new file mode 100644
index 0000000..ebfad97
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/TierManager.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler;
+
+import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
+
+/**
+ * Translates job tier configuration into a set of task traits/attributes.
+ * TODO(maxim): Implement external configuration support defined here:
+ * https://docs.google.com/document/d/1gexe2uM_9gjsV62cMmX0VjH85Uokko21vEoENY2jjF0
+ */
+public interface TierManager {
+
+  /**
+   * Gets {@link TierInfo} instance representing task's tier details.
+   *
+   * @param taskConfig Task configuration to get tier for.
+   * @return {@link TierInfo} for the given {@code taskConfig}.
+   */
+  TierInfo getTier(ITaskConfig taskConfig);
+
+  class TierManagerImpl implements TierManager {
+
+    @Override
+    public TierInfo getTier(ITaskConfig taskConfig) {
+      // TODO(maxim): Implement when schema changes are defined.
+      return new TierInfo(false);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/f5025f3c/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java b/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
index ca4b5b0..2ab110e 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
@@ -33,6 +33,8 @@ import com.twitter.common.stats.Stats;
 
 import org.apache.aurora.scheduler.HostOffer;
 import org.apache.aurora.scheduler.Resources;
+import org.apache.aurora.scheduler.TierInfo;
+import org.apache.aurora.scheduler.TierManager;
 import org.apache.aurora.scheduler.base.TaskGroupKey;
 import org.apache.aurora.scheduler.filter.SchedulingFilter;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
@@ -86,18 +88,21 @@ public interface TaskAssigner {
     private final SchedulingFilter filter;
     private final MesosTaskFactory taskFactory;
     private final OfferManager offerManager;
+    private final TierManager tierManager;
 
     @Inject
     public TaskAssignerImpl(
         StateManager stateManager,
         SchedulingFilter filter,
         MesosTaskFactory taskFactory,
-        OfferManager offerManager) {
+        OfferManager offerManager,
+        TierManager tierManager) {
 
       this.stateManager = requireNonNull(stateManager);
       this.filter = requireNonNull(filter);
       this.taskFactory = requireNonNull(taskFactory);
       this.offerManager = requireNonNull(offerManager);
+      this.tierManager = requireNonNull(tierManager);
     }
 
     private TaskInfo assign(
@@ -147,9 +152,14 @@ public interface TaskAssigner {
           // This slave is reserved for a different task group -> skip.
           continue;
         }
+
+        TierInfo tierInfo = tierManager.getTier(groupKey.getTask());
         Set<Veto> vetoes = filter.filter(
-            new UnusedResource(Resources.from(offer.getOffer()).slot(), offer.getAttributes()),
+            new UnusedResource(
+                Resources.from(offer.getOffer()).filter(tierInfo).slot(),
+                offer.getAttributes()),
             resourceRequest);
+
         if (vetoes.isEmpty()) {
           TaskInfo taskInfo = assign(
               storeProvider,

http://git-wip-us.apache.org/repos/asf/aurora/blob/f5025f3c/src/test/java/org/apache/aurora/scheduler/ResourcesTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/ResourcesTest.java b/src/test/java/org/apache/aurora/scheduler/ResourcesTest.java
index a5878a4..c48d096 100644
--- a/src/test/java/org/apache/aurora/scheduler/ResourcesTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/ResourcesTest.java
@@ -93,7 +93,7 @@ public class ResourcesTest {
   @Test
   public void testGetSlot() {
     ImmutableList<Resource> resources = ImmutableList.<Resource>builder()
-        .add(createCpuResource(8.0))
+        .add(createCpuResource(8.0, false))
         .add(createMemResource(1024, RAM_MB))
         .add(createMemResource(2048, DISK_MB))
         .add(createPortRange(Pair.of(1, 10)))
@@ -112,13 +112,30 @@ public class ResourcesTest {
   @Test
   public void testFilter() {
     ImmutableList<Resource> resources = ImmutableList.<Resource>builder()
-        .add(createCpuResource(8.0))
+        .add(createCpuResource(8.0, true))
         .add(createMemResource(1024, RAM_MB))
         .build();
 
     assertEquals(
-        Resources.from(createOffer(createCpuResource(8.0))).slot(),
-        Resources.from(createOffer(resources)).filter(Resources.CPU).slot());
+        new ResourceSlot(8.0, Amount.of(1024L, MB), Amount.of(0L, MB), 0),
+        Resources.from(createOffer(resources)).filter(Resources.REVOCABLE).slot());
+  }
+
+  @Test
+  public void testFilterByTier() {
+    ImmutableList<Resource> resources = ImmutableList.<Resource>builder()
+        .add(createCpuResource(8.0, true))
+        .add(createCpuResource(8.0, false))
+        .add(createMemResource(1024, RAM_MB))
+        .build();
+
+    assertEquals(
+        new ResourceSlot(8.0, Amount.of(1024L, MB), Amount.of(0L, MB), 0),
+        Resources.from(createOffer(resources)).filter(new TierInfo(true)).slot());
+
+    assertEquals(
+        new ResourceSlot(8.0, Amount.of(1024L, MB), Amount.of(0L, MB), 0),
+        Resources.from(createOffer(resources)).filter(new TierInfo(false)).slot());
   }
 
   private Resource createPortRange(Pair<Integer, Integer> range) {
@@ -143,12 +160,17 @@ public class ResourcesTest {
         .build();
   }
 
-  private static Resource createCpuResource(double cpus) {
-    return Resource.newBuilder()
+  private static Resource createCpuResource(double cpus, boolean revocable) {
+    Protos.Resource.Builder builder = Resource.newBuilder()
         .setName(CPUS.getName())
         .setType(SCALAR)
-        .setScalar(Protos.Value.Scalar.newBuilder().setValue(cpus))
-        .build();
+        .setScalar(Protos.Value.Scalar.newBuilder().setValue(cpus));
+
+    if (revocable) {
+      builder.setRevocable(Resource.RevocableInfo.newBuilder().build());
+    }
+
+    return builder.build();
   }
 
   private static Resource createMemResource(long mem, ResourceType resourceType) {

http://git-wip-us.apache.org/repos/asf/aurora/blob/f5025f3c/src/test/java/org/apache/aurora/scheduler/TierManagerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/TierManagerTest.java b/src/test/java/org/apache/aurora/scheduler/TierManagerTest.java
new file mode 100644
index 0000000..37e19ac
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/TierManagerTest.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler;
+
+import org.apache.aurora.gen.TaskConfig;
+import org.apache.aurora.scheduler.TierManager.TierManagerImpl;
+import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class TierManagerTest {
+
+  @Test
+  public void testIsRevocable() {
+    TierInfo expected = new TierInfo(false);
+    assertEquals(expected, new TierManagerImpl().getTier(ITaskConfig.build(new TaskConfig())));
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/f5025f3c/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java b/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java
index 88958d1..33054a6 100644
--- a/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java
@@ -28,6 +28,8 @@ import org.apache.aurora.gen.ScheduledTask;
 import org.apache.aurora.gen.TaskConfig;
 import org.apache.aurora.scheduler.HostOffer;
 import org.apache.aurora.scheduler.Resources;
+import org.apache.aurora.scheduler.TierInfo;
+import org.apache.aurora.scheduler.TierManager;
 import org.apache.aurora.scheduler.base.TaskGroupKey;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.filter.SchedulingFilter;
@@ -100,6 +102,7 @@ public class TaskAssignerImplTest extends EasyMockTest {
       new UnusedResource(Resources.from(MESOS_OFFER).slot(), OFFER.getAttributes());
   private static final ResourceRequest RESOURCE_REQUEST =
       new ResourceRequest(TASK.getAssignedTask().getTask(), EMPTY);
+  private static final TierInfo DEFAULT_TIER = new TierInfo(false);
 
   private MutableStoreProvider storeProvider;
   private StateManager stateManager;
@@ -107,6 +110,7 @@ public class TaskAssignerImplTest extends EasyMockTest {
   private MesosTaskFactory taskFactory;
   private OfferManager offerManager;
   private TaskAssigner assigner;
+  private TierManager tierManager;
 
   @Before
   public void setUp() throws Exception {
@@ -115,13 +119,15 @@ public class TaskAssignerImplTest extends EasyMockTest {
     taskFactory = createMock(MesosTaskFactory.class);
     stateManager = createMock(StateManager.class);
     offerManager = createMock(OfferManager.class);
-    assigner = new TaskAssignerImpl(stateManager, filter, taskFactory, offerManager);
+    tierManager = createMock(TierManager.class);
+    assigner = new TaskAssignerImpl(stateManager, filter, taskFactory, offerManager, tierManager);
   }
 
   @Test
   public void testAssignNoVetoes() throws Exception {
     expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(OFFER));
     offerManager.launchTask(MESOS_OFFER.getId(), TASK_INFO);
+    expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEFAULT_TIER);
     expect(filter.filter(UNUSED, RESOURCE_REQUEST)).andReturn(ImmutableSet.of());
     expect(stateManager.assignTask(
         storeProvider,
@@ -147,6 +153,7 @@ public class TaskAssignerImplTest extends EasyMockTest {
   public void testAssignVetoesWithStaticBan() throws Exception {
     expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(OFFER));
     offerManager.banOffer(MESOS_OFFER.getId(), GROUP_KEY);
+    expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEFAULT_TIER);
     expect(filter.filter(UNUSED, RESOURCE_REQUEST))
         .andReturn(ImmutableSet.of(Veto.constraintMismatch("denied")));
 
@@ -163,6 +170,7 @@ public class TaskAssignerImplTest extends EasyMockTest {
   @Test
   public void testAssignVetoesWithNoStaticBan() throws Exception {
     expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(OFFER));
+    expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEFAULT_TIER);
     expect(filter.filter(UNUSED, RESOURCE_REQUEST))
         .andReturn(ImmutableSet.of(Veto.unsatisfiedLimit("limit")));
 
@@ -181,6 +189,7 @@ public class TaskAssignerImplTest extends EasyMockTest {
     expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(OFFER));
     offerManager.launchTask(MESOS_OFFER.getId(), TASK_INFO);
     expectLastCall().andThrow(new OfferManager.LaunchException("expected"));
+    expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEFAULT_TIER);
     expect(filter.filter(UNUSED, RESOURCE_REQUEST)).andReturn(ImmutableSet.of());
     expect(stateManager.assignTask(
         storeProvider,
@@ -244,6 +253,7 @@ public class TaskAssignerImplTest extends EasyMockTest {
         IHostAttributes.build(new HostAttributes()));
 
     expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(offer, OFFER));
+    expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEFAULT_TIER);
     expect(filter.filter(UNUSED, RESOURCE_REQUEST)).andReturn(ImmutableSet.of());
     expect(stateManager.assignTask(
         storeProvider,
@@ -284,6 +294,7 @@ public class TaskAssignerImplTest extends EasyMockTest {
         IHostAttributes.build(new HostAttributes()));
 
     expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(mismatched, OFFER));
+    expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEFAULT_TIER).times(2);
     expect(filter.filter(
         new UnusedResource(
             Resources.from(mismatched.getOffer()).slot(),


Mime
View raw message