aurora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ma...@apache.org
Subject aurora git commit: Implementing task reconciler.
Date Fri, 22 May 2015 21:25:49 GMT
Repository: aurora
Updated Branches:
  refs/heads/master 998993dd8 -> 6db13baf2


Implementing task reconciler.

Bugs closed: AURORA-1047

Reviewed at https://reviews.apache.org/r/34440/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/6db13baf
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/6db13baf
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/6db13baf

Branch: refs/heads/master
Commit: 6db13baf284182a8d60d154a4146fad388f8ce5b
Parents: 998993d
Author: Maxim Khutornenko <maxim@apache.org>
Authored: Fri May 22 14:25:19 2015 -0700
Committer: Maxim Khutornenko <maxim@apache.org>
Committed: Fri May 22 14:25:19 2015 -0700

----------------------------------------------------------------------
 .../aurora/benchmark/fakes/FakeDriver.java      |   7 +
 .../aurora/scheduler/async/AsyncModule.java     |  42 ++++++
 .../aurora/scheduler/async/TaskReconciler.java  | 145 +++++++++++++++++++
 .../apache/aurora/scheduler/mesos/Driver.java   |   9 ++
 .../scheduler/mesos/SchedulerDriverService.java |  19 ++-
 .../async/JobUpdateHistoryPrunerTest.java       |   2 +-
 .../scheduler/async/TaskReconcilerTest.java     | 110 ++++++++++++++
 .../testing/FakeScheduledExecutor.java          |  14 +-
 8 files changed, 341 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/6db13baf/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeDriver.java
----------------------------------------------------------------------
diff --git a/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeDriver.java b/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeDriver.java
index 316ab1c..d1bb8f2 100644
--- a/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeDriver.java
+++ b/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeDriver.java
@@ -13,6 +13,8 @@
  */
 package org.apache.aurora.benchmark.fakes;
 
+import java.util.Collection;
+
 import com.google.common.util.concurrent.AbstractIdleService;
 
 import org.apache.aurora.scheduler.mesos.Driver;
