aurora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ma...@apache.org
Subject git commit: Adding min retention interval for task history.
Date Fri, 21 Mar 2014 19:11:12 GMT
Repository: incubator-aurora
Updated Branches:
  refs/heads/master 08bb7b303 -> 79a6043ef


Adding min retention interval for task history.

Bugs closed: AURORA-263

Reviewed at https://reviews.apache.org/r/19243/


Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/79a6043e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/79a6043e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/79a6043e

Branch: refs/heads/master
Commit: 79a6043ef0feb6883e26b83f8946db34ea878d2e
Parents: 08bb7b3
Author: Maxim Khutornenko <maxim@apache.org>
Authored: Fri Mar 21 12:10:36 2014 -0700
Committer: Maxim Khutornenko <maxim@apache.org>
Committed: Fri Mar 21 12:10:36 2014 -0700

----------------------------------------------------------------------
 .../aurora/scheduler/async/AsyncModule.java     | 19 +++++--
 .../aurora/scheduler/async/HistoryPruner.java   | 55 ++++++++++++--------
 .../scheduler/async/HistoryPrunerTest.java      | 30 +++++++----
 3 files changed, 68 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/79a6043e/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java b/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java
index be67e7d..535acd2 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java
@@ -47,6 +47,7 @@ import com.twitter.common.util.TruncatedBinaryBackoff;
 
 import org.apache.aurora.scheduler.async.GcExecutorLauncher.GcExecutorSettings;
 import org.apache.aurora.scheduler.async.GcExecutorLauncher.RandomGcExecutorSettings;
+import org.apache.aurora.scheduler.async.HistoryPruner.HistoryPrunnerSettings;
 import org.apache.aurora.scheduler.async.OfferQueue.OfferQueueImpl;
 import org.apache.aurora.scheduler.async.OfferQueue.OfferReturnDelay;
 import org.apache.aurora.scheduler.async.RescheduleCalculator.RescheduleCalculatorImpl;
@@ -60,7 +61,6 @@ import static java.lang.annotation.ElementType.METHOD;
 import static java.lang.annotation.ElementType.PARAMETER;
 import static java.lang.annotation.RetentionPolicy.RUNTIME;
 
-import static org.apache.aurora.scheduler.async.HistoryPruner.PruneThreshold;
 import static org.apache.aurora.scheduler.async.Preemptor.PreemptorImpl;
 import static org.apache.aurora.scheduler.async.Preemptor.PreemptorImpl.PreemptionDelay;
 import static org.apache.aurora.scheduler.async.TaskScheduler.TaskSchedulerImpl.ReservationDuration;
