Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 46F13200B96 for ; Wed, 21 Sep 2016 11:52:40 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 45CFA160AE1; Wed, 21 Sep 2016 09:52:40 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 122C8160AE2 for ; Wed, 21 Sep 2016 11:52:36 +0200 (CEST) Received: (qmail 62855 invoked by uid 500); 21 Sep 2016 09:52:36 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 62569 invoked by uid 99); 21 Sep 2016 09:52:36 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 21 Sep 2016 09:52:36 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id D4BE7DF99D; Wed, 21 Sep 2016 09:52:35 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 8bit From: trohrmann@apache.org To: commits@flink.apache.org Date: Wed, 21 Sep 2016 09:53:13 -0000 Message-Id: <7bc9b331f70c4e4aa9a05fb9d327ce86@git.apache.org> In-Reply-To: <50011b84f8e94b19ac9845fd15a1fafe@git.apache.org> References: <50011b84f8e94b19ac9845fd15a1fafe@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [40/50] [abbrv] flink git commit: [FLINK-4363] Implement TaskManager basic startup of all components in java archived-at: Wed, 21 Sep 2016 09:52:40 -0000 [FLINK-4363] Implement TaskManager basic startup of all components in java This closes #2400 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/35e80103 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/35e80103 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/35e80103 Branch: refs/heads/flip-6 Commit: 35e8010337fd8ccee231724f6ea2f4f5eb196c60 Parents: ffd20e9 Author: 淘江 Authored: Tue Aug 30 11:28:14 2016 +0800 Committer: Till Rohrmann Committed: Wed Sep 21 11:39:16 2016 +0200 ---------------------------------------------------------------------- .../runtime/rpc/taskexecutor/TaskExecutor.java | 686 ++++++++++++++++++- .../taskexecutor/TaskExecutorConfiguration.java | 151 ++++ .../rpc/taskexecutor/TaskExecutorTest.java | 8 +- 3 files changed, 822 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/35e80103/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java index f201e00..36d6310 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java @@ -18,15 +18,60 @@ package org.apache.flink.runtime.rpc.taskexecutor; +import akka.actor.ActorSystem; +import akka.dispatch.ExecutionContexts$; +import akka.util.Timeout; +import com.typesafe.config.Config; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.core.memory.HeapMemorySegment; +import org.apache.flink.core.memory.HybridMemorySegment; +import org.apache.flink.core.memory.MemorySegmentFactory; +import org.apache.flink.core.memory.MemoryType; +import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.instance.InstanceConnectionInfo; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode; +import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; +import org.apache.flink.runtime.io.network.NetworkEnvironment; +import org.apache.flink.runtime.io.network.netty.NettyConfig; +import org.apache.flink.runtime.leaderelection.LeaderElectionService; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; +import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.rpc.RpcEndpoint; import org.apache.flink.runtime.rpc.RpcMethod; import org.apache.flink.runtime.rpc.RpcService; - +import org.apache.flink.runtime.rpc.akka.AkkaRpcService; +import org.apache.flink.runtime.taskmanager.MemoryLogger; +import org.apache.flink.runtime.util.EnvironmentInformation; +import org.apache.flink.runtime.util.LeaderRetrievalUtils; +import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration; +import org.apache.flink.util.MathUtils; +import org.apache.flink.util.NetUtils; + +import scala.Tuple2; +import scala.Option; +import scala.Some; +import scala.concurrent.ExecutionContext; +import scala.concurrent.duration.Duration; +import scala.concurrent.duration.FiniteDuration; + +import java.io.File; +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; import java.util.UUID; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.TimeUnit; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -36,12 +81,29 @@ import static org.apache.flink.util.Preconditions.checkNotNull; */ public class TaskExecutor extends RpcEndpoint { + private static final Logger LOG = LoggerFactory.getLogger(TaskExecutor.class); + /** The unique resource ID of this TaskExecutor */ private final ResourceID resourceID; /** The access to the leader election and metadata storage services */ private final HighAvailabilityServices haServices; + /** The task manager configuration */ + private final TaskExecutorConfiguration taskExecutorConfig; + + /** The I/O manager component in the task manager */ + private final IOManager ioManager; + + /** The memory manager component in the task manager */ + private final MemoryManager memoryManager; + + /** The network component in the task manager */ + private final NetworkEnvironment networkEnvironment; + + /** The number of slots in the task manager, should be 1 for YARN */ + private final int numberOfSlots; + // --------- resource manager -------- private TaskExecutorToResourceManagerConnection resourceManagerConnection; @@ -49,22 +111,24 @@ public class TaskExecutor extends RpcEndpoint { // ------------------------------------------------------------------------ public TaskExecutor( + TaskExecutorConfiguration taskExecutorConfig, + ResourceID resourceID, + MemoryManager memoryManager, + IOManager ioManager, + NetworkEnvironment networkEnvironment, + int numberOfSlots, RpcService rpcService, - HighAvailabilityServices haServices, - ResourceID resourceID) { + HighAvailabilityServices haServices) { super(rpcService); - this.haServices = checkNotNull(haServices); + this.taskExecutorConfig = checkNotNull(taskExecutorConfig); this.resourceID = checkNotNull(resourceID); - } - - // ------------------------------------------------------------------------ - // Properties - // ------------------------------------------------------------------------ - - public ResourceID getResourceID() { - return resourceID; + this.memoryManager = checkNotNull(memoryManager); + this.ioManager = checkNotNull(ioManager); + this.networkEnvironment = checkNotNull(networkEnvironment); + this.numberOfSlots = checkNotNull(numberOfSlots); + this.haServices = checkNotNull(haServices); } // ------------------------------------------------------------------------ @@ -83,7 +147,6 @@ public class TaskExecutor extends RpcEndpoint { } } - // ------------------------------------------------------------------------ // RPC methods - ResourceManager related // ------------------------------------------------------------------------ @@ -94,12 +157,12 @@ public class TaskExecutor extends RpcEndpoint { if (newLeaderAddress != null) { // the resource manager switched to a new leader log.info("ResourceManager leader changed from {} to {}. Registering at new leader.", - resourceManagerConnection.getResourceManagerAddress(), newLeaderAddress); + resourceManagerConnection.getResourceManagerAddress(), newLeaderAddress); } else { // address null means that the current leader is lost without a new leader being there, yet log.info("Current ResourceManager {} lost leader status. Waiting for new ResourceManager leader.", - resourceManagerConnection.getResourceManagerAddress()); + resourceManagerConnection.getResourceManagerAddress()); } // drop the current connection or connection attempt @@ -112,21 +175,604 @@ public class TaskExecutor extends RpcEndpoint { // establish a connection to the new leader if (newLeaderAddress != null) { log.info("Attempting to register at ResourceManager {}", newLeaderAddress); - resourceManagerConnection = - new TaskExecutorToResourceManagerConnection(log, this, newLeaderAddress, newLeaderId); + resourceManagerConnection = + new TaskExecutorToResourceManagerConnection(log, this, newLeaderAddress, newLeaderId); resourceManagerConnection.start(); } } + /** + * Starts and runs the TaskManager. + *

