flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gyf...@apache.org
Subject [1/2] flink git commit: [FLINK-2304] Add named attribute access to Storm compatibility layer - extended FlinkTuple to enable named attribute access - extended BoltWrapper for user defined input schema - extended FlinkTopologyBuilder to handle decla
Date Wed, 22 Jul 2015 11:48:15 GMT
Repository: flink
Updated Branches:
  refs/heads/master 148395bcd -> 03320503e


http://git-wip-us.apache.org/repos/asf/flink/blob/03320503/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/WordCountTopology.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/WordCountTopology.java
b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/WordCountTopology.java
index d39a526..f028266 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/WordCountTopology.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/WordCountTopology.java
@@ -19,16 +19,19 @@ package org.apache.flink.stormcompatibility.wordcount;
 
 import backtype.storm.generated.StormTopology;
 import backtype.storm.tuple.Fields;
+
 import org.apache.flink.examples.java.wordcount.util.WordCountData;
 import org.apache.flink.stormcompatibility.api.FlinkTopologyBuilder;
 import org.apache.flink.stormcompatibility.util.OutputFormatter;
 import org.apache.flink.stormcompatibility.util.StormBoltFileSink;
 import org.apache.flink.stormcompatibility.util.StormBoltPrintSink;
-import org.apache.flink.stormcompatibility.util.StormFileSpout;
-import org.apache.flink.stormcompatibility.util.StormInMemorySpout;
+import org.apache.flink.stormcompatibility.util.StormWordCountFileSpout;
+import org.apache.flink.stormcompatibility.util.StormWordCountInMemorySpout;
 import org.apache.flink.stormcompatibility.util.TupleOutputFormatter;
 import org.apache.flink.stormcompatibility.wordcount.stormoperators.StormBoltCounter;
+import org.apache.flink.stormcompatibility.wordcount.stormoperators.StormBoltCounterByName;
 import org.apache.flink.stormcompatibility.wordcount.stormoperators.StormBoltTokenizer;