@@ -58,4 +60,9 @@ public class FakeDriver extends AbstractIdleService implements Driver {
   protected void shutDown() throws Exception {
     // no-op
   }
+
+  @Override
+  public void reconcileTasks(Collection<Protos.TaskStatus> statuses) {
+    // no-op
+  }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/6db13baf/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java b/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java
index e9d47fd..5f24668 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java
@@ -51,6 +51,7 @@ import org.apache.aurora.scheduler.async.OfferManager.OfferReturnDelay;
 import org.apache.aurora.scheduler.async.RescheduleCalculator.RescheduleCalculatorImpl;
 import org.apache.aurora.scheduler.async.TaskGroups.TaskGroupsSettings;
 import org.apache.aurora.scheduler.async.TaskHistoryPruner.HistoryPrunnerSettings;
+import org.apache.aurora.scheduler.async.TaskReconciler.TaskReconcilerSettings;
 import org.apache.aurora.scheduler.async.TaskScheduler.TaskSchedulerImpl;
 import org.apache.aurora.scheduler.async.preemptor.BiCache;
 import org.apache.aurora.scheduler.async.preemptor.BiCache.BiCacheSettings;
@@ -181,6 +182,32 @@ public class AsyncModule extends AbstractModule {
   @CmdLine(name = "gc_executor_path", help = "Path to the gc executor launch script.")
   private static final Arg<String> GC_EXECUTOR_PATH = Arg.create(null);
 
+  // TODO(maxim): Disabled by default until AURORA-715 is complete.
+  @CmdLine(name = "reconciliation_initial_delay",
+      help = "Initial amount of time to delay task reconciliation after scheduler start up.")
+  private static final Arg<Amount<Long, Time>> RECONCILIATION_INITIAL_DELAY =
+      Arg.create(Amount.of(Long.MAX_VALUE, Time.MINUTES));
+
+  @Positive
+  @CmdLine(name = "reconciliation_explicit_interval",
+      help = "Interval on which scheduler will ask Mesos for status updates of all non-terminal "
+      + "tasks known to scheduler.")
+  private static final Arg<Amount<Long, Time>> RECONCILIATION_EXPLICIT_INTERVAL =
+      Arg.create(Amount.of(60L, Time.MINUTES));
+
+  @Positive
+  @CmdLine(name = "reconciliation_implicit_interval",
+      help = "Interval on which scheduler will ask Mesos for status updates of all non-terminal "
+          + "tasks known to Mesos.")
+  private static final Arg<Amount<Long, Time>> RECONCILIATION_IMPLICIT_INTERVAL =
+      Arg.create(Amount.of(60L, Time.MINUTES));
+
+  @CmdLine(name = "reconciliation_schedule_spread",
+      help = "Difference between explicit and implicit reconciliation intervals intended to "
+          + "create a non-overlapping task reconciliation schedule.")
+  private static final Arg<Amount<Long, Time>> RECONCILIATION_SCHEDULE_SPREAD =
+      Arg.create(Amount.of(30L, Time.MINUTES));
+
   @Qualifier
   @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
   private @interface AsyncExecutor { }
@@ -312,6 +339,21 @@ public class AsyncModule extends AbstractModule {
     install(new PrivateModule() {
       @Override
       protected void configure() {
+        bind(TaskReconcilerSettings.class).toInstance(new TaskReconcilerSettings(
+            RECONCILIATION_INITIAL_DELAY.get(),
+            RECONCILIATION_EXPLICIT_INTERVAL.get(),
+            RECONCILIATION_IMPLICIT_INTERVAL.get(),
+            RECONCILIATION_SCHEDULE_SPREAD.get()));
+        bind(ScheduledExecutorService.class).toInstance(executor);
+        bind(TaskReconciler.class).in(Singleton.class);
+        expose(TaskReconciler.class);
+      }
+    });
+    SchedulerServicesModule.addSchedulerActiveServiceBinding(binder()).to(TaskReconciler.class);
+
+    install(new PrivateModule() {
+      @Override
+      protected void configure() {
         bind(JobUpdateHistoryPruner.HistoryPrunerSettings.class).toInstance(
             new JobUpdateHistoryPruner.HistoryPrunerSettings(
                 JOB_UPDATE_HISTORY_PRUNING_INTERVAL.get(),

http://git-wip-us.apache.org/repos/asf/aurora/blob/6db13baf/src/main/java/org/apache/aurora/scheduler/async/TaskReconciler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/TaskReconciler.java b/src/main/java/org/apache/aurora/scheduler/async/TaskReconciler.java
new file mode 100644
index 0000000..23f5f64
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/async/TaskReconciler.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.async;
+
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.inject.Inject;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.stats.StatsProvider;
+
+import org.apache.aurora.scheduler.base.Query;
+import org.apache.aurora.scheduler.mesos.Driver;
+import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.mesos.Protos;
+
+import static java.util.Objects.requireNonNull;
+
+import static com.twitter.common.quantity.Time.MINUTES;
+
+/**
+ * A task reconciler that periodically triggers Mesos (implicit) and Aurora (explicit) task
+ * reconciliation to synchronize global task states. More on task reconciliation:
+ * http://mesos.apache.org/documentation/latest/reconciliation.
+ */
+public class TaskReconciler extends AbstractIdleService {
+
+  @VisibleForTesting
+  static final String EXPLICIT_STAT_NAME = "reconciliation_explicit_runs";
+
+  @VisibleForTesting
+  static final String IMPLICIT_STAT_NAME = "reconciliation_implicit_runs";
+
+  private final TaskReconcilerSettings settings;
+  private final Storage storage;
+  private final Driver driver;
+  private final ScheduledExecutorService executor;
+  private final AtomicLong explicitRuns;
+  private final AtomicLong implicitRuns;
+
+  static class TaskReconcilerSettings {
+    private final Amount<Long, Time> initialDelay;
+    private final Amount<Long, Time> explicitInterval;
+    private final Amount<Long, Time> implicitInterval;
+    private final Amount<Long, Time> scheduleSpread;
+
+    @VisibleForTesting
+    TaskReconcilerSettings(
+        Amount<Long, Time> initialDelay,
+        Amount<Long, Time> explicitInterval,
+        Amount<Long, Time> implicitInterval,
+        Amount<Long, Time> scheduleSpread) {
+
+      this.initialDelay = requireNonNull(initialDelay);
+      this.explicitInterval = requireNonNull(explicitInterval);
+      this.implicitInterval = requireNonNull(implicitInterval);
+      this.scheduleSpread = requireNonNull(scheduleSpread);
+    }
+  }
+
+  @Inject
+  TaskReconciler(
+      TaskReconcilerSettings settings,
+      Storage storage,
+      Driver driver,
+      ScheduledExecutorService executor,
+      StatsProvider stats) {
+
+    this.settings = requireNonNull(settings);
+    this.storage = requireNonNull(storage);
+    this.driver = requireNonNull(driver);
+    this.executor = requireNonNull(executor);
+    this.explicitRuns = stats.makeCounter(EXPLICIT_STAT_NAME);
+    this.implicitRuns = stats.makeCounter(IMPLICIT_STAT_NAME);
+  }
+
+  @Override
+  protected void startUp() {
+    // Schedule explicit reconciliation.
+    executor.scheduleAtFixedRate(
+        new Runnable() {
+          @Override
+          public void run() {
+            ImmutableSet<Protos.TaskStatus> active = FluentIterable
+                .from(Storage.Util.fetchTasks(storage, Query.unscoped().active()))
+                .transform(TASK_TO_PROTO)
+                .toSet();
+
+            driver.reconcileTasks(active);
+            explicitRuns.incrementAndGet();
+          }
+        },
+        settings.initialDelay.as(MINUTES),
+        settings.explicitInterval.as(MINUTES),
+        MINUTES.getTimeUnit());
+
+    // Schedule implicit reconciliation.
+    executor.scheduleAtFixedRate(
+        new Runnable() {
+          @Override
+          public void run() {
+            driver.reconcileTasks(ImmutableSet.of());
+            implicitRuns.incrementAndGet();
+          }
+        },
+        settings.initialDelay.as(MINUTES) + settings.scheduleSpread.as(MINUTES),
+        settings.implicitInterval.as(MINUTES),
+        MINUTES.getTimeUnit());
+  }
+
+  @Override
+  protected void shutDown() {
+    // Nothing to do - await VM shutdown.
+  }
+
+  @VisibleForTesting
+  static final Function<IScheduledTask, Protos.TaskStatus> TASK_TO_PROTO =
+      t -> Protos.TaskStatus.newBuilder()
+          // TODO(maxim): State is required by protobuf but ignored by Mesos for reconciliation
+          // purposes. This is the artifact of the native API. The new HTTP Mesos API will be
+          // accepting task IDs instead. AURORA-1326 tracks solution on the scheduler side.
+          // Setting TASK_RUNNING as a safe dummy value here.
+          .setState(Protos.TaskState.TASK_RUNNING)
+          .setTaskId(Protos.TaskID.newBuilder().setValue(t.getAssignedTask().getTaskId()).build())
+          .build();
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/6db13baf/src/main/java/org/apache/aurora/scheduler/mesos/Driver.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/Driver.java b/src/main/java/org/apache/aurora/scheduler/mesos/Driver.java
index 975ea02..013c50c 100644
--- a/src/main/java/org/apache/aurora/scheduler/mesos/Driver.java
+++ b/src/main/java/org/apache/aurora/scheduler/mesos/Driver.java
@@ -13,6 +13,8 @@
  */
 package org.apache.aurora.scheduler.mesos;
 
+import java.util.Collection;
+
 import com.google.common.util.concurrent.Service;
 
 import org.apache.mesos.Protos.OfferID;
@@ -67,4 +69,11 @@ public interface Driver extends Service {
    * Aborts the driver.
    */
   void abort();
+
+  /**
+   * Requests task reconciliation.
+   *
+   * @param statuses Task statuses to reconcile.
+   */
+  void reconcileTasks(Collection<TaskStatus> statuses);
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/6db13baf/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverService.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverService.java b/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverService.java
index 35cada6..5567fe0 100644
--- a/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverService.java
+++ b/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverService.java
@@ -13,6 +13,7 @@
  */
 package org.apache.aurora.scheduler.mesos;
 
+import java.util.Collection;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.logging.Logger;
@@ -120,20 +121,20 @@ class SchedulerDriverService extends AbstractIdleService implements Driver {
 
   @Override
   public void launchTask(Protos.OfferID offerId, Protos.TaskInfo task) {
-    checkState(isRunning(), "Driver is not running.");
+    ensureRunning();
     Futures.getUnchecked(driverFuture)
         .launchTasks(ImmutableList.of(offerId), ImmutableList.of(task));
   }
 
   @Override
   public void declineOffer(Protos.OfferID offerId) {
-    checkState(isRunning(), "Driver is not running.");
+    ensureRunning();
     Futures.getUnchecked(driverFuture).declineOffer(offerId);
   }
 
   @Override
   public void killTask(String taskId) {
-    checkState(isRunning(), "Driver is not running.");
+    ensureRunning();
     Protos.Status status = Futures.getUnchecked(driverFuture).killTask(
         Protos.TaskID.newBuilder().setValue(taskId).build());
 
@@ -146,7 +147,17 @@ class SchedulerDriverService extends AbstractIdleService implements Driver {
 
   @Override
   public void acknowledgeStatusUpdate(Protos.TaskStatus status) {
-    checkState(isRunning(), "Driver is not running.");
+    ensureRunning();
     Futures.getUnchecked(driverFuture).acknowledgeStatusUpdate(status);
   }
+
+  @Override
+  public void reconcileTasks(Collection<Protos.TaskStatus> statuses) {
+    ensureRunning();
+    Futures.getUnchecked(driverFuture).reconcileTasks(statuses);
+  }
+
+  private void ensureRunning() {
+    checkState(isRunning(), "Driver is not running.");
+  }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/6db13baf/src/test/java/org/apache/aurora/scheduler/async/JobUpdateHistoryPrunerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/JobUpdateHistoryPrunerTest.java b/src/test/java/org/apache/aurora/scheduler/async/JobUpdateHistoryPrunerTest.java
index 02e8798..f73b2c6 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/JobUpdateHistoryPrunerTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/JobUpdateHistoryPrunerTest.java
@@ -64,7 +64,7 @@ public class JobUpdateHistoryPrunerTest extends EasyMockTest {
             1));
 
     pruner.startAsync().awaitRunning();
-    executorClock.advance(Amount.of(2L, Time.MILLISECONDS));
+    executorClock.advance(Amount.of(1L, Time.MILLISECONDS));
     executorClock.advance(Amount.of(1L, Time.MILLISECONDS));
   }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/6db13baf/src/test/java/org/apache/aurora/scheduler/async/TaskReconcilerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/TaskReconcilerTest.java b/src/test/java/org/apache/aurora/scheduler/async/TaskReconcilerTest.java
new file mode 100644
index 0000000..f56ffd2
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/async/TaskReconcilerTest.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.async;
+
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.collect.ImmutableSet;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.stats.StatsProvider;
+import com.twitter.common.testing.easymock.EasyMockTest;
+
+import org.apache.aurora.scheduler.base.Query;
+import org.apache.aurora.scheduler.base.TaskTestUtil;
+import org.apache.aurora.scheduler.mesos.Driver;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
+import org.apache.aurora.scheduler.testing.FakeScheduledExecutor;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.aurora.scheduler.async.TaskReconciler.EXPLICIT_STAT_NAME;
+import static org.apache.aurora.scheduler.async.TaskReconciler.IMPLICIT_STAT_NAME;
+import static org.apache.aurora.scheduler.async.TaskReconciler.TASK_TO_PROTO;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.junit.Assert.assertEquals;
+
+public class TaskReconcilerTest extends EasyMockTest {
+  private StorageTestUtil storageUtil;
+  private StatsProvider statsProvider;
+  private Driver driver;
+  private ScheduledExecutorService executorService;
+  private FakeScheduledExecutor clock;
+
+  @Before
+  public void setUp() {
+    storageUtil = new StorageTestUtil(this);
+    statsProvider = createMock(StatsProvider.class);
+    driver = createMock(Driver.class);
+    executorService = createMock(ScheduledExecutorService.class);
+    clock = FakeScheduledExecutor.scheduleAtFixedRateExecutor(executorService, 2, 5);
+  }
+
+  @Test
+  public void testExecution() {
+    AtomicLong explicitRuns = new AtomicLong();
+    AtomicLong implicitRuns = new AtomicLong();
+    expect(statsProvider.makeCounter(EXPLICIT_STAT_NAME)).andReturn(explicitRuns);
+    expect(statsProvider.makeCounter(IMPLICIT_STAT_NAME)).andReturn(implicitRuns);
+
+    IScheduledTask task = TaskTestUtil.makeTask("id1", TaskTestUtil.JOB);
+    storageUtil.expectOperations();
+    storageUtil.expectTaskFetch(Query.unscoped().active(), task).times(5);
+
+    driver.reconcileTasks(ImmutableSet.of(TASK_TO_PROTO.apply(task)));
+    expectLastCall().times(5);
+
+    driver.reconcileTasks(ImmutableSet.of());
+    expectLastCall().times(2);
+
+    control.replay();
+
+    Amount<Long, Time> initialDelay = Amount.of(10L, Time.MINUTES);
+    Amount<Long, Time> explicitSchedule = Amount.of(60L, Time.MINUTES);
+    Amount<Long, Time> implicitSchedule = Amount.of(180L, Time.MINUTES);
+    Amount<Long, Time> spread = Amount.of(30L, Time.MINUTES);
+
+    TaskReconciler reconciler = new TaskReconciler(
+        new TaskReconciler.TaskReconcilerSettings(
+            initialDelay,
+            explicitSchedule,
+            implicitSchedule,
+            spread),
+        storageUtil.storage,
+        driver,
+        executorService,
+        statsProvider);
+
+    reconciler.startAsync().awaitRunning();
+
+    clock.advance(initialDelay);
+    assertEquals(1L, explicitRuns.get());
+    assertEquals(0L, implicitRuns.get());
+
+    clock.advance(spread);
+    assertEquals(1L, explicitRuns.get());
+    assertEquals(1L, implicitRuns.get());
+
+    clock.advance(explicitSchedule);
+    assertEquals(2L, explicitRuns.get());
+    assertEquals(1L, implicitRuns.get());
+
+    clock.advance(implicitSchedule);
+    assertEquals(5L, explicitRuns.get());
+    assertEquals(2L, implicitRuns.get());
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/6db13baf/src/test/java/org/apache/aurora/scheduler/testing/FakeScheduledExecutor.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/testing/FakeScheduledExecutor.java b/src/test/java/org/apache/aurora/scheduler/testing/FakeScheduledExecutor.java
index 2beea4f..916483b 100644
--- a/src/test/java/org/apache/aurora/scheduler/testing/FakeScheduledExecutor.java
+++ b/src/test/java/org/apache/aurora/scheduler/testing/FakeScheduledExecutor.java
@@ -91,13 +91,23 @@ public final class FakeScheduledExecutor extends FakeClock {
       ScheduledExecutorService mock,
       int maxInvocations) {
 
+    return scheduleAtFixedRateExecutor(mock, 1, maxInvocations);
+  }
+
+  public static FakeScheduledExecutor scheduleAtFixedRateExecutor(
+      ScheduledExecutorService mock,
+      int maxSchedules,
+      int maxInvocations) {
+
     FakeScheduledExecutor executor = new FakeScheduledExecutor();
     mock.scheduleAtFixedRate(
         EasyMock.<Runnable>anyObject(),
         EasyMock.anyLong(),
         EasyMock.anyLong(),
         EasyMock.<TimeUnit>anyObject());
-    expectLastCall().andAnswer(answerScheduleAtFixedRate(executor, maxInvocations)).once();
+    expectLastCall()
+        .andAnswer(answerScheduleAtFixedRate(executor, maxInvocations))
+        .times(maxSchedules);
 
     return executor;
   }
@@ -114,7 +124,7 @@ public final class FakeScheduledExecutor extends FakeClock {
         long initialDelay = (Long) args[1];
         long period = (Long) args[2];
         TimeUnit unit = (TimeUnit) args[3];
-        for (int i = 1; i <= workCount; i++) {
+        for (int i = 0; i <= workCount; i++) {
           addDelayedWork(executor, toMillis(initialDelay, unit) + i * toMillis(period, unit), work);
         }
         return null;


Mime
View raw message