flink-issues mailing list archives

From wangzhijiang999 <...@git.apache.org>
Subject [GitHub] flink pull request #2461: [FLINK-4505][Cluster Management] Implement TaskMan...
Date Fri, 02 Sep 2016 10:58:24 GMT
Github user wangzhijiang999 commented on a diff in the pull request:

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/YarnTaskExecutorFactory.java
    @@ -0,0 +1,198 @@
    +package org.apache.flink.runtime.taskexecutor;
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.configuration.ConfigConstants;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.configuration.IllegalConfigurationException;
    +import org.apache.flink.runtime.akka.AkkaUtils;
    +import org.apache.flink.runtime.clusterframework.types.ResourceID;
    +import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
    +import org.apache.flink.runtime.leaderelection.LeaderElectionService;
    +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
    +import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
    +import org.apache.flink.runtime.taskmanager.MemoryLogger;
    +import org.apache.flink.runtime.util.LeaderRetrievalUtils;
    +import org.apache.flink.util.NetUtils;
    +import akka.actor.ActorSystem;
    +import akka.util.Timeout;
    +import scala.Some;
    +import scala.Tuple2;
    +import scala.concurrent.duration.FiniteDuration;
    +import com.typesafe.config.Config;
    +import java.io.IOException;
    +import java.net.InetAddress;
    +import java.net.InetSocketAddress;
    +import java.util.concurrent.TimeUnit;
    +/**
    + * An factory for creating {@link TaskExecutor} and starting it in yarn mode.
    + */
    +public class YarnTaskExecutorFactory extends TaskExecutorFactory {
    +	public YarnTaskExecutorFactory(Configuration configuration, ResourceID resourceID) {
    +		super(configuration, resourceID);
    +	}
    +	@Override
    +	public TaskExecutor createAndStartTaskExecutor() throws Exception {
    +		return selectNetworkInterfaceAndRunTaskManager(configuration, resourceID);
    +	}
    +	/**
    +	 * Starts and runs the TaskManager.
    +	 * <p/>
    +	 * This method first tries to select the network interface to use for the TaskManager
    +	 * communication. The network interface is used both for the actor communication
    +	 * (coordination) as well as for the data exchange between task managers. Unless
    +	 * the hostname/interface is explicitly configured in the configuration, this
    +	 * method will try out various interfaces and methods to connect to the JobManager
    +	 * and select the one where the connection attempt is successful.
    +	 * <p/>
    +	 * After selecting the network interface, this method brings up an actor system
    +	 * for the TaskManager and its actors, starts the TaskManager's services
    +	 * (library cache, shuffle network stack, ...), and starts the TaskManager itself.
    +	 *
    +	 * @param configuration    The configuration for the TaskManager.
    +	 * @param resourceID       The id of the resource which the task manager will run on.
    +	 */
    +	private TaskExecutor selectNetworkInterfaceAndRunTaskManager(
    --- End diff --
    The parameters differ between deployment modes because I followed the previous
implementation of **TaskManager**. For example, **YarnTaskManagerRunner** previously invoked the method
"selectNetworkInterfaceAndRunTaskManager", while **ForkableFlinkMiniCluster** invoked the method
"startTaskManagerComponentsAndActor" to start the **TaskManager**. So I kept the existing startup path for each mode.
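
To make the idea concrete, here is a minimal sketch of one factory per mode, where each keeps its own startup path. It is only an illustration under assumptions: `SketchExecutor`, `SketchExecutorFactory`, `YarnSketchFactory`, and `MiniClusterSketchFactory` are hypothetical stand-ins, not classes from this pull request or from Flink, and the "startup steps" are plain strings rather than real network or actor-system calls.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Demo entry point: builds one executor per mode and prints its startup steps.
class FactoryModesSketch {
    public static void main(String[] args) {
        SketchExecutor yarn = new YarnSketchFactory("container_01").createAndStart();
        SketchExecutor mini = new MiniClusterSketchFactory("local_01").createAndStart();
        System.out.println(yarn.mode + ": " + yarn.startupSteps);
        System.out.println(mini.mode + ": " + mini.startupSteps);
    }
}

// Hypothetical stand-in for the started executor; it only records what was done.
final class SketchExecutor {
    final String mode;
    final List<String> startupSteps;

    SketchExecutor(String mode, List<String> startupSteps) {
        this.mode = mode;
        this.startupSteps = Collections.unmodifiableList(startupSteps);
    }
}

// Hypothetical base factory: shared inputs, mode-specific startup.
abstract class SketchExecutorFactory {
    protected final String resourceId;

    SketchExecutorFactory(String resourceId) {
        this.resourceId = resourceId;
    }

    abstract SketchExecutor createAndStart();
}

// YARN-like mode (assumption): pick a network interface first, then start components.
final class YarnSketchFactory extends SketchExecutorFactory {
    YarnSketchFactory(String resourceId) {
        super(resourceId);
    }

    @Override
    SketchExecutor createAndStart() {
        List<String> steps = new ArrayList<>();
        steps.add("select network interface");
        steps.add("start components for " + resourceId);
        return new SketchExecutor("yarn", steps);
    }
}

// Mini-cluster-like mode (assumption): start components directly, no interface probing.
final class MiniClusterSketchFactory extends SketchExecutorFactory {
    MiniClusterSketchFactory(String resourceId) {
        super(resourceId);
    }

    @Override
    SketchExecutor createAndStart() {
        List<String> steps = new ArrayList<>();
        steps.add("start components for " + resourceId);
        return new SketchExecutor("mini-cluster", steps);
    }
}
```

Callers depend only on the base factory type, so the mode-specific differences in parameters and startup order stay inside each subclass.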
