Date: Mon, 22 Aug 2016 09:45:21 +0000 (UTC)
From: "ASF GitHub Bot (JIRA)"
To: issues@flink.apache.org
Reply-To: dev@flink.apache.org
Subject: [jira] [Commented] (FLINK-4363) Implement TaskManager basic startup of all components in java

    [ https://issues.apache.org/jira/browse/FLINK-4363?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15430448#comment-15430448 ]

ASF GitHub Bot commented on FLINK-4363:
---------------------------------------

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2400#discussion_r75647841

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java ---
@@ -35,27 +79,634 @@
  */
 public class TaskExecutor extends RpcEndpoint {
-    /** The unique resource ID of this TaskExecutor */
+    private static final Logger LOG = LoggerFactory.getLogger(TaskExecutor.class);
+
+    /** Return code for critical errors during the runtime */
+    private static final int RUNTIME_FAILURE_RETURN_CODE = 2;
+
+    /** The name of the TaskManager actor */
+    private static final String TASK_MANAGER_NAME = "taskmanager";
+
+    /** The unique resource ID of this TaskManager */
     private final ResourceID resourceID;
     /** The access to the leader election and metadata storage services */
     private final HighAvailabilityServices haServices;
-    // --------- resource manager --------
+    /** The task manager configuration */
+    private final TaskManagerConfiguration taskManagerConfig;
+
+    /** The connection information of the task manager */
+    private final InstanceConnectionInfo connectionInfo;
+
+    /** The I/O manager component in the task manager */
+    private final IOManager ioManager;
+
+    /** The memory manager component in the task manager */
+    private final MemoryManager memoryManager;
+
+    /** The network component in the task manager */
+    private final NetworkEnvironment networkEnvironment;
+
+    /** The number of slots in the task manager, should be 1 for YARN */
+    private final int numberOfSlots;
+    // --------- resource manager --------
     private TaskExecutorToResourceManagerConnection resourceManagerConnection;
     // ------------------------------------------------------------------------
     public TaskExecutor(
+            TaskManagerConfiguration taskManagerConfig,
+            ResourceID resourceID,
+            InstanceConnectionInfo connectionInfo,
+            MemoryManager memoryManager,
+            IOManager ioManager,
+            NetworkEnvironment networkEnvironment,
+            int numberOfSlots,
             RpcService rpcService,
-            HighAvailabilityServices haServices,
-            ResourceID resourceID) {
+            HighAvailabilityServices haServices) {
         super(rpcService);
-        this.haServices = checkNotNull(haServices);
+        this.taskManagerConfig = checkNotNull(taskManagerConfig);
         this.resourceID = checkNotNull(resourceID);
+        this.connectionInfo = checkNotNull(connectionInfo);
+        this.memoryManager = checkNotNull(memoryManager);
+        this.ioManager = checkNotNull(ioManager);
+        this.networkEnvironment = checkNotNull(networkEnvironment);
+        this.numberOfSlots = checkNotNull(numberOfSlots);
+        this.haServices = checkNotNull(haServices);
+    }
+
+    /**
+     * Starts and runs the TaskManager.
+     *
+     * This method first tries to select the network interface to use for the TaskManager
+     * communication. The network interface is used both for the actor communication
+     * (coordination) as well as for the data exchange between task managers. Unless
+     * the hostname/interface is explicitly configured in the configuration, this
+     * method will try out various interfaces and methods to connect to the JobManager
+     * and select the one where the connection attempt is successful.
+     *
+     * After selecting the network interface, this method brings up an actor system
+     * for the TaskManager and its actors, starts the TaskManager's services
+     * (library cache, shuffle network stack, ...), and starts the TaskManager itself.
+     *
+     * @param configuration The configuration for the TaskManager.
+     * @param taskManagerClass The actor class to instantiate.
+     *                         Allows to use TaskManager subclasses for example for YARN.
+     */
+    public static void selectNetworkInterfaceAndRunTaskManager(
+            Configuration configuration,
+            ResourceID resourceID,
+            Class taskManagerClass) throws Exception {
+
+        Tuple2 tuple2 = selectNetworkInterfaceAndPort(configuration);
+
+        runTaskManager(tuple2._1(), resourceID, tuple2._2(), configuration, taskManagerClass);
+    }
+
+    private static Tuple2 selectNetworkInterfaceAndPort(Configuration configuration)
+            throws Exception {
+        String taskManagerHostname = configuration.getString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, null);
+        if (taskManagerHostname != null) {
+            LOG.info("Using configured hostname/address for TaskManager: " + taskManagerHostname);
+        } else {
+            LeaderRetrievalService leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(configuration);
+            FiniteDuration lookupTimeout = AkkaUtils.getLookupTimeout(configuration);
+
+            InetAddress taskManagerAddress = LeaderRetrievalUtils.findConnectingAddress(leaderRetrievalService, lookupTimeout);
+            taskManagerHostname = taskManagerAddress.getHostName();
+            LOG.info("TaskManager will use hostname/address '{}' ({}) for communication.",
+                taskManagerHostname, taskManagerAddress.getHostAddress());
+        }
+
+        // if no task manager port has been configured, use 0 (system will pick any free port)
+        int actorSystemPort = configuration.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, 0);
+        if (actorSystemPort < 0 || actorSystemPort > 65535) {
+            throw new IllegalConfigurationException("Invalid value for '" +
+                ConfigConstants.TASK_MANAGER_IPC_PORT_KEY +
+                "' (port for the TaskManager actor system) : " + actorSystemPort +
+                " - Leave config parameter empty or use 0 to let the system choose a port automatically.");
+        }
+
+        return new Tuple2<>(taskManagerHostname, actorSystemPort);
+    }
+
+    /**
+     * Starts and runs the TaskManager. Brings up an actor system for the TaskManager and its
+     * actors, starts the TaskManager's services (library cache, shuffle network stack, ...),
+     * and starts the TaskManager itself.
+     *
+     * This method will also spawn a process reaper for the TaskManager (kill the process if
+     * the actor fails) and optionally start the JVM memory logging thread.
+     *
+     * @param taskManagerHostname The hostname/address of the interface where the actor system
+     *                            will communicate.
+     * @param resourceID The id of the resource which the task manager will run on.
+     * @param actorSystemPort The port at which the actor system will communicate.
+     * @param configuration The configuration for the TaskManager.
+     * @param taskManagerClass The actor class to instantiate. Allows the use of TaskManager
+     *                         subclasses for example for YARN.
+     */
+    private static void runTaskManager(
+            String taskManagerHostname,
+            ResourceID resourceID,
+            int actorSystemPort,
+            Configuration configuration,
+            Class taskManagerClass) throws Exception {
+
+        LOG.info("Starting TaskManager");
+
+        // Bring up the TaskManager actor system first, bind it to the given address.
+
+        LOG.info("Starting TaskManager actor system at " +
+            NetUtils.hostAndPortToUrlString(taskManagerHostname, actorSystemPort));
+
+        ActorSystem taskManagerSystem;
+        try {
+            Tuple2 address = new Tuple2(taskManagerHostname, actorSystemPort);
+            Config akkaConfig = AkkaUtils.getAkkaConfig(configuration, new Some<>(address));
+            LOG.debug("Using akka configuration\n " + akkaConfig);
+            taskManagerSystem = AkkaUtils.createActorSystem(akkaConfig);
+        } catch (Throwable t) {
+            if (t instanceof org.jboss.netty.channel.ChannelException) {
+                Throwable cause = t.getCause();
+                if (cause != null && t.getCause() instanceof java.net.BindException) {
+                    String address = NetUtils.hostAndPortToUrlString(taskManagerHostname, actorSystemPort);
+                    throw new IOException("Unable to bind TaskManager actor system to address " +
+                        address + " - " + cause.getMessage(), t);
+                }
+            }
+            throw new Exception("Could not create TaskManager actor system", t);
+        }
+
+        // start all the TaskManager services (network stack, library cache, ...)
+        // and the TaskManager actor
+        try {
+            LOG.info("Starting TaskManager actor");
+            ActorRef taskManagerActor = startTaskManagerComponentsAndActor(
+                configuration,
+                resourceID,
+                taskManagerSystem,
+                taskManagerHostname,
+                TASK_MANAGER_NAME,
+                null,
+                false,
+                taskManagerClass);
+
+            // start a process reaper that watches the JobManager. If the TaskManager actor dies,
+            // the process reaper will kill the JVM process (to ensure easy failure detection)
+            LOG.debug("Starting TaskManager process reaper");
+
+            taskManagerSystem.actorOf(
+                Props.create(ProcessReaper.class, taskManagerActor, LOG, RUNTIME_FAILURE_RETURN_CODE),
+                "TaskManager_Process_Reaper");
+
+            // if desired, start the logging daemon that periodically logs the memory usage information
+            if (LOG.isInfoEnabled() && configuration.getBoolean(
+                ConfigConstants.TASK_MANAGER_DEBUG_MEMORY_USAGE_START_LOG_THREAD,
+                ConfigConstants.DEFAULT_TASK_MANAGER_DEBUG_MEMORY_USAGE_START_LOG_THREAD)) {
+                LOG.info("Starting periodic memory usage logger");
+
+                long interval = configuration.getLong(
+                    ConfigConstants.TASK_MANAGER_DEBUG_MEMORY_USAGE_LOG_INTERVAL_MS,
+                    ConfigConstants.DEFAULT_TASK_MANAGER_DEBUG_MEMORY_USAGE_LOG_INTERVAL_MS);
+
+                MemoryLogger logger = new MemoryLogger(LOG, interval, taskManagerSystem);
+                logger.start();
+            }
+
+            // block until everything is done
+            taskManagerSystem.awaitTermination();
+        } catch (Throwable t) {
+            LOG.error("Error while starting up taskManager", t);
+            try {
+                taskManagerSystem.shutdown();
+            } catch (Throwable tt) {
+                LOG.warn("Could not cleanly shut down actor system", tt);
+            }
+            throw t;
+        }
+    }
+
+    // --------------------------------------------------------------------------
+    // Starting and running the TaskManager
+    // --------------------------------------------------------------------------
+
+    /**
+     * @param configuration The configuration for the TaskManager.
+     * @param resourceID The id of the resource which the task manager will run on.
+     * @param actorSystem The actor system that should run the TaskManager actor.
+     * @param taskManagerHostname The hostname/address that describes the TaskManager's data location.
+     * @param taskManagerActorName Optionally the name of the TaskManager actor. If none is given,
+     *                             the actor will use a random name.
+     * @param leaderRetrievalService Optionally, a leader retrieval service can be provided. If none is given,
+     *                               then a LeaderRetrievalService is constructed from the configuration.
+     * @param localTaskManagerCommunication If true, the TaskManager will not initiate the TCP network stack.
+     * @param taskManagerClass The class of the TaskManager actor. May be used to give
+     *                         subclasses that understand additional actor messages.
+     * @return An ActorRef to the TaskManager actor.
+     * @throws org.apache.flink.configuration.IllegalConfigurationException Thrown, if the given config contains illegal values.
+     * @throws java.io.IOException Thrown, if any of the I/O components (such as buffer pools,
+     *                             I/O manager, ...) cannot be properly started.
+     * @throws java.lang.Exception Thrown is some other error occurs while parsing the configuration
+     *                             or starting the TaskManager components.
+     */
+    public static ActorRef startTaskManagerComponentsAndActor(
+            Configuration configuration,
+            ResourceID resourceID,
+            ActorSystem actorSystem,
+            String taskManagerHostname,
+            String taskManagerActorName,
+            LeaderRetrievalService leaderRetrievalService,
+            boolean localTaskManagerCommunication,
+            Class taskManagerClass) throws Exception {
+
+        Tuple4 tuple4
+            = parseTaskManagerConfiguration(configuration, taskManagerHostname, localTaskManagerCommunication);
+
+        TaskManagerConfiguration taskManagerConfig = tuple4._1();
+        NetworkEnvironmentConfiguration netConfig = tuple4._2();
+        InstanceConnectionInfo connectionInfo = tuple4._3();
+        MemoryType memType = tuple4._4();
+
+        // pre-start checks
+        checkTempDirs(taskManagerConfig.tmpDirPaths());
+
+        ExecutionContext executionContext = ExecutionContexts$.MODULE$.fromExecutor(new ForkJoinPool());
+
+        // we start the network first, to make sure it can allocate its buffers first
+        NetworkEnvironment network = new NetworkEnvironment(executionContext, taskManagerConfig.timeout(), netConfig);
+
+        // computing the amount of memory to use depends on how much memory is available
+        // it strictly needs to happen AFTER the network stack has been initialized
+
+        // check if a value has been configured
+        long configuredMemory = configuration.getLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1L);
+        checkConfigParameter(configuredMemory == -1 || configuredMemory > 0, configuredMemory,
+            ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY,
+            "MemoryManager needs at least one MB of memory. " +
+                "If you leave this config parameter empty, the system automatically " +
+                "pick a fraction of the available memory.");
+
+        long memorySize;
+        boolean preAllocateMemory = configuration.getBoolean(
+            ConfigConstants.TASK_MANAGER_MEMORY_PRE_ALLOCATE_KEY,
+            ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_PRE_ALLOCATE);
+        if (configuredMemory > 0) {
+            if (preAllocateMemory) {
+                LOG.info("Using {} MB for managed memory." , configuredMemory);
+            } else {
+                LOG.info("Limiting managed memory to {} MB, memory will be allocated lazily." , configuredMemory);
+            }
+            memorySize = configuredMemory << 20; // megabytes to bytes
+        } else {
+            float fraction = configuration.getFloat(
+                ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
+                ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION);
+            checkConfigParameter(fraction > 0.0f && fraction < 1.0f, fraction,
+                ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
+                "MemoryManager fraction of the free memory must be between 0.0 and 1.0");
+
+            if (memType == MemoryType.HEAP) {
+                long relativeMemSize = (long) (EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() * fraction);
+                if (preAllocateMemory) {
+                    LOG.info("Using {} of the currently free heap space for managed heap memory ({} MB)." ,
+                        fraction , relativeMemSize >> 20);
+                } else {
+                    LOG.info("Limiting managed memory to {} of the currently free heap space ({} MB), " +
+                        "memory will be allocated lazily." , fraction , relativeMemSize >> 20);
+                }
+                memorySize = relativeMemSize;
+            } else if (memType == MemoryType.OFF_HEAP) {
+                // The maximum heap memory has been adjusted according to the fraction
+                long maxMemory = EnvironmentInformation.getMaxJvmHeapMemory();
+                long directMemorySize = (long) (maxMemory / (1.0 - fraction) * fraction);
+                if (preAllocateMemory) {
+                    LOG.info("Using {} of the maximum memory size for managed off-heap memory ({} MB)." ,
+                        fraction, directMemorySize >> 20);
+                } else {
+                    LOG.info("Limiting managed memory to {} of the maximum memory size ({} MB)," +
+                        " memory will be allocated lazily.", fraction, directMemorySize >> 20);
+                }
+                memorySize = directMemorySize;
+            } else {
+                throw new RuntimeException("No supported memory type detected.");
+            }
+        }
+
+        // now start the memory manager
+        MemoryManager memoryManager;
+        try {
+            memoryManager = new MemoryManager(
+                memorySize,
+                taskManagerConfig.numberOfSlots(),
+                netConfig.networkBufferSize(),
+                memType,
+                preAllocateMemory);
+        } catch (OutOfMemoryError e) {
+            if (memType == MemoryType.HEAP) {
+                throw new Exception("OutOfMemory error (" + e.getMessage() +
+                    ") while allocating the TaskManager heap memory (" + memorySize + " bytes).", e);
+            } else if (memType == MemoryType.OFF_HEAP) {
+                throw new Exception("OutOfMemory error (" + e.getMessage() +
+                    ") while allocating the TaskManager off-heap memory (" + memorySize +
+                    " bytes).Try increasing the maximum direct memory (-XX:MaxDirectMemorySize)", e);
+            } else {
+                throw e;
+            }
+        }
+
+        // start the I/O manager, it will create some temp directories.
+        IOManager ioManager = new IOManagerAsync(taskManagerConfig.tmpDirPaths());
+
+        if (leaderRetrievalService == null){
+            leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(configuration);
+        }
+
+        // create the actor properties (which define the actor constructor parameters)
+        Props tmProps = Props.create(
+            taskManagerClass,
+            taskManagerConfig,
+            resourceID,
+            connectionInfo,
+            memoryManager,
+            ioManager,
+            network,
+            taskManagerConfig.numberOfSlots(),
+            leaderRetrievalService);
+
+        ActorRef taskManagerActorRef;
+        if (taskManagerActorName != null && !taskManagerActorName.equals("")) {
+            taskManagerActorRef = actorSystem.actorOf(tmProps, taskManagerActorName);
+        } else {
+            taskManagerActorRef = actorSystem.actorOf(tmProps);
+        }
+
+        return taskManagerActorRef;
+    }
+
+    // --------------------------------------------------------------------------
+    // Parsing and checking the TaskManager Configuration
+    // --------------------------------------------------------------------------
+
+    /**
+     * Utility method to extract TaskManager config parameters from the configuration and to
+     * sanity check them.
+     *
+     * @param configuration The configuration.
+     * @param taskManagerHostname The host name under which the TaskManager communicates.
+     * @param localTaskManagerCommunication True, to skip initializing the network stack.
+     *                                      Use only in cases where only one task manager runs.
+     * @return A tuple (TaskManagerConfiguration, network configuration,
+     *         InstanceConnectionInfo, JobManager actor Akka URL).
+     */
+    private static Tuple4
+        parseTaskManagerConfiguration(Configuration configuration, String taskManagerHostname, boolean localTaskManagerCommunication)
+        throws Exception {
+
+        // ------- read values from the config and check them ---------
+        // (a lot of them)
+
+        // ----> hosts / ports for communication and data exchange
+
+        int dataport = configuration.getInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY,
+            ConfigConstants.DEFAULT_TASK_MANAGER_DATA_PORT);
+        if (dataport == 0) {
+            dataport = NetUtils.getAvailablePort();
+        }
+        checkConfigParameter(dataport > 0, dataport, ConfigConstants.TASK_MANAGER_DATA_PORT_KEY,
+            "Leave config parameter empty or use 0 to let the system choose a port automatically.");
+
+        InetAddress taskManagerAddress = InetAddress.getByName(taskManagerHostname);
+        InstanceConnectionInfo connectionInfo = new InstanceConnectionInfo(taskManagerAddress, dataport);
+
+        // ----> memory / network stack (shuffles/broadcasts), task slots, temp directories
+
+        // we need this because many configs have been written with a "-1" entry
+        int slots = configuration.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
+        if (slots == -1) {
+            slots = 1;
+        }
+        checkConfigParameter(slots >= 1, slots, ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS,
+            "Number of task slots must be at least one.");
+
+        int numNetworkBuffers = configuration.getInteger(
+            ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY,
+            ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS);
+        checkConfigParameter(numNetworkBuffers > 0, numNetworkBuffers,
+            ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, "");
+
+        int pageSize = configuration.getInteger(
+            ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY,
+            ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_SEGMENT_SIZE);
+        // check page size of for minimum size
+        checkConfigParameter(pageSize >= MemoryManager.MIN_PAGE_SIZE, pageSize,
+            ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY,
+            "Minimum memory segment size is " + MemoryManager.MIN_PAGE_SIZE);
+        // check page size for power of two
+        checkConfigParameter(MathUtils.isPowerOf2(pageSize), pageSize,
+            ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY,
+            "Memory segment size must be a power of 2.");
+
+        // check whether we use heap or off-heap memory
+        MemoryType memType;
+        if (configuration.getBoolean(ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_KEY, false)) {
+            memType = MemoryType.OFF_HEAP;
+        } else {
+            memType = MemoryType.HEAP;
+        }
+
+        // initialize the memory segment factory accordingly
+        if (memType == MemoryType.HEAP) {
+            if (!MemorySegmentFactory.isInitialized()) {
+                MemorySegmentFactory.initializeFactory(HeapMemorySegment.FACTORY);
+            } else if (MemorySegmentFactory.getFactory() != HeapMemorySegment.FACTORY) {
+                throw new Exception("Memory type is set to heap memory, but memory segment " +
+                    "factory has been initialized for off-heap memory segments");
+            }
+        } else {
+            if (!MemorySegmentFactory.isInitialized()) {
+                MemorySegmentFactory.initializeFactory(HybridMemorySegment.FACTORY);
+            } else if (MemorySegmentFactory.getFactory() != HybridMemorySegment.FACTORY) {
+                throw new Exception("Memory type is set to off-heap memory, but memory segment " +
+                    "factory has been initialized for heap memory segments");
+            }
+        }
+
+        String[] tmpDirs = configuration.getString(
+            ConfigConstants.TASK_MANAGER_TMP_DIR_KEY,
+            ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH).split(",|" + File.pathSeparator);
+
+        NettyConfig nettyConfig = null;
+        if (!localTaskManagerCommunication) {
+            nettyConfig = new NettyConfig(connectionInfo.address(), connectionInfo.dataPort(), pageSize, slots, configuration);
+        }
+
+        // Default spill I/O mode for intermediate results
+        String syncOrAsync = configuration.getString(
+            ConfigConstants.TASK_MANAGER_NETWORK_DEFAULT_IO_MODE,
+            ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_DEFAULT_IO_MODE);
+
+        IOMode ioMode;
--- End diff --

Would use `final` here.

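
For illustration, the suggestion could look roughly like the sketch below. The diff is cut off right after the `IOMode ioMode;` declaration, so the branch that assigns the value is an assumption here, as are the class name `FinalLocalSketch`, the helper `parseIoMode`, and the stand-in `IOMode` enum (the real one lives in Flink). Java allows a `final` local to be declared without an initializer as long as every path assigns it exactly once, so the compiler then rejects any accidental reassignment further down.

```java
public class FinalLocalSketch {

    // Hypothetical stand-in for Flink's IOMode enum so the sketch compiles on its own.
    enum IOMode { SYNC, ASYNC }

    // Hypothetical helper; the assignment logic is assumed, the diff does not show it.
    static IOMode parseIoMode(String syncOrAsync) {
        // 'final' without initializer: each branch must assign exactly once.
        final IOMode ioMode;
        if ("async".equalsIgnoreCase(syncOrAsync)) {
            ioMode = IOMode.ASYNC;
        } else {
            ioMode = IOMode.SYNC;
        }
        // A second assignment such as 'ioMode = IOMode.SYNC;' here would not compile.
        return ioMode;
    }

    public static void main(String[] args) {
        System.out.println(parseIoMode("async")); // prints ASYNC
        System.out.println(parseIoMode("sync"));  // prints SYNC
    }
}
```

The same pattern would apply to the other locals in the diff that are declared first and assigned in an if/else, such as `memType` and `memorySize`.
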
> Implement TaskManager basic startup of all components in java
> -------------------------------------------------------------
>
>                 Key: FLINK-4363
>                 URL: https://issues.apache.org/jira/browse/FLINK-4363
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Cluster Management
>            Reporter: Zhijiang Wang
>            Assignee: Zhijiang Wang
>
> Similar to the current {{TaskManager}}, but implements the initialization and startup of all components in Java instead of Scala.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)