aurora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wfar...@apache.org
Subject incubator-aurora git commit: Export stats for source and reason for LOST tasks, and status delivery latency.
Date Tue, 24 Feb 2015 01:04:25 GMT
Repository: incubator-aurora
Updated Branches:
  refs/heads/master 1449a201d -> 19378c19d


Export stats for source and reason for LOST tasks, and status delivery latency.

Bugs closed: AURORA-1028

Reviewed at https://reviews.apache.org/r/31248/


Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/19378c19
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/19378c19
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/19378c19

Branch: refs/heads/master
Commit: 19378c19d29babd3c4cece9f5c5d05ebf18d1c2d
Parents: 1449a20
Author: Bill Farner <wfarner@apache.org>
Authored: Mon Feb 23 17:00:17 2015 -0800
Committer: Bill Farner <wfarner@apache.org>
Committed: Mon Feb 23 17:00:17 2015 -0800

----------------------------------------------------------------------
 .../aurora/scheduler/events/PubsubEvent.java    |  54 +++++++++
 .../scheduler/mesos/MesosSchedulerImpl.java     |  38 +++++-
 .../scheduler/mesos/SchedulerDriverModule.java  |  32 +++--
 .../aurora/scheduler/mesos/TaskStatusStats.java | 118 ++++++++++++++++++
 .../scheduler/mesos/MesosSchedulerImplTest.java |  18 +++
 .../scheduler/mesos/TaskStatusStatsTest.java    | 121 +++++++++++++++++++
 6 files changed, 365 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/19378c19/src/main/java/org/apache/aurora/scheduler/events/PubsubEvent.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/events/PubsubEvent.java b/src/main/java/org/apache/aurora/scheduler/events/PubsubEvent.java
index 1d8f012..c103472 100644
--- a/src/main/java/org/apache/aurora/scheduler/events/PubsubEvent.java
+++ b/src/main/java/org/apache/aurora/scheduler/events/PubsubEvent.java
@@ -23,6 +23,8 @@ import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
 import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.mesos.Protos;
+import org.apache.mesos.Protos.TaskStatus;
 
 import static java.util.Objects.requireNonNull;
 
@@ -261,4 +263,56 @@ public interface PubsubEvent {
     }
   }
 
