nemo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jan...@apache.org
Subject [incubator-nemo] branch master updated: [NEMO-61] Fix lost execution metric collection problem (#60)
Date Fri, 29 Jun 2018 07:04:57 GMT
This is an automated email from the ASF dual-hosted git repository.

jangho pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git


The following commit(s) were added to refs/heads/master by this push:
     new b88accb  [NEMO-61] Fix lost execution metric collection problem (#60)
b88accb is described below

commit b88accb3f942641cd4f20a794f71364f7afcf5e6
Author: Jae Hyeon Park <usezmap@gmail.com>
AuthorDate: Fri Jun 29 16:04:55 2018 +0900

    [NEMO-61] Fix lost execution metric collection problem (#60)
    
    JIRA: [NEMO-61: Fix lost execution metric collection problem](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-61)
    
    **Major changes:**
    - Add metric flush request and response types in control message type.
    - `RuntimeMaster#terminate` now wait until all metrics from executors arrive.
    
    **Minor changes to note:**
    - Expose `ExecutorReqistry#viewExecutors` to public.
    
    **Tests for the changes:**
    - MetricFlushTest ensures MetricManagerMaster can send flush requests to MetricManagerWorkers
and workers respond appropriately by flushing remaining metric logs.
    
    **Other comments:**
    - Related to NEMO-20.
    
    resolves [NEMO-61](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-61)
---
 .../common/message/ncs/NcsMessageEnvironment.java  |   2 +
 runtime/common/src/main/proto/ControlMessage.proto |   6 +-
 .../common/message/local/LocalMessageTest.java     |   2 -
 .../edu/snu/nemo/runtime/executor/Executor.java    |  21 +--
 .../nemo/runtime/executor/MetricManagerWorker.java |  11 ++
 .../nemo/runtime/executor/MetricMessageSender.java |  13 ++
 .../executor/datatransfer/DataTransferTest.java    |  10 +-
 .../nemo/runtime/master/MetricManagerMaster.java   |  19 ++-
 .../edu/snu/nemo/runtime/master/RuntimeMaster.java |  21 +++
 .../runtime/master/scheduler/ExecutorRegistry.java |   2 +-
 .../snu/nemo/runtime/master/MetricFlushTest.java   | 141 +++++++++++++++++++++
 11 files changed, 227 insertions(+), 21 deletions(-)

diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/message/ncs/NcsMessageEnvironment.java
b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/message/ncs/NcsMessageEnvironment.java
index 3b7e44c..4d07752 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/message/ncs/NcsMessageEnvironment.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/message/ncs/NcsMessageEnvironment.java
@@ -198,6 +198,8 @@ public final class NcsMessageEnvironment implements MessageEnvironment
{
       case DataSizeMetric:
       case ExecutorDataCollected:
       case MetricMessageReceived:
+      case RequestMetricFlush:
+      case MetricFlushed:
         return MessageType.Send;
       case RequestBlockLocation:
         return MessageType.Request;
diff --git a/runtime/common/src/main/proto/ControlMessage.proto b/runtime/common/src/main/proto/ControlMessage.proto
index d662f66..7a3cd7f 100644
--- a/runtime/common/src/main/proto/ControlMessage.proto
+++ b/runtime/common/src/main/proto/ControlMessage.proto
@@ -55,8 +55,10 @@ enum MessageType {
     RequestBlockLocation = 4;
     BlockLocationInfo = 5;
     ExecutorFailed = 6;
-    MetricMessageReceived = 7;
-    ExecutorDataCollected = 8;
+    ExecutorDataCollected = 7;
+    MetricMessageReceived = 8;
+    RequestMetricFlush = 9;
+    MetricFlushed = 10;
 }
 
 message Message {
diff --git a/runtime/common/src/test/java/edu/snu/nemo/runtime/common/message/local/LocalMessageTest.java
b/runtime/common/src/test/java/edu/snu/nemo/runtime/common/message/local/LocalMessageTest.java
index a05b7d2..a39916b 100644
--- a/runtime/common/src/test/java/edu/snu/nemo/runtime/common/message/local/LocalMessageTest.java
+++ b/runtime/common/src/test/java/edu/snu/nemo/runtime/common/message/local/LocalMessageTest.java
@@ -19,8 +19,6 @@ import edu.snu.nemo.runtime.common.message.MessageContext;
 import edu.snu.nemo.runtime.common.message.MessageEnvironment;
 import edu.snu.nemo.runtime.common.message.MessageListener;
 import edu.snu.nemo.runtime.common.message.MessageSender;
-import edu.snu.nemo.runtime.common.message.local.LocalMessageDispatcher;
-import edu.snu.nemo.runtime.common.message.local.LocalMessageEnvironment;
 import org.junit.Assert;
 import org.junit.Test;
 
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/Executor.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/Executor.java
index f5a2e83..8852e0e 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/Executor.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/Executor.java
@@ -161,15 +161,18 @@ public final class Executor {
     @Override
     public void onMessage(final ControlMessage.Message message) {
       switch (message.getType()) {
-      case ScheduleTask:
-        final ControlMessage.ScheduleTaskMsg scheduleTaskMsg = message.getScheduleTaskMsg();
-        final Task task =
-            SerializationUtils.deserialize(scheduleTaskMsg.getTask().toByteArray());
-        onTaskReceived(task);
-        break;
-      default:
-        throw new IllegalMessageException(
-            new Exception("This message should not be received by an executor :" + message.getType()));
+        case ScheduleTask:
+          final ControlMessage.ScheduleTaskMsg scheduleTaskMsg = message.getScheduleTaskMsg();
+          final Task task =
+              SerializationUtils.deserialize(scheduleTaskMsg.getTask().toByteArray());
+          onTaskReceived(task);
+          break;
+        case RequestMetricFlush:
+          metricMessageSender.flush();
+          break;
+        default:
+          throw new IllegalMessageException(
+              new Exception("This message should not be received by an executor :" + message.getType()));
       }
     }
 
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/MetricManagerWorker.java
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/MetricManagerWorker.java
index 7783036..f6ffd08 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/MetricManagerWorker.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/MetricManagerWorker.java
@@ -53,6 +53,17 @@ public final class MetricManagerWorker implements MetricMessageSender {
                                                       flushingPeriod, TimeUnit.MILLISECONDS);
   }
 
+  @Override
+  public void flush() {
+    flushMetricMessageQueueToMaster();
+    persistentConnectionToMasterMap.getMessageSender(MessageEnvironment.RUNTIME_MASTER_MESSAGE_LISTENER_ID).send(
+        ControlMessage.Message.newBuilder()
+            .setId(RuntimeIdGenerator.generateMessageId())
+            .setListenerId(MessageEnvironment.RUNTIME_MASTER_MESSAGE_LISTENER_ID)
+            .setType(ControlMessage.MessageType.MetricFlushed)
+            .build());
+  }
+
   private synchronized void flushMetricMessageQueueToMaster() {
     if (!metricMessageQueue.isEmpty()) {
       // Build batched metric messages
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/MetricMessageSender.java
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/MetricMessageSender.java
index 1d433db..dcd5155 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/MetricMessageSender.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/MetricMessageSender.java
@@ -23,7 +23,20 @@ import org.apache.reef.tang.annotations.DefaultImplementation;
 @DefaultImplementation(MetricManagerWorker.class)
 public interface MetricMessageSender extends AutoCloseable {
 
+  /**
+   * Send metric to master.
+   * @param metricKey key of the metric
+   * @param metricValue value of the metric
+   */
   void send(final String metricKey, final String metricValue);
 
+  /**
+   * Flush all metric inside of the queue.
+   */
+  void flush();
+
+  /**
+   * Flush the metric queue and close the metric dispatch.
+   */
   void close();
 }
diff --git a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
index c2919a3..df9454a 100644
--- a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
+++ b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
@@ -43,11 +43,8 @@ import edu.snu.nemo.runtime.executor.Executor;
 import edu.snu.nemo.runtime.executor.MetricManagerWorker;
 import edu.snu.nemo.runtime.executor.data.BlockManagerWorker;
 import edu.snu.nemo.runtime.executor.data.SerializerManager;
-import edu.snu.nemo.runtime.master.ClientRPC;
-import edu.snu.nemo.runtime.master.MetricMessageHandler;
-import edu.snu.nemo.runtime.master.BlockManagerMaster;
+import edu.snu.nemo.runtime.master.*;
 import edu.snu.nemo.runtime.master.eventhandler.UpdatePhysicalPlanEventHandler;
-import edu.snu.nemo.runtime.master.RuntimeMaster;
 import edu.snu.nemo.runtime.master.resource.ContainerManager;
 import edu.snu.nemo.runtime.master.scheduler.ExecutorRegistry;
 import edu.snu.nemo.runtime.master.scheduler.*;
@@ -91,7 +88,7 @@ import static org.mockito.Mockito.mock;
  */
 @RunWith(PowerMockRunner.class)
 @PrepareForTest({PubSubEventHandlerWrapper.class, UpdatePhysicalPlanEventHandler.class, MetricMessageHandler.class,
-    SourceVertex.class, ClientRPC.class})
+    SourceVertex.class, ClientRPC.class, MetricManagerMaster.class})
 public final class DataTransferTest {
   private static final String EXECUTOR_ID_PREFIX = "Executor";
   private static final InterTaskDataStoreProperty.Value MEMORY_STORE = InterTaskDataStoreProperty.Value.MemoryStore;
@@ -139,11 +136,12 @@ public final class DataTransferTest {
         schedulerRunner, taskQueue, master, pubSubEventHandler, updatePhysicalPlanEventHandler,
executorRegistry);
     final AtomicInteger executorCount = new AtomicInteger(0);
     final ClientRPC clientRPC = mock(ClientRPC.class);
+    final MetricManagerMaster metricManagerMaster = mock(MetricManagerMaster.class);
 
     // Necessary for wiring up the message environments
     final RuntimeMaster runtimeMaster =
         new RuntimeMaster(scheduler, containerManager, master,
-            metricMessageHandler, messageEnvironment, clientRPC, EMPTY_DAG_DIRECTORY);
+            metricMessageHandler, messageEnvironment, clientRPC, metricManagerMaster, EMPTY_DAG_DIRECTORY);
 
     final Injector injector1 = Tang.Factory.getTang().newInjector();
     injector1.bindVolatileInstance(MessageEnvironment.class, messageEnvironment);
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/MetricManagerMaster.java
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/MetricManagerMaster.java
index c1571ef..5b6a8fd 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/MetricManagerMaster.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/MetricManagerMaster.java
@@ -17,6 +17,10 @@ package edu.snu.nemo.runtime.master;
 
 import javax.inject.Inject;
 
+import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
+import edu.snu.nemo.runtime.common.comm.ControlMessage;
+import edu.snu.nemo.runtime.common.message.MessageEnvironment;
+import edu.snu.nemo.runtime.master.scheduler.ExecutorRegistry;
 import org.apache.reef.annotations.audience.DriverSide;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -35,11 +39,24 @@ public final class MetricManagerMaster implements MetricMessageHandler
{
   private static final Logger LOG = LoggerFactory.getLogger(MetricManagerMaster.class.getName());
   private final Map<String, List<String>> compUnitIdToMetricInJson;
   private boolean isTerminated;
+  private final ExecutorRegistry executorRegistry;
 
   @Inject
-  private MetricManagerMaster() {
+  private MetricManagerMaster(final ExecutorRegistry executorRegistry) {
     this.compUnitIdToMetricInJson = new HashMap<>();
     this.isTerminated = false;
+    this.executorRegistry = executorRegistry;
+  }
+
+  public synchronized void sendMetricFlushRequest() {
+    executorRegistry.viewExecutors(executors -> executors.forEach(executor -> {
+      final ControlMessage.Message message = ControlMessage.Message.newBuilder()
+          .setId(RuntimeIdGenerator.generateMessageId())
+          .setListenerId(MessageEnvironment.EXECUTOR_MESSAGE_LISTENER_ID)
+          .setType(ControlMessage.MessageType.RequestMetricFlush)
+          .build();
+      executor.sendControlMessage(message);
+    }));
   }
 
   @Override
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java
index 61ffd20..6d409d8 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java
@@ -69,6 +69,7 @@ import static edu.snu.nemo.runtime.common.state.TaskState.State.ON_HOLD;
 public final class RuntimeMaster {
   private static final Logger LOG = LoggerFactory.getLogger(RuntimeMaster.class.getName());
   private static final int DAG_LOGGING_PERIOD = 3000;
+  private static final int METRIC_ARRIVE_TIMEOUT = 10000;
 
   private final ExecutorService runtimeMasterThread;
 
@@ -79,6 +80,7 @@ public final class RuntimeMaster {
   private final MessageEnvironment masterMessageEnvironment;
   private final Map<Integer, Long> aggregatedMetricData;
   private final ClientRPC clientRPC;
+  private final MetricManagerMaster metricManagerMaster;
 
   // For converting json data. This is a thread safe.
   private final ObjectMapper objectMapper;
@@ -88,6 +90,7 @@ public final class RuntimeMaster {
 
   private final AtomicInteger resourceRequestCount;
 
+  private CountDownLatch metricCountDownLatch;
 
   @Inject
   public RuntimeMaster(final Scheduler scheduler,
@@ -96,6 +99,7 @@ public final class RuntimeMaster {
                        final MetricMessageHandler metricMessageHandler,
                        final MessageEnvironment masterMessageEnvironment,
                        final ClientRPC clientRPC,
+                       final MetricManagerMaster metricManagerMaster,
                        @Parameter(JobConf.DAGDirectory.class) final String dagDirectory)
{
     // We would like to use a single thread for runtime master operations
     // since the processing logic in master takes a very short amount of time
@@ -110,6 +114,7 @@ public final class RuntimeMaster {
     this.masterMessageEnvironment
         .setupListener(MessageEnvironment.RUNTIME_MASTER_MESSAGE_LISTENER_ID, new MasterControlMessageReceiver());
     this.clientRPC = clientRPC;
+    this.metricManagerMaster = metricManagerMaster;
     this.dagDirectory = dagDirectory;
     this.irVertices = new HashSet<>();
     this.resourceRequestCount = new AtomicInteger(0);
@@ -145,6 +150,17 @@ public final class RuntimeMaster {
   }
 
   public void terminate() {
+    // send metric flush request to all executors
+    metricManagerMaster.sendMetricFlushRequest();
+    try {
+      // wait for metric flush
+      if (!metricCountDownLatch.await(METRIC_ARRIVE_TIMEOUT, TimeUnit.MILLISECONDS)) {
+        LOG.warn("Terminating master before all executor terminated messages arrived.");
+      }
+    } catch (final InterruptedException e) {
+      LOG.warn("Waiting executor terminating process interrupted.");
+    }
+
     runtimeMasterThread.execute(() -> {
 
       scheduler.terminate();
@@ -176,6 +192,7 @@ public final class RuntimeMaster {
           resourceRequestCount.getAndAdd(executorNum);
           containerManager.requestContainer(executorNum, builder.build());
         }
+        metricCountDownLatch = new CountDownLatch(resourceRequestCount.get());
       } catch (final Exception e) {
         throw new ContainerException(e);
       }
@@ -237,6 +254,7 @@ public final class RuntimeMaster {
   public void onExecutorFailed(final FailedEvaluator failedEvaluator) {
     runtimeMasterThread.execute(() -> {
       LOG.info("onExecutorFailed: {}", failedEvaluator.getId());
+      metricCountDownLatch.countDown();
 
       // Note that getFailedContextList() can be empty if the failure occurred
       // prior to launching an Executor on the Evaluator.
@@ -310,6 +328,9 @@ public final class RuntimeMaster {
             .setDataCollected(ControlMessage.DataCollectMessage.newBuilder().setData(serializedData).build())
             .build());
         break;
+      case MetricFlushed:
+        metricCountDownLatch.countDown();
+        break;
       default:
         throw new IllegalMessageException(
             new Exception("This message should not be received by Master :" + message.getType()));
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/ExecutorRegistry.java
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/ExecutorRegistry.java
index 343151f..8f052f0 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/ExecutorRegistry.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/ExecutorRegistry.java
@@ -59,7 +59,7 @@ public final class ExecutorRegistry {
     }
   }
 
-  synchronized void viewExecutors(final Consumer<Set<ExecutorRepresenter>> consumer)
{
+  public synchronized void viewExecutors(final Consumer<Set<ExecutorRepresenter>>
consumer) {
     consumer.accept(getRunningExecutors());
   }
 
diff --git a/tests/src/test/java/edu/snu/nemo/runtime/master/MetricFlushTest.java b/tests/src/test/java/edu/snu/nemo/runtime/master/MetricFlushTest.java
new file mode 100644
index 0000000..75f964f
--- /dev/null
+++ b/tests/src/test/java/edu/snu/nemo/runtime/master/MetricFlushTest.java
@@ -0,0 +1,141 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.runtime.master;
+
+import edu.snu.nemo.runtime.common.comm.ControlMessage;
+import edu.snu.nemo.runtime.common.message.MessageContext;
+import edu.snu.nemo.runtime.common.message.MessageEnvironment;
+import edu.snu.nemo.runtime.common.message.MessageListener;
+import edu.snu.nemo.runtime.common.message.MessageSender;
+import edu.snu.nemo.runtime.common.message.local.LocalMessageDispatcher;
+import edu.snu.nemo.runtime.common.message.local.LocalMessageEnvironment;
+import edu.snu.nemo.runtime.executor.MetricManagerWorker;
+import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
+import edu.snu.nemo.runtime.master.scheduler.ExecutorRegistry;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Injector;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.stubbing.Answer;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Consumer;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Ensures metrics collected by {@link edu.snu.nemo.runtime.executor.MetricManagerWorker}
are properly sent to master
+ * before the job finishes.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({ExecutorRepresenter.class, ExecutorRegistry.class})
+public final class MetricFlushTest {
+  private static final Tang TANG = Tang.Factory.getTang();
+  private static final String MASTER = "MASTER";
+  private static final String WORKER = "WORKER";
+  private static final int EXECUTOR_NUM = 5;
+  private static MessageSender masterToWorkerSender;
+
+  @Test(timeout = 10000)
+  public void test() throws InjectionException, ExecutionException, InterruptedException
{
+    final CountDownLatch latch = new CountDownLatch(EXECUTOR_NUM);
+
+    final LocalMessageDispatcher localMessagedispatcher = new LocalMessageDispatcher();
+
+    final Configuration configuration = TANG.newConfigurationBuilder()
+        .build();
+    final Injector injector = TANG.newInjector(configuration);
+
+    final Injector masterInjector = injector.forkInjector();
+    final Injector workerInjector = injector.forkInjector();
+
+    final LocalMessageEnvironment masterMessageEnvironment = new LocalMessageEnvironment(MASTER,
+        localMessagedispatcher);
+    masterInjector.bindVolatileInstance(MessageEnvironment.class, masterMessageEnvironment);
+
+    final LocalMessageEnvironment workerMessageEnvironment = new LocalMessageEnvironment(WORKER,
+        localMessagedispatcher);
+    workerInjector.bindVolatileInstance(MessageEnvironment.class, workerMessageEnvironment);
+
+    masterToWorkerSender = masterMessageEnvironment
+        .asyncConnect(WORKER, MessageEnvironment.EXECUTOR_MESSAGE_LISTENER_ID).get();
+
+    final Set<ExecutorRepresenter> executorRepresenterSet = new HashSet<>();
+
+    for (int i = 0; i < EXECUTOR_NUM; i++) {
+      executorRepresenterSet.add(newWorker());
+    }
+
+    final ExecutorRegistry executorRegistry = mock(ExecutorRegistry.class);
+    doAnswer((Answer<Void>) invocationOnMock -> {
+      final Consumer<Set<ExecutorRepresenter>> consumer = (Consumer) invocationOnMock.getArguments()[0];
+      consumer.accept(executorRepresenterSet);
+      return null;
+    }).when(executorRegistry).viewExecutors(any());
+
+    masterInjector.bindVolatileInstance(ExecutorRegistry.class, executorRegistry);
+
+    final MetricManagerMaster metricManagerMaster = masterInjector.getInstance(MetricManagerMaster.class);
+    final MetricManagerWorker metricManagerWorker = workerInjector.getInstance(MetricManagerWorker.class);
+
+    masterMessageEnvironment.setupListener(MessageEnvironment.RUNTIME_MASTER_MESSAGE_LISTENER_ID,
+        new MessageListener<Object>() {
+        @Override
+        public void onMessage(Object message) {
+          latch.countDown();
+        }
+
+        @Override
+        public void onMessageWithContext(Object message, MessageContext messageContext) {
+        }
+    });
+
+    workerMessageEnvironment.setupListener(MessageEnvironment.EXECUTOR_MESSAGE_LISTENER_ID,
+        new MessageListener<Object>() {
+          @Override
+          public void onMessage(Object message) {
+            metricManagerWorker.flush();
+          }
+
+          @Override
+          public void onMessageWithContext(Object message, MessageContext messageContext)
{
+          }
+        });
+
+    metricManagerMaster.sendMetricFlushRequest();
+
+    latch.await();
+  }
+
+  private ExecutorRepresenter newWorker() {
+    final ExecutorRepresenter workerRepresenter = mock(ExecutorRepresenter.class);
+    doAnswer((Answer<Void>) invocationOnMock -> {
+      final ControlMessage.Message msg = (ControlMessage.Message) invocationOnMock.getArguments()[0];
+      masterToWorkerSender.send(msg);
+      return null;
+    }).when(workerRepresenter).sendControlMessage(any());
+    return workerRepresenter;
+  }
+}


Mime
View raw message