flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mbala...@apache.org
Subject [1/2] flink git commit: [streaming] StreamConfig now uses InstantiationUtils for serialization
Date Fri, 24 Apr 2015 13:21:43 GMT
Repository: flink
Updated Branches:
  refs/heads/master 23473639b -> 046f39ea4


[streaming] StreamConfig now uses InstantiationUtils for serialization


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3f3830dd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3f3830dd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3f3830dd

Branch: refs/heads/master
Commit: 3f3830dd0a26b0b051e9e59e6fdaa0bac3be66f8
Parents: 2347363
Author: mbalassi <mbalassi@apache.org>
Authored: Wed Apr 22 18:05:16 2015 +0200
Committer: mbalassi <mbalassi@apache.org>
Committed: Fri Apr 24 15:16:48 2015 +0200

----------------------------------------------------------------------
 .../flink/streaming/api/graph/StreamConfig.java | 127 ++++++++++++-------
 1 file changed, 79 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3f3830dd/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
index 2481990..fdfec00 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
@@ -17,14 +17,13 @@
 
 package org.apache.flink.streaming.api.graph;
 
+import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.commons.lang3.SerializationException;
-import org.apache.commons.lang3.SerializationUtils;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.collector.selector.OutputSelectorWrapper;
 import org.apache.flink.streaming.api.operators.StreamOperator;
@@ -113,7 +112,7 @@ public class StreamConfig implements Serializable {
 			return (StreamRecordSerializer<T>) InstantiationUtil.readObjectFromConfig(this.config,
 					TYPE_SERIALIZER_IN_1, cl);
 		} catch (Exception e) {
-			throw new RuntimeException("Could not instantiate serializer.");
+			throw new StreamTaskException("Could not instantiate serializer.", e);
 		}
 	}
 
@@ -123,7 +122,7 @@ public class StreamConfig implements Serializable {
 			return (StreamRecordSerializer<T>) InstantiationUtil.readObjectFromConfig(this.config,
 					TYPE_SERIALIZER_IN_2, cl);
 		} catch (Exception e) {
-			throw new RuntimeException("Could not instantiate serializer.");
+			throw new StreamTaskException("Could not instantiate serializer.", e);
 		}
 	}
 
@@ -133,7 +132,7 @@ public class StreamConfig implements Serializable {
 			return (StreamRecordSerializer<T>) InstantiationUtil.readObjectFromConfig(this.config,
 					TYPE_SERIALIZER_OUT_1, cl);
 		} catch (Exception e) {
-			throw new RuntimeException("Could not instantiate serializer.");
+			throw new StreamTaskException("Could not instantiate serializer.", e);
 		}
 	}
 
@@ -143,12 +142,16 @@ public class StreamConfig implements Serializable {
 			return (StreamRecordSerializer<T>) InstantiationUtil.readObjectFromConfig(this.config,
 					TYPE_SERIALIZER_OUT_2, cl);
 		} catch (Exception e) {
-			throw new RuntimeException("Could not instantiate serializer.");
+			throw new StreamTaskException("Could not instantiate serializer.", e);
 		}
 	}
 
 	private void setTypeSerializer(String key, StreamRecordSerializer<?> typeWrapper) {
-		config.setBytes(key, SerializationUtils.serialize(typeWrapper));
+		try {
+			InstantiationUtil.writeObjectToConfig(typeWrapper, this.config, key);
+		} catch (IOException e) {
+			throw new StreamTaskException("Could not serialize type serializer.", e);
+		}
 	}
 
 	public void setBufferTimeout(long timeout) {
@@ -164,10 +167,10 @@ public class StreamConfig implements Serializable {
 			config.setClass(USER_FUNCTION, operator.getClass());
 
 			try {
-				config.setBytes(SERIALIZEDUDF, SerializationUtils.serialize(operator));
-			} catch (SerializationException e) {
-				throw new RuntimeException("Cannot serialize operator object "
-						+ operator.getClass(), e);
+				InstantiationUtil.writeObjectToConfig(operator, this.config, SERIALIZEDUDF);
+			} catch (IOException e) {
+				throw new StreamTaskException("Cannot serialize operator object "
+						+ operator.getClass() + ".", e);
 			}
 		}
 	}
@@ -177,15 +180,15 @@ public class StreamConfig implements Serializable {
 		try {
 			return (T) InstantiationUtil.readObjectFromConfig(this.config, SERIALIZEDUDF, cl);
 		} catch (Exception e) {
-			throw new StreamTaskException("Cannot instantiate user function", e);
+			throw new StreamTaskException("Cannot instantiate user function.", e);
 		}
 	}
 
 	public void setOutputSelectorWrapper(OutputSelectorWrapper<?> outputSelectorWrapper) {
 		try {
-			config.setBytes(OUTPUT_SELECTOR_WRAPPER, SerializationUtils.serialize(outputSelectorWrapper));
-		} catch (SerializationException e) {
-			throw new RuntimeException("Cannot serialize OutputSelectorWrapper");
+			InstantiationUtil.writeObjectToConfig(outputSelectorWrapper, this.config, OUTPUT_SELECTOR_WRAPPER);
+		} catch (IOException e) {
+			throw new StreamTaskException("Cannot serialize OutputSelectorWrapper.", e);
 		}
 	}
 
