flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject flink git commit: [fix] Resolve code warnings in JobManager and TaskManager
Date Wed, 16 Dec 2015 10:18:27 GMT
Repository: flink
Updated Branches:
  refs/heads/master 8f6f29426 -> 7f01f56f1


[fix] Resolve code warnings in JobManager and TaskManager


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7f01f56f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7f01f56f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7f01f56f

Branch: refs/heads/master
Commit: 7f01f56f1a6fb647bdc189da08ecc26cefcea24d
Parents: 8f6f294
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Tue Dec 15 15:45:34 2015 +0100
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Wed Dec 16 11:16:41 2015 +0100

----------------------------------------------------------------------
 .../apache/flink/runtime/client/JobClient.java  |  2 +-
 .../flink/runtime/jobmanager/JobManager.scala   | 76 +++++++++-----------
 .../flink/runtime/taskmanager/TaskManager.scala | 72 +++++++++----------
 3 files changed, 67 insertions(+), 83 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7f01f56f/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
index b908eb1..60d942b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
@@ -213,7 +213,7 @@ public class JobClient {
 					jobGraph,
 					ListeningBehaviour.DETACHED // only receive the Acknowledge for the job submission message
 				),
-					timeout);
+				timeout);
 			
 			result = Await.result(future, timeout);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/7f01f56f/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 2645a7c..242e40b 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.jobmanager
 
 import java.io.{File, IOException}
-import java.lang.reflect.{Constructor, InvocationTargetException}
 import java.net.{UnknownHostException, InetAddress, InetSocketAddress}
 import java.util.UUID
 
@@ -43,7 +42,6 @@ import org.apache.flink.runtime.jobgraph.{JobGraph, JobStatus, JobVertexID}
 import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore.SubmittedJobGraphListener
 import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => FlinkScheduler}
 import org.apache.flink.runtime.leaderelection.{LeaderContender, LeaderElectionService, StandaloneLeaderElectionService}
-import org.apache.flink.runtime.leaderretrieval.{StandaloneLeaderRetrievalService, LeaderRetrievalService}
 import org.apache.flink.runtime.messages.ArchiveMessages.ArchiveExecutionGraph
 import org.apache.flink.runtime.messages.ExecutionGraphMessages.JobStatusChanged
 import org.apache.flink.runtime.messages.JobManagerMessages._
