flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-4363) Implement TaskManager basic startup of all components in java
Date Mon, 22 Aug 2016 09:49:20 GMT

    [ https://issues.apache.org/jira/browse/FLINK-4363?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15430462#comment-15430462
] 

ASF GitHub Bot commented on FLINK-4363:
---------------------------------------

Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2400#discussion_r75648406
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java
---
    @@ -35,27 +79,634 @@
      */
     public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
     
    -	/** The unique resource ID of this TaskExecutor */
    +	private static final Logger LOG = LoggerFactory.getLogger(TaskExecutor.class);
    +
    +	/** Return code for critical errors during the runtime */
    +	private static final int RUNTIME_FAILURE_RETURN_CODE = 2;
    +
    +	/** The name of the TaskManager actor */
    +	private static final String TASK_MANAGER_NAME = "taskmanager";
    +
    +	/** The unique resource ID of this TaskManager */
     	private final ResourceID resourceID;
     
     	/** The access to the leader election and metadata storage services */
     	private final HighAvailabilityServices haServices;
     
    -	// --------- resource manager --------
    +	/** The task manager configuration */
    +	private final TaskManagerConfiguration taskManagerConfig;
    +
    +	/** The connection information of the task manager */
    +	private final InstanceConnectionInfo connectionInfo;
    +
    +	/** The I/O manager component in the task manager */
    +	private final IOManager ioManager;
    +
    +	/** The memory manager component in the task manager */
    +	private final MemoryManager memoryManager;
    +
    +	/** The network component in the task manager */
    +	private final NetworkEnvironment networkEnvironment;
    +
    +	/** The number of slots in the task manager, should be 1 for YARN */
    +	private final int numberOfSlots;
     
    +	// --------- resource manager --------
     	private TaskExecutorToResourceManagerConnection resourceManagerConnection;
     
     	// ------------------------------------------------------------------------
     
     	public TaskExecutor(
    +			TaskManagerConfiguration taskManagerConfig,
    +			ResourceID resourceID,
    +			InstanceConnectionInfo connectionInfo,
    +			MemoryManager memoryManager,
    +			IOManager ioManager,
    +			NetworkEnvironment networkEnvironment,
    +			int numberOfSlots,
     			RpcService rpcService,
    -			HighAvailabilityServices haServices,
    -			ResourceID resourceID) {
    +			HighAvailabilityServices haServices) {
     
     		super(rpcService);
     
    -		this.haServices = checkNotNull(haServices);
    +		this.taskManagerConfig = checkNotNull(taskManagerConfig);
     		this.resourceID = checkNotNull(resourceID);
    +		this.connectionInfo = checkNotNull(connectionInfo);
    +		this.memoryManager = checkNotNull(memoryManager);
    +		this.ioManager = checkNotNull(ioManager);
    +		this.networkEnvironment = checkNotNull(networkEnvironment);
    +		this.numberOfSlots = checkNotNull(numberOfSlots);
    +		this.haServices = checkNotNull(haServices);
    +	}
    +
    +	/**
    +	 * Starts and runs the TaskManager.
    +	 * <p/>
    +	 * This method first tries to select the network interface to use for the TaskManager
    +	 * communication. The network interface is used both for the actor communication
    +	 * (coordination) as well as for the data exchange between task managers. Unless
    +	 * the hostname/interface is explicitly configured in the configuration, this
    +	 * method will try out various interfaces and methods to connect to the JobManager
    +	 * and select the one where the connection attempt is successful.
    +	 * <p/>
    +	 * After selecting the network interface, this method brings up an actor system
    +	 * for the TaskManager and its actors, starts the TaskManager's services
    +	 * (library cache, shuffle network stack, ...), and starts the TaskManager itself.
    +	 *
    +	 * @param configuration    The configuration for the TaskManager.
    +	 * @param taskManagerClass The actor class to instantiate.
    +	 *                         Allows to use TaskManager subclasses for example for YARN.
    +	 */
    +	public static void selectNetworkInterfaceAndRunTaskManager(
    +		Configuration configuration,
    +		ResourceID resourceID,
    +		Class<? extends TaskManager> taskManagerClass) throws Exception {
    +
    +		Tuple2<String, Integer> tuple2 = selectNetworkInterfaceAndPort(configuration);
    +
    +		runTaskManager(tuple2._1(), resourceID, tuple2._2(), configuration, taskManagerClass);
    +	}
    +
    +	private static Tuple2<String, Integer> selectNetworkInterfaceAndPort(Configuration
configuration)
    +		throws Exception {
    +		String taskManagerHostname = configuration.getString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY,
null);
    +		if (taskManagerHostname != null) {
    +			LOG.info("Using configured hostname/address for TaskManager: " + taskManagerHostname);
    +		} else {
    +			LeaderRetrievalService leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(configuration);
    +			FiniteDuration lookupTimeout = AkkaUtils.getLookupTimeout(configuration);
    +
    +			InetAddress taskManagerAddress = LeaderRetrievalUtils.findConnectingAddress(leaderRetrievalService,
lookupTimeout);
    +			taskManagerHostname = taskManagerAddress.getHostName();
    +			LOG.info("TaskManager will use hostname/address '{}' ({}) for communication.",
    +				taskManagerHostname, taskManagerAddress.getHostAddress());
    +		}
    +
    +		// if no task manager port has been configured, use 0 (system will pick any free port)
    +		int actorSystemPort = configuration.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY,
