flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mj...@apache.org
Subject [04/15] flink git commit: [Storm Compatibility] Maven module restucturing and cleanup - removed storm-parent; renamed storm-core and storm-examples - updated internal Java package structure * renamed package "stormcompatibility" to "storm" *
Date Tue, 06 Oct 2015 11:31:29 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/WordCountLocalITCase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/WordCountLocalITCase.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/WordCountLocalITCase.java
new file mode 100644
index 0000000..39e7a25
--- /dev/null
+++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/WordCountLocalITCase.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.wordcount;
+
+import org.apache.flink.storm.util.StormTestBase;
+import org.apache.flink.storm.wordcount.WordCountLocal;
+import org.apache.flink.test.testdata.WordCountData;
+
+public class WordCountLocalITCase extends StormTestBase {
+
+	protected String textPath;
+	protected String resultPath;
+
+	@Override
+	protected void preSubmit() throws Exception {
+		this.textPath = this.createTempFile("text.txt", WordCountData.TEXT);
+		this.resultPath = this.getTempDirPath("result");
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, this.resultPath);
+	}
+
+	@Override
+	protected void testProgram() throws Exception {
+		WordCountLocal.main(new String[]{this.textPath, this.resultPath});
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/WordCountLocalNamedITCase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/WordCountLocalNamedITCase.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/WordCountLocalNamedITCase.java
new file mode 100644
index 0000000..78acfe5
--- /dev/null
+++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/WordCountLocalNamedITCase.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.wordcount;
+
+import org.apache.flink.storm.util.StormTestBase;
+import org.apache.flink.storm.wordcount.WordCountLocalByName;
+import org.apache.flink.test.testdata.WordCountData;
+
+public class WordCountLocalNamedITCase extends StormTestBase {
+
+	protected String textPath;
+	protected String resultPath;
+
+	@Override
+	protected void preSubmit() throws Exception {
+		this.textPath = this.createTempFile("text.txt", WordCountData.TEXT);
+		this.resultPath = this.getTempDirPath("result");
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, this.resultPath);
+	}
+
+	@Override
+	protected void testProgram() throws Exception {
+		WordCountLocalByName.main(new String[] { this.textPath, this.resultPath });
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-examples/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/test/resources/log4j-test.properties b/flink-contrib/flink-storm-examples/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..0b686e5
--- /dev/null
+++ b/flink-contrib/flink-storm-examples/src/test/resources/log4j-test.properties
@@ -0,0 +1,27 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Set root logger level to OFF and its only appender to A1.
+log4j.rootLogger=OFF, A1
+
+# A1 is set to be a ConsoleAppender.
+log4j.appender.A1=org.apache.log4j.ConsoleAppender
+
+# A1 uses PatternLayout.
+log4j.appender.A1.layout=org.apache.log4j.PatternLayout
+log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-examples/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/test/resources/log4j.properties b/flink-contrib/flink-storm-examples/src/test/resources/log4j.properties
new file mode 100644
index 0000000..ed2bbcb
--- /dev/null
+++ b/flink-contrib/flink-storm-examples/src/test/resources/log4j.properties
@@ -0,0 +1,27 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Logging configuration for tests executed from the IDE (root logger is OFF; raise the level below to see log output)
+
+log4j.rootLogger=OFF, console
+
+# Log to the console (System.err)
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target = System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-examples/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/test/resources/logback-test.xml b/flink-contrib/flink-storm-examples/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..4f56748
--- /dev/null
+++ b/flink-contrib/flink-storm-examples/src/test/resources/logback-test.xml
@@ -0,0 +1,30 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<configuration>
+    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
+        </encoder>
+    </appender>
+
+    <root level="WARN">
+        <appender-ref ref="STDOUT"/>
+    </root>
+    <logger name="org.apache.flink.runtime.client.JobClient" level="OFF"/>
+</configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/README.md
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/README.md b/flink-contrib/flink-storm/README.md
new file mode 100644
index 0000000..239780c
--- /dev/null
+++ b/flink-contrib/flink-storm/README.md
@@ -0,0 +1,15 @@
+# flink-storm
+
+`flink-storm` is a compatibility layer for Apache Storm that allows embedding unmodified Spouts or Bolts within a regular Flink streaming program (`SpoutWrapper` and `BoltWrapper`).
+Additionally, a whole Storm topology can be submitted to Flink (see `FlinkTopologyBuilder`, `FlinkLocalCluster`, and `FlinkSubmitter`).
+Only a few minor changes to the original submitting code are required.
+The code that builds the topology itself can be reused unmodified. See `flink-storm-examples` for a simple word-count example.
+
+The following Storm features are not (yet/fully) supported by the compatibility layer right now:
+* tuple meta information
+* fault-tolerance guarantees (i.e., calls to `ack()`/`fail()` and anchoring are ignored)
+* for whole Storm topologies the following is not supported by Flink:
+  * direct emit connection pattern
+  * activating/deactivating and rebalancing of topologies
+  * task hooks
+  * metrics

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/pom.xml
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/pom.xml b/flink-contrib/flink-storm/pom.xml
new file mode 100644
index 0000000..657b974
--- /dev/null
+++ b/flink-contrib/flink-storm/pom.xml
@@ -0,0 +1,114 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-contrib-parent</artifactId>
+		<version>0.10-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-storm</artifactId>
+	<name>flink-storm</name>
+
+	<packaging>jar</packaging>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-core</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.storm</groupId>
+			<artifactId>storm-core</artifactId>
+			<version>0.9.4</version>
+			<exclusions>
+				<exclusion>
+					<groupId>org.slf4j</groupId>
+					<artifactId>log4j-over-slf4j</artifactId>
+				</exclusion>
+				<exclusion>
+					<artifactId>logback-classic</artifactId>
+					<groupId>ch.qos.logback</groupId>
+				</exclusion>
+			</exclusions>
+		</dependency>
+
+		<dependency>
+			<groupId>com.google.guava</groupId>
+			<artifactId>guava</artifactId>
+			<version>${guava.version}</version>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-jar-plugin</artifactId>
+				<executions>
+					<execution>
+						<goals>
+							<goal>test-jar</goal>
+						</goals>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+
+		<pluginManagement>
+			<plugins>
+				<!--This plugin's configuration is used to store Eclipse m2e settings only. It has no influence on the Maven build itself.-->
+				<plugin>
+					<groupId>org.eclipse.m2e</groupId>
+					<artifactId>lifecycle-mapping</artifactId>
+					<version>1.0.0</version>
+					<configuration>
+						<lifecycleMappingMetadata>
+							<pluginExecutions>
+								<pluginExecution>
+									<pluginExecutionFilter>
+										<groupId>org.apache.maven.plugins</groupId>
+										<artifactId>maven-dependency-plugin</artifactId>
+										<versionRange>[2.9,)</versionRange>
+										<goals>
+											<goal>unpack</goal>
+										</goals>
+									</pluginExecutionFilter>
+									<action>
+										<ignore/>
+									</action>
+								</pluginExecution>
+							</pluginExecutions>
+						</lifecycleMappingMetadata>
+					</configuration>
+				</plugin>
+			</plugins>
+		</pluginManagement>
+
+	</build>
+
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
new file mode 100644
index 0000000..5f0ee21
--- /dev/null
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.api;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.pattern.Patterns;
+import akka.util.Timeout;
+import backtype.storm.Config;
+import backtype.storm.generated.AlreadyAliveException;
+import backtype.storm.generated.InvalidTopologyException;
+import backtype.storm.generated.KillOptions;
+import backtype.storm.generated.Nimbus;
+import backtype.storm.generated.NotAliveException;
+import backtype.storm.utils.NimbusClient;
+import backtype.storm.utils.Utils;
+
+import com.google.common.collect.Lists;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.client.program.Client;
+import org.apache.flink.client.program.JobWithJars;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.client.JobStatusMessage;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmanager.JobManager;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
+import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobsStatus;
+import org.apache.flink.storm.util.StormConfig;
+
+import scala.Some;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * {@link FlinkClient} mimics a Storm {@link NimbusClient} and {@link Nimbus}{@code .Client} at once, to interact with
+ * Flink's JobManager instead of Storm's Nimbus.
+ */
+public class FlinkClient {
+
+	/** The client's configuration */
+	private final Map<?,?> conf;
+	/** The jobmanager's host name */
+	private final String jobManagerHost;
+	/** The jobmanager's rpc port */
+	private final int jobManagerPort;
+	/** The user specified timeout in milliseconds */
+	private final String timeout;
+
+	// The following methods are derived from "backtype.storm.utils.NimbusClient"
+
+	/**
+	 * Instantiates a new {@link FlinkClient} for the given configuration, host name, and port. Values for {@link
+	 * Config#NIMBUS_HOST} and {@link Config#NIMBUS_THRIFT_PORT} of the given configuration are ignored.
+	 *
+	 * @param conf
+	 * 		A configuration.
+	 * @param host
+	 * 		The jobmanager's host name.
+	 * @param port
+	 * 		The jobmanager's rpc port.
+	 */
+	@SuppressWarnings("rawtypes")
+	public FlinkClient(final Map conf, final String host, final int port) {
+		this(conf, host, port, null);
+	}
+
+	/**
+	 * Instantiates a new {@link FlinkClient} for the given configuration, host name, and port. Values for {@link
+	 * Config#NIMBUS_HOST} and {@link Config#NIMBUS_THRIFT_PORT} of the given configuration are ignored.
+	 *
+	 * @param conf
+	 * 		A configuration.
+	 * @param host
+	 * 		The jobmanager's host name.
+	 * @param port
+	 * 		The jobmanager's rpc port.
+	 * @param timeout
+	 * 		The ask timeout in milliseconds, or {@code null} to use the timeout from Flink's configuration.
+	 */
+	@SuppressWarnings("rawtypes")
+	public FlinkClient(final Map conf, final String host, final int port, final Integer timeout) {
+		this.conf = conf;
+		this.jobManagerHost = host;
+		this.jobManagerPort = port;
+		if (timeout != null) {
+			this.timeout = timeout + " ms";
+		} else {
+			this.timeout = null;
+		}
+	}
+
+	/**
+	 * Returns a {@link FlinkClient} that uses the configured {@link Config#NIMBUS_HOST} and {@link
+	 * Config#NIMBUS_THRIFT_PORT} as JobManager address.
+	 *
+	 * @param conf
+	 * 		Configuration that contains the jobmanager's hostname and port.
+	 * @return A configured {@link FlinkClient}.
+	 */
+	@SuppressWarnings("rawtypes")
+	public static FlinkClient getConfiguredClient(final Map conf) {
+		final String nimbusHost = (String) conf.get(Config.NIMBUS_HOST);
+		final int nimbusPort = Utils.getInt(conf.get(Config.NIMBUS_THRIFT_PORT)).intValue();
+		return new FlinkClient(conf, nimbusHost, nimbusPort);
+	}
+
+	/**
+	 * Return a reference to itself.
+	 * <p/>
+	 * {@link FlinkClient} mimics both, {@link NimbusClient} and {@link Nimbus}{@code .Client}, at once.
+	 *
+	 * @return A reference to itself.
+	 */
+	public FlinkClient getClient() {
+		return this;
+	}
+
+	// The following methods are derived from "backtype.storm.generated.Nimbus.Client"
+
+	/**
+	 * Parameter {@code uploadedJarLocation} is actually used to point to the local jar, because Flink does not support
+	 * uploading a jar file beforehand. Jar files are always uploaded directly when a program is submitted.
+	 */
+	public void submitTopology(final String name, final String uploadedJarLocation, final FlinkTopology topology)
+			throws AlreadyAliveException, InvalidTopologyException {
+		this.submitTopologyWithOpts(name, uploadedJarLocation, topology);
+	}
+
+	/**
+	 * Parameter {@code uploadedJarLocation} is actually used to point to the local jar, because Flink does not support
+	 * uploading a jar file beforehand. Jar files are always uploaded directly when a program is submitted.
+	 */
+	public void submitTopologyWithOpts(final String name, final String uploadedJarLocation, final FlinkTopology
+			topology)
+					throws AlreadyAliveException, InvalidTopologyException {
+
+		if (this.getTopologyJobId(name) != null) {
+			throw new AlreadyAliveException();
+		}
+
+		final File uploadedJarFile = new File(uploadedJarLocation);
+		try {
+			JobWithJars.checkJarFile(uploadedJarFile);
+		} catch (final IOException e) {
+			throw new RuntimeException("Problem with jar file " + uploadedJarFile.getAbsolutePath(), e);
+		}
+
+		/* set storm configuration */
+		if (this.conf != null) {
+			topology.getConfig().setGlobalJobParameters(new StormConfig(this.conf));
+		}
+
+		final JobGraph jobGraph = topology.getStreamGraph().getJobGraph(name);
+		jobGraph.addJar(new Path(uploadedJarFile.getAbsolutePath()));
+
+		final Configuration configuration = jobGraph.getJobConfiguration();
+		configuration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, jobManagerHost);
+		configuration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort);
+
+		final Client client;
+		try {
+			client = new Client(configuration);
+		} catch (IOException e) {
+			throw new RuntimeException("Could not establish a connection to the job manager", e);
+		}
+
+		try {
+			ClassLoader classLoader = JobWithJars.buildUserCodeClassLoader(
+					Lists.newArrayList(uploadedJarFile),
+					this.getClass().getClassLoader());
+			client.runDetached(jobGraph, classLoader);
+		} catch (final ProgramInvocationException e) {
+			throw new RuntimeException("Cannot execute job due to ProgramInvocationException", e);
+		}
+	}
+
+	public void killTopology(final String name) throws NotAliveException {
+		this.killTopologyWithOpts(name, null);
+	}
+
+	/** Cancels the running job {@code name}, first sleeping {@code options.get_wait_secs()} seconds if options are given. */
+	public void killTopologyWithOpts(final String name, final KillOptions options) throws NotAliveException {
+		final JobID jobId = this.getTopologyJobId(name);
+		if (jobId == null) {
+			throw new NotAliveException();
+		}
+
+		try {
+			final ActorRef jobManager = this.getJobManager();
+			if (options != null) {
+				try {
+					Thread.sleep(1000L * options.get_wait_secs()); // long arithmetic avoids int overflow
+				} catch (final InterruptedException e) {
+					Thread.currentThread().interrupt(); // restore the interrupt flag for callers
+					throw new RuntimeException(e);
+				}
+			}
+			final FiniteDuration askTimeout = this.getTimeout();
+			final Future<Object> response = Patterns.ask(jobManager, new CancelJob(jobId), new Timeout(askTimeout));
+			try {
+				Await.result(response, askTimeout);
+			} catch (final Exception e) {
+				throw new RuntimeException("Killing topology " + name + " with Flink job ID " + jobId + " failed", e);
+			}
+		} catch (final IOException e) {
+			throw new RuntimeException("Could not connect to Flink JobManager with address " + this.jobManagerHost
+					+ ":" + this.jobManagerPort, e);
+		}
+	}
+
+	// Flink specific additional methods
+
+	/**
+	 * Package internal method that looks up the Flink {@link JobID} of a running job by its Storm topology name.
+	 *
+	 * @param id The Storm topology name.
+	 * @return Flink's internally used {@link JobID}, or {@code null} if no running job has that name.
+	 * @throws RuntimeException if the JobManager cannot be reached or its response is unusable.
+	 */
+	JobID getTopologyJobId(final String id) {
+		final Configuration configuration = GlobalConfiguration.getConfiguration();
+		if (this.timeout != null) {
+			configuration.setString(ConfigConstants.AKKA_ASK_TIMEOUT, this.timeout);
+		}
+		// NOTE(review): getTimeout() applies the same override to its own Configuration object; confirm this one is needed
+		try {
+			final ActorRef jobManager = this.getJobManager();
+
+			final FiniteDuration askTimeout = this.getTimeout();
+			final Future<Object> response = Patterns.ask(jobManager, JobManagerMessages.getRequestRunningJobsStatus(),
+					new Timeout(askTimeout));
+
+			Object result;
+			try {
+				result = Await.result(response, askTimeout);
+			} catch (final Exception e) {
+				throw new RuntimeException("Could not retrieve running jobs from the JobManager", e);
+			}
+
+			if (result instanceof RunningJobsStatus) {
+				final List<JobStatusMessage> jobs = ((RunningJobsStatus) result).getStatusMessages();
+
+				for (final JobStatusMessage status : jobs) {
+					if (status.getJobName().equals(id)) {
+						return status.getJobId();
+					}
+				}
+			} else {
+				throw new RuntimeException("RequestRunningJobsStatus requires a response of type "
+						+ "RunningJobsStatus. Instead the response is of type " + result.getClass() + ".");
+			}
+		} catch (final IOException e) {
+			throw new RuntimeException("Could not connect to Flink JobManager with address " + this.jobManagerHost
+					+ ":" + this.jobManagerPort, e);
+		}
+
+		return null;
+	}
+
+	private FiniteDuration getTimeout() {
+		final Configuration configuration = GlobalConfiguration.getConfiguration();
+		if (this.timeout != null) {
+			configuration.setString(ConfigConstants.AKKA_ASK_TIMEOUT, this.timeout);
+		}
+
+		return AkkaUtils.getTimeout(configuration);
+	}
+
+	private ActorRef getJobManager() throws IOException {
+		final Configuration configuration = GlobalConfiguration.getConfiguration();
+		// Starts an actor system and uses it to resolve the ActorRef of the JobManager at jobManagerHost:jobManagerPort.
+		ActorSystem actorSystem;
+		try {
+			final scala.Tuple2<String, Object> systemEndpoint = new scala.Tuple2<String, Object>("", 0);
+			actorSystem = AkkaUtils.createActorSystem(configuration, new Some<scala.Tuple2<String, Object>>(
+					systemEndpoint));
+		} catch (final Exception e) {
+			throw new RuntimeException("Could not start actor system to communicate with JobManager", e);
+		}
+		// NOTE(review): each call creates a new ActorSystem that this class never shuts down; confirm whether this leaks
+		return JobManager.getJobManagerActorRef(new InetSocketAddress(this.jobManagerHost, this.jobManagerPort),
+				actorSystem, AkkaUtils.getLookupTimeout(configuration));
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java
new file mode 100644
index 0000000..868801b
--- /dev/null
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.api;
+
+import backtype.storm.LocalCluster;
+import backtype.storm.generated.ClusterSummary;
+import backtype.storm.generated.KillOptions;
+import backtype.storm.generated.RebalanceOptions;
+import backtype.storm.generated.StormTopology;
+import backtype.storm.generated.SubmitOptions;
+import backtype.storm.generated.TopologyInfo;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.StreamingMode;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.minicluster.FlinkMiniCluster;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.storm.util.StormConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * {@link FlinkLocalCluster} mimics a Storm {@link LocalCluster}.
+ */
+public class FlinkLocalCluster {
+
+	/** The log used by this mini cluster */
+	private static final Logger LOG = LoggerFactory.getLogger(FlinkLocalCluster.class);
+
+	/** The flink mini cluster on which to execute the programs */
+	private final FlinkMiniCluster flink;
+
+
+	public FlinkLocalCluster() {
+		this.flink = new LocalFlinkMiniCluster(new Configuration(), true, StreamingMode.STREAMING);
+		this.flink.start();
+	}
+
+	public FlinkLocalCluster(FlinkMiniCluster flink) {
+		this.flink = Objects.requireNonNull(flink);
+	}
+
+	@SuppressWarnings("rawtypes")
+	public void submitTopology(final String topologyName, final Map conf, final FlinkTopology topology)
+			throws Exception {
+		this.submitTopologyWithOpts(topologyName, conf, topology, null);
+	}
+
+	@SuppressWarnings("rawtypes")
+	public void submitTopologyWithOpts(final String topologyName, final Map conf, final FlinkTopology topology, final SubmitOptions submitOpts) throws Exception {
+		LOG.info("Running Storm topology on FlinkLocalCluster");
+
+		if(conf != null) {
+			topology.getConfig().setGlobalJobParameters(new StormConfig(conf));
+		}
+
+		JobGraph jobGraph = topology.getStreamGraph().getJobGraph(topologyName);
+		this.flink.submitJobDetached(jobGraph);
+	}
+
+	public void killTopology(final String topologyName) {
+		this.killTopologyWithOpts(topologyName, null);
+	}
+
+	public void killTopologyWithOpts(final String name, final KillOptions options) {
+	}
+
+	public void activate(final String topologyName) {
+	}
+
+	public void deactivate(final String topologyName) {
+	}
+
+	public void rebalance(final String name, final RebalanceOptions options) {
+	}
+
+	public void shutdown() {
+		flink.stop();
+	}
+
+	public String getTopologyConf(final String id) {
+		return null;
+	}
+
+	public StormTopology getTopology(final String id) {
+		return null;
+	}
+
+	public ClusterSummary getClusterInfo() {
+		return null;
+	}
+
+	public TopologyInfo getTopologyInfo(final String id) {
+		return null;
+	}
+
+	public Map<?, ?> getState() {
+		return null;
+	}
+
+	// ------------------------------------------------------------------------
+	//  Access to default local cluster
+	// ------------------------------------------------------------------------
+
+	// The factory that creates the FlinkLocalCluster used for execution; ITCases may install a different one via initialize()
+	private static LocalClusterFactory currentFactory = new DefaultLocalClusterFactory();
+
+	/**
+	 * Returns a {@link FlinkLocalCluster} that should be used for execution. If no cluster was set by
+	 * {@link #initialize(LocalClusterFactory)} in advance, a new {@link FlinkLocalCluster} is returned.
+	 *
+	 * @return a {@link FlinkLocalCluster} to be used for execution
+	 */
+	public static FlinkLocalCluster getLocalCluster() {
+		return currentFactory.createLocalCluster();
+	}
+
+	/**
+	 * Sets a different factory for FlinkLocalClusters to be used for execution.
+	 *
+	 * @param clusterFactory
+	 * 		The LocalClusterFactory to create the local clusters for execution.
+	 */
+	public static void initialize(LocalClusterFactory clusterFactory) {
+		currentFactory = Objects.requireNonNull(clusterFactory);
+	}
+
+	// ------------------------------------------------------------------------
+	//  Cluster factory
+	// ------------------------------------------------------------------------
+
+	/**
+	 * A factory that creates local clusters.
+	 */
+	public static interface LocalClusterFactory {
+
+		/**
+		 * Creates a local flink cluster.
+		 * @return A local flink cluster.
+		 */
+		FlinkLocalCluster createLocalCluster();
+	}
+
+	/**
+	 * A factory that instantiates a FlinkLocalCluster.
+	 */
+	public static class DefaultLocalClusterFactory implements LocalClusterFactory {
+
+		@Override
+		public FlinkLocalCluster createLocalCluster() {
+			return new FlinkLocalCluster();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkOutputFieldsDeclarer.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkOutputFieldsDeclarer.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkOutputFieldsDeclarer.java
new file mode 100644
index 0000000..88d2dfe
--- /dev/null
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkOutputFieldsDeclarer.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.api;
+
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+import backtype.storm.utils.Utils;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+
+import java.util.HashMap;
+import java.util.List;
+
+/**
+ * {@link FlinkOutputFieldsDeclarer} is used to get the declared output schema of a
+ * {@link backtype.storm.topology.IRichSpout spout} or {@link backtype.storm.topology.IRichBolt bolt}.<br />
+ * <br />
+ * <strong>CAUTION: Flink does not support direct emit.</strong>
+ */
+final class FlinkOutputFieldsDeclarer implements OutputFieldsDeclarer {
+
+	/** The declared output streams and schemas, keyed by stream ID. */
+	final HashMap<String, Fields> outputStreams = new HashMap<String, Fields>();
+
+	/**
+	 * Declares the schema of the default stream ({@link Utils#DEFAULT_STREAM_ID}).
+	 *
+	 * @param fields
+	 * 		the output schema of the default stream
+	 */
+	@Override
+	public void declare(final Fields fields) {
+		this.declareStream(Utils.DEFAULT_STREAM_ID, false, fields);
+	}
+
+	/**
+	 * {@inheritDoc}
+	 * <p/>
+	 * Direct emit is not supported by Flink. Parameter {@code direct} must be {@code false}.
+	 *
+	 * @throws UnsupportedOperationException
+	 * 		if {@code direct} is {@code true}
+	 */
+	@Override
+	public void declare(final boolean direct, final Fields fields) {
+		this.declareStream(Utils.DEFAULT_STREAM_ID, direct, fields);
+	}
+
+	/**
+	 * Declares the schema of the stream with the given ID.
+	 *
+	 * @param streamId
+	 * 		the ID of the declared stream
+	 * @param fields
+	 * 		the output schema of the stream
+	 */
+	@Override
+	public void declareStream(final String streamId, final Fields fields) {
+		this.declareStream(streamId, false, fields);
+	}
+
+	/**
+	 * {@inheritDoc}
+	 * <p/>
+	 * Direct emit is not supported by Flink. Parameter {@code direct} must be {@code false}.
+	 *
+	 * @throws UnsupportedOperationException
+	 * 		if {@code direct} is {@code true}
+	 */
+	@Override
+	public void declareStream(final String streamId, final boolean direct, final Fields fields) {
+		if (direct) {
+			throw new UnsupportedOperationException("Direct emit is not supported by Flink");
+		}
+
+		// a later declaration for the same stream ID replaces the earlier one
+		this.outputStreams.put(streamId, fields);
+	}
+
+	/**
+	 * Returns {@link TypeInformation} for the declared output schema for a specific stream.
+	 *
+	 * @param streamId
+	 *            A stream ID.
+	 *
+	 * @return output type information for the declared output schema of the specified stream; or {@code null} if
+	 *         {@code streamId == null}
+	 *
+	 * @throws IllegalArgumentException
+	 *             If no output schema was declared for the specified stream or if more than 25 attributes got declared.
+	 */
+	TypeInformation<?> getOutputType(final String streamId) throws IllegalArgumentException {
+		if (streamId == null) {
+			return null;
+		}
+
+		Fields outputSchema = this.outputStreams.get(streamId);
+		if (outputSchema == null) {
+			throw new IllegalArgumentException("Stream with ID '" + streamId
+					+ "' was not declared.");
+		}
+
+		Tuple t;
+		final int numberOfAttributes = outputSchema.size();
+
+		if (numberOfAttributes == 1) {
+			// a single attribute is not wrapped into a tuple type
+			return TypeExtractor.getForClass(Object.class);
+		} else if (numberOfAttributes <= 25) {
+			try {
+				t = Tuple.getTupleClass(numberOfAttributes).newInstance();
+			} catch (final InstantiationException e) {
+				throw new RuntimeException(e);
+			} catch (final IllegalAccessException e) {
+				throw new RuntimeException(e);
+			}
+		} else {
+			throw new IllegalArgumentException("Flink supports only a maximum number of 25 attributes");
+		}
+
+		// TODO: declare only key fields as DefaultComparable
+		// fill every field with a comparable dummy so that the extracted tuple type can be used for keying
+		for (int i = 0; i < numberOfAttributes; ++i) {
+			t.setField(new DefaultComparable(), i);
+		}
+
+		return TypeExtractor.getForObject(t);
+	}
+
+	/**
+	 * {@link DefaultComparable} is a {@link Comparable} helper class that is used to get the correct {@link
+	 * TypeInformation} from {@link TypeExtractor} within {@link #getOutputType(String)}. If key fields are not
+	 * comparable, Flink cannot use them and will throw an exception.
+	 */
+	private static class DefaultComparable implements Comparable<DefaultComparable> {
+
+		public DefaultComparable() {
+		}
+
+		@Override
+		public int compareTo(final DefaultComparable o) {
+			return 0;
+		}
+	}
+
+	/**
+	 * Computes the indexes within the declared output schema of the specified stream, for a list of given
+	 * field-grouping attributes.
+	 *
+	 * @param streamId
+	 *            A stream ID.
+	 * @param groupingFields
+	 *            The names of the key fields.
+	 *
+	 * @return array of {@code int}s that contains the index within the output schema for each attribute in the given
+	 *         list
+	 *
+	 * @throws IllegalArgumentException
+	 *             If no output schema was declared for the specified stream.
+	 */
+	int[] getGroupingFieldIndexes(final String streamId, final List<String> groupingFields) {
+		// look the schema up once and report an undeclared stream explicitly instead of failing with a
+		// NullPointerException inside the loop
+		final Fields outputSchema = this.outputStreams.get(streamId);
+		if (outputSchema == null) {
+			throw new IllegalArgumentException("Stream with ID '" + streamId
+					+ "' was not declared.");
+		}
+
+		final int[] fieldIndexes = new int[groupingFields.size()];
+		for (int i = 0; i < fieldIndexes.length; ++i) {
+			fieldIndexes[i] = outputSchema.fieldIndex(groupingFields.get(i));
+		}
+
+		return fieldIndexes;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkSubmitter.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkSubmitter.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkSubmitter.java
new file mode 100644
index 0000000..9b03c68
--- /dev/null
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkSubmitter.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.storm.api;
+
+import backtype.storm.Config;
+import backtype.storm.StormSubmitter;
+import backtype.storm.generated.AlreadyAliveException;
+import backtype.storm.generated.InvalidTopologyException;
+import backtype.storm.generated.SubmitOptions;
+import backtype.storm.utils.Utils;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.client.program.ContextEnvironment;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.json.simple.JSONValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.Map;
+
+/**
+ * {@link FlinkSubmitter} mimics a {@link StormSubmitter} to submit Storm topologies to a Flink cluster.
+ */
+public class FlinkSubmitter {
+	public static final Logger logger = LoggerFactory.getLogger(FlinkSubmitter.class);
+
+	/**
+	 * Submits a topology to run on the cluster. A topology runs forever or until explicitly killed. The given
+	 * {@link SubmitOptions} are currently ignored; the call is forwarded to
+	 * {@link #submitTopology(String, Map, FlinkTopology)}.
+	 *
+	 * @param name
+	 * 		the name of the topology.
+	 * @param stormConf
+	 * 		the topology-specific configuration. See {@link Config}.
+	 * @param topology
+	 * 		the processing to execute.
+	 * @param opts
+	 * 		to manipulate the starting of the topology (ignored).
+	 * @throws AlreadyAliveException
+	 * 		if a topology with this name is already running
+	 * @throws InvalidTopologyException
+	 * 		if an invalid topology was submitted
+	 */
+	public static void submitTopology(final String name, final Map<?, ?> stormConf, final FlinkTopology topology,
+			final SubmitOptions opts)
+					throws AlreadyAliveException, InvalidTopologyException {
+		submitTopology(name, stormConf, topology);
+	}
+
+	/**
+	 * Submits a topology to run on the cluster. A topology runs forever or until explicitly killed.
+	 * <p/>
+	 * If {@code stormConf} does not contain {@link Config#NIMBUS_HOST} or {@link Config#NIMBUS_THRIFT_PORT}, these
+	 * entries are added to the given map, using the job manager address and port of the Flink configuration.
+	 *
+	 * @param name
+	 * 		the name of the topology.
+	 * @param stormConf
+	 * 		the topology-specific configuration. See {@link Config}. Must be modifiable.
+	 * @param topology
+	 * 		the processing to execute.
+	 * @throws IllegalArgumentException
+	 * 		if {@code stormConf} is not a valid (json-serializable) Storm configuration
+	 * @throws RuntimeException
+	 * 		if a topology with the given name is already known to the cluster
+	 * @throws AlreadyAliveException
+	 * 		if a topology with this name is already running
+	 * @throws InvalidTopologyException
+	 * 		if an invalid topology was submitted
+	 */
+	@SuppressWarnings({"rawtypes", "unchecked"})
+	public static void submitTopology(final String name, final Map stormConf, final FlinkTopology topology)
+			throws AlreadyAliveException, InvalidTopologyException {
+		if (!Utils.isValidConf(stormConf)) {
+			throw new IllegalArgumentException("Storm conf is not valid. Must be json-serializable");
+		}
+
+		// fill in host and port from the Flink configuration if the Storm configuration does not specify them
+		final Configuration flinkConfig = GlobalConfiguration.getConfiguration();
+		if (!stormConf.containsKey(Config.NIMBUS_HOST)) {
+			stormConf.put(Config.NIMBUS_HOST,
+					flinkConfig.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost"));
+		}
+		if (!stormConf.containsKey(Config.NIMBUS_THRIFT_PORT)) {
+			stormConf.put(Config.NIMBUS_THRIFT_PORT,
+					Integer.valueOf(flinkConfig.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
+							6123)));
+		}
+
+		final String serConf = JSONValue.toJSONString(stormConf);
+
+		final FlinkClient client = FlinkClient.getConfiguredClient(stormConf);
+		try {
+			if (client.getTopologyJobId(name) != null) {
+				throw new RuntimeException("Topology with name `" + name + "` already exists on cluster");
+			}
+			String localJar = System.getProperty("storm.jar");
+			if (localJar == null) {
+				// fall back to the jar files registered at the ContextEnvironment (if there is one);
+				// an explicit type check replaces the former catch of ClassCastException
+				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+				if (env instanceof ContextEnvironment) {
+					for (final File file : ((ContextEnvironment) env).getJars()) {
+						// TODO verify that there is only one jar
+						localJar = file.getAbsolutePath();
+					}
+				}
+			}
+
+			logger.info("Submitting topology {} in distributed mode with conf {}", name, serConf);
+			client.submitTopologyWithOpts(name, localJar, topology);
+		} catch (final InvalidTopologyException e) {
+			// the trailing throwable keeps the stack trace in the log
+			logger.warn("Topology submission exception: {}", e.get_msg(), e);
+			throw e;
+		} catch (final AlreadyAliveException e) {
+			logger.warn("Topology already alive exception", e);
+			throw e;
+		}
+
+		logger.info("Finished submitting topology: {}", name);
+	}
+
+	/**
+	 * Same as {@link #submitTopology(String, Map, FlinkTopology)}. Progress bars are not supported by Flink.
+	 *
+	 * @param name
+	 * 		the name of the topology.
+	 * @param stormConf
+	 * 		the topology-specific configuration. See {@link Config}.
+	 * @param topology
+	 * 		the processing to execute.
+	 * @throws AlreadyAliveException
+	 * 		if a topology with this name is already running
+	 * @throws InvalidTopologyException
+	 * 		if an invalid topology was submitted
+	 */
+	public static void submitTopologyWithProgressBar(final String name, final Map<?, ?> stormConf,
+			final FlinkTopology topology)
+					throws AlreadyAliveException, InvalidTopologyException {
+		submitTopology(name, stormConf, topology);
+	}
+
+	/**
+	 * In Flink, jar files are submitted directly when a program is started. Thus, this method does nothing. The
+	 * returned value is parameter localJar, because this gives the best integration of Storm behavior within a Flink
+	 * environment.
+	 *
+	 * @param conf
+	 * 		the topology-specific configuration (ignored). See {@link Config}.
+	 * @param localJar
+	 * 		file path of the jar file to submit
+	 * @return the value of parameter localJar
+	 * @throws RuntimeException
+	 * 		if {@code localJar} is {@code null}
+	 */
+	@SuppressWarnings("rawtypes")
+	public static String submitJar(final Map conf, final String localJar) {
+		return submitJar(localJar);
+	}
+
+	/**
+	 * In Flink, jar files are submitted directly when a program is started. Thus, this method does nothing. The
+	 * returned value is parameter localJar, because this gives the best integration of Storm behavior within a Flink
+	 * environment.
+	 *
+	 * @param localJar
+	 * 		file path of the jar file to submit
+	 * @return the value of parameter localJar
+	 * @throws RuntimeException
+	 * 		if {@code localJar} is {@code null}
+	 */
+	public static String submitJar(final String localJar) {
+		if (localJar == null) {
+			throw new RuntimeException(
+					"Must submit topologies using the 'storm' client script so that StormSubmitter knows which jar " +
+					"to upload");
+		}
+
+		return localJar;
+	}
+
+	/**
+	 * Dummy interface used to track progress of file upload. Does not do anything. Kept for compatibility.
+	 */
+	public interface FlinkProgressListener {
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopology.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopology.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopology.java
new file mode 100644
index 0000000..531d6df
--- /dev/null
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopology.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.api;
+
+import backtype.storm.generated.StormTopology;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+/**
+ * {@link FlinkTopology} mimics a {@link StormTopology} and is implemented in terms of a {@link
+ * StreamExecutionEnvironment} . In contrast to a regular {@link StreamExecutionEnvironment}, a {@link FlinkTopology}
+ * cannot be executed directly, but must be handed over to a {@link FlinkLocalCluster}, {@link FlinkSubmitter}, or
+ * {@link FlinkClient}.
+ */
+public class FlinkTopology extends StreamExecutionEnvironment {
+
+	/** The number of declared tasks for the whole program (ie, sum over all dops) */
+	private int numberOfTasks = 0;
+
+	public FlinkTopology() {
+		// Set default parallelism to 1, to mirror Storm default behavior
+		super.setParallelism(1);
+	}
+
+	/**
+	 * Is not supported. In order to execute use {@link FlinkLocalCluster}, {@link FlinkSubmitter}, or {@link
+	 * FlinkClient}.
+	 *
+	 * @throws UnsupportedOperationException
+	 * 		at every invocation
+	 */
+	@Override
+	public JobExecutionResult execute() throws Exception {
+		throw new UnsupportedOperationException(
+				"A FlinkTopology cannot be executed directly. Use FlinkLocalCluster, FlinkSubmitter, or FlinkClient " +
+				"instead.");
+	}
+
+	/**
+	 * Is not supported. In order to execute use {@link FlinkLocalCluster}, {@link FlinkSubmitter} or {@link
+	 * FlinkClient}.
+	 *
+	 * @throws UnsupportedOperationException
+	 * 		at every invocation
+	 */
+	@Override
+	public JobExecutionResult execute(final String jobName) throws Exception {
+		throw new UnsupportedOperationException(
+				"A FlinkTopology cannot be executed directly. Use FlinkLocalCluster, FlinkSubmitter, or FlinkClient " +
+				"instead.");
+	}
+
+	/**
+	 * Increases the number of declared tasks of this program by the given value.
+	 *
+	 * @param dop
+	 * 		The dop of a new operator that increases the number of overall tasks. Must be positive.
+	 * @throws IllegalArgumentException
+	 * 		if {@code dop} is not positive
+	 */
+	public void increaseNumberOfTasks(final int dop) {
+		// explicit check instead of an assert: assertions are disabled by default, so an invalid value would
+		// otherwise silently corrupt the task count
+		if (dop <= 0) {
+			throw new IllegalArgumentException("The dop must be positive, but was " + dop);
+		}
+		this.numberOfTasks += dop;
+	}
+
+	/**
+	 * Returns the number of required tasks to execute this program.
+	 *
+	 * @return the number of required tasks to execute this program
+	 */
+	public int getNumberOfTasks() {
+		return this.numberOfTasks;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopologyBuilder.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopologyBuilder.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopologyBuilder.java
new file mode 100644
index 0000000..99de0e2
--- /dev/null
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopologyBuilder.java
@@ -0,0 +1,397 @@
+/*
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.storm.api;
+
+import backtype.storm.generated.ComponentCommon;
+import backtype.storm.generated.GlobalStreamId;
+import backtype.storm.generated.Grouping;
+import backtype.storm.generated.StormTopology;
+import backtype.storm.topology.BasicBoltExecutor;
+import backtype.storm.topology.BoltDeclarer;
+import backtype.storm.topology.IBasicBolt;
+import backtype.storm.topology.IRichBolt;
+import backtype.storm.topology.IRichSpout;
+import backtype.storm.topology.IRichStateSpout;
+import backtype.storm.topology.SpoutDeclarer;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.tuple.Fields;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.storm.util.SplitStreamType;
+import org.apache.flink.storm.util.SplitStreamTypeKeySelector;
+import org.apache.flink.storm.util.StormStreamSelector;
+import org.apache.flink.storm.wrappers.BoltWrapper;
+import org.apache.flink.storm.wrappers.SpoutWrapper;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.datastream.SplitStream;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Set;
+
+/**
+ * {@link FlinkTopologyBuilder} mimics a {@link TopologyBuilder}, but builds a Flink program instead of a Storm
+ * topology. Most methods (except {@link #createTopology()}) are copied from the original {@link TopologyBuilder}
+ * implementation to ensure equal behavior.<br />
+ * <br />
+ * <strong>CAUTION: {@link IRichStateSpout StateSpout}s are currently not supported.</strong>
+ */
+public class FlinkTopologyBuilder {
+
+	/** A Storm {@link TopologyBuilder} to build a real Storm topology */
+	private final TopologyBuilder stormBuilder = new TopologyBuilder();
+	/** All user spouts by their ID */
+	private final HashMap<String, IRichSpout> spouts = new HashMap<String, IRichSpout>();
+	/** All user bolts by their ID */
+	private final HashMap<String, IRichBolt> bolts = new HashMap<String, IRichBolt>();
+	/** All declared streams and output schemas by operator ID */
+	private final HashMap<String, HashMap<String, Fields>> outputStreams = new HashMap<String, HashMap<String, Fields>>();
+	/** All spouts&bolts declarers by their ID */
+	private final HashMap<String, FlinkOutputFieldsDeclarer> declarers = new HashMap<String, FlinkOutputFieldsDeclarer>();
+	// needs to be a class member for internal testing purpose
+	private StormTopology stormTopology;
+
+
+	/**
+	 * Creates a Flink program that uses the specified spouts and bolts.
+	 * <p/>
+	 * The Storm topology is built via the wrapped {@link TopologyBuilder} first. Afterwards, every registered spout is
+	 * added as a source (wrapped in a {@link SpoutWrapper}) and every registered bolt is added as an operator (wrapped
+	 * in a {@link BoltWrapper}) that is connected to the output streams of its producers according to the declared
+	 * grouping. Operators that do not declare exactly one output stream emit {@link SplitStreamType} records that are
+	 * split by a {@link StormStreamSelector}. If a parallelism hint is set for a component it is applied to the
+	 * corresponding operator; otherwise the hint is set to 1 in the Storm topology.
+	 *
+	 * @return the {@link FlinkTopology} that contains the translated program
+	 *
+	 * @throws RuntimeException
+	 * 		if not all bolts could be connected to their producers or if a consumed stream was not declared by
+	 * 		its producer
+	 * @throws UnsupportedOperationException
+	 * 		if a grouping other than (local-or-)shuffle, fields, all, or global is used
+	 */
+	@SuppressWarnings({"rawtypes", "unchecked"})
+	public FlinkTopology createTopology() {
+		this.stormTopology = this.stormBuilder.createTopology();
+
+		final FlinkTopology env = new FlinkTopology();
+		env.setParallelism(1);
+
+		// output streams of all already translated operators: operator ID -> (stream ID -> stream)
+		final HashMap<String, HashMap<String, DataStream>> availableInputs = new HashMap<String, HashMap<String, DataStream>>();
+
+		// step 1: translate all spouts into sources
+		for (final Entry<String, IRichSpout> spout : this.spouts.entrySet()) {
+			final String spoutId = spout.getKey();
+			final IRichSpout userSpout = spout.getValue();
+
+			final FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer();
+			userSpout.declareOutputFields(declarer);
+			final HashMap<String,Fields> sourceStreams = declarer.outputStreams;
+			this.outputStreams.put(spoutId, sourceStreams);
+			declarers.put(spoutId, declarer);
+
+			final SpoutWrapper spoutWrapper = new SpoutWrapper(userSpout);
+			spoutWrapper.setStormTopology(stormTopology);
+
+			DataStreamSource source;
+			final HashMap<String, DataStream> outputStreams = new HashMap<String, DataStream>();
+			if (sourceStreams.size() == 1) {
+				// exactly one declared stream: use its declared schema as output type
+				final String outputStreamId = (String) sourceStreams.keySet().toArray()[0];
+				source = env.addSource(spoutWrapper, spoutId,
+						declarer.getOutputType(outputStreamId));
+				outputStreams.put(outputStreamId, source);
+			} else {
+				// otherwise: emit SplitStreamType records and split them by stream ID
+				source = env.addSource(spoutWrapper, spoutId,
+						TypeExtractor.getForClass(SplitStreamType.class));
+				SplitStream splitSource = source.split(new StormStreamSelector());
+
+				for (String streamId : sourceStreams.keySet()) {
+					outputStreams.put(streamId, splitSource.select(streamId));
+				}
+			}
+			availableInputs.put(spoutId, outputStreams);
+
+			int dop = 1;
+			final ComponentCommon common = stormTopology.get_spouts().get(spoutId).get_common();
+			if (common.is_set_parallelism_hint()) {
+				dop = common.get_parallelism_hint();
+				source.setParallelism(dop);
+			} else {
+				common.set_parallelism_hint(1);
+			}
+			env.increaseNumberOfTasks(dop);
+		}
+
+		// step 2: translate all bolts; a bolt stays in this map until all of its inputs are connected
+		final HashMap<String, IRichBolt> unprocessedBolts = new HashMap<String, IRichBolt>();
+		unprocessedBolts.putAll(this.bolts);
+
+		// the not yet connected inputs of each bolt that was visited at least once
+		final HashMap<String, Set<Entry<GlobalStreamId, Grouping>>> unprocessdInputsPerBolt =
+				new HashMap<String, Set<Entry<GlobalStreamId, Grouping>>>();
+
+		/* Because we do not know the order in which an iterator steps over a set, we might process a consumer before
+		 * its producer
+		 * ->thus, we might need to repeat multiple times
+		 */
+		boolean makeProgress = true;
+		while (unprocessedBolts.size() > 0) {
+			// a full pass without finding any available producer means the remaining bolts cannot be connected
+			if (!makeProgress) {
+				throw new RuntimeException(
+						"Unable to build Topology. Could not connect the following bolts: "
+								+ unprocessedBolts.keySet());
+			}
+			makeProgress = false;
+
+			final Iterator<Entry<String, IRichBolt>> boltsIterator = unprocessedBolts.entrySet().iterator();
+			while (boltsIterator.hasNext()) {
+
+				final Entry<String, IRichBolt> bolt = boltsIterator.next();
+				final String boltId = bolt.getKey();
+				final IRichBolt userBolt = bolt.getValue();
+
+				final ComponentCommon common = stormTopology.get_bolts().get(boltId).get_common();
+
+				// first visit of this bolt: remember all of its declared inputs as unprocessed
+				Set<Entry<GlobalStreamId, Grouping>> unprocessedInputs = unprocessdInputsPerBolt.get(boltId);
+				if (unprocessedInputs == null) {
+					unprocessedInputs = new HashSet<Entry<GlobalStreamId, Grouping>>();
+					unprocessedInputs.addAll(common.get_inputs().entrySet());
+					unprocessdInputsPerBolt.put(boltId, unprocessedInputs);
+				}
+
+				// connect each available producer to the current bolt
+				final Iterator<Entry<GlobalStreamId, Grouping>> inputStreamsIterator = unprocessedInputs.iterator();
+				while (inputStreamsIterator.hasNext()) {
+
+					final Entry<GlobalStreamId, Grouping> stormInputStream = inputStreamsIterator.next();
+					final String producerId = stormInputStream.getKey().get_componentId();
+					final String inputStreamId = stormInputStream.getKey().get_streamId();
+
+					// null if the producer was not translated yet; the input is retried in a later pass
+					final HashMap<String, DataStream> producer = availableInputs.get(producerId);
+					if (producer != null) {
+						makeProgress = true;
+
+						DataStream inputStream = producer.get(inputStreamId);
+						if (inputStream != null) {
+							final FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer();
+							userBolt.declareOutputFields(declarer);
+							final HashMap<String, Fields> boltOutputStreams = declarer.outputStreams;
+							this.outputStreams.put(boltId, boltOutputStreams);
+							this.declarers.put(boltId, declarer);
+
+							// if producer was processed already
+							// translate the Storm grouping into the corresponding Flink partitioning
+							final Grouping grouping = stormInputStream.getValue();
+							if (grouping.is_set_shuffle()) {
+								// Storm uses a round-robin shuffle strategy
+								inputStream = inputStream.rebalance();
+							} else if (grouping.is_set_fields()) {
+								// global grouping is emulated in Storm via an empty fields grouping list
+								final List<String> fields = grouping.get_fields();
+								if (fields.size() > 0) {
+									FlinkOutputFieldsDeclarer prodDeclarer = this.declarers.get(producerId);
+									if (producer.size() == 1) {
+										inputStream = inputStream.keyBy(prodDeclarer
+												.getGroupingFieldIndexes(inputStreamId,
+														grouping.get_fields()));
+									} else {
+										// producer emits SplitStreamType records; use the matching key selector
+										inputStream = inputStream
+												.keyBy(new SplitStreamTypeKeySelector(
+														prodDeclarer.getGroupingFieldIndexes(
+																inputStreamId,
+																grouping.get_fields())));
+									}
+								} else {
+									inputStream = inputStream.global();
+								}
+							} else if (grouping.is_set_all()) {
+								inputStream = inputStream.broadcast();
+							} else if (!grouping.is_set_local_or_shuffle()) {
+								throw new UnsupportedOperationException(
+										"Flink only supports (local-or-)shuffle, fields, all, and global grouping");
+							}
+
+							final SingleOutputStreamOperator outputStream;
+							final BoltWrapper boltWrapper;
+							if (boltOutputStreams.size() < 2) { // single output stream or sink
+								String outputStreamId = null;
+								if (boltOutputStreams.size() == 1) {
+									outputStreamId = (String) boltOutputStreams.keySet().toArray()[0];
+								}
+								// getOutputType returns null for a null stream ID, ie, for a sink
+								final TypeInformation<?> outType = declarer
+										.getOutputType(outputStreamId);
+
+								boltWrapper = new BoltWrapper(userBolt, this.outputStreams
+										.get(producerId).get(inputStreamId));
+								outputStream = inputStream.transform(boltId, outType, boltWrapper);
+
+								if (outType != null) {
+									// only for non-sink nodes
+									final HashMap<String, DataStream> op = new HashMap<String, DataStream>();
+									op.put(outputStreamId, outputStream);
+									availableInputs.put(boltId, op);
+								}
+							} else {
+								// multiple output streams: emit SplitStreamType records and split them by stream ID
+								final TypeInformation<?> outType = TypeExtractor
+										.getForClass(SplitStreamType.class);
+
+								boltWrapper = new BoltWrapper(userBolt, this.outputStreams.get(producerId).get(inputStreamId));
+								outputStream = inputStream.transform(boltId, outType, boltWrapper);
+
+								final SplitStream splitStreams = outputStream
+										.split(new StormStreamSelector());
+
+								final HashMap<String, DataStream> op = new HashMap<String, DataStream>();
+								for (String outputStreamId : boltOutputStreams.keySet()) {
+									op.put(outputStreamId, splitStreams.select(outputStreamId));
+								}
+								availableInputs.put(boltId, op);
+							}
+							boltWrapper.setStormTopology(stormTopology);
+
+							int dop = 1;
+							if (common.is_set_parallelism_hint()) {
+								dop = common.get_parallelism_hint();
+								outputStream.setParallelism(dop);
+							} else {
+								common.set_parallelism_hint(1);
+							}
+							env.increaseNumberOfTasks(dop);
+
+							// this input is connected now
+							inputStreamsIterator.remove();
+						} else {
+							throw new RuntimeException("Cannot connect '" + boltId + "' to '"
+									+ producerId + "'. Stream '" + inputStreamId + "' not found.");
+						}
+					}
+				}
+
+				if (unprocessedInputs.size() == 0) {
+					// all inputs are connected; processing bolt completed
+					boltsIterator.remove();
+				}
+			}
+		}
+		return env;
+	}
+
+	/**
+	 * Define a new bolt in this topology with parallelism of just one thread.
+	 *
+	 * @param id
+	 * 		the id of this component. This id is referenced by other components that want to consume this bolt's
+	 * 		outputs.
+	 * @param bolt
+	 * 		the bolt
+	 * @return use the returned object to declare the inputs to this component
+	 */
+	public BoltDeclarer setBolt(final String id, final IRichBolt bolt) {
+		return this.setBolt(id, bolt, null);
+	}
+
+	/**
+	 * Define a new bolt in this topology with the specified amount of parallelism.
+	 *
+	 * @param id
+	 * 		the id of this component. This id is referenced by other components that want to consume this bolt's
+	 * 		outputs.
+	 * @param bolt
+	 * 		the bolt
+	 * @param parallelism_hint
+	 * 		the number of tasks that should be assigned to execute this bolt. Each task will run on a thread in a
+	 * 		process somewhere around the cluster.
+	 * @return use the returned object to declare the inputs to this component
+	 */
+	public BoltDeclarer setBolt(final String id, final IRichBolt bolt, final Number parallelism_hint) {
+		final BoltDeclarer declarer = this.stormBuilder.setBolt(id, bolt, parallelism_hint);
+		this.bolts.put(id, bolt);
+		return declarer;
+	}
+
+	/**
+	 * Define a new bolt in this topology. This defines a basic bolt, which is a simpler to use but more restricted
+	 * kind
+	 * of bolt. Basic bolts are intended for non-aggregation processing and automate the anchoring/acking process to
+	 * achieve proper reliability in the topology.
+	 *
+	 * @param id
+	 * 		the id of this component. This id is referenced by other components that want to consume this bolt's
+	 * 		outputs.
+	 * @param bolt
+	 * 		the basic bolt
+	 * @return use the returned object to declare the inputs to this component
+	 */
+	public BoltDeclarer setBolt(final String id, final IBasicBolt bolt) {
+		return this.setBolt(id, bolt, null);
+	}
+
+	/**
+	 * Define a new bolt in this topology. This defines a basic bolt, which is a simpler to use but more restricted
+	 * kind
+	 * of bolt. Basic bolts are intended for non-aggregation processing and automate the anchoring/acking process to
+	 * achieve proper reliability in the topology.
+	 *
+	 * @param id
+	 * 		the id of this component. This id is referenced by other components that want to consume this bolt's
+	 * 		outputs.
+	 * @param bolt
+	 * 		the basic bolt
+	 * @param parallelism_hint
+	 * 		the number of tasks that should be assigned to execute this bolt. Each task will run on a thread in a
+	 * 		process somewhere around the cluster.
+	 * @return use the returned object to declare the inputs to this component
+	 */
+	public BoltDeclarer setBolt(final String id, final IBasicBolt bolt, final Number parallelism_hint) {
+		return this.setBolt(id, new BasicBoltExecutor(bolt), parallelism_hint);
+	}
+
+	/**
+	 * Define a new spout in this topology.
+	 *
+	 * @param id
+	 * 		the id of this component. This id is referenced by other components that want to consume this spout's
+	 * 		outputs.
+	 * @param spout
+	 * 		the spout
+	 */
+	public SpoutDeclarer setSpout(final String id, final IRichSpout spout) {
+		return this.setSpout(id, spout, null);
+	}
+
+	/**
+	 * Define a new spout in this topology with the specified parallelism. If the spout declares itself as
+	 * non-distributed, the parallelism_hint will be ignored and only one task will be allocated to this component.
+	 *
+	 * @param id
+	 * 		the id of this component. This id is referenced by other components that want to consume this spout's
+	 * 		outputs.
+	 * @param parallelism_hint
+	 * 		the number of tasks that should be assigned to execute this spout. Each task will run on a thread in a
+	 * 		process somewhere around the cluster.
+	 * @param spout
+	 * 		the spout
+	 */
+	public SpoutDeclarer setSpout(final String id, final IRichSpout spout, final Number parallelism_hint) {
+		final SpoutDeclarer declarer = this.stormBuilder.setSpout(id, spout, parallelism_hint);
+		this.spouts.put(id, spout);
+		return declarer;
+	}
+
+	// TODO add StateSpout support (Storm 0.9.4 does not yet support StateSpouts itself)
+	/* not implemented by Storm 0.9.4
+	 * public void setStateSpout(final String id, final IRichStateSpout stateSpout) {
+	 * this.stormBuilder.setStateSpout(id, stateSpout);
+	 * }
+	 * public void setStateSpout(final String id, final IRichStateSpout stateSpout, final Number parallelism_hint) {
+	 * this.stormBuilder.setStateSpout(id, stateSpout, parallelism_hint);
+	 * }
+	 */
+
+	// for internal testing purpose only
+	StormTopology getStormTopology() {
+		return this.stormTopology;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/FiniteSpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/FiniteSpout.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/FiniteSpout.java
new file mode 100644
index 0000000..99c2583
--- /dev/null
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/FiniteSpout.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.util;
+
+import backtype.storm.topology.IRichSpout;
+
+/**
+ * This interface represents a spout that emits a finite number of records. Common spouts emit infinite streams by
+ * default. To change this behavior and take advantage of Flink's finite-source capabilities, the spout should implement
+ * this interface.
+ */
+public interface FiniteSpout extends IRichSpout {
+
+	/**
+	 * Reports whether the spout has reached the end of its stream.
+	 *
+	 * @return true, if the spout's stream reached its end, false otherwise
+	 */
+	public boolean reachedEnd();
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/SplitStreamMapper.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/SplitStreamMapper.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/SplitStreamMapper.java
new file mode 100644
index 0000000..1fb5e02
--- /dev/null
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/SplitStreamMapper.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.storm.util;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SplitStream;
+
+/**
+ * Strips {@link SplitStreamType}{@code <T>} away, i.e., extracts the wrapped record of type {@code T}. Can be used to get
+ * a "clean" stream from a Spout/Bolt that declared multiple output streams (after the streams got separated using
+ * {@link DataStream#split(org.apache.flink.streaming.api.collector.selector.OutputSelector) .split(...)} and
+ * {@link SplitStream#select(String...) .select(...)}).
+ *
+ * @param <T> the type of the wrapped record that is handed back by {@link #map(SplitStreamType)}
+ */
+public class SplitStreamMapper<T> implements MapFunction<SplitStreamType<T>, T> {
+	private static final long serialVersionUID = 3550359150160908564L;
+	/** Returns the {@code value} field of the given wrapper; its {@code streamId} is not read. */
+	@Override
+	public T map(SplitStreamType<T> value) throws Exception {
+		return value.value;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/SplitStreamType.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/SplitStreamType.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/SplitStreamType.java
new file mode 100644
index 0000000..a4b5f8e
--- /dev/null
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/SplitStreamType.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.storm.util;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+
+/**
+ * Used by {@link org.apache.flink.storm.wrappers.AbstractStormCollector AbstractStormCollector} to wrap
+ * output tuples if multiple output streams are declared. For this case, the Flink output data stream must be split via
+ * {@link DataStream#split(org.apache.flink.streaming.api.collector.selector.OutputSelector) .split(...)} using
+ * {@link StormStreamSelector}.
+ */
+public class SplitStreamType<T> {
+
+	/** The stream ID this tuple belongs to. */
+	public String streamId;
+	/** The actual data value. */
+	public T value;
+	/** Renders this instance as {@code <sid:STREAM_ID,v:VALUE>}. */
+	@Override
+	public String toString() {
+		return "<sid:" + this.streamId + ",v:" + this.value + ">";
+	}
+	/** Two instances are equal if they have the same runtime class and equal {@code streamId} and {@code value}. */
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+		SplitStreamType<?> other = (SplitStreamType<?>) o;
+		// NOTE(review): throws NullPointerException if this.streamId or this.value is null; no hashCode() override exists in this class.
+		return this.streamId.equals(other.streamId) && this.value.equals(other.value);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/SplitStreamTypeKeySelector.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/SplitStreamTypeKeySelector.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/SplitStreamTypeKeySelector.java
new file mode 100644
index 0000000..44c693c
--- /dev/null
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/SplitStreamTypeKeySelector.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.storm.util;
+
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.streaming.util.keys.KeySelectorUtil;
+import org.apache.flink.streaming.util.keys.KeySelectorUtil.ArrayKeySelector;
+
+/**
+ * {@link SplitStreamTypeKeySelector} is a specific grouping key selector for streams that are selected via
+ * {@link StormStreamSelector} from a Spout or Bolt that declares multiple output streams.
+ * <p>
+ * It extracts the wrapped {@link Tuple} type from the {@link SplitStreamType} tuples and applies a regular
+ * {@link ArrayKeySelector} on it.
+ */
+public class SplitStreamTypeKeySelector implements KeySelector<SplitStreamType<Tuple>, Tuple> {
+	private static final long serialVersionUID = 4672434660037669254L;
+
+	private final ArrayKeySelector<Tuple> selector; // delegate that computes the key from the unwrapped tuple
+	/** @param fields forwarded unchanged to the {@link ArrayKeySelector} constructor */
+	public SplitStreamTypeKeySelector(int... fields) {
+		this.selector = new KeySelectorUtil.ArrayKeySelector<Tuple>(fields);
+	}
+	/** Returns the delegate selector's key for the {@code value} field of the given wrapper. */
+	@Override
+	public Tuple getKey(SplitStreamType<Tuple> value) throws Exception {
+		return selector.getKey(value.value);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/StormConfig.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/StormConfig.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/StormConfig.java
new file mode 100644
index 0000000..6550990
--- /dev/null
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/StormConfig.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.storm.util;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.flink.api.common.ExecutionConfig.GlobalJobParameters;
+
+import backtype.storm.Config;
+
+/**
+ * {@link StormConfig} is used to provide a user-defined Storm configuration (i.e., a raw {@link Map} or {@link Config}
+ * object) for embedded Spouts and Bolts. Every {@link Map} method of this class forwards to an internal {@link HashMap}.
+ */
+@SuppressWarnings("rawtypes")
+public final class StormConfig extends GlobalJobParameters implements Map {
+	private static final long serialVersionUID = 8019519109673698490L;
+
+	/** Contains the actual configuration that is provided to Spouts and Bolts. */
+	private final Map config = new HashMap();
+
+	/**
+	 * Creates an empty configuration.
+	 */
+	public StormConfig() {
+	}
+
+	/**
+	 * Creates a configuration with initial values provided by the given {@code Map}.
+	 *
+	 * @param config
+	 *            Initial values for this configuration; its entries are copied into the internal map via {@code putAll}.
+	 */
+	@SuppressWarnings("unchecked")
+	public StormConfig(Map config) {
+		this.config.putAll(config);
+	}
+
+	// ---- java.util.Map implementation: each method below forwards to the internal config map ----
+	@Override
+	public int size() {
+		return this.config.size();
+	}
+
+	@Override
+	public boolean isEmpty() {
+		return this.config.isEmpty();
+	}
+
+	@Override
+	public boolean containsKey(Object key) {
+		return this.config.containsKey(key);
+	}
+
+	@Override
+	public boolean containsValue(Object value) {
+		return this.config.containsValue(value);
+	}
+
+	@Override
+	public Object get(Object key) {
+		return this.config.get(key);
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public Object put(Object key, Object value) {
+		return this.config.put(key, value);
+	}
+
+	@Override
+	public Object remove(Object key) {
+		return this.config.remove(key);
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public void putAll(Map m) {
+		this.config.putAll(m);
+	}
+
+	@Override
+	public void clear() {
+		this.config.clear();
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public Set<Object> keySet() {
+		return this.config.keySet(); // the internal map's own key set is returned, not a copy
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public Collection<Object> values() {
+		return this.config.values(); // the internal map's own values collection is returned, not a copy
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public Set<java.util.Map.Entry<Object, Object>> entrySet() {
+		return this.config.entrySet(); // the internal map's own entry set is returned, not a copy
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/StormStreamSelector.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/StormStreamSelector.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/StormStreamSelector.java
new file mode 100644
index 0000000..d9f4178
--- /dev/null
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/StormStreamSelector.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.storm.util;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import org.apache.flink.storm.api.FlinkTopologyBuilder;
+import org.apache.flink.streaming.api.collector.selector.OutputSelector;
+
+/**
+ * Used by {@link FlinkTopologyBuilder} to split multiple declared output streams within Flink.
+ */
+public final class StormStreamSelector<T> implements OutputSelector<SplitStreamType<T>> {
+	private static final long serialVersionUID = 2553423379715401023L;
+
+	/** Cache of one-element lists keyed by stream ID, so that select() does not create a new list for every record. */
+	private final HashMap<String, List<String>> streams = new HashMap<String, List<String>>();
+	/** Returns a one-element list holding the {@code streamId} of the given record; the cached list instance is reused. */
+	@Override
+	public Iterable<String> select(SplitStreamType<T> value) {
+		String sid = value.streamId;
+		List<String> streamId = this.streams.get(sid);
+		if (streamId == null) { // no list cached for this stream ID yet: build it once and remember it
+			streamId = new ArrayList<String>(1);
+			streamId.add(sid);
+			this.streams.put(sid, streamId);
+		}
+		return streamId;
+	}
+
+}
\ No newline at end of file


Mime
View raw message