+ * This method first tries to select the network interface to use for the TaskManager + * communication. The network interface is used both for the actor communication + * (coordination) as well as for the data exchange between task managers. Unless + * the hostname/interface is explicitly configured in the configuration, this + * method will try out various interfaces and methods to connect to the JobManager + * and select the one where the connection attempt is successful. + *

+ * After selecting the network interface, this method brings up an actor system + * for the TaskManager and its actors, starts the TaskManager's services + * (library cache, shuffle network stack, ...), and starts the TaskManager itself. + * + * @param configuration The configuration for the TaskManager. + * @param resourceID The id of the resource which the task manager will run on. + */ + public static void selectNetworkInterfaceAndRunTaskManager( + Configuration configuration, + ResourceID resourceID) throws Exception { + + final InetSocketAddress taskManagerAddress = selectNetworkInterfaceAndPort(configuration); + + runTaskManager(taskManagerAddress.getHostName(), resourceID, taskManagerAddress.getPort(), configuration); + } + + private static InetSocketAddress selectNetworkInterfaceAndPort(Configuration configuration) + throws Exception { + String taskManagerHostname = configuration.getString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, null); + if (taskManagerHostname != null) { + LOG.info("Using configured hostname/address for TaskManager: " + taskManagerHostname); + } else { + LeaderRetrievalService leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(configuration); + FiniteDuration lookupTimeout = AkkaUtils.getLookupTimeout(configuration); + + InetAddress taskManagerAddress = LeaderRetrievalUtils.findConnectingAddress(leaderRetrievalService, lookupTimeout); + taskManagerHostname = taskManagerAddress.getHostName(); + LOG.info("TaskManager will use hostname/address '{}' ({}) for communication.", + taskManagerHostname, taskManagerAddress.getHostAddress()); + } + + // if no task manager port has been configured, use 0 (system will pick any free port) + final int actorSystemPort = configuration.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, 0); + if (actorSystemPort < 0 || actorSystemPort > 65535) { + throw new IllegalConfigurationException("Invalid value for '" + + ConfigConstants.TASK_MANAGER_IPC_PORT_KEY + + "' (port for the TaskManager actor system) : " + actorSystemPort + + " - Leave config parameter empty or use 0 to let the system choose a port automatically."); + } + + return new InetSocketAddress(taskManagerHostname, actorSystemPort); + } + + /** + * Starts and runs the TaskManager. Brings up an actor system for the TaskManager and its + * actors, starts the TaskManager's services (library cache, shuffle network stack, ...), + * and starts the TaskManager itself. + *

