flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmetz...@apache.org
Subject flink git commit: [FLINK-2087] Add streaming mode switch to YARN
Date Sun, 07 Jun 2015 22:58:46 GMT
Repository: flink
Updated Branches:
  refs/heads/master 85c55dcbb -> 58b9a3772


[FLINK-2087] Add streaming mode switch to YARN

This closes #788


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/58b9a377
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/58b9a377
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/58b9a377

Branch: refs/heads/master
Commit: 58b9a3772f5027f58335fb299b122e8ecb9db218
Parents: 85c55dc
Author: Robert Metzger <rmetzger@apache.org>
Authored: Thu Jun 4 21:50:19 2015 +0200
Committer: Robert Metzger <rmetzger@apache.org>
Committed: Mon Jun 8 00:58:13 2015 +0200

----------------------------------------------------------------------
 docs/setup/yarn_setup.md                            |  1 +
 .../apache/flink/client/FlinkYarnSessionCli.java    |  8 ++++++++
 .../flink/runtime/yarn/AbstractFlinkYarnClient.java |  6 ++++++
 .../flink/runtime/taskmanager/TaskManager.scala     |  2 +-
 .../streaming/connectors/kafka/KafkaITCase.java     |  2 ++
 .../java/org/apache/flink/yarn/FlinkYarnClient.java |  8 ++++++++
 .../org/apache/flink/yarn/ApplicationMaster.scala   | 16 +++++++++++++++-
 .../apache/flink/yarn/ApplicationMasterActor.scala  | 14 ++++++++++++--
 8 files changed, 53 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/58b9a377/docs/setup/yarn_setup.md
----------------------------------------------------------------------
diff --git a/docs/setup/yarn_setup.md b/docs/setup/yarn_setup.md
index cf9f6f8..71a217f 100644
--- a/docs/setup/yarn_setup.md
+++ b/docs/setup/yarn_setup.md
@@ -99,6 +99,7 @@ Usage:
      -q,--query                      Display available YARN resources (memory, cores)
      -qu,--queue <arg>               Specify YARN queue.
      -s,--slots <arg>                Number of slots per TaskManager
+     -st,--streaming                 Start Flink in streaming mode
      -tm,--taskManagerMemory <arg>   Memory per TaskManager Container [in MB]
 ~~~
 

