flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mbala...@apache.org
Subject [17/27] flink git commit: [storm-compat] Moved Storm-compatibility to flink-contrib and split flink-contrib into small sub-projects
Date Mon, 15 Jun 2015 09:33:07 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/stormoperators/ExclamationBolt.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/stormoperators/ExclamationBolt.java b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/stormoperators/ExclamationBolt.java
deleted file mode 100644
index 14232b7..0000000
--- a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/stormoperators/ExclamationBolt.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.excamation.stormoperators;
-
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichBolt;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
-
-import java.util.Map;
-
-public class ExclamationBolt implements IRichBolt {
-	OutputCollector _collector;
-
-	@SuppressWarnings("rawtypes")
-	@Override
-	public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
-		_collector = collector;
-	}
-
-	@Override
-	public void cleanup() {
-	}
-
-	@Override
-	public void execute(Tuple tuple) {
-		_collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
-	}
-
-	@Override
-	public void declareOutputFields(OutputFieldsDeclarer declarer) {
-		declarer.declare(new Fields("word"));
-	}
-
-	@Override
-	public Map<String, Object> getComponentConfiguration() {
-		return null;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/SingleJoinTopology.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/SingleJoinTopology.java b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/SingleJoinTopology.java
deleted file mode 100644
index 79c7125..0000000
--- a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/SingleJoinTopology.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.stormcompatibility.singlejoin;
-
-import backtype.storm.tuple.Fields;
-import org.apache.flink.stormcompatibility.api.FlinkTopologyBuilder;
-import org.apache.flink.stormcompatibility.singlejoin.stormoperators.AgeSpout;
-import org.apache.flink.stormcompatibility.singlejoin.stormoperators.GenderSpout;
-import org.apache.flink.stormcompatibility.singlejoin.stormoperators.SingleJoinBolt;
-import org.apache.flink.stormcompatibility.util.OutputFormatter;
-import org.apache.flink.stormcompatibility.util.StormBoltFileSink;
-import org.apache.flink.stormcompatibility.util.StormBoltPrintSink;
-import org.apache.flink.stormcompatibility.util.TupleOutputFormatter;
-
-public class SingleJoinTopology {
-
-	public final static String spoutId1 = "gender";
-	public final static String spoutId2 = "age";
-	public final static String boltId = "singleJoin";
-	public final static String sinkId = "sink";
-	private final static OutputFormatter formatter = new TupleOutputFormatter();
-
-	public static FlinkTopologyBuilder buildTopology() {
-
-		final FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
-
-		// get input data
-		builder.setSpout(spoutId1, new GenderSpout(new Fields("id", "gender")));
-		builder.setSpout(spoutId2, new AgeSpout(new Fields("id", "age")));
-
-		builder.setBolt(boltId, new SingleJoinBolt(new Fields("gender", "age")))
-		.fieldsGrouping(spoutId1, new Fields("id"))
-		.fieldsGrouping(spoutId2, new Fields("id"));
-		//.shuffleGrouping(spoutId1)
-		//.shuffleGrouping(spoutId2);
-
-		// emit result
-		if (fileInputOutput) {
-			// write the result to the file denoted by the given output path
-			final String[] tokens = outputPath.split(":");
-			final String outputFile = tokens[tokens.length - 1];
-			builder.setBolt(sinkId, new StormBoltFileSink(outputFile, formatter)).shuffleGrouping(boltId);
-		} else {
-			builder.setBolt(sinkId, new StormBoltPrintSink(formatter), 4).shuffleGrouping(boltId);
-		}
-
-		return builder;
-	}
-
-	// *************************************************************************
-	// UTIL METHODS
-	// *************************************************************************
-
-	private static boolean fileInputOutput = false;
-	private static String outputPath;
-
-	static boolean parseParameters(final String[] args) {
-
-		if (args.length > 0) {
-			// parse input arguments
-			fileInputOutput = true;
-			if (args.length == 1) {
-				outputPath = args[0];
-			} else {
-				System.err.println("Usage: StormSingleJoin* <result path>");
-				return false;
-			}
-		} else {
-			System.out.println("Executing StormSingleJoin* example with built-in default data");
-			System.out.println("  Provide parameters to read input data from a file");
-			System.out.println("  Usage: StormSingleJoin* <result path>");
-		}
-		return true;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/StormSingleJoinLocal.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/StormSingleJoinLocal.java b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/StormSingleJoinLocal.java
deleted file mode 100644
index d70914a..0000000
--- a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/StormSingleJoinLocal.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.stormcompatibility.singlejoin;
-
-import backtype.storm.utils.Utils;
-import org.apache.flink.stormcompatibility.api.FlinkLocalCluster;
-import org.apache.flink.stormcompatibility.api.FlinkTopologyBuilder;
-
-public class StormSingleJoinLocal {
-	public final static String topologyId = "Streaming SingleJoin";
-
-	// *************************************************************************
-	// PROGRAM
-	// *************************************************************************
-
-	public static void main(final String[] args) throws Exception {
-
-		if (!SingleJoinTopology.parseParameters(args)) {
-			return;
-		}
-
-		// build Topology the Storm way
-		final FlinkTopologyBuilder builder = SingleJoinTopology.buildTopology();
-
-		// execute program locally
-		final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
-		cluster.submitTopology(topologyId, null, builder.createTopology());
-
-		Utils.sleep(5 * 1000);
-
-		// TODO kill does not do anything so far
-		cluster.killTopology(topologyId);
-		cluster.shutdown();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/stormoperators/AgeSpout.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/stormoperators/AgeSpout.java b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/stormoperators/AgeSpout.java
deleted file mode 100644
index 49761c3..0000000
--- a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/stormoperators/AgeSpout.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.stormcompatibility.singlejoin.stormoperators;
-
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Values;
-import org.apache.flink.stormcompatibility.util.AbstractStormSpout;
-
-public class AgeSpout extends AbstractStormSpout {
-	private static final long serialVersionUID = -4008858647468647019L;
-
-	private int counter = 0;
-	private String gender;
-	private Fields outFields;
-
-	public AgeSpout(Fields outFields) {
-		this.outFields = outFields;
-	}
-
-	@Override
-	public void nextTuple() {
-		if (this.counter < 10) {
-			if (counter % 2 == 0) {
-				gender = "male";
-			} else {
-				gender = "female";
-			}
-			this.collector.emit(new Values(counter, gender));
-			counter++;
-		}
-	}
-
-	@Override
-	public void declareOutputFields(OutputFieldsDeclarer declarer) {
-		declarer.declare(outFields);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/stormoperators/GenderSpout.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/stormoperators/GenderSpout.java b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/stormoperators/GenderSpout.java
deleted file mode 100644
index d507998..0000000
--- a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/stormoperators/GenderSpout.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.stormcompatibility.singlejoin.stormoperators;
-
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Values;
-import org.apache.flink.stormcompatibility.util.AbstractStormSpout;
-
-public class GenderSpout extends AbstractStormSpout {
-	private int counter = 9;
-	private Fields outFields;
-
-	public GenderSpout(Fields outFields) {
-		this.outFields = outFields;
-	}
-
-	@Override
-	public void nextTuple() {
-		if (counter >= 0) {
-			this.collector.emit(new Values(counter, counter + 20));
-			counter--;
-		}
-	}
-
-	@Override
-	public void declareOutputFields(OutputFieldsDeclarer declarer) {
-		declarer.declare(outFields);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/stormoperators/SingleJoinBolt.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/stormoperators/SingleJoinBolt.java b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/stormoperators/SingleJoinBolt.java
deleted file mode 100644
index cd53140..0000000
--- a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/stormoperators/SingleJoinBolt.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.stormcompatibility.singlejoin.stormoperators;
-
-import backtype.storm.generated.GlobalStreamId;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichBolt;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.utils.TimeCacheMap;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-@SuppressWarnings("deprecation")
-public class SingleJoinBolt implements IRichBolt {
-	OutputCollector collector;
-	Fields idFields;
-	Fields outFields;
-	int numSources = 2;
-	TimeCacheMap<List<Object>, Map<GlobalStreamId, Tuple>> pending;
-	Map<String, GlobalStreamId> fieldLocations;
-
-	public SingleJoinBolt(Fields outFields) {
-		this.outFields = outFields;
-	}
-
-	@SuppressWarnings({"rawtypes", "null"})
-	@Override
-	public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
-		fieldLocations = new HashMap<String, GlobalStreamId>();
-		this.collector = collector;
-		int timeout = 100;
-		pending = new TimeCacheMap<List<Object>, Map<GlobalStreamId, Tuple>>(timeout, new ExpireCallback());
-		// numSources = context.getThisSources().size();
-		Set<String> idFields = null;
-		for (GlobalStreamId source : context.getThisSources().keySet()) {
-			Fields fields = context.getComponentOutputFields(source.get_componentId(), source.get_streamId());
-			Set<String> setFields = new HashSet<String>(fields.toList());
-			if (idFields == null) {
-				idFields = setFields;
-			} else {
-				idFields.retainAll(setFields);
-			}
-
-			for (String outfield : outFields) {
-				for (String sourcefield : fields) {
-					if (outfield.equals(sourcefield)) {
-						fieldLocations.put(outfield, source);
-					}
-				}
-			}
-		}
-		this.idFields = new Fields(new ArrayList<String>(idFields));
-
-		if (fieldLocations.size() != outFields.size()) {
-			throw new RuntimeException("Cannot find all outfields among sources");
-		}
-	}
-
-	@Override
-	public void execute(Tuple tuple) {
-		List<Object> id = tuple.select(idFields);
-		GlobalStreamId streamId = new GlobalStreamId(tuple.getSourceComponent(), tuple.getSourceStreamId());
-		if (!pending.containsKey(id)) {
-			pending.put(id, new HashMap<GlobalStreamId, Tuple>());
-		}
-		Map<GlobalStreamId, Tuple> parts = pending.get(id);
-		if (parts.containsKey(streamId)) {
-			throw new RuntimeException("Received same side of single join twice");
-		}
-		parts.put(streamId, tuple);
-		if (parts.size() == numSources) {
-			pending.remove(id);
-			List<Object> joinResult = new ArrayList<Object>();
-			for (String outField : outFields) {
-				GlobalStreamId loc = fieldLocations.get(outField);
-				joinResult.add(parts.get(loc).getValueByField(outField));
-			}
-			collector.emit(new ArrayList<Tuple>(parts.values()), joinResult);
-
-			for (Tuple part : parts.values()) {
-				collector.ack(part);
-			}
-		}
-	}
-
-	@Override
-	public void cleanup() {
-		/* nothing to do */
-	}
-
-	@Override
-	public void declareOutputFields(OutputFieldsDeclarer declarer) {
-		declarer.declare(outFields);
-	}
-
-	@Override
-	public Map<String, Object> getComponentConfiguration() {
-		return null;
-	}
-
-	private class ExpireCallback implements TimeCacheMap.ExpiredCallback<List<Object>, Map<GlobalStreamId, Tuple>> {
-		@Override
-		public void expire(List<Object> id, Map<GlobalStreamId, Tuple> tuples) {
-			for (Tuple tuple : tuples.values()) {
-				collector.fail(tuple);
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/util/AbstractStormBoltSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/util/AbstractStormBoltSink.java b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/util/AbstractStormBoltSink.java
deleted file mode 100644
index b121744..0000000
--- a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/util/AbstractStormBoltSink.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.util;
-
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichBolt;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Tuple;
-
-import java.util.Map;
-
-/**
- * Implements a sink that writes the received data to some external output. The result is formatted like
- * {@code (a1, a2, ..., an)} (with {@code Object.toString()} for each attribute).
- */
-public abstract class AbstractStormBoltSink implements IRichBolt {
-	private static final long serialVersionUID = -1626323806848080430L;
-
-	private StringBuilder lineBuilder;
-	private String prefix = "";
-	private final OutputFormatter formatter;
-
-	public AbstractStormBoltSink(final OutputFormatter formatter) {
-		this.formatter = formatter;
-	}
-
-	@SuppressWarnings("rawtypes")
-	@Override
-	public final void prepare(final Map stormConf, final TopologyContext context,
-			final OutputCollector collector) {
-		this.prepareSimple(stormConf, context);
-		if (context.getComponentCommon(context.getThisComponentId()).get_parallelism_hint() > 1) {
-			this.prefix = context.getThisTaskId() + "> ";
-		}
-	}
-
-	protected abstract void prepareSimple(final Map<?, ?> stormConf, final TopologyContext context);
-
-	@Override
-	public final void execute(final Tuple input) {
-		this.lineBuilder = new StringBuilder();
-		this.lineBuilder.append(this.prefix);
-		this.lineBuilder.append(this.formatter.format(input));
-		this.writeExternal(this.lineBuilder.toString());
-	}
-
-	protected abstract void writeExternal(final String line);
-
-	@Override
-	public void cleanup() {/* nothing to do */}
-
-	@Override
-	public final void declareOutputFields(final OutputFieldsDeclarer declarer) {/* nothing to do */}
-
-	@Override
-	public Map<String, Object> getComponentConfiguration() {
-		return null;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/util/AbstractStormSpout.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/util/AbstractStormSpout.java b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/util/AbstractStormSpout.java
deleted file mode 100644
index d4d185f..0000000
--- a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/util/AbstractStormSpout.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.util;
-
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichSpout;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-
-import java.util.Map;
-
-/**
- * Base class for Storm Spout that reads data line by line from an arbitrary source. The declared output schema has a
- * single attribute called {@code line} and should be of type {@link String}.
- */
-public abstract class AbstractStormSpout implements IRichSpout {
-	private static final long serialVersionUID = 8876828403487806771L;
-
-	public final static String ATTRIBUTE_LINE = "line";
-
-	protected SpoutOutputCollector collector;
-
-	@SuppressWarnings("rawtypes")
-	@Override
-	public void open(final Map conf, final TopologyContext context, final SpoutOutputCollector collector) {
-		this.collector = collector;
-	}
-
-	@Override
-	public void close() {/* nothing to do */}
-
-	@Override
-	public void activate() {/* nothing to do */}
-
-	@Override
-	public void deactivate() {/* nothing to do */}
-
-	@Override
-	public void ack(final Object msgId) {/* nothing to do */}
-
-	@Override
-	public void fail(final Object msgId) {/* nothing to do */}
-
-	@Override
-	public void declareOutputFields(final OutputFieldsDeclarer declarer) {
-		declarer.declare(new Fields(ATTRIBUTE_LINE));
-	}
-
-	@Override
-	public Map<String, Object> getComponentConfiguration() {
-		return null;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/util/OutputFormatter.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/util/OutputFormatter.java b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/util/OutputFormatter.java
deleted file mode 100644
index bfc3135..0000000
--- a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/util/OutputFormatter.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.util;
-
-import java.io.Serializable;
-
-import backtype.storm.tuple.Tuple;
-
-public interface OutputFormatter extends Serializable {
-
-	public String format(Tuple input);
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/util/SimpleOutputFormatter.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/util/SimpleOutputFormatter.java b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/util/SimpleOutputFormatter.java
deleted file mode 100644
index a9d72d9..0000000
--- a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/util/SimpleOutputFormatter.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.util;
-
-import backtype.storm.tuple.Tuple;
-
-public class SimpleOutputFormatter implements OutputFormatter {
-	private static final long serialVersionUID = 6349573860144270338L;
-
-	@Override
-	public String format(final Tuple input) {
-		return input.getValue(0).toString();
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormBoltFileSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormBoltFileSink.java b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormBoltFileSink.java
deleted file mode 100644
index a92bc6a..0000000
--- a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormBoltFileSink.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.util;
-
-import backtype.storm.task.TopologyContext;
-
-import java.io.BufferedWriter;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.util.Map;
-
-/**
- * Implements a sink that writes the received data to the given file (as a result of {@code Object.toString()} for each
- * attribute).
- */
-public final class StormBoltFileSink extends AbstractStormBoltSink {
-	private static final long serialVersionUID = 2014027288631273666L;
-
-	private final String path;
-	private BufferedWriter writer;
-
-	public StormBoltFileSink(final String path, final OutputFormatter formatter) {
-		super(formatter);
-		this.path = path;
-	}
-
-	@SuppressWarnings("rawtypes")
-	@Override
-	public void prepareSimple(final Map stormConf, final TopologyContext context) {
-		try {
-			this.writer = new BufferedWriter(new FileWriter(this.path));
-		} catch (final IOException e) {
-			throw new RuntimeException(e);
-		}
-	}
-
-	@Override
-	public void writeExternal(final String line) {
-		try {
-			this.writer.write(line + "\n");
-		} catch (final IOException e) {
-			throw new RuntimeException(e);
-		}
-	}
-
-	@Override
-	public void cleanup() {
-		if (this.writer != null) {
-			try {
-				this.writer.close();
-			} catch (final IOException e) {
-				throw new RuntimeException(e);
-			}
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormBoltPrintSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormBoltPrintSink.java b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormBoltPrintSink.java
deleted file mode 100644
index 3bf49d0..0000000
--- a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormBoltPrintSink.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.util;
-
-import backtype.storm.task.TopologyContext;
-
-import java.util.Map;
-
-/**
- * Implements a sink that prints the received data to {@code stdout}.
- */
-public final class StormBoltPrintSink extends AbstractStormBoltSink {
-	private static final long serialVersionUID = -6650011223001009519L;
-
-	public StormBoltPrintSink(OutputFormatter formatter) {
-		super(formatter);
-	}
-
-	@SuppressWarnings("rawtypes")
-	@Override
-	public void prepareSimple(final Map stormConf, final TopologyContext context) {
-		/* nothing to do */
-	}
-
-	@Override
-	public void writeExternal(final String line) {
-		System.out.println(line);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormFileSpout.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormFileSpout.java b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormFileSpout.java
deleted file mode 100644
index c38b599..0000000
--- a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormFileSpout.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.util;
-
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.tuple.Values;
-
-import java.io.BufferedReader;
-import java.io.FileNotFoundException;
-import java.io.FileReader;
-import java.io.IOException;
-import java.util.Map;
-
-/**
- * Implements a Storm Spout that reads data from a given local file.
- */
-public final class StormFileSpout extends AbstractStormSpout {
-	private static final long serialVersionUID = -6996907090003590436L;
-
-	private final String path;
-	private BufferedReader reader;
-
-	public StormFileSpout(final String path) {
-		this.path = path;
-	}
-
-	@SuppressWarnings("rawtypes")
-	@Override
-	public void open(final Map conf, final TopologyContext context, final SpoutOutputCollector collector) {
-		super.open(conf, context, collector);
-		try {
-			this.reader = new BufferedReader(new FileReader(this.path));
-		} catch (final FileNotFoundException e) {
-			throw new RuntimeException(e);
-		}
-	}
-
-	@Override
-	public void close() {
-		if (this.reader != null) {
-			try {
-				this.reader.close();
-			} catch (final IOException e) {
-				throw new RuntimeException(e);
-			}
-		}
-	}
-
-	@Override
-	public void nextTuple() {
-		String line;
-		try {
-			line = this.reader.readLine();
-			if (line != null) {
-				this.collector.emit(new Values(line));
-			}
-		} catch (final IOException e) {
-			throw new RuntimeException(e);
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormInMemorySpout.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormInMemorySpout.java b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormInMemorySpout.java
deleted file mode 100644
index 3e6081c..0000000
--- a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormInMemorySpout.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.util;
-
-import backtype.storm.tuple.Values;
-import org.apache.flink.examples.java.wordcount.util.WordCountData;
-
-/**
- * Implements a Storm Spout that reads data from {@link WordCountData#WORDS}.
- */
-public final class StormInMemorySpout extends AbstractStormSpout {
-	private static final long serialVersionUID = -4008858647468647019L;
-
-	private String[] source;
-	private int counter = 0;
-
-	public StormInMemorySpout(String[] source) {
-		this.source = source;
-	}
-
-	@Override
-	public void nextTuple() {
-		if (this.counter < WordCountData.WORDS.length) {
-			this.collector.emit(new Values(WordCountData.WORDS[this.counter++]));
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/util/TupleOutputFormatter.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/util/TupleOutputFormatter.java b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/util/TupleOutputFormatter.java
deleted file mode 100644
index 6419ee3..0000000
--- a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/util/TupleOutputFormatter.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.util;
-
-import backtype.storm.tuple.Tuple;
-
-public class TupleOutputFormatter implements OutputFormatter {
-	private static final long serialVersionUID = -599665757723851761L;
-
-	@Override
-	public String format(final Tuple input) {
-		final StringBuilder stringBuilder = new StringBuilder();
-		stringBuilder.append("(");
-		for (final Object attribute : input.getValues()) {
-			stringBuilder.append(attribute);
-			stringBuilder.append(",");
-		}
-		stringBuilder.replace(stringBuilder.length() - 1, stringBuilder.length(), ")");
-		return stringBuilder.toString();
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCount.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCount.java b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCount.java
deleted file mode 100644
index 606a3ce..0000000
--- a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCount.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.wordcount;
-
-import backtype.storm.topology.IRichBolt;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.examples.java.wordcount.util.WordCountData;
-import org.apache.flink.stormcompatibility.wordcount.stormoperators.StormBoltTokenizer;
-import org.apache.flink.stormcompatibility.wrappers.StormBoltWrapper;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
-/**
- * Implements the "WordCount" program that computes a simple word occurrence histogram over text files in a streaming
- * fashion. The tokenizer step is performed by a Storm {@link IRichBolt bolt}.
- * <p/>
- * <p/>
- * The input is a plain text file with lines separated by newline characters.
- * <p/>
- * <p/>
- * Usage: <code>WordCount &lt;text path&gt; &lt;result path&gt;</code><br>
- * If no parameters are provided, the program is run with default data from {@link WordCountData}.
- * <p/>
- * <p/>
- * This example shows how to:
- * <ul>
- * <li>use a Storm bolt within a Flink Streaming program.
- * </ul>
- */
-public class BoltTokenizerWordCount {
-
-	// *************************************************************************
-	// PROGRAM
-	// *************************************************************************
-
-	public static void main(final String[] args) throws Exception {
-
-		if (!parseParameters(args)) {
-			return;
-		}
-
-		// set up the execution environment
-		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		// get input data
-		final DataStream<String> text = getTextDataStream(env);
-
-		final DataStream<Tuple2<String, Integer>> counts = text
-				// split up the lines in pairs (2-tuples) containing: (word,1)
-				// this is done by a Storm bolt that is wrapped accordingly
-				.transform("StormBoltTokenizer",
-						TypeExtractor.getForObject(new Tuple2<String, Integer>("", 0)),
-						new StormBoltWrapper<String, Tuple2<String, Integer>>(new StormBoltTokenizer()))
-						// split up the lines in pairs (2-tuples) containing: (word,1)
-						// group by the tuple field "0" and sum up tuple field "1"
-				.groupBy(0).sum(1);
-
-		// emit result
-		if (fileOutput) {
-			counts.writeAsText(outputPath);
-		} else {
-			counts.print();
-		}
-
-		// execute program
-		env.execute("Streaming WordCount with Storm bolt tokenizer");
-	}
-
-	// *************************************************************************
-	// UTIL METHODS
-	// *************************************************************************
-
-	private static boolean fileOutput = false;
-	private static String textPath;
-	private static String outputPath;
-
-	private static boolean parseParameters(final String[] args) {
-
-		if (args.length > 0) {
-			// parse input arguments
-			fileOutput = true;
-			if (args.length == 2) {
-				textPath = args[0];
-				outputPath = args[1];
-			} else {
-				System.err.println("Usage: BoltTokenizerWordCount <text path> <result path>");
-				return false;
-			}
-		} else {
-			System.out.println("Executing BoltTokenizerWordCount example with built-in default data");
-			System.out.println("  Provide parameters to read input data from a file");
-			System.out.println("  Usage: BoltTokenizerWordCount <text path> <result path>");
-		}
-		return true;
-	}
-
-	private static DataStream<String> getTextDataStream(final StreamExecutionEnvironment env) {
-		if (fileOutput) {
-			// read the text file from given input path
-			return env.readTextFile(textPath);
-		}
-
-		return env.fromElements(WordCountData.WORDS);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCount.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCount.java b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCount.java
deleted file mode 100644
index 0ae51c6..0000000
--- a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCount.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.wordcount;
-
-import backtype.storm.topology.IRichSpout;
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.examples.java.wordcount.util.WordCountData;
-import org.apache.flink.stormcompatibility.util.StormFileSpout;
-import org.apache.flink.stormcompatibility.util.StormInMemorySpout;
-import org.apache.flink.stormcompatibility.wrappers.StormFiniteSpoutWrapper;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.util.Collector;
-
-/**
- * Implements the "WordCount" program that computes a simple word occurrence histogram over text files in a streaming
- * fashion. The used data source is a Storm {@link IRichSpout spout}.
- * <p/>
- * <p/>
- * The input is a plain text file with lines separated by newline characters.
- * <p/>
- * <p/>
- * Usage: <code>WordCount &lt;text path&gt; &lt;result path&gt;</code><br>
- * If no parameters are provided, the program is run with default data from {@link WordCountData}.
- * <p/>
- * <p/>
- * This example shows how to:
- * <ul>
- * <li>use a Storm spout within a Flink Streaming program.
- * </ul>
- */
-public class SpoutSourceWordCount {
-
-	// *************************************************************************
-	// PROGRAM
-	// *************************************************************************
-
-	public static void main(final String[] args) throws Exception {
-
-		if (!parseParameters(args)) {
-			return;
-		}
-
-		// set up the execution environment
-		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		// get input data
-		final DataStream<String> text = getTextDataStream(env);
-
-		final DataStream<Tuple2<String, Integer>> counts =
-				// split up the lines in pairs (2-tuples) containing: (word,1)
-				text.flatMap(new Tokenizer())
-						// group by the tuple field "0" and sum up tuple field "1"
-						.groupBy(0).sum(1);
-
-		// emit result
-		if (fileOutput) {
-			counts.writeAsText(outputPath);
-		} else {
-			counts.print();
-		}
-
-		// execute program
-		env.execute("Streaming WordCount with Storm spout source");
-	}
-
-	// *************************************************************************
-	// USER FUNCTIONS
-	// *************************************************************************
-
-	/**
-	 * Implements the string tokenizer that splits sentences into words as a user-defined FlatMapFunction. The function
-	 * takes a line (String) and splits it into multiple pairs in the form of "(word,1)" (Tuple2<String, Integer>).
-	 */
-	public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void flatMap(final String value, final Collector<Tuple2<String, Integer>> out) throws Exception {
-			// normalize and split the line
-			final String[] tokens = value.toLowerCase().split("\\W+");
-
-			// emit the pairs
-			for (final String token : tokens) {
-				if (token.length() > 0) {
-					out.collect(new Tuple2<String, Integer>(token, 1));
-				}
-			}
-		}
-	}
-
-	// *************************************************************************
-	// UTIL METHODS
-	// *************************************************************************
-
-	private static boolean fileOutput = false;
-	private static String textPath;
-	private static String outputPath;
-
-	private static boolean parseParameters(final String[] args) {
-
-		if (args.length > 0) {
-			// parse input arguments
-			fileOutput = true;
-			if (args.length == 2) {
-				textPath = args[0];
-				outputPath = args[1];
-			} else {
-				System.err.println("Usage: SpoutSourceWordCount <text path> <result path>");
-				return false;
-			}
-		} else {
-			System.out.println("Executing SpoutSourceWordCount example with built-in default data");
-			System.out.println("  Provide parameters to read input data from a file");
-			System.out.println("  Usage: SpoutSourceWordCount <text path> <result path>");
-		}
-		return true;
-	}
-
-	private static DataStream<String> getTextDataStream(final StreamExecutionEnvironment env) {
-		if (fileOutput) {
-			// read the text file from given input path
-			final String[] tokens = textPath.split(":");
-			final String localFile = tokens[tokens.length - 1];
-			return env.addSource(
-					new StormFiniteSpoutWrapper<String>(new StormFileSpout(localFile), true),
-					TypeExtractor.getForClass(String.class)).setParallelism(1);
-		}
-
-		return env.addSource(new StormFiniteSpoutWrapper<String>(new StormInMemorySpout(WordCountData.WORDS), true),
-				TypeExtractor.getForClass(String.class));
-
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountLocal.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountLocal.java b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountLocal.java
deleted file mode 100644
index 7b4f471..0000000
--- a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountLocal.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.wordcount;
-
-import backtype.storm.LocalCluster;
-import backtype.storm.generated.StormTopology;
-import backtype.storm.utils.Utils;
-import org.apache.flink.examples.java.wordcount.util.WordCountData;
-import org.apache.flink.stormcompatibility.api.FlinkLocalCluster;
-import org.apache.flink.stormcompatibility.api.FlinkTopologyBuilder;
-
-/**
- * Implements the "WordCount" program that computes a simple word occurrence histogram over text files in a streaming
- * fashion. The program is constructed as a regular {@link StormTopology} and submitted to Flink for execution in the
- * same way as to a Storm {@link LocalCluster}.
- * <p/>
- * This example shows how to run the program directly within Java, thus it cannot be used to submit a {@link StormTopology}
- * via Flink command line clients (ie, bin/flink).
- * <p/>
- * <p/>
- * The input is a plain text file with lines separated by newline characters.
- * <p/>
- * <p/>
- * Usage: <code>WordCount &lt;text path&gt; &lt;result path&gt;</code><br>
- * If no parameters are provided, the program is run with default data from {@link WordCountData}.
- * <p/>
- * <p/>
- * This example shows how to:
- * <ul>
- * <li>run a regular Storm program locally on Flink
- * </ul>
- */
-public class StormWordCountLocal {
-	public final static String topologyId = "Streaming WordCount";
-
-	// *************************************************************************
-	// PROGRAM
-	// *************************************************************************
-
-	public static void main(final String[] args) throws Exception {
-
-		if (!WordCountTopology.parseParameters(args)) {
-			return;
-		}
-
-		// build Topology the Storm way
-		final FlinkTopologyBuilder builder = WordCountTopology.buildTopology();
-
-		// execute program locally
-		final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
-		cluster.submitTopology(topologyId, null, builder.createTopology());
-
-		Utils.sleep(5 * 1000);
-
-		// TODO kill does not do anything so far
-		cluster.killTopology(topologyId);
-		cluster.shutdown();
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountRemoteByClient.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountRemoteByClient.java b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountRemoteByClient.java
deleted file mode 100644
index 9e56c14..0000000
--- a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountRemoteByClient.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.wordcount;
-
-import backtype.storm.Config;
-import backtype.storm.generated.AlreadyAliveException;
-import backtype.storm.generated.InvalidTopologyException;
-import backtype.storm.generated.NotAliveException;
-import backtype.storm.generated.StormTopology;
-import backtype.storm.utils.NimbusClient;
-import backtype.storm.utils.Utils;
-import org.apache.flink.examples.java.wordcount.util.WordCountData;
-import org.apache.flink.stormcompatibility.api.FlinkClient;
-import org.apache.flink.stormcompatibility.api.FlinkTopologyBuilder;
-
-/**
- * Implements the "WordCount" program that computes a simple word occurrence histogram over text files in a streaming
- * fashion. The program is constructed as a regular {@link StormTopology} and submitted to Flink for execution in the
- * same way as to a Storm cluster similar to {@link NimbusClient}. The Flink cluster can be local or remote.
- * <p/>
- * This example shows how to submit the program via Java, thus it cannot be used to submit a {@link StormTopology} via
- * Flink command line clients (ie, bin/flink).
- * <p/>
- * <p/>
- * The input is a plain text file with lines separated by newline characters.
- * <p/>
- * <p/>
- * Usage: <code>WordCount &lt;text path&gt; &lt;result path&gt;</code><br>
- * If no parameters are provided, the program is run with default data from {@link WordCountData}.
- * <p/>
- * <p/>
- * This example shows how to:
- * <ul>
- * <li>submit a regular Storm program to a local or remote Flink cluster.
- * </ul>
- */
-public class StormWordCountRemoteByClient {
-	public final static String topologyId = "Streaming WordCount";
-	private final static String uploadedJarLocation = "target/flink-storm-examples-0.9-SNAPSHOT-WordCountStorm.jar";
-
-	// *************************************************************************
-	// PROGRAM
-	// *************************************************************************
-
-	public static void main(final String[] args) throws AlreadyAliveException, InvalidTopologyException,
-			NotAliveException {
-
-		if (!WordCountTopology.parseParameters(args)) {
-			return;
-		}
-
-		// build Topology the Storm way
-		final FlinkTopologyBuilder builder = WordCountTopology.buildTopology();
-
-		// execute program on Flink cluster
-		final Config conf = new Config();
-		// can be changed to remote address
-		conf.put(Config.NIMBUS_HOST, "localhost");
-		// use default flink jobmanager.rpc.port
-		conf.put(Config.NIMBUS_THRIFT_PORT, 6123);
-
-		final FlinkClient cluster = FlinkClient.getConfiguredClient(conf);
-		cluster.submitTopology(topologyId, uploadedJarLocation, builder.createTopology());
-
-		Utils.sleep(5 * 1000);
-
-		cluster.killTopology(topologyId);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountRemoteBySubmitter.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountRemoteBySubmitter.java b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountRemoteBySubmitter.java
deleted file mode 100644
index a1fb79d..0000000
--- a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountRemoteBySubmitter.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.wordcount;
-
-import backtype.storm.Config;
-import backtype.storm.StormSubmitter;
-import backtype.storm.generated.StormTopology;
-import org.apache.flink.examples.java.wordcount.util.WordCountData;
-import org.apache.flink.stormcompatibility.api.FlinkClient;
-import org.apache.flink.stormcompatibility.api.FlinkSubmitter;
-import org.apache.flink.stormcompatibility.api.FlinkTopologyBuilder;
-
-/**
- * Implements the "WordCount" program that computes a simple word occurrence histogram over text files in a streaming
- * fashion. The program is constructed as a regular {@link StormTopology} and submitted to Flink for execution in the
- * same way as to a Storm cluster similar to {@link StormSubmitter}. The Flink cluster can be local or remote.
- * <p/>
- * This example shows how to submit the program via Java as well as Flink's command line client (i.e., bin/flink).
- * <p/>
- * <p/>
- * The input is a plain text file with lines separated by newline characters.
- * <p/>
- * <p/>
- * Usage: <code>WordCount &lt;text path&gt; &lt;result path&gt;</code><br>
- * If no parameters are provided, the program is run with default data from {@link WordCountData}.
- * <p/>
- * <p/>
- * This example shows how to:
- * <ul>
- * <li>submit a regular Storm program to a local or remote Flink cluster.
- * </ul>
- */
-public class StormWordCountRemoteBySubmitter {
-	public final static String topologyId = "Streaming WordCount";
-
-	// *************************************************************************
-	// PROGRAM
-	// *************************************************************************
-
-	public static void main(final String[] args) throws Exception {
-
-		if (!WordCountTopology.parseParameters(args)) {
-			return;
-		}
-
-		// build Topology the Storm way
-		final FlinkTopologyBuilder builder = WordCountTopology.buildTopology();
-
-		// execute program on Flink cluster
-		final Config conf = new Config();
-		// We can set JobManager host/port values manually or leave them unset.
-		// If they are not set and the program is
-		// - executed within Java, default values "localhost" and "6123" are set by FlinkSubmitter
-		// - executed via bin/flink, values from flink-conf.yaml are set by FlinkSubmitter.
-		// conf.put(Config.NIMBUS_HOST, "localhost");
-		// conf.put(Config.NIMBUS_THRIFT_PORT, new Integer(6123));
-
-		// The user jar file must be specified via JVM argument if executed via Java.
-		// => -Dstorm.jar=target/flink-storm-examples-0.9-SNAPSHOT-WordCountStorm.jar
-		// If bin/flink is used, the jar file is detected automatically.
-		FlinkSubmitter.submitTopology(topologyId, conf, builder.createTopology());
-
-		Thread.sleep(5 * 1000);
-
-		FlinkClient.getConfiguredClient(conf).killTopology(topologyId);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/WordCountTopology.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/WordCountTopology.java b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/WordCountTopology.java
deleted file mode 100644
index d39a526..0000000
--- a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/WordCountTopology.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.wordcount;
-
-import backtype.storm.generated.StormTopology;
-import backtype.storm.tuple.Fields;
-import org.apache.flink.examples.java.wordcount.util.WordCountData;
-import org.apache.flink.stormcompatibility.api.FlinkTopologyBuilder;
-import org.apache.flink.stormcompatibility.util.OutputFormatter;
-import org.apache.flink.stormcompatibility.util.StormBoltFileSink;
-import org.apache.flink.stormcompatibility.util.StormBoltPrintSink;
-import org.apache.flink.stormcompatibility.util.StormFileSpout;
-import org.apache.flink.stormcompatibility.util.StormInMemorySpout;
-import org.apache.flink.stormcompatibility.util.TupleOutputFormatter;
-import org.apache.flink.stormcompatibility.wordcount.stormoperators.StormBoltCounter;
-import org.apache.flink.stormcompatibility.wordcount.stormoperators.StormBoltTokenizer;
-
-/**
- * Implements the "WordCount" program that computes a simple word occurrence histogram over text files in a streaming
- * fashion. The program is constructed as a regular {@link StormTopology}.
- * <p/>
- * <p/>
- * The input is a plain text file with lines separated by newline characters.
- * <p/>
- * <p/>
- * Usage: <code>WordCount &lt;text path&gt; &lt;result path&gt;</code><br>
- * If no parameters are provided, the program is run with default data from {@link WordCountData}.
- * <p/>
- * <p/>
- * This example shows how to:
- * <ul>
- * <li>construct a regular Storm topology as a Flink program
- * </ul>
- */
-public class WordCountTopology {
-	public final static String spoutId = "source";
-	public final static String tokenierzerId = "tokenizer";
-	public final static String counterId = "counter";
-	public final static String sinkId = "sink";
-	private final static OutputFormatter formatter = new TupleOutputFormatter();
-
-	public static FlinkTopologyBuilder buildTopology() {
-
-		final FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
-
-		// get input data
-		if (fileInputOutput) {
-			// read the text file from given input path
-			final String[] tokens = textPath.split(":");
-			final String inputFile = tokens[tokens.length - 1];
-			builder.setSpout(spoutId, new StormFileSpout(inputFile));
-		} else {
-			builder.setSpout(spoutId, new StormInMemorySpout(WordCountData.WORDS));
-		}
-
-		// split up the lines in pairs (2-tuples) containing: (word,1)
-		builder.setBolt(tokenierzerId, new StormBoltTokenizer(), 4).shuffleGrouping(spoutId);
-		// group by the tuple field "0" and sum up tuple field "1"
-		builder.setBolt(counterId, new StormBoltCounter(), 4).fieldsGrouping(tokenierzerId,
-				new Fields(StormBoltTokenizer.ATTRIBUTE_WORD));
-
-		// emit result
-		if (fileInputOutput) {
-			// write the result to the given output path
-			final String[] tokens = outputPath.split(":");
-			final String outputFile = tokens[tokens.length - 1];
-			builder.setBolt(sinkId, new StormBoltFileSink(outputFile, formatter)).shuffleGrouping(counterId);
-		} else {
-			builder.setBolt(sinkId, new StormBoltPrintSink(formatter), 4).shuffleGrouping(counterId);
-		}
-
-		return builder;
-	}
-
-	// *************************************************************************
-	// UTIL METHODS
-	// *************************************************************************
-
-	private static boolean fileInputOutput = false;
-	private static String textPath;
-	private static String outputPath;
-
-	static boolean parseParameters(final String[] args) {
-
-		if (args.length > 0) {
-			// parse input arguments
-			fileInputOutput = true;
-			if (args.length == 2) {
-				textPath = args[0];
-				outputPath = args[1];
-			} else {
-				System.err.println("Usage: StormWordCount* <text path> <result path>");
-				return false;
-			}
-		} else {
-			System.out.println("Executing StormWordCount* example with built-in default data");
-			System.out.println("  Provide parameters to read input data from a file");
-			System.out.println("  Usage: StormWordCount* <text path> <result path>");
-		}
-
-		return true;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormBoltCounter.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormBoltCounter.java b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormBoltCounter.java
deleted file mode 100644
index 214ca5e..0000000
--- a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormBoltCounter.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.wordcount.stormoperators;
-
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichBolt;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Implements the word counter that counts the occurrence of each unique word. The bolt takes a pair (input tuple schema:
- * {@code <String,Integer>}) and sums the given word count for each unique word (output tuple schema:
- * {@code <String,Integer>} ).
- */
-public class StormBoltCounter implements IRichBolt {
-	private static final long serialVersionUID = 399619605462625934L;
-
-	public static final String ATTRIBUTE_WORD = "word";
-	public static final String ATTRIBUTE_COUNT = "count";
-
-	private final HashMap<String, Count> counts = new HashMap<String, Count>();
-	private OutputCollector collector;
-
-	@SuppressWarnings("rawtypes")
-	@Override
-	public void prepare(final Map stormConf, final TopologyContext context, final OutputCollector collector) {
-		this.collector = collector;
-	}
-
-	@Override
-	public void execute(final Tuple input) {
-		final String word = input.getString(StormBoltTokenizer.ATTRIBUTE_WORD_INDEX);
-
-		Count currentCount = this.counts.get(word);
-		if (currentCount == null) {
-			currentCount = new Count();
-			this.counts.put(word, currentCount);
-		}
-		currentCount.count += input.getInteger(StormBoltTokenizer.ATTRIBUTE_COUNT_INDEX);
-
-		this.collector.emit(new Values(word, currentCount.count));
-	}
-
-	@Override
-	public void cleanup() {/* nothing to do */}
-
-	@Override
-	public void declareOutputFields(final OutputFieldsDeclarer declarer) {
-		declarer.declare(new Fields(ATTRIBUTE_WORD, ATTRIBUTE_COUNT));
-	}
-
-	@Override
-	public Map<String, Object> getComponentConfiguration() {
-		return null;
-	}
-
-	/**
-	 * A counter helper to emit immutable tuples to the given stormCollector and avoid unnecessary object
-	 * creation/deletion.
-	 *
-	 * @author mjsax
-	 */
-	private static final class Count {
-		public int count;
-
-		public Count() {/* nothing to do */}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormBoltTokenizer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormBoltTokenizer.java b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormBoltTokenizer.java
deleted file mode 100644
index 96bd87c..0000000
--- a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormBoltTokenizer.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.wordcount.stormoperators;
-
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichBolt;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
-
-import java.util.Map;
-
-/**
- * Implements the string tokenizer that splits sentences into words as a Storm bolt. The bolt takes a line (input tuple
- * schema: {@code <String>}) and splits it into multiple pairs in the form of "(word,1)" (output tuple schema:
- * {@code <String,Integer>}).
- */
-public final class StormBoltTokenizer implements IRichBolt {
-	private static final long serialVersionUID = -8589620297208175149L;
-
-	public static final String ATTRIBUTE_WORD = "word";
-	public static final String ATTRIBUTE_COUNT = "count";
-
-	public static final int ATTRIBUTE_WORD_INDEX = 0;
-	public static final int ATTRIBUTE_COUNT_INDEX = 1;
-
-	private OutputCollector collector;
-
-	@SuppressWarnings("rawtypes")
-	@Override
-	public void prepare(final Map stormConf, final TopologyContext context, final OutputCollector collector) {
-		this.collector = collector;
-	}
-
-	@Override
-	public void execute(final Tuple input) {
-		final String[] tokens = input.getString(0).toLowerCase().split("\\W+");
-
-		for (final String token : tokens) {
-			if (token.length() > 0) {
-				this.collector.emit(new Values(token, 1));
-			}
-		}
-	}
-
-	@Override
-	public void cleanup() {/* nothing to do */}
-
-	@Override
-	public void declareOutputFields(final OutputFieldsDeclarer declarer) {
-		declarer.declare(new Fields(ATTRIBUTE_WORD, ATTRIBUTE_COUNT));
-	}
-
-	@Override
-	public Map<String, Object> getComponentConfiguration() {
-		return null;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-staging/flink-streaming/flink-storm-examples/src/test/java/org/apache/flink/stormcompatibility/api/FlinkTestCluster.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-examples/src/test/java/org/apache/flink/stormcompatibility/api/FlinkTestCluster.java b/flink-staging/flink-streaming/flink-storm-examples/src/test/java/org/apache/flink/stormcompatibility/api/FlinkTestCluster.java
deleted file mode 100644
index 68f1216..0000000
--- a/flink-staging/flink-streaming/flink-storm-examples/src/test/java/org/apache/flink/stormcompatibility/api/FlinkTestCluster.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.api;
-
-import backtype.storm.LocalCluster;
-import backtype.storm.generated.ClusterSummary;
-import backtype.storm.generated.KillOptions;
-import backtype.storm.generated.RebalanceOptions;
-import backtype.storm.generated.StormTopology;
-import backtype.storm.generated.SubmitOptions;
-import backtype.storm.generated.TopologyInfo;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.util.TestStreamEnvironment;
-
-import java.util.Map;
-
-/**
- * {@link FlinkTestCluster} mimics a Storm {@link LocalCluster} for ITCases via a {@link TestStreamEnvironment}.
- */
-public class FlinkTestCluster extends FlinkLocalCluster {
-
-	@Override
-	public void submitTopology(final String topologyName, final Map<?, ?> conf, final FlinkTopology topology)
-			throws Exception {
-		this.submitTopologyWithOpts(topologyName, conf, topology, null);
-	}
-
-	@Override
-	public void submitTopologyWithOpts(final String topologyName, final Map<?, ?> conf, final FlinkTopology topology,
-			final SubmitOptions submitOpts)
-			throws Exception {
-		final TestStreamEnvironment env = (TestStreamEnvironment) StreamExecutionEnvironment.getExecutionEnvironment();
-		env.start(topology.getStreamGraph().getJobGraph(topologyName));
-	}
-
-	@Override
-	public void killTopology(final String topologyName) {
-	}
-
-	@Override
-	public void killTopologyWithOpts(final String name, final KillOptions options) {
-	}
-
-	@Override
-	public void activate(final String topologyName) {
-	}
-
-	@Override
-	public void deactivate(final String topologyName) {
-	}
-
-	@Override
-	public void rebalance(final String name, final RebalanceOptions options) {
-	}
-
-	@Override
-	public void shutdown() {
-		final TestStreamEnvironment env = (TestStreamEnvironment) StreamExecutionEnvironment.getExecutionEnvironment();
-		try {
-			env.shutdown();
-		} catch (final InterruptedException e) {
-			e.printStackTrace();
-		}
-	}
-
-	@Override
-	public String getTopologyConf(final String id) {
-		return null;
-	}
-
-	@Override
-	public StormTopology getTopology(final String id) {
-		return null;
-	}
-
-	@Override
-	public ClusterSummary getClusterInfo() {
-		return null;
-	}
-
-	@Override
-	public TopologyInfo getTopologyInfo(final String id) {
-		return null;
-	}
-
-	@Override
-	public Map<?, ?> getState() {
-		return null;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-staging/flink-streaming/flink-storm-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/StormBoltExclamationITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/StormBoltExclamationITCase.java b/flink-staging/flink-streaming/flink-storm-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/StormBoltExclamationITCase.java
deleted file mode 100644
index 75dd5fc..0000000
--- a/flink-staging/flink-streaming/flink-storm-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/StormBoltExclamationITCase.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.exclamation;
-
-import org.apache.flink.stormcompatibility.excamation.StormBoltExclamation;
-import org.apache.flink.stormcompatibility.exclamation.util.ExclamationData;
-import org.apache.flink.streaming.util.StreamingProgramTestBase;
-import org.apache.flink.test.testdata.WordCountData;
-
-public class StormBoltExclamationITCase extends StreamingProgramTestBase {
-
-	protected String textPath;
-	protected String resultPath;
-
-	@Override
-	protected void preSubmit() throws Exception {
-		this.textPath = this.createTempFile("text.txt", WordCountData.TEXT);
-		this.resultPath = this.getTempDirPath("result");
-	}
-
-	@Override
-	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(ExclamationData.TEXT_WITH_EXCLAMATIONS, this.resultPath);
-	}
-
-	@Override
-	protected void testProgram() throws Exception {
-		StormBoltExclamation.main(new String[]{this.textPath, this.resultPath});
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-staging/flink-streaming/flink-storm-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/StormExclamationLocalITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/StormExclamationLocalITCase.java b/flink-staging/flink-streaming/flink-storm-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/StormExclamationLocalITCase.java
deleted file mode 100644
index d6bcf30..0000000
--- a/flink-staging/flink-streaming/flink-storm-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/StormExclamationLocalITCase.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.exclamation;
-
-import org.apache.flink.stormcompatibility.api.FlinkLocalCluster;
-import org.apache.flink.stormcompatibility.api.FlinkTestCluster;
-import org.apache.flink.stormcompatibility.excamation.StormExclamationLocal;
-import org.apache.flink.stormcompatibility.exclamation.util.ExclamationData;
-import org.apache.flink.streaming.util.StreamingProgramTestBase;
-import org.apache.flink.test.testdata.WordCountData;
-
-public class StormExclamationLocalITCase extends StreamingProgramTestBase {
-
-	protected String textPath;
-	protected String resultPath;
-
-	@Override
-	protected void preSubmit() throws Exception {
-		FlinkLocalCluster.initialize(new FlinkTestCluster());
-		this.textPath = this.createTempFile("text.txt", WordCountData.TEXT);
-		this.resultPath = this.getTempDirPath("result");
-	}
-
-	@Override
-	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(ExclamationData.TEXT_WITH_EXCLAMATIONS, this.resultPath);
-	}
-
-	@Override
-	protected void testProgram() throws Exception {
-		StormExclamationLocal.main(new String[]{this.textPath, this.resultPath});
-	}
-}


Mime
View raw message