flink-commits mailing list archives

From se...@apache.org
Subject [2/4] incubator-flink git commit: [FLINK-1349] [runtime] Various fixes concerning Akka - Remove obsolete code from old IPC net utils - Smaller Writable/IOReadableWritable serialization buffer start size (most messages are rather small) - For message lo
Date Tue, 06 Jan 2015 10:45:29 GMT
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/972a7b01/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index d8950bd..5b6ee86 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -23,7 +23,7 @@ import java.lang.management.{GarbageCollectorMXBean, MemoryMXBean, ManagementFac
 import java.net.{InetAddress, InetSocketAddress}
 import java.util
 import java.util.concurrent.{FutureTask, TimeUnit}
-
+import scala.collection.JavaConverters._
 import akka.actor._
 import akka.pattern.ask
 import org.apache.flink.api.common.cache.DistributedCache
@@ -34,35 +34,46 @@ import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.blob.BlobCache
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager
 import org.apache.flink.runtime.execution.{CancelTaskException, ExecutionState, RuntimeEnvironment}
-import org.apache.flink.runtime.execution.librarycache.{BlobLibraryCacheManager,
-FallbackLibraryCacheManager, LibraryCacheManager}
+import org.apache.flink.runtime.execution.librarycache.{BlobLibraryCacheManager, FallbackLibraryCacheManager, LibraryCacheManager}
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
 import org.apache.flink.runtime.filecache.FileCache
 import org.apache.flink.runtime.instance.{InstanceConnectionInfo, HardwareDescription, InstanceID}
 import org.apache.flink.runtime.io.disk.iomanager.{IOManagerAsync}
 import org.apache.flink.runtime.io.network.netty.NettyConnectionManager
-import org.apache.flink.runtime.io.network.{NetworkConnectionManager, LocalConnectionManager,
-ChannelManager}
+import org.apache.flink.runtime.io.network.{NetworkConnectionManager, LocalConnectionManager, ChannelManager}
 import org.apache.flink.runtime.jobgraph.JobID
 import org.apache.flink.runtime.jobmanager.JobManager
 import org.apache.flink.runtime.memorymanager.DefaultMemoryManager
 import org.apache.flink.runtime.messages.JobManagerMessages.UpdateTaskExecutionState
-import org.apache.flink.runtime.messages.RegistrationMessages.{RegisterTaskManager,
-AcknowledgeRegistration}
+import org.apache.flink.runtime.messages.RegistrationMessages.{RegisterTaskManager, AcknowledgeRegistration}
 import org.apache.flink.runtime.messages.TaskManagerMessages._
-import org.apache.flink.runtime.messages.TaskManagerProfilerMessages.{UnmonitorTask, MonitorTask,
-RegisterProfilingListener}
+import org.apache.flink.runtime.messages.TaskManagerProfilerMessages.{UnmonitorTask, MonitorTask, RegisterProfilingListener}
 import org.apache.flink.runtime.net.NetUtils
 import org.apache.flink.runtime.profiling.ProfilingUtils
 import org.apache.flink.runtime.util.EnvironmentInformation
 import org.apache.flink.util.ExceptionUtils
 import org.slf4j.LoggerFactory
-
 import scala.concurrent.Future
 import scala.concurrent.duration._
 import scala.util.Failure
 import scala.util.Success
-
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor
+
+/**
+ * 
+ * 
+ * The TaskManager has the following phases:
+ * 
+ *  - Waiting to be registered with its JobManager. In that phase, it periodically sends
+ *    [[RegisterAtJobManager]] messages to itself, which trigger the sending of
+ *    a [[RegisterTaskManager]] message to the JobManager.
+ *  
+ *  - Upon successful registration, the JobManager replies with an [[AcknowledgeRegistration]]
+ *    message. This stops the registration messages and initializes all fields
+ *    that require the JobManager's actor reference.
+ *    
+ *  - ...
+ */
 class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkkaURL: String,
                   val taskManagerConfig: TaskManagerConfiguration,
                   val networkConnectionConfig: NetworkConnectionConfiguration)
@@ -70,17 +81,12 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka
 
   import context._
   import taskManagerConfig.{timeout => tmTimeout, _}
-  import scala.collection.JavaConverters._
   implicit val timeout = tmTimeout
 
   log.info(s"Starting task manager at ${self.path}.")
 
-  val REGISTRATION_DELAY = 0 seconds
-  val REGISTRATION_INTERVAL = 10 seconds
-  val MAX_REGISTRATION_ATTEMPTS = 10
-  val HEARTBEAT_INTERVAL = 5000 millisecond
-
   TaskManager.checkTempDirs(tmpDirPaths)
+   
   val ioManager = new IOManagerAsync(tmpDirPaths)
   val memoryManager = new DefaultMemoryManager(memorySize, numberOfSlots, pageSize)
   val bcVarManager = new BroadcastVariableManager();
@@ -92,29 +98,24 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka
   val waitForRegistration = scala.collection.mutable.Set[ActorRef]();
 
   val profiler = profilingInterval match {
-    case Some(interval) => Some(TaskManager.startProfiler(self.path.toSerializationFormat,
-      interval))
+    case Some(interval) => 
+              Some(TaskManager.startProfiler(self.path.toSerializationFormat, interval))
     case None => None
   }
 
-  var libraryCacheManager: LibraryCacheManager = null
+  var libraryCacheManager: LibraryCacheManager = _
   var channelManager: Option[ChannelManager] = None
   var registrationScheduler: Option[Cancellable] = None
   var registrationAttempts: Int = 0
   var registered: Boolean = false
   var currentJobManager = ActorRef.noSender
   var instanceID: InstanceID = null;
-  var memoryMXBean: Option[MemoryMXBean] = None
-  var gcMXBeans: Option[Iterable[GarbageCollectorMXBean]] = None
   var heartbeatScheduler: Option[Cancellable] = None
 
   if (log.isDebugEnabled) {
     memoryLogggingIntervalMs.foreach {
       interval =>
         val d = FiniteDuration(interval, TimeUnit.MILLISECONDS)
-        memoryMXBean = Some(ManagementFactory.getMemoryMXBean)
-        gcMXBeans = Some(ManagementFactory.getGarbageCollectorMXBeans.asScala)
-
         context.system.scheduler.schedule(d, d, self, LogMemoryUsage)
     }
   }
@@ -137,8 +138,7 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka
         try {
           channelManager.shutdown()
         } catch {
-          case t: Throwable =>
-            log.error(t, "ChannelManager did not shutdown properly.")
+          case t: Throwable => log.error(t, "ChannelManager did not shutdown properly.")
         }
     }
 
@@ -146,16 +146,22 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka
     memoryManager.shutdown()
     fileCache.shutdown()
 
-    if(libraryCacheManager != null){
-      libraryCacheManager.shutdown()
+    if (libraryCacheManager != null) {
+      try {
+        libraryCacheManager.shutdown()
+      }
+      catch {
+        case t: Throwable => log.error(t, "LibraryCacheManager did not shutdown properly.")
+      }
     }
   }
 
-  def tryJobManagerRegistration(): Unit = {
+  private def tryJobManagerRegistration(): Unit = {
     registrationAttempts = 0
     import context.dispatcher
-    registrationScheduler = Some(context.system.scheduler.schedule(REGISTRATION_DELAY,
-      REGISTRATION_INTERVAL, self, RegisterAtJobManager))
+    registrationScheduler = Some(context.system.scheduler.schedule(
+        TaskManager.REGISTRATION_DELAY, TaskManager.REGISTRATION_INTERVAL,
+        self, RegisterAtJobManager))
   }
 
 
@@ -165,7 +171,7 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka
 
       if (registered) {
         registrationScheduler.foreach(_.cancel())
-      } else if (registrationAttempts <= MAX_REGISTRATION_ATTEMPTS) {
+      } else if (registrationAttempts <= TaskManager.MAX_REGISTRATION_ATTEMPTS) {
 
         log.info(s"Try to register at master ${jobManagerAkkaURL}. ${registrationAttempts}.
" +
           s"Attempt")
@@ -187,16 +193,13 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka
         context.watch(currentJobManager)
 
         log.info(s"TaskManager successfully registered at JobManager ${
-          currentJobManager.path
-            .toString
-        }.")
+          currentJobManager.path.toString }.")
 
         setupChannelManager()
         setupLibraryCacheManager(blobPort)
 
-        heartbeatScheduler = Some(context.system.scheduler.schedule(HEARTBEAT_INTERVAL,
-          HEARTBEAT_INTERVAL, self,
-          SendHeartbeat))
+        heartbeatScheduler = Some(context.system.scheduler.schedule(
+            TaskManager.HEARTBEAT_INTERVAL, TaskManager.HEARTBEAT_INTERVAL, self, SendHeartbeat))
 
         profiler foreach {
           _.tell(RegisterProfilingListener, JobManager.getProfiler(currentJobManager))
@@ -218,56 +221,121 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka
           }
           sender ! new TaskOperationResult(executionID, true)
         case None =>
-          sender ! new TaskOperationResult(executionID, false, "No task with that execution
ID " +
-            "was " +
-            "found.")
+          sender ! new TaskOperationResult(executionID, false,
+                                           "No task with that execution ID was found.")
       }
     }
 
-    case SubmitTask(tdd) => {
-      val jobID = tdd.getJobID
-      val vertexID = tdd.getVertexID
-      val executionID = tdd.getExecutionId
-      val taskIndex = tdd.getIndexInSubtaskGroup
-      val numSubtasks = tdd.getCurrentNumberOfSubtasks
-      var startRegisteringTask = 0L
-      var task: Task = null
+    case UnregisterTask(executionID) => {
+      unregisterTask(executionID)
+    }
 
-      try {
-        if(log.isDebugEnabled){
-          startRegisteringTask = System.currentTimeMillis()
+    case SendHeartbeat => {
+      currentJobManager ! Heartbeat(instanceID)
+    }
+
+    case LogMemoryUsage => {
+      logMemoryStats()
+    }
+
+    case NotifyWhenRegisteredAtJobManager => {
+       if (registered) {
+         sender ! RegisteredAtJobManager
+       } else {
+         waitForRegistration += sender
+      }
+    }
+
+    case FailTask(executionID, cause) => {
+      runningTasks.get(executionID) match {
+        case Some(task) =>
+          Future{
+            task.failExternally(cause)
+          }
+        case None =>
+      }
+    }
+
+    case Terminated(jobManager) => {
+      log.info(s"Job manager ${jobManager.path} is no longer reachable. "
+                 + "Cancelling all tasks and trying to reregister.")
+      
+      cancelAndClearEverything(new Throwable("Lost connection to JobManager"))
+      tryJobManagerRegistration()
+    }
+  }
+
+  def notifyExecutionStateChange(jobID: JobID, executionID: ExecutionAttemptID,
+                                 executionState: ExecutionState,
+                                 optionalError: Throwable): Unit = {
+    log.info(s"Update execution state to ${executionState}.")
+    val futureResponse = (currentJobManager ? UpdateTaskExecutionState(new TaskExecutionState
+    (jobID, executionID, executionState, optionalError)))(timeout)
+
+    futureResponse.mapTo[Boolean].onComplete {
+      case Success(result) =>
+        if(!result){
+          self ! FailTask(executionID, new IllegalStateException("Task has been disposed on " +
+            "JobManager."))
         }
-        libraryCacheManager.registerTask(jobID, executionID, tdd.getRequiredJarFiles());
 
-        if(log.isDebugEnabled){
-          log.debug(s"Register task ${executionID} took ${(System.currentTimeMillis() -
-            startRegisteringTask)/1000.0}s")
+        if (!result || executionState == ExecutionState.FINISHED || executionState ==
+          ExecutionState.CANCELED || executionState == ExecutionState.FAILED) {
+          self ! UnregisterTask(executionID)
         }
+      case Failure(t) => {
+        log.warning(s"Execution state change notification failed for task ${executionID}
" +
+          s"of job ${jobID}. Cause ${t.getMessage}.")
+        self ! UnregisterTask(executionID)
+      }
+    }
+  }
 
-        val userCodeClassLoader = libraryCacheManager.getClassLoader(jobID)
+  private def submitTask(tdd: TaskDeploymentDescriptor) : Unit = {
+    val jobID = tdd.getJobID
+    val vertexID = tdd.getVertexID
+    val executionID = tdd.getExecutionId
+    val taskIndex = tdd.getIndexInSubtaskGroup
+    val numSubtasks = tdd.getCurrentNumberOfSubtasks
+    var startRegisteringTask = 0L
+    var task: Task = null
 
-        if (userCodeClassLoader == null) {
-          throw new RuntimeException("No user code Classloader available.")
-        }
+    try {
+      if (log.isDebugEnabled) {
+        startRegisteringTask = System.currentTimeMillis()
+      }
+      libraryCacheManager.registerTask(jobID, executionID, tdd.getRequiredJarFiles());
 
-        task = new Task(jobID, vertexID, taskIndex, numSubtasks, executionID,
-          tdd.getTaskName, this)
+      if (log.isDebugEnabled) {
+        log.debug(s"Register task ${executionID} took ${
+          (System.currentTimeMillis() - startRegisteringTask)/1000.0}s")
+      }
 
-        runningTasks.put(executionID, task) match {
-          case Some(_) => throw new RuntimeException(s"TaskManager contains already a
task with " +
-            s"executionID ${executionID}.")
-          case None =>
-        }
+      val userCodeClassLoader = libraryCacheManager.getClassLoader(jobID)
 
-        val splitProvider = new TaskInputSplitProvider(currentJobManager, jobID, vertexID,
+      if (userCodeClassLoader == null) {
+        throw new RuntimeException("No user code Classloader available.")
+      }
+
+      task = new Task(jobID, vertexID, taskIndex, numSubtasks, executionID,
+        tdd.getTaskName, this)
+
+      runningTasks.put(executionID, task) match {
+        case Some(_) => throw new RuntimeException(
+            s"TaskManager contains already a task with executionID ${executionID}.")
+        case None =>
+      }
+
+      val splitProvider = new TaskInputSplitProvider(currentJobManager, jobID, vertexID,
           executionID, timeout)
-        val env = new RuntimeEnvironment(task, tdd, userCodeClassLoader, memoryManager,
+
+      val env = new RuntimeEnvironment(task, tdd, userCodeClassLoader, memoryManager,
           ioManager, splitProvider, currentJobManager, bcVarManager)
 
-        task.setEnvironment(env)
+      task.setEnvironment(env)
 
-        // register the task with the network stack and profilers
-        channelManager match {
+      // register the task with the network stack and profilers
+      channelManager match {
           case Some(cm) => cm.register(task)
           case None => throw new RuntimeException("ChannelManager has not been properly
" +
             "instantiated.")
@@ -311,88 +379,15 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka
             }
 
             libraryCacheManager.unregisterTask(jobID, executionID)
-          }catch{
+          } catch {
             case t: Throwable => log.error("Error during cleanup of task deployment.",
t)
           }
 
           sender ! new TaskOperationResult(executionID, false, message)
       }
-    }
-
-    case UnregisterTask(executionID) => {
-      unregisterTask(executionID)
-    }
-
-    case SendHeartbeat => {
-      currentJobManager ! Heartbeat(instanceID)
-    }
-
-    case LogMemoryUsage => {
-      memoryMXBean foreach {
-        mxbean =>
-            if(log.isDebugEnabled) {
-              log.debug(TaskManager.getMemoryUsageStatsAsString(mxbean))
-            }
-      }
-
-      gcMXBeans foreach {
-        mxbeans =>
-            if(log.isDebugEnabled) {
-              log.debug(TaskManager.getGarbageCollectorStatsAsString(mxbeans))
-            }
-      }
-    }
-
-    case NotifyWhenRegisteredAtJobManager => {
-      registered match {
-        case true => sender ! RegisteredAtJobManager
-        case false => waitForRegistration += sender
-      }
-    }
-
-    case FailTask(executionID, cause) => {
-      runningTasks.get(executionID) match {
-        case Some(task) =>
-          Future{
-            task.failExternally(cause)
-          }
-        case None =>
-      }
-    }
-
-    case Terminated(jobManager) => {
-      log.info(s"Job manager ${jobManager.path} is no longer reachable. Try to reregister.")
-      tryJobManagerRegistration()
-    }
   }
-
-  def notifyExecutionStateChange(jobID: JobID, executionID: ExecutionAttemptID,
-                                 executionState: ExecutionState,
-                                 optionalError: Throwable): Unit = {
-    log.info(s"Update execution state to ${executionState}.")
-    val futureResponse = (currentJobManager ? UpdateTaskExecutionState(new TaskExecutionState
-    (jobID, executionID, executionState, optionalError)))(timeout)
-
-    futureResponse.mapTo[Boolean].onComplete {
-      case Success(result) =>
-        if(!result){
-          self ! FailTask(executionID, new IllegalStateException("Task has been disposed on " +
-            "JobManager."))
-        }
-
-        if (!result || executionState == ExecutionState.FINISHED || executionState ==
-          ExecutionState.CANCELED || executionState == ExecutionState.FAILED) {
-          self ! UnregisterTask(executionID)
-        }
-      case Failure(t) => {
-        log.warning(s"Execution state change notification failed for task ${executionID}
" +
-          s"of job ${jobID}. Cause ${t.getMessage}.")
-        self ! UnregisterTask(executionID)
-      }
-    }
-  }
-
-  def setupChannelManager(): Unit = {
+  
+  private def setupChannelManager(): Unit = {
     //shutdown existing channel manager
     channelManager foreach {
       cm =>
@@ -424,12 +419,23 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka
     }
   }
 
-  def setupLibraryCacheManager(blobPort: Int): Unit = {
-    if(blobPort > 0){
+  private def setupLibraryCacheManager(blobPort: Int): Unit = {
+    // shutdown existing library cache manager
+    if (libraryCacheManager != null) {
+      try {
+        libraryCacheManager.shutdown()
+      }
+      catch {
+        case t: Throwable => log.error(t, "Could not properly shut down LibraryCacheManager.")
+      }
+      libraryCacheManager = null
+    }
+    
+    if (blobPort > 0) {
       val address = new InetSocketAddress(currentJobManager.path.address.host.getOrElse
         ("localhost"), blobPort)
       libraryCacheManager = new BlobLibraryCacheManager(new BlobCache(address), cleanupInterval)
-    }else{
+    } else {
       libraryCacheManager = new FallbackLibraryCacheManager
     }
   }
@@ -437,7 +443,7 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka
   /**
    * Removes all tasks from this TaskManager.
    */
-  def cancelAndClearEverything(cause: Throwable) {
+  private def cancelAndClearEverything(cause: Throwable) {
     if (runningTasks.size > 0) {
       log.info("Cancelling all computations and discarding all cached data.")
       for (t <- runningTasks.values) {
@@ -447,7 +453,7 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka
     }
   }
 
-  def unregisterTask(executionID: ExecutionAttemptID): Unit = {
+  private def unregisterTask(executionID: ExecutionAttemptID): Unit = {
     log.info(s"Unregister task with execution ID ${executionID}.")
     runningTasks.remove(executionID) match {
       case Some(task) =>
@@ -460,7 +466,7 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka
     }
   }
 
-  def removeAllTaskResources(task: Task): Unit = {
+  private def removeAllTaskResources(task: Task): Unit = {
     if(task.getEnvironment != null) {
       try {
         for (entry <- DistributedCache.readFileInfoFromConfig(task.getEnvironment
@@ -483,8 +489,21 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka
 
     task.unregisterMemoryManager(memoryManager)
   }
+  
+  private def logMemoryStats(): Unit = {
+    if (log.isDebugEnabled) {
+      val memoryMXBean = ManagementFactory.getMemoryMXBean()
+      val gcMXBeans = ManagementFactory.getGarbageCollectorMXBeans().asScala
+      
+      log.debug(TaskManager.getMemoryUsageStatsAsString(memoryMXBean))
+      log.debug(TaskManager.getGarbageCollectorStatsAsString(gcMXBeans))
+    }
+  }
 }
 
+/**
+ * TaskManager companion object. Contains TaskManager executable entry point, command line parsing, and constants.
+ */
 object TaskManager {
 
   val LOG = LoggerFactory.getLogger(classOf[TaskManager])
@@ -492,7 +511,13 @@ object TaskManager {
 
   val TASK_MANAGER_NAME = "taskmanager"
   val PROFILER_NAME = "profiler"
+    
+  val REGISTRATION_DELAY = 0 seconds
+  val REGISTRATION_INTERVAL = 10 seconds
+  val MAX_REGISTRATION_ATTEMPTS = 10
+  val HEARTBEAT_INTERVAL = 5000 millisecond
 
+  
   def main(args: Array[String]): Unit = {
     val (hostname, port, configuration) = parseArgs(args)
 
@@ -599,7 +624,7 @@ object TaskManager {
 
     val networkConnectionConfiguration = if(localExecution){
       LocalNetworkConfiguration(numBuffers, bufferSize)
-    }else{
+    } else {
       val numInThreads = configuration.getInteger(
         ConfigConstants.TASK_MANAGER_NET_NUM_IN_THREADS_KEY,
         ConfigConstants.DEFAULT_TASK_MANAGER_NET_NUM_IN_THREADS)
@@ -708,7 +733,7 @@ object TaskManager {
     }
   }
 
-  def getMemoryUsageStatsAsString(memoryMXBean: MemoryMXBean): String = {
+  private def getMemoryUsageStatsAsString(memoryMXBean: MemoryMXBean): String = {
     val heap = memoryMXBean.getHeapMemoryUsage()
     val nonHeap = memoryMXBean.getNonHeapMemoryUsage
 
@@ -724,7 +749,7 @@ object TaskManager {
       s"NON HEAP: $nonHeapUsed/$nonHeapCommitted/$nonHeapMax MB (used/committed/max)]"
   }
 
-  def getGarbageCollectorStatsAsString(gcMXBeans: Iterable[GarbageCollectorMXBean]): String = {
+  private def getGarbageCollectorStatsAsString(gcMXBeans: Iterable[GarbageCollectorMXBean]): String = {
     val beans = gcMXBeans map {
       bean =>
         s"[${bean.getName}, GC TIME (ms): ${bean.getCollectionTime}, " +

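For context, the TaskManager change above moves the registration and heartbeat constants onto the companion object while keeping the retry loop driven by messages the actor schedules to itself. Below is a minimal, stand-alone sketch of that self-scheduling retry pattern with the classic Akka scheduler API; the actor and message names are illustrative stand-ins, not code from the patch.

    import akka.actor.{Actor, ActorRef, Cancellable}
    import scala.concurrent.duration._

    // Hypothetical stand-ins for the registration messages used in the patch.
    case object RegisterAtJobManager
    case object AcknowledgeRegistration

    class RegistrationRetryExample(jobManager: ActorRef) extends Actor {
      import context.dispatcher

      // Mirrors the constants the patch moves into the TaskManager companion object.
      val REGISTRATION_DELAY = 0.seconds
      val REGISTRATION_INTERVAL = 10.seconds
      val MAX_REGISTRATION_ATTEMPTS = 10

      var registrationAttempts = 0
      var registered = false
      var registrationScheduler: Option[Cancellable] = None

      override def preStart(): Unit = {
        // Periodically remind this actor to try registering until acknowledged.
        registrationScheduler = Some(context.system.scheduler.schedule(
          REGISTRATION_DELAY, REGISTRATION_INTERVAL, self, RegisterAtJobManager))
      }

      def receive = {
        case RegisterAtJobManager =>
          registrationAttempts += 1
          if (registered || registrationAttempts > MAX_REGISTRATION_ATTEMPTS) {
            registrationScheduler.foreach(_.cancel()) // stop once registered or out of attempts
          } else {
            jobManager ! RegisterAtJobManager // placeholder for the real RegisterTaskManager message
          }

        case AcknowledgeRegistration =>
          registered = true // the next scheduled tick cancels the retry schedule
      }
    }

Sending the reminder to self keeps all state mutation on the actor's own mailbox thread, which is why the patch needs no locking around registrationAttempts.
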
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/972a7b01/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index 1735f68..c7914fc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -104,7 +104,7 @@ public class ExecutionGraphDeploymentTest {
 			ExecutionVertex vertex = ejv.getTaskVertices()[3];
 
 			// create synchronous task manager
-			final TestActorRef simpleTaskManager = TestActorRef.create(system,
+			final TestActorRef<?> simpleTaskManager = TestActorRef.create(system,
 					Props.create(ExecutionGraphTestUtils
 					.SimpleAcknowledgingTaskManager.class));
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/972a7b01/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
index 207be1a..a4adda8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
@@ -32,6 +32,7 @@ import akka.actor.Status;
 import akka.actor.UntypedActor;
 import akka.japi.Creator;
 import akka.testkit.JavaTestKit;
+
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.instance.AllocatedSlot;
@@ -46,7 +47,9 @@ import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+@SuppressWarnings("serial")
 public class ExecutionVertexCancelTest {
+	
 	private static ActorSystem system;
 
 	@BeforeClass
@@ -209,7 +212,7 @@ public class ExecutionVertexCancelTest {
 
 					// task manager mock actor
 					// first return NOT SUCCESS (task not found, cancel call overtook deploy call), then success (cancel call after deploy call)
-					TestActorRef taskManager = TestActorRef.create(system, Props.create(new
+					TestActorRef<?> taskManager = TestActorRef.create(system, Props.create(new
 							CancelSequenceTaskManagerCreator(new
 							TaskOperationResult(execId, false), new TaskOperationResult(execId, true))));
 
@@ -281,7 +284,7 @@ public class ExecutionVertexCancelTest {
 					final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0]);
 					final ExecutionAttemptID execId = vertex.getCurrentExecutionAttempt().getAttemptId();
 
-					final TestActorRef taskManager = TestActorRef.create(system,
+					final TestActorRef<?> taskManager = TestActorRef.create(system,
 							Props.create(new CancelSequenceTaskManagerCreator(new
 									TaskOperationResult(execId, true))));
 
@@ -465,7 +468,6 @@ public class ExecutionVertexCancelTest {
 					final ExecutionJobVertex ejv = getExecutionVertex(jid);
 
 					final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0]);
-					final ExecutionAttemptID execId = vertex.getCurrentExecutionAttempt().getAttemptId();
 
 					final ActorRef taskManager = system.actorOf(Props.create(new CancelSequenceTaskManagerCreator()));
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/972a7b01/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
index 3219a21..cb657a0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
@@ -34,7 +34,6 @@ import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.typeutils.record.RecordComparatorFactory;
 import org.apache.flink.api.java.record.io.DelimitedOutputFormat;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.operators.testutils.InfiniteInputIterator;
 import org.apache.flink.runtime.operators.testutils.TaskCancelThread;
 import org.apache.flink.runtime.operators.testutils.TaskTestBase;
@@ -54,7 +53,7 @@ public class DataSinkTaskTest extends TaskTestBase
 
 	private static final int NETWORK_BUFFER_SIZE = 1024;
 	
-	private final String tempTestPath = Path.constructTestPath(DataSinkTaskTest.class, "dst_test");
+	private final String tempTestPath = constructTestPath(DataSinkTaskTest.class, "dst_test");
 	
 	@After
 	public void cleanUp() {
@@ -491,5 +490,19 @@ public class DataSinkTaskTest extends TaskTestBase
 		}
 	}
 	
+	public static String constructTestPath(Class<?> forClass, String folder) {
+		// we create test path that depends on class to prevent name clashes when two tests
+		// create temp files with the same name
+		String path = System.getProperty("java.io.tmpdir");
+		if (!(path.endsWith("/") || path.endsWith("\\")) ) {
+			path += System.getProperty("file.separator");
+		}
+		path += (forClass.getName() + "-" + folder);
+		return path;
+	}
+	
+	public static String constructTestURI(Class<?> forClass, String folder) {
+		return new File(constructTestPath(forClass, folder)).toURI().toString();
+	}
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/972a7b01/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java
index b04ab30..aac7bc9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java
@@ -31,7 +31,6 @@ import java.util.List;
 import org.junit.Assert;
 
 import org.apache.flink.api.java.record.io.DelimitedInputFormat;
-import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.operators.testutils.NirvanaOutputList;
 import org.apache.flink.runtime.operators.testutils.TaskCancelThread;
 import org.apache.flink.runtime.operators.testutils.TaskTestBase;
@@ -50,7 +49,7 @@ public class DataSourceTaskTest extends TaskTestBase {
 
 	private List<Record> outList;
 	
-	private String tempTestPath = Path.constructTestPath(DataSourceTaskTest.class, "dst_test");
+	private String tempTestPath = DataSinkTaskTest.constructTestPath(DataSourceTaskTest.class, "dst_test");
 	
 	@After
 	public void cleanUp() {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/972a7b01/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
index fe2d7fc..6c4b228 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
@@ -22,6 +22,7 @@ import akka.actor.ActorRef;
 import akka.dispatch.Futures;
 import akka.pattern.Patterns;
 import akka.util.Timeout;
+
 import org.apache.commons.io.FileUtils;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
@@ -29,6 +30,7 @@ import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
 import org.apache.hadoop.fs.FileSystem;
 import org.junit.Assert;
+
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
@@ -351,4 +353,19 @@ public class TestBaseUtils {
 			f.delete();
 		}
 	}
+	
+	public static String constructTestPath(Class<?> forClass, String folder) {
+		// we create test path that depends on class to prevent name clashes when two tests
+		// create temp files with the same name
+		String path = System.getProperty("java.io.tmpdir");
+		if (!(path.endsWith("/") || path.endsWith("\\")) ) {
+			path += System.getProperty("file.separator");
+		}
+		path += (forClass.getName() + "-" + folder);
+		return path;
+	}
+	
+	public static String constructTestURI(Class<?> forClass, String folder) {
+		return new File(constructTestPath(forClass, folder)).toURI().toString();
+	}
 }

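The constructTestPath/constructTestURI helpers added above (duplicated into DataSinkTaskTest as well) build a temp location that embeds the test class name, so two tests using the same folder name cannot clash on files. A rough usage sketch, assuming the class is on the test classpath; PathExample is a made-up caller:

    import org.apache.flink.test.util.TestBaseUtils

    object PathExample extends App {
      // Prints something like /tmp/PathExample$-flink_iterations (class name + folder)
      println(TestBaseUtils.constructTestPath(getClass, "flink_iterations"))
      // Same location rendered as a file: URI, as the page-rank tests below use it
      println(TestBaseUtils.constructTestURI(getClass, "flink_iterations"))
    }
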
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/972a7b01/flink-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-tests/pom.xml b/flink-tests/pom.xml
index 0750264..21ccbdb 100644
--- a/flink-tests/pom.xml
+++ b/flink-tests/pom.xml
@@ -126,7 +126,6 @@ under the License.
 		<dependency>
 			<groupId>org.scalatest</groupId>
 			<artifactId>scalatest_2.10</artifactId>
-			<version>2.2.0</version>
 			<scope>test</scope>
 		</dependency>
 	</dependencies>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/972a7b01/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRank.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRank.java
b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRank.java
index 6327db9..d79b39b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRank.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRank.java
@@ -24,7 +24,6 @@ import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
 import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
 import org.apache.flink.api.java.record.io.FileOutputFormat;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.io.network.channels.ChannelType;
 import org.apache.flink.runtime.iterative.task.IterationHeadPactTask;
 import org.apache.flink.runtime.iterative.task.IterationIntermediatePactTask;
@@ -56,6 +55,7 @@ import org.apache.flink.test.iterative.nephele.customdanglingpagerank.types.Vert
 import org.apache.flink.test.iterative.nephele.customdanglingpagerank.types.VertexWithRankSerializerFactory;
 import org.apache.flink.test.iterative.nephele.danglingpagerank.DiffL1NormConvergenceCriterion;
 import org.apache.flink.test.iterative.nephele.danglingpagerank.PageRankStatsAggregator;
+import org.apache.flink.test.util.TestBaseUtils;
 
 public class CustomCompensatableDanglingPageRank {
 	
@@ -101,7 +101,7 @@ public class CustomCompensatableDanglingPageRank {
 		String pageWithRankInputPath = ""; //"file://" + PlayConstants.PLAY_DIR + "test-inputs/danglingpagerank/pageWithRank";
 		String adjacencyListInputPath = ""; //"file://" + PlayConstants.PLAY_DIR +
 //			"test-inputs/danglingpagerank/adjacencylists";
-		String outputPath =  Path.constructTestURI(CustomCompensatableDanglingPageRank.class, "flink_iterations");
+		String outputPath = TestBaseUtils.constructTestURI(CustomCompensatableDanglingPageRank.class, "flink_iterations");
 //		String confPath = PlayConstants.PLAY_DIR + "local-conf";
 		int minorConsumer = 2;
 		int matchMemory = 5;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/972a7b01/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRankWithCombiner.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRankWithCombiner.java
b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRankWithCombiner.java
index 8bede88..6add8c2 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRankWithCombiner.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRankWithCombiner.java
@@ -24,7 +24,6 @@ import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
 import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
 import org.apache.flink.api.java.record.io.FileOutputFormat;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.io.network.channels.ChannelType;
 import org.apache.flink.runtime.iterative.task.IterationHeadPactTask;
 import org.apache.flink.runtime.iterative.task.IterationIntermediatePactTask;
@@ -57,6 +56,7 @@ import org.apache.flink.test.iterative.nephele.customdanglingpagerank.types.Vert
 import org.apache.flink.test.iterative.nephele.customdanglingpagerank.types.VertexWithRankSerializerFactory;
 import org.apache.flink.test.iterative.nephele.danglingpagerank.DiffL1NormConvergenceCriterion;
 import org.apache.flink.test.iterative.nephele.danglingpagerank.PageRankStatsAggregator;
+import org.apache.flink.test.util.TestBaseUtils;
 
 public class CustomCompensatableDanglingPageRankWithCombiner {
 	
@@ -102,7 +102,7 @@ public class CustomCompensatableDanglingPageRankWithCombiner {
 		String pageWithRankInputPath = ""; //"file://" + PlayConstants.PLAY_DIR + "test-inputs/danglingpagerank/pageWithRank";
 		String adjacencyListInputPath = ""; //"file://" + PlayConstants.PLAY_DIR +
 //			"test-inputs/danglingpagerank/adjacencylists";
-		String outputPath =  Path.constructTestURI(CustomCompensatableDanglingPageRankWithCombiner.class, "flink_iterations");
+		String outputPath =  TestBaseUtils.constructTestURI(CustomCompensatableDanglingPageRankWithCombiner.class, "flink_iterations");
 		int minorConsumer = 2;
 		int matchMemory = 5;
 		int coGroupSortMemory = 5;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/972a7b01/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
index 6cd484d..8fb61af 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
@@ -34,14 +34,11 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.CrazyNested;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.CustomType;
-import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.FromTuple;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.FromTupleWithCTor;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.POJO;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.PojoContainingTupleAndWritable;
 import org.apache.flink.test.util.MultipleProgramsTestBase;
 import org.apache.flink.util.Collector;
-import org.hamcrest.core.IsEqual;
-import org.hamcrest.core.IsNot;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
@@ -302,14 +299,13 @@ public class GroupReduceITCase extends MultipleProgramsTestBase {
 				"23,2,Hi again!\n";
 	}
 
-	@Test public void testCorrectnessOfGroupReduceOnCustomTypeWithKeyExtractorAndCombine() throws
-			Exception {
+	@Test
+	public void testCorrectnessOfGroupReduceOnCustomTypeWithKeyExtractorAndCombine()
+			throws Exception {
 		/*
 		 * check correctness of groupReduce on custom type with key extractor and combine
 		 */
-
-		org.junit.Assume.assumeThat(mode, new IsNot(new IsEqual<ExecutionMode>(ExecutionMode
-				.COLLECTION)));
+		org.junit.Assume.assumeTrue(mode != ExecutionMode.COLLECTION);
 
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
@@ -339,11 +335,11 @@ public class GroupReduceITCase extends MultipleProgramsTestBase {
 
 	@Test
 	public void testCorrectnessOfGroupReduceOnTuplesWithCombine() throws Exception {
+		
 		/*
 		 * check correctness of groupReduce on tuples with combine
 		 */
-		org.junit.Assume.assumeThat(mode, new IsNot(new IsEqual<ExecutionMode>(ExecutionMode
-				.COLLECTION)));
+		org.junit.Assume.assumeTrue(mode != ExecutionMode.COLLECTION);
 
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		env.setDegreeOfParallelism(2); // important because it determines how often the combiner is called
@@ -365,11 +361,11 @@ public class GroupReduceITCase extends MultipleProgramsTestBase {
 
 	@Test
 	public void testCorrectnessOfAllGroupReduceForTuplesWithCombine() throws Exception {
+		
 		/*
 		 * check correctness of all-groupreduce for tuples with combine
 		 */
-		org.junit.Assume.assumeThat(mode, new IsNot(new IsEqual<ExecutionMode>(ExecutionMode
-				.COLLECTION)));
+		org.junit.Assume.assumeTrue(mode != ExecutionMode.COLLECTION);
 
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
@@ -525,16 +521,10 @@ public class GroupReduceITCase extends MultipleProgramsTestBase {
 	}
 
 	public static class GroupReducer2 implements GroupReduceFunction<FromTupleWithCTor, Integer> {
-		private static final long serialVersionUID = 1L;
+
 		@Override
-		public void reduce(Iterable<FromTupleWithCTor> values,
-				Collector<Integer> out)
-		throws Exception {
-			int c = 0;
-			for(FromTuple v : values) {
-				c++;
-			}
-			out.collect(c);
+		public void reduce(Iterable<FromTupleWithCTor> values, Collector<Integer> out) {
+			out.collect(countElements(values));
 		}
 	}
 
@@ -556,17 +546,10 @@ public class GroupReduceITCase extends MultipleProgramsTestBase {
 	}
 
 	public static class GroupReducer3 implements GroupReduceFunction<PojoContainingTupleAndWritable, Integer> {
-		private static final long serialVersionUID = 1L;
-								
+		
 		@Override
-		public void reduce(Iterable<PojoContainingTupleAndWritable> values,
-				Collector<Integer> out)
-		throws Exception {
-			int c = 0;
-			for(PojoContainingTupleAndWritable v : values) {
-				c++;
-			}
-			out.collect(c);
+		public void reduce(Iterable<PojoContainingTupleAndWritable> values, Collector<Integer> out) {
+			out.collect(countElements(values));
 		}
 	}
 
@@ -590,14 +573,8 @@ public class GroupReduceITCase extends MultipleProgramsTestBase {
 	public static class GroupReducer4 implements GroupReduceFunction<Tuple3<Integer,CrazyNested, POJO>, Integer> {
 		private static final long serialVersionUID = 1L;
 		@Override
-		public void reduce(Iterable<Tuple3<Integer,CrazyNested, POJO>> values,
-				Collector<Integer> out)
-		throws Exception {
-			int c = 0;
-			for(Tuple3<Integer,CrazyNested, POJO> v : values) {
-				c++;
-			}
-			out.collect(c);
+		public void reduce(Iterable<Tuple3<Integer,CrazyNested, POJO>> values, Collector<Integer> out) {
+			out.collect(countElements(values));
 		}
 	}
 
@@ -1188,8 +1165,15 @@ public class GroupReduceITCase extends MultipleProgramsTestBase {
 	}
 	
 	public static final class IdentityMapper<T> extends RichMapFunction<T, T> {
-
 		@Override
 		public T map(T value) { return value; }
 	}
+	
+	private static int countElements(Iterable<?> iterable) {
+		int c = 0;
+		for (@SuppressWarnings("unused") Object o : iterable) {
+			c++;
+		}
+		return c;
+	}
 }
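Two small patterns recur in the GroupReduceITCase cleanup above: the copy-pasted counting loops collapse into a single countElements helper, and the verbose hamcrest assumeThat(IsNot(IsEqual(...))) calls become plain boolean assumptions. A short illustrative sketch of that JUnit 4 assumption idiom; the class and field names are invented, not from the patch:

    import org.junit.{Assume, Test}

    class CombinerAssumptionExample {
      // Hypothetical stand-in for the execution mode field of MultipleProgramsTestBase.
      val mode: String = "CLUSTER"

      @Test def testWithCombiner(): Unit = {
        // Skips (rather than fails) the test in collection execution, where the
        // combiner never runs; equivalent to the removed IsNot/IsEqual matcher.
        Assume.assumeTrue(mode != "COLLECTION")
        // ... assertions exercising the combiner would follow ...
      }
    }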