@@ -143,7 +141,7 @@ class JobManager(
    * The method also starts the leader election service.
    */
   override def preStart(): Unit = {
-    log.info(s"Starting JobManager at ${getAddress}.")
+    log.info(s"Starting JobManager at $getAddress.")
 
     try {
       leaderElectionService.start(this)
@@ -172,11 +170,11 @@ class JobManager(
   }
 
   override def postStop(): Unit = {
-    log.info(s"Stopping JobManager ${getAddress}.")
+    log.info(s"Stopping JobManager $getAddress.")
 
     val newFuturesToComplete = cancelAndClearEverything(
       new Exception("The JobManager is shutting down."),
-      true)
+      removeJobFromStateBackend = true)
 
     implicit val executionContext = context.dispatcher
 
@@ -235,8 +233,8 @@ class JobManager(
   override def handleMessage: Receive = {
 
     case GrantLeadership(newLeaderSessionID) =>
-      log.info(s"JobManager ${getAddress} was granted leadership with leader session ID " +
-        s"${newLeaderSessionID}.")
+      log.info(s"JobManager $getAddress was granted leadership with leader session ID " +
+        s"$newLeaderSessionID.")
 
       leaderSessionID = newLeaderSessionID
 
@@ -257,7 +255,7 @@ class JobManager(
 
       val newFuturesToComplete = cancelAndClearEverything(
         new Exception("JobManager is no longer the leader."),
-        false)
+        removeJobFromStateBackend = false)
 
       futuresToComplete = Some(futuresToComplete.getOrElse(Seq()) ++ newFuturesToComplete)
 
@@ -349,7 +347,7 @@ class JobManager(
 
     case RecoverJob(jobId) =>
       future {
-        // The ActorRef, which is part of the submitted job graph can only be deserialized in the
+        // The ActorRef, which is part of the submitted job graph can only be de-serialized in the
         // scope of an actor system.
         akka.serialization.JavaSerializer.currentSystem.withValue(
           context.system.asInstanceOf[ExtendedActorSystem]) {
@@ -374,7 +372,7 @@ class JobManager(
 
     case RecoverAllJobs =>
       future {
-        // The ActorRef, which is part of the submitted job graph can only be deserialized in the
+        // The ActorRef, which is part of the submitted job graph can only be de-serialized in the
         // scope of an actor system.
         akka.serialization.JavaSerializer.currentSystem.withValue(
           context.system.asInstanceOf[ExtendedActorSystem]) {
@@ -515,11 +513,11 @@ class JobManager(
                 context.system.scheduler.scheduleOnce(jobInfo.sessionTimeout seconds) {
                   // remove only if no activity occurred in the meantime
                   if (lastActivity == jobInfo.lastActive) {
-                    self ! decorateMessage(RemoveJob(jobID, true))
+                    self ! decorateMessage(RemoveJob(jobID, removeJobFromStateBackend = true))
                   }
                 }(context.dispatcher)
               } else {
-                self ! decorateMessage(RemoveJob(jobID, true))
+                self ! decorateMessage(RemoveJob(jobID, removeJobFromStateBackend = true))
               }
 
               // is the client waiting for the job result?
@@ -571,7 +569,7 @@ class JobManager(
             }(context.dispatcher)
           }
         case None =>
-          self ! decorateMessage(RemoveJob(jobID, true))
+          self ! decorateMessage(RemoveJob(jobID, removeJobFromStateBackend = true))
       }
 
     case ScheduleOrUpdateConsumers(jobId, partitionId) =>
@@ -674,7 +672,7 @@ class JobManager(
       )
 
     case Heartbeat(instanceID, metricsReport, accumulators) =>
-      log.debug(s"Received hearbeat message from $instanceID.")
+      log.debug(s"Received heartbeat message from $instanceID.")
 
       updateAccumulators(accumulators)
 
@@ -714,7 +712,7 @@ class JobManager(
       currentJobs.get(jobID) match {
         case Some((graph, info)) =>
           if (graph.getState.isTerminalState) {
-            removeJob(graph.getJobID, true) match {
+            removeJob(graph.getJobID, removeJobFromStateBackend = true) match {
               case Some(futureToComplete) =>
                 futuresToComplete = Some(futuresToComplete.getOrElse(Seq()) :+ futureToComplete)
               case None =>
@@ -842,7 +840,7 @@ class JobManager(
         // initialize the vertices that have a master initialization hook
         // file output formats create directories here, input formats create splits
         if (log.isDebugEnabled) {
-          log.debug(s"Running initialization on master for job ${jobId} (${jobName}).")
+          log.debug(s"Running initialization on master for job $jobId ($jobName).")
         }
 
         val numSlots = scheduler.getTotalNumberOfSlots()
@@ -872,13 +870,13 @@ class JobManager(
         val sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources()
         if (log.isDebugEnabled) {
           log.debug(s"Adding ${sortedTopology.size()} vertices from " +
-            s"job graph ${jobId} (${jobName}).")
+            s"job graph $jobId ($jobName).")
         }
         executionGraph.attachJobGraph(sortedTopology)
 
         if (log.isDebugEnabled) {
           log.debug("Successfully created execution graph from job " +
-            s"graph ${jobId} (${jobName}).")
+            s"graph $jobId ($jobName).")
         }
 
         // configure the state checkpointing
@@ -938,7 +936,7 @@ class JobManager(
       }
       catch {
         case t: Throwable =>
-          log.error(s"Failed to submit job ${jobId} (${jobName})", t)
+          log.error(s"Failed to submit job $jobId ($jobName)", t)
 
           libraryCacheManager.unregisterJob(jobId)
           currentJobs.remove(jobId)
@@ -950,7 +948,7 @@ class JobManager(
           val rt: Throwable = if (t.isInstanceOf[JobExecutionException]) {
             t
           } else {
-            new JobExecutionException(jobId, s"Failed to submit job ${jobId} (${jobName})", t)
+            new JobExecutionException(jobId, s"Failed to submit job $jobId ($jobName)", t)
           }
 
           jobInfo.client ! decorateMessage(JobResultFailure(new SerializedThrowable(rt)))
@@ -984,7 +982,7 @@ class JobManager(
           } else {
             // Remove the job graph. Otherwise it will be lingering around and possibly removed from
             // ZooKeeper by this JM.
-            self ! decorateMessage(RemoveJob(jobId, false))
+            self ! decorateMessage(RemoveJob(jobId, removeJobFromStateBackend = false))
 
             log.warn(s"Submitted job $jobId, but not leader. The other leader needs to recover " +
               "this. I am not scheduling the job for execution.")
@@ -994,9 +992,8 @@ class JobManager(
             executionGraph.fail(t)
           }
           catch {
-            case tt: Throwable => {
+            case tt: Throwable =>
               log.error("Error while marking ExecutionGraph as failed.", tt)
-            }
           }
         }
       }(context.dispatcher)
@@ -1273,9 +1270,8 @@ class JobManager(
             submittedJobGraphs.removeJobGraph(jobID)
           }
           catch {
-            case t: Throwable => {
+            case t: Throwable =>
               log.error("Error during submitted job graph clean up.", t)
-            }
           }
         }
 
@@ -1325,9 +1321,9 @@ class JobManager(
     AkkaUtils.getAkkaURL(context.system, self)
   }
 
-  /** Handles error occuring in the leader election service
+  /** Handles error occurring in the leader election service
     *
-    * @param exception
+    * @param exception Exception being thrown in the leader election service
     */
   override def handleError(exception: Exception): Unit = {
     log.error("Received an error from the LeaderElectionService.", exception)
@@ -1393,11 +1389,10 @@ object JobManager {
       parseArgs(args)
     }
     catch {
-      case t: Throwable => {
+      case t: Throwable =>
         LOG.error(t.getMessage(), t)
         System.exit(STARTUP_FAILURE_RETURN_CODE)
         null
-      }
     }
 
     // we want to check that the JobManager hostname is in the config
@@ -1452,10 +1447,9 @@ object JobManager {
       }
     }
     catch {
-      case t: Throwable => {
+      case t: Throwable =>
         LOG.error("Failed to run JobManager.", t)
         System.exit(STARTUP_FAILURE_RETURN_CODE)
-      }
     }
   }
 
@@ -1498,7 +1492,7 @@ object JobManager {
     * @param configuration The configuration object for the JobManager
     * @param executionMode The execution mode in which to run. Execution mode LOCAL with spawn an
     *                      additional TaskManager in the same process.
-    * @param listeningAddress The hostname where the JobManager should lsiten for messages.
+    * @param listeningAddress The hostname where the JobManager should listen for messages.
     * @param listeningPort The port where the JobManager should listen for messages
     * @param jobManagerClass The class of the JobManager to be started
     * @param archiveClass The class of the Archivist to be started
@@ -1531,7 +1525,7 @@ object JobManager {
       AkkaUtils.createActorSystem(akkaConfig)
     }
     catch {
-      case t: Throwable => {
+      case t: Throwable =>
         if (t.isInstanceOf[org.jboss.netty.channel.ChannelException]) {
           val cause = t.getCause()
           if (cause != null && t.getCause().isInstanceOf[java.net.BindException]) {
@@ -1541,7 +1535,6 @@ object JobManager {
           }
         }
         throw new Exception("Could not create JobManager actor system", t)
-      }
     }
 
     val address = AkkaUtils.getAddress(jobManagerSystem)
@@ -1602,7 +1595,7 @@ object JobManager {
           listeningAddress,
           Some(TaskManager.TASK_MANAGER_NAME),
           None,
-          true,
+          localTaskManagerCommunication = true,
           classOf[TaskManager])
 
         LOG.debug("Starting TaskManager process reaper")
@@ -1624,7 +1617,7 @@ object JobManager {
       (jobManagerSystem, jobManager, archive, webMonitor)
     }
     catch {
-      case t: Throwable => {
+      case t: Throwable =>
         LOG.error("Error while starting up JobManager", t)
         try {
           jobManagerSystem.shutdown()
@@ -1632,7 +1625,6 @@ object JobManager {
           case tt: Throwable => LOG.warn("Could not cleanly shut down actor system", tt)
         }
         throw t
-      }
     }
   }
 
@@ -1678,7 +1670,7 @@ object JobManager {
 
     val config = parser.parse(args, new JobManagerCliOptions()).getOrElse {
       throw new Exception(
-        s"Invalid command line agruments: ${args.mkString(" ")}. Usage: ${parser.usage}")
+        s"Invalid command line arguments: ${args.mkString(" ")}. Usage: ${parser.usage}")
     }
     
     val configDir = config.getConfigDir()
@@ -1782,7 +1774,7 @@ object JobManager {
       catch {
         case n: NumberFormatException => throw new Exception(
           s"Invalid config value for ${ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY}: " +
-            s"${pauseString}. Value must be a valid duration (such as 100 milli or 1 min)");
+            s"$pauseString. Value must be a valid duration (such as 100 milli or 1 min)");
       }
 
     val executionContext = ExecutionContext.fromExecutor(new ForkJoinPool())
@@ -1801,7 +1793,7 @@ object JobManager {
       instanceManager.addInstanceListener(scheduler)
     }
     catch {
-      case t: Throwable => {
+      case t: Throwable =>
         if (libraryCacheManager != null) {
           libraryCacheManager.shutdown()
         }
@@ -1815,7 +1807,6 @@ object JobManager {
           blobServer.shutdown()
         }
         throw t
-      }
     }
 
     // Create recovery related components
@@ -1993,10 +1984,9 @@ object JobManager {
       hostPort = new InetSocketAddress(inetAddress, port)
     }
     catch {
-      case e: UnknownHostException => {
+      case e: UnknownHostException =>
         throw new UnknownHostException(s"Cannot resolve the JobManager hostname '$hostname' " +
           s"specified in the configuration")
-      }
     }
 
     JobManager.getRemoteJobManagerAkkaURL(hostPort, Option.empty)

http://git-wip-us.apache.org/repos/asf/flink/blob/7f01f56f/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 78051b8..950b82c 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -267,9 +267,8 @@ class TaskManager(
     // messages for coordinating checkpoints
     case message: AbstractCheckpointMessage => handleCheckpointingMessage(message)
 
-    case JobManagerLeaderAddress(address, leaderSessionID) => {
-      handleJobManagerLeaderAddress(address, leaderSessionID)
-    }
+    case JobManagerLeaderAddress(address, newLeaderSessionID) =>
+      handleJobManagerLeaderAddress(address, newLeaderSessionID)
 
     // registration messages for connecting and disconnecting from / to the JobManager
     case message: RegistrationMessage => handleRegistrationMessage(message)
@@ -415,7 +414,7 @@ class TaskManager(
           if (task != null) {
             task.failExternally(cause)
           } else {
-            log.debug(s"Cannot find task to fail for execution ${executionID})")
+            log.debug(s"Cannot find task to fail for execution $executionID)")
           }
 
         // cancels a task
@@ -425,7 +424,7 @@ class TaskManager(
             task.cancelExecution()
             sender ! decorateMessage(new TaskOperationResult(executionID, true))
           } else {
-            log.debug(s"Cannot find task to cancel for execution ${executionID})")
+            log.debug(s"Cannot find task to cancel for execution $executionID)")
             sender ! decorateMessage(
               new TaskOperationResult(
                 executionID,
@@ -458,13 +457,13 @@ class TaskManager(
         val checkpointId = message.getCheckpointId
         val timestamp = message.getTimestamp
         
-        log.debug(s"Receiver TriggerCheckpoint ${checkpointId}@${timestamp} for $taskExecutionId.")
+        log.debug(s"Receiver TriggerCheckpoint $checkpointId@$timestamp for $taskExecutionId.")
 
         val task = runningTasks.get(taskExecutionId)
         if (task != null) {
           task.triggerCheckpointBarrier(checkpointId, timestamp)
         } else {
-          log.debug(s"Taskmanager received a checkpoint request for unknown task $taskExecutionId.")
+          log.debug(s"TaskManager received a checkpoint request for unknown task $taskExecutionId.")
         }
 
       case message: NotifyCheckpointComplete =>
@@ -472,14 +471,14 @@ class TaskManager(
         val checkpointId = message.getCheckpointId
         val timestamp = message.getTimestamp
 
-        log.debug(s"Receiver ConfirmCheckpoint ${checkpointId}@${timestamp} for $taskExecutionId.")
+        log.debug(s"Receiver ConfirmCheckpoint $checkpointId@$timestamp for $taskExecutionId.")
 
         val task = runningTasks.get(taskExecutionId)
         if (task != null) {
           task.notifyCheckpointComplete(checkpointId)
         } else {
           log.debug(
-            s"Taskmanager received a checkpoint confirmation for unknown task $taskExecutionId.")
+            s"TaskManager received a checkpoint confirmation for unknown task $taskExecutionId.")
         }
 
       // unknown checkpoint message
@@ -520,12 +519,12 @@ class TaskManager(
         else {
           if (!jobManagerAkkaURL.equals(Option(jobManagerURL))) {
             throw new Exception("Invalid internal state: Trying to register at JobManager " +
-              s"${jobManagerURL} even though the current JobManagerAkkaURL is set to " +
+              s"$jobManagerURL even though the current JobManagerAkkaURL is set to " +
               s"${jobManagerAkkaURL.getOrElse("")}")
           }
 
-          log.info(s"Trying to register at JobManager ${jobManagerURL} " +
-            s"(attempt ${attempt}, timeout: ${timeout})")
+          log.info(s"Trying to register at JobManager $jobManagerURL " +
+            s"(attempt $attempt, timeout: $timeout)")
 
           val jobManager = context.actorSelection(jobManagerURL)
 
@@ -604,8 +603,8 @@ class TaskManager(
 
       case RefuseRegistration(reason) =>
         if (currentJobManager.isEmpty) {
-          log.error(s"The registration at JobManager ${jobManagerAkkaURL} was refused, " +
-            s"because: ${reason}. Retrying later...")
+          log.error(s"The registration at JobManager $jobManagerAkkaURL was refused, " +
+            s"because: $reason. Retrying later...")
 
         if(jobManagerAkkaURL.isDefined) {
           // try the registration again after some time
@@ -889,7 +888,7 @@ class TaskManager(
         fileCache,
         runtimeInfo)
 
-      log.info(s"Received task ${task.getTaskInfo.getTaskNameWithSubtasks}")
+      log.info(s"Received task ${task.getTaskInfo.getTaskNameWithSubtasks()}")
 
       val execId = tdd.getExecutionId
       // add the task to the map
@@ -1002,7 +1001,7 @@ class TaskManager(
         }
       }
 
-      log.info(s"Unregistering task and sending final execution state " +
+      log.info(s"Un-registering task and sending final execution state " +
         s"${task.getExecutionState} to JobManager for task ${task.getTaskInfo.getTaskName} " +
         s"(${task.getExecutionId})")
 
@@ -1107,8 +1106,8 @@ class TaskManager(
     * connected to another JobManager, it first disconnects from it. If the new JobManager
     * address is not null, then it starts the registration process.
     *
-    * @param newJobManagerAkkaURL
-    * @param leaderSessionID
+    * @param newJobManagerAkkaURL Akka URL of the new job manager
+    * @param leaderSessionID New leader session ID associated with the leader
     */
   private def handleJobManagerLeaderAddress(
       newJobManagerAkkaURL: String,
@@ -1119,7 +1118,7 @@ class TaskManager(
       case Some(jm) =>
         Option(newJobManagerAkkaURL) match {
           case Some(newJMAkkaURL) =>
-            handleJobManagerDisconnect(jm, s"JobManager ${newJMAkkaURL} was elected as leader.")
+            handleJobManagerDisconnect(jm, s"JobManager $newJMAkkaURL was elected as leader.")
           case None =>
             handleJobManagerDisconnect(jm, s"Old JobManager lost its leadership.")
         }
@@ -1175,11 +1174,11 @@ object TaskManager {
   /** The name of the TaskManager actor */
   val TASK_MANAGER_NAME = "taskmanager"
 
-  /** Maximum time (msecs) that the TaskManager will spend searching for a
+  /** Maximum time (milli seconds) that the TaskManager will spend searching for a
     * suitable network interface to use for communication */
   val MAX_STARTUP_CONNECT_TIME = 120000L
 
-  /** Time (msecs) after which the TaskManager will start logging failed
+  /** Time (milli seconds) after which the TaskManager will start logging failed
     * connection attempts */
   val STARTUP_CONNECT_LOG_SUPPRESS = 10000L
 
@@ -1218,11 +1217,10 @@ object TaskManager {
       parseArgsAndLoadConfig(args)
     }
     catch {
-      case t: Throwable => {
+      case t: Throwable =>
         LOG.error(t.getMessage(), t)
         System.exit(STARTUP_FAILURE_RETURN_CODE)
         null
-      }
     }
 
     // run the TaskManager (if requested in an authentication enabled context)
@@ -1241,10 +1239,9 @@ object TaskManager {
       }
     }
     catch {
-      case t: Throwable => {
+      case t: Throwable =>
         LOG.error("Failed to run TaskManager.", t)
         System.exit(STARTUP_FAILURE_RETURN_CODE)
-      }
     }
   }
 
@@ -1272,7 +1269,7 @@ object TaskManager {
     // parse the CLI arguments
     val cliConfig = parser.parse(args, new TaskManagerCliOptions()).getOrElse {
       throw new Exception(
-        s"Invalid command line agruments: ${args.mkString(" ")}. Usage: ${parser.usage}")
+        s"Invalid command line arguments: ${args.mkString(" ")}. Usage: ${parser.usage}")
     }
 
     // load the configuration
@@ -1346,7 +1343,7 @@ object TaskManager {
         lookupTimeout)
 
       taskManagerHostname = taskManagerAddress.getHostName()
-      LOG.info(s"TaskManager will use hostname/address '${taskManagerHostname}' " +
+      LOG.info(s"TaskManager will use hostname/address '$taskManagerHostname' " +
         s"(${taskManagerAddress.getHostAddress()}) for communication.")
     }
 
@@ -1430,7 +1427,7 @@ object TaskManager {
       AkkaUtils.createActorSystem(akkaConfig)
     }
     catch {
-      case t: Throwable => {
+      case t: Throwable =>
         if (t.isInstanceOf[org.jboss.netty.channel.ChannelException]) {
           val cause = t.getCause()
           if (cause != null && t.getCause().isInstanceOf[java.net.BindException]) {
@@ -1440,7 +1437,6 @@ object TaskManager {
           }
         }
         throw new Exception("Could not create TaskManager actor system", t)
-      }
     }
 
     // start all the TaskManager services (network stack,  library cache, ...)
@@ -1453,7 +1449,7 @@ object TaskManager {
         taskManagerHostname,
         Some(TASK_MANAGER_NAME),
         None,
-        false,
+        localTaskManagerCommunication = false,
         taskManagerClass)
 
       // start a process reaper that watches the JobManager. If the TaskManager actor dies,
@@ -1483,7 +1479,7 @@ object TaskManager {
       taskManagerSystem.awaitTermination()
     }
     catch {
-      case t: Throwable => {
+      case t: Throwable =>
         LOG.error("Error while starting up taskManager", t)
         try {
           taskManagerSystem.shutdown()
@@ -1491,7 +1487,6 @@ object TaskManager {
           case tt: Throwable => LOG.warn("Could not cleanly shut down actor system", tt)
         }
         throw t
-      }
     }
   }
 
@@ -1632,11 +1627,11 @@ object TaskManager {
         memType match {
           case MemoryType.HEAP =>
             throw new Exception(s"OutOfMemory error (${e.getMessage()})" +
-              s" while allocating the TaskManager heap memory (${memorySize} bytes).", e)
+              s" while allocating the TaskManager heap memory ($memorySize bytes).", e)
 
           case MemoryType.OFF_HEAP =>
             throw new Exception(s"OutOfMemory error (${e.getMessage()})" +
-              s" while allocating the TaskManager off-heap memory (${memorySize} bytes). " +
+              s" while allocating the TaskManager off-heap memory ($memorySize bytes). " +
               s"Try increasing the maximum direct memory (-XX:MaxDirectMemorySize)", e)
 
           case _ => throw e
@@ -1853,7 +1848,7 @@ object TaskManager {
       ConfigConstants.LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL,
       ConfigConstants.DEFAULT_LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL) * 1000
 
-    val finiteRegistratioDuration = try {
+    val finiteRegistrationDuration = try {
       val maxRegistrationDuration = Duration(configuration.getString(
         ConfigConstants.TASK_MANAGER_MAX_REGISTRATION_DURATION,
         ConfigConstants.DEFAULT_TASK_MANAGER_MAX_REGISTRATION_DURATION))
@@ -1873,7 +1868,7 @@ object TaskManager {
       tmpDirs,
       cleanupInterval,
       timeout,
-      finiteRegistratioDuration,
+      finiteRegistrationDuration,
       slots,
       configuration)
 
@@ -1934,7 +1929,7 @@ object TaskManager {
     : Unit = {
     if (!condition) {
       throw new IllegalConfigurationException(
-        s"Invalid configuration value for '${name}' : ${parameter} - ${errorMessage}")
+        s"Invalid configuration value for '$name' : $parameter - $errorMessage")
     }
   }
 
@@ -2018,10 +2013,9 @@ object TaskManager {
           fetchCPULoadMethod.map(_.invoke(osBean).asInstanceOf[Double]).getOrElse(-1.0)
         }
         catch {
-          case t: Throwable => {
+          case t: Throwable =>
             LOG.warn("Error retrieving CPU Load through OperatingSystemMXBean", t)
             -1.0
-          }
         }
       }
     })


Mime
View raw message