flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mxm <...@git.apache.org>
Subject [GitHub] flink pull request #2400: [FLINK-4363] Implement TaskManager basic startup o...
Date Mon, 22 Aug 2016 09:38:00 GMT
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2400#discussion_r75646889
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java ---
    @@ -35,27 +79,634 @@
      */
     public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
     
    -	/** The unique resource ID of this TaskExecutor */
    +	private static final Logger LOG = LoggerFactory.getLogger(TaskExecutor.class);
    +
    +	/** Return code for critical errors during the runtime */
    +	private static final int RUNTIME_FAILURE_RETURN_CODE = 2;
    +
    +	/** The name of the TaskManager actor */
    +	private static final String TASK_MANAGER_NAME = "taskmanager";
    +
    +	/** The unique resource ID of this TaskManager */
     	private final ResourceID resourceID;
     
     	/** The access to the leader election and metadata storage services */
     	private final HighAvailabilityServices haServices;
     
    -	// --------- resource manager --------
    +	/** The task manager configuration */
    +	private final TaskManagerConfiguration taskManagerConfig;
    +
    +	/** The connection information of the task manager */
    +	private final InstanceConnectionInfo connectionInfo;
    +
    +	/** The I/O manager component in the task manager */
    +	private final IOManager ioManager;
    +
    +	/** The memory manager component in the task manager */
    +	private final MemoryManager memoryManager;
    +
    +	/** The network component in the task manager */
    +	private final NetworkEnvironment networkEnvironment;
    +
    +	/** The number of slots in the task manager, should be 1 for YARN */
    +	private final int numberOfSlots;
     
    +	// --------- resource manager --------
     	private TaskExecutorToResourceManagerConnection resourceManagerConnection;
     
     	// ------------------------------------------------------------------------
     
     	public TaskExecutor(
    +			TaskManagerConfiguration taskManagerConfig,
    +			ResourceID resourceID,
    +			InstanceConnectionInfo connectionInfo,
    +			MemoryManager memoryManager,
    +			IOManager ioManager,
    +			NetworkEnvironment networkEnvironment,
    +			int numberOfSlots,
     			RpcService rpcService,
    -			HighAvailabilityServices haServices,
    -			ResourceID resourceID) {
    +			HighAvailabilityServices haServices) {
     
     		super(rpcService);
     
    -		this.haServices = checkNotNull(haServices);
    +		this.taskManagerConfig = checkNotNull(taskManagerConfig);
     		this.resourceID = checkNotNull(resourceID);
    +		this.connectionInfo = checkNotNull(connectionInfo);
    +		this.memoryManager = checkNotNull(memoryManager);
    +		this.ioManager = checkNotNull(ioManager);
    +		this.networkEnvironment = checkNotNull(networkEnvironment);
    +		this.numberOfSlots = checkNotNull(numberOfSlots);
    +		this.haServices = checkNotNull(haServices);
    +	}
    +
    +	/**
    +	 * Starts and runs the TaskManager.
    +	 * <p/>
    +	 * This method first tries to select the network interface to use for the TaskManager
    +	 * communication. The network interface is used both for the actor communication
    +	 * (coordination) as well as for the data exchange between task managers. Unless
    +	 * the hostname/interface is explicitly configured in the configuration, this
    +	 * method will try out various interfaces and methods to connect to the JobManager
    +	 * and select the one where the connection attempt is successful.
    +	 * <p/>
    +	 * After selecting the network interface, this method brings up an actor system
    +	 * for the TaskManager and its actors, starts the TaskManager's services
    +	 * (library cache, shuffle network stack, ...), and starts the TaskManager itself.
    +	 *
    +	 * @param configuration    The configuration for the TaskManager.
    +	 * @param taskManagerClass The actor class to instantiate.
    +	 *                         Allows to use TaskManager subclasses for example for YARN.
    +	 */
    +	public static void selectNetworkInterfaceAndRunTaskManager(
    +		Configuration configuration,
    +		ResourceID resourceID,
    +		Class<? extends TaskManager> taskManagerClass) throws Exception {
    +
    +		Tuple2<String, Integer> tuple2 = selectNetworkInterfaceAndPort(configuration);
    +
    +		runTaskManager(tuple2._1(), resourceID, tuple2._2(), configuration, taskManagerClass);
    +	}
    +
    +	private static Tuple2<String, Integer> selectNetworkInterfaceAndPort(Configuration configuration)
    +		throws Exception {
    +		String taskManagerHostname = configuration.getString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, null);
    +		if (taskManagerHostname != null) {
    +			LOG.info("Using configured hostname/address for TaskManager: " + taskManagerHostname);
    +		} else {
    +			LeaderRetrievalService leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(configuration);
    +			FiniteDuration lookupTimeout = AkkaUtils.getLookupTimeout(configuration);
    +
    +			InetAddress taskManagerAddress = LeaderRetrievalUtils.findConnectingAddress(leaderRetrievalService, lookupTimeout);
    +			taskManagerHostname = taskManagerAddress.getHostName();
    +			LOG.info("TaskManager will use hostname/address '{}' ({}) for communication.",
    +				taskManagerHostname, taskManagerAddress.getHostAddress());
    +		}
    +
    +		// if no task manager port has been configured, use 0 (system will pick any free port)
    +		int actorSystemPort = configuration.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, 0);
    +		if (actorSystemPort < 0 || actorSystemPort > 65535) {
    +			throw new IllegalConfigurationException("Invalid value for '" +
    +				ConfigConstants.TASK_MANAGER_IPC_PORT_KEY +
    +				"' (port for the TaskManager actor system) : " + actorSystemPort +
    +				" - Leave config parameter empty or use 0 to let the system choose a port automatically.");
    +		}
    +
    +		return new Tuple2<>(taskManagerHostname, actorSystemPort);
    +	}
    +
    +	/**
    +	 * Starts and runs the TaskManager. Brings up an actor system for the TaskManager and its
    +	 * actors, starts the TaskManager's services (library cache, shuffle network stack, ...),
    +	 * and starts the TaskManager itself.
    +	 * <p/>
    +	 * This method will also spawn a process reaper for the TaskManager (kill the process if
    +	 * the actor fails) and optionally start the JVM memory logging thread.
    +	 *
    +	 * @param taskManagerHostname The hostname/address of the interface where the actor system
    +	 *                            will communicate.
    +	 * @param resourceID          The id of the resource which the task manager will run on.
    +	 * @param actorSystemPort     The port at which the actor system will communicate.
    +	 * @param configuration       The configuration for the TaskManager.
    +	 * @param taskManagerClass    The actor class to instantiate. Allows the use of TaskManager
    +	 *                            subclasses for example for YARN.
    +	 */
    +	private static void runTaskManager(
    +		String taskManagerHostname,
    +		ResourceID resourceID,
    +		int actorSystemPort,
    +		Configuration configuration,
    +		Class<? extends TaskManager> taskManagerClass) throws Exception {
    +
    +		LOG.info("Starting TaskManager");
    +
    +		// Bring up the TaskManager actor system first, bind it to the given address.
    +
    +		LOG.info("Starting TaskManager actor system at " +
    +			NetUtils.hostAndPortToUrlString(taskManagerHostname, actorSystemPort));
    +
    +		ActorSystem taskManagerSystem;
    --- End diff --
    
    Would make this final.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Mime
View raw message