aurora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wfar...@apache.org
Subject git commit: Handle resourceOffers callback asynchronsly.
Date Thu, 16 Oct 2014 21:45:45 GMT
Repository: incubator-aurora
Updated Branches:
  refs/heads/master 3c3b04fd6 -> 6fea534df


Handle resourceOffers callback asynchronsly.

Bugs closed: AURORA-848

Reviewed at https://reviews.apache.org/r/26821/


Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/6fea534d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/6fea534d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/6fea534d

Branch: refs/heads/master
Commit: 6fea534df7bc476fe0b115f35232a352ca4e1ff5
Parents: 3c3b04f
Author: Zameer Manji <zmanji@twopensource.com>
Authored: Thu Oct 16 14:38:14 2014 -0700
Committer: Bill Farner <wfarner@twitter.com>
Committed: Thu Oct 16 14:38:14 2014 -0700

----------------------------------------------------------------------
 .../aurora/scheduler/MesosSchedulerImpl.java    | 67 ++++++++++++++------
 .../aurora/scheduler/SchedulerModule.java       |  5 ++
 .../scheduler/MesosSchedulerImplTest.java       |  4 ++
 3 files changed, 55 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/6fea534d/src/main/java/org/apache/aurora/scheduler/MesosSchedulerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/MesosSchedulerImpl.java b/src/main/java/org/apache/aurora/scheduler/MesosSchedulerImpl.java
index ed3c8a7..c424ecd 100644
--- a/src/main/java/org/apache/aurora/scheduler/MesosSchedulerImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/MesosSchedulerImpl.java
@@ -13,12 +13,16 @@
  */
 package org.apache.aurora.scheduler;
 
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
 import java.util.List;
+import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import javax.inject.Inject;
+import javax.inject.Qualifier;
 
 import com.google.common.base.Preconditions;
 import com.twitter.common.application.Lifecycle;
@@ -44,6 +48,10 @@ import org.apache.mesos.Protos.TaskStatus;
 import org.apache.mesos.Scheduler;
 import org.apache.mesos.SchedulerDriver;
 
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
 import static java.util.Objects.requireNonNull;
 
 /**
@@ -65,9 +73,17 @@ class MesosSchedulerImpl implements Scheduler {
   private final Storage storage;
   private final Lifecycle lifecycle;
   private final EventSink eventSink;
+  private final Executor executor;
   private volatile boolean isRegistered = false;
 
   /**
+   * Binding annotation for the executor the incoming Mesos message handler uses.
+   */
+  @Qualifier
+  @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
+  @interface SchedulerExecutor { }
+
+  /**
    * Creates a new scheduler.
    *
    * @param storage Store to save host attributes into.
@@ -78,18 +94,21 @@ class MesosSchedulerImpl implements Scheduler {
    *                      launchers, ceasing after the first match (based on a return value
of
    *                      {@code true}.
    * @param eventSink Pubsub sink to send driver status changes to.
+   * @param executor Executor for async work
    */
   @Inject
   public MesosSchedulerImpl(
       Storage storage,
       final Lifecycle lifecycle,
       List<TaskLauncher> taskLaunchers,
-      EventSink eventSink) {
+      EventSink eventSink,
+      @SchedulerExecutor Executor executor) {
 
     this.storage = requireNonNull(storage);
     this.lifecycle = requireNonNull(lifecycle);
     this.taskLaunchers = requireNonNull(taskLaunchers);
     this.eventSink = requireNonNull(eventSink);
+    this.executor = requireNonNull(executor);
   }
 
   @Override