0);
    +		if (actorSystemPort < 0 || actorSystemPort > 65535) {
    +			throw new IllegalConfigurationException("Invalid value for '" +
    +				ConfigConstants.TASK_MANAGER_IPC_PORT_KEY +
    +				"' (port for the TaskManager actor system) : " + actorSystemPort +
    +				" - Leave config parameter empty or use 0 to let the system choose a port automatically.");
    +		}
    +
    +		return new Tuple2<>(taskManagerHostname, actorSystemPort);
    +	}
    +
    +	/**
    +	 * Starts and runs the TaskManager. Brings up an actor system for the TaskManager and
its
    +	 * actors, starts the TaskManager's services (library cache, shuffle network stack,
...),
    +	 * and starts the TaskManager itself.
    +	 * <p/>
    +	 * This method will also spawn a process reaper for the TaskManager (kill the process
if
    +	 * the actor fails) and optionally start the JVM memory logging thread.
    +	 *
    +	 * @param taskManagerHostname The hostname/address of the interface where the actor
system
    +	 *                            will communicate.
    +	 * @param resourceID          The id of the resource which the task manager will run
on.
    +	 * @param actorSystemPort     The port at which the actor system will communicate.
    +	 * @param configuration       The configuration for the TaskManager.
    +	 * @param taskManagerClass    The actor class to instantiate. Allows the use of TaskManager
    +	 *                            subclasses for example for YARN.
    +	 */
    +	private static void runTaskManager(
    +		String taskManagerHostname,
    +		ResourceID resourceID,
    +		int actorSystemPort,
    +		Configuration configuration,
    +		Class<? extends TaskManager> taskManagerClass) throws Exception {
    +
    +		LOG.info("Starting TaskManager");
    +
    +		// Bring up the TaskManager actor system first, bind it to the given address.
    +
    +		LOG.info("Starting TaskManager actor system at " +
    +			NetUtils.hostAndPortToUrlString(taskManagerHostname, actorSystemPort));
    +
    +		ActorSystem taskManagerSystem;
    +		try {
    +			Tuple2<String, Object> address = new Tuple2<String, Object>(taskManagerHostname,
actorSystemPort);
    +			Config akkaConfig = AkkaUtils.getAkkaConfig(configuration, new Some<>(address));
    +			LOG.debug("Using akka configuration\n " + akkaConfig);
    +			taskManagerSystem = AkkaUtils.createActorSystem(akkaConfig);
    +		} catch (Throwable t) {
    +			if (t instanceof org.jboss.netty.channel.ChannelException) {
    +				Throwable cause = t.getCause();
    +				if (cause != null && t.getCause() instanceof java.net.BindException) {
    +					String address = NetUtils.hostAndPortToUrlString(taskManagerHostname, actorSystemPort);
    +					throw new IOException("Unable to bind TaskManager actor system to address " +
    +						address + " - " + cause.getMessage(), t);
    +				}
    +			}
    +			throw new Exception("Could not create TaskManager actor system", t);
    +		}
    +
    +		// start all the TaskManager services (network stack,  library cache, ...)
    +		// and the TaskManager actor
    +		try {
    +			LOG.info("Starting TaskManager actor");
    +			ActorRef taskManagerActor = startTaskManagerComponentsAndActor(
    +				configuration,
    +				resourceID,
    +				taskManagerSystem,
    +				taskManagerHostname,
    +				TASK_MANAGER_NAME,
    +				null,
    +				false,
    +				taskManagerClass);
    +
    +			// start a process reaper that watches the JobManager. If the TaskManager actor dies,
    +			// the process reaper will kill the JVM process (to ensure easy failure detection)
    +			LOG.debug("Starting TaskManager process reaper");
    +
    +			taskManagerSystem.actorOf(
    +				Props.create(ProcessReaper.class, taskManagerActor, LOG, RUNTIME_FAILURE_RETURN_CODE),
    +				"TaskManager_Process_Reaper");
    +
    +			// if desired, start the logging daemon that periodically logs the memory usage information
    +			if (LOG.isInfoEnabled() && configuration.getBoolean(
    +				ConfigConstants.TASK_MANAGER_DEBUG_MEMORY_USAGE_START_LOG_THREAD,
    +				ConfigConstants.DEFAULT_TASK_MANAGER_DEBUG_MEMORY_USAGE_START_LOG_THREAD)) {
    +				LOG.info("Starting periodic memory usage logger");
    +
    +				long interval = configuration.getLong(
    +					ConfigConstants.TASK_MANAGER_DEBUG_MEMORY_USAGE_LOG_INTERVAL_MS,
    +					ConfigConstants.DEFAULT_TASK_MANAGER_DEBUG_MEMORY_USAGE_LOG_INTERVAL_MS);
    +
    +				MemoryLogger logger = new MemoryLogger(LOG, interval, taskManagerSystem);
    +				logger.start();
    +			}
    +
    +			// block until everything is done
    +			taskManagerSystem.awaitTermination();
    +		} catch (Throwable t) {
    +			LOG.error("Error while starting up taskManager", t);
    +			try {
    +				taskManagerSystem.shutdown();
    +			} catch (Throwable tt) {
    +				LOG.warn("Could not cleanly shut down actor system", tt);
    +			}
    +			throw t;
    +		}
    +	}
    +
    +	// --------------------------------------------------------------------------
    +	//  Starting and running the TaskManager
    +	// --------------------------------------------------------------------------
    +
    +	/**
    +	 * @param configuration                 The configuration for the TaskManager.
    +	 * @param resourceID                    The id of the resource which the task manager
will run on.
    +	 * @param actorSystem                  The actor system that should run the TaskManager
actor.
    +	 * @param taskManagerHostname       The hostname/address that describes the TaskManager's
data location.
    +	 * @param taskManagerActorName      Optionally the name of the TaskManager actor. If
none is given,
    +	 *                                      the actor will use a random name.
    +	 * @param leaderRetrievalService        Optionally, a leader retrieval service can be
provided. If none is given,
    +	 *                                      then a LeaderRetrievalService is constructed
from the configuration.
    +	 * @param localTaskManagerCommunication If true, the TaskManager will not initiate the
TCP network stack.
    +	 * @param taskManagerClass      The class of the TaskManager actor. May be used to give
    +	 *                                      subclasses that understand additional actor
messages.
    +	 * @return An ActorRef to the TaskManager actor.
    +	 * @throws org.apache.flink.configuration.IllegalConfigurationException     Thrown,
if the given config contains illegal values.
    +	 * @throws java.io.IOException      Thrown, if any of the I/O components (such as buffer
pools,
    +	 *                                       I/O manager, ...) cannot be properly started.
    +	 * @throws java.lang.Exception      Thrown is some other error occurs while parsing
the configuration
    +	 *                                      or starting the TaskManager components.
    +	 */
    +	public static ActorRef startTaskManagerComponentsAndActor(
    +		Configuration configuration,
    +		ResourceID resourceID,
    +		ActorSystem actorSystem,
    +		String taskManagerHostname,
    +		String taskManagerActorName,
    +		LeaderRetrievalService leaderRetrievalService,
    +		boolean localTaskManagerCommunication,
    +		Class<? extends TaskManager> taskManagerClass) throws Exception {
    +
    +		Tuple4<TaskManagerConfiguration, NetworkEnvironmentConfiguration, InstanceConnectionInfo,
MemoryType> tuple4
    +			= parseTaskManagerConfiguration(configuration, taskManagerHostname, localTaskManagerCommunication);
    +
    +		TaskManagerConfiguration taskManagerConfig = tuple4._1();
    +		NetworkEnvironmentConfiguration netConfig = tuple4._2();
    +		InstanceConnectionInfo connectionInfo = tuple4._3();
    +		MemoryType memType = tuple4._4();
    +
    +		// pre-start checks
    +		checkTempDirs(taskManagerConfig.tmpDirPaths());
    +
    +		ExecutionContext executionContext = ExecutionContexts$.MODULE$.fromExecutor(new ForkJoinPool());
    +
    +		// we start the network first, to make sure it can allocate its buffers first
    +		NetworkEnvironment network = new NetworkEnvironment(executionContext, taskManagerConfig.timeout(),
netConfig);
    +
    +		// computing the amount of memory to use depends on how much memory is available
    +		// it strictly needs to happen AFTER the network stack has been initialized
    +
    +		// check if a value has been configured
    +		long configuredMemory = configuration.getLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY,
-1L);
    +		checkConfigParameter(configuredMemory == -1 || configuredMemory > 0, configuredMemory,
    +			ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY,
    +			"MemoryManager needs at least one MB of memory. " +
    +				"If you leave this config parameter empty, the system automatically " +
    +				"pick a fraction of the available memory.");
    +
    +		long memorySize;
    +		boolean preAllocateMemory = configuration.getBoolean(
    +			ConfigConstants.TASK_MANAGER_MEMORY_PRE_ALLOCATE_KEY,
    +			ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_PRE_ALLOCATE);
    +		if (configuredMemory > 0) {
    +			if (preAllocateMemory) {
    +				LOG.info("Using {} MB for managed memory." , configuredMemory);
    +			} else {
    +				LOG.info("Limiting managed memory to {} MB, memory will be allocated lazily." , configuredMemory);
    +			}
    +			memorySize = configuredMemory << 20; // megabytes to bytes
    +		} else {
    +			float fraction = configuration.getFloat(
    +				ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
    +				ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION);
    +			checkConfigParameter(fraction > 0.0f && fraction < 1.0f, fraction,
    +				ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
    +				"MemoryManager fraction of the free memory must be between 0.0 and 1.0");
    +
    +			if (memType == MemoryType.HEAP) {
    +				long relativeMemSize = (long) (EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag()
* fraction);
    +				if (preAllocateMemory) {
    +					LOG.info("Using {} of the currently free heap space for managed heap memory ({}
MB)." ,
    +						fraction , relativeMemSize >> 20);
    +				} else {
    +					LOG.info("Limiting managed memory to {} of the currently free heap space ({} MB),
" +
    +						"memory will be allocated lazily." , fraction , relativeMemSize >> 20);
    +				}
    +				memorySize = relativeMemSize;
    +			} else if (memType == MemoryType.OFF_HEAP) {
    +				// The maximum heap memory has been adjusted according to the fraction
    +				long maxMemory = EnvironmentInformation.getMaxJvmHeapMemory();
    +				long directMemorySize = (long) (maxMemory / (1.0 - fraction) * fraction);
    +				if (preAllocateMemory) {
    +					LOG.info("Using {} of the maximum memory size for managed off-heap memory ({} MB)."
,
    +						fraction, directMemorySize >> 20);
    +				} else {
    +					LOG.info("Limiting managed memory to {} of the maximum memory size ({} MB)," +
    +						" memory will be allocated lazily.", fraction, directMemorySize >> 20);
    +				}
    +				memorySize = directMemorySize;
    +			} else {
    +				throw new RuntimeException("No supported memory type detected.");
    +			}
    +		}
    +
    +		// now start the memory manager
    +		MemoryManager memoryManager;
    +		try {
    +			memoryManager = new MemoryManager(
    +				memorySize,
    +				taskManagerConfig.numberOfSlots(),
    +				netConfig.networkBufferSize(),
    +				memType,
    +				preAllocateMemory);
    +		} catch (OutOfMemoryError e) {
    +			if (memType == MemoryType.HEAP) {
    +				throw new Exception("OutOfMemory error (" + e.getMessage() +
    +					") while allocating the TaskManager heap memory (" + memorySize + " bytes).", e);
    +			} else if (memType == MemoryType.OFF_HEAP) {
    +				throw new Exception("OutOfMemory error (" + e.getMessage() +
    +					") while allocating the TaskManager off-heap memory (" + memorySize +
    +					" bytes).Try increasing the maximum direct memory (-XX:MaxDirectMemorySize)", e);
    +			} else {
    +				throw e;
    +			}
    +		}
    +
    +		// start the I/O manager, it will create some temp directories.
    +		IOManager ioManager = new IOManagerAsync(taskManagerConfig.tmpDirPaths());
    +
    +		if (leaderRetrievalService == null){
    +			leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(configuration);
    +		}
    +
    +		// create the actor properties (which define the actor constructor parameters)
    +		Props tmProps = Props.create(
    +			taskManagerClass,
    +			taskManagerConfig,
    +			resourceID,
    +			connectionInfo,
    +			memoryManager,
    +			ioManager,
    +			network,
    +			taskManagerConfig.numberOfSlots(),
    +			leaderRetrievalService);
    +
    +		ActorRef taskManagerActorRef;
    +		if (taskManagerActorName != null && !taskManagerActorName.equals("")) {
    +			taskManagerActorRef = actorSystem.actorOf(tmProps, taskManagerActorName);
    +		} else {
    +			taskManagerActorRef = actorSystem.actorOf(tmProps);
    +		}
    +
    +		return taskManagerActorRef;
    +	}
    +
    +	// --------------------------------------------------------------------------
    +	//  Parsing and checking the TaskManager Configuration
    +	// --------------------------------------------------------------------------
    +
    +	/**
    +	 * Utility method to extract TaskManager config parameters from the configuration and
to
    +	 * sanity check them.
    +	 *
    +	 * @param configuration                 The configuration.
    +	 * @param taskManagerHostname           The host name under which the TaskManager communicates.
    +	 * @param localTaskManagerCommunication True, to skip initializing the network stack.
    +	 *                                      Use only in cases where only one task manager
runs.
    +	 * @return A tuple (TaskManagerConfiguration, network configuration,
    +	 * InstanceConnectionInfo, JobManager actor Akka URL).
    +	 */
    +	private static Tuple4<TaskManagerConfiguration, NetworkEnvironmentConfiguration,
