flink-commits mailing list archives

From mj...@apache.org
Subject flink git commit: [FLINK-2525] Add configuration support in Storm-compatibility
Date Fri, 02 Oct 2015 08:50:29 GMT
Repository: flink
Updated Branches:
  refs/heads/master 9f7110748 -> 9fe285a77


[FLINK-2525] Add configuration support in Storm-compatibility

This closes #1046


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9fe285a7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9fe285a7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9fe285a7

Branch: refs/heads/master
Commit: 9fe285a77de5cd1a35ceb58f9295751fd3dd9e15
Parents: 9f71107
Author: ffbin <869218239@qq.com>
Authored: Fri Oct 2 00:04:40 2015 +0200
Committer: mjsax <mjsax@informatik.hu-berlin.de>
Committed: Fri Oct 2 10:48:48 2015 +0200

----------------------------------------------------------------------
 docs/apis/storm_compatibility.md                |  31 +++++
 .../flink-storm-compatibility-core/README.md    |   1 -
 .../stormcompatibility/api/FlinkClient.java     |  11 +-
 .../api/FlinkLocalCluster.java                  |  26 ++--
 .../stormcompatibility/util/StormConfig.java    | 123 +++++++++++++++++++
 .../wrappers/AbstractStormSpoutWrapper.java     |  16 ++-
 .../wrappers/StormBoltWrapper.java              |  15 ++-
 .../wrappers/FiniteStormSpoutWrapperTest.java   |  25 ++--
 .../wrappers/FiniteTestSpout.java               |   3 +-
 .../wrappers/StormBoltWrapperTest.java          |  85 ++++++++++---
 .../wrappers/StormFiniteSpoutWrapperTest.java   |  28 ++---
 .../wrappers/StormSpoutWrapperTest.java         |  52 +++++++-
 .../excamation/ExclamationTopology.java         |  12 +-
 .../excamation/ExclamationWithStormBolt.java    |  23 ++--
 .../excamation/ExclamationWithStormSpout.java   |  14 ++-
 .../excamation/StormExclamationLocal.java       |   7 +-
 .../stormoperators/ExclamationBolt.java         |  23 +++-
 .../util/FiniteStormFileSpout.java              |   2 +
 .../stormcompatibility/util/StormFileSpout.java |  12 +-
 .../ExclamationWithStormBoltITCase.java         |   4 +-
 .../StormExclamationLocalITCase.java            |   4 +-
 .../flink/api/common/ExecutionConfig.java       |   2 +-
 22 files changed, 435 insertions(+), 84 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9fe285a7/docs/apis/storm_compatibility.md
----------------------------------------------------------------------
diff --git a/docs/apis/storm_compatibility.md b/docs/apis/storm_compatibility.md
index a6083f8..d676db8 100644
--- a/docs/apis/storm_compatibility.md
+++ b/docs/apis/storm_compatibility.md
@@ -169,10 +169,41 @@ The input type is `Tuple1<String>` and `Fields("sentence")` specify that `input.
 
 See [BoltTokenizerWordCountPojo](https://github.com/apache/flink/tree/master/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountPojo.java) and [BoltTokenizerWordCountWithNames](https://github.com/apache/flink/tree/master/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountWithNames.java) for examples.  
 
+## Configuring Spouts and Bolts
+
+In Storm, Spouts and Bolts can be configured with a globally distributed `Map` object that is given to the `submitTopology(...)` method of `LocalCluster` or `StormSubmitter`.
+This `Map` is provided by the user alongside the topology and is forwarded as a parameter to the calls `Spout.open(...)` and `Bolt.prepare(...)`.
+If a whole topology is executed using `FlinkTopologyBuilder` etc., no special attention is required &ndash; it works as in regular Storm.
+
+For embedded usage, Flink's configuration mechanism must be used.
+A global configuration can be set in a `StreamExecutionEnvironment` via `.getConfig().setGlobalJobParameters(...)`.
+Flink's regular `Configuration` class can be used to configure Spouts and Bolts.
+However, `Configuration` does not support arbitrary key data types as Storm does (only `String` keys are allowed).
+Thus, Flink additionally provides the `StormConfig` class, which can be used like a raw `Map` to provide full compatibility with Storm.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+~~~java
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+StormConfig config = new StormConfig();
+// set config values
+[...]
+
+// set global Storm configuration
+env.getConfig().setGlobalJobParameters(config);
+
+// assemble program with embedded Spouts and/or Bolts
+[...]
+~~~
+</div>
+</div>
+
 ## Multiple Output Streams
 
 Flink can also handle the declaration of multiple output streams for Spouts and Bolts.
 If a whole topology is executed using `FlinkTopologyBuilder` etc., there is no special attention required &ndash; it works as in regular Storm.
+
 For embedded usage, the output stream will be of data type `SplitStreamType<T>` and must be split by using `DataStream.split(...)` and `SplitDataStream.select(...)`.
 Flink provides the predefined output selector `FlinkStormStreamSelector<T>` for `.split(...)` already.
 Furthermore, the wrapper type `SplitStreamType<T>` can be removed using `SplitStreamMapper<T>`.
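
The documentation added above motivates `StormConfig` by noting that Flink's `Configuration` accepts only `String` keys, while a Storm configuration is a raw `Map` with arbitrary keys. The following is a minimal, library-free sketch of that difference. It uses only `java.util` types; the class `RawMapConfigSketch`, its methods, and the keys and values are hypothetical and are not part of this commit or of Flink/Storm.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative only: plain java.util types, no Flink or Storm classes involved.
final class RawMapConfigSketch {

    // Builds a Storm-style raw configuration map whose keys are not all Strings.
    @SuppressWarnings({"rawtypes", "unchecked"})
    static Map buildRawConfig() {
        Map conf = new HashMap();
        conf.put("input.path", "/tmp/words.txt"); // hypothetical key and value
        conf.put(7, Boolean.TRUE);                // non-String key, allowed in a raw Map
        return conf;
    }

    // Copies only the String-keyed entries, mimicking what a String-keyed
    // configuration could represent; all other entries are dropped.
    @SuppressWarnings("rawtypes")
    static Map<String, String> stringKeyedView(Map raw) {
        Map<String, String> view = new HashMap<String, String>();
        for (Object o : raw.entrySet()) {
            Map.Entry e = (Map.Entry) o;
            if (e.getKey() instanceof String) {
                view.put((String) e.getKey(), String.valueOf(e.getValue()));
            }
        }
        return view;
    }
}
```

In this sketch the entry stored under the integer key survives in the raw map but is missing from the String-keyed view, which is the gap a `Map`-compatible configuration object is meant to close.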

http://git-wip-us.apache.org/repos/asf/flink/blob/9fe285a7/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/README.md
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/README.md b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/README.md
index aef4847..f42dc24 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/README.md
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/README.md
@@ -3,7 +3,6 @@
 The Storm compatibility layer allows spouts and bolts to be embedded unmodified within a regular Flink streaming program (`StormSpoutWrapper` and `StormBoltWrapper`). Additionally, a whole Storm topology can be submitted to Flink (see `FlinkTopologyBuilder`, `FlinkLocalCluster`, and `FlinkSubmitter`). Only a few minor changes to the original submitting code are required. The code that builds the topology itself can be reused unmodified. See `flink-storm-examples` for a simple word-count example.
 
 The following Storm features are not (yet/fully) supported by the compatibility layer right now:
-* the spout/bolt configuration within `open()`/`prepare()` is not yet supported (ie, `Map conf` parameter)
 * topology and tuple meta information (i.e., `TopologyContext` is not fully supported)
 * no fault-tolerance guarantees (i.e., calls to `ack()`/`fail()` and anchoring are ignored)
 * for whole Storm topologies the following is not supported by Flink:

http://git-wip-us.apache.org/repos/asf/flink/blob/9fe285a7/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkClient.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkClient.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkClient.java
index 7078e90..4676102 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkClient.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkClient.java
@@ -32,6 +32,7 @@ import backtype.storm.utils.NimbusClient;
 import backtype.storm.utils.Utils;
 
 import com.google.common.collect.Lists;
+
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.client.program.Client;
 import org.apache.flink.client.program.JobWithJars;
@@ -47,6 +48,7 @@ import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
 import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobsStatus;
+import org.apache.flink.stormcompatibility.util.StormConfig;
 
 import scala.Some;
 import scala.concurrent.Await;
@@ -66,7 +68,6 @@ import java.util.Map;
 public class FlinkClient {
 
 	/** The client's configuration */
-	@SuppressWarnings("unused")
 	private final Map<?,?> conf;
 	/** The jobmanager's host name */
 	private final String jobManagerHost;
@@ -161,7 +162,7 @@ public class FlinkClient {
 	 */
 	public void submitTopologyWithOpts(final String name, final String uploadedJarLocation, final FlinkTopology
 			topology)
-			throws AlreadyAliveException, InvalidTopologyException {
+					throws AlreadyAliveException, InvalidTopologyException {
 
 		if (this.getTopologyJobId(name) != null) {
 			throw new AlreadyAliveException();
@@ -174,11 +175,15 @@ public class FlinkClient {
 			throw new RuntimeException("Problem with jar file " + uploadedJarFile.getAbsolutePath(), e);
 		}
 
+		/* set storm configuration */
+		if (this.conf != null) {
+			topology.getConfig().setGlobalJobParameters(new StormConfig(this.conf));
+		}
+
 		final JobGraph jobGraph = topology.getStreamGraph().getJobGraph(name);
 		jobGraph.addJar(new Path(uploadedJarFile.getAbsolutePath()));
 
 		final Configuration configuration = jobGraph.getJobConfiguration();
-
 		configuration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, jobManagerHost);
 		configuration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9fe285a7/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkLocalCluster.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkLocalCluster.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkLocalCluster.java
index c139201..9b3fb54 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkLocalCluster.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkLocalCluster.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.StreamingMode;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.minicluster.FlinkMiniCluster;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.stormcompatibility.util.StormConfig;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -45,11 +46,11 @@ public class FlinkLocalCluster {
 
 	/** The log used by this mini cluster */
 	private static final Logger LOG = LoggerFactory.getLogger(FlinkLocalCluster.class);
-	
+
 	/** The flink mini cluster on which to execute the programs */
 	private final FlinkMiniCluster flink;
 
-	
+
 	public FlinkLocalCluster() {
 		this.flink = new LocalFlinkMiniCluster(new Configuration(), true, StreamingMode.STREAMING);
 		this.flink.start();
@@ -59,17 +60,22 @@ public class FlinkLocalCluster {
 		this.flink = Objects.requireNonNull(flink);
 	}
 
-	public void submitTopology(final String topologyName, final Map<?, ?> conf, final FlinkTopology topology)
+	@SuppressWarnings("rawtypes")
+	public void submitTopology(final String topologyName, final Map conf, final FlinkTopology topology)
 			throws Exception {
 		this.submitTopologyWithOpts(topologyName, conf, topology, null);
 	}
 
-	public void submitTopologyWithOpts(final String topologyName, final Map<?, ?> conf, final FlinkTopology topology,
-			final SubmitOptions submitOpts) throws Exception {
-		
+	@SuppressWarnings("rawtypes")
+	public void submitTopologyWithOpts(final String topologyName, final Map conf, final FlinkTopology topology, final SubmitOptions submitOpts) throws Exception {
 		LOG.info("Running Storm topology on FlinkLocalCluster");
+
+		if(conf != null) {
+			topology.getConfig().setGlobalJobParameters(new StormConfig(conf));
+		}
+
 		JobGraph jobGraph = topology.getStreamGraph().getJobGraph(topologyName);
-		flink.submitJobDetached(jobGraph);
+		this.flink.submitJobDetached(jobGraph);
 	}
 
 	public void killTopology(final String topologyName) {
@@ -115,7 +121,7 @@ public class FlinkLocalCluster {
 	// ------------------------------------------------------------------------
 	//  Access to default local cluster
 	// ------------------------------------------------------------------------
-	
+
 	// A different {@link FlinkLocalCluster} to be used for execution of ITCases
 	private static LocalClusterFactory currentFactory = new DefaultLocalClusterFactory();
 
@@ -138,7 +144,7 @@ public class FlinkLocalCluster {
 	public static void initialize(LocalClusterFactory clusterFactory) {
 		currentFactory = Objects.requireNonNull(clusterFactory);
 	}
-	
+
 	// ------------------------------------------------------------------------
 	//  Cluster factory
 	// ------------------------------------------------------------------------
@@ -159,7 +165,7 @@ public class FlinkLocalCluster {
 	 * A factory that instantiates a FlinkLocalCluster.
 	 */
 	public static class DefaultLocalClusterFactory implements LocalClusterFactory {
-		
+
 		@Override
 		public FlinkLocalCluster createLocalCluster() {
 			return new FlinkLocalCluster();

http://git-wip-us.apache.org/repos/asf/flink/blob/9fe285a7/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/StormConfig.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/StormConfig.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/StormConfig.java
new file mode 100644
index 0000000..6726ae8
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/StormConfig.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.stormcompatibility.util;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.flink.api.common.ExecutionConfig.GlobalJobParameters;
+
+import backtype.storm.Config;
+
+/**
+ * {@link StormConfig} is used to provide a user-defined Storm configuration (ie, a raw {@link Map} or {@link Config}
+ * object) for embedded Spouts and Bolts.
+ */
+@SuppressWarnings("rawtypes")
+public final class StormConfig extends GlobalJobParameters implements Map {
+	private static final long serialVersionUID = 8019519109673698490L;
+
+	/** Contains the actual configuration that is provided to Spouts and Bolts. */
+	private final Map config = new HashMap();
+
+	/**
+	 * Creates an empty configuration.
+	 */
+	public StormConfig() {
+	}
+
+	/**
+	 * Creates a configuration with initial values provided by the given {@code Map}.
+	 * 
+	 * @param config
+	 *            Initial values for this configuration.
+	 */
+	@SuppressWarnings("unchecked")
+	public StormConfig(Map config) {
+		this.config.putAll(config);
+	}
+
+
+	@Override
+	public int size() {
+		return this.config.size();
+	}
+
+	@Override
+	public boolean isEmpty() {
+		return this.config.isEmpty();
+	}
+
+	@Override
+	public boolean containsKey(Object key) {
+		return this.config.containsKey(key);
+	}
+
+	@Override
+	public boolean containsValue(Object value) {
+		return this.config.containsValue(value);
+	}
+
+	@Override
+	public Object get(Object key) {
+		return this.config.get(key);
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public Object put(Object key, Object value) {
+		return this.config.put(key, value);
+	}
+
+	@Override
+	public Object remove(Object key) {
+		return this.config.remove(key);
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public void putAll(Map m) {
+		this.config.putAll(m);
+	}
+
+	@Override
+	public void clear() {
+		this.config.clear();
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public Set<Object> keySet() {
+		return this.config.keySet();
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public Collection<Object> values() {
+		return this.config.values();
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public Set<java.util.Map.Entry<Object, Object>> entrySet() {
+		return this.config.entrySet();
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9fe285a7/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormSpoutWrapper.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormSpoutWrapper.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormSpoutWrapper.java
index 62059fe..c531580 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormSpoutWrapper.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormSpoutWrapper.java
@@ -23,9 +23,11 @@ import java.util.HashMap;
 import backtype.storm.spout.SpoutOutputCollector;
 import backtype.storm.topology.IRichSpout;
 
+import org.apache.flink.api.common.ExecutionConfig.GlobalJobParameters;
 import org.apache.flink.api.java.tuple.Tuple0;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple25;
+import org.apache.flink.stormcompatibility.util.StormConfig;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
 
@@ -99,7 +101,19 @@ public abstract class AbstractStormSpoutWrapper<OUT> extends RichParallelSourceF
 	@Override
 	public final void run(final SourceContext<OUT> ctx) throws Exception {
 		this.collector = new StormSpoutCollector<OUT>(this.numberOfAttributes, ctx);
-		this.spout.open(null,
+
+		GlobalJobParameters config = super.getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
+		StormConfig stormConfig = new StormConfig();
+
+		if (config != null) {
+			if (config instanceof StormConfig) {
+				stormConfig = (StormConfig) config;
+			} else {
+				stormConfig.putAll(config.toMap());
+			}
+		}
+
+		this.spout.open(stormConfig,
 				StormWrapperSetupHelper
 				.convertToTopologyContext((StreamingRuntimeContext) super.getRuntimeContext(), true),
 				new SpoutOutputCollector(this.collector));

http://git-wip-us.apache.org/repos/asf/flink/blob/9fe285a7/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapper.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapper.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapper.java
index c4ba9ba..6b58b0a 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapper.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapper.java
@@ -24,11 +24,13 @@ import backtype.storm.task.TopologyContext;
 import backtype.storm.topology.IRichBolt;
 import backtype.storm.tuple.Fields;
 
+import org.apache.flink.api.common.ExecutionConfig.GlobalJobParameters;
 import org.apache.flink.api.java.tuple.Tuple0;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple25;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.stormcompatibility.util.SplitStreamType;
+import org.apache.flink.stormcompatibility.util.StormConfig;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.TimestampedCollector;
@@ -205,7 +207,18 @@ public class StormBoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> imple
 					this.numberOfAttributes, flinkCollector));
 		}
 
-		this.bolt.prepare(null, topologyContext, stormCollector);
+		GlobalJobParameters config = super.executionConfig.getGlobalJobParameters();
+		StormConfig stormConfig = new StormConfig();
+
+		if (config != null) {
+			if (config instanceof StormConfig) {
+				stormConfig = (StormConfig) config;
+			} else {
+				stormConfig.putAll(config.toMap());
+			}
+		}
+
+		this.bolt.prepare(stormConfig, topologyContext, stormCollector);
 	}
 
 	@Override
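
Both wrapper changes above (`AbstractStormSpoutWrapper.run` and `StormBoltWrapper.open`) resolve the configuration in the same three steps: start from an empty config, pass a `StormConfig` through unchanged, and otherwise copy the entries returned by `toMap()`. The sketch below isolates that logic with hypothetical stand-in types so it compiles without Flink or Storm: `JobParams` stands in for `GlobalJobParameters`, `RawConfig` for `StormConfig`, and `ConfigResolutionSketch.resolve` is an invented helper that does not exist in the commit.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for GlobalJobParameters; only toMap() matters here.
class JobParams {
    Map<String, String> toMap() {
        return Collections.emptyMap();
    }
}

// Hypothetical stand-in for StormConfig: job parameters that carry a raw map.
final class RawConfig extends JobParams {
    final Map<Object, Object> entries = new HashMap<Object, Object>();
}

final class ConfigResolutionSketch {

    // Mirrors the branch structure used in both wrappers.
    static Map<Object, Object> resolve(JobParams params) {
        if (params == null) {
            // No global job parameters set: hand out an empty configuration.
            return new HashMap<Object, Object>();
        }
        if (params instanceof RawConfig) {
            // Storm-style configuration: forward the very same entries.
            return ((RawConfig) params).entries;
        }
        // Any other parameter object: copy its String-keyed view.
        Map<Object, Object> copy = new HashMap<Object, Object>();
        copy.putAll(params.toMap());
        return copy;
    }
}
```

One consequence of this structure is that a spout or bolt never receives `null` as its configuration, which matches the test expectation `any(Map.class)` for the case without configuration.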

http://git-wip-us.apache.org/repos/asf/flink/blob/9fe285a7/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpoutWrapperTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpoutWrapperTest.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpoutWrapperTest.java
index 776e65d..381e130 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpoutWrapperTest.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpoutWrapperTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.stormcompatibility.wrappers;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
 import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
 import org.junit.Test;
@@ -37,14 +38,14 @@ public class FiniteStormSpoutWrapperTest {
 	@SuppressWarnings("unchecked")
 	@Test
 	public void runAndExecuteTest1() throws Exception {
-
-		FiniteStormSpout stormSpout =
-				mock(FiniteStormSpout.class);
+		final FiniteStormSpout stormSpout = mock(FiniteStormSpout.class);
 		when(stormSpout.reachedEnd()).thenReturn(false, false, false, true, false, false, true);
 
-		FiniteStormSpoutWrapper<?> wrapper =
-				new FiniteStormSpoutWrapper<Object>(stormSpout);
-		wrapper.setRuntimeContext(mock(StreamingRuntimeContext.class));
+		final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
+		when(taskContext.getExecutionConfig()).thenReturn(new ExecutionConfig());
+
+		final FiniteStormSpoutWrapper<?> wrapper = new FiniteStormSpoutWrapper<Object>(stormSpout);
+		wrapper.setRuntimeContext(taskContext);
 
 		wrapper.run(mock(SourceContext.class));
 		verify(stormSpout, times(3)).nextTuple();
@@ -53,14 +54,14 @@ public class FiniteStormSpoutWrapperTest {
 	@SuppressWarnings("unchecked")
 	@Test
 	public void runAndExecuteTest2() throws Exception {
-
-		FiniteStormSpout stormSpout =
-				mock(FiniteStormSpout.class);
+		final FiniteStormSpout stormSpout = mock(FiniteStormSpout.class);
 		when(stormSpout.reachedEnd()).thenReturn(true, false, true, false, true, false, true);
 
-		FiniteStormSpoutWrapper<?> wrapper =
-				new FiniteStormSpoutWrapper<Object>(stormSpout);
-		wrapper.setRuntimeContext(mock(StreamingRuntimeContext.class));
+		final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
+		when(taskContext.getExecutionConfig()).thenReturn(new ExecutionConfig());
+
+		final FiniteStormSpoutWrapper<?> wrapper = new FiniteStormSpoutWrapper<Object>(stormSpout);
+		wrapper.setRuntimeContext(taskContext);
 
 		wrapper.run(mock(SourceContext.class));
 		verify(stormSpout, never()).nextTuple();

http://git-wip-us.apache.org/repos/asf/flink/blob/9fe285a7/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/FiniteTestSpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/FiniteTestSpout.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/FiniteTestSpout.java
index 96b5aea..eef35cf 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/FiniteTestSpout.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/FiniteTestSpout.java
@@ -38,8 +38,7 @@ class FiniteTestSpout implements IRichSpout {
 
 	@SuppressWarnings("rawtypes")
 	@Override
-	public void open(final Map conf, final TopologyContext context,
-			@SuppressWarnings("hiding") final SpoutOutputCollector collector) {
+	public void open(final Map conf, final TopologyContext context, final SpoutOutputCollector collector) {
 		this.collector = collector;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9fe285a7/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapperTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapperTest.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapperTest.java
index db34096..5cfb151 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapperTest.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapperTest.java
@@ -25,11 +25,13 @@ import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.Values;
 import backtype.storm.utils.Utils;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.stormcompatibility.util.AbstractTest;
 import org.apache.flink.stormcompatibility.util.SplitStreamType;
+import org.apache.flink.stormcompatibility.util.StormConfig;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
@@ -44,7 +46,14 @@ import org.powermock.modules.junit4.PowerMockRunner;
 import java.util.HashSet;
 import java.util.Map;
 
-import static org.mockito.Mockito.*;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.isNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.same;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 @RunWith(PowerMockRunner.class)
 @PrepareForTest({StreamRecordSerializer.class, StormWrapperSetupHelper.class})
@@ -105,7 +114,7 @@ public class StormBoltWrapperTest extends AbstractTest {
 			flinkTuple = Tuple.getTupleClass(numberOfAttributes).newInstance();
 		}
 
-		String[] schema;
+		final String[] schema;
 		if (numberOfAttributes == -1) {
 			schema = new String[1];
 		} else {
@@ -123,6 +132,7 @@ public class StormBoltWrapperTest extends AbstractTest {
 		}
 
 		final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
+		when(taskContext.getExecutionConfig()).thenReturn(mock(ExecutionConfig.class));
 
 		final IRichBolt bolt = mock(IRichBolt.class);
 
@@ -132,7 +142,7 @@ public class StormBoltWrapperTest extends AbstractTest {
 
 		final StormBoltWrapper wrapper = new StormBoltWrapper(bolt, (Fields) null);
 		wrapper.setup(mock(Output.class), taskContext);
-		wrapper.open(new Configuration());
+		wrapper.open(null);
 
 		wrapper.processElement(record);
 		if (numberOfAttributes == -1) {
@@ -152,10 +162,12 @@ public class StormBoltWrapperTest extends AbstractTest {
 		when(record.getValue()).thenReturn(2).thenReturn(3);
 
 		final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
-		Output output = mock(Output.class);
+		when(taskContext.getExecutionConfig()).thenReturn(mock(ExecutionConfig.class));
 
-		TestBolt bolt = new TestBolt();
-		HashSet<String> raw = new HashSet<String>();
+		final Output output = mock(Output.class);
+
+		final TestBolt bolt = new TestBolt();
+		final HashSet<String> raw = new HashSet<String>();
 		if (rawOutType1) {
 			raw.add("stream1");
 		}
@@ -165,9 +177,9 @@ public class StormBoltWrapperTest extends AbstractTest {
 
 		final StormBoltWrapper wrapper = new StormBoltWrapper(bolt, (Fields) null, raw);
 		wrapper.setup(output, taskContext);
-		wrapper.open(new Configuration());
+		wrapper.open(null);
 
-		SplitStreamType splitRecord = new SplitStreamType<Integer>();
+		final SplitStreamType splitRecord = new SplitStreamType<Integer>();
 		if (rawOutType1) {
 			splitRecord.streamId = "stream1";
 			splitRecord.value = 2;
@@ -192,30 +204,70 @@ public class StormBoltWrapperTest extends AbstractTest {
 	@SuppressWarnings("unchecked")
 	@Test
 	public void testOpen() throws Exception {
-		final IRichBolt bolt = mock(IRichBolt.class);
+		final StormConfig stormConfig = new StormConfig();
+		final Configuration flinkConfig = new Configuration();
+
+		final ExecutionConfig taskConfig = mock(ExecutionConfig.class);
+		when(taskConfig.getGlobalJobParameters()).thenReturn(null).thenReturn(stormConfig)
+		.thenReturn(flinkConfig);
+
+		final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
+		when(taskContext.getExecutionConfig()).thenReturn(taskConfig);
 
 		final StormOutputFieldsDeclarer declarer = new StormOutputFieldsDeclarer();
 		declarer.declare(new Fields("dummy"));
 		PowerMockito.whenNew(StormOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
 
+		final IRichBolt bolt = mock(IRichBolt.class);
 		final StormBoltWrapper<Object, Object> wrapper = new StormBoltWrapper<Object, Object>(bolt);
-		wrapper.setup(mock(Output.class), mock(StreamingRuntimeContext.class));
-
-		wrapper.open(mock(Configuration.class));
+		wrapper.setup(mock(Output.class), taskContext);
 
+		// test without configuration
+		wrapper.open(null);
 		verify(bolt).prepare(any(Map.class), any(TopologyContext.class), any(OutputCollector.class));
+
+		// test with StormConfig
+		wrapper.open(null);
+		verify(bolt).prepare(same(stormConfig), any(TopologyContext.class),
+				any(OutputCollector.class));
+
+		// test with Configuration
+		wrapper.open(null);
+		verify(bolt, times(3)).prepare(eq(flinkConfig.toMap()), any(TopologyContext.class),
+				any(OutputCollector.class));
 	}
 
 	@SuppressWarnings("unchecked")
 	@Test
 	public void testOpenSink() throws Exception {
+		final StormConfig stormConfig = new StormConfig();
+		final Configuration flinkConfig = new Configuration();
+
+		final ExecutionConfig taskConfig = mock(ExecutionConfig.class);
+		when(taskConfig.getGlobalJobParameters()).thenReturn(null).thenReturn(stormConfig)
+		.thenReturn(flinkConfig);
+
+		final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
+		when(taskContext.getExecutionConfig()).thenReturn(taskConfig);
+
 		final IRichBolt bolt = mock(IRichBolt.class);
 		final StormBoltWrapper<Object, Object> wrapper = new StormBoltWrapper<Object, Object>(bolt);
-		wrapper.setup(mock(Output.class), mock(StreamingRuntimeContext.class));
+		wrapper.setup(mock(Output.class), taskContext);
 
-		wrapper.open(mock(Configuration.class));
+		// test without configuration
+		wrapper.open(null);
+		verify(bolt).prepare(any(Map.class), any(TopologyContext.class),
+				isNull(OutputCollector.class));
 
-		verify(bolt).prepare(any(Map.class), any(TopologyContext.class), isNull(OutputCollector.class));
+		// test with StormConfig
+		wrapper.open(null);
+		verify(bolt).prepare(same(stormConfig), any(TopologyContext.class),
+				isNull(OutputCollector.class));
+
+		// test with Configuration
+		wrapper.open(null);
+		verify(bolt, times(3)).prepare(eq(flinkConfig.toMap()), any(TopologyContext.class),
+				isNull(OutputCollector.class));
 	}
 
 	@SuppressWarnings("unchecked")
@@ -230,12 +282,11 @@ public class StormBoltWrapperTest extends AbstractTest {
 		final StormBoltWrapper<Object, Object> wrapper = new StormBoltWrapper<Object, Object>(bolt);
 
 		final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
-		// when(taskContext.getOutputCollector()).thenReturn(mock(Collector.class));
 		wrapper.setup(mock(Output.class), taskContext);
 
 		wrapper.close();
 		wrapper.dispose();
-		
+
 		verify(bolt).cleanup();
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9fe285a7/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapperTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapperTest.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapperTest.java
index c890ab1..a4eea7e 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapperTest.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapperTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.stormcompatibility.wrappers;
 import backtype.storm.topology.IRichSpout;
 import backtype.storm.tuple.Fields;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.stormcompatibility.util.AbstractTest;
 import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
@@ -36,6 +37,7 @@ import java.util.LinkedList;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 @RunWith(PowerMockRunner.class)
 @PrepareForTest(StormWrapperSetupHelper.class)
@@ -48,10 +50,13 @@ public class StormFiniteSpoutWrapperTest extends AbstractTest {
 		declarer.declare(new Fields("dummy"));
 		PowerMockito.whenNew(StormOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
 
+		final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
+		when(taskContext.getExecutionConfig()).thenReturn(new ExecutionConfig());
+
 		final IRichSpout spout = mock(IRichSpout.class);
 		final int numberOfCalls = this.r.nextInt(50);
 		final StormFiniteSpoutWrapper<?> spoutWrapper = new StormFiniteSpoutWrapper<Object>(spout, numberOfCalls);
-		spoutWrapper.setRuntimeContext(mock(StreamingRuntimeContext.class));
+		spoutWrapper.setRuntimeContext(taskContext);
 
 		spoutWrapper.run(mock(SourceContext.class));
 		verify(spout, times(numberOfCalls)).nextTuple();
@@ -66,10 +71,13 @@ public class StormFiniteSpoutWrapperTest extends AbstractTest {
 			expectedResult.add(new Tuple1<Integer>(new Integer(i)));
 		}
 
+		final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
+		when(taskContext.getExecutionConfig()).thenReturn(new ExecutionConfig());
+
 		final IRichSpout spout = new FiniteTestSpout(numberOfCalls);
 		final StormFiniteSpoutWrapper<Tuple1<Integer>> spoutWrapper = new StormFiniteSpoutWrapper<Tuple1<Integer>>(
 				spout);
-		spoutWrapper.setRuntimeContext(mock(StreamingRuntimeContext.class));
+		spoutWrapper.setRuntimeContext(taskContext);
 
 		final TestContext collector = new TestContext();
 		spoutWrapper.run(collector);
@@ -84,10 +92,13 @@ public class StormFiniteSpoutWrapperTest extends AbstractTest {
 		final LinkedList<Tuple1<Integer>> expectedResult = new LinkedList<Tuple1<Integer>>();
 		expectedResult.add(new Tuple1<Integer>(new Integer(numberOfCalls - 1)));
 
+		StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
+		when(taskContext.getExecutionConfig()).thenReturn(new ExecutionConfig());
+
 		final IRichSpout spout = new FiniteTestSpout(numberOfCalls);
 		final StormFiniteSpoutWrapper<Tuple1<Integer>> spoutWrapper = new StormFiniteSpoutWrapper<Tuple1<Integer>>(
 				spout);
-		spoutWrapper.setRuntimeContext(mock(StreamingRuntimeContext.class));
+		spoutWrapper.setRuntimeContext(taskContext);
 
 		spoutWrapper.cancel();
 		final TestContext collector = new TestContext();
@@ -96,15 +107,4 @@ public class StormFiniteSpoutWrapperTest extends AbstractTest {
 		Assert.assertEquals(expectedResult, collector.result);
 	}
 
-	@Test
-	public void testClose() throws Exception {
-		final IRichSpout spout = mock(IRichSpout.class);
-		final StormFiniteSpoutWrapper<Tuple1<Integer>> spoutWrapper = new StormFiniteSpoutWrapper<Tuple1<Integer>>(
-				spout);
-
-		spoutWrapper.close();
-
-		verify(spout).close();
-	}
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9fe285a7/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapperTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapperTest.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapperTest.java
index 6d2f196..04dc48d 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapperTest.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapperTest.java
@@ -17,9 +17,16 @@
 
 package org.apache.flink.stormcompatibility.wrappers;
 
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
 import backtype.storm.topology.IRichSpout;
+
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.stormcompatibility.util.AbstractTest;
+import org.apache.flink.stormcompatibility.util.StormConfig;
+import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
 import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
 import org.junit.Assert;
 import org.junit.Test;
@@ -28,21 +35,64 @@ import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
 import java.util.LinkedList;
+import java.util.Map;
 
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.same;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 @RunWith(PowerMockRunner.class)
 @PrepareForTest(StormWrapperSetupHelper.class)
 public class StormSpoutWrapperTest extends AbstractTest {
 
+	@SuppressWarnings({ "rawtypes", "unchecked" })
+	@Test
+	public void testRunPrepare() throws Exception {
+		final StormConfig stormConfig = new StormConfig();
+		final Configuration flinkConfig = new Configuration();
+
+		final ExecutionConfig taskConfig = mock(ExecutionConfig.class);
+		when(taskConfig.getGlobalJobParameters()).thenReturn(null).thenReturn(stormConfig)
+				.thenReturn(flinkConfig);
+
+		final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
+		when(taskContext.getExecutionConfig()).thenReturn(taskConfig);
+
+		final IRichSpout spout = mock(IRichSpout.class);
+		final StormSpoutWrapper spoutWrapper = new StormSpoutWrapper(spout);
+		spoutWrapper.setRuntimeContext(taskContext);
+		spoutWrapper.isRunning = false;
+
+		// test without configuration
+		spoutWrapper.run(mock(SourceContext.class));
+		verify(spout).open(any(Map.class), any(TopologyContext.class),
+				any(SpoutOutputCollector.class));
+
+		// test with StormConfig
+		spoutWrapper.run(mock(SourceContext.class));
+		verify(spout).open(same(stormConfig), any(TopologyContext.class),
+				any(SpoutOutputCollector.class));
+
+		// test with Configuration
+		spoutWrapper.run(mock(SourceContext.class));
+		verify(spout, times(3)).open(eq(flinkConfig.toMap()), any(TopologyContext.class),
+				any(SpoutOutputCollector.class));
+	}
+
 	@Test
 	public void testRunExecuteCancelInfinite() throws Exception {
 		final int numberOfCalls = 5 + this.r.nextInt(5);
 
+		final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
+		when(taskContext.getExecutionConfig()).thenReturn(new ExecutionConfig());
+
 		final IRichSpout spout = new FiniteTestSpout(numberOfCalls);
 		final StormSpoutWrapper<Tuple1<Integer>> spoutWrapper = new StormSpoutWrapper<Tuple1<Integer>>(spout);
-		spoutWrapper.setRuntimeContext(mock(StreamingRuntimeContext.class));
+		spoutWrapper.setRuntimeContext(taskContext);
 
 		spoutWrapper.cancel();
 		final TestContext collector = new TestContext();

http://git-wip-us.apache.org/repos/asf/flink/blob/9fe285a7/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationTopology.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationTopology.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationTopology.java
index b7c98a8..d8d620b 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationTopology.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationTopology.java
@@ -93,19 +93,25 @@ public class ExclamationTopology {
 	private static boolean fileInputOutput = false;
 	private static String textPath;
 	private static String outputPath;
+	private static int exclamationNum = 3;
+
+	static int getExclamation() {
+		return exclamationNum;
+	}
 
 	static boolean parseParameters(final String[] args) {
 
 		if (args.length > 0) {
 			// parse input arguments
 			fileInputOutput = true;
-			if (args.length == 2) {
+			if (args.length == 3) {
 				textPath = args[0];
 				outputPath = args[1];
+				exclamationNum = Integer.parseInt(args[2]);
 			} else {
 				System.err.println(
 						"Usage: StormExclamation[Local|RemoteByClient|RemoteBySubmitter] <text " +
-								"path> <result path>");
+						"path> <result path>  <number of exclamation marks>");
 				return false;
 			}
 		} else {
@@ -113,7 +119,7 @@ public class ExclamationTopology {
 			System.out.println("  Provide parameters to read input data from a file");
 			System.out.println(
 					"  Usage: StormExclamation[Local|RemoteByClient|RemoteBySubmitter] <text path>" +
-							" <result path>");
+					" <result path> <number of exclamation marks>");
 		}
 
 		return true;

http://git-wip-us.apache.org/repos/asf/flink/blob/9fe285a7/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationWithStormBolt.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationWithStormBolt.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationWithStormBolt.java
index ee5d9f9..c8af3a6 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationWithStormBolt.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationWithStormBolt.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.examples.java.wordcount.util.WordCountData;
 import org.apache.flink.stormcompatibility.excamation.stormoperators.ExclamationBolt;
+import org.apache.flink.stormcompatibility.util.StormConfig;
 import org.apache.flink.stormcompatibility.wrappers.StormBoltWrapper;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -36,7 +37,7 @@ import backtype.storm.utils.Utils;
  * The input is a plain text file with lines separated by newline characters.
  * <p/>
  * <p/>
- * Usage: <code>StormExclamationWithStormBolt &lt;text path&gt; &lt;result path&gt;</code><br/>
+ * Usage: <code>StormExclamationWithStormBolt &lt;text path&gt; &lt;result path&gt; &lt;number of exclamation marks&gt;</code><br/>
  * If no parameters are provided, the program is run with default data from
  * {@link WordCountData}.
  * <p/>
@@ -61,15 +62,20 @@ public class ExclamationWithStormBolt {
 		// set up the execution environment
 		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
+		// set Storm configuration
+		StormConfig config = new StormConfig();
+		config.put(ExclamationBolt.EXCLAMATION_COUNT, new Integer(exclamationNum));
+		env.getConfig().setGlobalJobParameters(config);
+
 		// get input data
 		final DataStream<String> text = getTextDataStream(env);
 
 		final DataStream<String> exclaimed = text
 				.transform("StormBoltTokenizer",
 						TypeExtractor.getForObject(""),
-				new StormBoltWrapper<String, String>(new ExclamationBolt(),
-						new String[] { Utils.DEFAULT_STREAM_ID }))
-						.map(new ExclamationMap());
+						new StormBoltWrapper<String, String>(new ExclamationBolt(),
+								new String[] { Utils.DEFAULT_STREAM_ID }))
+				.map(new ExclamationMap());
 
 		// emit result
 		if (fileOutput) {
@@ -87,6 +93,7 @@ public class ExclamationWithStormBolt {
 	// *************************************************************************
 
 	private static class ExclamationMap implements MapFunction<String, String> {
+		private static final long serialVersionUID = 4614754344067170619L;
 
 		@Override
 		public String map(String value) throws Exception {
@@ -101,23 +108,25 @@ public class ExclamationWithStormBolt {
 	private static boolean fileOutput = false;
 	private static String textPath;
 	private static String outputPath;
+	private static int exclamationNum = 3;
 
 	private static boolean parseParameters(final String[] args) {
 
 		if (args.length > 0) {
 			// parse input arguments
 			fileOutput = true;
-			if (args.length == 2) {
+			if (args.length == 3) {
 				textPath = args[0];
 				outputPath = args[1];
+				exclamationNum = Integer.parseInt(args[2]);
 			} else {
-				System.err.println("Usage: ExclamationWithStormBolt <text path> <result path>");
+				System.err.println("Usage: ExclamationWithStormBolt <text path> <result path> <number of exclamation marks>");
 				return false;
 			}
 		} else {
 			System.out.println("Executing ExclamationWithStormBolt example with built-in default data");
 			System.out.println("  Provide parameters to read input data from a file");
-			System.out.println("  Usage: ExclamationWithStormBolt <text path> <result path>");
+			System.out.println("  Usage: ExclamationWithStormBolt <text path> <result path> <number of exclamation marks>");
 		}
 		return true;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/9fe285a7/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationWithStormSpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationWithStormSpout.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationWithStormSpout.java
index 962a318..99c816d 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationWithStormSpout.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationWithStormSpout.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.examples.java.wordcount.util.WordCountData;
 import org.apache.flink.stormcompatibility.util.FiniteStormFileSpout;
 import org.apache.flink.stormcompatibility.util.FiniteStormInMemorySpout;
+import org.apache.flink.stormcompatibility.util.StormConfig;
 import org.apache.flink.stormcompatibility.wrappers.FiniteStormSpoutWrapper;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -125,11 +126,16 @@ public class ExclamationWithStormSpout {
 
 	private static DataStream<String> getTextDataStream(final StreamExecutionEnvironment env) {
 		if (fileOutput) {
-			// read the text file from given input path
 			final String[] tokens = textPath.split(":");
-			final String localFile = tokens[tokens.length - 1];
+			final String inputFile = tokens[tokens.length - 1];
+
+			// set Storm configuration
+			StormConfig config = new StormConfig();
+			config.put(FiniteStormFileSpout.INPUT_FILE_PATH, inputFile);
+			env.getConfig().setGlobalJobParameters(config);
+
 			return env.addSource(
-					new FiniteStormSpoutWrapper<String>(new FiniteStormFileSpout(localFile),
+					new FiniteStormSpoutWrapper<String>(new FiniteStormFileSpout(),
 							new String[] { Utils.DEFAULT_STREAM_ID }),
 							TypeExtractor.getForClass(String.class)).setParallelism(1);
 		}
@@ -137,7 +143,7 @@ public class ExclamationWithStormSpout {
 		return env.addSource(
 				new FiniteStormSpoutWrapper<String>(new FiniteStormInMemorySpout(
 						WordCountData.WORDS), new String[] { Utils.DEFAULT_STREAM_ID }),
-				TypeExtractor.getForClass(String.class)).setParallelism(1);
+						TypeExtractor.getForClass(String.class)).setParallelism(1);
 
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9fe285a7/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormExclamationLocal.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormExclamationLocal.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormExclamationLocal.java
index bd1220c..c87b3a5 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormExclamationLocal.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormExclamationLocal.java
@@ -17,9 +17,12 @@
 
 package org.apache.flink.stormcompatibility.excamation;
 
+import backtype.storm.Config;
 import backtype.storm.utils.Utils;
+
 import org.apache.flink.stormcompatibility.api.FlinkLocalCluster;
 import org.apache.flink.stormcompatibility.api.FlinkTopologyBuilder;
+import org.apache.flink.stormcompatibility.excamation.stormoperators.ExclamationBolt;
 
 /**
  * Implements the "Exclamation" program that attaches five exclamation mark to every line of a text
@@ -61,8 +64,10 @@ public class StormExclamationLocal {
 		final FlinkTopologyBuilder builder = ExclamationTopology.buildTopology();
 
 		// execute program locally
+		Config conf = new Config();
+		conf.put(ExclamationBolt.EXCLAMATION_COUNT, ExclamationTopology.getExclamation());
 		final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
-		cluster.submitTopology(topologyId, null, builder.createTopology());
+		cluster.submitTopology(topologyId, conf, builder.createTopology());
 
 		Utils.sleep(10 * 1000);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/9fe285a7/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/stormoperators/ExclamationBolt.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/stormoperators/ExclamationBolt.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/stormoperators/ExclamationBolt.java
index 14232b7..2709eff 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/stormoperators/ExclamationBolt.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/stormoperators/ExclamationBolt.java
@@ -29,12 +29,29 @@ import backtype.storm.tuple.Values;
 import java.util.Map;
 
 public class ExclamationBolt implements IRichBolt {
-	OutputCollector _collector;
+	private final static long serialVersionUID = -6364882114201311380L;
+
+	public final static String EXCLAMATION_COUNT = "exclamation.count";
+
+	private OutputCollector collector;
+	private String exclamation;
 
 	@SuppressWarnings("rawtypes")
 	@Override
 	public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
-		_collector = collector;
+		this.collector = collector;
+
+		Object count = conf.get(EXCLAMATION_COUNT);
+		if (count != null) {
+			int exclamationNum = (Integer) count;
+			StringBuilder builder = new StringBuilder();
+			for (int index = 0; index < exclamationNum; ++index) {
+				builder.append('!');
+			}
+			this.exclamation = builder.toString();
+		} else {
+			this.exclamation = "!";
+		}
 	}
 
 	@Override
@@ -43,7 +60,7 @@ public class ExclamationBolt implements IRichBolt {
 
 	@Override
 	public void execute(Tuple tuple) {
-		_collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
+		collector.emit(tuple, new Values(tuple.getString(0) + this.exclamation));
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/9fe285a7/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormFileSpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormFileSpout.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormFileSpout.java
index dddbb4b..64b3e28 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormFileSpout.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormFileSpout.java
@@ -35,6 +35,8 @@ public class FiniteStormFileSpout extends StormFileSpout implements FiniteStormS
 	private String line;
 	private boolean newLineRead;
 
+	public FiniteStormFileSpout() {}
+
 	public FiniteStormFileSpout(String path) {
 		super(path);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/9fe285a7/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormFileSpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormFileSpout.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormFileSpout.java
index 7d89c75..0611e37 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormFileSpout.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormFileSpout.java
@@ -33,9 +33,13 @@ import java.util.Map;
 public class StormFileSpout extends AbstractStormSpout {
 	private static final long serialVersionUID = -6996907090003590436L;
 
-	protected final String path;
+	public final static String INPUT_FILE_PATH = "input.path";
+
+	protected String path = null;
 	protected BufferedReader reader;
 
+	public StormFileSpout() {}
+
 	public StormFileSpout(final String path) {
 		this.path = path;
 	}
@@ -44,6 +48,12 @@ public class StormFileSpout extends AbstractStormSpout {
 	@Override
 	public void open(final Map conf, final TopologyContext context, final SpoutOutputCollector collector) {
 		super.open(conf, context, collector);
+
+		Object configuredPath = conf.get(INPUT_FILE_PATH);
+		if(configuredPath != null) {
+			this.path = (String)configuredPath;
+		}
+
 		try {
 			this.reader = new BufferedReader(new FileReader(this.path));
 		} catch (final FileNotFoundException e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/9fe285a7/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/ExclamationWithStormBoltITCase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/ExclamationWithStormBoltITCase.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/ExclamationWithStormBoltITCase.java
index f47a58f..a858f36 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/ExclamationWithStormBoltITCase.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/ExclamationWithStormBoltITCase.java
@@ -27,11 +27,13 @@ public class ExclamationWithStormBoltITCase extends StormTestBase {
 
 	protected String textPath;
 	protected String resultPath;
+	protected String exclamationNum;
 
 	@Override
 	protected void preSubmit() throws Exception {
 		this.textPath = this.createTempFile("text.txt", WordCountData.TEXT);
 		this.resultPath = this.getTempDirPath("result");
+		this.exclamationNum = "3";
 	}
 
 	@Override
@@ -41,7 +43,7 @@ public class ExclamationWithStormBoltITCase extends StormTestBase {
 
 	@Override
 	protected void testProgram() throws Exception {
-		ExclamationWithStormBolt.main(new String[]{this.textPath, this.resultPath});
+		ExclamationWithStormBolt.main(new String[]{this.textPath, this.resultPath, this.exclamationNum});
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9fe285a7/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/StormExclamationLocalITCase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/StormExclamationLocalITCase.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/StormExclamationLocalITCase.java
index 6cba39a..a19f3af 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/StormExclamationLocalITCase.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/StormExclamationLocalITCase.java
@@ -27,11 +27,13 @@ public class StormExclamationLocalITCase extends StormTestBase {
 
 	protected String textPath;
 	protected String resultPath;
+	protected String exclamationNum;
 
 	@Override
 	protected void preSubmit() throws Exception {
 		this.textPath = this.createTempFile("text.txt", WordCountData.TEXT);
 		this.resultPath = this.getTempDirPath("result");
+		this.exclamationNum = "3";
 	}
 
 	@Override
@@ -41,6 +43,6 @@ public class StormExclamationLocalITCase extends StormTestBase {
 
 	@Override
 	protected void testProgram() throws Exception {
-		StormExclamationLocal.main(new String[]{this.textPath, this.resultPath});
+		StormExclamationLocal.main(new String[]{this.textPath, this.resultPath, this.exclamationNum});
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9fe285a7/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index 28f3b92..df0248a 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -647,7 +647,7 @@ public class ExecutionConfig implements Serializable {
 	 * Abstract class for a custom user configuration object registered at the execution config.
 	 *
 	 * This user config is accessible at runtime through
-	 * getRuntimeContext().getExecutionConfig().getUserConfig()
+	 * getRuntimeContext().getExecutionConfig().getGlobalJobParameters()
 	 */
 	public static class GlobalJobParameters implements Serializable {
 		private static final long serialVersionUID = 1L;

