flink-commits mailing list archives

From mbala...@apache.org
Subject [26/27] flink git commit: [storm-compat] Moved Storm-compatibility to flink-contrib and split flink-contrib into small sub-projects
Date Mon, 15 Jun 2015 09:33:16 GMT
[storm-compat] Moved Storm-compatibility to flink-contrib and split flink-contrib into small sub-projects

Closes #573


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/12b13f9c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/12b13f9c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/12b13f9c

Branch: refs/heads/master
Commit: 12b13f9c066e0cee2b1ee29e9c64b064f50fa033
Parents: e39699f
Author: mbalassi <mbalassi@apache.org>
Authored: Wed Jun 10 01:40:03 2015 +0200
Committer: mbalassi <mbalassi@apache.org>
Committed: Sun Jun 14 23:02:53 2015 +0200

----------------------------------------------------------------------
 .../flink-storm-compatibility-core/README.md    |  15 +
 .../flink-storm-compatibility-core/pom.xml      | 105 +++
 .../stormcompatibility/api/FlinkClient.java     | 301 ++++++++
 .../api/FlinkLocalCluster.java                  | 115 +++
 .../api/FlinkOutputFieldsDeclarer.java          | 166 +++++
 .../stormcompatibility/api/FlinkSubmitter.java  | 206 +++++
 .../stormcompatibility/api/FlinkTopology.java   |  97 +++
 .../api/FlinkTopologyBuilder.java               | 303 ++++++++
 .../api/FlinkTopologyContext.java               | 161 ++++
 .../wrappers/AbstractStormCollector.java        | 111 +++
 .../wrappers/AbstractStormSpoutWrapper.java     | 123 +++
 .../wrappers/StormBoltCollector.java            |  92 +++
 .../wrappers/StormBoltWrapper.java              | 117 +++
 .../wrappers/StormFiniteSpoutWrapper.java       | 134 ++++
 .../wrappers/StormOutputFieldsDeclarer.java     |  74 ++
 .../wrappers/StormSpoutCollector.java           |  80 ++
 .../wrappers/StormSpoutWrapper.java             |  78 ++
 .../stormcompatibility/wrappers/StormTuple.java | 253 +++++++
 .../wrappers/StormWrapperSetupHelper.java       | 112 +++
 .../api/FlinkOutputFieldsDeclarerTest.java      | 172 +++++
 .../api/FlinkTopologyContextTest.java           |  74 ++
 .../api/FlinkTopologyTest.java                  |  62 ++
 .../stormcompatibility/util/AbstractTest.java   |  39 +
 .../wrappers/FiniteTestSpout.java               |  78 ++
 .../wrappers/StormBoltCollectorTest.java        | 101 +++
 .../wrappers/StormBoltWrapperTest.java          | 189 +++++
 .../wrappers/StormFiniteSpoutWrapperTest.java   | 110 +++
 .../wrappers/StormOutputFieldsDeclarerTest.java |  80 ++
 .../wrappers/StormSpoutCollectorTest.java       |  88 +++
 .../wrappers/StormSpoutWrapperTest.java         |  64 ++
 .../wrappers/StormTupleTest.java                | 349 +++++++++
 .../wrappers/StormWrapperSetupHelperTest.java   | 136 ++++
 .../wrappers/TestContext.java                   |  40 +
 .../README.md                                   |  15 +
 .../flink-storm-compatibility-examples/pom.xml  | 283 +++++++
 .../src/assembly/word-count-storm.xml           |  72 ++
 .../excamation/ExclamationTopology.java         |  99 +++
 .../excamation/StormBoltExclamation.java        | 113 +++
 .../excamation/StormExclamationLocal.java       |  52 ++
 .../StormExclamationRemoteByClient.java         |  61 ++
 .../StormExclamationRemoteBySubmitter.java      |  60 ++
 .../excamation/StormSpoutExclamation.java       | 118 +++
 .../stormoperators/ExclamationBolt.java         |  58 ++
 .../singlejoin/SingleJoinTopology.java          |  90 +++
 .../singlejoin/StormSingleJoinLocal.java        |  50 ++
 .../singlejoin/stormoperators/AgeSpout.java     |  54 ++
 .../singlejoin/stormoperators/GenderSpout.java  |  45 ++
 .../stormoperators/SingleJoinBolt.java          | 132 ++++
 .../util/AbstractStormBoltSink.java             |  76 ++
 .../util/AbstractStormSpout.java                |  70 ++
 .../util/OutputFormatter.java                   |  29 +
 .../util/SimpleOutputFormatter.java             |  31 +
 .../util/StormBoltFileSink.java                 |  72 ++
 .../util/StormBoltPrintSink.java                |  45 ++
 .../stormcompatibility/util/StormFileSpout.java |  78 ++
 .../util/StormInMemorySpout.java                |  43 ++
 .../util/TupleOutputFormatter.java              |  38 +
 .../wordcount/BoltTokenizerWordCount.java       | 122 +++
 .../wordcount/SpoutSourceWordCount.java         | 152 ++++
 .../wordcount/StormWordCountLocal.java          |  75 ++
 .../wordcount/StormWordCountRemoteByClient.java |  85 +++
 .../StormWordCountRemoteBySubmitter.java        |  83 +++
 .../wordcount/WordCountTopology.java            | 119 +++
 .../stormoperators/StormBoltCounter.java        |  90 +++
 .../stormoperators/StormBoltTokenizer.java      |  76 ++
 .../api/FlinkTestCluster.java                   | 107 +++
 .../exclamation/StormBoltExclamationITCase.java |  47 ++
 .../StormExclamationLocalITCase.java            |  49 ++
 .../StormSpoutExclamationITCase.java            |  47 ++
 .../exclamation/util/ExclamationData.java       |  98 +++
 .../wordcount/BoltTokenizerWordCountITCase.java |  45 ++
 .../wordcount/SpoutSourceWordCountITCase.java   |  45 ++
 .../wordcount/StormWordCountLocalITCase.java    |  48 ++
 flink-contrib/flink-storm-compatibility/pom.xml |  40 +
 flink-contrib/flink-streaming-contrib/pom.xml   |  58 ++
 .../main/java/org/apache/flink/contrib/.hidden  |   0
 .../flink/contrib/streaming/CollectSink.java    | 152 ++++
 .../contrib/streaming/DataStreamIterator.java   | 134 ++++
 .../contrib/streaming/DataStreamUtils.java      |  86 +++
 .../test/java/org/apache/flink/contrib/.hidden  |   0
 .../flink/contrib/streaming/CollectITCase.java  |  60 ++
 flink-contrib/flink-tweet-inputformat/pom.xml   |  64 ++
 .../main/java/org/apache/flink/contrib/.hidden  |   0
 .../io/SimpleTweetInputFormat.java              |  93 +++
 .../tweetinputformat/io/TweetHandler.java       | 747 +++++++++++++++++++
 .../tweetinputformat/model/User/Users.java      | 479 ++++++++++++
 .../model/places/Attributes.java                | 113 +++
 .../model/places/BoundingBox.java               |  60 ++
 .../tweetinputformat/model/places/Places.java   | 130 ++++
 .../model/tweet/Contributors.java               |  78 ++
 .../model/tweet/Coordinates.java                |  56 ++
 .../model/tweet/CurrentUserRetweet.java         |  55 ++
 .../tweetinputformat/model/tweet/Tweet.java     | 346 +++++++++
 .../model/tweet/entities/Entities.java          |  90 +++
 .../model/tweet/entities/HashTags.java          |  58 ++
 .../model/tweet/entities/Media.java             | 143 ++++
 .../model/tweet/entities/Size.java              |  64 ++
 .../model/tweet/entities/Symbol.java            |  52 ++
 .../model/tweet/entities/URL.java               |  70 ++
 .../model/tweet/entities/UserMention.java       |  80 ++
 .../src/main/resources/.hidden                  |   0
 .../src/main/resources/HashTagTweetSample.json  |   4 +
 .../test/java/org/apache/flink/contrib/.hidden  |   0
 .../SimpleTweetInputFormatTest.java             |  98 +++
 flink-contrib/pom.xml                           |  40 +-
 .../main/java/org/apache/flink/contrib/.hidden  |   0
 .../flink/contrib/streaming/CollectSink.java    | 152 ----
 .../contrib/streaming/DataStreamIterator.java   | 134 ----
 .../contrib/streaming/DataStreamUtils.java      |  86 ---
 .../io/SimpleTweetInputFormat.java              |  93 ---
 .../tweetinputformat/io/TweetHandler.java       | 747 -------------------
 .../tweetinputformat/model/User/Users.java      | 479 ------------
 .../model/places/Attributes.java                | 113 ---
 .../model/places/BoundingBox.java               |  60 --
 .../tweetinputformat/model/places/Places.java   | 130 ----
 .../model/tweet/Contributors.java               |  78 --
 .../model/tweet/Coordinates.java                |  56 --
 .../model/tweet/CurrentUserRetweet.java         |  55 --
 .../tweetinputformat/model/tweet/Tweet.java     | 346 ---------
 .../model/tweet/entities/Entities.java          |  90 ---
 .../model/tweet/entities/HashTags.java          |  58 --
 .../model/tweet/entities/Media.java             | 143 ----
 .../model/tweet/entities/Size.java              |  64 --
 .../model/tweet/entities/Symbol.java            |  52 --
 .../model/tweet/entities/URL.java               |  70 --
 .../model/tweet/entities/UserMention.java       |  80 --
 flink-contrib/src/main/resources/.hidden        |   0
 .../src/main/resources/HashTagTweetSample.json  |   4 -
 .../test/java/org/apache/flink/contrib/.hidden  |   0
 .../flink/contrib/streaming/CollectITCase.java  |  60 --
 .../SimpleTweetInputFormatTest.java             |  98 ---
 .../flink-storm-compatibility/README.md         |  15 -
 .../flink-storm-compatibility/pom.xml           | 105 ---
 .../stormcompatibility/api/FlinkClient.java     | 301 --------
 .../api/FlinkLocalCluster.java                  | 115 ---
 .../api/FlinkOutputFieldsDeclarer.java          | 166 -----
 .../stormcompatibility/api/FlinkSubmitter.java  | 206 -----
 .../stormcompatibility/api/FlinkTopology.java   |  97 ---
 .../api/FlinkTopologyBuilder.java               | 303 --------
 .../api/FlinkTopologyContext.java               | 161 ----
 .../wrappers/AbstractStormCollector.java        | 111 ---
 .../wrappers/AbstractStormSpoutWrapper.java     | 123 ---
 .../wrappers/StormBoltCollector.java            |  92 ---
 .../wrappers/StormBoltWrapper.java              | 117 ---
 .../wrappers/StormFiniteSpoutWrapper.java       | 134 ----
 .../wrappers/StormOutputFieldsDeclarer.java     |  74 --
 .../wrappers/StormSpoutCollector.java           |  80 --
 .../wrappers/StormSpoutWrapper.java             |  78 --
 .../stormcompatibility/wrappers/StormTuple.java | 253 -------
 .../wrappers/StormWrapperSetupHelper.java       | 112 ---
 .../api/FlinkOutputFieldsDeclarerTest.java      | 172 -----
 .../api/FlinkTopologyContextTest.java           |  74 --
 .../api/FlinkTopologyTest.java                  |  62 --
 .../stormcompatibility/util/AbstractTest.java   |  39 -
 .../wrappers/FiniteTestSpout.java               |  78 --
 .../wrappers/StormBoltCollectorTest.java        | 101 ---
 .../wrappers/StormBoltWrapperTest.java          | 189 -----
 .../wrappers/StormFiniteSpoutWrapperTest.java   | 110 ---
 .../wrappers/StormOutputFieldsDeclarerTest.java |  80 --
 .../wrappers/StormSpoutCollectorTest.java       |  88 ---
 .../wrappers/StormSpoutWrapperTest.java         |  64 --
 .../wrappers/StormTupleTest.java                | 349 ---------
 .../wrappers/StormWrapperSetupHelperTest.java   | 136 ----
 .../wrappers/TestContext.java                   |  40 -
 .../flink-storm-examples/README.md              |  15 -
 .../flink-storm-examples/pom.xml                | 283 -------
 .../src/assembly/word-count-storm.xml           |  72 --
 .../excamation/ExclamationTopology.java         |  99 ---
 .../excamation/StormBoltExclamation.java        | 113 ---
 .../excamation/StormExclamationLocal.java       |  52 --
 .../StormExclamationRemoteByClient.java         |  61 --
 .../StormExclamationRemoteBySubmitter.java      |  60 --
 .../excamation/StormSpoutExclamation.java       | 118 ---
 .../stormoperators/ExclamationBolt.java         |  58 --
 .../singlejoin/SingleJoinTopology.java          |  90 ---
 .../singlejoin/StormSingleJoinLocal.java        |  50 --
 .../singlejoin/stormoperators/AgeSpout.java     |  54 --
 .../singlejoin/stormoperators/GenderSpout.java  |  45 --
 .../stormoperators/SingleJoinBolt.java          | 132 ----
 .../util/AbstractStormBoltSink.java             |  76 --
 .../util/AbstractStormSpout.java                |  70 --
 .../util/OutputFormatter.java                   |  29 -
 .../util/SimpleOutputFormatter.java             |  31 -
 .../util/StormBoltFileSink.java                 |  72 --
 .../util/StormBoltPrintSink.java                |  45 --
 .../stormcompatibility/util/StormFileSpout.java |  78 --
 .../util/StormInMemorySpout.java                |  43 --
 .../util/TupleOutputFormatter.java              |  38 -
 .../wordcount/BoltTokenizerWordCount.java       | 122 ---
 .../wordcount/SpoutSourceWordCount.java         | 152 ----
 .../wordcount/StormWordCountLocal.java          |  75 --
 .../wordcount/StormWordCountRemoteByClient.java |  85 ---
 .../StormWordCountRemoteBySubmitter.java        |  83 ---
 .../wordcount/WordCountTopology.java            | 119 ---
 .../stormoperators/StormBoltCounter.java        |  90 ---
 .../stormoperators/StormBoltTokenizer.java      |  76 --
 .../api/FlinkTestCluster.java                   | 107 ---
 .../exclamation/StormBoltExclamationITCase.java |  47 --
 .../StormExclamationLocalITCase.java            |  49 --
 .../StormSpoutExclamationITCase.java            |  47 --
 .../exclamation/util/ExclamationData.java       |  98 ---
 .../wordcount/BoltTokenizerWordCountITCase.java |  45 --
 .../wordcount/SpoutSourceWordCountITCase.java   |  45 --
 .../wordcount/StormWordCountLocalITCase.java    |  48 --
 flink-staging/flink-streaming/pom.xml           |   2 -
 pom.xml                                         |   2 +-
 206 files changed, 10716 insertions(+), 10580 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/README.md
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/README.md b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/README.md
new file mode 100644
index 0000000..0d490a3
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/README.md
@@ -0,0 +1,15 @@
+# flink-storm-compatibility
+
+The Storm compatibility layer allows embedding unmodified spouts or bolts within a regular Flink streaming program (`StormSpoutWrapper` and `StormBoltWrapper`). Additionally, a whole Storm topology can be submitted to Flink (see `FlinkTopologyBuilder`, `FlinkLocalCluster`, and `FlinkSubmitter`). Only a few minor changes to the original submitting code are required; the code that builds the topology itself can be reused unmodified. See `flink-storm-examples` for a simple word-count example.
+
+The following Storm features are not (yet, or not fully) supported by the compatibility layer right now:
+* the spout/bolt configuration within `open()`/`prepare()` is not yet supported (i.e., the `Map conf` parameter)
+* topology and tuple meta information (i.e., `TopologyContext` is not fully supported)
+* access to tuple attributes (i.e., fields) is only possible by index (access by name is coming)
+* only the default stream is currently supported (i.e., only a single output stream)
+* no fault-tolerance guarantees (i.e., calls to `ack()`/`fail()` and anchoring are ignored)
+* for whole Storm topologies the following is not supported by Flink:
+  * direct emit connection pattern
+  * activating/deactivating and rebalancing of topologies
+  * task hooks
+  * custom metrics
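
To illustrate the topology path described in the README, here is a minimal local-submission sketch. It assumes `FlinkTopologyBuilder` mirrors `TopologyBuilder#setSpout`/`#setBolt` (as its javadoc below states) and uses Storm's bundled test components as stand-ins for user spouts/bolts:

import backtype.storm.testing.TestWordCounter;
import backtype.storm.testing.TestWordSpout;
import backtype.storm.utils.Utils;
import org.apache.flink.stormcompatibility.api.FlinkLocalCluster;
import org.apache.flink.stormcompatibility.api.FlinkTopologyBuilder;

public class LocalTopologySketch {

	public static void main(final String[] args) throws Exception {
		final FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
		// unmodified Storm components; only the builder/cluster classes differ
		builder.setSpout("words", new TestWordSpout());
		builder.setBolt("counter", new TestWordCounter()).shuffleGrouping("words");

		final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
		cluster.submitTopology("word-count", null, builder.createTopology());
		Utils.sleep(10 * 1000);
		cluster.shutdown();
	}
}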

http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/pom.xml
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/pom.xml b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/pom.xml
new file mode 100644
index 0000000..b09e63a
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/pom.xml
@@ -0,0 +1,105 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-storm-compatibility-parent</artifactId>
+		<version>0.9-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-storm-compatibility-core</artifactId>
+	<name>flink-storm-compatibility-core</name>
+
+	<packaging>jar</packaging>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-core</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.storm</groupId>
+			<artifactId>storm-core</artifactId>
+			<version>0.9.4</version>
+			<exclusions>
+				<exclusion>
+					<groupId>org.slf4j</groupId>
+					<artifactId>log4j-over-slf4j</artifactId>
+				</exclusion>
+			</exclusions>
+		</dependency>
+
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-jar-plugin</artifactId>
+				<executions>
+					<execution>
+						<goals>
+							<goal>test-jar</goal>
+						</goals>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+
+		<pluginManagement>
+			<plugins>
+				<!--This plugin's configuration is used to store Eclipse m2e settings only. It has no influence on the Maven build itself.-->
+				<plugin>
+					<groupId>org.eclipse.m2e</groupId>
+					<artifactId>lifecycle-mapping</artifactId>
+					<version>1.0.0</version>
+					<configuration>
+						<lifecycleMappingMetadata>
+							<pluginExecutions>
+								<pluginExecution>
+									<pluginExecutionFilter>
+										<groupId>org.apache.maven.plugins</groupId>
+										<artifactId>maven-dependency-plugin</artifactId>
+										<versionRange>[2.9,)</versionRange>
+										<goals>
+											<goal>unpack</goal>
+										</goals>
+									</pluginExecutionFilter>
+									<action>
+										<ignore/>
+									</action>
+								</pluginExecution>
+							</pluginExecutions>
+						</lifecycleMappingMetadata>
+					</configuration>
+				</plugin>
+			</plugins>
+		</pluginManagement>
+
+	</build>
+
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkClient.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkClient.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkClient.java
new file mode 100644
index 0000000..242c154
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkClient.java
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.api;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.pattern.Patterns;
+import akka.util.Timeout;
+import backtype.storm.Config;
+import backtype.storm.generated.AlreadyAliveException;
+import backtype.storm.generated.InvalidTopologyException;
+import backtype.storm.generated.KillOptions;
+import backtype.storm.generated.Nimbus;
+import backtype.storm.generated.NotAliveException;
+import backtype.storm.utils.NimbusClient;
+import backtype.storm.utils.Utils;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.client.program.Client;
+import org.apache.flink.client.program.JobWithJars;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.client.JobStatusMessage;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmanager.JobManager;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
+import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobsStatus;
+import scala.Some;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * {@link FlinkClient} mimics a Storm {@link NimbusClient} and {@link Nimbus}{@code .Client} at once, to interact with
+ * Flink's JobManager instead of Storm's Nimbus.
+ */
+public class FlinkClient {
+
+	//The jobmanager's host name
+	private final String jobManagerHost;
+	//The jobmanager's rpc port
+	private final int jobManagerPort;
+	//The user specified timeout in milliseconds
+	private final String timeout;
+
+	// The following methods are derived from "backtype.storm.utils.NimbusClient"
+
+	/**
+	 * Instantiates a new {@link FlinkClient} for the given host name and port. Values for {@link
+	 * Config#NIMBUS_HOST} and {@link Config#NIMBUS_THRIFT_PORT} of a given Storm configuration are ignored.
+	 *
+	 * @param host
+	 * 		The jobmanager's host name.
+	 * @param port
+	 * 		The jobmanager's rpc port.
+	 */
+	public FlinkClient(final String host, final int port) {
+		this(host, port, null);
+	}
+
+	/**
+	 * Instantiates a new {@link FlinkClient} for the given host name and port. Values for {@link
+	 * Config#NIMBUS_HOST} and {@link Config#NIMBUS_THRIFT_PORT} of a given Storm configuration are ignored.
+	 *
+	 * @param host
+	 * 		The jobmanager's host name.
+	 * @param port
+	 * 		The jobmanager's rpc port.
+	 * @param timeout
+	 * 		Timeout
+	 */
+	public FlinkClient(final String host, final int port, final Integer timeout) {
+		this.jobManagerHost = host;
+		this.jobManagerPort = port;
+		if (timeout != null) {
+			this.timeout = timeout + " ms";
+		} else {
+			this.timeout = null;
+		}
+	}
+
+	/**
+	 * Returns a {@link FlinkClient} that uses the configured {@link Config#NIMBUS_HOST} and {@link
+	 * Config#NIMBUS_THRIFT_PORT} as JobManager address.
+	 *
+	 * @param conf
+	 * 		Configuration that contains the jobmanager's hostname and port.
+	 * @return A configured {@link FlinkClient}.
+	 */
+	@SuppressWarnings("rawtypes")
+	public static FlinkClient getConfiguredClient(final Map conf) {
+		final String nimbusHost = (String) conf.get(Config.NIMBUS_HOST);
+		final int nimbusPort = Utils.getInt(conf.get(Config.NIMBUS_THRIFT_PORT)).intValue();
+		return new FlinkClient(nimbusHost, nimbusPort);
+	}
+
+	/**
+	 * Returns a reference to itself.
+	 * <p/>
+	 * {@link FlinkClient} mimics both {@link NimbusClient} and {@link Nimbus}{@code .Client} at once.
+	 *
+	 * @return A reference to itself.
+	 */
+	public FlinkClient getClient() {
+		return this;
+	}
+
+	public void close() {/* nothing to do */}
+
+	// The following methods are derived from "backtype.storm.generated.Nimbus.Client"
+
+	/**
+	 * Parameter {@code uploadedJarLocation} is actually used to point to the local jar, because Flink does not support
+	 * uploading a jar file beforehand. Jar files are always uploaded directly when a program is submitted.
+	 */
+	public void submitTopology(final String name, final String uploadedJarLocation, final FlinkTopology topology)
+			throws AlreadyAliveException, InvalidTopologyException {
+		this.submitTopologyWithOpts(name, uploadedJarLocation, topology);
+	}
+
+	/**
+	 * Parameter {@code uploadedJarLocation} is actually used to point to the local jar, because Flink does not support
+	 * uploading a jar file beforehand. Jar files are always uploaded directly when a program is submitted.
+	 */
+	public void submitTopologyWithOpts(final String name, final String uploadedJarLocation, final FlinkTopology
+			topology)
+			throws AlreadyAliveException, InvalidTopologyException {
+
+		if (this.getTopologyJobId(name) != null) {
+			throw new AlreadyAliveException();
+		}
+
+		final File uploadedJarFile = new File(uploadedJarLocation);
+		try {
+			JobWithJars.checkJarFile(uploadedJarFile);
+		} catch (final IOException e) {
+			throw new RuntimeException("Problem with jar file " + uploadedJarFile.getAbsolutePath(), e);
+		}
+
+		final List<File> jarFiles = new ArrayList<File>();
+		jarFiles.add(uploadedJarFile);
+
+		final JobGraph jobGraph = topology.getStreamGraph().getJobGraph(name);
+		jobGraph.addJar(new Path(uploadedJarFile.getAbsolutePath()));
+
+		final Configuration configuration = jobGraph.getJobConfiguration();
+
+		final Client client;
+		try {
+			client = new Client(new InetSocketAddress(this.jobManagerHost, this.jobManagerPort), configuration,
+					JobWithJars.buildUserCodeClassLoader(jarFiles, JobWithJars.class.getClassLoader()), -1);
+		} catch (final UnknownHostException e) {
+			throw new RuntimeException("Cannot execute job due to UnknownHostException", e);
+		}
+
+		try {
+			client.run(jobGraph, false);
+		} catch (final ProgramInvocationException e) {
+			throw new RuntimeException("Cannot execute job due to ProgramInvocationException", e);
+		}
+	}
+
+	public void killTopology(final String name) throws NotAliveException {
+		this.killTopologyWithOpts(name, null);
+	}
+
+	public void killTopologyWithOpts(final String name, final KillOptions options) throws NotAliveException {
+		final JobID jobId = this.getTopologyJobId(name);
+		if (jobId == null) {
+			throw new NotAliveException();
+		}
+
+		try {
+			final ActorRef jobManager = this.getJobManager();
+
+			if (options != null) {
+				try {
+					Thread.sleep(1000 * options.get_wait_secs());
+				} catch (final InterruptedException e) {
+					throw new RuntimeException(e);
+				}
+			}
+
+			final FiniteDuration askTimeout = this.getTimeout();
+			final Future<Object> response = Patterns.ask(jobManager, new CancelJob(jobId), new Timeout(askTimeout));
+			try {
+				Await.result(response, askTimeout);
+			} catch (final Exception e) {
+				throw new RuntimeException("Killing topology " + name + " with Flink job ID " + jobId + " failed", e);
+			}
+		} catch (final IOException e) {
+			throw new RuntimeException("Could not connect to Flink JobManager with address " + this.jobManagerHost
+					+ ":" + this.jobManagerPort, e);
+		}
+	}
+
+	/**
+	 * Package internal method to get a Flink {@link JobID} from a Storm topology name.
+	 *
+	 * @param id
+	 * 		The Storm topology name.
+	 * @return Flink's internally used {@link JobID}.
+	 */
+	JobID getTopologyJobId(final String id) {
+		final Configuration configuration = GlobalConfiguration.getConfiguration();
+		if (this.timeout != null) {
+			configuration.setString(ConfigConstants.AKKA_ASK_TIMEOUT, this.timeout);
+		}
+
+		try {
+			final ActorRef jobManager = this.getJobManager();
+
+			final FiniteDuration askTimeout = this.getTimeout();
+			final Future<Object> response = Patterns.ask(jobManager, JobManagerMessages.getRequestRunningJobsStatus(),
+					new Timeout(askTimeout));
+
+			Object result;
+			try {
+				result = Await.result(response, askTimeout);
+			} catch (final Exception e) {
+				throw new RuntimeException("Could not retrieve running jobs from the JobManager", e);
+			}
+
+			if (result instanceof RunningJobsStatus) {
+				final List<JobStatusMessage> jobs = ((RunningJobsStatus) result).getStatusMessages();
+
+				for (final JobStatusMessage status : jobs) {
+					if (status.getJobName().equals(id)) {
+						return status.getJobId();
+					}
+				}
+			} else {
+				throw new RuntimeException("ReqeustRunningJobs requires a response of type "
+						+ "RunningJobs. Instead the response is of type " + result.getClass() + ".");
+			}
+		} catch (final IOException e) {
+			throw new RuntimeException("Could not connect to Flink JobManager with address " + this.jobManagerHost
+					+ ":" + this.jobManagerPort, e);
+		}
+
+		return null;
+	}
+
+	private FiniteDuration getTimeout() {
+		final Configuration configuration = GlobalConfiguration.getConfiguration();
+		if (this.timeout != null) {
+			configuration.setString(ConfigConstants.AKKA_ASK_TIMEOUT, this.timeout);
+		}
+
+		return AkkaUtils.getTimeout(configuration);
+	}
+
+	private ActorRef getJobManager() throws IOException {
+		final Configuration configuration = GlobalConfiguration.getConfiguration();
+
+		ActorSystem actorSystem;
+		try {
+			final scala.Tuple2<String, Object> systemEndpoint = new scala.Tuple2<String, Object>("", 0);
+			actorSystem = AkkaUtils.createActorSystem(configuration, new Some<scala.Tuple2<String, Object>>(
+					systemEndpoint));
+		} catch (final Exception e) {
+			throw new RuntimeException("Could not start actor system to communicate with JobManager", e);
+		}
+
+		return JobManager.getJobManagerRemoteReference(new InetSocketAddress(this.jobManagerHost, this.jobManagerPort),
+				actorSystem, AkkaUtils.getLookupTimeout(configuration));
+	}
+
+}
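
A sketch of how this client is meant to be driven, mirroring the Storm `NimbusClient` pattern; the jar path, topology name, and the empty builder are placeholders:

import backtype.storm.Config;
import org.apache.flink.stormcompatibility.api.FlinkClient;
import org.apache.flink.stormcompatibility.api.FlinkTopologyBuilder;

public class FlinkClientSketch {

	public static void main(final String[] args) throws Exception {
		final Config conf = new Config();
		conf.put(Config.NIMBUS_HOST, "localhost"); // JobManager host
		conf.put(Config.NIMBUS_THRIFT_PORT, 6123); // JobManager RPC port

		final FlinkClient client = FlinkClient.getConfiguredClient(conf);
		// uploadedJarLocation points to the local jar (see javadoc above)
		client.submitTopology("my-topology", "/path/to/program.jar",
				new FlinkTopologyBuilder().createTopology());
		// later: client.killTopology("my-topology");
		client.close();
	}
}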

http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkLocalCluster.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkLocalCluster.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkLocalCluster.java
new file mode 100644
index 0000000..e82e97a
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkLocalCluster.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.api;
+
+import backtype.storm.LocalCluster;
+import backtype.storm.generated.ClusterSummary;
+import backtype.storm.generated.KillOptions;
+import backtype.storm.generated.RebalanceOptions;
+import backtype.storm.generated.StormTopology;
+import backtype.storm.generated.SubmitOptions;
+import backtype.storm.generated.TopologyInfo;
+import org.apache.flink.streaming.util.ClusterUtil;
+
+import java.util.Map;
+
+/**
+ * {@link FlinkLocalCluster} mimics a Storm {@link LocalCluster}.
+ */
+public class FlinkLocalCluster {
+
+	public void submitTopology(final String topologyName, final Map<?, ?> conf, final FlinkTopology topology)
+			throws Exception {
+		this.submitTopologyWithOpts(topologyName, conf, topology, null);
+	}
+
+	public void submitTopologyWithOpts(final String topologyName, final Map<?, ?> conf, final FlinkTopology topology,
+			final SubmitOptions submitOpts) throws Exception {
+		ClusterUtil
+				.startOnMiniCluster(topology.getStreamGraph().getJobGraph(topologyName), topology.getNumberOfTasks());
+	}
+
+	public void killTopology(final String topologyName) {
+		this.killTopologyWithOpts(topologyName, null);
+	}
+
+	public void killTopologyWithOpts(final String name, final KillOptions options) {
+	}
+
+	public void activate(final String topologyName) {
+	}
+
+	public void deactivate(final String topologyName) {
+	}
+
+	public void rebalance(final String name, final RebalanceOptions options) {
+	}
+
+	public void shutdown() {
+		ClusterUtil.stopOnMiniCluster();
+	}
+
+	public String getTopologyConf(final String id) {
+		return null;
+	}
+
+	public StormTopology getTopology(final String id) {
+		return null;
+	}
+
+	public ClusterSummary getClusterInfo() {
+		return null;
+	}
+
+	public TopologyInfo getTopologyInfo(final String id) {
+		return null;
+	}
+
+	public Map<?, ?> getState() {
+		return null;
+	}
+
+	// A different {@link FlinkLocalCluster} to be used for execution of ITCases
+	private static FlinkLocalCluster currentCluster = null;
+
+	/**
+	 * Returns a {@link FlinkLocalCluster} that should be used for execution. If no cluster was set by {@link
+	 * #initialize(FlinkLocalCluster)} in advance, a new {@link FlinkLocalCluster} is returned.
+	 *
+	 * @return a {@link FlinkLocalCluster} to be used for execution
+	 */
+	public static FlinkLocalCluster getLocalCluster() {
+		if (currentCluster == null) {
+			currentCluster = new FlinkLocalCluster();
+		}
+
+		return currentCluster;
+	}
+
+	/**
+	 * Sets a different {@link FlinkLocalCluster} to be used for execution.
+	 *
+	 * @param cluster
+	 * 		the {@link FlinkLocalCluster} to be used for execution
+	 */
+	public static void initialize(final FlinkLocalCluster cluster) {
+		currentCluster = cluster;
+	}
+
+}
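
The `initialize()`/`getLocalCluster()` pair enables cluster injection for tests; a sketch of how an ITCase might install a custom cluster (the no-op `shutdown()` override is a hypothetical example, comparable to the `FlinkTestCluster` added in the examples module):

import org.apache.flink.stormcompatibility.api.FlinkLocalCluster;

public class ClusterInjectionSketch {

	public static void main(final String[] args) {
		// install a cluster whose shutdown() is a no-op, so that several
		// topologies can run against the same mini cluster
		FlinkLocalCluster.initialize(new FlinkLocalCluster() {
			@Override
			public void shutdown() {
				// keep the mini cluster alive between submissions
			}
		});

		// code under test picks up the injected instance transparently
		final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
	}
}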

http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkOutputFieldsDeclarer.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkOutputFieldsDeclarer.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkOutputFieldsDeclarer.java
new file mode 100644
index 0000000..d6a0230
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkOutputFieldsDeclarer.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.api;
+
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+import backtype.storm.utils.Utils;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+
+import java.util.List;
+
+/**
+ * {@link FlinkOutputFieldsDeclarer} is used to get the declared output schema of a
+ * {@link backtype.storm.topology.IRichSpout spout} or {@link backtype.storm.topology.IRichBolt
+ * bolt}.<br />
+ * <br />
+ * <strong>CAUTION: Currently, Flink only supports the default output stream. Furthermore,
+ * direct emit is not supported.</strong>
+ */
+final class FlinkOutputFieldsDeclarer implements OutputFieldsDeclarer {
+
+	// the declared output schema
+	private Fields outputSchema;
+
+	@Override
+	public void declare(final Fields fields) {
+		this.declareStream(Utils.DEFAULT_STREAM_ID, false, fields);
+	}
+
+	/**
+	 * {@inheritDoc}
+	 * <p/>
+	 * Direct emit is not supported by Flink. Parameter {@code direct} must be {@code false}.
+	 *
+	 * @throws UnsupportedOperationException
+	 * 		if {@code direct} is {@code true}
+	 */
+	@Override
+	public void declare(final boolean direct, final Fields fields) {
+		this.declareStream(Utils.DEFAULT_STREAM_ID, direct, fields);
+	}
+
+	/**
+	 * {@inheritDoc}
+	 * <p/>
+	 * Currently, Flink only supports the default output stream. Thus, parameter {@code streamId} must be equal to
+	 * {@link Utils#DEFAULT_STREAM_ID}.
+	 *
+	 * @throws UnsupportedOperationException
+	 * 		if {@code streamId} is not equal to {@link Utils#DEFAULT_STREAM_ID}
+	 */
+	@Override
+	public void declareStream(final String streamId, final Fields fields) {
+		this.declareStream(streamId, false, fields);
+	}
+
+	/**
+	 * {@inheritDoc}
+	 * <p/>
+	 * Currently, Flink only supports the default output stream. Thus, parameter {@code streamId} must be equal to
+	 * {@link Utils#DEFAULT_STREAM_ID}. Furthermore, direct emit is not supported by Flink and parameter {@code direct}
+	 * must be {@code false}.
+	 *
+	 * @throws UnsupportedOperationException
+	 * 		if {@code streamId} is not equal to {@link Utils#DEFAULT_STREAM_ID} or {@code direct} is {@code true}
+	 */
+	@Override
+	public void declareStream(final String streamId, final boolean direct, final Fields fields) {
+		if (!Utils.DEFAULT_STREAM_ID.equals(streamId)) {
+			throw new UnsupportedOperationException("Currently, only the default output stream is supported by Flink");
+		}
+		if (direct) {
+			throw new UnsupportedOperationException("Direct emit is not supported by Flink");
+		}
+
+		this.outputSchema = fields;
+	}
+
+	/**
+	 * Returns {@link TypeInformation} for the declared output schema. If no or an empty output schema was declared,
+	 * {@code null} is returned.
+	 *
+	 * @return output type information for the declared output schema; or {@code null} if no output schema was declared
+	 * @throws IllegalArgumentException
+	 * 		if more than 25 attributes are declared
+	 */
+	public TypeInformation<?> getOutputType() throws IllegalArgumentException {
+		if ((this.outputSchema == null) || (this.outputSchema.size() == 0)) {
+			return null;
+		}
+
+		Tuple t;
+		final int numberOfAttributes = this.outputSchema.size();
+
+		if (numberOfAttributes == 1) {
+			return TypeExtractor.getForClass(Object.class);
+		} else if (numberOfAttributes <= 25) {
+			try {
+				t = Tuple.getTupleClass(numberOfAttributes).newInstance();
+			} catch (final InstantiationException e) {
+				throw new RuntimeException(e);
+			} catch (final IllegalAccessException e) {
+				throw new RuntimeException(e);
+			}
+		} else {
+			throw new IllegalArgumentException("Flink supports only a maximum number of 25 attributes");
+		}
+
+		// TODO: declare only key fields as DefaultComparable
+		for (int i = 0; i < numberOfAttributes; ++i) {
+			t.setField(new DefaultComparable(), i);
+		}
+
+		return TypeExtractor.getForObject(t);
+	}
+
+	/**
+	 * {@link DefaultComparable} is a {@link Comparable} helper class that is used to get the correct {@link
+	 * TypeInformation} from {@link TypeExtractor} within {@link #getOutputType()}. If key fields are not comparable,
+	 * Flink cannot use them and will throw an exception.
+	 */
+	private static class DefaultComparable implements Comparable<DefaultComparable> {
+
+		public DefaultComparable() {
+		}
+
+		@Override
+		public int compareTo(final DefaultComparable o) {
+			return 0;
+		}
+	}
+
+	/**
+	 * Computes the indexes within the declared output schema for a list of given field-grouping attributes.
+	 *
+	 * @param groupingFields
+	 * 		the names of the field-grouping attributes
+	 * @return array of {@code int}s that contains the index within the output schema for each attribute in the given
+	 * list
+	 */
+	public int[] getGroupingFieldIndexes(final List<String> groupingFields) {
+		final int[] fieldIndexes = new int[groupingFields.size()];
+
+		for (int i = 0; i < fieldIndexes.length; ++i) {
+			fieldIndexes[i] = this.outputSchema.fieldIndex(groupingFields.get(i));
+		}
+
+		return fieldIndexes;
+	}
+
+}
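
How the declarer is used internally, sketched as a small driver (placed in the same package, since the class is package-private):

package org.apache.flink.stormcompatibility.api;

import backtype.storm.tuple.Fields;
import org.apache.flink.api.common.typeinfo.TypeInformation;

import java.util.Arrays;

public class DeclarerSketch {

	public static void main(final String[] args) {
		final FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer();
		// a spout/bolt would call this inside declareOutputFields(...)
		declarer.declare(new Fields("name", "age"));

		// Tuple2 type information with DefaultComparable attributes
		final TypeInformation<?> type = declarer.getOutputType();
		System.out.println(type);

		// translates Storm fieldsGrouping attributes into Flink field indexes
		final int[] indexes = declarer.getGroupingFieldIndexes(Arrays.asList("age"));
		System.out.println(Arrays.toString(indexes)); // prints [1]
	}
}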

http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkSubmitter.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkSubmitter.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkSubmitter.java
new file mode 100644
index 0000000..c86ee8a
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkSubmitter.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.api;
+
+import backtype.storm.Config;
+import backtype.storm.StormSubmitter;
+import backtype.storm.generated.AlreadyAliveException;
+import backtype.storm.generated.InvalidTopologyException;
+import backtype.storm.generated.SubmitOptions;
+import backtype.storm.utils.Utils;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.client.program.ContextEnvironment;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.json.simple.JSONValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.Map;
+
+/**
+ * {@link FlinkSubmitter} mimics a {@link StormSubmitter} to submit Storm topologies to a Flink cluster.
+ */
+public class FlinkSubmitter {
+	public final static Logger logger = LoggerFactory.getLogger(FlinkSubmitter.class);
+
+	/**
+	 * Submits a topology to run on the cluster. A topology runs forever or until explicitly killed.
+	 *
+	 * @param name
+	 * 		the name of the topology.
+	 * @param stormConf
+	 * 		the topology-specific configuration. See {@link Config}.
+	 * @param topology
+	 * 		the processing to execute.
+	 * @param opts
+	 * 		to manipulate the starting of the topology.
+	 * @throws AlreadyAliveException
+	 * 		if a topology with this name is already running
+	 * @throws InvalidTopologyException
+	 * 		if an invalid topology was submitted
+	 */
+	public static void submitTopology(final String name, final Map<?, ?> stormConf, final FlinkTopology topology,
+			final SubmitOptions opts)
+			throws AlreadyAliveException, InvalidTopologyException {
+		submitTopology(name, stormConf, topology);
+	}
+
+	/**
+	 * Submits a topology to run on the cluster. A topology runs forever or until explicitly killed.
+	 *
+	 * @param name
+	 * 		the name of the topology.
+	 * @param stormConf
+	 * 		the topology-specific configuration. See {@link Config}.
+	 * @param topology
+	 * 		the processing to execute.
+	 * @throws AlreadyAliveException
+	 * 		if a topology with this name is already running
+	 * @throws InvalidTopologyException
+	 * 		if an invalid topology was submitted
+	 */
+	@SuppressWarnings({"rawtypes", "unchecked"})
+	public static void submitTopology(final String name, final Map stormConf, final FlinkTopology topology)
+			throws AlreadyAliveException, InvalidTopologyException {
+		if (!Utils.isValidConf(stormConf)) {
+			throw new IllegalArgumentException("Storm conf is not valid. Must be json-serializable");
+		}
+
+		final Configuration flinkConfig = GlobalConfiguration.getConfiguration();
+		if (!stormConf.containsKey(Config.NIMBUS_HOST)) {
+			stormConf.put(Config.NIMBUS_HOST,
+					flinkConfig.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost"));
+		}
+		if (!stormConf.containsKey(Config.NIMBUS_THRIFT_PORT)) {
+			stormConf.put(Config.NIMBUS_THRIFT_PORT,
+					new Integer(flinkConfig.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
+							6123)));
+		}
+
+		final String serConf = JSONValue.toJSONString(stormConf);
+
+		final FlinkClient client = FlinkClient.getConfiguredClient(stormConf);
+		if (client.getTopologyJobId(name) != null) {
+			throw new RuntimeException("Topology with name `" + name + "` already exists on cluster");
+		}
+		String localJar = System.getProperty("storm.jar");
+		if (localJar == null) {
+			try {
+				for (final File file : ((ContextEnvironment) ExecutionEnvironment.getExecutionEnvironment())
+						.getJars()) {
+					// TODO verify that there is only one jar
+					localJar = file.getAbsolutePath();
+				}
+			} catch (final ClassCastException e) {
+				// ignore
+			}
+		}
+		try {
+			logger.info("Submitting topology " + name + " in distributed mode with conf " + serConf);
+			client.submitTopologyWithOpts(name, localJar, topology);
+		} catch (final InvalidTopologyException e) {
+			logger.warn("Topology submission exception: " + e.get_msg());
+			throw e;
+		} catch (final AlreadyAliveException e) {
+			logger.warn("Topology already alive exception", e);
+			throw e;
+		} finally {
+			client.close();
+		}
+
+		logger.info("Finished submitting topology: " + name);
+	}
+
+	/**
+	 * Same as {@link #submitTopology(String, Map, FlinkTopology, SubmitOptions)}. Progress bars are not supported by
+	 * Flink.
+	 *
+	 * @param name
+	 * 		the name of the topology.
+	 * @param stormConf
+	 * 		the topology-specific configuration. See {@link Config}.
+	 * @param topology
+	 * 		the processing to execute.
+	 * @throws AlreadyAliveException
+	 * 		if a topology with this name is already running
+	 * @throws InvalidTopologyException
+	 * 		if an invalid topology was submitted
+	 */
+	public static void submitTopologyWithProgressBar(final String name, final Map<?, ?> stormConf,
+			final FlinkTopology topology)
+			throws AlreadyAliveException, InvalidTopologyException {
+		submitTopology(name, stormConf, topology);
+	}
+
+	/**
+	 * In Flink, jar files are submitted directly when a program is started. Thus, this method does nothing. The
+	 * returned value is parameter localJar, because this gives the best integration of Storm behavior within a Flink
+	 * environment.
+	 *
+	 * @param conf
+	 * 		the topology-specific configuration. See {@link Config}.
+	 * @param localJar
+	 * 		file path of the jar file to submit
+	 * @return the value of parameter localJar
+	 */
+	@SuppressWarnings("rawtypes")
+	public static String submitJar(final Map conf, final String localJar) {
+		return submitJar(localJar);
+	}
+
+	/**
+	 * In Flink, jar files are submitted directly when a program is started. Thus, this method does nothing. The
+	 * returned value is parameter localJar, because this gives the best integration of Storm behavior within a Flink
+	 * environment.
+	 *
+	 * @param localJar
+	 * 		file path of the jar file to submit
+	 * @return the value of parameter localJar
+	 */
+	public static String submitJar(final String localJar) {
+		if (localJar == null) {
+			throw new RuntimeException(
+					"Must submit topologies using the 'storm' client script so that StormSubmitter knows which jar " +
+							"to upload");
+		}
+
+		return localJar;
+	}
+
+	/**
+	 * Dummy interface used to track the progress of the file upload. Does not do anything; kept for compatibility.
+	 */
+	public interface FlinkProgressListener {
+	}
+
+}
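
A submission sketch mirroring `StormSubmitter` usage. When the host/port keys are unset they fall back to the Flink configuration, as implemented above; the topology name is a placeholder and the empty builder stands in for a real program (when no jar is passed, the `storm.jar` system property must point at the program jar, exactly as with Storm's client script):

import backtype.storm.Config;
import org.apache.flink.stormcompatibility.api.FlinkSubmitter;
import org.apache.flink.stormcompatibility.api.FlinkTopologyBuilder;

public class SubmitterSketch {

	public static void main(final String[] args) throws Exception {
		final Config conf = new Config();
		// optional; defaults are taken from the Flink configuration
		conf.put(Config.NIMBUS_HOST, "localhost");
		conf.put(Config.NIMBUS_THRIFT_PORT, 6123);

		FlinkSubmitter.submitTopology("my-topology", conf,
				new FlinkTopologyBuilder().createTopology());
	}
}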

http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopology.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopology.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopology.java
new file mode 100644
index 0000000..4b7f0dc
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopology.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.api;
+
+import backtype.storm.generated.StormTopology;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+/**
+ * {@link FlinkTopology} mimics a {@link StormTopology} and is implemented in terms of a {@link
+ * StreamExecutionEnvironment}. In contrast to a regular {@link StreamExecutionEnvironment}, a {@link FlinkTopology}
+ * cannot be executed directly, but must be handed over to a {@link FlinkLocalCluster}, {@link FlinkSubmitter}, or
+ * {@link FlinkClient}.
+ */
+class FlinkTopology extends StreamExecutionEnvironment {
+
+	// The corresponding {@link StormTopology} that is mimicked by this {@link FlinkTopology}
+	private final StormTopology stormTopology;
+	// The number of declared tasks for the whole program (ie, sum over all dops)
+	private int numberOfTasks = 0;
+
+	public FlinkTopology(final StormTopology stormTopology) {
+		// Set default parallelism to 1, to mirror Storm default behavior
+		super.setParallelism(1);
+		this.stormTopology = stormTopology;
+	}
+
+	/**
+	 * Is not supported. In order to execute use {@link FlinkLocalCluster}, {@link FlinkSubmitter}, or {@link
+	 * FlinkClient}.
+	 *
+	 * @throws UnsupportedOperationException
+	 * 		at every invocation
+	 */
+	@Override
+	public JobExecutionResult execute() throws Exception {
+		throw new UnsupportedOperationException(
+				"A FlinkTopology cannot be executed directly. Use FlinkLocalCluster, FlinkSubmitter, or FlinkClient " +
+						"instead.");
+	}
+
+	/**
+	 * Is not supported. In order to execute use {@link FlinkLocalCluster}, {@link FlinkSubmitter} or {@link
+	 * FlinkClient}.
+	 *
+	 * @throws UnsupportedOperationException
+	 * 		at every invocation
+	 */
+	@Override
+	public JobExecutionResult execute(final String jobName) throws Exception {
+		throw new UnsupportedOperationException(
+				"A FlinkTopology cannot be executed directly. Use FlinkLocalCluster, FlinkSubmitter, or FlinkClient " +
+						"instead.");
+	}
+
+	//TODO
+	public String getStormTopologyAsString() {
+		return this.stormTopology.toString();
+	}
+
+	/**
+	 * Increases the number of declared tasks of this program by the given value.
+	 *
+	 * @param dop
+	 * 		The dop of a new operator that increases the number of overall tasks.
+	 */
+	public void increaseNumberOfTasks(final int dop) {
+		assert (dop > 0);
+		this.numberOfTasks += dop;
+	}
+
+	/**
+	 * Returns the number of required tasks to execute this program.
+	 *
+	 * @return the number of required tasks to execute this program
+	 */
+	public int getNumberOfTasks() {
+		return this.numberOfTasks;
+	}
+
+}
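
A sketch of the execution contract above: the environment obtained from a builder refuses direct execution (same package, since `FlinkTopology` is package-private; the empty builder is just for brevity):

package org.apache.flink.stormcompatibility.api;

public class ExecuteContractSketch {

	public static void main(final String[] args) throws Exception {
		final FlinkTopology topology = new FlinkTopologyBuilder().createTopology();
		try {
			topology.execute(); // always throws
		} catch (final UnsupportedOperationException e) {
			// hand the topology to FlinkLocalCluster/FlinkSubmitter/FlinkClient instead
			System.out.println(e.getMessage());
		}
	}
}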

http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java
new file mode 100644
index 0000000..41abbb1
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.api;
+
+import backtype.storm.generated.ComponentCommon;
+import backtype.storm.generated.GlobalStreamId;
+import backtype.storm.generated.Grouping;
+import backtype.storm.generated.StormTopology;
+import backtype.storm.topology.BasicBoltExecutor;
+import backtype.storm.topology.BoltDeclarer;
+import backtype.storm.topology.IBasicBolt;
+import backtype.storm.topology.IRichBolt;
+import backtype.storm.topology.IRichSpout;
+import backtype.storm.topology.IRichStateSpout;
+import backtype.storm.topology.SpoutDeclarer;
+import backtype.storm.topology.TopologyBuilder;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.stormcompatibility.wrappers.StormBoltWrapper;
+import org.apache.flink.stormcompatibility.wrappers.StormSpoutWrapper;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Set;
+
+/**
+ * {@link FlinkTopologyBuilder} mimics a {@link TopologyBuilder}, but builds a Flink program instead of a Storm
+ * topology. Most methods (except {@link #createTopology()}) are copied from the original {@link TopologyBuilder}
+ * implementation to ensure equal behavior.<br />
+ * <br />
+ * <strong>CAUTION: {@link IRichStateSpout StateSpout}s and multiple output streams per spout/bolt are currently not
+ * supported.</strong>
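+ * <br />
+ * A usage sketch (the spout and bolt classes are illustrative placeholders, not part of this module):
+ * <pre>{@code
+ * FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
+ * builder.setSpout("source", new SentenceSpout());
+ * builder.setBolt("counter", new WordCountBolt(), 2).shuffleGrouping("source");
+ * FlinkTopology topology = builder.createTopology();
+ * }</pre>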
+ */
+public class FlinkTopologyBuilder {
+
+	// A Storm {@link TopologyBuilder} to build a real Storm topology
+	private final TopologyBuilder stormBuilder = new TopologyBuilder();
+	// All user spouts by their ID
+	private final HashMap<String, IRichSpout> spouts = new HashMap<String, IRichSpout>();
+	// All user bolts by their ID
+	private final HashMap<String, IRichBolt> bolts = new HashMap<String, IRichBolt>();
+
+	/**
+	 * Creates a Flink program that uses the specified spouts and bolts.
+	 */
+	@SuppressWarnings({"rawtypes", "unchecked"})
+	public FlinkTopology createTopology() {
+		final StormTopology stormTopology = this.stormBuilder.createTopology();
+		final FlinkTopology env = new FlinkTopology(stormTopology);
+		env.setParallelism(1);
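+		// The default parallelism is 1; it is raised per operator below whenever a Storm parallelism hint
+		// is set, and the topology's overall task count is tracked via increaseNumberOfTasks().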
+
+		final HashMap<String, SingleOutputStreamOperator> availableOperators =
+				new HashMap<String, SingleOutputStreamOperator>();
+
+		for (final Entry<String, IRichSpout> spout : this.spouts.entrySet()) {
+			final String spoutId = spout.getKey();
+			final IRichSpout userSpout = spout.getValue();
+
+			final FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer();
+			userSpout.declareOutputFields(declarer);
+
+			/* TODO in order to support multiple output streams, use an additional wrapper (or modify StormSpoutWrapper
+			 * and StormCollector)
+			 * -> add an additional output attribute tagging the output stream, and use .split() and .select() to split
+			 * the streams
+			 */
+			final DataStreamSource source = env.addSource(new StormSpoutWrapper(userSpout), declarer.getOutputType());
+			availableOperators.put(spoutId, source);
+
+			int dop = 1;
+			final ComponentCommon common = stormTopology.get_spouts().get(spoutId).get_common();
+			if (common.is_set_parallelism_hint()) {
+				dop = common.get_parallelism_hint();
+				source.setParallelism(dop);
+			}
+			env.increaseNumberOfTasks(dop);
+		}
+
+		final HashMap<String, IRichBolt> unprocessedBolts = new HashMap<String, IRichBolt>();
+		unprocessedBolts.putAll(this.bolts);
+
+		final HashMap<String, Set<Entry<GlobalStreamId, Grouping>>> unprocessedInputsPerBolt =
+				new HashMap<String, Set<Entry<GlobalStreamId, Grouping>>>();
+
+		/* Because we do not know the order in which an iterator steps over a set, we might process a consumer before
+		 * its producer; thus, we might need to repeat the outer loop multiple times.
+		 */
+		while (unprocessedBolts.size() > 0) {
+
+			final Iterator<Entry<String, IRichBolt>> boltsIterator = unprocessedBolts.entrySet().iterator();
+			while (boltsIterator.hasNext()) {
+
+				final Entry<String, IRichBolt> bolt = boltsIterator.next();
+				final String boltId = bolt.getKey();
+				final IRichBolt userBolt = bolt.getValue();
+
+				final FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer();
+				userBolt.declareOutputFields(declarer);
+
+				final ComponentCommon common = stormTopology.get_bolts().get(boltId).get_common();
+
+				Set<Entry<GlobalStreamId, Grouping>> unprocessedInputs = unprocessedInputsPerBolt.get(boltId);
+				if (unprocessedInputs == null) {
+					unprocessedInputs = new HashSet<Entry<GlobalStreamId, Grouping>>();
+					unprocessedInputs.addAll(common.get_inputs().entrySet());
+					unprocessedInputsPerBolt.put(boltId, unprocessedInputs);
+				}
+
+				// connect each available producer to the current bolt
+				final Iterator<Entry<GlobalStreamId, Grouping>> inputStreamsIterator = unprocessedInputs.iterator();
+				while (inputStreamsIterator.hasNext()) {
+
+					final Entry<GlobalStreamId, Grouping> inputStream = inputStreamsIterator.next();
+					final String producerId = inputStream.getKey().get_componentId();
+
+					DataStream<?> inputDataStream = availableOperators.get(producerId);
+
+					if (inputDataStream != null) {
+						// if producer was processed already
+						final Grouping grouping = inputStream.getValue();
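+						/* Translate the Storm grouping into the corresponding Flink distribution pattern:
+						 * shuffle -> rebalance, non-empty fields -> groupBy, empty fields -> global,
+						 * all -> broadcast; local-or-shuffle is left to Flink's default channel selection.
+						 */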
+						if (grouping.is_set_shuffle()) {
+							// Storm uses a round-robin shuffle strategy
+							inputDataStream = inputDataStream.rebalance();
+						} else if (grouping.is_set_fields()) {
+							// global grouping is emulated in Storm via an empty fields grouping list
+							final List<String> fields = grouping.get_fields();
+							if (fields.size() > 0) {
+								inputDataStream = inputDataStream.groupBy(declarer.getGroupingFieldIndexes(fields));
+							} else {
+								inputDataStream = inputDataStream.global();
+							}
+						} else if (grouping.is_set_all()) {
+							inputDataStream = inputDataStream.broadcast();
+						} else if (!grouping.is_set_local_or_shuffle()) {
+							throw new UnsupportedOperationException(
+									"Flink only supports (local-or-)shuffle, fields, all, and global grouping");
+						}
+
+						final TypeInformation<?> outType = declarer.getOutputType();
+
+						final SingleOutputStreamOperator operator = inputDataStream.transform(boltId, outType,
+								new StormBoltWrapper(userBolt));
+						if (outType != null) {
+							// only for non-sink nodes
+							availableOperators.put(boltId, operator);
+						}
+
+						int dop = 1;
+						if (common.is_set_parallelism_hint()) {
+							dop = common.get_parallelism_hint();
+							operator.setParallelism(dop);
+						}
+						env.increaseNumberOfTasks(dop);
+
+						inputStreamsIterator.remove();
+					}
+				}
+
+				if (unprocessedInputs.size() == 0) {
+					// all inputs are connected; processing bolt completed
+					boltsIterator.remove();
+				}
+			}
+		}
+		return env;
+	}
+
+	/**
+	 * Define a new bolt in this topology with parallelism of just one thread.
+	 *
+	 * @param id
+	 * 		the id of this component. This id is referenced by other components that want to consume this bolt's
+	 * 		outputs.
+	 * @param bolt
+	 * 		the bolt
+	 * @return use the returned object to declare the inputs to this component
+	 */
+	public BoltDeclarer setBolt(final String id, final IRichBolt bolt) {
+		return this.setBolt(id, bolt, null);
+	}
+
+	/**
+	 * Define a new bolt in this topology with the specified amount of parallelism.
+	 *
+	 * @param id
+	 * 		the id of this component. This id is referenced by other components that want to consume this bolt's
+	 * 		outputs.
+	 * @param bolt
+	 * 		the bolt
+	 * @param parallelism_hint
+	 * 		the number of tasks that should be assigned to execute this bolt. Each task will run on a thread in a
+	 * 		process somewhere around the cluster.
+	 * @return use the returned object to declare the inputs to this component
+	 */
+	public BoltDeclarer setBolt(final String id, final IRichBolt bolt, final Number parallelism_hint) {
+		final BoltDeclarer declarer = this.stormBuilder.setBolt(id, bolt, parallelism_hint);
+		this.bolts.put(id, bolt);
+		return declarer;
+	}
+
+	/**
+	 * Define a new bolt in this topology. This defines a basic bolt, which is a simpler-to-use but more restricted
+	 * kind of bolt. Basic bolts are intended for non-aggregation processing and automate the anchoring/acking
+	 * process to achieve proper reliability in the topology.
+	 *
+	 * @param id
+	 * 		the id of this component. This id is referenced by other components that want to consume this bolt's
+	 * 		outputs.
+	 * @param bolt
+	 * 		the basic bolt
+	 * @return use the returned object to declare the inputs to this component
+	 */
+	public BoltDeclarer setBolt(final String id, final IBasicBolt bolt) {
+		return this.setBolt(id, bolt, null);
+	}
+
+	/**
+	 * Define a new bolt in this topology. This defines a basic bolt, which is a simpler-to-use but more restricted
+	 * kind of bolt. Basic bolts are intended for non-aggregation processing and automate the anchoring/acking
+	 * process to achieve proper reliability in the topology.
+	 *
+	 * @param id
+	 * 		the id of this component. This id is referenced by other components that want to consume this bolt's
+	 * 		outputs.
+	 * @param bolt
+	 * 		the basic bolt
+	 * @param parallelism_hint
+	 * 		the number of tasks that should be assigned to execute this bolt. Each task will run on a thread in a
+	 * 		process somewhere around the cluster.
+	 * @return use the returned object to declare the inputs to this component
+	 */
+	public BoltDeclarer setBolt(final String id, final IBasicBolt bolt, final Number parallelism_hint) {
+		return this.setBolt(id, new BasicBoltExecutor(bolt), parallelism_hint);
+	}
+
+	/**
+	 * Define a new spout in this topology.
+	 *
+	 * @param id
+	 * 		the id of this component. This id is referenced by other components that want to consume this spout's
+	 * 		outputs.
+	 * @param spout
+	 * 		the spout
+	 */
+	public SpoutDeclarer setSpout(final String id, final IRichSpout spout) {
+		return this.setSpout(id, spout, null);
+	}
+
+	/**
+	 * Define a new spout in this topology with the specified parallelism. If the spout declares itself as
+	 * non-distributed, the parallelism_hint will be ignored and only one task will be allocated to this component.
+	 *
+	 * @param id
+	 * 		the id of this component. This id is referenced by other components that want to consume this spout's
+	 * 		outputs.
+	 * @param spout
+	 * 		the spout
+	 * @param parallelism_hint
+	 * 		the number of tasks that should be assigned to execute this spout. Each task will run on a thread in a
+	 * 		process somewhere around the cluster.
+	 */
+	public SpoutDeclarer setSpout(final String id, final IRichSpout spout, final Number parallelism_hint) {
+		final SpoutDeclarer declarer = this.stormBuilder.setSpout(id, spout, parallelism_hint);
+		this.spouts.put(id, spout);
+		return declarer;
+	}
+
+	// TODO add StateSpout support (Storm 0.9.4 does not yet support StateSpouts itself)
+	/* not implemented by Storm 0.9.4
+	 * public void setStateSpout(final String id, final IRichStateSpout stateSpout) {
+	 * this.stormBuilder.setStateSpout(id, stateSpout);
+	 * }
+	 * public void setStateSpout(final String id, final IRichStateSpout stateSpout, final Number parallelism_hint) {
+	 * this.stormBuilder.setStateSpout(id, stateSpout, parallelism_hint);
+	 * }
+	 */
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyContext.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyContext.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyContext.java
new file mode 100644
index 0000000..a761617
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyContext.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.api;
+
+import backtype.storm.generated.StormTopology;
+import backtype.storm.hooks.ITaskHook;
+import backtype.storm.metric.api.CombinedMetric;
+import backtype.storm.metric.api.ICombiner;
+import backtype.storm.metric.api.IMetric;
+import backtype.storm.metric.api.IReducer;
+import backtype.storm.metric.api.ReducedMetric;
+import backtype.storm.state.ISubscribedState;
+import backtype.storm.task.TopologyContext;
+
+import java.util.Collection;
+import java.util.Map;
+
+/**
+ * {@link FlinkTopologyContext} is a {@link TopologyContext} that overrides certain methods that are not applicable
+ * when a Storm topology is executed within Flink.
+ */
+public class FlinkTopologyContext extends TopologyContext {
+
+	/**
+	 * Instantiates a new {@link FlinkTopologyContext} for a given Storm topology. The context object is instantiated
+	 * for each parallel task.
+	 *
+	 * @param topology
+	 * 		The Storm topology that is currently executed
+	 * @param taskToComponents
+	 * 		A map from task IDs to Component IDs
+	 * @param taskId
+	 * 		The ID of the task the context belongs to.
+	 */
+	public FlinkTopologyContext(final StormTopology topology, final Map<Integer, String> taskToComponents,
+			final Integer taskId) {
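+		// Only the topology, the task-to-components map, and the task ID are relevant when running within Flink;
+		// all remaining Storm-specific constructor arguments are not applicable here and are passed as null.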
+		super(topology, null, taskToComponents, null, null, null, null, null, taskId, null, null, null, null, null,
+				null, null);
+	}
+
+	/**
+	 * Not supported by Flink.
+	 *
+	 * @throws UnsupportedOperationException
+	 * 		at every invocation
+	 */
+	@Override
+	public void addTaskHook(final ITaskHook hook) {
+		throw new UnsupportedOperationException("Task hooks are not supported by Flink");
+	}
+
+	/**
+	 * Not supported by Flink.
+	 *
+	 * @throws UnsupportedOperationException
+	 * 		at every invocation
+	 */
+	@Override
+	public Collection<ITaskHook> getHooks() {
+		throw new UnsupportedOperationException("Task hooks are not supported by Flink");
+	}
+
+	/**
+	 * Not supported by Flink.
+	 *
+	 * @throws UnsupportedOperationException
+	 * 		at every invocation
+	 */
+	@Override
+	public IMetric getRegisteredMetricByName(final String name) {
+		throw new UnsupportedOperationException("Metrics are not supported by Flink");
+	}
+
+	/**
+	 * Not supported by Flink.
+	 *
+	 * @throws UnsupportedOperationException
+	 * 		at every invocation
+	 */
+	@SuppressWarnings("rawtypes")
+	@Override
+	public CombinedMetric registerMetric(final String name, final ICombiner combiner, final int timeBucketSizeInSecs) {
+		throw new UnsupportedOperationException("Metrics are not supported by Flink");
+	}
+
+	/**
+	 * Not supported by Flink.
+	 *
+	 * @throws UnsupportedOperationException
+	 * 		at every invocation
+	 */
+	@SuppressWarnings("rawtypes")
+	@Override
+	public ReducedMetric registerMetric(final String name, final IReducer combiner, final int timeBucketSizeInSecs) {
+		throw new UnsupportedOperationException("Metrics are not supported by Flink");
+	}
+
+	/**
+	 * Not supported by Flink.
+	 *
+	 * @throws UnsupportedOperationException
+	 * 		at every invocation
+	 */
+	@SuppressWarnings("unchecked")
+	@Override
+	public IMetric registerMetric(final String name, final IMetric metric, final int timeBucketSizeInSecs) {
+		throw new UnsupportedOperationException("Metrics are not supported by Flink");
+	}
+
+	/**
+	 * Not supported by Flink.
+	 *
+	 * @throws UnsupportedOperationException
+	 * 		at every invocation
+	 */
+	@Override
+	public <T extends ISubscribedState> T setAllSubscribedState(final T obj) {
+		throw new UnsupportedOperationException("Not supported by Flink");
+	}
+
+	/**
+	 * Not supported by Flink.
+	 *
+	 * @throws UnsupportedOperationException
+	 * 		at every invocation
+	 */
+	@Override
+	public <T extends ISubscribedState> T setSubscribedState(final String componentId, final T obj) {
+		throw new UnsupportedOperationException("Not supported by Flink");
+	}
+
+	/**
+	 * Not supported by Flink.
+	 *
+	 * @throws UnsupportedOperationException
+	 * 		at every invocation
+	 */
+	@Override
+	public <T extends ISubscribedState> T setSubscribedState(final String componentId, final String streamId,
+			final T obj) {
+		throw new UnsupportedOperationException("Not supported by Flink");
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormCollector.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormCollector.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormCollector.java
new file mode 100644
index 0000000..e8048b0
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormCollector.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.stormcompatibility.wrappers;
+
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple25;
+
+import java.util.List;
+
+/**
+ * An {@link AbstractStormCollector} transforms Storm tuples into Flink tuples.
+ */
+abstract class AbstractStormCollector<OUT> {
+
+	/**
+	 * Flink output tuple of concrete type {@link Tuple1} to {@link Tuple25}.
+	 */
+	protected final Tuple outputTuple;
+	/**
+	 * The number of attributes of the output tuples. (Determines the concrete type of
+	 * {@link #outputTuple}.) If {@link #numberOfAttributes} is zero, {@link #outputTuple} is not
+	 * used and the "raw" data type is used instead.
+	 */
+	protected final int numberOfAttributes;
+	/**
+	 * Is set to {@code true} each time a tuple is emitted.
+	 */
+	boolean tupleEmitted = false;
+
+	/**
+	 * Instantiates a new {@link AbstractStormCollector} that emits Flink tuples via
+	 * {@link #doEmit(Object)}. If the number of attributes is specified as zero, any output type is
+	 * supported. If the number of attributes is between 1 and 25, the output type is {@link Tuple1}
+	 * to {@link Tuple25}.
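+	 * For example, a component declaring two output attributes (e.g., via {@code new Fields("word", "count")};
+	 * field names are illustrative) has its output emitted as {@code Tuple2} instances.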
+	 * 
+	 * @param numberOfAttributes
+	 * 		The number of attributes of the emitted tuples.
+	 * @throws UnsupportedOperationException
+	 * 		if the specified number of attributes is not in the valid range of [0,25]
+	 */
+	public AbstractStormCollector(final int numberOfAttributes) throws UnsupportedOperationException {
+		this.numberOfAttributes = numberOfAttributes;
+
+		if (this.numberOfAttributes <= 0) {
+			this.outputTuple = null;
+		} else if (this.numberOfAttributes <= 25) {
+			try {
+				this.outputTuple = Tuple.getTupleClass(this.numberOfAttributes).newInstance();
+			} catch (final InstantiationException e) {
+				throw new RuntimeException(e);
+			} catch (final IllegalAccessException e) {
+				throw new RuntimeException(e);
+			}
+		} else {
+			throw new UnsupportedOperationException(
+					"SimpleStormBoltWrapper can handle not more then 25 attributes, but "
+					+ this.numberOfAttributes + " are declared by the given bolt");
+		}
+	}
+
+	/**
+	 * Transforms a Storm tuple into a Flink tuple of type {@code OUT} and emits this tuple via
+	 * {@link #doEmit(Object)}.
+	 * 
+	 * @param tuple
+	 * 		The Storm tuple to be emitted.
+	 * @return the return value of {@link #doEmit(Object)}
+	 */
+	@SuppressWarnings("unchecked")
+	protected final List<Integer> transformAndEmit(final List<Object> tuple) {
+		List<Integer> taskIds;
+		if (this.numberOfAttributes > 0) {
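+			// tuple mode: copy each attribute value into the reusable Flink output tuple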
+			assert (tuple.size() == this.numberOfAttributes);
+			for (int i = 0; i < this.numberOfAttributes; ++i) {
+				this.outputTuple.setField(tuple.get(i), i);
+			}
+			taskIds = doEmit((OUT) this.outputTuple);
+		} else {
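+			// raw mode: forward the single attribute value without wrapping it into a Tuple1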
+			assert (tuple.size() == 1);
+			taskIds = doEmit((OUT) tuple.get(0));
+		}
+		this.tupleEmitted = true;
+
+		return taskIds;
+	}
+
+	/**
+	 * Emits a Flink tuple.
+	 * 
+	 * @param flinkTuple
+	 * 		The tuple to be emitted.
+	 * @return the IDs of the tasks this tuple was sent to
+	 */
+	protected abstract List<Integer> doEmit(OUT flinkTuple);
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormSpoutWrapper.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormSpoutWrapper.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormSpoutWrapper.java
new file mode 100644
index 0000000..f685f13
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormSpoutWrapper.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.wrappers;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.topology.IRichSpout;
+
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple25;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
+
+/**
+ * An {@link AbstractStormSpoutWrapper} wraps an {@link IRichSpout} in order to execute the Storm spout within a Flink
+ * Streaming program. It takes the spout's output tuples and transforms them into Flink tuples of type {@code OUT} (see
+ * {@link StormSpoutCollector} for supported types).<br />
+ * <br />
+ * <strong>CAUTION: currently, only simple spouts are supported (i.e., spouts that do not use the Storm configuration
+ * <code>Map</code> or <code>TopologyContext</code> that is provided by the spout's <code>open(..)</code> method).
+ * Furthermore, calls to ack and fail as well as tuple IDs are not supported so far.</strong>
+ */
+public abstract class AbstractStormSpoutWrapper<OUT> extends RichParallelSourceFunction<OUT> {
+	private static final long serialVersionUID = 4993283609095408765L;
+
+	// Number of attributes of the spout's output tuples.
+	private final int numberOfAttributes;
+	/**
+	 * The wrapped Storm {@link IRichSpout spout}.
+	 */
+	protected final IRichSpout spout;
+	/**
+	 * The wrapper of the given Flink collector.
+	 */
+	protected StormSpoutCollector<OUT> collector;
+	/**
+	 * Indicates whether the source is still running or was canceled.
+	 */
+	protected boolean isRunning = true;
+
+	/**
+	 * Instantiates a new {@link AbstractStormSpoutWrapper} that wraps the given Storm {@link IRichSpout spout} such
+	 * that it can be used within a Flink streaming program. The output type will be one of {@link Tuple1} to
+	 * {@link Tuple25} depending on the spout's declared number of attributes.
+	 *
+	 * @param spout
+	 * 		The Storm {@link IRichSpout spout} to be used.
+	 * @throws IllegalArgumentException
+	 * 		If the number of declared output attributes is not within the range [1;25].
+	 */
+	public AbstractStormSpoutWrapper(final IRichSpout spout) throws IllegalArgumentException {
+		this(spout, false);
+	}
+
+	/**
+	 * Instantiates a new {@link AbstractStormSpoutWrapper} that wraps the given Storm {@link IRichSpout spout} such
+	 * that it can be used within a Flink streaming program. The output type can be any type if parameter
+	 * {@code rawOutput} is {@code true} and the spout's number of declared output attributes is 1. If {@code rawOutput} is
+	 * {@code false} the output type will be one of {@link Tuple1} to {@link Tuple25} depending on the spout's declared
+	 * number of attributes.
+	 *
+	 * @param spout
+	 * 		The Storm {@link IRichSpout spout} to be used.
+	 * @param rawOutput
+	 * 		Set to {@code true} if a single-attribute output stream should not be of type {@link Tuple1} but of
+	 * 		a raw type.
+	 * @throws IllegalArgumentException
+	 * 		If {@code rawOutput} is {@code true} and the number of declared output attributes is not 1, or if
+	 * 		{@code rawOutput} is {@code false} and the number of declared output attributes is not within the
+	 * 		range [1;25].
+	 */
+	public AbstractStormSpoutWrapper(final IRichSpout spout, final boolean rawOutput) throws IllegalArgumentException {
+		this.spout = spout;
+		this.numberOfAttributes = StormWrapperSetupHelper.getNumberOfAttributes(spout, rawOutput);
+	}
+
+	@Override
+	public final void run(final SourceContext<OUT> ctx) throws Exception {
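+		// Wrap the Flink source context in a Storm-compatible collector, then open and activate the spout
+		// before handing control to the concrete subclass's emit loop.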
+		this.collector = new StormSpoutCollector<OUT>(this.numberOfAttributes, ctx);
+		this.spout.open(null,
+				StormWrapperSetupHelper
+						.convertToTopologyContext((StreamingRuntimeContext) super.getRuntimeContext(), true),
+				new SpoutOutputCollector(this.collector));
+		this.spout.activate();
+		this.execute();
+	}
+
+	/**
+	 * Needs to be implemented to call the wrapped spout's {@link IRichSpout#nextTuple() nextTuple()} method. This
+	 * method might use a {@code while(true)}-loop to emit an infinite number of tuples.
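+	 * <p>
+	 * A minimal sketch of such an infinite variant:
+	 * <pre>{@code
+	 * protected void execute() {
+	 *     while (this.isRunning) {
+	 *         this.spout.nextTuple();
+	 *     }
+	 * }
+	 * }</pre>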
+	 */
+	protected abstract void execute();
+
+	/**
+	 * {@inheritDoc}
+	 * <p/>
+	 * Sets the {@link #isRunning} flag to {@code false}.
+	 */
+	@Override
+	public void cancel() {
+		this.isRunning = false;
+	}
+
+	@Override
+	public void close() throws Exception {
+		this.spout.close();
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltCollector.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltCollector.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltCollector.java
new file mode 100644
index 0000000..59b5cf0
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltCollector.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.wrappers;
+
+import backtype.storm.task.IOutputCollector;
+import backtype.storm.tuple.Tuple;
+
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple25;
+import org.apache.flink.streaming.api.operators.Output;
+
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * A {@link StormBoltCollector} is used by {@link StormBoltWrapper} to provide a Storm-compatible
+ * output collector to the wrapped bolt. It transforms the emitted Storm tuples into Flink tuples
+ * and emits them via the provided {@link Output} object.
+ */
+class StormBoltCollector<OUT> extends AbstractStormCollector<OUT> implements IOutputCollector {
+
+	// The Flink output object
+	private final Output<OUT> flinkOutput;
+
+	/**
+	 * Instantiates a new {@link StormBoltCollector} that emits Flink tuples to the given Flink
+	 * output object. If the number of attributes is specified as zero, any output type is
+	 * supported. If the number of attributes is between 1 and 25, the output type is {@link Tuple1}
+	 * to {@link Tuple25}.
+	 * 
+	 * @param numberOfAttributes
+	 *        The number of attributes of the emitted tuples.
+	 * @param flinkOutput
+	 *        The Flink output object to be used.
+	 * @throws UnsupportedOperationException
+	 *         if the specified number of attributes is not in the valid range of [0,25]
+	 */
+	public StormBoltCollector(final int numberOfAttributes, final Output<OUT> flinkOutput) throws UnsupportedOperationException {
+		super(numberOfAttributes);
+		assert (flinkOutput != null);
+		this.flinkOutput = flinkOutput;
+	}
+
+	@Override
+	protected List<Integer> doEmit(final OUT flinkTuple) {
+		this.flinkOutput.collect(flinkTuple);
+		// TODO
+		return null;
+	}
+
+	@Override
+	public void reportError(final Throwable error) {
+		// not sure if Flink can support this
+		throw new UnsupportedOperationException("Not implemented yet");
+	}
+
+	@Override
+	public List<Integer> emit(final String streamId, final Collection<Tuple> anchors, final List<Object> tuple) {
+		return this.transformAndEmit(tuple);
+	}
+
+	@Override
+	public void emitDirect(final int taskId, final String streamId, final Collection<Tuple> anchors, final List<Object> tuple) {
+		throw new UnsupportedOperationException("Direct emit is not supported by Flink");
+	}
+
+	@Override
+	public void ack(final Tuple input) {
+		throw new UnsupportedOperationException("Currently, acking/failing is not supported by Flink");
+	}
+
+	@Override
+	public void fail(final Tuple input) {
+		throw new UnsupportedOperationException("Currently, acking/failing is not supported by Flink");
+	}
+
+}