http://git-wip-us.apache.org/repos/asf/flink/blob/58b9a377/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java
b/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java
index 7352457..0fa7173 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java
@@ -73,6 +73,7 @@ public class FlinkYarnSessionCli {
 	private final Option CONTAINER;
 	private final Option SLOTS;
 	private final Option DETACHED;
+	private final Option STREAMING;
 
 	/**
 	 * Dynamic properties allow the user to specify additional configuration values with -D,
such as
@@ -95,6 +96,7 @@ public class FlinkYarnSessionCli {
 		SLOTS = new Option(shortPrefix + "s", longPrefix + "slots", true, "Number of slots per
TaskManager");
 		DYNAMIC_PROPERTIES = new Option(shortPrefix + "D", true, "Dynamic properties");
 		DETACHED = new Option(shortPrefix + "d", longPrefix + "detached", false, "Start detached");
+		STREAMING = new Option(shortPrefix + "st", longPrefix + "streaming", false, "Start Flink
in streaming mode");
 	}
 
 	public AbstractFlinkYarnClient createFlinkYarnClient(CommandLine cmd) {
@@ -214,6 +216,10 @@ public class FlinkYarnSessionCli {
 			detachedMode = true;
 			flinkYarnClient.setDetachedMode(detachedMode);
 		}
+
+		if (cmd.hasOption(STREAMING.getOpt())) {
+			flinkYarnClient.setStreamingMode(true);
+		}
 		return flinkYarnClient;
 	}
 
@@ -237,6 +243,7 @@ public class FlinkYarnSessionCli {
 		opt.addOption(SLOTS);
 		opt.addOption(DYNAMIC_PROPERTIES);
 		opt.addOption(DETACHED);
+		opt.addOption(STREAMING);
 		formatter.printHelp(" ", opt);
 	}
 
@@ -342,6 +349,7 @@ public class FlinkYarnSessionCli {
 		options.addOption(SLOTS);
 		options.addOption(DYNAMIC_PROPERTIES);
 		options.addOption(DETACHED);
+		options.addOption(STREAMING);
 	}
 
 	public int run(String[] args) {

http://git-wip-us.apache.org/repos/asf/flink/blob/58b9a377/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnClient.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnClient.java
index 039e926..0e307e0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnClient.java
@@ -132,4 +132,10 @@ public abstract class AbstractFlinkYarnClient {
 	 * directory in HDFS that contains the jar files and configuration which is shipped to all
the containers.
 	 */
 	public abstract String getSessionFilesDir();
+
+	/**
+	 * Instruct Flink to start in streaming mode
+	 * @param streamingMode
+	 */
+	public abstract  void setStreamingMode(boolean streamingMode);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/58b9a377/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 7bf5bc5..0b76bda 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -1248,7 +1248,7 @@ object TaskManager {
                      streamingMode: StreamingMode,
                      taskManagerClass: Class[_ <: TaskManager]) : Unit = {
 
-    LOG.info("Starting TaskManager")
+    LOG.info(s"Starting TaskManager in streaming mode $streamingMode")
 
     // Bring up the TaskManager actor system first, bind it to the given address.
 

http://git-wip-us.apache.org/repos/asf/flink/blob/58b9a377/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
index 0a12b07..43e3cc3 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
@@ -74,6 +74,7 @@ import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
@@ -281,6 +282,7 @@ public class KafkaITCase {
 	 *
 	 */
 	@Test
+	@Ignore
 	public void testPersistentSourceWithOffsetUpdates() throws Exception {
 		LOG.info("Starting testPersistentSourceWithOffsetUpdates()");
 

http://git-wip-us.apache.org/repos/asf/flink/blob/58b9a377/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java
index 88b2987..118f4ad 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java
@@ -93,6 +93,7 @@ public class FlinkYarnClient extends AbstractFlinkYarnClient {
 	public static final String ENV_CLIENT_USERNAME = "_CLIENT_USERNAME";
 	public static final String ENV_SLOTS = "_SLOTS";
 	public static final String ENV_DETACHED = "_DETACHED";
+	public static final String ENV_STREAMING_MODE = "_STREAMING_MODE";
 	public static final String ENV_DYNAMIC_PROPERTIES = "_DYNAMIC_PROPERTIES";
 
 
@@ -140,6 +141,7 @@ public class FlinkYarnClient extends AbstractFlinkYarnClient {
 	private org.apache.flink.configuration.Configuration flinkConfiguration;
 
 	private boolean detached;
+	private boolean streamingMode;
 
 
 	public FlinkYarnClient() {
@@ -576,6 +578,7 @@ public class FlinkYarnClient extends AbstractFlinkYarnClient {
 		appMasterEnv.put(FlinkYarnClient.ENV_CLIENT_USERNAME, UserGroupInformation.getCurrentUser().getShortUserName());
 		appMasterEnv.put(FlinkYarnClient.ENV_SLOTS, String.valueOf(slots));
 		appMasterEnv.put(FlinkYarnClient.ENV_DETACHED, String.valueOf(detached));
+		appMasterEnv.put(FlinkYarnClient.ENV_STREAMING_MODE, String.valueOf(streamingMode));
 
 		if(dynamicPropertiesEncoded != null) {
 			appMasterEnv.put(FlinkYarnClient.ENV_DYNAMIC_PROPERTIES, dynamicPropertiesEncoded);
@@ -726,6 +729,11 @@ public class FlinkYarnClient extends AbstractFlinkYarnClient {
 		return sessionFilesDir.toString();
 	}
 
+	@Override
+	public void setStreamingMode(boolean streamingMode) {
+		this.streamingMode = streamingMode;
+	}
+
 	public static class YarnDeploymentException extends RuntimeException {
 		public YarnDeploymentException() {
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/58b9a377/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
index c1a937e..5dd197d 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
@@ -56,7 +56,7 @@ object ApplicationMaster {
     EnvironmentInformation.checkJavaVersion()
     org.apache.flink.runtime.util.SignalHandler.register(LOG.logger)
     
-    val streamingMode = StreamingMode.BATCH_ONLY
+    var streamingMode = StreamingMode.BATCH_ONLY
 
     val ugi = UserGroupInformation.createRemoteUser(yarnClientUsername)
 
@@ -84,6 +84,11 @@ object ApplicationMaster {
 
           val logDirs = env.get(Environment.LOG_DIRS.key())
 
+          if(hasStreamingMode(env)) {
+            LOG.info("Starting ApplicationMaster/JobManager in streaming mode")
+            streamingMode = StreamingMode.STREAMING
+          }
+
           // Note that we use the "ownHostname" given by YARN here, to make sure
           // we use the hostnames given by YARN consistently throughout akka.
           // for akka "localhost" and "localhost.localdomain" are different actors.
@@ -246,4 +251,13 @@ object ApplicationMaster {
 
     (configuration, jobManagerSystem, jobManager, archiver)
   }
+
+
+  def hasStreamingMode(env: java.util.Map[String, String]): Boolean = {
+    val sModeString = env.get(FlinkYarnClient.ENV_STREAMING_MODE)
+    if(sModeString != null) {
+      return sModeString.toBoolean
+    }
+    false
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/58b9a377/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterActor.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterActor.scala
b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterActor.scala
index 26d1f69..999610f 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterActor.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterActor.scala
@@ -461,8 +461,10 @@ trait ApplicationMasterActor extends ActorLogMessages {
       runningContainers = 0
       failedContainers = 0
 
+      val hs = ApplicationMaster.hasStreamingMode(env)
       containerLaunchContext = Some(createContainerLaunchContext(heapLimit, hasLogback, hasLog4j,
-        yarnClientUsername, conf, taskManagerLocalResources))
+        yarnClientUsername, conf, taskManagerLocalResources, hs))
+
 
       context.system.scheduler.scheduleOnce(FAST_YARN_HEARTBEAT_DELAY, self, HeartbeatWithYarn)
     } recover {
@@ -499,7 +501,8 @@ trait ApplicationMasterActor extends ActorLogMessages {
 
   private def createContainerLaunchContext(heapLimit: Int, hasLogback: Boolean, hasLog4j:
Boolean,
                                    yarnClientUsername: String, yarnConf: Configuration,
-                                   taskManagerLocalResources: Map[String, LocalResource]):
+                                   taskManagerLocalResources: Map[String, LocalResource],
+                                   streamingMode: Boolean):
   ContainerLaunchContext = {
     log.info("Create container launch context.")
     val ctx = Records.newRecord(classOf[ContainerLaunchContext])
@@ -525,6 +528,13 @@ trait ApplicationMasterActor extends ActorLogMessages {
       s"${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/taskmanager-stdout.log 2> " +
       s"${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/taskmanager-stderr.log"
 
+    tmCommand ++= " --streamingMode"
+    if(streamingMode) {
+      tmCommand ++= " streaming"
+    } else {
+      tmCommand ++= " batch"
+    }
+
     ctx.setCommands(Collections.singletonList(tmCommand.toString()))
 
     log.info(s"Starting TM with command=${tmCommand.toString()}")


Mime
View raw message