flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject flink git commit: [FLINK-2553] Example Jars not built correctly - reworked package structure of utils and wordcount.stormoperators package - reworked class hierarchy of *FileSpout and *InMemorySpout - fixed pom.xml to assemble jars correctly - simp
Date Thu, 27 Aug 2015 13:17:37 GMT
Repository: flink
Updated Branches:
  refs/heads/master 541a06cfc -> 824785e26


[FLINK-2553] Example Jars not built correctly
  - reworked package structure of utils and wordcount.stormoperators package
  - reworked class hierarchy of *FileSpout and *InMemorySpout
  - fixed pom.xml to assemble jars correctly
  - simplified example jar file names
  - replaced maven-assembly-plugin with maven-shade-plugin (removed assembly.xml file)
  - extended README and documentation to cover building and using the correct jars

Additional minor changes:
 - comment typo in FlinkSubmitter
 - removed version number in hardcoded jar file name

This closes #1037


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/824785e2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/824785e2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/824785e2

Branch: refs/heads/master
Commit: 824785e2689b6b56e923490d74d52dd29f5fe412
Parents: 541a06c
Author: mjsax <mjsax@informatik.hu-berlin.de>
Authored: Thu Aug 20 16:07:09 2015 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Thu Aug 27 13:55:40 2015 +0200

----------------------------------------------------------------------
 docs/apis/storm_compatibility.md                |   7 ++
 .../stormcompatibility/api/FlinkSubmitter.java  |   9 +-
 .../wrappers/StormOutputFieldsDeclarerTest.java |   2 +-
 .../README.md                                   |   3 +-
 .../flink-storm-compatibility-examples/pom.xml  | 118 +++++++++++++++----
 .../src/assembly/word-count-storm.xml           |  72 -----------
 .../singlejoin/stormoperators/GenderSpout.java  |   2 +
 .../util/FiniteStormFileSpout.java              |  50 +++-----
 .../util/FiniteStormInMemorySpout.java          |  14 +--
 .../stormcompatibility/util/StormFileSpout.java |   4 +-
 .../util/StormInMemorySpout.java                |  15 ++-
 .../util/StormWordCountFileSpout.java           |  38 ------
 .../util/StormWordCountInMemorySpout.java       |  39 ------
 .../wordcount/SpoutSourceWordCount.java         |   8 +-
 .../wordcount/StormWordCountRemoteByClient.java |   2 +-
 .../StormWordCountRemoteBySubmitter.java        |   2 +-
 .../wordcount/WordCountTopology.java            |   6 +-
 .../stormoperators/StormWordCountFileSpout.java |  39 ++++++
 .../StormWordCountInMemorySpout.java            |  40 +++++++
 19 files changed, 227 insertions(+), 243 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/824785e2/docs/apis/storm_compatibility.md