InstanceConnectionInfo, MemoryType>
    +		parseTaskManagerConfiguration(Configuration configuration, String taskManagerHostname,
boolean localTaskManagerCommunication)
    +		throws Exception {
    +
    +		// ------- read values from the config and check them ---------
    +		//                      (a lot of them)
    +
    +		// ----> hosts / ports for communication and data exchange
    +
    +		int dataport = configuration.getInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY,
    +			ConfigConstants.DEFAULT_TASK_MANAGER_DATA_PORT);
    +		if (dataport == 0) {
    +			dataport = NetUtils.getAvailablePort();
    +		}
    +		checkConfigParameter(dataport > 0, dataport, ConfigConstants.TASK_MANAGER_DATA_PORT_KEY,
    +			"Leave config parameter empty or use 0 to let the system choose a port automatically.");
    +
    +		InetAddress taskManagerAddress = InetAddress.getByName(taskManagerHostname);
    +		InstanceConnectionInfo connectionInfo = new InstanceConnectionInfo(taskManagerAddress,
dataport);
    +
    +		// ----> memory / network stack (shuffles/broadcasts), task slots, temp directories
    +
    +		// we need this because many configs have been written with a "-1" entry
    +		int slots = configuration.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
    +		if (slots == -1) {
    +			slots = 1;
    +		}
    +		checkConfigParameter(slots >= 1, slots, ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS,
    +			"Number of task slots must be at least one.");
    +
    +		int numNetworkBuffers = configuration.getInteger(
    +			ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY,
    +			ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS);
    +		checkConfigParameter(numNetworkBuffers > 0, numNetworkBuffers,
    +			ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, "");
    +
    +		int pageSize = configuration.getInteger(
    +			ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY,
    +			ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_SEGMENT_SIZE);
    +		// check page size of for minimum size
    +		checkConfigParameter(pageSize >= MemoryManager.MIN_PAGE_SIZE, pageSize,
    +			ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY,
    +			"Minimum memory segment size is " + MemoryManager.MIN_PAGE_SIZE);
    +		// check page size for power of two
    +		checkConfigParameter(MathUtils.isPowerOf2(pageSize), pageSize,
    +			ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY,
    +			"Memory segment size must be a power of 2.");
    +
    +		// check whether we use heap or off-heap memory
    +		MemoryType memType;
    +		if (configuration.getBoolean(ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_KEY, false))
{
    +			memType = MemoryType.OFF_HEAP;
    +		} else {
    +			memType = MemoryType.HEAP;
    +		}
    +
    +		// initialize the memory segment factory accordingly
    +		if (memType == MemoryType.HEAP) {
    +			if (!MemorySegmentFactory.isInitialized()) {
    +				MemorySegmentFactory.initializeFactory(HeapMemorySegment.FACTORY);
    +			} else if (MemorySegmentFactory.getFactory() != HeapMemorySegment.FACTORY) {
    +				throw new Exception("Memory type is set to heap memory, but memory segment " +
    +					"factory has been initialized for off-heap memory segments");
    +			}
    +		} else {
    +			if (!MemorySegmentFactory.isInitialized()) {
    +				MemorySegmentFactory.initializeFactory(HybridMemorySegment.FACTORY);
    +			} else if (MemorySegmentFactory.getFactory() != HybridMemorySegment.FACTORY) {
    +				throw new Exception("Memory type is set to off-heap memory, but memory segment "
+
    +					"factory has been initialized for heap memory segments");
    +			}
    +		}
    +
    +		String[] tmpDirs = configuration.getString(
    +			ConfigConstants.TASK_MANAGER_TMP_DIR_KEY,
    +			ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH).split(",|" + File.pathSeparator);
    +
    +		NettyConfig nettyConfig = null;
    --- End diff --
    
    This variable can be declared `final`, and there is no need to initialize it with `null`.


> Implement TaskManager basic startup of all components in java
> -------------------------------------------------------------
>
>                 Key: FLINK-4363
>                 URL: https://issues.apache.org/jira/browse/FLINK-4363
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Cluster Management
>            Reporter: Zhijiang Wang
>            Assignee: Zhijiang Wang
>
> Similar to the current {{TaskManager}}, but implements the initialization and startup of all components
in Java instead of Scala.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message