@@ -195,7 +198,7 @@ public class StreamConfig implements Serializable {
 			return (OutputSelectorWrapper<T>) InstantiationUtil.readObjectFromConfig(this.config,
 					OUTPUT_SELECTOR_WRAPPER, cl);
 		} catch (Exception e) {
-			throw new StreamTaskException("Cannot deserialize and instantiate OutputSelectorWrapper", e);
+			throw new StreamTaskException("Cannot deserialize and instantiate OutputSelectorWrapper.", e);
 		}
 	}
 
@@ -216,19 +219,26 @@ public class StreamConfig implements Serializable {
 	}
 
 	public void setSelectedNames(Integer output, List<String> selected) {
-		if (selected != null) {
-			config.setBytes(OUTPUT_NAME + output,
-					SerializationUtils.serialize((Serializable) selected));
-		} else {
-			config.setBytes(OUTPUT_NAME + output,
-					SerializationUtils.serialize(new ArrayList<String>()));
+		if (selected == null) {
+			selected = new ArrayList<String>();
+		}
+
+		try {
+			InstantiationUtil.writeObjectToConfig(selected, this.config, OUTPUT_NAME + output);
+		} catch (IOException e) {
+			throw new StreamTaskException("Cannot serialize OutputSelector for name \"" + output+ "\".", e);
 		}
 	}
 
 	@SuppressWarnings("unchecked")
-	public List<String> getSelectedNames(Integer output) {
-		return (List<String>) SerializationUtils.deserialize(config.getBytes(OUTPUT_NAME + output,
-				null));
+	public List<String> getSelectedNames(Integer output, ClassLoader cl) {
+		List<String> selectedNames;
+		try {
+			selectedNames = (List<String>) InstantiationUtil.readObjectFromConfig(this.config, OUTPUT_NAME + output, cl);
+		} catch (Exception e) {
+			throw new StreamTaskException("Cannot deserialize OutputSelector for name \"" + output + "\".", e);
+		}
+		return selectedNames == null ? new ArrayList<String>() : selectedNames;
 	}
 
 	public void setNumberOfInputs(int numberOfInputs) {
@@ -248,7 +258,11 @@ public class StreamConfig implements Serializable {
 	}
 
 	public void setNonChainedOutputs(List<StreamEdge> outputvertexIDs) {
-		config.setBytes(NONCHAINED_OUTPUTS, SerializationUtils.serialize((Serializable) outputvertexIDs));
+		try {
+			InstantiationUtil.writeObjectToConfig(outputvertexIDs, this.config, NONCHAINED_OUTPUTS);
+		} catch (IOException e) {
+			throw new StreamTaskException("Cannot serialize non chained outputs.", e);
+		}
 	}
 
 	@SuppressWarnings("unchecked")
@@ -257,13 +271,16 @@ public class StreamConfig implements Serializable {
 			List<StreamEdge> nonChainedOutputs = (List<StreamEdge>) InstantiationUtil.readObjectFromConfig(this.config, NONCHAINED_OUTPUTS, cl);
 			return nonChainedOutputs == null ?  new ArrayList<StreamEdge>() : nonChainedOutputs;
 		} catch (Exception e) {
-			throw new RuntimeException("Could not instantiate outputs.");
+			throw new StreamTaskException("Could not instantiate non chained outputs.", e);
 		}
 	}
 
 	public void setChainedOutputs(List<StreamEdge> chainedOutputs) {
-		config.setBytes(CHAINED_OUTPUTS,
-				SerializationUtils.serialize((Serializable) chainedOutputs));
+		try {
+			InstantiationUtil.writeObjectToConfig(chainedOutputs, this.config, CHAINED_OUTPUTS);
+		} catch (IOException e) {
+			throw new StreamTaskException("Cannot serialize chained outputs.", e);
+		}
 	}
 
 	@SuppressWarnings("unchecked")
@@ -272,78 +289,92 @@ public class StreamConfig implements Serializable {
 			List<StreamEdge> chainedOutputs = (List<StreamEdge>) InstantiationUtil.readObjectFromConfig(this.config, CHAINED_OUTPUTS, cl);
 			return chainedOutputs == null ? new ArrayList<StreamEdge>() : chainedOutputs;
 		} catch (Exception e) {
-			throw new RuntimeException("Could not instantiate chained outputs.");
+			throw new StreamTaskException("Could not instantiate chained outputs.", e);
 		}
 	}
 
 	public void setOutEdges(List<StreamEdge> outEdges) {
-		config.setBytes(OUT_STREAM_EDGES, SerializationUtils.serialize((Serializable) outEdges));
+		try {
+			InstantiationUtil.writeObjectToConfig(outEdges, this.config, OUT_STREAM_EDGES);
+		} catch (IOException e) {
+			throw new StreamTaskException("Cannot serialize outward edges.", e);
+		}
 	}
 
 	@SuppressWarnings("unchecked")
 	public List<StreamEdge> getOutEdges(ClassLoader cl) {
 		try {
-			return (List<StreamEdge>) InstantiationUtil.readObjectFromConfig(
+			List<StreamEdge> outEdges = (List<StreamEdge>) InstantiationUtil.readObjectFromConfig(
 					this.config, OUT_STREAM_EDGES, cl);
+			return outEdges == null ? new ArrayList<StreamEdge>() : outEdges;
 		} catch (Exception e) {
-			throw new RuntimeException("Could not instantiate outputs.");
+			throw new StreamTaskException("Could not instantiate outputs.", e);
 		}
 	}
 
 	public void setInPhysicalEdges(List<StreamEdge> inEdges) {
-		config.setBytes(IN_STREAM_EDGES, SerializationUtils.serialize((Serializable) inEdges));
+		try {
+			InstantiationUtil.writeObjectToConfig(inEdges, this.config, IN_STREAM_EDGES);
+		} catch (IOException e) {
+			throw new StreamTaskException("Cannot serialize inward edges.", e);
+		}
 	}
 
 	@SuppressWarnings("unchecked")
 	public List<StreamEdge> getInPhysicalEdges(ClassLoader cl) {
 		try {
-			return (List<StreamEdge>) InstantiationUtil.readObjectFromConfig(
+			List<StreamEdge> inEdges = (List<StreamEdge>) InstantiationUtil.readObjectFromConfig(
 					this.config, IN_STREAM_EDGES, cl);
+			return inEdges == null ? new ArrayList<StreamEdge>() : inEdges;
 		} catch (Exception e) {
-			throw new RuntimeException("Could not instantiate inputs.");
+			throw new StreamTaskException("Could not instantiate inputs.", e);
 		}
 	}
 
 	public void setStateMonitoring(boolean stateMonitoring) {
-
 		config.setBoolean(STATE_MONITORING, stateMonitoring);
-
 	}
 
-	public boolean getStateMonitoring()
-	{
+	public boolean getStateMonitoring() {
 		return config.getBoolean(STATE_MONITORING, false);
 	}
 
 	public void setOutEdgesInOrder(List<StreamEdge> outEdgeList) {
-		config.setBytes(EDGES_IN_ORDER, SerializationUtils.serialize((Serializable) outEdgeList));
+		try {
+			InstantiationUtil.writeObjectToConfig(outEdgeList, this.config, EDGES_IN_ORDER);
+		} catch (IOException e) {
+			throw new StreamTaskException("Could not serialize outputs in order.", e);
+		}
 	}
 
 	@SuppressWarnings("unchecked")
 	public List<StreamEdge> getOutEdgesInOrder(ClassLoader cl) {
 		try {
-			return (List<StreamEdge>) InstantiationUtil.readObjectFromConfig(
+			List<StreamEdge> outEdgesInOrder = (List<StreamEdge>) InstantiationUtil.readObjectFromConfig(
 					this.config, EDGES_IN_ORDER, cl);
+			return outEdgesInOrder == null ? new ArrayList<StreamEdge>() : outEdgesInOrder;
 		} catch (Exception e) {
-			throw new RuntimeException("Could not instantiate outputs.");
+			throw new StreamTaskException("Could not instantiate outputs in order.", e);
 		}
 	}
 
 	public void setTransitiveChainedTaskConfigs(Map<Integer, StreamConfig> chainedTaskConfigs) {
-		config.setBytes(CHAINED_TASK_CONFIG,
-				SerializationUtils.serialize((Serializable) chainedTaskConfigs));
+
+		try {
+			InstantiationUtil.writeObjectToConfig(chainedTaskConfigs, this.config, CHAINED_TASK_CONFIG);
+		} catch (IOException e) {
+			throw new StreamTaskException("Could not serialize configuration.", e);
+		}
 	}
 
 	@SuppressWarnings("unchecked")
 	public Map<Integer, StreamConfig> getTransitiveChainedTaskConfigs(ClassLoader cl) {
 		try {
-
 			Map<Integer, StreamConfig> confs = (Map<Integer, StreamConfig>) InstantiationUtil
 					.readObjectFromConfig(this.config, CHAINED_TASK_CONFIG, cl);
-
 			return confs == null ? new HashMap<Integer, StreamConfig>() : confs;
 		} catch (Exception e) {
-			throw new RuntimeException("Could not instantiate configuration.");
+			throw new StreamTaskException("Could not instantiate configuration.", e);
 		}
 	}
 


Mime
View raw message