flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [2/3] flink git commit: [FLINK-1985] [streaming] Add ExecutionConfig serialization for streaming jobs
Date Thu, 21 May 2015 19:15:21 GMT
[FLINK-1985] [streaming] Add ExecutionConfig serialization for streaming jobs

This closes #682


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/495a5c3c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/495a5c3c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/495a5c3c

Branch: refs/heads/master
Commit: 495a5c3c47af7f09d80b6e534cbfe339f004e26d
Parents: f8a381f
Author: mjsax <mjsax@informatik.hu-berlin.de>
Authored: Fri May 15 18:29:48 2015 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Thu May 21 18:30:17 2015 +0200

----------------------------------------------------------------------
 .../api/graph/StreamingJobGraphGenerator.java   |  9 ++
 .../graph/StreamingJobGraphGeneratorTest.java   | 88 ++++++++++++++++++++
 2 files changed, 97 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/495a5c3c/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 6bad4c8..d16ee58 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.api.graph;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -26,6 +27,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 
 import org.apache.commons.lang.StringUtils;
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
@@ -43,6 +45,7 @@ import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
 import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner.PartitioningStrategy;
 import org.apache.flink.streaming.runtime.tasks.StreamIterationHead;
 import org.apache.flink.streaming.runtime.tasks.StreamIterationTail;
+import org.apache.flink.util.InstantiationUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -93,6 +96,12 @@ public class StreamingJobGraphGenerator {
 		
 		configureCheckpointing();
 
+		try {
+			InstantiationUtil.writeObjectToConfig(this.streamGraph.getExecutionConfig(), this.jobGraph.getJobConfiguration(), ExecutionConfig.CONFIG_KEY);
+		} catch (IOException e) {
+			throw new RuntimeException("Config object could not be written to Job Configuration: ", e);
+		}
+		
 		return jobGraph;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/495a5c3c/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
new file mode 100644
index 0000000..4e7c963
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.graph;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.streaming.util.TestStreamEnvironment;
+import org.apache.flink.util.InstantiationUtil;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class StreamingJobGraphGeneratorTest {
+	private static final Logger LOG = LoggerFactory.getLogger(StreamingJobGraphGeneratorTest.class);
+	
+	@Test
+	public void testExecutionConfigSerialization() throws IOException, ClassNotFoundException {
+		final long seed = System.currentTimeMillis();
+		LOG.info("Test seed: {}", new Long(seed));
+		final Random r = new Random(seed);
+		
+		TestStreamEnvironment env = new TestStreamEnvironment(4, 32);
+		StreamGraph streamingJob = new StreamGraph(env);
+		StreamingJobGraphGenerator compiler = new StreamingJobGraphGenerator(streamingJob);
+		
+		boolean closureCleanerEnabled = r.nextBoolean(), forceAvroEnabled = r.nextBoolean(), forceKryoEnabled = r.nextBoolean(), objectReuseEnabled = r.nextBoolean(), sysoutLoggingEnabled = r.nextBoolean();
+		int dop = 1 + r.nextInt(10);
+		
+		ExecutionConfig config = streamingJob.getExecutionConfig();
+		if(closureCleanerEnabled) {
+			config.enableClosureCleaner();
+		} else {
+			config.disableClosureCleaner();
+		}
+		if(forceAvroEnabled) {
+			config.enableForceAvro();
+		} else {
+			config.disableForceAvro();
+		}
+		if(forceKryoEnabled) {
+			config.enableForceKryo();
+		} else {
+			config.disableForceKryo();
+		}
+		if(objectReuseEnabled) {
+			config.enableObjectReuse();
+		} else {
+			config.disableObjectReuse();
+		}
+		if(sysoutLoggingEnabled) {
+			config.enableSysoutLogging();
+		} else {
+			config.disableSysoutLogging();
+		}
+		config.setParallelism(dop);
+		
+		JobGraph jobGraph = compiler.createJobGraph("test");
+		ExecutionConfig executionConfig = (ExecutionConfig) InstantiationUtil.readObjectFromConfig(
+				jobGraph.getJobConfiguration(),
+				ExecutionConfig.CONFIG_KEY,
+				Thread.currentThread().getContextClassLoader());
+		
+		Assert.assertEquals(closureCleanerEnabled, executionConfig.isClosureCleanerEnabled());
+		Assert.assertEquals(forceAvroEnabled, executionConfig.isForceAvroEnabled());
+		Assert.assertEquals(forceKryoEnabled, executionConfig.isForceKryoEnabled());
+		Assert.assertEquals(objectReuseEnabled, executionConfig.isObjectReuseEnabled());
+		Assert.assertEquals(sysoutLoggingEnabled, executionConfig.isSysoutLoggingEnabled());
+		Assert.assertEquals(dop, executionConfig.getParallelism());
+	}
+}


Mime
View raw message