flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [2/3] flink git commit: [FLINK-1554] [runtime] Allows the LocalFlinkMiniCluster to start multiple TaskManagers in the same ActorSystem.
Date Tue, 17 Feb 2015 12:17:24 GMT
[FLINK-1554] [runtime] Allows the LocalFlinkMiniCluster to start multiple TaskManagers in the same ActorSystem.

This closes #403.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/649f1583
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/649f1583
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/649f1583

Branch: refs/heads/master
Commit: 649f158391aebcefcaabcdaa02d5c8b95be45777
Parents: 447ce0a
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Mon Feb 16 15:04:42 2015 +0100
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Tue Feb 17 11:53:36 2015 +0100

----------------------------------------------------------------------
 .../flink/runtime/jobmanager/JobManager.scala   |  2 +-
 .../runtime/minicluster/FlinkMiniCluster.scala  | 11 +++
 .../minicluster/LocalFlinkMiniCluster.scala     |  7 ++
 .../flink/runtime/taskmanager/TaskManager.scala | 31 ++++---
 .../runtime/testingUtils/TestingCluster.scala   |  3 +-
 flink-tests/pom.xml                             |  5 ++
 .../LocalFlinkMiniClusterITCase.java            | 88 ++++++++++++++++++++
 .../scala/org/apache/flink/yarn/YarnUtils.scala |  5 +-
 8 files changed, 136 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/649f1583/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index b490b32..bfce7a2 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -664,7 +664,7 @@ object JobManager {
       if(executionMode.equals(LOCAL)){
         LOG.info("Starting embedded TaskManager for JobManager's LOCAL mode execution")
 
-        TaskManager.startActorWithConfiguration("", configuration,
+        TaskManager.startActorWithConfiguration("", TaskManager.TASK_MANAGER_NAME, configuration,
           localAkkaCommunication = false, localTaskManagerCommunication = true)(jobManagerSystem)
       }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/649f1583/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index 6eea21c..3eb1d1e 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.minicluster
 
 import java.net.InetAddress
+import akka.pattern.Patterns.gracefulStop
 
 import akka.pattern.ask
 import akka.actor.{ActorRef, ActorSystem}
@@ -131,6 +132,16 @@ abstract class FlinkMiniCluster(userConfiguration: Configuration,
   }
 
   def shutdown(): Unit = {
+    val futures = taskManagerActors map {
+        gracefulStop(_, timeout)
+    }
+
+    val future = gracefulStop(jobManagerActor, timeout)
+
+    implicit val executionContext = AkkaUtils.globalExecutionContext
+
+    Await.ready(Future.sequence(future +: futures), timeout)
+
     if(!singleActorSystem){
       taskManagerActorSystems foreach {
         _.shutdown()

http://git-wip-us.apache.org/repos/asf/flink/blob/649f1583/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
index 8b16969..88006ac 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
@@ -90,7 +90,14 @@ class LocalFlinkMiniCluster(userConfiguration: Configuration, singleActorSystem:
 
     val localExecution = numTaskManagers == 1
 
+    val taskManagerName = if(singleActorSystem) {
+      TaskManager.TASK_MANAGER_NAME + "_" + (index + 1)
+    } else {
+      TaskManager.TASK_MANAGER_NAME
+    }
+
     TaskManager.startActorWithConfiguration(HOSTNAME,
+      taskManagerName,
       config,
       singleActorSystem,
       localExecution)(system)

http://git-wip-us.apache.org/repos/asf/flink/blob/649f1583/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 5a647d1..6d610a4 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -711,18 +711,19 @@ object TaskManager {
       LOG.info("Security is enabled. Starting secure TaskManager.")
       SecurityUtils.runSecured(new FlinkSecuredRunner[Unit] {
         override def run(): Unit = {
-          startActor(hostname, port, configuration)
+          startActor(hostname, port, configuration, TaskManager.TASK_MANAGER_NAME)
         }
       })
     } else {
-      startActor(hostname, port, configuration)
+      startActor(hostname, port, configuration, TaskManager.TASK_MANAGER_NAME)
     }
   }
 
-  def startActor(hostname: String, port: Int, configuration: Configuration) : Unit = {
+  def startActor(hostname: String, port: Int, configuration: Configuration,
+                 taskManagerName: String) : Unit = {
 
     val (taskManagerSystem, _) = startActorSystemAndActor(hostname, port, configuration,
-      localAkkaCommunication = false, localTaskManagerCommunication = false)
+      taskManagerName, localAkkaCommunication = false, localTaskManagerCommunication = false)
 
     taskManagerSystem.awaitTermination()
   }
@@ -780,6 +781,7 @@ object TaskManager {
   }
 
   def startActorSystemAndActor(hostname: String, port: Int, configuration: Configuration,
+                               taskManagerName: String,
                                localAkkaCommunication: Boolean,
                                localTaskManagerCommunication: Boolean): (ActorSystem, ActorRef) = {
     implicit val actorSystem = AkkaUtils.createActorSystem(configuration, Some((hostname, port)))
@@ -788,7 +790,7 @@ object TaskManager {
       parseConfiguration(hostname, configuration, localAkkaCommunication,
         localTaskManagerCommunication)
 
-    (actorSystem, startActor(connectionInfo, jobManagerURL, taskManagerConfig,
+    (actorSystem, startActor(taskManagerName, connectionInfo, jobManagerURL, taskManagerConfig,
       networkConfig))
   }
 
@@ -916,19 +918,23 @@ object TaskManager {
     (connectionInfo, jobManagerURL, taskManagerConfig, networkConfig)
   }
 
-  def startActor(connectionInfo: InstanceConnectionInfo, jobManagerURL: String,
+  def startActor(taskManagerName: String,
+                 connectionInfo: InstanceConnectionInfo,
+                 jobManagerURL: String,
                  taskManagerConfig: TaskManagerConfiguration,
                  networkConfig: NetworkEnvironmentConfiguration)
                 (implicit actorSystem: ActorSystem): ActorRef = {
-    startActor(Props(new TaskManager(connectionInfo, jobManagerURL, taskManagerConfig,
-      networkConfig)))
+    startActor(taskManagerName,
+      Props(new TaskManager(connectionInfo, jobManagerURL, taskManagerConfig, networkConfig)))
   }
 
-  def startActor(props: Props)(implicit actorSystem: ActorSystem): ActorRef = {
-    actorSystem.actorOf(props, TASK_MANAGER_NAME)
+  def startActor(taskManagerName: String, props: Props)
+                (implicit actorSystem: ActorSystem): ActorRef = {
+    actorSystem.actorOf(props, taskManagerName)
   }
 
-  def startActorWithConfiguration(hostname: String, configuration: Configuration,
+  def startActorWithConfiguration(hostname: String, taskManagerName: String,
+                                  configuration: Configuration,
                                   localAkkaCommunication: Boolean,
                                   localTaskManagerCommunication: Boolean)
                                  (implicit system: ActorSystem) = {
@@ -936,7 +942,8 @@ object TaskManager {
       parseConfiguration(hostname, configuration, localAkkaCommunication,
         localTaskManagerCommunication)
 
-    startActor(connectionInfo, jobManagerURL, taskManagerConfig, networkConnectionConfiguration)
+    startActor(taskManagerName, connectionInfo, jobManagerURL, taskManagerConfig,
+      networkConnectionConfiguration)
   }
 
   def startProfiler(instancePath: String, reportInterval: Long)(implicit system: ActorSystem):

http://git-wip-us.apache.org/repos/asf/flink/blob/649f1583/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
index e2660d5..9d132b6 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
@@ -68,7 +68,8 @@ FlinkMiniCluster(userConfiguration, singleActorSystem) {
         localAkkaCommunication = singleActorSystem, localTaskManagerCommunication = true)
 
     system.actorOf(Props(new TaskManager(connectionInfo, jobManagerURL, taskManagerConfig,
-      networkConnectionConfig) with TestingTaskManager), TaskManager.TASK_MANAGER_NAME + index)
+      networkConnectionConfig) with TestingTaskManager), TaskManager.TASK_MANAGER_NAME + "_" +
+      (index + 1))
   }
 
   def restartJobManager(): Unit = {

http://git-wip-us.apache.org/repos/asf/flink/blob/649f1583/flink-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-tests/pom.xml b/flink-tests/pom.xml
index 985c675..33ca642 100644
--- a/flink-tests/pom.xml
+++ b/flink-tests/pom.xml
@@ -135,6 +135,11 @@ under the License.
 			<artifactId>scalatest_2.10</artifactId>
 			<scope>test</scope>
 		</dependency>
+
+		<dependency>
+			<groupId>com.typesafe.akka</groupId>
+			<artifactId>akka-testkit_2.10</artifactId>
+		</dependency>
 		
 		<dependency>
 			<groupId>joda-time</groupId>

http://git-wip-us.apache.org/repos/asf/flink/blob/649f1583/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java
new file mode 100644
index 0000000..7933932
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.runtime.minicluster;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class LocalFlinkMiniClusterITCase {
+
+	static ActorSystem system;
+
+	@BeforeClass
+	public static void setup() {
+		system = ActorSystem.create("Testkit", AkkaUtils.getDefaultAkkaConfig());
+	}
+
+	@AfterClass
+	public static void teardown() {
+		JavaTestKit.shutdownActorSystem(system);
+		system = null;
+	}
+
+	@Test
+	public void testLocalFlinkMiniClusterWithMultipleTaskManagers() {
+		LocalFlinkMiniCluster miniCluster = null;
+
+		final int numTMs = 3;
+		final int numSlots = 14;
+
+		try{
+			Configuration config = new Configuration();
+			config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, numTMs);
+			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots);
+			miniCluster = new LocalFlinkMiniCluster(config, true);
+
+			final ActorRef jm = miniCluster.getJobManager();
+
+			new JavaTestKit(system) {{
+				new Within(TestingUtils.TESTING_DURATION()) {
+
+					@Override
+					protected void run() {
+						jm.tell(JobManagerMessages.getRequestNumberRegisteredTaskManager(),
+								getRef());
+
+						expectMsgEquals(TestingUtils.TESTING_DURATION(), numTMs);
+
+						jm.tell(JobManagerMessages.getRequestTotalNumberOfSlots(), getRef());
+
+						expectMsgEquals(TestingUtils.TESTING_DURATION(), numTMs*numSlots);
+					}
+				};
+			}};
+
+
+		} finally {
+			if (miniCluster != null) {
+				miniCluster.stop();
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/649f1583/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnUtils.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnUtils.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnUtils.scala
index 775fcd0..66da8ec 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnUtils.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnUtils.scala
@@ -38,7 +38,8 @@ object YarnUtils {
       TaskManager.parseConfiguration(hostname, config, localAkkaCommunication = false,
         localTaskManagerCommunication = false)
 
-    (actorSystem, TaskManager.startActor(Props(new TaskManager(connectionInfo, jobManagerURL,
-      taskManagerConfig, networkConnectionConfiguration) with YarnTaskManager))(actorSystem))
+    (actorSystem, TaskManager.startActor(TaskManager.TASK_MANAGER_NAME,
+      Props(new TaskManager(connectionInfo, jobManagerURL, taskManagerConfig,
+        networkConnectionConfiguration) with YarnTaskManager))(actorSystem))
   }
 }


Mime
View raw message