+ * This method will also spawn a process reaper for the TaskManager (kill the process if + * the actor fails) and optionally start the JVM memory logging thread. + * + * @param taskManagerHostname The hostname/address of the interface where the actor system + * will communicate. + * @param resourceID The id of the resource which the task manager will run on. + * @param actorSystemPort The port at which the actor system will communicate. + * @param configuration The configuration for the TaskManager. + */ + private static void runTaskManager( + String taskManagerHostname, + ResourceID resourceID, + int actorSystemPort, + final Configuration configuration) throws Exception { + + LOG.info("Starting TaskManager"); + + // Bring up the TaskManager actor system first, bind it to the given address. + + LOG.info("Starting TaskManager actor system at " + + NetUtils.hostAndPortToUrlString(taskManagerHostname, actorSystemPort)); + + final ActorSystem taskManagerSystem; + try { + Tuple2 address = new Tuple2(taskManagerHostname, actorSystemPort); + Config akkaConfig = AkkaUtils.getAkkaConfig(configuration, new Some<>(address)); + LOG.debug("Using akka configuration\n " + akkaConfig); + taskManagerSystem = AkkaUtils.createActorSystem(akkaConfig); + } catch (Throwable t) { + if (t instanceof org.jboss.netty.channel.ChannelException) { + Throwable cause = t.getCause(); + if (cause != null && t.getCause() instanceof java.net.BindException) { + String address = NetUtils.hostAndPortToUrlString(taskManagerHostname, actorSystemPort); + throw new IOException("Unable to bind TaskManager actor system to address " + + address + " - " + cause.getMessage(), t); + } + } + throw new Exception("Could not create TaskManager actor system", t); + } + + // start akka rpc service based on actor system + final Timeout timeout = new Timeout(AkkaUtils.getTimeout(configuration).toMillis(), TimeUnit.MILLISECONDS); + final AkkaRpcService akkaRpcService = new AkkaRpcService(taskManagerSystem, timeout); + + // start high availability service to implement getResourceManagerLeaderRetriever method only + final HighAvailabilityServices haServices = new HighAvailabilityServices() { + @Override + public LeaderRetrievalService getResourceManagerLeaderRetriever() throws Exception { + return LeaderRetrievalUtils.createLeaderRetrievalService(configuration); + } + + @Override + public LeaderElectionService getResourceManagerLeaderElectionService() throws Exception { + return null; + } + + @Override + public LeaderElectionService getJobMasterLeaderElectionService(JobID jobID) throws Exception { + return null; + } + }; + + // start all the TaskManager services (network stack, library cache, ...) + // and the TaskManager actor + try { + LOG.info("Starting TaskManager actor"); + TaskExecutor taskExecutor = startTaskManagerComponentsAndActor( + configuration, + resourceID, + akkaRpcService, + taskManagerHostname, + haServices, + false); + + taskExecutor.start(); + + // if desired, start the logging daemon that periodically logs the memory usage information + if (LOG.isInfoEnabled() && configuration.getBoolean( + ConfigConstants.TASK_MANAGER_DEBUG_MEMORY_USAGE_START_LOG_THREAD, + ConfigConstants.DEFAULT_TASK_MANAGER_DEBUG_MEMORY_USAGE_START_LOG_THREAD)) { + LOG.info("Starting periodic memory usage logger"); + + long interval = configuration.getLong( + ConfigConstants.TASK_MANAGER_DEBUG_MEMORY_USAGE_LOG_INTERVAL_MS, + ConfigConstants.DEFAULT_TASK_MANAGER_DEBUG_MEMORY_USAGE_LOG_INTERVAL_MS); + + MemoryLogger logger = new MemoryLogger(LOG, interval, taskManagerSystem); + logger.start(); + } + + // block until everything is done + taskManagerSystem.awaitTermination(); + } catch (Throwable t) { + LOG.error("Error while starting up taskManager", t); + try { + taskManagerSystem.shutdown(); + } catch (Throwable tt) { + LOG.warn("Could not cleanly shut down actor system", tt); + } + throw t; + } + } + + // -------------------------------------------------------------------------- + // Starting and running the TaskManager + // -------------------------------------------------------------------------- + + /** + * @param configuration The configuration for the TaskManager. + * @param resourceID The id of the resource which the task manager will run on. + * @param rpcService The rpc service which is used to start and connect to the TaskManager RpcEndpoint . + * @param taskManagerHostname The hostname/address that describes the TaskManager's data location. + * @param haServices Optionally, a high availability service can be provided. If none is given, + * then a HighAvailabilityServices is constructed from the configuration. + * @param localTaskManagerCommunication If true, the TaskManager will not initiate the TCP network stack. + * @return An ActorRef to the TaskManager actor. + * @throws org.apache.flink.configuration.IllegalConfigurationException Thrown, if the given config contains illegal values. + * @throws java.io.IOException Thrown, if any of the I/O components (such as buffer pools, + * I/O manager, ...) cannot be properly started. + * @throws java.lang.Exception Thrown is some other error occurs while parsing the configuration + * or starting the TaskManager components. + */ + public static TaskExecutor startTaskManagerComponentsAndActor( + Configuration configuration, + ResourceID resourceID, + RpcService rpcService, + String taskManagerHostname, + HighAvailabilityServices haServices, + boolean localTaskManagerCommunication) throws Exception { + + final TaskExecutorConfiguration taskExecutorConfig = parseTaskManagerConfiguration( + configuration, taskManagerHostname, localTaskManagerCommunication); + + MemoryType memType = taskExecutorConfig.getNetworkConfig().memoryType(); + + // pre-start checks + checkTempDirs(taskExecutorConfig.getTmpDirPaths()); + + ExecutionContext executionContext = ExecutionContexts$.MODULE$.fromExecutor(new ForkJoinPool()); + + // we start the network first, to make sure it can allocate its buffers first + final NetworkEnvironment network = new NetworkEnvironment( + executionContext, + taskExecutorConfig.getTimeout(), + taskExecutorConfig.getNetworkConfig(), + taskExecutorConfig.getConnectionInfo()); + + // computing the amount of memory to use depends on how much memory is available + // it strictly needs to happen AFTER the network stack has been initialized + + // check if a value has been configured + long configuredMemory = configuration.getLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1L); + checkConfigParameter(configuredMemory == -1 || configuredMemory > 0, configuredMemory, + ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, + "MemoryManager needs at least one MB of memory. " + + "If you leave this config parameter empty, the system automatically " + + "pick a fraction of the available memory."); + + final long memorySize; + boolean preAllocateMemory = configuration.getBoolean( + ConfigConstants.TASK_MANAGER_MEMORY_PRE_ALLOCATE_KEY, + ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_PRE_ALLOCATE); + if (configuredMemory > 0) { + if (preAllocateMemory) { + LOG.info("Using {} MB for managed memory." , configuredMemory); + } else { + LOG.info("Limiting managed memory to {} MB, memory will be allocated lazily." , configuredMemory); + } + memorySize = configuredMemory << 20; // megabytes to bytes + } else { + float fraction = configuration.getFloat( + ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY, + ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION); + checkConfigParameter(fraction > 0.0f && fraction < 1.0f, fraction, + ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY, + "MemoryManager fraction of the free memory must be between 0.0 and 1.0"); + + if (memType == MemoryType.HEAP) { + long relativeMemSize = (long) (EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() * fraction); + if (preAllocateMemory) { + LOG.info("Using {} of the currently free heap space for managed heap memory ({} MB)." , + fraction , relativeMemSize >> 20); + } else { + LOG.info("Limiting managed memory to {} of the currently free heap space ({} MB), " + + "memory will be allocated lazily." , fraction , relativeMemSize >> 20); + } + memorySize = relativeMemSize; + } else if (memType == MemoryType.OFF_HEAP) { + // The maximum heap memory has been adjusted according to the fraction + long maxMemory = EnvironmentInformation.getMaxJvmHeapMemory(); + long directMemorySize = (long) (maxMemory / (1.0 - fraction) * fraction); + if (preAllocateMemory) { + LOG.info("Using {} of the maximum memory size for managed off-heap memory ({} MB)." , + fraction, directMemorySize >> 20); + } else { + LOG.info("Limiting managed memory to {} of the maximum memory size ({} MB)," + + " memory will be allocated lazily.", fraction, directMemorySize >> 20); + } + memorySize = directMemorySize; + } else { + throw new RuntimeException("No supported memory type detected."); + } + } + + // now start the memory manager + final MemoryManager memoryManager; + try { + memoryManager = new MemoryManager( + memorySize, + taskExecutorConfig.getNumberOfSlots(), + taskExecutorConfig.getNetworkConfig().networkBufferSize(), + memType, + preAllocateMemory); + } catch (OutOfMemoryError e) { + if (memType == MemoryType.HEAP) { + throw new Exception("OutOfMemory error (" + e.getMessage() + + ") while allocating the TaskManager heap memory (" + memorySize + " bytes).", e); + } else if (memType == MemoryType.OFF_HEAP) { + throw new Exception("OutOfMemory error (" + e.getMessage() + + ") while allocating the TaskManager off-heap memory (" + memorySize + + " bytes).Try increasing the maximum direct memory (-XX:MaxDirectMemorySize)", e); + } else { + throw e; + } + } + + // start the I/O manager, it will create some temp directories. + final IOManager ioManager = new IOManagerAsync(taskExecutorConfig.getTmpDirPaths()); + + final TaskExecutor taskExecutor = new TaskExecutor( + taskExecutorConfig, + resourceID, + memoryManager, + ioManager, + network, + taskExecutorConfig.getNumberOfSlots(), + rpcService, + haServices); + + return taskExecutor; + } + + // -------------------------------------------------------------------------- + // Parsing and checking the TaskManager Configuration + // -------------------------------------------------------------------------- + + /** + * Utility method to extract TaskManager config parameters from the configuration and to + * sanity check them. + * + * @param configuration The configuration. + * @param taskManagerHostname The host name under which the TaskManager communicates. + * @param localTaskManagerCommunication True, to skip initializing the network stack. + * Use only in cases where only one task manager runs. + * @return TaskExecutorConfiguration that wrappers InstanceConnectionInfo, NetworkEnvironmentConfiguration, etc. + */ + private static TaskExecutorConfiguration parseTaskManagerConfiguration( + Configuration configuration, + String taskManagerHostname, + boolean localTaskManagerCommunication) throws Exception { + + // ------- read values from the config and check them --------- + // (a lot of them) + + // ----> hosts / ports for communication and data exchange + + int dataport = configuration.getInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, + ConfigConstants.DEFAULT_TASK_MANAGER_DATA_PORT); + if (dataport == 0) { + dataport = NetUtils.getAvailablePort(); + } + checkConfigParameter(dataport > 0, dataport, ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, + "Leave config parameter empty or use 0 to let the system choose a port automatically."); + + InetAddress taskManagerAddress = InetAddress.getByName(taskManagerHostname); + final InstanceConnectionInfo connectionInfo = new InstanceConnectionInfo(taskManagerAddress, dataport); + + // ----> memory / network stack (shuffles/broadcasts), task slots, temp directories + + // we need this because many configs have been written with a "-1" entry + int slots = configuration.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1); + if (slots == -1) { + slots = 1; + } + checkConfigParameter(slots >= 1, slots, ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, + "Number of task slots must be at least one."); + + final int numNetworkBuffers = configuration.getInteger( + ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, + ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS); + checkConfigParameter(numNetworkBuffers > 0, numNetworkBuffers, + ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, ""); + + final int pageSize = configuration.getInteger( + ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY, + ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_SEGMENT_SIZE); + // check page size of for minimum size + checkConfigParameter(pageSize >= MemoryManager.MIN_PAGE_SIZE, pageSize, + ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY, + "Minimum memory segment size is " + MemoryManager.MIN_PAGE_SIZE); + // check page size for power of two + checkConfigParameter(MathUtils.isPowerOf2(pageSize), pageSize, + ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY, + "Memory segment size must be a power of 2."); + + // check whether we use heap or off-heap memory + final MemoryType memType; + if (configuration.getBoolean(ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_KEY, false)) { + memType = MemoryType.OFF_HEAP; + } else { + memType = MemoryType.HEAP; + } + + // initialize the memory segment factory accordingly + if (memType == MemoryType.HEAP) { + if (!MemorySegmentFactory.initializeIfNotInitialized(HeapMemorySegment.FACTORY)) { + throw new Exception("Memory type is set to heap memory, but memory segment " + + "factory has been initialized for off-heap memory segments"); + } + } else { + if (!MemorySegmentFactory.initializeIfNotInitialized(HybridMemorySegment.FACTORY)) { + throw new Exception("Memory type is set to off-heap memory, but memory segment " + + "factory has been initialized for heap memory segments"); + } + } + + final String[] tmpDirs = configuration.getString( + ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, + ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH).split(",|" + File.pathSeparator); + + final NettyConfig nettyConfig; + if (!localTaskManagerCommunication) { + nettyConfig = new NettyConfig(connectionInfo.address(), connectionInfo.dataPort(), pageSize, slots, configuration); + } else { + nettyConfig = null; + } + + // Default spill I/O mode for intermediate results + final String syncOrAsync = configuration.getString( + ConfigConstants.TASK_MANAGER_NETWORK_DEFAULT_IO_MODE, + ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_DEFAULT_IO_MODE); + + final IOMode ioMode; + if (syncOrAsync.equals("async")) { + ioMode = IOManager.IOMode.ASYNC; + } else { + ioMode = IOManager.IOMode.SYNC; + } + + final int queryServerPort = configuration.getInteger( + ConfigConstants.QUERYABLE_STATE_SERVER_PORT, + ConfigConstants.DEFAULT_QUERYABLE_STATE_SERVER_PORT); + + final int queryServerNetworkThreads = configuration.getInteger( + ConfigConstants.QUERYABLE_STATE_SERVER_NETWORK_THREADS, + ConfigConstants.DEFAULT_QUERYABLE_STATE_SERVER_NETWORK_THREADS); + + final int queryServerQueryThreads = configuration.getInteger( + ConfigConstants.QUERYABLE_STATE_SERVER_QUERY_THREADS, + ConfigConstants.DEFAULT_QUERYABLE_STATE_SERVER_QUERY_THREADS); + + final NetworkEnvironmentConfiguration networkConfig = new NetworkEnvironmentConfiguration( + numNetworkBuffers, + pageSize, + memType, + ioMode, + queryServerPort, + queryServerNetworkThreads, + queryServerQueryThreads, + localTaskManagerCommunication ? Option.empty() : new Some<>(nettyConfig), + new Tuple2<>(500, 3000)); + + // ----> timeouts, library caching, profiling + + final FiniteDuration timeout; + try { + timeout = AkkaUtils.getTimeout(configuration); + } catch (Exception e) { + throw new IllegalArgumentException( + "Invalid format for '" + ConfigConstants.AKKA_ASK_TIMEOUT + + "'.Use formats like '50 s' or '1 min' to specify the timeout."); + } + LOG.info("Messages between TaskManager and JobManager have a max timeout of " + timeout); + + final long cleanupInterval = configuration.getLong( + ConfigConstants.LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL, + ConfigConstants.DEFAULT_LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL) * 1000; + + final FiniteDuration finiteRegistrationDuration; + try { + Duration maxRegistrationDuration = Duration.create(configuration.getString( + ConfigConstants.TASK_MANAGER_MAX_REGISTRATION_DURATION, + ConfigConstants.DEFAULT_TASK_MANAGER_MAX_REGISTRATION_DURATION)); + if (maxRegistrationDuration.isFinite()) { + finiteRegistrationDuration = new FiniteDuration(maxRegistrationDuration.toSeconds(), TimeUnit.SECONDS); + } else { + finiteRegistrationDuration = null; + } + } catch (NumberFormatException e) { + throw new IllegalArgumentException("Invalid format for parameter " + + ConfigConstants.TASK_MANAGER_MAX_REGISTRATION_DURATION, e); + } + + final FiniteDuration initialRegistrationPause; + try { + Duration pause = Duration.create(configuration.getString( + ConfigConstants.TASK_MANAGER_INITIAL_REGISTRATION_PAUSE, + ConfigConstants.DEFAULT_TASK_MANAGER_INITIAL_REGISTRATION_PAUSE)); + if (pause.isFinite()) { + initialRegistrationPause = new FiniteDuration(pause.toSeconds(), TimeUnit.SECONDS); + } else { + throw new IllegalArgumentException("The initial registration pause must be finite: " + pause); + } + } catch (NumberFormatException e) { + throw new IllegalArgumentException("Invalid format for parameter " + + ConfigConstants.TASK_MANAGER_INITIAL_REGISTRATION_PAUSE, e); + } + + final FiniteDuration maxRegistrationPause; + try { + Duration pause = Duration.create(configuration.getString( + ConfigConstants.TASK_MANAGER_MAX_REGISTARTION_PAUSE, + ConfigConstants.DEFAULT_TASK_MANAGER_MAX_REGISTRATION_PAUSE)); + if (pause.isFinite()) { + maxRegistrationPause = new FiniteDuration(pause.toSeconds(), TimeUnit.SECONDS); + } else { + throw new IllegalArgumentException("The maximum registration pause must be finite: " + pause); + } + } catch (NumberFormatException e) { + throw new IllegalArgumentException("Invalid format for parameter " + + ConfigConstants.TASK_MANAGER_INITIAL_REGISTRATION_PAUSE, e); + } + + final FiniteDuration refusedRegistrationPause; + try { + Duration pause = Duration.create(configuration.getString( + ConfigConstants.TASK_MANAGER_REFUSED_REGISTRATION_PAUSE, + ConfigConstants.DEFAULT_TASK_MANAGER_REFUSED_REGISTRATION_PAUSE)); + if (pause.isFinite()) { + refusedRegistrationPause = new FiniteDuration(pause.toSeconds(), TimeUnit.SECONDS); + } else { + throw new IllegalArgumentException("The refused registration pause must be finite: " + pause); + } + } catch (NumberFormatException e) { + throw new IllegalArgumentException("Invalid format for parameter " + + ConfigConstants.TASK_MANAGER_INITIAL_REGISTRATION_PAUSE, e); + } + + return new TaskExecutorConfiguration( + tmpDirs, + cleanupInterval, + connectionInfo, + networkConfig, + timeout, + finiteRegistrationDuration, + slots, + configuration, + initialRegistrationPause, + maxRegistrationPause, + refusedRegistrationPause); + } + + /** + * Validates a condition for a config parameter and displays a standard exception, if the + * the condition does not hold. + * + * @param condition The condition that must hold. If the condition is false, an exception is thrown. + * @param parameter The parameter value. Will be shown in the exception message. + * @param name The name of the config parameter. Will be shown in the exception message. + * @param errorMessage The optional custom error message to append to the exception message. + */ + private static void checkConfigParameter( + boolean condition, + Object parameter, + String name, + String errorMessage) { + if (!condition) { + throw new IllegalConfigurationException("Invalid configuration value for " + name + " : " + parameter + " - " + errorMessage); + } + } + + /** + * Validates that all the directories denoted by the strings do actually exist, are proper + * directories (not files), and are writable. + * + * @param tmpDirs The array of directory paths to check. + * @throws Exception Thrown if any of the directories does not exist or is not writable + * or is a file, rather than a directory. + */ + private static void checkTempDirs(String[] tmpDirs) throws IOException { + for (String dir : tmpDirs) { + if (dir != null && !dir.equals("")) { + File file = new File(dir); + if (!file.exists()) { + throw new IOException("Temporary file directory " + file.getAbsolutePath() + " does not exist."); + } + if (!file.isDirectory()) { + throw new IOException("Temporary file directory " + file.getAbsolutePath() + " is not a directory."); + } + if (!file.canWrite()) { + throw new IOException("Temporary file directory " + file.getAbsolutePath() + " is not writable."); + } + + if (LOG.isInfoEnabled()) { + long totalSpaceGb = file.getTotalSpace() >> 30; + long usableSpaceGb = file.getUsableSpace() >> 30; + double usablePercentage = (double)usableSpaceGb / totalSpaceGb * 100; + String path = file.getAbsolutePath(); + LOG.info(String.format("Temporary file directory '%s': total %d GB, " + "usable %d GB (%.2f%% usable)", + path, totalSpaceGb, usableSpaceGb, usablePercentage)); + } + } else { + throw new IllegalArgumentException("Temporary file directory #$id is null."); + } + } + } + + // ------------------------------------------------------------------------ + // Properties + // ------------------------------------------------------------------------ + + public ResourceID getResourceID() { + return resourceID; + } + // ------------------------------------------------------------------------ - // Error handling + // Error Handling // ------------------------------------------------------------------------ /** * Notifies the TaskExecutor that a fatal error has occurred and it cannot proceed. * This method should be used when asynchronous threads want to notify the * TaskExecutor of a fatal error. - * + * * @param t The exception describing the fatal error */ void onFatalErrorAsync(final Throwable t) { @@ -141,7 +787,7 @@ public class TaskExecutor extends RpcEndpoint { /** * Notifies the TaskExecutor that a fatal error has occurred and it cannot proceed. * This method must only be called from within the TaskExecutor's main thread. - * + * * @param t The exception describing the fatal error */ void onFatalError(Throwable t) { http://git-wip-us.apache.org/repos/asf/flink/blob/35e80103/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorConfiguration.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorConfiguration.java new file mode 100644 index 0000000..32484e1 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorConfiguration.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rpc.taskexecutor; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.instance.InstanceConnectionInfo; +import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration; + +import scala.concurrent.duration.FiniteDuration; + +import java.io.Serializable; +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * {@link TaskExecutor} Configuration + */ +public class TaskExecutorConfiguration implements Serializable { + + private static final long serialVersionUID = 1L; + + private final String[] tmpDirPaths; + + private final long cleanupInterval; + + private final int numberOfSlots; + + private final Configuration configuration; + + private final FiniteDuration timeout; + private final FiniteDuration maxRegistrationDuration; + private final FiniteDuration initialRegistrationPause; + private final FiniteDuration maxRegistrationPause; + private final FiniteDuration refusedRegistrationPause; + + private final NetworkEnvironmentConfiguration networkConfig; + + private final InstanceConnectionInfo connectionInfo; + + public TaskExecutorConfiguration( + String[] tmpDirPaths, + long cleanupInterval, + InstanceConnectionInfo connectionInfo, + NetworkEnvironmentConfiguration networkConfig, + FiniteDuration timeout, + FiniteDuration maxRegistrationDuration, + int numberOfSlots, + Configuration configuration) { + + this (tmpDirPaths, + cleanupInterval, + connectionInfo, + networkConfig, + timeout, + maxRegistrationDuration, + numberOfSlots, + configuration, + new FiniteDuration(500, TimeUnit.MILLISECONDS), + new FiniteDuration(30, TimeUnit.SECONDS), + new FiniteDuration(10, TimeUnit.SECONDS)); + } + + public TaskExecutorConfiguration( + String[] tmpDirPaths, + long cleanupInterval, + InstanceConnectionInfo connectionInfo, + NetworkEnvironmentConfiguration networkConfig, + FiniteDuration timeout, + FiniteDuration maxRegistrationDuration, + int numberOfSlots, + Configuration configuration, + FiniteDuration initialRegistrationPause, + FiniteDuration maxRegistrationPause, + FiniteDuration refusedRegistrationPause) { + + this.tmpDirPaths = checkNotNull(tmpDirPaths); + this.cleanupInterval = checkNotNull(cleanupInterval); + this.connectionInfo = checkNotNull(connectionInfo); + this.networkConfig = checkNotNull(networkConfig); + this.timeout = checkNotNull(timeout); + this.maxRegistrationDuration = maxRegistrationDuration; + this.numberOfSlots = checkNotNull(numberOfSlots); + this.configuration = checkNotNull(configuration); + this.initialRegistrationPause = checkNotNull(initialRegistrationPause); + this.maxRegistrationPause = checkNotNull(maxRegistrationPause); + this.refusedRegistrationPause = checkNotNull(refusedRegistrationPause); + } + + // -------------------------------------------------------------------------------------------- + // Properties + // -------------------------------------------------------------------------------------------- + + public String[] getTmpDirPaths() { + return tmpDirPaths; + } + + public long getCleanupInterval() { + return cleanupInterval; + } + + public InstanceConnectionInfo getConnectionInfo() { return connectionInfo; } + + public NetworkEnvironmentConfiguration getNetworkConfig() { return networkConfig; } + + public FiniteDuration getTimeout() { + return timeout; + } + + public FiniteDuration getMaxRegistrationDuration() { + return maxRegistrationDuration; + } + + public int getNumberOfSlots() { + return numberOfSlots; + } + + public Configuration getConfiguration() { + return configuration; + } + + public FiniteDuration getInitialRegistrationPause() { + return initialRegistrationPause; + } + + public FiniteDuration getMaxRegistrationPause() { + return maxRegistrationPause; + } + + public FiniteDuration getRefusedRegistrationPause() { + return refusedRegistrationPause; + } + +} + http://git-wip-us.apache.org/repos/asf/flink/blob/35e80103/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorTest.java index b831ead..25a670c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorTest.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.rpc.taskexecutor; +import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.highavailability.NonHaServices; import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; @@ -49,9 +50,9 @@ public class TaskExecutorTest extends TestLogger { rpc.registerGateway(resourceManagerAddress, rmGateway); NonHaServices haServices = new NonHaServices(resourceManagerAddress); - TaskExecutor taskManager = new TaskExecutor(rpc, haServices, resourceID); + TaskExecutor taskManager = TaskExecutor.startTaskManagerComponentsAndActor( + new Configuration(), resourceID, rpc, "localhost", haServices, true); String taskManagerAddress = taskManager.getAddress(); - taskManager.start(); verify(rmGateway, timeout(5000)).registerTaskExecutor( @@ -84,7 +85,8 @@ public class TaskExecutorTest extends TestLogger { TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices(); haServices.setResourceManagerLeaderRetriever(testLeaderService); - TaskExecutor taskManager = new TaskExecutor(rpc, haServices, resourceID); + TaskExecutor taskManager = TaskExecutor.startTaskManagerComponentsAndActor( + new Configuration(), resourceID, rpc, "localhost", haServices, true); String taskManagerAddress = taskManager.getAddress(); taskManager.start();