----------------------------------------------------------------------
diff --git a/docs/apis/storm_compatibility.md b/docs/apis/storm_compatibility.md
index 3a0c025..a6083f8 100644
--- a/docs/apis/storm_compatibility.md
+++ b/docs/apis/storm_compatibility.md
@@ -258,4 +258,11 @@ DataStream<String> rawInput = env.addSource(
 
 You can find more examples in Maven module `flink-storm-compatibility-examples`.
 For the different versions of WordCount, see [README.md](https://github.com/apache/flink/tree/master/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/README.md).
+To run the examples, you need to assemble a correct jar file.
+`flink-storm-compatibility-examples-0.10-SNAPSHOT.jar` is **not** a valid jar file for job execution (it is only a standard Maven artifact).
 
+There are example jars for embedded Spout and Bolt, namely `WordCount-SpoutSource.jar` and `WordCount-BoltTokenizer.jar`, respectively.
+See `pom.xml` for how both jars are built.
+Furthermore, there is one example for whole Storm topologies (`WordCount-StormTopology.jar`).
+
+You can run each of those examples via `bin/flink run <jarname>.jar`. The correct entry point class is contained in each jar's manifest file.

http://git-wip-us.apache.org/repos/asf/flink/blob/824785e2/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkSubmitter.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkSubmitter.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkSubmitter.java
index bcc2afb..5f3f31e 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkSubmitter.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkSubmitter.java
@@ -14,7 +14,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.flink.stormcompatibility.api;
 
 import backtype.storm.Config;
@@ -60,7 +59,7 @@ public class FlinkSubmitter {
 	 */
 	public static void submitTopology(final String name, final Map<?, ?> stormConf, final FlinkTopology topology,
 			final SubmitOptions opts)
-			throws AlreadyAliveException, InvalidTopologyException {
+					throws AlreadyAliveException, InvalidTopologyException {
 		submitTopology(name, stormConf, topology);
 	}
 
@@ -109,7 +108,7 @@ public class FlinkSubmitter {
 				try {
 					for (final File file : ((ContextEnvironment) ExecutionEnvironment.getExecutionEnvironment())
 							.getJars()) {
-						// TODO verify that there is onnly one jar
+						// TODO verify that there is only one jar
 						localJar = file.getAbsolutePath();
 					}
 				} catch (final ClassCastException e) {
@@ -147,7 +146,7 @@ public class FlinkSubmitter {
 	 */
 	public static void submitTopologyWithProgressBar(final String name, final Map<?, ?> stormConf,
 			final FlinkTopology topology)
-			throws AlreadyAliveException, InvalidTopologyException {
+					throws AlreadyAliveException, InvalidTopologyException {
 		submitTopology(name, stormConf, topology);
 	}
 
@@ -180,7 +179,7 @@ public class FlinkSubmitter {
 		if (localJar == null) {
 			throw new RuntimeException(
 					"Must submit topologies using the 'storm' client script so that StormSubmitter knows which jar " +
-							"to upload");
+					"to upload");
 		}
 
 		return localJar;

http://git-wip-us.apache.org/repos/asf/flink/blob/824785e2/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormOutputFieldsDeclarerTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormOutputFieldsDeclarerTest.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormOutputFieldsDeclarerTest.java
index a28b6e5..561939f 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormOutputFieldsDeclarerTest.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormOutputFieldsDeclarerTest.java
@@ -39,7 +39,7 @@ public class StormOutputFieldsDeclarerTest extends AbstractTest {
 				.intValue());
 
 		final String sid = "streamId";
-		numberOfAttributes = 0 + this.r.nextInt(26);
+		numberOfAttributes = this.r.nextInt(26);
 		declarer.declareStream(sid, createSchema(numberOfAttributes));
 		Assert.assertEquals(2, declarer.outputSchemas.size());
 		Assert.assertEquals(numberOfAttributes, declarer.outputSchemas.get(sid).intValue());

http://git-wip-us.apache.org/repos/asf/flink/blob/824785e2/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/README.md
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/README.md b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/README.md
index c5e501b..6290df2 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/README.md
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/README.md
@@ -15,5 +15,6 @@ This module contains multiple versions of a simple word-count-example to illustr
       * `StormWordCountRemoteBySubmitter` submits the topology to a remote Flink cluster (similar to the usage of `StormSubmitter` in Storm)
 
 Additionally, this module packages the three example word-count programs as jar files to be submitted to a Flink cluster via `bin/flink run example.jar`.
+(Valid jars are `WordCount-SpoutSource.jar`, `WordCount-BoltTokenizer.jar`, and `WordCount-StormTopology.jar`)
 
-The package `org.apache.flink.stormcompatiblitly.stormoperators` contain original Storm spouts and bolts that can be used unmodified within Storm or Flink.  
+The package `org.apache.flink.stormcompatibility.wordcount.stormoperators` contains original Storm spouts and bolts that can be used unmodified within Storm or Flink.

http://git-wip-us.apache.org/repos/asf/flink/blob/824785e2/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/pom.xml
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/pom.xml b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/pom.xml
index 430972b..2f3c02d 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/pom.xml
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/pom.xml
@@ -103,6 +103,10 @@ under the License.
 									<type>jar</type>
 									<overWrite>false</overWrite>
 									<outputDirectory>${project.build.directory}/classes</outputDirectory>
+									<!-- need to exclude to be able to run
+									       * StormWordCountRemoteByClient and
+									       * StormWordCountRemoteBySubmitter
+									     within Eclipse -->
 									<excludes>defaults.yaml</excludes>
 								</artifactItem>
 								<artifactItem>
@@ -112,7 +116,7 @@ under the License.
 									<type>jar</type>
 									<overWrite>false</overWrite>
 									<outputDirectory>${project.build.directory}/classes</outputDirectory>
-								</artifactItem>snakeyaml
+								</artifactItem>
 								<artifactItem>
 									<groupId>org.yaml</groupId>
 									<artifactId>snakeyaml</artifactId>
@@ -143,7 +147,8 @@ under the License.
 							<goal>jar</goal>
 						</goals>
 						<configuration>
-							<classifier>WordCountSpoutSource</classifier>
+							<finalName>WordCount</finalName>
+							<classifier>SpoutSource</classifier>
 
 							<archive>
 								<manifestEntries>
@@ -168,9 +173,11 @@ under the License.
 								<!-- Word Count -->
 								<include>org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCount.class</include>
 								<include>org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCount$*.class</include>
-								<include>org/apache/flink/stormcompatibility/wordcount/stormoperators/AbstractStormSpout.class</include>
-								<include>org/apache/flink/stormcompatibility/wordcount/stormoperators/StormFileSpout.class</include>
-								<include>org/apache/flink/stormcompatibility/wordcount/stormoperators/StormInMemorySpout.class</include>
+								<include>org/apache/flink/stormcompatibility/wordcount/stormoperators/StormWordCountFileSpout.class</include>
+								<include>org/apache/flink/stormcompatibility/wordcount/stormoperators/StormWordCountInMemorySpout.class</include>
+								<include>org/apache/flink/stormcompatibility/util/AbstractStormSpout.class</include>
+								<include>org/apache/flink/stormcompatibility/util/StormFileSpout.class</include>
+								<include>org/apache/flink/stormcompatibility/util/StormInMemorySpout.class</include>
 								<include>org/apache/flink/examples/java/wordcount/util/WordCountData.class</include>
 							</includes>
 						</configuration>
@@ -185,7 +192,8 @@ under the License.
 							<goal>jar</goal>
 						</goals>
 						<configuration>
-							<classifier>WordCountBoltTokenizer</classifier>
+							<finalName>WordCount</finalName>
+							<classifier>BoltTokenizer</classifier>
 
 							<archive>
 								<manifestEntries>
@@ -215,6 +223,25 @@ under the License.
 						</configuration>
 					</execution>
 
+					<!-- WordCount Storm topology-->
+					<!-- Example for whole topologies (ie, if FlinkTopologyBuilder is used) -->
+					<!-- We cannot use maven-jar-plugin because 'defaults.yaml' must be included in jar.
+					     However, we excluded 'defaults.yaml' in dependency-plugin to get clean Eclipse environment.
+					     Thus, 'defaults.yaml' is not available for maven-jar-plugin.
+					     Nevertheless, we register an empty jar with corresponding name, such that the final jar can be installed to local maven repository.
+					     We use maven-shade-plugin to build the actual jar (which will replace the empty jar). -->
+					<execution>
+						<id>WordCount-StormTopology</id>
+						<phase>package</phase>
+						<goals>
+							<goal>jar</goal>
+						</goals>
+						<configuration>
+							<finalName>WordCount</finalName>
+							<classifier>StormTopology</classifier>
+						</configuration>
+					</execution>
+
 					<execution>
 						<goals>
 							<goal>test-jar</goal>
@@ -224,28 +251,77 @@ under the License.
 			</plugin>
 
 			<!-- WordCount Storm topology-->
-			<!-- example for whole topologies (ie, if FlinkTopologyBuilder is used) -->
 			<!-- Cannot use maven-jar-plugin because 'defaults.yaml' must be included in jar -->
+			<!-- Build StormTopology jar to overwrite empty jar created with maven-jar-plugin. -->
 			<plugin>
-				<artifactId>maven-assembly-plugin</artifactId>
-				<configuration>
-					<descriptors>
-						<descriptor>src/assembly/word-count-storm.xml</descriptor>
-					</descriptors>
-					<archive>
-						<manifestEntries>
-							<program-class>org.apache.flink.stormcompatibility.wordcount.StormWordCountRemoteBySubmitter</program-class>
-						</manifestEntries>
-					</archive>
-				</configuration>
-
+				<artifactId>maven-shade-plugin</artifactId>
+				<groupId>org.apache.maven.plugins</groupId>
+				<version>2.4.1</version>
 				<executions>
 					<execution>
-						<id>WordCountStorm</id>
+						<id>WordCount-StormTopology</id>
 						<phase>package</phase>
 						<goals>
-							<goal>single</goal>
+							<goal>shade</goal>
 						</goals>
+						<configuration>
+							<finalName>WordCount-StormTopology</finalName>
+
+							<artifactSet>
+								<includes>
+									<include>org.apache.storm:storm-core</include>
+									<!-- Storm's recursive dependencies -->
+									<include>org.yaml:snakeyaml</include>
+									<include>com.googlecode.json-simple:json-simple</include>
+									<include>org.apache.flink:flink-storm-compatibility-core</include>
+									<include>org.apache.flink:flink-storm-compatibility-examples</include>
+								</includes>
+							</artifactSet>
+							<filters>
+								<filter>
+									<artifact>org.apache.storm:storm-core</artifact>
+									<includes>
+										<include>defaults.yaml</include>
+										<include>backtype/storm/*.class</include>
+										<include>backtype/storm/topology/*.class</include>
+										<include>backtype/storm/spout/*.class</include>
+										<include>backtype/storm/task/*.class</include>
+										<include>backtype/storm/tuple/*.class</include>
+										<include>backtype/storm/generated/*.class</include>
+										<include>backtype/storm/metric/**/*.class</include>
+										<include>backtype/storm/utils/*.class</include>
+										<include>backtype/storm/serialization/*.class</include>
+										<include>org/apache/storm/curator/**/*.class</include>
+										<include>org/apache/thrift7/**/*.class</include>
+										<!-- Storm's recursive dependencies -->
+										<include>org/json/simple/**/*.class</include>
+										<include>org/yaml/snakeyaml/**/*.class</include>
+									</includes>
+								</filter>
+								<filter>
+									<artifact>org.apache.flink:flink-storm-compatibility-examples</artifact>
+									<includes>
+										<include>org/apache/flink/stormcompatibility/wordcount/StormWordCountRemoteBySubmitter.class</include>
+										<include>org/apache/flink/stormcompatibility/wordcount/WordCountTopology.class</include>
+										<include>org/apache/flink/stormcompatibility/wordcount/stormoperators/*.class</include>
+										<include>org/apache/flink/stormcompatibility/util/*.class</include>
+										<include>org/apache/flink/examples/java/wordcount/util/WordCountData.class</include>
+									</includes>
+								</filter>
+								<filter>
+									<artifact>org.apache.flink:flink-storm-compatibility-core</artifact>
+									<includes>
+										<include>org/apache/flink/stormcompatibility/api/*.class</include>
+										<include>org/apache/flink/stormcompatibility/wrappers/*.class</include>
+									</includes>
+								</filter>
+							</filters>
+							<transformers>
+								<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+									<mainClass>org.apache.flink.stormcompatibility.wordcount.StormWordCountRemoteBySubmitter</mainClass>
+								</transformer>
+							</transformers>
+						</configuration>
 					</execution>
 				</executions>
 			</plugin>

http://git-wip-us.apache.org/repos/asf/flink/blob/824785e2/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/assembly/word-count-storm.xml
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/assembly/word-count-storm.xml b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/assembly/word-count-storm.xml
deleted file mode 100644
index 96ac429..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/assembly/word-count-storm.xml
+++ /dev/null
@@ -1,72 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.3"
-          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-          xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.3
-                http://maven.apache.org/xsd/assembly-1.1.3.xsd">
-
-    <id>WordCountStorm</id>
-    <formats>
-        <format>jar</format>
-    </formats>
-
-    <includeBaseDirectory>false</includeBaseDirectory>
-
-    <dependencySets>
-        <dependencySet>
-            <outputDirectory>/</outputDirectory>
-            <unpack>true</unpack>
-            <includes>
-                <include>org.apache.storm:storm-core:jar</include>
-                <include>org.apache.flink:flink-storm-compatibility-core:jar</include>
-                <include>org.apache.flink:flink-storm-compatibility-examples:jar</include>
-            </includes>
-            <unpackOptions>
-                <includes>
-                    <!-- from storm-core -->
-                    <include>defaults.yaml</include>
-                    <include>backtype/storm/*.class</include>
-                    <include>backtype/storm/topology/*.class</include>
-                    <include>backtype/storm/spout/*.class</include>
-                    <include>backtype/storm/task/*.class</include>
-                    <include>backtype/storm/tuple/*.class</include>
-                    <include>backtype/storm/generated/*.class</include>
-                    <include>backtype/storm/metric/**/*.class</include>
-                    <include>backtype/storm/utils/*.class</include>
-                    <include>backtype/storm/serialization/*.class</include>
-                    <include>org/apache/storm/curator/**/*.class</include>
-                    <include>org/apache/thrift7/**/*.class</include>
-                    <!-- Storm's recursive dependencies -->
-                    <include>org/json/simple/**/*.class</include>
-                    <include>org/yaml/snakeyaml/**/*.class</include>
-                    <!-- compatibility layer -->
-                    <include>org/apache/flink/stormcompatibility/api/*.class</include>
-                    <include>org/apache/flink/stormcompatibility/wrappers/*.class</include>
-                    <!-- Word Count -->
-                    <include>org/apache/flink/stormcompatibility/wordcount/StormWordCountRemoteBySubmitter.class</include>
-                    <include>org/apache/flink/stormcompatibility/wordcount/WordCountTopology.class</include>
-                    <include>org/apache/flink/stormcompatibility/wordcount/stormoperators/*.class</include>
-                    <include>org/apache/flink/stormcompatibility/util/*.class</include>
-                    <include>org/apache/flink/examples/java/wordcount/util/WordCountData.class</include>
-                </includes>
-            </unpackOptions>
-        </dependencySet>
-    </dependencySets>
-</assembly>

http://git-wip-us.apache.org/repos/asf/flink/blob/824785e2/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/stormoperators/GenderSpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/stormoperators/GenderSpout.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/stormoperators/GenderSpout.java
index d507998..238b6db 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/stormoperators/GenderSpout.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/stormoperators/GenderSpout.java
@@ -23,6 +23,8 @@ import backtype.storm.tuple.Values;
 import org.apache.flink.stormcompatibility.util.AbstractStormSpout;
 
 public class GenderSpout extends AbstractStormSpout {
+	private static final long serialVersionUID = -5079110197950743927L;
+
 	private int counter = 9;
 	private Fields outFields;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/824785e2/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormFileSpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormFileSpout.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormFileSpout.java
index d45ad76..dddbb4b 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormFileSpout.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormFileSpout.java
@@ -1,13 +1,12 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -15,17 +14,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.flink.stormcompatibility.util;
 
 import backtype.storm.spout.SpoutOutputCollector;
 import backtype.storm.task.TopologyContext;
 import backtype.storm.tuple.Values;
+
 import org.apache.flink.stormcompatibility.wrappers.FiniteStormSpout;
 
-import java.io.BufferedReader;
-import java.io.FileNotFoundException;
-import java.io.FileReader;
 import java.io.IOException;
 import java.util.Map;
 
@@ -33,43 +29,24 @@ import java.util.Map;
  * Implements a Storm Spout that reads data from a given local file. The spout stops automatically
  * when it reached the end of the file.
  */
-public class FiniteStormFileSpout extends AbstractStormSpout implements FiniteStormSpout {
-	private static final long serialVersionUID = -6996907090003590436L;
+public class FiniteStormFileSpout extends StormFileSpout implements FiniteStormSpout {
+	private static final long serialVersionUID = -1472978008607215864L;
 
-	private final String path;
-	private BufferedReader reader;
 	private String line;
 	private boolean newLineRead;
 
-	public FiniteStormFileSpout(final String path) {
-		this.path = path;
+	public FiniteStormFileSpout(String path) {
+		super(path);
 	}
 
 	@SuppressWarnings("rawtypes")
 	@Override
-	public void open(final Map conf, final TopologyContext context,
-			final SpoutOutputCollector collector) {
+	public void open(final Map conf, final TopologyContext context, final SpoutOutputCollector collector) {
 		super.open(conf, context, collector);
-		try {
-			this.reader = new BufferedReader(new FileReader(this.path));
-		} catch (final FileNotFoundException e) {
-			throw new RuntimeException(e);
-		}
 		newLineRead = false;
 	}
 
 	@Override
-	public void close() {
-		if (this.reader != null) {
-			try {
-				this.reader.close();
-			} catch (final IOException e) {
-				throw new RuntimeException(e);
-			}
-		}
-	}
-
-	@Override
 	public void nextTuple() {
 		this.collector.emit(new Values(line));
 		newLineRead = false;
@@ -78,6 +55,7 @@ public class FiniteStormFileSpout extends AbstractStormSpout implements FiniteSt
 	/**
 	 * Can be called before nextTuple() any times including 0.
 	 */
+	@Override
 	public boolean reachedEnd() {
 		try {
 			readLine();

http://git-wip-us.apache.org/repos/asf/flink/blob/824785e2/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormInMemorySpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormInMemorySpout.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormInMemorySpout.java
index 899c569..6fb764d 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormInMemorySpout.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormInMemorySpout.java
@@ -18,29 +18,21 @@
 
 package org.apache.flink.stormcompatibility.util;
 
-import backtype.storm.tuple.Values;
 import org.apache.flink.stormcompatibility.wrappers.FiniteStormSpout;
 
 /**
  * Implements a Storm Spout that reads String[] data stored in the memory. The spout stops
  * automatically when it emitted all of the data.
  */
-public class FiniteStormInMemorySpout extends AbstractStormSpout implements FiniteStormSpout {
-
+public class FiniteStormInMemorySpout extends StormInMemorySpout<String> implements
+		FiniteStormSpout {
 	private static final long serialVersionUID = -4008858647468647019L;
 
-	private String[] source;
-	private int counter = 0;
-
 	public FiniteStormInMemorySpout(String[] source) {
-		this.source = source;
+		super(source);
 	}
 
 	@Override
-	public void nextTuple() {
-			this.collector.emit(new Values(source[this.counter++]));
-	}
-
 	public boolean reachedEnd() {
 		return counter >= source.length;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/824785e2/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormFileSpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormFileSpout.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormFileSpout.java
index f52a7bd..7d89c75 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormFileSpout.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormFileSpout.java
@@ -33,8 +33,8 @@ import java.util.Map;
 public class StormFileSpout extends AbstractStormSpout {
 	private static final long serialVersionUID = -6996907090003590436L;
 
-	private final String path;
-	private BufferedReader reader;
+	protected final String path;
+	protected BufferedReader reader;
 
 	public StormFileSpout(final String path) {
 		this.path = path;

http://git-wip-us.apache.org/repos/asf/flink/blob/824785e2/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormInMemorySpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormInMemorySpout.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormInMemorySpout.java
index 99ef324..f6ae622 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormInMemorySpout.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormInMemorySpout.java
@@ -18,25 +18,24 @@
 package org.apache.flink.stormcompatibility.util;
 
 import backtype.storm.tuple.Values;
-import org.apache.flink.examples.java.wordcount.util.WordCountData;
 
 /**
- * Implements a Storm Spout that reads data from {@link WordCountData#WORDS}.
+ * Implements a Storm Spout that reads data from an in-memory array.
  */
-public class StormInMemorySpout extends AbstractStormSpout {
+public class StormInMemorySpout<T> extends AbstractStormSpout {
 	private static final long serialVersionUID = -4008858647468647019L;
 
-	private String[] source;
-	private int counter = 0;
+	protected T[] source;
+	protected int counter = 0;
 
-	public StormInMemorySpout(String[] source) {
+	public StormInMemorySpout(T[] source) {
 		this.source = source;
 	}
 
 	@Override
 	public void nextTuple() {
-		if (this.counter < WordCountData.WORDS.length) {
-			this.collector.emit(new Values(WordCountData.WORDS[this.counter++]));
+		if (this.counter < source.length) {
+			this.collector.emit(new Values(source[this.counter++]));
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/824785e2/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormWordCountFileSpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormWordCountFileSpout.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormWordCountFileSpout.java
deleted file mode 100644
index 1fc4023..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormWordCountFileSpout.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.util;
-
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-
-/**
- * Implements a Storm Spout that reads data from a given local file.
- */
-public final class StormWordCountFileSpout extends StormFileSpout {
-	private static final long serialVersionUID = 2372251989250954503L;
-
-	public StormWordCountFileSpout(String path) {
-		super(path);
-		// TODO Auto-generated constructor stub
-	}
-
-	@Override
-	public void declareOutputFields(final OutputFieldsDeclarer declarer) {
-		declarer.declare(new Fields("sentence"));
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/824785e2/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormWordCountInMemorySpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormWordCountInMemorySpout.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormWordCountInMemorySpout.java
deleted file mode 100644
index 408cbfb..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormWordCountInMemorySpout.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.util;
-
-import org.apache.flink.examples.java.wordcount.util.WordCountData;
-
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-
-/**
- * Implements a Storm Spout that reads data from {@link WordCountData#WORDS}.
- */
-public final class StormWordCountInMemorySpout extends StormInMemorySpout {
-	private static final long serialVersionUID = 8832143302409465843L;
-
-	public StormWordCountInMemorySpout(String[] source) {
-		super(source);
-	}
-
-	@Override
-	public void declareOutputFields(final OutputFieldsDeclarer declarer) {
-		declarer.declare(new Fields("sentence"));
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/824785e2/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCount.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCount.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCount.java
index 0f04fea..cbd054b 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCount.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCount.java
@@ -24,8 +24,8 @@ import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.examples.java.wordcount.util.WordCountData;
-import org.apache.flink.stormcompatibility.util.StormFileSpout;
-import org.apache.flink.stormcompatibility.util.StormInMemorySpout;
+import org.apache.flink.stormcompatibility.wordcount.stormoperators.StormWordCountFileSpout;
+import org.apache.flink.stormcompatibility.wordcount.stormoperators.StormWordCountInMemorySpout;
 import org.apache.flink.stormcompatibility.wrappers.StormFiniteSpoutWrapper;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -142,13 +142,13 @@ public class SpoutSourceWordCount {
 			final String[] tokens = textPath.split(":");
 			final String localFile = tokens[tokens.length - 1];
 			return env.addSource(
-					new StormFiniteSpoutWrapper<String>(new StormFileSpout(localFile),
+					new StormFiniteSpoutWrapper<String>(new StormWordCountFileSpout(localFile),
 							new String[] { Utils.DEFAULT_STREAM_ID }),
 					TypeExtractor.getForClass(String.class)).setParallelism(1);
 		}
 
 		return env.addSource(
-				new StormFiniteSpoutWrapper<String>(new StormInMemorySpout(WordCountData.WORDS),
+				new StormFiniteSpoutWrapper<String>(new StormWordCountInMemorySpout(),
 						new String[] { Utils.DEFAULT_STREAM_ID }),
 				TypeExtractor.getForClass(String.class)).setParallelism(1);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/824785e2/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountRemoteByClient.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountRemoteByClient.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountRemoteByClient.java
index 0bbe11b..3c79eda 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountRemoteByClient.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountRemoteByClient.java
@@ -51,7 +51,7 @@ import org.apache.flink.stormcompatibility.api.FlinkTopologyBuilder;
  */
 public class StormWordCountRemoteByClient {
 	public final static String topologyId = "Streaming WordCount";
-	private final static String uploadedJarLocation = "target/flink-storm-examples-0.9-SNAPSHOT-WordCountStorm.jar";
+	private final static String uploadedJarLocation = "target/WordCount-StormTopology.jar";
 
 	// *************************************************************************
 	// PROGRAM

http://git-wip-us.apache.org/repos/asf/flink/blob/824785e2/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountRemoteBySubmitter.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountRemoteBySubmitter.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountRemoteBySubmitter.java
index 264dc41..de84f55 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountRemoteBySubmitter.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountRemoteBySubmitter.java
@@ -71,7 +71,7 @@ public class StormWordCountRemoteBySubmitter {
 		// conf.put(Config.NIMBUS_THRIFT_PORT, new Integer(6123));
 
 		// The user jar file must be specified via JVM argument if executed via Java.
-		// => -Dstorm.jar=target/flink-storm-examples-0.9-SNAPSHOT-WordCountStorm.jar
+		// => -Dstorm.jar=target/WordCount-StormTopology.jar
 		// If bin/flink is used, the jar file is detected automatically.
 		FlinkSubmitter.submitTopology(topologyId, conf, builder.createTopology());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/824785e2/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/WordCountTopology.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/WordCountTopology.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/WordCountTopology.java
index 367ca9e..45be821 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/WordCountTopology.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/WordCountTopology.java
@@ -25,13 +25,13 @@ import org.apache.flink.stormcompatibility.api.FlinkTopologyBuilder;
 import org.apache.flink.stormcompatibility.util.OutputFormatter;
 import org.apache.flink.stormcompatibility.util.StormBoltFileSink;
 import org.apache.flink.stormcompatibility.util.StormBoltPrintSink;
-import org.apache.flink.stormcompatibility.util.StormWordCountFileSpout;
-import org.apache.flink.stormcompatibility.util.StormWordCountInMemorySpout;
 import org.apache.flink.stormcompatibility.util.TupleOutputFormatter;
 import org.apache.flink.stormcompatibility.wordcount.stormoperators.StormBoltCounter;
 import org.apache.flink.stormcompatibility.wordcount.stormoperators.StormBoltCounterByName;
 import org.apache.flink.stormcompatibility.wordcount.stormoperators.StormBoltTokenizer;
 import org.apache.flink.stormcompatibility.wordcount.stormoperators.StormBoltTokenizerByName;
+import org.apache.flink.stormcompatibility.wordcount.stormoperators.StormWordCountFileSpout;
+import org.apache.flink.stormcompatibility.wordcount.stormoperators.StormWordCountInMemorySpout;
 
 /**
  * Implements the "WordCount" program that computes a simple word occurrence histogram over text files in a streaming
@@ -72,7 +72,7 @@ public class WordCountTopology {
 			final String inputFile = tokens[tokens.length - 1];
 			builder.setSpout(spoutId, new StormWordCountFileSpout(inputFile));
 		} else {
-			builder.setSpout(spoutId, new StormWordCountInMemorySpout(WordCountData.WORDS));
+			builder.setSpout(spoutId, new StormWordCountInMemorySpout());
 		}
 
 		if (indexOrName) {

http://git-wip-us.apache.org/repos/asf/flink/blob/824785e2/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormWordCountFileSpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormWordCountFileSpout.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormWordCountFileSpout.java
new file mode 100644
index 0000000..e994760
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormWordCountFileSpout.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.wordcount.stormoperators;
+
+import org.apache.flink.stormcompatibility.util.StormFileSpout;
+
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+
+/**
+ * Implements a Storm Spout that reads data from a given local file.
+ */
+public final class StormWordCountFileSpout extends StormFileSpout {
+	private static final long serialVersionUID = 2372251989250954503L;
+
+	public StormWordCountFileSpout(String path) {
+		super(path);
+	}
+
+	@Override
+	public void declareOutputFields(final OutputFieldsDeclarer declarer) {
+		declarer.declare(new Fields("sentence"));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/824785e2/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormWordCountInMemorySpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormWordCountInMemorySpout.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormWordCountInMemorySpout.java
new file mode 100644
index 0000000..372f66f
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormWordCountInMemorySpout.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.wordcount.stormoperators;
+
+import org.apache.flink.examples.java.wordcount.util.WordCountData;
+import org.apache.flink.stormcompatibility.util.StormInMemorySpout;
+
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+
+/**
+ * Implements a Storm Spout that reads data from {@link WordCountData#WORDS}.
+ */
+public final class StormWordCountInMemorySpout extends StormInMemorySpout<String> {
+	private static final long serialVersionUID = 8832143302409465843L;
+
+	public StormWordCountInMemorySpout() {
+		super(WordCountData.WORDS);
+	}
+
+	@Override
+	public void declareOutputFields(final OutputFieldsDeclarer declarer) {
+		declarer.declare(new Fields("sentence"));
+	}
+}


Mime
View raw message