Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id E62BF200B96 for ; Wed, 21 Sep 2016 11:52:39 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id E50EA160ACF; Wed, 21 Sep 2016 09:52:39 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 74D08160AFD for ; Wed, 21 Sep 2016 11:52:37 +0200 (CEST) Received: (qmail 63333 invoked by uid 500); 21 Sep 2016 09:52:36 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 62716 invoked by uid 99); 21 Sep 2016 09:52:36 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 21 Sep 2016 09:52:36 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id EDCC3E93E5; Wed, 21 Sep 2016 09:52:35 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: trohrmann@apache.org To: commits@flink.apache.org Date: Wed, 21 Sep 2016 09:53:19 -0000 Message-Id: <9d06ffe0e0054f918a7ca836666806c0@git.apache.org> In-Reply-To: <50011b84f8e94b19ac9845fd15a1fafe@git.apache.org> References: <50011b84f8e94b19ac9845fd15a1fafe@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [46/50] [abbrv] flink git commit: [hotfix] [taskmanager] Fixes TaskManager component creation at startup archived-at: Wed, 21 Sep 2016 09:52:40 -0000 [hotfix] [taskmanager] Fixes TaskManager component creation at startup Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/26305430 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/26305430 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/26305430 Branch: refs/heads/flip-6 Commit: 2630543084b917524689c2af9e1090da7f5a92d6 Parents: 9718dcd Author: Till Rohrmann Authored: Thu Sep 8 18:43:15 2016 +0200 Committer: Till Rohrmann Committed: Wed Sep 21 11:39:17 2016 +0200 ---------------------------------------------------------------------- .../runtime/taskexecutor/TaskExecutor.java | 189 ++++++++++++++++--- .../taskexecutor/TaskExecutorConfiguration.java | 9 - 2 files changed, 159 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/26305430/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java index 735730b..a455fe2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java @@ -19,9 +19,19 @@ package org.apache.flink.runtime.taskexecutor; import akka.actor.ActorSystem; -import akka.dispatch.ExecutionContexts$; import akka.util.Timeout; import com.typesafe.config.Config; +import org.apache.flink.runtime.io.network.ConnectionManager; +import org.apache.flink.runtime.io.network.LocalConnectionManager; +import org.apache.flink.runtime.io.network.TaskEventDispatcher; +import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; +import org.apache.flink.runtime.io.network.netty.NettyConnectionManager; +import org.apache.flink.runtime.io.network.partition.ResultPartitionManager; +import org.apache.flink.runtime.query.KvStateRegistry; +import org.apache.flink.runtime.query.netty.DisabledKvStateRequestStats; +import org.apache.flink.runtime.query.netty.KvStateServer; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import org.apache.flink.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,7 +47,6 @@ import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; -import org.apache.flink.runtime.instance.InstanceConnectionInfo; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode; import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; @@ -61,7 +70,6 @@ import org.apache.flink.util.NetUtils; import scala.Tuple2; import scala.Option; import scala.Some; -import scala.concurrent.ExecutionContext; import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; @@ -70,9 +78,9 @@ import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; import java.util.UUID; -import java.util.concurrent.ForkJoinPool; import java.util.concurrent.TimeUnit; +import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; /** @@ -86,6 +94,8 @@ public class TaskExecutor extends RpcEndpoint { /** The unique resource ID of this TaskExecutor */ private final ResourceID resourceID; + private final TaskManagerLocation taskManagerLocation; + /** The access to the leader election and metadata storage services */ private final HighAvailabilityServices haServices; @@ -113,22 +123,26 @@ public class TaskExecutor extends RpcEndpoint { public TaskExecutor( TaskExecutorConfiguration taskExecutorConfig, ResourceID resourceID, + TaskManagerLocation taskManagerLocation, MemoryManager memoryManager, IOManager ioManager, NetworkEnvironment networkEnvironment, - int numberOfSlots, RpcService rpcService, HighAvailabilityServices haServices) { super(rpcService); + checkArgument(taskExecutorConfig.getNumberOfSlots() > 0, "The number of slots has to be larger than 0."); + this.taskExecutorConfig = checkNotNull(taskExecutorConfig); this.resourceID = checkNotNull(resourceID); + this.taskManagerLocation = checkNotNull(taskManagerLocation); this.memoryManager = checkNotNull(memoryManager); this.ioManager = checkNotNull(ioManager); this.networkEnvironment = checkNotNull(networkEnvironment); - this.numberOfSlots = checkNotNull(numberOfSlots); this.haServices = checkNotNull(haServices); + + this.numberOfSlots = taskExecutorConfig.getNumberOfSlots(); } // ------------------------------------------------------------------------ @@ -360,10 +374,10 @@ public class TaskExecutor extends RpcEndpoint { * then a HighAvailabilityServices is constructed from the configuration. * @param localTaskManagerCommunication If true, the TaskManager will not initiate the TCP network stack. * @return An ActorRef to the TaskManager actor. - * @throws org.apache.flink.configuration.IllegalConfigurationException Thrown, if the given config contains illegal values. - * @throws java.io.IOException Thrown, if any of the I/O components (such as buffer pools, + * @throws IllegalConfigurationException Thrown, if the given config contains illegal values. + * @throws IOException Thrown, if any of the I/O components (such as buffer pools, * I/O manager, ...) cannot be properly started. - * @throws java.lang.Exception Thrown is some other error occurs while parsing the configuration + * @throws Exception Thrown is some other error occurs while parsing the configuration * or starting the TaskManager components. */ public static TaskExecutor startTaskManagerComponentsAndActor( @@ -377,19 +391,105 @@ public class TaskExecutor extends RpcEndpoint { final TaskExecutorConfiguration taskExecutorConfig = parseTaskManagerConfiguration( configuration, taskManagerHostname, localTaskManagerCommunication); + TaskManagerComponents taskManagerComponents = createTaskManagerComponents( + resourceID, + InetAddress.getByName(taskManagerHostname), + taskExecutorConfig, + configuration); + + final TaskExecutor taskExecutor = new TaskExecutor( + taskExecutorConfig, + resourceID, + taskManagerComponents.getTaskManagerLocation(), + taskManagerComponents.getMemoryManager(), + taskManagerComponents.getIOManager(), + taskManagerComponents.getNetworkEnvironment(), + rpcService, + haServices); + + return taskExecutor; + } + + /** + * Creates and returns the task manager components. + * + * @param resourceID resource ID of the task manager + * @param taskManagerAddress address of the task manager + * @param taskExecutorConfig task manager configuration + * @param configuration of Flink + * @return task manager components + * @throws Exception + */ + private static TaskExecutor.TaskManagerComponents createTaskManagerComponents( + ResourceID resourceID, + InetAddress taskManagerAddress, + TaskExecutorConfiguration taskExecutorConfig, + Configuration configuration) throws Exception { MemoryType memType = taskExecutorConfig.getNetworkConfig().memoryType(); // pre-start checks checkTempDirs(taskExecutorConfig.getTmpDirPaths()); - ExecutionContext executionContext = ExecutionContexts$.MODULE$.fromExecutor(new ForkJoinPool()); + NetworkEnvironmentConfiguration networkEnvironmentConfiguration = taskExecutorConfig.getNetworkConfig(); + + NetworkBufferPool networkBufferPool = new NetworkBufferPool( + networkEnvironmentConfiguration.numNetworkBuffers(), + networkEnvironmentConfiguration.networkBufferSize(), + networkEnvironmentConfiguration.memoryType()); + + ConnectionManager connectionManager; + + if (networkEnvironmentConfiguration.nettyConfig().isDefined()) { + connectionManager = new NettyConnectionManager(networkEnvironmentConfiguration.nettyConfig().get()); + } else { + connectionManager = new LocalConnectionManager(); + } + + ResultPartitionManager resultPartitionManager = new ResultPartitionManager(); + TaskEventDispatcher taskEventDispatcher = new TaskEventDispatcher(); + + KvStateRegistry kvStateRegistry = new KvStateRegistry(); + + KvStateServer kvStateServer; + + if (networkEnvironmentConfiguration.nettyConfig().isDefined()) { + NettyConfig nettyConfig = networkEnvironmentConfiguration.nettyConfig().get(); + + int numNetworkThreads = networkEnvironmentConfiguration.queryServerNetworkThreads() == 0 ? + nettyConfig.getNumberOfSlots() : networkEnvironmentConfiguration.queryServerNetworkThreads(); + + int numQueryThreads = networkEnvironmentConfiguration.queryServerQueryThreads() == 0 ? + nettyConfig.getNumberOfSlots() : networkEnvironmentConfiguration.queryServerQueryThreads(); + + kvStateServer = new KvStateServer( + taskManagerAddress, + networkEnvironmentConfiguration.queryServerPort(), + numNetworkThreads, + numQueryThreads, + kvStateRegistry, + new DisabledKvStateRequestStats()); + } else { + kvStateServer = null; + } // we start the network first, to make sure it can allocate its buffers first final NetworkEnvironment network = new NetworkEnvironment( - executionContext, - taskExecutorConfig.getTimeout(), - taskExecutorConfig.getNetworkConfig(), - taskExecutorConfig.getConnectionInfo()); + networkBufferPool, + connectionManager, + resultPartitionManager, + taskEventDispatcher, + kvStateRegistry, + kvStateServer, + networkEnvironmentConfiguration.ioMode(), + networkEnvironmentConfiguration.partitionRequestInitialBackoff(), + networkEnvironmentConfiguration.partitinRequestMaxBackoff()); + + network.start(); + + TaskManagerLocation taskManagerLocation = new TaskManagerLocation( + resourceID, + taskManagerAddress, + network.getConnectionManager().getDataPort()); // computing the amount of memory to use depends on how much memory is available // it strictly needs to happen AFTER the network stack has been initialized @@ -473,17 +573,7 @@ public class TaskExecutor extends RpcEndpoint { // start the I/O manager, it will create some temp directories. final IOManager ioManager = new IOManagerAsync(taskExecutorConfig.getTmpDirPaths()); - final TaskExecutor taskExecutor = new TaskExecutor( - taskExecutorConfig, - resourceID, - memoryManager, - ioManager, - network, - taskExecutorConfig.getNumberOfSlots(), - rpcService, - haServices); - - return taskExecutor; + return new TaskExecutor.TaskManagerComponents(taskManagerLocation, memoryManager, ioManager, network); } // -------------------------------------------------------------------------- @@ -519,7 +609,7 @@ public class TaskExecutor extends RpcEndpoint { "Leave config parameter empty or use 0 to let the system choose a port automatically."); InetAddress taskManagerAddress = InetAddress.getByName(taskManagerHostname); - final InstanceConnectionInfo connectionInfo = new InstanceConnectionInfo(taskManagerAddress, dataport); + final InetSocketAddress taskManagerInetSocketAddress = new InetSocketAddress(taskManagerAddress, dataport); // ----> memory / network stack (shuffles/broadcasts), task slots, temp directories @@ -576,7 +666,12 @@ public class TaskExecutor extends RpcEndpoint { final NettyConfig nettyConfig; if (!localTaskManagerCommunication) { - nettyConfig = new NettyConfig(connectionInfo.address(), connectionInfo.dataPort(), pageSize, slots, configuration); + nettyConfig = new NettyConfig( + taskManagerInetSocketAddress.getAddress(), + taskManagerInetSocketAddress.getPort(), + pageSize, + slots, + configuration); } else { nettyConfig = null; } @@ -613,8 +708,9 @@ public class TaskExecutor extends RpcEndpoint { queryServerPort, queryServerNetworkThreads, queryServerQueryThreads, - localTaskManagerCommunication ? Option.empty() : new Some<>(nettyConfig), - new Tuple2<>(500, 3000)); + Option.apply(nettyConfig), + 500, + 30000); // ----> timeouts, library caching, profiling @@ -695,7 +791,6 @@ public class TaskExecutor extends RpcEndpoint { return new TaskExecutorConfiguration( tmpDirs, cleanupInterval, - connectionInfo, networkConfig, timeout, finiteRegistrationDuration, @@ -829,4 +924,38 @@ public class TaskExecutor extends RpcEndpoint { onFatalErrorAsync(exception); } } + + private static class TaskManagerComponents { + private final TaskManagerLocation taskManagerLocation; + private final MemoryManager memoryManager; + private final IOManager ioManager; + private final NetworkEnvironment networkEnvironment; + + private TaskManagerComponents( + TaskManagerLocation taskManagerLocation, + MemoryManager memoryManager, + IOManager ioManager, + NetworkEnvironment networkEnvironment) { + this.taskManagerLocation = Preconditions.checkNotNull(taskManagerLocation); + this.memoryManager = Preconditions.checkNotNull(memoryManager); + this.ioManager = Preconditions.checkNotNull(ioManager); + this.networkEnvironment = Preconditions.checkNotNull(networkEnvironment); + } + + public MemoryManager getMemoryManager() { + return memoryManager; + } + + public IOManager getIOManager() { + return ioManager; + } + + public NetworkEnvironment getNetworkEnvironment() { + return networkEnvironment; + } + + public TaskManagerLocation getTaskManagerLocation() { + return taskManagerLocation; + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/26305430/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorConfiguration.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorConfiguration.java index 3707a47..c97c893 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorConfiguration.java @@ -19,7 +19,6 @@ package org.apache.flink.runtime.taskexecutor; import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.instance.InstanceConnectionInfo; import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration; import scala.concurrent.duration.FiniteDuration; @@ -52,12 +51,9 @@ public class TaskExecutorConfiguration implements Serializable { private final NetworkEnvironmentConfiguration networkConfig; - private final InstanceConnectionInfo connectionInfo; - public TaskExecutorConfiguration( String[] tmpDirPaths, long cleanupInterval, - InstanceConnectionInfo connectionInfo, NetworkEnvironmentConfiguration networkConfig, FiniteDuration timeout, FiniteDuration maxRegistrationDuration, @@ -66,7 +62,6 @@ public class TaskExecutorConfiguration implements Serializable { this (tmpDirPaths, cleanupInterval, - connectionInfo, networkConfig, timeout, maxRegistrationDuration, @@ -80,7 +75,6 @@ public class TaskExecutorConfiguration implements Serializable { public TaskExecutorConfiguration( String[] tmpDirPaths, long cleanupInterval, - InstanceConnectionInfo connectionInfo, NetworkEnvironmentConfiguration networkConfig, FiniteDuration timeout, FiniteDuration maxRegistrationDuration, @@ -92,7 +86,6 @@ public class TaskExecutorConfiguration implements Serializable { this.tmpDirPaths = checkNotNull(tmpDirPaths); this.cleanupInterval = checkNotNull(cleanupInterval); - this.connectionInfo = checkNotNull(connectionInfo); this.networkConfig = checkNotNull(networkConfig); this.timeout = checkNotNull(timeout); this.maxRegistrationDuration = maxRegistrationDuration; @@ -115,8 +108,6 @@ public class TaskExecutorConfiguration implements Serializable { return cleanupInterval; } - public InstanceConnectionInfo getConnectionInfo() { return connectionInfo; } - public NetworkEnvironmentConfiguration getNetworkConfig() { return networkConfig; } public FiniteDuration getTimeout() {