+  class TaskStatusReceived implements PubsubEvent {
+    private final Protos.TaskState state;
+    private final Optional<TaskStatus.Source> source;
+    private final Optional<TaskStatus.Reason> reason;
+    private final Optional<Long> epochTimestampMicros;
+
+    public TaskStatusReceived(
+        Protos.TaskState state,
+        Optional<TaskStatus.Source> source,
+        Optional<TaskStatus.Reason> reason,
+        Optional<Long> epochTimestampMicros) {
+
+      this.state = requireNonNull(state);
+      this.source = requireNonNull(source);
+      this.reason = requireNonNull(reason);
+      this.epochTimestampMicros = requireNonNull(epochTimestampMicros);
+    }
+
+    public Protos.TaskState getState() {
+      return state;
+    }
+
+    public Optional<TaskStatus.Source> getSource() {
+      return source;
+    }
+
+    public Optional<TaskStatus.Reason> getReason() {
+      return reason;
+    }
+
+    public Optional<Long> getEpochTimestampMicros() {
+      return epochTimestampMicros;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (!(o instanceof TaskStatusReceived)) {
+        return false;
+      }
+
+      TaskStatusReceived other = (TaskStatusReceived) o;
+      return Objects.equals(state, other.state)
+          && Objects.equals(source, other.source)
+          && Objects.equals(reason, other.reason)
+          && Objects.equals(epochTimestampMicros, other.epochTimestampMicros);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(state, source, reason, epochTimestampMicros);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/19378c19/src/main/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImpl.java b/src/main/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImpl.java
index ffc30bb..aa8aaad 100644
--- a/src/main/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImpl.java
@@ -24,6 +24,8 @@ import java.util.logging.Logger;
 import javax.inject.Inject;
 import javax.inject.Qualifier;
 
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.twitter.common.application.Lifecycle;
 import com.twitter.common.inject.TimedInterceptor.Timed;
@@ -36,6 +38,7 @@ import org.apache.aurora.scheduler.base.SchedulerException;
 import org.apache.aurora.scheduler.events.EventSink;
 import org.apache.aurora.scheduler.events.PubsubEvent.DriverDisconnected;
 import org.apache.aurora.scheduler.events.PubsubEvent.DriverRegistered;
+import org.apache.aurora.scheduler.events.PubsubEvent.TaskStatusReceived;
 import org.apache.aurora.scheduler.storage.AttributeStore;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
@@ -192,15 +195,40 @@ class MesosSchedulerImpl implements Scheduler {
     }
   }
 
+  private static void logStatusUpdate(TaskStatus status) {
+    StringBuilder message = new StringBuilder("Received status update for task ")
+        .append(status.getTaskId().getValue())
+        .append(" in state ")
+        .append(status.getState());
+    if (status.hasSource()) {
+      message.append(" from ").append(status.getSource());
+    }
+    if (status.hasReason()) {
+      message.append(" with ").append(status.getReason());
+    }
+    if (status.hasMessage()) {
+      message.append(": ").append(status.getMessage());
+    }
+    LOG.info(message.toString());
+  }
+
+  private static final Function<Double, Long> SECONDS_TO_MICROS = new Function<Double,
Long>() {
+    @Override
+    public Long apply(Double seconds) {
+      return (long) (seconds * 1E6);
+    }
+  };
+
   @AllowUnchecked
   @Timed("scheduler_status_update")
   @Override
   public void statusUpdate(SchedulerDriver driver, TaskStatus status) {
-    String info = status.hasData() ? status.getData().toStringUtf8() : null;
-    String infoMsg = info == null ? "" : " with info " + info;
-    String coreMsg = status.hasMessage() ? " with core message " + status.getMessage() :
"";
-    LOG.info("Received status update for task " + status.getTaskId().getValue()
-        + " in state " + status.getState() + infoMsg + coreMsg);
+    logStatusUpdate(status);
+    eventSink.post(new TaskStatusReceived(
+        status.getState(),
+        Optional.fromNullable(status.getSource()),
+        Optional.fromNullable(status.getReason()),
+        Optional.fromNullable(status.getTimestamp()).transform(SECONDS_TO_MICROS)));
 
     try {
       for (TaskLauncher launcher : taskLaunchers) {

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/19378c19/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverModule.java b/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverModule.java
index 59ad9e6..d7d659b 100644
--- a/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverModule.java
@@ -18,28 +18,38 @@ import java.util.logging.Logger;
 
 import javax.inject.Singleton;
 
+import com.google.inject.AbstractModule;
 import com.google.inject.PrivateModule;
 
 import org.apache.aurora.scheduler.base.AsyncUtil;
+import org.apache.aurora.scheduler.events.PubsubEventModule;
 import org.apache.mesos.Scheduler;
 
 /**
  * A module that creates a {@link Driver} binding.
  */
-public class SchedulerDriverModule extends PrivateModule {
+public class SchedulerDriverModule extends AbstractModule {
   private static final Logger LOG = Logger.getLogger(SchedulerDriverModule.class.getName());
 
   @Override
   protected void configure() {
-    bind(Driver.class).to(SchedulerDriverService.class);
-    bind(SchedulerDriverService.class).in(Singleton.class);
-    expose(Driver.class);
-
-    bind(Scheduler.class).to(MesosSchedulerImpl.class);
-    bind(MesosSchedulerImpl.class).in(Singleton.class);
-
-    // TODO(zmanji): Create singleThreadedExecutor (non-scheduled) variant.
-    bind(Executor.class).annotatedWith(MesosSchedulerImpl.SchedulerExecutor.class)
-        .toInstance(AsyncUtil.singleThreadLoggingScheduledExecutor("SchedulerImpl-%d", LOG));
+    install(new PrivateModule() {
+      @Override
+      protected void configure() {
+        bind(Driver.class).to(SchedulerDriverService.class);
+        bind(SchedulerDriverService.class).in(Singleton.class);
+        expose(Driver.class);
+
+        bind(Scheduler.class).to(MesosSchedulerImpl.class);
+        bind(MesosSchedulerImpl.class).in(Singleton.class);
+
+        // TODO(zmanji): Create singleThreadedExecutor (non-scheduled) variant.
+        bind(Executor.class).annotatedWith(MesosSchedulerImpl.SchedulerExecutor.class)
+            .toInstance(AsyncUtil.singleThreadLoggingScheduledExecutor("SchedulerImpl-%d",
LOG));
+      }
+    });
+
+    PubsubEventModule.bindSubscriber(binder(), TaskStatusStats.class);
+    bind(TaskStatusStats.class).in(Singleton.class);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/19378c19/src/main/java/org/apache/aurora/scheduler/mesos/TaskStatusStats.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/TaskStatusStats.java b/src/main/java/org/apache/aurora/scheduler/mesos/TaskStatusStats.java
new file mode 100644
index 0000000..68214f2
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/mesos/TaskStatusStats.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.mesos;
+
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.logging.Logger;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.eventbus.Subscribe;
+import com.google.inject.Inject;
+import com.twitter.common.stats.StatsProvider;
+import com.twitter.common.stats.StatsProvider.RequestTimer;
+import com.twitter.common.util.Clock;
+
+import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
+import org.apache.aurora.scheduler.events.PubsubEvent.TaskStatusReceived;
+import org.apache.mesos.Protos;
+import org.apache.mesos.Protos.TaskStatus.Reason;
+import org.apache.mesos.Protos.TaskStatus.Source;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * A status event listener that exports statistics about the contents of status updates.
+ */
+class TaskStatusStats implements EventSubscriber {
+
+  private static final Logger LOG = Logger.getLogger(TaskStatusStats.class.getName());
+
+  private final Clock clock;
+
+  private final LoadingCache<Source, AtomicLong> lostSourceCounters;
+  private final LoadingCache<Reason, AtomicLong> lostReasonCounters;
+
+  private final LoadingCache<Source, RequestTimer> latencyTimers;
+
+  @Inject
+  TaskStatusStats(final StatsProvider statsProvider, Clock clock) {
+    requireNonNull(statsProvider);
+    this.clock = requireNonNull(clock);
+
+    lostSourceCounters = CacheBuilder.newBuilder()
+        .build(new CacheLoader<Source, AtomicLong>() {
+          @Override
+          public AtomicLong load(Source source) {
+            return statsProvider.makeCounter(lostCounterName(source));
+          }
+        });
+    lostReasonCounters = CacheBuilder.newBuilder()
+        .build(new CacheLoader<Reason, AtomicLong>() {
+          @Override
+          public AtomicLong load(Reason reason) {
+            return statsProvider.makeCounter(lostReasonCounterName(reason));
+          }
+        });
+    latencyTimers = CacheBuilder.newBuilder()
+        .build(new CacheLoader<Source, RequestTimer>() {
+          @Override
+          public RequestTimer load(Source source) {
+            return statsProvider.makeRequestTimer(latencyTimerName(source));
+          }
+        });
+  }
+
+  @VisibleForTesting
+  static String lostCounterName(Source source) {
+    return "task_lost_" + source;
+  }
+
+  @VisibleForTesting
+  static String lostReasonCounterName(Reason reason) {
+    return "task_lost_" + reason;
+  }
+
+  @VisibleForTesting
+  static String latencyTimerName(Source source) {
+    return "task_delivery_delay_" + source;
+  }
+
+  @Subscribe
+  public void accumulate(TaskStatusReceived event) {
+    if (event.getState() == Protos.TaskState.TASK_LOST) {
+      if (event.getSource().isPresent()) {
+        lostSourceCounters.getUnchecked(event.getSource().get()).incrementAndGet();
+      }
+      if (event.getReason().isPresent()) {
+        lostReasonCounters.getUnchecked(event.getReason().get()).incrementAndGet();
+      }
+    }
+
+    if (event.getSource().isPresent() && event.getEpochTimestampMicros().isPresent())
{
+      long nowMicros = clock.nowMillis() * 1000;
+      // Avoid distorting stats by recording zero or negative values.  This can result if
delivery
+      // is faster than the clock resolution (1 ms) or there is clock skew between the systems.
+      // In reality, this value is likely to be inaccurate, especially at the resolution
of millis.
+      if (event.getEpochTimestampMicros().get() < nowMicros) {
+        latencyTimers.getUnchecked(event.getSource().get())
+            .requestComplete(nowMicros - event.getEpochTimestampMicros().get());
+      } else {
+        LOG.fine("Not recording stats for status update with timestamp <= now");
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/19378c19/src/test/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImplTest.java b/src/test/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImplTest.java
index d02c6b3..d621586 100644
--- a/src/test/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImplTest.java
@@ -43,6 +43,7 @@ import org.apache.aurora.scheduler.base.SchedulerException;
 import org.apache.aurora.scheduler.events.EventSink;
 import org.apache.aurora.scheduler.events.PubsubEvent.DriverDisconnected;
 import org.apache.aurora.scheduler.events.PubsubEvent.DriverRegistered;
+import org.apache.aurora.scheduler.events.PubsubEvent.TaskStatusReceived;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.Storage.StorageException;
 import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
@@ -55,6 +56,8 @@ import org.apache.mesos.Protos.SlaveID;
 import org.apache.mesos.Protos.TaskID;
 import org.apache.mesos.Protos.TaskState;
 import org.apache.mesos.Protos.TaskStatus;
+import org.apache.mesos.Protos.TaskStatus.Reason;
+import org.apache.mesos.Protos.TaskStatus.Source;
 import org.apache.mesos.SchedulerDriver;
 import org.junit.Before;
 import org.junit.Test;
@@ -108,9 +111,20 @@ public class MesosSchedulerImplTest extends EasyMockTest {
 
   private static final TaskStatus STATUS = TaskStatus.newBuilder()
       .setState(TaskState.TASK_RUNNING)
+      .setSource(Source.SOURCE_SLAVE)
+      // Only testing data plumbing, this field with TASK_RUNNING would not normally happen,
+      .setReason(Reason.REASON_COMMAND_EXECUTOR_FAILED)
+      .setTimestamp(1D)
       .setTaskId(TaskID.newBuilder().setValue("task-id").build())
       .build();
 
+  private static final TaskStatusReceived PUBSUB_EVENT = new TaskStatusReceived(
+      STATUS.getState(),
+      Optional.of(STATUS.getSource()),
+      Optional.of(STATUS.getReason()),
+      Optional.of(1000000L)
+  );
+
   private StorageTestUtil storageUtil;
   private TaskLauncher systemLauncher;
   private TaskLauncher userLauncher;
@@ -222,6 +236,7 @@ public class MesosSchedulerImplTest extends EasyMockTest {
     new StatusFixture() {
       @Override
       void expectations() throws Exception {
+        eventSink.post(PUBSUB_EVENT);
         expect(systemLauncher.statusUpdate(STATUS)).andReturn(false);
         expect(userLauncher.statusUpdate(STATUS)).andReturn(false);
       }
@@ -233,6 +248,7 @@ public class MesosSchedulerImplTest extends EasyMockTest {
     new StatusFixture() {
       @Override
       void expectations() throws Exception {
+        eventSink.post(PUBSUB_EVENT);
         expect(systemLauncher.statusUpdate(STATUS)).andReturn(true);
       }
     }.run();
@@ -243,6 +259,7 @@ public class MesosSchedulerImplTest extends EasyMockTest {
     new StatusFixture() {
       @Override
       void expectations() throws Exception {
+        eventSink.post(PUBSUB_EVENT);
         expect(systemLauncher.statusUpdate(STATUS)).andReturn(false);
         expect(userLauncher.statusUpdate(STATUS)).andReturn(true);
       }
@@ -254,6 +271,7 @@ public class MesosSchedulerImplTest extends EasyMockTest {
     new StatusFixture() {
       @Override
       void expectations() throws Exception {
+        eventSink.post(PUBSUB_EVENT);
         expect(systemLauncher.statusUpdate(STATUS)).andReturn(false);
         expect(userLauncher.statusUpdate(STATUS)).andThrow(new StorageException("Injected."));
       }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/19378c19/src/test/java/org/apache/aurora/scheduler/mesos/TaskStatusStatsTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/mesos/TaskStatusStatsTest.java b/src/test/java/org/apache/aurora/scheduler/mesos/TaskStatusStatsTest.java
new file mode 100644
index 0000000..4bbeff9
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/mesos/TaskStatusStatsTest.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.mesos;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.base.Optional;
+import com.google.common.eventbus.EventBus;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.stats.StatsProvider;
+import com.twitter.common.stats.StatsProvider.RequestTimer;
+import com.twitter.common.testing.easymock.EasyMockTest;
+import com.twitter.common.util.testing.FakeClock;
+
+import org.apache.aurora.scheduler.events.PubsubEvent.TaskStatusReceived;
+import org.apache.mesos.Protos.TaskState;
+import org.apache.mesos.Protos.TaskStatus.Reason;
+import org.apache.mesos.Protos.TaskStatus.Source;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.aurora.scheduler.mesos.TaskStatusStats.latencyTimerName;
+import static org.apache.aurora.scheduler.mesos.TaskStatusStats.lostCounterName;
+import static org.apache.aurora.scheduler.mesos.TaskStatusStats.lostReasonCounterName;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.junit.Assert.assertEquals;
+
+public class TaskStatusStatsTest extends EasyMockTest {
+
+  private static final Amount<Long, Time> ONE_SECOND = Amount.of(1L, Time.SECONDS);
+
+  private StatsProvider statsProvider;
+  private FakeClock clock;
+  private EventBus eventBus;
+
+  @Before
+  public void setUp() {
+    statsProvider = createMock(StatsProvider.class);
+    clock = new FakeClock();
+    eventBus = new EventBus();
+    eventBus.register(new TaskStatusStats(statsProvider, clock));
+  }
+
+  private long agoMicros(Amount<Long, Time> duration) {
+    return clock.nowMillis() * 1000 - duration.as(Time.MICROSECONDS);
+  }
+
+  @Test
+  public void testAccumulateEvents() {
+    RequestTimer masterDeliveryDelay = createMock(RequestTimer.class);
+    expect(statsProvider.makeRequestTimer(latencyTimerName(Source.SOURCE_MASTER)))
+        .andReturn(masterDeliveryDelay);
+    masterDeliveryDelay.requestComplete(ONE_SECOND.as(Time.MICROSECONDS));
+    expectLastCall().times(2);
+
+    AtomicLong masterLostCounter = new AtomicLong();
+    expect(statsProvider.makeCounter(lostCounterName(Source.SOURCE_MASTER)))
+        .andReturn(masterLostCounter);
+
+    AtomicLong slaveDisconnectedCounter = new AtomicLong();
+    expect(statsProvider.makeCounter(lostReasonCounterName(Reason.REASON_SLAVE_DISCONNECTED)))
+        .andReturn(slaveDisconnectedCounter);
+
+    control.replay();
+
+    clock.advance(Amount.of(1L, Time.HOURS));
+    eventBus.post(new TaskStatusReceived(
+        TaskState.TASK_RUNNING,
+        Optional.of(Source.SOURCE_MASTER),
+        Optional.<Reason>absent(),
+        Optional.of(agoMicros(ONE_SECOND))));
+
+    clock.advance(ONE_SECOND);
+    eventBus.post(new TaskStatusReceived(
+        TaskState.TASK_LOST,
+        Optional.of(Source.SOURCE_MASTER),
+        Optional.of(Reason.REASON_SLAVE_DISCONNECTED),
+        Optional.of(agoMicros(ONE_SECOND))));
+
+    // No counting for these since they do not have both a source and timestamp.
+    eventBus.post(new TaskStatusReceived(
+        TaskState.TASK_LOST,
+        Optional.<Source>absent(),
+        Optional.<Reason>absent(),
+        Optional.<Long>absent()));
+    eventBus.post(new TaskStatusReceived(
+        TaskState.TASK_LOST,
+        Optional.<Source>absent(),
+        Optional.<Reason>absent(),
+        Optional.of(agoMicros(ONE_SECOND))));
+    eventBus.post(new TaskStatusReceived(
+        TaskState.TASK_LOST,
+        Optional.of(Source.SOURCE_MASTER),
+        Optional.of(Reason.REASON_SLAVE_DISCONNECTED),
+        Optional.<Long>absent()));
+
+    // No time tracking for this since the timestamp is the current time.
+    eventBus.post(new TaskStatusReceived(
+        TaskState.TASK_LOST,
+        Optional.of(Source.SOURCE_MASTER),
+        Optional.of(Reason.REASON_SLAVE_DISCONNECTED),
+        Optional.of(clock.nowMillis() * 1000)
+    ));
+
+    assertEquals(3L, masterLostCounter.get());
+    assertEquals(3L, slaveDisconnectedCounter.get());
+  }
+}


Mime
View raw message