flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From u..@apache.org
Subject [2/2] flink git commit: [contrib, storm] Forward Kryo registrations to Flink
Date Wed, 03 Feb 2016 10:38:36 GMT
[contrib, storm] Forward Kryo registrations to Flink

Backport of be055b7a9f8ecb09e5e4e0bbddb98639173a09a7.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e5b19108
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e5b19108
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e5b19108

Branch: refs/heads/release-0.10
Commit: e5b1910811c40c5f3cc6485cdefd1e3516d55dce
Parents: 602151f
Author: Ufuk Celebi <uce@apache.org>
Authored: Wed Feb 3 11:23:34 2016 +0100
Committer: Ufuk Celebi <uce@apache.org>
Committed: Wed Feb 3 11:23:35 2016 +0100

----------------------------------------------------------------------
 .../org/apache/flink/storm/api/FlinkClient.java | 41 +++++++++++++++++---
 .../flink/storm/api/FlinkLocalCluster.java      |  2 +
 2 files changed, 38 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e5b19108/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
index 3607fad..1256cc5 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
@@ -30,9 +30,9 @@ import backtype.storm.generated.Nimbus;
 import backtype.storm.generated.NotAliveException;
 import backtype.storm.utils.NimbusClient;
 import backtype.storm.utils.Utils;
-
+import com.esotericsoftware.kryo.Serializer;
 import com.google.common.collect.Lists;
-
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.client.program.Client;
 import org.apache.flink.client.program.JobWithJars;
@@ -48,7 +48,8 @@ import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobsStatus;
 import org.apache.flink.storm.util.StormConfig;
-
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import scala.Some;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
@@ -69,6 +70,8 @@ import java.util.Map;
  */
 public class FlinkClient {
 
+	private static final Logger LOG = LoggerFactory.getLogger(FlinkClient.class);
+
 	/** The client's configuration */
 	private final Map<?,?> conf;
 	/** The jobmanager's host name */
@@ -181,10 +184,14 @@ public class FlinkClient {
 		}
 
 		/* set storm configuration */
-		if (this.conf != null) {
-			topology.getConfig().setGlobalJobParameters(new StormConfig(this.conf));
+		try {
+			FlinkClient.addStormConfigToTopology(topology, conf);
+		} catch (ClassNotFoundException e) {
+			LOG.error("Could not register class for Kryo serialization.", e);
+			throw new InvalidTopologyException("Could not register class for Kryo serialization.");
 		}
 
+
 		final JobGraph jobGraph = topology.getStreamGraph().getJobGraph(name);
 		jobGraph.addJar(new Path(uploadedJarUri));
 
@@ -321,4 +328,28 @@ public class FlinkClient {
 				actorSystem, AkkaUtils.getLookupTimeout(configuration));
 	}
 
+	@SuppressWarnings({ "unchecked", "rawtypes" })
+	static void addStormConfigToTopology(FlinkTopology topology, Map conf) throws ClassNotFoundException
{
+		if (conf != null) {
+			ExecutionConfig flinkConfig = topology.getExecutionEnvironment().getConfig();
+
+			flinkConfig.setGlobalJobParameters(new StormConfig(conf));
+
+			// add all registered types to ExecutionConfig
+			List<?> registeredClasses = (List<?>) conf.get(Config.TOPOLOGY_KRYO_REGISTER);
+			if (registeredClasses != null) {
+				for (Object klass : registeredClasses) {
+					if (klass instanceof String) {
+						flinkConfig.registerKryoType(Class.forName((String) klass));
+					} else {
+						for (Map.Entry<String, String> register : ((Map<String, String>) klass).entrySet())
{
+							flinkConfig.registerTypeWithKryoSerializer(Class.forName(register.getKey()),
+									(Class<? extends Serializer<?>>) Class.forName(register.getValue()));
+						}
+					}
+				}
+			}
+		}
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e5b19108/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java
b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java
index 868801b..67d67a0 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java
@@ -73,6 +73,8 @@ public class FlinkLocalCluster {
 			topology.getConfig().setGlobalJobParameters(new StormConfig(conf));
 		}
 
+		FlinkClient.addStormConfigToTopology(topology, conf);
+
 		JobGraph jobGraph = topology.getStreamGraph().getJobGraph(topologyName);
 		this.flink.submitJobDetached(jobGraph);
 	}


Mime
View raw message