@@ -137,29 +156,35 @@ class MesosSchedulerImpl implements Scheduler {
     // securing the storage lock between saves.  We also save the host attributes before
passing
     // offers elsewhere to ensure that host attributes are available before attempting to
     // schedule tasks associated with offers.
-    // TODO(wfarner): Reconsider the requirements here, we might be able to save host offers
-    //                asynchronously and augment the task scheduler to skip over offers when
the
-    //                host attributes cannot be found. (AURORA-116)
-    storage.write(new MutateWork.NoResult.Quiet() {
+    // TODO(wfarner): Reconsider the requirements here, augment the task scheduler to skip
over
+    //                offers when the host attributes cannot be found. (AURORA-137)
+
+    executor.execute(new Runnable() {
       @Override
-      protected void execute(MutableStoreProvider storeProvider) {
-        for (final Offer offer : offers) {
-          storeProvider.getAttributeStore().saveHostAttributes(Conversions.getAttributes(offer));
+      public void run() {
+        storage.write(new MutateWork.NoResult.Quiet() {
+          @Override
+          protected void execute(MutableStoreProvider storeProvider) {
+            for (final Offer offer : offers) {
+              storeProvider.getAttributeStore()
+                  .saveHostAttributes(Conversions.getAttributes(offer));
+            }
+          }
+        });
+
+        for (Offer offer : offers) {
+          if (LOG.isLoggable(Level.FINE)) {
+            LOG.log(Level.FINE, String.format("Received offer: %s", offer));
+          }
+          totalResourceOffers.incrementAndGet();
+          for (TaskLauncher launcher : taskLaunchers) {
+            if (launcher.willUse(offer)) {
+              break;
+            }
+          }
         }
       }
     });
-
-    for (Offer offer : offers) {
-      if (LOG.isLoggable(Level.FINE)) {
-        LOG.log(Level.FINE, String.format("Received offer: %s", offer));
-      }
-      totalResourceOffers.incrementAndGet();
-      for (TaskLauncher launcher : taskLaunchers) {
-        if (launcher.willUse(offer)) {
-          break;
-        }
-      }
-    }
   }
 
   @Override
@@ -218,7 +243,7 @@ class MesosSchedulerImpl implements Scheduler {
   @Override
   public void frameworkMessage(
       SchedulerDriver driver,
-      ExecutorID executor,
+      ExecutorID executorID,
       SlaveID slave,
       byte[] data) {
 

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/6fea534d/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java b/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java
index 31c8650..09c8bc9 100644
--- a/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java
@@ -14,6 +14,7 @@
 package org.apache.aurora.scheduler;
 
 import java.util.List;
+import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.logging.Logger;
 
@@ -67,6 +68,10 @@ public class SchedulerModule extends AbstractModule {
 
     bind(UserTaskLauncher.class).in(Singleton.class);
 
+    // TODO(zmanji): Create singleThreadedExecutor (non-scheduled) variant.
+    bind(Executor.class).annotatedWith(MesosSchedulerImpl.SchedulerExecutor.class)
+        .toInstance(AsyncUtil.singleThreadLoggingScheduledExecutor("SchedulerImpl-%d", LOG));
+
     install(new PrivateModule() {
       @Override
       protected void configure() {

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/6fea534d/src/test/java/org/apache/aurora/scheduler/MesosSchedulerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/MesosSchedulerImplTest.java b/src/test/java/org/apache/aurora/scheduler/MesosSchedulerImplTest.java
index e3af344..8dd908e 100644
--- a/src/test/java/org/apache/aurora/scheduler/MesosSchedulerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/MesosSchedulerImplTest.java
@@ -17,10 +17,12 @@ import java.lang.Thread.UncaughtExceptionHandler;
 import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.List;
+import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.testing.TearDown;
+import com.google.common.util.concurrent.MoreExecutors;
 import com.google.inject.AbstractModule;
 import com.google.inject.Guice;
 import com.google.inject.Injector;
@@ -110,6 +112,8 @@ public class MesosSchedulerImplTest extends EasyMockTest {
         bind(new TypeLiteral<List<TaskLauncher>>() { })
             .toInstance(Arrays.asList(systemLauncher, userLauncher));
         bind(EventSink.class).toInstance(eventSink);
+        bind(Executor.class).annotatedWith(MesosSchedulerImpl.SchedulerExecutor.class)
+            .toInstance(MoreExecutors.sameThreadExecutor());
       }
     });
     scheduler = injector.getInstance(MesosSchedulerImpl.class);


Mime
View raw message