Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 6760D200BC1 for ; Tue, 1 Nov 2016 09:40:31 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 660C7160B0F; Tue, 1 Nov 2016 08:40:31 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 86B70160B0C for ; Tue, 1 Nov 2016 09:40:29 +0100 (CET) Received: (qmail 9054 invoked by uid 500); 1 Nov 2016 08:40:28 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 8775 invoked by uid 99); 1 Nov 2016 08:40:28 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 01 Nov 2016 08:40:28 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 29144E93E5; Tue, 1 Nov 2016 08:40:28 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: trohrmann@apache.org To: commits@flink.apache.org Date: Tue, 01 Nov 2016 08:40:34 -0000 Message-Id: <20bf241e22374d5b8358a3467bb7a1be@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [07/50] [abbrv] flink git commit: [hotfix] Replace TaskManager.createTaskManagerComponents by TaskManagerServices archived-at: Tue, 01 Nov 2016 08:40:31 -0000 [hotfix] Replace TaskManager.createTaskManagerComponents by TaskManagerServices Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8e653397 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8e653397 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8e653397 Branch: refs/heads/flip-6 Commit: 8e653397a931497026c8ae8be05b48a108a58edf Parents: 5219b40 Author: Till Rohrmann Authored: Wed Sep 28 14:04:54 2016 +0200 Committer: Till Rohrmann Committed: Tue Nov 1 09:39:28 2016 +0100 ---------------------------------------------------------------------- .../clusterframework/MesosTaskManager.scala | 3 +- .../taskexecutor/TaskManagerConfiguration.java | 25 +- .../TaskManagerServicesConfiguration.java | 2 +- .../minicluster/LocalFlinkMiniCluster.scala | 47 +- .../flink/runtime/taskmanager/TaskManager.scala | 601 ++----------------- .../taskmanager/TaskManagerConfiguration.scala | 56 -- ...askManagerComponentsStartupShutdownTest.java | 24 +- .../testingUtils/TestingTaskManager.scala | 3 +- .../runtime/testingUtils/TestingUtils.scala | 1 - .../flink/yarn/TestingYarnTaskManager.scala | 3 +- .../org/apache/flink/yarn/YarnTaskManager.scala | 3 +- 11 files changed, 126 insertions(+), 642 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/8e653397/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManager.scala ---------------------------------------------------------------------- diff --git a/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManager.scala b/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManager.scala index 3972a57..e8d6a58 100644 --- a/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManager.scala +++ b/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManager.scala @@ -24,7 +24,8 @@ import org.apache.flink.runtime.io.network.NetworkEnvironment import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService import org.apache.flink.runtime.memory.MemoryManager import org.apache.flink.runtime.metrics.MetricRegistry -import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerConfiguration, TaskManagerLocation} +import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration +import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerLocation} /** An extension of the TaskManager that listens for additional Mesos-related * messages. http://git-wip-us.apache.org/repos/asf/flink/blob/8e653397/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java index 32eb8c1..f58af77 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java @@ -41,6 +41,7 @@ public class TaskManagerConfiguration { private final String[] tmpDirPaths; private final Time timeout; + // null indicates an infinite duration private final Time maxRegistrationDuration; private final Time initialRegistrationPause; private final Time maxRegistrationPause; @@ -48,6 +49,9 @@ public class TaskManagerConfiguration { private final long cleanupInterval; + // TODO: remove necessity for complete configuration object + private final Configuration configuration; + public TaskManagerConfiguration( int numberSlots, String[] tmpDirPaths, @@ -56,16 +60,18 @@ public class TaskManagerConfiguration { Time initialRegistrationPause, Time maxRegistrationPause, Time refusedRegistrationPause, - long cleanupInterval) { + long cleanupInterval, + Configuration configuration) { this.numberSlots = numberSlots; this.tmpDirPaths = Preconditions.checkNotNull(tmpDirPaths); this.timeout = Preconditions.checkNotNull(timeout); - this.maxRegistrationDuration = Preconditions.checkNotNull(maxRegistrationDuration); + this.maxRegistrationDuration = maxRegistrationDuration; this.initialRegistrationPause = Preconditions.checkNotNull(initialRegistrationPause); this.maxRegistrationPause = Preconditions.checkNotNull(maxRegistrationPause); this.refusedRegistrationPause = Preconditions.checkNotNull(refusedRegistrationPause); this.cleanupInterval = Preconditions.checkNotNull(cleanupInterval); + this.configuration = Preconditions.checkNotNull(configuration); } public int getNumberSlots() { @@ -100,6 +106,10 @@ public class TaskManagerConfiguration { return cleanupInterval; } + public Configuration getConfiguration() { + return configuration; + } + // -------------------------------------------------------------------------------------------- // Static factory methods // -------------------------------------------------------------------------------------------- @@ -138,7 +148,7 @@ public class TaskManagerConfiguration { ConfigConstants.TASK_MANAGER_MAX_REGISTRATION_DURATION, ConfigConstants.DEFAULT_TASK_MANAGER_MAX_REGISTRATION_DURATION)); if (maxRegistrationDuration.isFinite()) { - finiteRegistrationDuration = Time.seconds(maxRegistrationDuration.toSeconds()); + finiteRegistrationDuration = Time.milliseconds(maxRegistrationDuration.toMillis()); } else { finiteRegistrationDuration = null; } @@ -153,7 +163,7 @@ public class TaskManagerConfiguration { ConfigConstants.TASK_MANAGER_INITIAL_REGISTRATION_PAUSE, ConfigConstants.DEFAULT_TASK_MANAGER_INITIAL_REGISTRATION_PAUSE)); if (pause.isFinite()) { - initialRegistrationPause = Time.seconds(pause.toSeconds()); + initialRegistrationPause = Time.milliseconds(pause.toMillis()); } else { throw new IllegalArgumentException("The initial registration pause must be finite: " + pause); } @@ -168,7 +178,7 @@ public class TaskManagerConfiguration { ConfigConstants.TASK_MANAGER_MAX_REGISTARTION_PAUSE, ConfigConstants.DEFAULT_TASK_MANAGER_MAX_REGISTRATION_PAUSE)); if (pause.isFinite()) { - maxRegistrationPause = Time.seconds(pause.toSeconds()); + maxRegistrationPause = Time.milliseconds(pause.toMillis()); } else { throw new IllegalArgumentException("The maximum registration pause must be finite: " + pause); } @@ -183,7 +193,7 @@ public class TaskManagerConfiguration { ConfigConstants.TASK_MANAGER_REFUSED_REGISTRATION_PAUSE, ConfigConstants.DEFAULT_TASK_MANAGER_REFUSED_REGISTRATION_PAUSE)); if (pause.isFinite()) { - refusedRegistrationPause = Time.seconds(pause.toSeconds()); + refusedRegistrationPause = Time.milliseconds(pause.toMillis()); } else { throw new IllegalArgumentException("The refused registration pause must be finite: " + pause); } @@ -200,6 +210,7 @@ public class TaskManagerConfiguration { initialRegistrationPause, maxRegistrationPause, refusedRegistrationPause, - cleanupInterval); + cleanupInterval, + configuration); } } http://git-wip-us.apache.org/repos/asf/flink/blob/8e653397/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java index 66d969a..80dfc09 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java @@ -208,7 +208,7 @@ public class TaskManagerServicesConfiguration { int dataport = configuration.getInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, ConfigConstants.DEFAULT_TASK_MANAGER_DATA_PORT); - checkConfigParameter(dataport > 0, dataport, ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, + checkConfigParameter(dataport >= 0, dataport, ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, "Leave config parameter empty or use 0 to let the system choose a port automatically."); checkConfigParameter(slots >= 1, slots, ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, http://git-wip-us.apache.org/repos/asf/flink/blob/8e653397/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala index cad2648..eac0a51 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala @@ -18,6 +18,7 @@ package org.apache.flink.runtime.minicluster +import java.net.InetAddress import java.util.concurrent.ExecutorService import akka.actor.{ActorRef, ActorSystem, Props} @@ -42,8 +43,9 @@ import org.apache.flink.runtime.memory.MemoryManager import org.apache.flink.runtime.messages.JobManagerMessages import org.apache.flink.runtime.messages.JobManagerMessages.{RunningJobsStatus, StoppingFailure, StoppingResponse} import org.apache.flink.runtime.metrics.MetricRegistry -import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerConfiguration, TaskManagerLocation} -import org.apache.flink.runtime.util.EnvironmentInformation +import org.apache.flink.runtime.taskexecutor.{TaskManagerConfiguration, TaskManagerServices, TaskManagerServicesConfiguration} +import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerLocation} +import org.apache.flink.runtime.util.{EnvironmentInformation, LeaderRetrievalUtils} import scala.concurrent.Await import scala.concurrent.duration.FiniteDuration @@ -195,31 +197,32 @@ class LocalFlinkMiniCluster( val resourceID = ResourceID.generate() // generate random resource id - val (taskManagerConfig, - taskManagerLocation, - memoryManager, - ioManager, - network, - leaderRetrievalService, - metricsRegistry) = TaskManager.createTaskManagerComponents( + val taskManagerAddress = InetAddress.getByName(hostname) + + val taskManagerConfiguration = TaskManagerConfiguration.fromConfiguration(config) + val taskManagerServicesConfiguration = TaskManagerServicesConfiguration.fromConfiguration( config, - resourceID, - hostname, // network interface to bind to - localExecution, // start network stack? - Some(createLeaderRetrievalService())) + taskManagerAddress, + localExecution) + + val taskManagerServices = TaskManagerServices.fromConfiguration( + taskManagerServicesConfiguration, + resourceID) + + val metricRegistry = taskManagerServices.getMetricRegistry() val props = getTaskManagerProps( taskManagerClass, - taskManagerConfig, + taskManagerConfiguration, resourceID, - taskManagerLocation, - memoryManager, - ioManager, - network, - leaderRetrievalService, - metricsRegistry) - - metricsRegistry.startQueryService(system, resourceID) + taskManagerServices.getTaskManagerLocation(), + taskManagerServices.getMemoryManager(), + taskManagerServices.getIOManager(), + taskManagerServices.getNetworkEnvironment, + createLeaderRetrievalService(), + metricRegistry) + + metricRegistry.startQueryService(system, resourceID) system.actorOf(props, taskManagerActorName) } http://git-wip-us.apache.org/repos/asf/flink/blob/8e653397/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala index c789156..0701f3a 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala @@ -31,7 +31,6 @@ import grizzled.slf4j.Logger import org.apache.commons.lang3.exception.ExceptionUtils import org.apache.flink.configuration._ import org.apache.flink.core.fs.FileSystem -import org.apache.flink.core.memory.{HeapMemorySegment, HybridMemorySegment, MemorySegmentFactory, MemoryType} import org.apache.flink.runtime.accumulators.AccumulatorSnapshot import org.apache.flink.runtime.clusterframework.messages.StopCluster import org.apache.flink.runtime.clusterframework.types.ResourceID @@ -44,12 +43,8 @@ import org.apache.flink.runtime.execution.librarycache.{BlobLibraryCacheManager, import org.apache.flink.runtime.executiongraph.{ExecutionAttemptID, PartitionInfo} import org.apache.flink.runtime.filecache.FileCache import org.apache.flink.runtime.instance.{AkkaActorGateway, HardwareDescription, InstanceID} -import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode -import org.apache.flink.runtime.io.disk.iomanager.{IOManager, IOManagerAsync} -import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool -import org.apache.flink.runtime.io.network.{LocalConnectionManager, NetworkEnvironment, TaskEventDispatcher} -import org.apache.flink.runtime.io.network.netty.{NettyConfig, NettyConnectionManager, PartitionStateChecker} -import org.apache.flink.runtime.io.network.partition.{ResultPartitionConsumableNotifier, ResultPartitionManager} +import org.apache.flink.runtime.io.disk.iomanager.IOManager +import org.apache.flink.runtime.io.network.NetworkEnvironment import org.apache.flink.runtime.leaderretrieval.{LeaderRetrievalListener, LeaderRetrievalService} import org.apache.flink.runtime.memory.MemoryManager import org.apache.flink.runtime.messages.{Acknowledge, StackTraceSampleResponse} @@ -59,17 +54,16 @@ import org.apache.flink.runtime.messages.StackTraceSampleMessages.{SampleTaskSta import org.apache.flink.runtime.messages.TaskManagerMessages._ import org.apache.flink.runtime.messages.TaskMessages._ import org.apache.flink.runtime.messages.checkpoint.{AbstractCheckpointMessage, NotifyCheckpointComplete, TriggerCheckpoint} -import org.apache.flink.runtime.metrics.{MetricRegistryConfiguration, MetricRegistry => FlinkMetricRegistry} +import org.apache.flink.runtime.metrics.{MetricRegistry => FlinkMetricRegistry} import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup import org.apache.flink.runtime.metrics.util.MetricUtils import org.apache.flink.runtime.process.ProcessReaper -import org.apache.flink.runtime.query.KvStateRegistry -import org.apache.flink.runtime.query.netty.{DisabledKvStateRequestStats, KvStateServer} import org.apache.flink.runtime.security.SecurityContext.{FlinkSecuredRunner, SecurityConfiguration} import org.apache.flink.runtime.security.SecurityContext +import org.apache.flink.runtime.taskexecutor.{TaskManagerConfiguration, TaskManagerServices, TaskManagerServicesConfiguration} import org.apache.flink.runtime.util._ import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessageFilter, LogMessages} -import org.apache.flink.util.{MathUtils, NetUtils} +import org.apache.flink.util.NetUtils import scala.collection.JavaConverters._ import scala.concurrent._ @@ -136,7 +130,7 @@ class TaskManager( override val log = Logger(getClass) /** The timeout for all actor ask futures */ - protected val askTimeout = new Timeout(config.timeout) + protected val askTimeout = new Timeout(config.getTimeout().getSize, config.getTimeout().getUnit()) /** The TaskManager's physical execution resources */ protected val resources = HardwareDescription.extractFromSystem(memoryManager.getMemorySize()) @@ -148,7 +142,7 @@ class TaskManager( protected val bcVarManager = new BroadcastVariableManager() /** Handler for distributed files cached by this TaskManager */ - protected val fileCache = new FileCache(config.configuration) + protected val fileCache = new FileCache(config.getConfiguration()) private var taskManagerMetricGroup : TaskManagerMetricGroup = _ @@ -172,8 +166,8 @@ class TaskManager( private val runtimeInfo = new TaskManagerRuntimeInfo( location.getHostname(), - new UnmodifiableConfiguration(config.configuration), - config.tmpDirPaths) + new UnmodifiableConfiguration(config.getConfiguration()), + config.getTmpDirPaths()) private var scheduledTaskManagerRegistration: Option[Cancellable] = None private var currentRegistrationRun: UUID = UUID.randomUUID() @@ -587,7 +581,9 @@ class TaskManager( ) // the next timeout computes via exponential backoff with cap - val nextTimeout = (timeout * 2).min(config.maxRegistrationPause) + val nextTimeout = (timeout * 2).min(new FiniteDuration( + config.getMaxRegistrationPause().toMilliseconds, + TimeUnit.MILLISECONDS)) // schedule (with our timeout s delay) a check triggers a new registration // attempt, if we are not registered by then @@ -661,10 +657,14 @@ class TaskManager( if(jobManagerAkkaURL.isDefined) { // try the registration again after some time - val delay: FiniteDuration = config.refusedRegistrationPause - val deadline: Option[Deadline] = config.maxRegistrationDuration.map { - timeout => timeout + delay fromNow - } + val delay: FiniteDuration = new FiniteDuration( + config.getRefusedRegistrationPause().getSize(), + config.getRefusedRegistrationPause().getUnit()) + val deadline: Option[Deadline] = Option(config.getMaxRegistrationDuration()) + .map { + duration => new FiniteDuration(duration.getSize(), duration.getUnit()) + + delay fromNow + } // start a new registration run currentRegistrationRun = UUID.randomUUID() @@ -676,7 +676,9 @@ class TaskManager( self ! decorateMessage( TriggerTaskManagerRegistration( jobManagerAkkaURL.get, - config.initialRegistrationPause, + new FiniteDuration( + config.getInitialRegistrationPause().getSize(), + config.getInitialRegistrationPause().getUnit()), deadline, 1, currentRegistrationRun) @@ -817,7 +819,7 @@ class TaskManager( requestType: LogTypeRequest, jobManager: ActorRef) : Unit = { - val logFilePathOption = Option(config.configuration.getString( + val logFilePathOption = Option(config.getConfiguration().getString( ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, System.getProperty("log.file"))); logFilePathOption match { case None => throw new IOException("TaskManager log files are unavailable. " + @@ -950,9 +952,10 @@ class TaskManager( log.info(s"Determined BLOB server address to be $address. Starting BLOB cache.") try { - val blobcache = new BlobCache(address, config.configuration) + val blobcache = new BlobCache(address, config.getConfiguration()) blobService = Option(blobcache) - libraryCacheManager = Some(new BlobLibraryCacheManager(blobcache, config.cleanupInterval)) + libraryCacheManager = Some( + new BlobLibraryCacheManager(blobcache, config.getCleanupInterval())) } catch { case e: Exception => @@ -1134,7 +1137,9 @@ class TaskManager( tdd.getJobID, tdd.getVertexID, tdd.getExecutionId, - config.timeout) + new FiniteDuration( + config.getTimeout().getSize(), + config.getTimeout().getUnit())) val task = new Task( tdd, @@ -1408,7 +1413,8 @@ class TaskManager( def triggerTaskManagerRegistration(): Unit = { if(jobManagerAkkaURL.isDefined) { // begin attempts to reconnect - val deadline: Option[Deadline] = config.maxRegistrationDuration.map(_.fromNow) + val deadline: Option[Deadline] = Option(config.getMaxRegistrationDuration()) + .map{ duration => new FiniteDuration(duration.getSize(), duration.getUnit()).fromNow } // start a new registration run currentRegistrationRun = UUID.randomUUID() @@ -1418,7 +1424,9 @@ class TaskManager( self ! decorateMessage( TriggerTaskManagerRegistration( jobManagerAkkaURL.get, - config.initialRegistrationPause, + new FiniteDuration( + config.getInitialRegistrationPause().getSize(), + config.getInitialRegistrationPause().getUnit()), deadline, 1, currentRegistrationRun) @@ -1825,32 +1833,37 @@ object TaskManager { taskManagerClass: Class[_ <: TaskManager]) : ActorRef = { - val (taskManagerConfig, - connectionInfo, - memoryManager, - ioManager, - network, - leaderRetrievalService, - metricsRegistry) = createTaskManagerComponents( - configuration, - resourceID, - taskManagerHostname, - localTaskManagerCommunication, - leaderRetrievalServiceOption) + val taskManagerAddress = InetAddress.getByName(taskManagerHostname) + + val taskManagerServicesConfiguration = TaskManagerServicesConfiguration + .fromConfiguration(configuration, taskManagerAddress, false) + + val taskManagerConfiguration = TaskManagerConfiguration.fromConfiguration(configuration) + + val taskManagerServices = TaskManagerServices.fromConfiguration( + taskManagerServicesConfiguration, + resourceID) + + val metricRegistry = taskManagerServices.getMetricRegistry() + + val leaderRetrievalService = leaderRetrievalServiceOption match { + case Some(lrs) => lrs + case None => LeaderRetrievalUtils.createLeaderRetrievalService(configuration) + } // create the actor properties (which define the actor constructor parameters) val tmProps = getTaskManagerProps( taskManagerClass, - taskManagerConfig, + taskManagerConfiguration, resourceID, - connectionInfo, - memoryManager, - ioManager, - network, + taskManagerServices.getTaskManagerLocation(), + taskManagerServices.getMemoryManager(), + taskManagerServices.getIOManager(), + taskManagerServices.getNetworkEnvironment(), leaderRetrievalService, - metricsRegistry) + metricRegistry) - metricsRegistry.startQueryService(actorSystem, resourceID) + metricRegistry.startQueryService(actorSystem, resourceID) taskManagerActorName match { case Some(actorName) => actorSystem.actorOf(tmProps, actorName) @@ -1877,211 +1890,11 @@ object TaskManager { memoryManager, ioManager, networkEnvironment, - taskManagerConfig.numberOfSlots, + taskManagerConfig.getNumberSlots(), leaderRetrievalService, metricsRegistry) } - def createTaskManagerComponents( - configuration: Configuration, - resourceID: ResourceID, - taskManagerHostname: String, - localTaskManagerCommunication: Boolean, - leaderRetrievalServiceOption: Option[LeaderRetrievalService]): - (TaskManagerConfiguration, - TaskManagerLocation, - MemoryManager, - IOManager, - NetworkEnvironment, - LeaderRetrievalService, - FlinkMetricRegistry) = { - - val (taskManagerConfig : TaskManagerConfiguration, - netConfig: NetworkEnvironmentConfiguration, - taskManagerAddress: InetSocketAddress, - memType: MemoryType - ) = parseTaskManagerConfiguration( - configuration, - taskManagerHostname, - localTaskManagerCommunication) - - // pre-start checks - checkTempDirs(taskManagerConfig.tmpDirPaths) - - val networkBufferPool = new NetworkBufferPool( - netConfig.numNetworkBuffers, - netConfig.networkBufferSize, - netConfig.memoryType) - - val connectionManager = Option(netConfig.nettyConfig) match { - case Some(nettyConfig) => new NettyConnectionManager(nettyConfig) - case None => new LocalConnectionManager() - } - - val resultPartitionManager = new ResultPartitionManager() - val taskEventDispatcher = new TaskEventDispatcher() - - val kvStateRegistry = new KvStateRegistry() - - val kvStateServer = Option(netConfig.nettyConfig) match { - case Some(nettyConfig) => - - val numNetworkThreads = if (netConfig.queryServerNetworkThreads == 0) { - nettyConfig.getNumberOfSlots - } else { - netConfig.queryServerNetworkThreads - } - - val numQueryThreads = if (netConfig.queryServerQueryThreads == 0) { - nettyConfig.getNumberOfSlots - } else { - netConfig.queryServerQueryThreads - } - - new KvStateServer( - taskManagerAddress.getAddress(), - netConfig.queryServerPort, - numNetworkThreads, - numQueryThreads, - kvStateRegistry, - new DisabledKvStateRequestStats()) - - case None => null - } - - // we start the network first, to make sure it can allocate its buffers first - val network = new NetworkEnvironment( - networkBufferPool, - connectionManager, - resultPartitionManager, - taskEventDispatcher, - kvStateRegistry, - kvStateServer, - netConfig.ioMode, - netConfig.partitionRequestInitialBackoff, - netConfig.partitinRequestMaxBackoff) - - network.start() - - val taskManagerLocation = new TaskManagerLocation( - resourceID, - taskManagerAddress.getAddress(), - network.getConnectionManager().getDataPort()) - - // computing the amount of memory to use depends on how much memory is available - // it strictly needs to happen AFTER the network stack has been initialized - - // check if a value has been configured - val configuredMemory = configuration.getLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1L) - checkConfigParameter(configuredMemory == -1 || configuredMemory > 0, configuredMemory, - ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, - "MemoryManager needs at least one MB of memory. " + - "If you leave this config parameter empty, the system automatically " + - "pick a fraction of the available memory.") - - - val preAllocateMemory = configuration.getBoolean( - ConfigConstants.TASK_MANAGER_MEMORY_PRE_ALLOCATE_KEY, - ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_PRE_ALLOCATE) - - val memorySize = if (configuredMemory > 0) { - if (preAllocateMemory) { - LOG.info(s"Using $configuredMemory MB for managed memory.") - } else { - LOG.info(s"Limiting managed memory to $configuredMemory MB, " + - s"memory will be allocated lazily.") - } - configuredMemory << 20 // megabytes to bytes - } - else { - val fraction = configuration.getFloat( - ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY, - ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION) - checkConfigParameter(fraction > 0.0f && fraction < 1.0f, fraction, - ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY, - "MemoryManager fraction of the free memory must be between 0.0 and 1.0") - - if (memType == MemoryType.HEAP) { - val relativeMemSize = (EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() * - fraction).toLong - - if (preAllocateMemory) { - LOG.info(s"Using $fraction of the currently free heap space for managed " + - s"heap memory (${relativeMemSize >> 20} MB).") - } else { - LOG.info(s"Limiting managed memory to $fraction of the currently free heap space " + - s"(${relativeMemSize >> 20} MB), memory will be allocated lazily.") - } - - relativeMemSize - } - else if (memType == MemoryType.OFF_HEAP) { - - // The maximum heap memory has been adjusted according to the fraction - val maxMemory = EnvironmentInformation.getMaxJvmHeapMemory() - val directMemorySize = (maxMemory / (1.0 - fraction) * fraction).toLong - - if (preAllocateMemory) { - LOG.info(s"Using $fraction of the maximum memory size for " + - s"managed off-heap memory (${directMemorySize >> 20} MB).") - } else { - LOG.info(s"Limiting managed memory to $fraction of the maximum memory size " + - s"(${directMemorySize >> 20} MB), memory will be allocated lazily.") - } - - directMemorySize - } - else { - throw new RuntimeException("No supported memory type detected.") - } - } - - // now start the memory manager - val memoryManager = try { - new MemoryManager( - memorySize, - taskManagerConfig.numberOfSlots, - netConfig.networkBufferSize, - memType, - preAllocateMemory) - } - catch { - case e: OutOfMemoryError => - memType match { - case MemoryType.HEAP => - throw new Exception(s"OutOfMemory error (${e.getMessage()})" + - s" while allocating the TaskManager heap memory ($memorySize bytes).", e) - - case MemoryType.OFF_HEAP => - throw new Exception(s"OutOfMemory error (${e.getMessage()})" + - s" while allocating the TaskManager off-heap memory ($memorySize bytes). " + - s"Try increasing the maximum direct memory (-XX:MaxDirectMemorySize)", e) - - case _ => throw e - } - } - - // start the I/O manager last, it will create some temp directories. - val ioManager: IOManager = new IOManagerAsync(taskManagerConfig.tmpDirPaths) - - val leaderRetrievalService = leaderRetrievalServiceOption match { - case Some(lrs) => lrs - case None => LeaderRetrievalUtils.createLeaderRetrievalService(configuration) - } - - val metricsRegistry = new FlinkMetricRegistry( - MetricRegistryConfiguration.fromConfiguration(configuration)) - - (taskManagerConfig, - taskManagerLocation, - memoryManager, - ioManager, - network, - leaderRetrievalService, - metricsRegistry) - } - - // -------------------------------------------------------------------------- // Resolving the TaskManager actor // -------------------------------------------------------------------------- @@ -2121,239 +1934,6 @@ object TaskManager { // -------------------------------------------------------------------------- /** - * Utility method to extract TaskManager config parameters from the configuration and to - * sanity check them. - * - * @param configuration The configuration. - * @param taskManagerHostname The host name under which the TaskManager communicates. - * @param localTaskManagerCommunication True, to skip initializing the network stack. - * Use only in cases where only one task manager runs. - * @return A tuple (TaskManagerConfiguration, network configuration, inet socket address, - * memory tyep). - */ - @throws(classOf[IllegalArgumentException]) - def parseTaskManagerConfiguration( - configuration: Configuration, - taskManagerHostname: String, - localTaskManagerCommunication: Boolean) - : (TaskManagerConfiguration, - NetworkEnvironmentConfiguration, - InetSocketAddress, - MemoryType) = { - - // ------- read values from the config and check them --------- - // (a lot of them) - - // ----> hosts / ports for communication and data exchange - - val dataport = configuration.getInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, - ConfigConstants.DEFAULT_TASK_MANAGER_DATA_PORT) - - checkConfigParameter(dataport >= 0, dataport, ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, - "Leave config parameter empty or use 0 to let the system choose a port automatically.") - - val taskManagerAddress = InetAddress.getByName(taskManagerHostname) - val taskManagerInetSocketAddress = new InetSocketAddress(taskManagerAddress, dataport) - - // ----> memory / network stack (shuffles/broadcasts), task slots, temp directories - - // we need this because many configs have been written with a "-1" entry - val slots = configuration.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1) match { - case -1 => 1 - case x => x - } - - checkConfigParameter(slots >= 1, slots, ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, - "Number of task slots must be at least one.") - - val numNetworkBuffers = configuration.getInteger( - ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, - ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS) - - checkConfigParameter(numNetworkBuffers > 0, numNetworkBuffers, - ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY) - - val pageSize: Int = configuration.getInteger( - ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY, - ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_SEGMENT_SIZE) - - // check page size of for minimum size - checkConfigParameter(pageSize >= MemoryManager.MIN_PAGE_SIZE, pageSize, - ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY, - "Minimum memory segment size is " + MemoryManager.MIN_PAGE_SIZE) - - // check page size for power of two - checkConfigParameter(MathUtils.isPowerOf2(pageSize), pageSize, - ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY, - "Memory segment size must be a power of 2.") - - // check whether we use heap or off-heap memory - val memType: MemoryType = - if (configuration.getBoolean(ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_KEY, false)) { - MemoryType.OFF_HEAP - } else { - MemoryType.HEAP - } - - // initialize the memory segment factory accordingly - memType match { - case MemoryType.HEAP => - if (!MemorySegmentFactory.initializeIfNotInitialized(HeapMemorySegment.FACTORY)) { - throw new Exception("Memory type is set to heap memory, but memory segment " + - "factory has been initialized for off-heap memory segments") - } - - case MemoryType.OFF_HEAP => - if (!MemorySegmentFactory.initializeIfNotInitialized(HybridMemorySegment.FACTORY)) { - throw new Exception("Memory type is set to off-heap memory, but memory segment " + - "factory has been initialized for heap memory segments") - } - } - - val tmpDirs = configuration.getString( - ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, - ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH) - .split(",|" + File.pathSeparator) - - val nettyConfig = if (localTaskManagerCommunication) { - None - } else { - Some( - new NettyConfig( - taskManagerInetSocketAddress.getAddress(), - taskManagerInetSocketAddress.getPort(), - pageSize, - slots, - configuration) - ) - } - - // Default spill I/O mode for intermediate results - val syncOrAsync = configuration.getString( - ConfigConstants.TASK_MANAGER_NETWORK_DEFAULT_IO_MODE, - ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_DEFAULT_IO_MODE) - - val ioMode : IOMode = if (syncOrAsync == "async") IOMode.ASYNC else IOMode.SYNC - - val queryServerPort = configuration.getInteger( - ConfigConstants.QUERYABLE_STATE_SERVER_PORT, - ConfigConstants.DEFAULT_QUERYABLE_STATE_SERVER_PORT) - - val queryServerNetworkThreads = configuration.getInteger( - ConfigConstants.QUERYABLE_STATE_SERVER_NETWORK_THREADS, - ConfigConstants.DEFAULT_QUERYABLE_STATE_SERVER_NETWORK_THREADS) - - val queryServerQueryThreads = configuration.getInteger( - ConfigConstants.QUERYABLE_STATE_SERVER_QUERY_THREADS, - ConfigConstants.DEFAULT_QUERYABLE_STATE_SERVER_QUERY_THREADS) - - val networkConfig = NetworkEnvironmentConfiguration( - numNetworkBuffers, - pageSize, - memType, - ioMode, - queryServerPort, - queryServerNetworkThreads, - queryServerQueryThreads, - nettyConfig.getOrElse(null)) - - // ----> timeouts, library caching, profiling - - val timeout = try { - AkkaUtils.getTimeout(configuration) - } catch { - case e: Exception => throw new IllegalArgumentException( - s"Invalid format for '${ConfigConstants.AKKA_ASK_TIMEOUT}'. " + - s"Use formats like '50 s' or '1 min' to specify the timeout.") - } - LOG.info("Messages between TaskManager and JobManager have a max timeout of " + timeout) - - val cleanupInterval = configuration.getLong( - ConfigConstants.LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL, - ConfigConstants.DEFAULT_LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL) * 1000 - - val finiteRegistrationDuration = try { - val maxRegistrationDuration = Duration(configuration.getString( - ConfigConstants.TASK_MANAGER_MAX_REGISTRATION_DURATION, - ConfigConstants.DEFAULT_TASK_MANAGER_MAX_REGISTRATION_DURATION)) - - if (maxRegistrationDuration.isFinite()) { - Some(maxRegistrationDuration.asInstanceOf[FiniteDuration]) - } else { - None - } - } catch { - case e: NumberFormatException => throw new IllegalArgumentException( - "Invalid format for parameter " + ConfigConstants.TASK_MANAGER_MAX_REGISTRATION_DURATION, - e) - } - - val initialRegistrationPause = try { - val pause = Duration(configuration.getString( - ConfigConstants.TASK_MANAGER_INITIAL_REGISTRATION_PAUSE, - ConfigConstants.DEFAULT_TASK_MANAGER_INITIAL_REGISTRATION_PAUSE - )) - - if (pause.isFinite()) { - pause.asInstanceOf[FiniteDuration] - } else { - throw new IllegalArgumentException(s"The initial registration pause must be finite: $pause") - } - } catch { - case e: NumberFormatException => throw new IllegalArgumentException( - "Invalid format for parameter " + ConfigConstants.TASK_MANAGER_INITIAL_REGISTRATION_PAUSE, - e) - } - - val maxRegistrationPause = try { - val pause = Duration(configuration.getString( - ConfigConstants.TASK_MANAGER_MAX_REGISTARTION_PAUSE, - ConfigConstants.DEFAULT_TASK_MANAGER_MAX_REGISTRATION_PAUSE - )) - - if (pause.isFinite()) { - pause.asInstanceOf[FiniteDuration] - } else { - throw new IllegalArgumentException(s"The maximum registration pause must be finite: $pause") - } - } catch { - case e: NumberFormatException => throw new IllegalArgumentException( - "Invalid format for parameter " + ConfigConstants.TASK_MANAGER_INITIAL_REGISTRATION_PAUSE, - e) - } - - val refusedRegistrationPause = try { - val pause = Duration(configuration.getString( - ConfigConstants.TASK_MANAGER_REFUSED_REGISTRATION_PAUSE, - ConfigConstants.DEFAULT_TASK_MANAGER_REFUSED_REGISTRATION_PAUSE - )) - - if (pause.isFinite()) { - pause.asInstanceOf[FiniteDuration] - } else { - throw new IllegalArgumentException(s"The refused registration pause must be finite: $pause") - } - } catch { - case e: NumberFormatException => throw new IllegalArgumentException( - "Invalid format for parameter " + ConfigConstants.TASK_MANAGER_INITIAL_REGISTRATION_PAUSE, - e) - } - - val taskManagerConfig = TaskManagerConfiguration( - tmpDirs, - cleanupInterval, - timeout, - finiteRegistrationDuration, - slots, - configuration, - initialRegistrationPause, - maxRegistrationPause, - refusedRegistrationPause) - - (taskManagerConfig, networkConfig, taskManagerInetSocketAddress, memType) - } - - /** * Gets the protocol, hostname and port of the JobManager from the configuration. Also checks that * the hostname is not null and the port non-negative. * @@ -2389,67 +1969,6 @@ object TaskManager { // -------------------------------------------------------------------------- /** - * Validates a condition for a config parameter and displays a standard exception, if the - * the condition does not hold. - * - * @param condition The condition that must hold. If the condition is false, an - * exception is thrown. - * @param parameter The parameter value. Will be shown in the exception message. - * @param name The name of the config parameter. Will be shown in the exception message. - * @param errorMessage The optional custom error message to append to the exception message. - * @throws IllegalConfigurationException Thrown if the condition is violated. - */ - @throws(classOf[IllegalConfigurationException]) - private def checkConfigParameter( - condition: Boolean, - parameter: Any, - name: String, - errorMessage: String = "") - : Unit = { - if (!condition) { - throw new IllegalConfigurationException( - s"Invalid configuration value for '$name' : $parameter - $errorMessage") - } - } - - /** - * Validates that all the directories denoted by the strings do actually exist, are proper - * directories (not files), and are writable. - * - * @param tmpDirs The array of directory paths to check. - * @throws Exception Thrown if any of the directories does not exist or is not writable - * or is a file, rather than a directory. - */ - @throws(classOf[IOException]) - private def checkTempDirs(tmpDirs: Array[String]): Unit = { - tmpDirs.zipWithIndex.foreach { - case (dir: String, _) => - val file = new File(dir) - - if (!file.exists) { - throw new IOException( - s"Temporary file directory ${file.getAbsolutePath} does not exist.") - } - if (!file.isDirectory) { - throw new IOException( - s"Temporary file directory ${file.getAbsolutePath} is not a directory.") - } - if (!file.canWrite) { - throw new IOException( - s"Temporary file directory ${file.getAbsolutePath} is not writable.") - } - - if (LOG.isInfoEnabled) { - val totalSpaceGb = file.getTotalSpace >> 30 - val usableSpaceGb = file.getUsableSpace >> 30 - val usablePercentage = usableSpaceGb.asInstanceOf[Double] / totalSpaceGb * 100 - - val path = file.getAbsolutePath - - LOG.info(f"Temporary file directory '$path': total $totalSpaceGb GB, " + - f"usable $usableSpaceGb GB ($usablePercentage%.2f%% usable)") - } - case (_, id) => throw new IllegalArgumentException(s"Temporary file directory #$id is null.") } } } http://git-wip-us.apache.org/repos/asf/flink/blob/8e653397/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerConfiguration.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerConfiguration.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerConfiguration.scala deleted file mode 100644 index aab3c5f..0000000 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerConfiguration.scala +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.taskmanager - -import java.util.concurrent.TimeUnit - -import org.apache.flink.configuration.Configuration - -import scala.concurrent.duration.FiniteDuration - -case class TaskManagerConfiguration( - tmpDirPaths: Array[String], - cleanupInterval: Long, - timeout: FiniteDuration, - maxRegistrationDuration: Option[FiniteDuration], - numberOfSlots: Int, - configuration: Configuration, - initialRegistrationPause: FiniteDuration, - maxRegistrationPause: FiniteDuration, - refusedRegistrationPause: FiniteDuration) { - - def this( - tmpDirPaths: Array[String], - cleanupInterval: Long, - timeout: FiniteDuration, - maxRegistrationDuration: Option[FiniteDuration], - numberOfSlots: Int, - configuration: Configuration) { - this ( - tmpDirPaths, - cleanupInterval, - timeout, - maxRegistrationDuration, - numberOfSlots, - configuration, - FiniteDuration(500, TimeUnit.MILLISECONDS), - FiniteDuration(30, TimeUnit.SECONDS), - FiniteDuration(10, TimeUnit.SECONDS)) - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/8e653397/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java index 627a25a..500d1bd 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java @@ -26,6 +26,7 @@ import akka.actor.Kill; import akka.actor.Props; import akka.testkit.JavaTestKit; +import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.memory.MemoryType; @@ -49,11 +50,11 @@ import org.apache.flink.runtime.messages.TaskManagerMessages; import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.metrics.MetricRegistryConfiguration; import org.apache.flink.runtime.query.KvStateRegistry; +import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration; import org.apache.flink.runtime.util.LeaderRetrievalUtils; import org.junit.Test; -import scala.Option; import scala.concurrent.duration.FiniteDuration; import java.net.InetAddress; @@ -69,7 +70,7 @@ public class TaskManagerComponentsStartupShutdownTest { public void testComponentsStartupShutdown() { final String[] TMP_DIR = new String[] { ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH }; - final FiniteDuration timeout = new FiniteDuration(100, TimeUnit.SECONDS); + final Time timeout = Time.seconds(100); final int BUFFER_SIZE = 32 * 1024; Configuration config = new Configuration(); @@ -93,14 +94,19 @@ public class TaskManagerComponentsStartupShutdownTest { LeaderRetrievalUtils.createLeaderRetrievalService(config, jobManager), StandaloneResourceManager.class); + final int numberOfSlots = 1; + // create the components for the TaskManager manually final TaskManagerConfiguration tmConfig = new TaskManagerConfiguration( - TMP_DIR, - 1000000, - timeout, - Option.empty(), - 1, - config); + numberOfSlots, + TMP_DIR, + timeout, + null, + Time.milliseconds(500), + Time.seconds(30), + Time.seconds(10), + 1000000, // cleanup interval + config); final NetworkEnvironmentConfiguration netConf = new NetworkEnvironmentConfiguration( 32, BUFFER_SIZE, MemoryType.HEAP, IOManager.IOMode.SYNC, 0, 0, 0, @@ -125,8 +131,6 @@ public class TaskManagerComponentsStartupShutdownTest { network.start(); - final int numberOfSlots = 1; - LeaderRetrievalService leaderRetrievalService = new StandaloneLeaderRetrievalService(jobManager.path().toString()); MetricRegistryConfiguration metricRegistryConfiguration = MetricRegistryConfiguration.fromConfiguration(config); http://git-wip-us.apache.org/repos/asf/flink/blob/8e653397/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala index 707401b..09dc5ed 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala @@ -24,7 +24,8 @@ import org.apache.flink.runtime.io.network.NetworkEnvironment import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService import org.apache.flink.runtime.memory.MemoryManager import org.apache.flink.runtime.metrics.MetricRegistry -import org.apache.flink.runtime.taskmanager.{TaskManagerLocation, TaskManager, TaskManagerConfiguration} +import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration +import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerLocation} import scala.language.postfixOps http://git-wip-us.apache.org/repos/asf/flink/blob/8e653397/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala index 73fb928..dba8834 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala @@ -242,7 +242,6 @@ object TestingUtils { ) } - def createTaskManager( actorSystem: ActorSystem, jobManagerURL: String, http://git-wip-us.apache.org/repos/asf/flink/blob/8e653397/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala b/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala index 1010432..0f82faa 100644 --- a/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala +++ b/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala @@ -24,7 +24,8 @@ import org.apache.flink.runtime.io.network.NetworkEnvironment import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService import org.apache.flink.runtime.memory.MemoryManager import org.apache.flink.runtime.metrics.MetricRegistry -import org.apache.flink.runtime.taskmanager.{TaskManagerConfiguration, TaskManagerLocation} +import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration +import org.apache.flink.runtime.taskmanager.TaskManagerLocation import org.apache.flink.runtime.testingUtils.TestingTaskManagerLike /** [[YarnTaskManager]] implementation which mixes in the [[TestingTaskManagerLike]] mixin. http://git-wip-us.apache.org/repos/asf/flink/blob/8e653397/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala index 2ab9b20..be31085 100644 --- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala +++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala @@ -23,8 +23,9 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager import org.apache.flink.runtime.io.network.NetworkEnvironment import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService import org.apache.flink.runtime.memory.MemoryManager -import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerConfiguration, TaskManagerLocation} +import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerLocation} import org.apache.flink.runtime.metrics.MetricRegistry +import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration /** An extension of the TaskManager that listens for additional YARN related * messages.