flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From m..@apache.org
Subject [1/2] flink git commit: [tests] add flag for synchronous execution of futures in TestingCluster
Date Tue, 21 Jul 2015 14:35:29 GMT
Repository: flink
Updated Branches:
  refs/heads/master a2eb6cc87 -> 0f589aad8


[tests] add flag for synchronous execution of futures in TestingCluster

This adds a flag to TestingCluster to disable asynchronous execution of
futures via the CallingThreadDispatcher.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0f45f2b7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0f45f2b7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0f45f2b7

Branch: refs/heads/master
Commit: 0f45f2b73445352bb80fed72474c9ffffdaceb3b
Parents: a2eb6cc
Author: Maximilian Michels <mxm@apache.org>
Authored: Tue Jul 21 11:50:56 2015 +0200
Committer: Maximilian Michels <mxm@apache.org>
Committed: Tue Jul 21 16:29:58 2015 +0200

----------------------------------------------------------------------
 .../runtime/testingUtils/TestingCluster.scala   | 26 ++++++++++++++++----
 1 file changed, 21 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0f45f2b7/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
index f2535fa..ce0ef8d 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.testingUtils
 
 import akka.actor.{ActorRef, Props, ActorSystem}
+import akka.testkit.CallingThreadDispatcher
 import org.apache.flink.configuration.{ConfigConstants, Configuration}
 import org.apache.flink.runtime.StreamingMode
 import org.apache.flink.runtime.jobmanager.{MemoryArchivist, JobManager}
@@ -36,14 +37,22 @@ import org.apache.flink.runtime.taskmanager.TaskManager
  */
 class TestingCluster(userConfiguration: Configuration,
                      singleActorSystem: Boolean,
+                     synchronousDispatcher: Boolean,
                      streamingMode: StreamingMode)
-  extends FlinkMiniCluster(userConfiguration, singleActorSystem, streamingMode) {
+  extends FlinkMiniCluster(userConfiguration,
+                           singleActorSystem,
+                           streamingMode) {
   
 
-  def this(userConfiguration: Configuration, singleActorSystem: Boolean) 
-        = this(userConfiguration, singleActorSystem, StreamingMode.BATCH_ONLY)
+  def this(userConfiguration: Configuration,
+           singleActorSystem: Boolean,
+           synchronousDispatcher: Boolean)
+       = this(userConfiguration, singleActorSystem, synchronousDispatcher, StreamingMode.BATCH_ONLY)
 
-  def this(userConfiguration: Configuration) = this(userConfiguration, true)
+  def this(userConfiguration: Configuration, singleActorSystem: Boolean)
+       = this(userConfiguration, singleActorSystem, false)
+
+  def this(userConfiguration: Configuration) = this(userConfiguration, true, false)
   
   // --------------------------------------------------------------------------
   
@@ -87,7 +96,14 @@ class TestingCluster(userConfiguration: Configuration,
         streamingMode)
       with TestingJobManager)
 
-    actorSystem.actorOf(jobManagerProps, JobManager.JOB_MANAGER_NAME)
+    val dispatcherJobManagerProps = if (synchronousDispatcher) {
+      // disable asynchronous futures (e.g. accumulator update in Heartbeat)
+      jobManagerProps.withDispatcher(CallingThreadDispatcher.Id)
+    } else {
+      jobManagerProps
+    }
+
+    actorSystem.actorOf(dispatcherJobManagerProps, JobManager.JOB_MANAGER_NAME)
   }
 
   override def startTaskManager(index: Int, system: ActorSystem) = {


Mime
View raw message