flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [1/4] incubator-flink git commit: [FLINK-1349] [runtime] Various cleanups to make scala runtime code interact smoother with java
Date Tue, 06 Jan 2015 10:45:28 GMT
Repository: incubator-flink
Updated Branches:
  refs/heads/master 24c47362e -> 2b86e9536


[FLINK-1349] [runtime] Various cleanups to make scala runtime code interact smoother with
java


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/cec30ff4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/cec30ff4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/cec30ff4

Branch: refs/heads/master
Commit: cec30ff4f1b75a9a4642819ac28c1ee1939d4343
Parents: 972a7b0
Author: Stephan Ewen <sewen@apache.org>
Authored: Tue Jan 6 00:40:38 2015 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Tue Jan 6 11:07:02 2015 +0100

----------------------------------------------------------------------
 .../flink/runtime/executiongraph/Execution.java | 11 ++++----
 .../runtime/messages/TaskManagerMessages.scala  | 27 ++++++++++++++++++-
 .../flink/runtime/taskmanager/TaskManager.scala | 17 +++++++++---
 .../runtime/taskmanager/TaskManagerTest.java    | 28 +++++++++++---------
 .../testingUtils/TestingTaskManager.scala       |  8 +++++-
 .../TestingTaskManagerMessages.scala            | 28 +++++++++++++++++---
 6 files changed, 91 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cec30ff4/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 9100000..e7f4333 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -86,8 +86,8 @@ public class Execution implements Serializable {
 	
 	private static final int NUM_CANCEL_CALL_TRIES = 3;
 
-	public static FiniteDuration timeout = new FiniteDuration(ConfigConstants
-			.DEFAULT_AKKA_ASK_TIMEOUT, TimeUnit.SECONDS);
+	public static FiniteDuration timeout = new FiniteDuration(
+			ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT, TimeUnit.SECONDS);
 	
 	// --------------------------------------------------------------------------------------------
 	
@@ -289,9 +289,9 @@ public class Execution implements Serializable {
 
 				@Override
 				public void onComplete(Throwable failure, Object success) throws Throwable {
-					if(failure != null){
+					if (failure != null) {
 						markFailed(failure);
-					}else{
+					} else {
 						TaskOperationResult result = (TaskOperationResult) success;
 						if (success == null) {
 							markFailed(new Exception("Failed to deploy the task to slot " + slot + ": TaskOperationResult
was null"));
@@ -305,8 +305,7 @@ public class Execution implements Serializable {
 						else {
 							// deployment failed :(
 							markFailed(new Exception("Failed to deploy the task " +
-									getVertexWithAttempt() + " to slot " + slot + ": " + result
-									.description()));
+									getVertexWithAttempt() + " to slot " + slot + ": " + result.description()));
 						}
 					}
 				}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cec30ff4/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala
b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala
index aad9efe..f69b629 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala
@@ -21,10 +21,11 @@ package org.apache.flink.runtime.messages
 import org.apache.flink.core.io.InputSplit
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
-import org.apache.flink.runtime.instance.InstanceID
+import org.apache.flink.runtime.instance.InstanceID 
 
 object TaskManagerMessages {
 
+  
   /**
    * Cancels the task associated with [[attemptID]]. The result is sent back to the sender
as a
    * [[TaskOperationResult]] message.
@@ -113,4 +114,28 @@ object TaskManagerMessages {
    * @param cause reason for the external failure
    */
   case class FailTask(executionID: ExecutionAttemptID, cause: Throwable)
+  
+    // --------------------------------------------------------------------------
+  // Utility methods to allow simpler case object access from Java
+  // --------------------------------------------------------------------------
+  
+  def getNotifyWhenRegisteredAtJobManagerMessage() : AnyRef = {
+    NotifyWhenRegisteredAtJobManager
+  }
+  
+  def getRegisteredAtJobManagerMessage() : AnyRef = {
+    RegisteredAtJobManager
+  }
+  
+  def getRegisterAtJobManagerMessage() : AnyRef = {
+    RegisterAtJobManager
+  }
+
+  def getSendHeartbeatMessage() : AnyRef = {
+    SendHeartbeat
+  }
+
+  def getLogMemoryUsageMessage() : AnyRef = {
+    RegisteredAtJobManager
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cec30ff4/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 5b6ee86..4676ae9 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -171,14 +171,16 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka
 
       if (registered) {
         registrationScheduler.foreach(_.cancel())
-      } else if (registrationAttempts <= TaskManager.MAX_REGISTRATION_ATTEMPTS) {
+      }
+      else if (registrationAttempts <= TaskManager.MAX_REGISTRATION_ATTEMPTS) {
 
         log.info(s"Try to register at master ${jobManagerAkkaURL}. ${registrationAttempts}.
" +
           s"Attempt")
         val jobManager = context.actorSelection(jobManagerAkkaURL)
 
         jobManager ! RegisterTaskManager(connectionInfo, hardwareDescription, numberOfSlots)
-      } else {
+      }
+      else {
         log.error("TaskManager could not register at JobManager.");
         self ! PoisonPill
       }
@@ -212,6 +214,10 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka
         waitForRegistration.clear()
       }
     }
+    
+    case SubmitTask(tdd) => {
+      submitTask(tdd)
+    }
 
     case CancelTask(executionID) => {
       runningTasks.get(executionID) match {
@@ -502,7 +508,8 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka
 }
 
 /**
- * TaskManager companion object. Contains TaskManager executable entry point, command line
parsing, and constants.
+ * TaskManager companion object. Contains TaskManager executable entry point, command
+ * line parsing, and constants.
  */
 object TaskManager {
 
@@ -749,7 +756,9 @@ object TaskManager {
       s"NON HEAP: $nonHeapUsed/$nonHeapCommitted/$nonHeapMax MB (used/committed/max)]"
   }
 
-  private def getGarbageCollectorStatsAsString(gcMXBeans: Iterable[GarbageCollectorMXBean]):
String = {
+  private def getGarbageCollectorStatsAsString(gcMXBeans: Iterable[GarbageCollectorMXBean])

+    : String =
+  {
     val beans = gcMXBeans map {
       bean =>
         s"[${bean.getName}, GC TIME (ms): ${bean.getCollectionTime}, " +

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cec30ff4/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index 500532d..4355298 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -26,6 +26,7 @@ import akka.japi.Creator;
 import akka.pattern.Patterns;
 import akka.testkit.JavaTestKit;
 import akka.util.Timeout;
+
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
@@ -46,15 +47,16 @@ import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.messages.RegistrationMessages;
 import org.apache.flink.runtime.messages.TaskManagerMessages.CancelTask;
-import org.apache.flink.runtime.messages.TaskManagerMessages.NotifyWhenRegisteredAtJobManager$;
 import org.apache.flink.runtime.messages.TaskManagerMessages.SubmitTask;
 import org.apache.flink.runtime.messages.TaskManagerMessages.TaskOperationResult;
+import org.apache.flink.runtime.messages.TaskManagerMessages;
 import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.types.IntegerRecord;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
+
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
@@ -162,8 +164,8 @@ public class TaskManagerTest {
 
 							expectMsgEquals(new TaskOperationResult(eid1, true));
 							expectMsgEquals(new TaskOperationResult(eid2, true));
-
-							tm.tell(TestingTaskManagerMessages.RequestRunningTasks$.MODULE$, getRef());
+							
+							tm.tell(TestingTaskManagerMessages.getRequestRunningTasksMessage(), getRef());
 
 							Map<ExecutionAttemptID, Task> runningTasks = expectMsgClass(TestingTaskManagerMessages
 									.ResponseRunningTasks.class).asJava();
@@ -187,7 +189,7 @@ public class TaskManagerTest {
 
 							assertEquals(ExecutionState.CANCELED, t1.getExecutionState());
 
-							tm.tell(TestingTaskManagerMessages.RequestRunningTasks$.MODULE$, getRef());
+							tm.tell(TestingTaskManagerMessages.getRequestRunningTasksMessage(), getRef());
 							runningTasks = expectMsgClass(TestingTaskManagerMessages
 									.ResponseRunningTasks.class).asJava();
 
@@ -206,7 +208,7 @@ public class TaskManagerTest {
 
 							assertEquals(ExecutionState.CANCELED, t2.getExecutionState());
 
-							tm.tell(TestingTaskManagerMessages.RequestRunningTasks$.MODULE$, getRef());
+							tm.tell(TestingTaskManagerMessages.getRequestRunningTasksMessage(), getRef());
 							runningTasks = expectMsgClass(TestingTaskManagerMessages
 									.ResponseRunningTasks.class).asJava();
 
@@ -276,7 +278,7 @@ public class TaskManagerTest {
 							expectMsgEquals(true);
 							expectMsgEquals(true);
 
-							tm.tell(TestingTaskManagerMessages.RequestRunningTasks$.MODULE$, getRef());
+							tm.tell(TestingTaskManagerMessages.getRequestRunningTasksMessage(), getRef());
 							Map<ExecutionAttemptID, Task> tasks = expectMsgClass(TestingTaskManagerMessages
 									.ResponseRunningTasks.class).asJava();
 
@@ -337,7 +339,7 @@ public class TaskManagerTest {
 						tm.tell(new SubmitTask(tdd1), getRef());
 						expectMsgEquals(new TaskOperationResult(eid1, true));
 
-						tm.tell(TestingTaskManagerMessages.RequestRunningTasks$.MODULE$, getRef());
+						tm.tell(TestingTaskManagerMessages.getRequestRunningTasksMessage(), getRef());
 						Map<ExecutionAttemptID, Task> tasks = expectMsgClass(TestingTaskManagerMessages.ResponseRunningTasks
 								.class).asJava();
 
@@ -359,7 +361,7 @@ public class TaskManagerTest {
 				assertEquals(ExecutionState.FINISHED, t2.getExecutionState());
 			}
 
-						tm.tell(TestingTaskManagerMessages.RequestRunningTasks$.MODULE$, getRef());
+						tm.tell(TestingTaskManagerMessages.getRequestRunningTasksMessage(), getRef());
 						tasks = expectMsgClass(TestingTaskManagerMessages.ResponseRunningTasks
 								.class).asJava();
 
@@ -424,7 +426,7 @@ public class TaskManagerTest {
 						expectMsgEquals(new TaskOperationResult(eid2, true));
 						expectMsgEquals(new TaskOperationResult(eid1, true));
 
-						tm.tell(TestingTaskManagerMessages.RequestRunningTasks$.MODULE$, getRef());
+						tm.tell(TestingTaskManagerMessages.getRequestRunningTasksMessage(), getRef());
 						Map<ExecutionAttemptID, Task> tasks = expectMsgClass(TestingTaskManagerMessages
 								.ResponseRunningTasks.class).asJava();
 
@@ -450,7 +452,7 @@ public class TaskManagerTest {
 							Await.ready(response, d);
 						}
 
-						tm.tell(TestingTaskManagerMessages.RequestRunningTasks$.MODULE$, getRef());
+						tm.tell(TestingTaskManagerMessages.getRequestRunningTasksMessage(), getRef());
 						tasks = expectMsgClass(TestingTaskManagerMessages
 								.ResponseRunningTasks.class).asJava();
 
@@ -514,6 +516,7 @@ public class TaskManagerTest {
 		}
 	}
 
+	@SuppressWarnings("serial")
 	public static class SimpleLookupJobManagerCreator implements Creator<SimpleLookupJobManager>{
 		private final ChannelID receiverID;
 
@@ -527,6 +530,7 @@ public class TaskManagerTest {
 		}
 	}
 
+	@SuppressWarnings("serial")
 	public static class SimpleLookupFailingUpdateJobManagerCreator implements
 			Creator<SimpleLookupFailingUpdateJobManager>{
 		private final ChannelID receiverID;
@@ -550,8 +554,8 @@ public class TaskManagerTest {
 
 		ActorRef taskManager = TestingUtils.startTestingTaskManagerWithConfiguration("localhost",
cfg, system);
 
-		Future<Object> response = Patterns.ask(taskManager, NotifyWhenRegisteredAtJobManager$.MODULE$,
-				timeout);
+		Future<Object> response = Patterns.ask(taskManager, 
+				TaskManagerMessages.getNotifyWhenRegisteredAtJobManagerMessage(), timeout);
 
 		try {
 			FiniteDuration d = new FiniteDuration(20, TimeUnit.SECONDS);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cec30ff4/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
index 080af11..b1aa437 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
@@ -39,8 +39,10 @@ trait TestingTaskManager extends ActorLogMessages {
   }
 
   def receiveTestMessages: Receive = {
+    
     case RequestRunningTasks =>
       sender ! ResponseRunningTasks(runningTasks.toMap)
+      
     case NotifyWhenTaskRemoved(executionID) =>
       runningTasks.get(executionID) match {
         case Some(_) =>
@@ -48,16 +50,19 @@ trait TestingTaskManager extends ActorLogMessages {
           waitForRemoval += (executionID -> (set + sender))
         case None => sender ! true
       }
+      
     case UnregisterTask(executionID) =>
       super.receiveWithLogMessages(UnregisterTask(executionID))
       waitForRemoval.get(executionID) match {
         case Some(actors) => for(actor <- actors) actor ! true
         case None =>
       }
+      
     case RequestBroadcastVariablesWithReferences => {
       sender ! ResponseBroadcastVariablesWithReferences(
         bcVarManager.getNumberOfVariablesWithReferences)
     }
+    
     case NotifyWhenJobRemoved(jobID) => {
       if(runningTasks.values.exists(_.getJobID == jobID)){
         val set = waitForJobRemoval.getOrElse(jobID, Set())
@@ -71,13 +76,14 @@ trait TestingTaskManager extends ActorLogMessages {
         }
       }
     }
+    
     case CheckIfJobRemoved(jobID) => {
       if(runningTasks.values.forall(_.getJobID != jobID)){
         waitForJobRemoval.get(jobID) match {
           case Some(listeners) => listeners foreach (_ ! true)
           case None =>
         }
-      }else{
+      } else {
         import context.dispatcher
         context.system.scheduler.scheduleOnce(200 milliseconds, this.self, CheckIfJobRemoved(jobID))
       }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cec30ff4/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala
index cb5282e..38cc829 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala
@@ -22,16 +22,36 @@ import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
 import org.apache.flink.runtime.jobgraph.JobID
 import org.apache.flink.runtime.taskmanager.Task
 
-object TestingTaskManagerMessages{
+/**
+ * Additional messages that the [[TestingTaskManager]] understands.
+ */
+object TestingTaskManagerMessages {
+  
   case class NotifyWhenTaskRemoved(executionID: ExecutionAttemptID)
-
-  case object RequestRunningTasks
+  
   case class ResponseRunningTasks(tasks: Map[ExecutionAttemptID, Task]){
     import collection.JavaConverters._
     def asJava: java.util.Map[ExecutionAttemptID, Task] = tasks.asJava
   }
-  case object RequestBroadcastVariablesWithReferences
+  
   case class ResponseBroadcastVariablesWithReferences(number: Int)
 
   case class CheckIfJobRemoved(jobID: JobID)
+  
+  case object RequestRunningTasks
+  
+  case object RequestBroadcastVariablesWithReferences
+  
+  // --------------------------------------------------------------------------
+  // Utility methods to allow simpler case object access from Java
+  // --------------------------------------------------------------------------
+  
+  def getRequestRunningTasksMessage() : AnyRef = {
+    RequestRunningTasks
+  }
+  
+  def getRequestBroadcastVariablesWithReferencesMessage() : AnyRef = {
+    RequestBroadcastVariablesWithReferences
+  }
 }
+


Mime
View raw message