flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mj...@apache.org
Subject flink git commit: [Storm-Compatibility] Forward Storm Kryo registrations to Flink
Date Thu, 14 Jan 2016 22:43:56 GMT
Repository: flink
Updated Branches:
  refs/heads/master d1c93d286 -> be055b7a9


[Storm-Compatibility] Forward Storm Kryo registrations to Flink

This closes #1495.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/be055b7a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/be055b7a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/be055b7a

Branch: refs/heads/master
Commit: be055b7a9f8ecb09e5e4e0bbddb98639173a09a7
Parents: d1c93d2
Author: mjsax <mjsax@apache.org>
Authored: Sun Jan 10 16:59:37 2016 +0100
Committer: mjsax <mjsax@apache.org>
Committed: Thu Jan 14 23:20:18 2016 +0100

----------------------------------------------------------------------
 .../org/apache/flink/storm/api/FlinkClient.java | 46 +++++++++++++++++---
 .../flink/storm/api/FlinkLocalCluster.java      | 14 +++---
 2 files changed, 45 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/be055b7a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
index fa7ae79..2ad7f56 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
@@ -31,8 +31,10 @@ import backtype.storm.generated.NotAliveException;
 import backtype.storm.utils.NimbusClient;
 import backtype.storm.utils.Utils;
 
+import com.esotericsoftware.kryo.Serializer;
 import com.google.common.collect.Lists;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.client.program.Client;
 import org.apache.flink.client.program.JobWithJars;
@@ -48,8 +50,10 @@ import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobsStatus;
 import org.apache.flink.storm.util.StormConfig;
-
 import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import scala.Some;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
@@ -63,6 +67,7 @@ import java.net.URL;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 
 /**
  * {@link FlinkClient} mimics a Storm {@link NimbusClient} and {@link Nimbus}{@code .Client} at once, to interact with
@@ -70,6 +75,9 @@ import java.util.Map;
  */
 public class FlinkClient {
 
+	/** The log used by this client. */
+	private static final Logger LOG = LoggerFactory.getLogger(FlinkClient.class);
+
 	/** The client's configuration */
 	private final Map<?,?> conf;
 	/** The jobmanager's host name */
@@ -163,9 +171,8 @@ public class FlinkClient {
 	 * Parameter {@code uploadedJarLocation} is actually used to point to the local jar, because Flink does not support
 	 * uploading a jar file before hand. Jar files are always uploaded directly when a program is submitted.
 	 */
-	public void submitTopologyWithOpts(final String name, final String uploadedJarLocation, final FlinkTopology
-			topology)
-					throws AlreadyAliveException, InvalidTopologyException {
+	public void submitTopologyWithOpts(final String name, final String uploadedJarLocation, final FlinkTopology topology)
+			throws AlreadyAliveException, InvalidTopologyException {
 
 		if (this.getTopologyJobId(name) != null) {
 			throw new AlreadyAliveException();
@@ -181,9 +188,11 @@ public class FlinkClient {
 			throw new RuntimeException("Problem with jar file " + uploadedJarLocation, e);
 		}
 
-		/* set storm configuration */
-		if (this.conf != null) {
-			topology.getExecutionEnvironment().getConfig().setGlobalJobParameters(new StormConfig(this.conf));
+		try {
+			FlinkClient.addStormConfigToTopology(topology, conf);
+		} catch(ClassNotFoundException e) {
+			LOG.error("Could not register class for Kryo serialization.", e);
+			throw new InvalidTopologyException("Could not register class for Kryo serialization.");
 		}
 
 		final StreamGraph streamGraph = topology.getExecutionEnvironment().getStreamGraph();
@@ -325,4 +334,27 @@ public class FlinkClient {
 				actorSystem, AkkaUtils.getLookupTimeout(configuration));
 	}
 
+	@SuppressWarnings({ "unchecked", "rawtypes" })
+	static void addStormConfigToTopology(FlinkTopology topology, Map conf) throws ClassNotFoundException {
+		if (conf != null) {
+			ExecutionConfig flinkConfig = topology.getExecutionEnvironment().getConfig();
+
+			flinkConfig.setGlobalJobParameters(new StormConfig(conf));
+
+			// add all registered types to ExecutionConfig
+			List<?> registeredClasses = (List<?>) conf.get(Config.TOPOLOGY_KRYO_REGISTER);
+			if (registeredClasses != null) {
+				for (Object klass : registeredClasses) {
+					if (klass instanceof String) {
+						flinkConfig.registerKryoType(Class.forName((String) klass));
+					} else {
+						for (Entry<String,String> register : ((Map<String,String>)klass).entrySet()) {
+							flinkConfig.registerTypeWithKryoSerializer(Class.forName(register.getKey()),
+									(Class<? extends Serializer<?>>)Class.forName(register.getValue()));
+						}
+					}
+				}
+			}
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/be055b7a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java
index 2ce3c0f..04374fd 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java
@@ -31,7 +31,6 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.minicluster.FlinkMiniCluster;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
-import org.apache.flink.storm.util.StormConfig;
 import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -74,33 +73,32 @@ public class FlinkLocalCluster {
 		LOG.info("Running Storm topology on FlinkLocalCluster");
 
 		boolean submitBlocking = false;
-		if(conf != null) {
-			topology.getExecutionEnvironment().getConfig().setGlobalJobParameters(new StormConfig(conf));
-
+		if (conf != null) {
 			Object blockingFlag = conf.get(SUBMIT_BLOCKING);
 			if(blockingFlag != null && blockingFlag instanceof Boolean) {
 				submitBlocking = ((Boolean)blockingFlag).booleanValue();
 			}
 		}
 
+		FlinkClient.addStormConfigToTopology(topology, conf);
+
 		StreamGraph streamGraph = topology.getExecutionEnvironment().getStreamGraph();
 		streamGraph.setJobName(topologyName);
 
 		JobGraph jobGraph = streamGraph.getJobGraph();
 
-		if (flink == null) {
-
+		if (this.flink == null) {
 			Configuration configuration = new Configuration();
 			configuration.addAll(jobGraph.getJobConfiguration());
 
 			configuration.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1L);
 			configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, jobGraph.getMaximumParallelism());
 
-			flink = new LocalFlinkMiniCluster(configuration, true);
+			this.flink = new LocalFlinkMiniCluster(configuration, true);
 			this.flink.start();
 		}
 
-		if(submitBlocking) {
+		if (submitBlocking) {
 			this.flink.submitJobAndWait(jobGraph, false);
 		} else {
 			this.flink.submitJobDetached(jobGraph);


Mime
View raw message