@@ -102,6 +102,15 @@ public class AsyncModule extends AbstractModule {
   private static final Arg<Amount<Long, Time>> HISTORY_PRUNE_THRESHOLD =
       Arg.create(Amount.of(2L, Time.DAYS));
 
+  @CmdLine(name = "history_max_per_job_threshold",
+      help = "Maximum number of terminated tasks to retain in a job history.")
+  private static final Arg<Integer> HISTORY_MAX_PER_JOB_THRESHOLD = Arg.create(100);
+
+  @CmdLine(name = "history_min_retention_threshold",
+      help = "Minimum guaranteed time for task history retention before any pruning is attempted.")
+  private static final Arg<Amount<Long, Time>> HISTORY_MIN_RETENTION_THRESHOLD =
+      Arg.create(Amount.of(1L, Time.HOURS));
+
   @CmdLine(name = "max_schedule_attempts_per_sec",
       help = "Maximum number of scheduling attempts to make per second.")
   private static final Arg<Double> MAX_SCHEDULE_ATTEMPTS_PER_SEC = Arg.create(20D);
@@ -245,9 +254,11 @@ public class AsyncModule extends AbstractModule {
       protected void configure() {
         // TODO(ksweeney): Create a configuration validator module so this can be injected.
         // TODO(William Farner): Revert this once large task counts is cheap ala hierarchichal store
-        bind(Integer.class).annotatedWith(PruneThreshold.class).toInstance(100);
-        bind(new TypeLiteral<Amount<Long, Time>>() { }).annotatedWith(PruneThreshold.class)
-            .toInstance(HISTORY_PRUNE_THRESHOLD.get());
+        bind(HistoryPrunnerSettings.class).toInstance(new HistoryPrunnerSettings(
+            HISTORY_PRUNE_THRESHOLD.get(),
+            HISTORY_MIN_RETENTION_THRESHOLD.get(),
+            HISTORY_MAX_PER_JOB_THRESHOLD.get()
+        ));
         bind(ScheduledExecutorService.class).toInstance(executor);
 
         bind(HistoryPruner.class).in(Singleton.class);

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/79a6043e/src/main/java/org/apache/aurora/scheduler/async/HistoryPruner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/HistoryPruner.java b/src/main/java/org/apache/aurora/scheduler/async/HistoryPruner.java
index 5bf9838..59f615c 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/HistoryPruner.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/HistoryPruner.java
@@ -15,8 +15,6 @@
  */
 package org.apache.aurora.scheduler.async;
 
-import java.lang.annotation.Retention;
-import java.lang.annotation.Target;
 import java.util.Collection;
 import java.util.Set;
 import java.util.concurrent.ScheduledExecutorService;
@@ -26,11 +24,11 @@ import java.util.logging.Logger;
 import javax.inject.Inject;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Predicate;
 import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.eventbus.Subscribe;
-import com.google.inject.BindingAnnotation;
 import com.twitter.common.quantity.Amount;
 import com.twitter.common.quantity.Time;
 import com.twitter.common.util.Clock;
@@ -43,11 +41,6 @@ import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.entities.IJobKey;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 
-import static java.lang.annotation.ElementType.FIELD;
-import static java.lang.annotation.ElementType.METHOD;
-import static java.lang.annotation.ElementType.PARAMETER;
-import static java.lang.annotation.RetentionPolicy.RUNTIME;
-
 import static com.google.common.base.Preconditions.checkNotNull;
 
 import static org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
@@ -63,34 +56,53 @@ public class HistoryPruner implements EventSubscriber {
   private final ScheduledExecutorService executor;
   private final StateManager stateManager;
   private final Clock clock;
-  private final long pruneThresholdMillis;
-  private final int perJobHistoryGoal;
+  private final HistoryPrunnerSettings settings;
   private final Storage storage;
 
-  @BindingAnnotation
-  @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
-  public @interface PruneThreshold { }
+  private final Predicate<IScheduledTask> safeToDelete = new Predicate<IScheduledTask>() {
+    @Override
+    public boolean apply(IScheduledTask task) {
+      return Tasks.getLatestEvent(task).getTimestamp()
+          <= clock.nowMillis() - settings.minRetentionThresholdMillis;
+    }
+  };
+
+  static class HistoryPrunnerSettings {
+    private final long pruneThresholdMillis;
+    private final long minRetentionThresholdMillis;
+    private final int perJobHistoryGoal;
+
+    HistoryPrunnerSettings(
+        Amount<Long, Time> inactivePruneThreshold,
+        Amount<Long, Time> minRetentionThreshold,
+        int perJobHistoryGoal) {
+
+      this.pruneThresholdMillis = inactivePruneThreshold.as(Time.MILLISECONDS);
+      this.minRetentionThresholdMillis = minRetentionThreshold.as(Time.MILLISECONDS);
+      this.perJobHistoryGoal = perJobHistoryGoal;
+    }
+  }
 
   @Inject
   HistoryPruner(
       final ScheduledExecutorService executor,
       final StateManager stateManager,
       final Clock clock,
-      @PruneThreshold Amount<Long, Time> inactivePruneThreshold,
-      @PruneThreshold int perJobHistoryGoal,
-      Storage storage) {
+      final HistoryPrunnerSettings settings,
+      final Storage storage) {
 
     this.executor = checkNotNull(executor);
     this.stateManager = checkNotNull(stateManager);
     this.clock = checkNotNull(clock);
-    this.pruneThresholdMillis = inactivePruneThreshold.as(Time.MILLISECONDS);
-    this.perJobHistoryGoal = perJobHistoryGoal;
+    this.settings = checkNotNull(settings);
     this.storage = checkNotNull(storage);
   }
 
   @VisibleForTesting
   long calculateTimeout(long taskEventTimestampMillis) {
-    return pruneThresholdMillis - Math.max(0, clock.nowMillis() - taskEventTimestampMillis);
+    return Math.max(
+        settings.minRetentionThresholdMillis,
+        settings.pruneThresholdMillis - Math.max(0, clock.nowMillis() - taskEventTimestampMillis));
   }
 
   /**
@@ -143,11 +155,12 @@ public class HistoryPruner implements EventSubscriber {
       public void run() {
         Collection<IScheduledTask> inactiveTasks =
             Storage.Util.weaklyConsistentFetchTasks(storage, jobHistoryQuery(jobKey));
-        int tasksToPrune = inactiveTasks.size() - perJobHistoryGoal;
+        int tasksToPrune = inactiveTasks.size() - settings.perJobHistoryGoal;
         if (tasksToPrune > 0) {
-          if (inactiveTasks.size() > perJobHistoryGoal) {
+          if (inactiveTasks.size() > settings.perJobHistoryGoal) {
             Set<String> toPrune = FluentIterable
                 .from(Tasks.LATEST_ACTIVITY.sortedCopy(inactiveTasks))
+                .filter(safeToDelete)
                 .limit(tasksToPrune)
                 .transform(Tasks.SCHEDULED_TO_ID)
                 .toSet();

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/79a6043e/src/test/java/org/apache/aurora/scheduler/async/HistoryPrunerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/HistoryPrunerTest.java b/src/test/java/org/apache/aurora/scheduler/async/HistoryPrunerTest.java
index f7c9e5e..378ece9 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/HistoryPrunerTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/HistoryPrunerTest.java
@@ -40,6 +40,7 @@ import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.gen.ScheduledTask;
 import org.apache.aurora.gen.TaskConfig;
 import org.apache.aurora.gen.TaskEvent;
+import org.apache.aurora.scheduler.async.HistoryPruner.HistoryPrunnerSettings;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
 import org.apache.aurora.scheduler.state.StateManager;
@@ -67,6 +68,7 @@ public class HistoryPrunerTest extends EasyMockTest {
   private static final String TASK_ID = "task_id";
   private static final String SLAVE_HOST = "HOST_A";
   private static final Amount<Long, Time> ONE_MS = Amount.of(1L, Time.MILLISECONDS);
+  private static final Amount<Long, Time> ONE_MINUTE = Amount.of(1L, Time.MINUTES);
   private static final Amount<Long, Time> ONE_DAY = Amount.of(1L, Time.DAYS);
   private static final Amount<Long, Time> ONE_HOUR = Amount.of(1L, Time.HOURS);
   private static final int PER_JOB_HISTORY = 2;
@@ -90,8 +92,7 @@ public class HistoryPrunerTest extends EasyMockTest {
         executor,
         stateManager,
         clock,
-        ONE_DAY,
-        PER_JOB_HISTORY,
+        new HistoryPrunnerSettings(ONE_DAY, ONE_MINUTE, PER_JOB_HISTORY),
         storageUtil.storage);
   }
 
@@ -120,15 +121,15 @@ public class HistoryPrunerTest extends EasyMockTest {
     long taskATimestamp = clock.nowMillis();
     IScheduledTask a = makeTask("a", FINISHED);
 
-    clock.advance(ONE_MS);
+    clock.advance(ONE_MINUTE);
     long taskBTimestamp = clock.nowMillis();
     IScheduledTask b = makeTask("b", LOST);
 
-    clock.advance(ONE_MS);
+    clock.advance(ONE_MINUTE);
     long taskCTimestamp = clock.nowMillis();
     IScheduledTask c = makeTask("c", FINISHED);
 
-    clock.advance(ONE_MS);
+    clock.advance(ONE_MINUTE);
     IScheduledTask d = makeTask("d", FINISHED);
     IScheduledTask e = makeTask("job-x", "e", FINISHED);
 
@@ -190,22 +191,29 @@ public class HistoryPrunerTest extends EasyMockTest {
   @Test
   public void testJobHistoryExceeded() {
     IScheduledTask a = makeTask("a", RUNNING);
-    clock.advance(ONE_HOUR);
+    clock.advance(ONE_MS);
     IScheduledTask aKilled = copy(a, KILLED);
 
     IScheduledTask b = makeTask("b", RUNNING);
-    clock.advance(ONE_HOUR);
+    clock.advance(ONE_MS);
     IScheduledTask bKilled = copy(b, KILLED);
 
     IScheduledTask c = makeTask("c", RUNNING);
-    clock.advance(ONE_HOUR);
+    clock.advance(ONE_MS);
     IScheduledTask cLost = copy(c, LOST);
 
+    IScheduledTask d = makeTask("d", RUNNING);
+    clock.advance(ONE_MS);
+    IScheduledTask dLost = copy(d, LOST);
+
     expectNoImmediatePrune(ImmutableSet.of(a));
     expectDefaultDelayedPrune();
     expectNoImmediatePrune(ImmutableSet.of(a, b));
     expectDefaultDelayedPrune();
-    expectImmediatePrune(ImmutableSet.of(a, b, c), a);
+    expectNoImmediatePrune(ImmutableSet.of(a, b)); // no pruning yet due to min threshold
+    expectDefaultDelayedPrune();
+    clock.advance(ONE_HOUR);
+    expectImmediatePrune(ImmutableSet.of(a, b, c, d), a, b); // now prune 2 tasks
     expectDefaultDelayedPrune();
 
     control.replay();
@@ -213,6 +221,7 @@ public class HistoryPrunerTest extends EasyMockTest {
     changeState(a, aKilled);
     changeState(b, bKilled);
     changeState(c, cLost);
+    changeState(d, dLost);
   }
 
   // TODO(William Farner): Consider removing the thread safety tests.  Now that intrinsic locks
@@ -253,8 +262,7 @@ public class HistoryPrunerTest extends EasyMockTest {
         realExecutor,
         stateManager,
         clock,
-        Amount.of(1L, Time.MILLISECONDS),
-        PER_JOB_HISTORY,
+        new HistoryPrunnerSettings(Amount.of(1L, Time.MILLISECONDS), ONE_MS, PER_JOB_HISTORY),
         storageUtil.storage);
   }
 


Mime
View raw message