+import org.apache.flink.stormcompatibility.wordcount.stormoperators.StormBoltTokenizerByName;
 
 /**
  * Implements the "WordCount" program that computes a simple word occurrence histogram over
text files in a streaming
@@ -55,6 +58,10 @@ public class WordCountTopology {
 	private final static OutputFormatter formatter = new TupleOutputFormatter();
 
 	public static FlinkTopologyBuilder buildTopology() {
+		return buildTopology(true);
+	}
+
+	public static FlinkTopologyBuilder buildTopology(boolean indexOrName) {
 
 		final FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
 
@@ -63,16 +70,25 @@ public class WordCountTopology {
 			// read the text file from given input path
 			final String[] tokens = textPath.split(":");
 			final String inputFile = tokens[tokens.length - 1];
-			builder.setSpout(spoutId, new StormFileSpout(inputFile));
+			builder.setSpout(spoutId, new StormWordCountFileSpout(inputFile));
 		} else {
-			builder.setSpout(spoutId, new StormInMemorySpout(WordCountData.WORDS));
+			builder.setSpout(spoutId, new StormWordCountInMemorySpout(WordCountData.WORDS));
 		}
 
-		// split up the lines in pairs (2-tuples) containing: (word,1)
-		builder.setBolt(tokenierzerId, new StormBoltTokenizer(), 4).shuffleGrouping(spoutId);
-		// group by the tuple field "0" and sum up tuple field "1"
-		builder.setBolt(counterId, new StormBoltCounter(), 4).fieldsGrouping(tokenierzerId,
-				new Fields(StormBoltTokenizer.ATTRIBUTE_WORD));
+		if (indexOrName) {
+			// split up the lines in pairs (2-tuples) containing: (word,1)
+			builder.setBolt(tokenierzerId, new StormBoltTokenizer(), 4).shuffleGrouping(spoutId);
+			// group by the tuple field "0" and sum up tuple field "1"
+			builder.setBolt(counterId, new StormBoltCounter(), 4).fieldsGrouping(tokenierzerId,
+					new Fields(StormBoltTokenizer.ATTRIBUTE_WORD));
+		} else {
+			// split up the lines in pairs (2-tuples) containing: (word,1)
+			builder.setBolt(tokenierzerId, new StormBoltTokenizerByName(), 4).shuffleGrouping(
+					spoutId);
+			// group by the tuple field "word" and sum up tuple field "count"
+			builder.setBolt(counterId, new StormBoltCounterByName(), 4).fieldsGrouping(
+					tokenierzerId, new Fields(StormBoltTokenizerByName.ATTRIBUTE_WORD));
+		}
 
 		// emit result
 		if (fileInputOutput) {

http://git-wip-us.apache.org/repos/asf/flink/blob/03320503/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormBoltCounterByName.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormBoltCounterByName.java
b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormBoltCounterByName.java
new file mode 100644
index 0000000..bf940c3
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormBoltCounterByName.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.wordcount.stormoperators;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.IRichBolt;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Implements the word counter that counts the occurrences of each unique word. The bolt takes a
pair (input tuple schema:
+ * {@code <String,Integer>}) and sums the given word count for each unique word (output
tuple schema:
+ * {@code <String,Integer>} ).
+ */
+public class StormBoltCounterByName implements IRichBolt {
+	private static final long serialVersionUID = 399619605462625934L;
+
+	public static final String ATTRIBUTE_WORD = "word";
+	public static final String ATTRIBUTE_COUNT = "count";
+
+	private final HashMap<String, Count> counts = new HashMap<String, Count>();
+	private OutputCollector collector;
+
+	@SuppressWarnings("rawtypes")
+	@Override
+	public void prepare(final Map stormConf, final TopologyContext context, final OutputCollector
collector) {
+		this.collector = collector;
+	}
+
+	@Override
+	public void execute(final Tuple input) {
+		final String word = input.getStringByField(StormBoltTokenizer.ATTRIBUTE_WORD);
+
+		Count currentCount = this.counts.get(word);
+		if (currentCount == null) {
+			currentCount = new Count();
+			this.counts.put(word, currentCount);
+		}
+		currentCount.count += input.getIntegerByField(StormBoltTokenizer.ATTRIBUTE_COUNT);
+
+		this.collector.emit(new Values(word, currentCount.count));
+	}
+
+	@Override
+	public void cleanup() {/* nothing to do */}
+
+	@Override
+	public void declareOutputFields(final OutputFieldsDeclarer declarer) {
+		declarer.declare(new Fields(ATTRIBUTE_WORD, ATTRIBUTE_COUNT));
+	}
+
+	@Override
+	public Map<String, Object> getComponentConfiguration() {
+		return null;
+	}
+
+	/**
+	 * A counter helper to emit immutable tuples to the given stormCollector and avoid unnecessary
object
+	 * creation/deletion.
+	 */
+	private static final class Count {
+		public int count;
+
+		public Count() {/* nothing to do */}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/03320503/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormBoltTokenizer.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormBoltTokenizer.java
b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormBoltTokenizer.java
index 96bd87c..dfb3e37 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormBoltTokenizer.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormBoltTokenizer.java
@@ -31,6 +31,8 @@ import java.util.Map;
  * Implements the string tokenizer that splits sentences into words as a Storm bolt. The
bolt takes a line (input tuple
  * schema: {@code <String>}) and splits it into multiple pairs in the form of "(word,1)"
(output tuple schema:
  * {@code <String,Integer>}).
+ * <p>
+ * Same as {@link StormBoltTokenizerByName}, but accesses input attribute by index (instead
of name).
  */
 public final class StormBoltTokenizer implements IRichBolt {
 	private static final long serialVersionUID = -8589620297208175149L;

http://git-wip-us.apache.org/repos/asf/flink/blob/03320503/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormBoltTokenizerByName.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormBoltTokenizerByName.java
b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormBoltTokenizerByName.java
new file mode 100644
index 0000000..8796b95
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormBoltTokenizerByName.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.wordcount.stormoperators;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.IRichBolt;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+
+import java.util.Map;
+
+/**
+ * Implements the string tokenizer that splits sentences into words as a Storm bolt. The
bolt takes a line (input tuple
+ * schema: {@code <String>}) and splits it into multiple pairs in the form of "(word,1)"
(output tuple schema:
+ * {@code <String,Integer>}).
+ * <p>
+ * Same as {@link StormBoltTokenizer}, but accesses input attribute by name (instead of index).
+ */
+public final class StormBoltTokenizerByName implements IRichBolt {
+	private static final long serialVersionUID = -8589620297208175149L;
+
+	public static final String ATTRIBUTE_WORD = "word";
+	public static final String ATTRIBUTE_COUNT = "count";
+
+	public static final int ATTRIBUTE_WORD_INDEX = 0;
+	public static final int ATTRIBUTE_COUNT_INDEX = 1;
+
+	private OutputCollector collector;
+
+	@SuppressWarnings("rawtypes")
+	@Override
+	public void prepare(final Map stormConf, final TopologyContext context, final OutputCollector
collector) {
+		this.collector = collector;
+	}
+
+	@Override
+	public void execute(final Tuple input) {
+		final String[] tokens = input.getStringByField("sentence").toLowerCase().split("\\W+");
+
+		for (final String token : tokens) {
+			if (token.length() > 0) {
+				this.collector.emit(new Values(token, 1));
+			}
+		}
+	}
+
+	@Override
+	public void cleanup() {/* nothing to do */}
+
+	@Override
+	public void declareOutputFields(final OutputFieldsDeclarer declarer) {
+		declarer.declare(new Fields(ATTRIBUTE_WORD, ATTRIBUTE_COUNT));
+	}
+
+	@Override
+	public Map<String, Object> getComponentConfiguration() {
+		return null;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/03320503/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/WordCountDataPojos.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/WordCountDataPojos.java
b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/WordCountDataPojos.java
new file mode 100644
index 0000000..f965a28
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/WordCountDataPojos.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.wordcount.stormoperators;
+
+import java.io.Serializable;
+
+import org.apache.flink.examples.java.wordcount.util.WordCountData;
+
+public class WordCountDataPojos {
+	public static Sentence[] SENTENCES;
+
+	static {
+		SENTENCES = new Sentence[WordCountData.WORDS.length];
+		for (int i = 0; i < SENTENCES.length; ++i) {
+			SENTENCES[i] = new Sentence(WordCountData.WORDS[i]);
+		}
+	}
+
+	public static class Sentence implements Serializable {
+		private static final long serialVersionUID = -7336372859203407522L;
+
+		private String sentence;
+
+		public Sentence() {
+		}
+
+		public Sentence(String sentence) {
+			this.sentence = sentence;
+		}
+
+		public String getSentence() {
+			return sentence;
+		}
+
+		public void setSentence(String sentence) {
+			this.sentence = sentence;
+		}
+
+		@Override
+		public String toString() {
+			return "(" + this.sentence + ")";
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/03320503/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/WordCountDataTuple.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/WordCountDataTuple.java
b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/WordCountDataTuple.java
new file mode 100644
index 0000000..732f0ae
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/WordCountDataTuple.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.wordcount.stormoperators;
+
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.examples.java.wordcount.util.WordCountData;
+
+@SuppressWarnings("unchecked")
+public class WordCountDataTuple {
+	public static Tuple1<String>[] TUPLES;
+
+	static {
+		TUPLES = new Tuple1[WordCountData.WORDS.length];
+		for (int i = 0; i < TUPLES.length; ++i) {
+			TUPLES[i] = new Tuple1<String>(WordCountData.WORDS[i]);
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/03320503/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountPojoITCase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountPojoITCase.java
b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountPojoITCase.java
new file mode 100644
index 0000000..dc75c25
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountPojoITCase.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.wordcount;
+
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.flink.test.testdata.WordCountData;
+
+public class BoltTokenizerWordCountPojoITCase extends StreamingProgramTestBase {
+
+	protected String textPath;
+	protected String resultPath;
+
+	@Override
+	protected void preSubmit() throws Exception {
+		this.textPath = this.createTempFile("text.txt", WordCountData.TEXT);
+		this.resultPath = this.getTempDirPath("result");
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, this.resultPath);
+	}
+
+	@Override
+	protected void testProgram() throws Exception {
+		BoltTokenizerWordCountPojo.main(new String[]{this.textPath, this.resultPath});
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/03320503/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountWithNamesITCase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountWithNamesITCase.java
b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountWithNamesITCase.java
new file mode 100644
index 0000000..e147f53
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountWithNamesITCase.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.wordcount;
+
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.flink.test.testdata.WordCountData;
+
+public class BoltTokenizerWordCountWithNamesITCase extends StreamingProgramTestBase {
+
+	protected String textPath;
+	protected String resultPath;
+
+	@Override
+	protected void preSubmit() throws Exception {
+		this.textPath = this.createTempFile("text.txt", WordCountData.TEXT);
+		this.resultPath = this.getTempDirPath("result");
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, this.resultPath);
+	}
+
+	@Override
+	protected void testProgram() throws Exception {
+		BoltTokenizerWordCountWithNames.main(new String[]{this.textPath, this.resultPath});
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/03320503/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountLocalNamedITCase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountLocalNamedITCase.java
b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountLocalNamedITCase.java
new file mode 100644
index 0000000..8b9a729
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountLocalNamedITCase.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.wordcount;
+
+import org.apache.flink.stormcompatibility.api.FlinkLocalCluster;
+import org.apache.flink.stormcompatibility.api.FlinkTestCluster;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.flink.test.testdata.WordCountData;
+
+public class StormWordCountLocalNamedITCase extends StreamingProgramTestBase {
+
+	protected String textPath;
+	protected String resultPath;
+
+	@Override
+	protected void preSubmit() throws Exception {
+		FlinkLocalCluster.initialize(new FlinkTestCluster());
+		this.textPath = this.createTempFile("text.txt", WordCountData.TEXT);
+		this.resultPath = this.getTempDirPath("result");
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, this.resultPath);
+	}
+
+	@Override
+	protected void testProgram() throws Exception {
+		StormWordCountNamedLocal.main(new String[] { this.textPath, this.resultPath });
+	}
+
+}


Mime
View raw message