nemo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jeongy...@apache.org
Subject [incubator-nemo] branch master updated: [NEMO-68] Restrict the number of parallel connections between executors (#22)
Date Mon, 28 May 2018 10:49:50 GMT
This is an automated email from the ASF dual-hosted git repository.

jeongyoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git


The following commit(s) were added to refs/heads/master by this push:
     new 0d9a6ec  [NEMO-68] Restrict the number of parallel connections between executors (#22)
0d9a6ec is described below

commit 0d9a6ec6cfc3e33d12a9c54445768457c7dad1fe
Author: JangHo Seo <jangho@jangho.io>
AuthorDate: Mon May 28 19:49:47 2018 +0900

    [NEMO-68] Restrict the number of parallel connections between executors (#22)
    
    JIRA: NEMO-68: Restrict the number of parallel connections between executors
    
    Major changes:
    * Added JobConf MaxNumDownloadsForARuntimeEdge to cap the number of parallel block downloads for a runtime edge.
    * Added BlockTransferConnectionQueue to support MaxNumDownloadsForARuntimeEdge.
    
    Minor changes to note:
    None
    
    Tests for the changes:
    * BlockTransferConnectionQueueTest tests BlockTransferConnectionQueue.
    
    Other comments:
    None
    
    resolves NEMO-68
---
 conf/src/main/java/edu/snu/nemo/conf/JobConf.java  |  8 +++
 .../runtime/executor/data/BlockManagerWorker.java  | 16 +++--
 .../data/BlockTransferConnectionQueue.java         | 83 ++++++++++++++++++++++
 .../data/BlockTransferConnectionQueueTest.java     | 75 +++++++++++++++++++
 4 files changed, 178 insertions(+), 4 deletions(-)

diff --git a/conf/src/main/java/edu/snu/nemo/conf/JobConf.java b/conf/src/main/java/edu/snu/nemo/conf/JobConf.java
index 19a654d..da3d671 100644
--- a/conf/src/main/java/edu/snu/nemo/conf/JobConf.java
+++ b/conf/src/main/java/edu/snu/nemo/conf/JobConf.java
@@ -142,6 +142,14 @@ public final class JobConf extends ConfigurationModuleBuilder {
   }
 
   /**
+   * Maximum number of parallel downloads for a runtime edge.
+   */
+  @NamedParameter(doc = "Maximum number of parallel downloads for a runtime edge.", short_name
= "max_downloads",
+      default_value = "30")
+  public final class MaxNumDownloadsForARuntimeEdge implements Name<Integer> {
+  }
+
+  /**
    * Max number of attempts for task scheduling.
    */
   @NamedParameter(doc = "Max number of schedules", short_name = "max_schedule_attempt", default_value
= "3")
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockManagerWorker.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockManagerWorker.java
index bdc616f..d7b13f2 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockManagerWorker.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockManagerWorker.java
@@ -72,6 +72,7 @@ public final class BlockManagerWorker {
   private final Map<String, AtomicInteger> blockToRemainingRead;
   private final SerializerManager serializerManager;
   private final Map<String, CompletableFuture<ControlMessage.Message>> pendingBlockLocationRequest;
+  private final BlockTransferConnectionQueue blockTransferConnectionQueue;
 
   /**
    * Constructor.
@@ -85,6 +86,7 @@ public final class BlockManagerWorker {
    * @param persistentConnectionToMasterMap the connection map.
    * @param byteTransfer                    the byte transfer.
    * @param serializerManager               the serializer manager.
+   * @param blockTransferConnectionQueue    restricts parallel connections
    */
   @Inject
   private BlockManagerWorker(@Parameter(JobConf.ExecutorId.class) final String executorId,
@@ -95,7 +97,8 @@ public final class BlockManagerWorker {
                              final RemoteFileStore remoteFileStore,
                              final PersistentConnectionToMasterMap persistentConnectionToMasterMap,
                              final ByteTransfer byteTransfer,
-                             final SerializerManager serializerManager) {
+                             final SerializerManager serializerManager,
+                             final BlockTransferConnectionQueue blockTransferConnectionQueue)
{
     this.executorId = executorId;
     this.memoryStore = memoryStore;
     this.serializedMemoryStore = serializedMemoryStore;
@@ -107,6 +110,7 @@ public final class BlockManagerWorker {
     this.blockToRemainingRead = new ConcurrentHashMap<>();
     this.serializerManager = serializerManager;
     this.pendingBlockLocationRequest = new ConcurrentHashMap<>();
+    this.blockTransferConnectionQueue = blockTransferConnectionQueue;
   }
 
   /**
@@ -229,9 +233,13 @@ public final class BlockManagerWorker {
             .setRuntimeEdgeId(runtimeEdgeId)
             .setKeyRange(ByteString.copyFrom(SerializationUtils.serialize(keyRange)))
             .build();
-        return byteTransfer.newInputContext(targetExecutorId, descriptor.toByteArray())
-            .thenCompose(context -> context.getCompletedFuture())
-            .thenApply(streams -> new DataUtil.InputStreamIterator(streams,
+        final CompletableFuture<ByteInputContext> contextFuture = blockTransferConnectionQueue
+            .requestConnectPermission(runtimeEdgeId)
+            .thenCompose(obj -> byteTransfer.newInputContext(targetExecutorId, descriptor.toByteArray()));
+        contextFuture.thenApply(context -> context.getCompletedFuture()
+            .thenAccept(f -> blockTransferConnectionQueue.onConnectionFinished(runtimeEdgeId)));
+        return contextFuture
+            .thenApply(context -> new DataUtil.InputStreamIterator(context.getInputStreams(),
                 serializerManager.getSerializer(runtimeEdgeId)));
       }
     });
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockTransferConnectionQueue.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockTransferConnectionQueue.java
new file mode 100644
index 0000000..0870082
--- /dev/null
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockTransferConnectionQueue.java
@@ -0,0 +1,83 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.runtime.executor.data;
+
+import edu.snu.nemo.conf.JobConf;
+import org.apache.reef.tang.annotations.Parameter;
+
+import javax.inject.Inject;
+import java.util.ArrayDeque;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A class to restrict parallel connection per runtime edge.
+ * Executors can suffer from performance degradation and network-related exceptions when
there are massive connections,
+ * especially under low network bandwidth or high volume of data.
+ */
+public final class BlockTransferConnectionQueue {
+  private final Map<String, Integer> runtimeEdgeIdToNumCurrentConnections = new HashMap<>();
+  private final Map<String, Queue<CompletableFuture<Void>>> runtimeEdgeIdToPendingConnections
= new HashMap<>();
+  private final int maxNum;
+
+  @Inject
+  private BlockTransferConnectionQueue(@Parameter(JobConf.MaxNumDownloadsForARuntimeEdge.class)
final int maxNum) {
+    this.maxNum = maxNum;
+  }
+
+  /**
+   * Request a permission to make a connection.
+   * @param runtimeEdgeId the corresponding runtime edge id.
+   * @return a future that will be completed when the connection is granted.
+   */
+  public synchronized CompletableFuture<Void> requestConnectPermission(final String
runtimeEdgeId) {
+    runtimeEdgeIdToNumCurrentConnections.putIfAbsent(runtimeEdgeId, 0);
+    runtimeEdgeIdToPendingConnections.computeIfAbsent(runtimeEdgeId, id -> new ArrayDeque<>());
+    final int currentOutstandingConnections = runtimeEdgeIdToNumCurrentConnections.get(runtimeEdgeId);
+
+    if (currentOutstandingConnections < maxNum) {
+      // grant immediately
+      runtimeEdgeIdToNumCurrentConnections.put(runtimeEdgeId, currentOutstandingConnections
+ 1);
+      return CompletableFuture.completedFuture(null);
+    } else {
+      // add to pending queue
+      final CompletableFuture<Void> future = new CompletableFuture<>();
+      runtimeEdgeIdToPendingConnections.get(runtimeEdgeId).add(future);
+      return future;
+    }
+  }
+
+  /**
+   * Indicates the connection has finished.
+   * @param runtimeEdgeId the corresponding runtime edge id.
+   */
+  public synchronized void onConnectionFinished(final String runtimeEdgeId) {
+    final Queue<CompletableFuture<Void>> pendingConnections = runtimeEdgeIdToPendingConnections.get(runtimeEdgeId);
+    if (pendingConnections.size() == 0) {
+      // Just decrease the number of current connections.
+      // Since we have no pending connections, we leave pendingConnections queue untouched.
+      final int numCurrentConnections = runtimeEdgeIdToNumCurrentConnections.get(runtimeEdgeId);
+      runtimeEdgeIdToNumCurrentConnections.put(runtimeEdgeId, numCurrentConnections - 1);
+    } else {
+      // Since pendingConnections has at least one element, the poll method invocation will
immediately return.
+      // One connection is completed, and another connection kicks in; the number of current
connection stays same
+      final CompletableFuture<Void> nextFuture = pendingConnections.poll();
+      nextFuture.complete(null);
+    }
+  }
+}
diff --git a/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/data/BlockTransferConnectionQueueTest.java b/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/data/BlockTransferConnectionQueueTest.java
new file mode 100644
index 0000000..bc1ba30
--- /dev/null
+++ b/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/data/BlockTransferConnectionQueueTest.java
@@ -0,0 +1,75 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.tests.runtime.executor.data;
+
+import edu.snu.nemo.conf.JobConf;
+import edu.snu.nemo.runtime.executor.data.BlockTransferConnectionQueue;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Injector;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.junit.Test;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import static org.junit.Assert.assertFalse;
+
+public final class BlockTransferConnectionQueueTest {
+  private static final String THREAD_NAME = BlockTransferConnectionQueue.class.getSimpleName()
+ "-TestThread";
+  private static final String RUNTIME_EDGE_0 = "RuntimeEdge0";
+  private static final int WAIT_TIME = 1000;
+  /**
+   * Creates {@link BlockTransferConnectionQueue} for testing.
+   * @param maxNum value for {@link JobConf.MaxNumDownloadsForARuntimeEdge} parameter.
+   * @return {@link BlockTransferConnectionQueue} object created.
+   */
+  private final BlockTransferConnectionQueue getQueue(final int maxNum) {
+    final Configuration conf = Tang.Factory.getTang().newConfigurationBuilder()
+        .bindNamedParameter(JobConf.MaxNumDownloadsForARuntimeEdge.class, String.valueOf(maxNum))
+        .build();
+    final Injector injector = Tang.Factory.getTang().newInjector(conf);
+    try {
+      return injector.getInstance(BlockTransferConnectionQueue.class);
+    } catch (final InjectionException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Test(timeout = WAIT_TIME * 2)
+  public void test() throws InterruptedException, ExecutionException {
+    final ExecutorService executorService = Executors.newSingleThreadExecutor(
+        runnable -> new Thread(runnable, THREAD_NAME));
+    final BlockTransferConnectionQueue queue = getQueue(3);
+    final Future executorServiceFuture = executorService.submit(() -> {
+      try {
+        queue.requestConnectPermission(RUNTIME_EDGE_0).get();
+        queue.requestConnectPermission(RUNTIME_EDGE_0).get();
+        queue.requestConnectPermission(RUNTIME_EDGE_0).get();
+        queue.requestConnectPermission(RUNTIME_EDGE_0).get();
+      } catch (final InterruptedException | ExecutionException e) {
+        throw new RuntimeException(e);
+      }
+    });
+    Thread.sleep(WAIT_TIME);
+    // We must have one pending connection request.
+    assertFalse(executorServiceFuture.isDone());
+    queue.onConnectionFinished(RUNTIME_EDGE_0);
+    // The remaining request should be accepted before test timeout.
+    executorServiceFuture.get();
+  }
+}

-- 
To stop receiving notification emails like this one, please contact
jeongyoon@apache.org.

Mime
View raw message