flink-commits mailing list archives

From mbala...@apache.org
Subject [02/27] flink git commit: [storm-compat] Added Storm API compatibility classes
Date Mon, 15 Jun 2015 09:32:52 GMT
[storm-compat] Added Storm API compatibility classes


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2bf76be1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2bf76be1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2bf76be1

Branch: refs/heads/master
Commit: 2bf76be1864b716ddcacca79ca01d28507fe8514
Parents: 118469d
Author: mjsax <mjsax@informatik.hu-berlin.de>
Authored: Thu May 14 12:13:20 2015 +0200
Committer: mbalassi <mbalassi@apache.org>
Committed: Sun Jun 14 22:58:31 2015 +0200

----------------------------------------------------------------------
 .../stormcompatibility/api/FlinkClient.java     | 321 +++++++++++++++++++
 .../api/FlinkLocalCluster.java                  | 133 ++++++++
 .../api/FlinkOutputFieldsDeclarer.java          | 171 ++++++++++
 .../stormcompatibility/api/FlinkSubmitter.java  | 245 ++++++++++++++
 .../stormcompatibility/api/FlinkTopology.java   | 112 +++++++
 .../api/FlinkTopologyBuilder.java               | 314 ++++++++++++++++++
 6 files changed, 1296 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2bf76be1/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkClient.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkClient.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkClient.java
new file mode 100644
index 0000000..4efe02f
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkClient.java
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.stormcompatibility.api;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.client.program.Client;
+import org.apache.flink.client.program.JobWithJars;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.client.JobStatusMessage;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmanager.JobManager;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
+import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobsStatus;
+
+import scala.Some;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.pattern.Patterns;
+import akka.util.Timeout;
+import backtype.storm.Config;
+import backtype.storm.generated.AlreadyAliveException;
+import backtype.storm.generated.InvalidTopologyException;
+import backtype.storm.generated.KillOptions;
+import backtype.storm.generated.Nimbus;
+import backtype.storm.generated.NotAliveException;
+import backtype.storm.generated.SubmitOptions;
+import backtype.storm.utils.NimbusClient;
+import backtype.storm.utils.Utils;
+
+
+
+
+
+/**
+ * {@link FlinkClient} mimics a Storm {@link NimbusClient} and {@link Nimbus}{@code .Client} at once, to interact with
+ * Flink's JobManager instead of Storm's Nimbus.
+ */
+public class FlinkClient {
+	/**
+	 * The jobmanager's host name.
+	 */
+	private final String jobManagerHost;
+	/**
+	 * The jobmanager's rpc port.
+	 */
+	private final int jobManagerPort;
+	/**
+	 * The user-specified timeout in milliseconds.
+	 */
+	private final String timeout;
+	
+	
+	
+	/*
+	 * The following methods are derived from "backtype.storm.utils.NimbusClient"
+	 */
+	
+	/**
+	 * Instantiates a new {@link FlinkClient} for the given configuration, host name, and port. Values for
+	 * {@link Config#NIMBUS_HOST} and {@link Config#NIMBUS_THRIFT_PORT} in the given configuration are ignored.
+	 * 
+	 * @param conf
+	 *            A configuration.
+	 * @param host
+	 *            The jobmanager's host name.
+	 * @param port
+	 *            The jobmanager's rpc port.
+	 */
+	public FlinkClient(final Map<?, ?> conf, final String host, final int port) {
+		this(conf, host, port, null);
+	}
+	
+	/**
+	 * Instantiates a new {@link FlinkClient} for the given configuration, host name, and port. Values for
+	 * {@link Config#NIMBUS_HOST} and {@link Config#NIMBUS_THRIFT_PORT} in the given configuration are ignored.
+	 * 
+	 * @param conf
+	 *            A configuration.
+	 * @param host
+	 *            The jobmanager's host name.
+	 * @param port
+	 *            The jobmanager's rpc port.
+	 * @param timeout Timeout in milliseconds for communication with the JobManager; may be {@code null}.
+	 */
+	public FlinkClient(final Map<?, ?> conf, final String host, final int port, final Integer timeout) {
+		this.jobManagerHost = host;
+		this.jobManagerPort = port;
+		if(timeout != null) {
+			this.timeout = timeout + " ms";
+		} else {
+			this.timeout = null;
+		}
+	}
+	
+	
+	
+	/**
+	 * Returns a {@link FlinkClient} that uses the configured {@link Config#NIMBUS_HOST} and
+	 * {@link Config#NIMBUS_THRIFT_PORT} as JobManager address.
+	 * 
+	 * @param conf
+	 *            Configuration that contains the jobmanager's hostname and port.
+	 * 
+	 * @return A configured {@link FlinkClient}.
+	 */
+	public static FlinkClient getConfiguredClient(@SuppressWarnings("rawtypes") final Map conf) {
+		final String nimbusHost = (String)conf.get(Config.NIMBUS_HOST);
+		final int nimbusPort = Utils.getInt(conf.get(Config.NIMBUS_THRIFT_PORT)).intValue();
+		return new FlinkClient(conf, nimbusHost, nimbusPort);
+	}
+	
+	
+	
+	/**
+	 * Returns a reference to itself.
+	 * 
+	 * {@link FlinkClient} mimics both {@link NimbusClient} and {@link Nimbus}{@code .Client} at once.
+	 * 
+	 * @return A reference to itself.
+	 */
+	public FlinkClient getClient() {
+		return this;
+	}
+	
+	public void close() {/* nothing to do */}
+	
+	
+	/*
+	 * The following methods are derived from "backtype.storm.generated.Nimbus.Client"
+	 */
+	
+	/**
+	 * Parameter {@code uploadedJarLocation} is actually used to point to the local jar, because Flink does not support
+	 * uploading a jar file beforehand. Jar files are always uploaded directly when a program is submitted.
+	 */
+	public void submitTopology(final String name, final String uploadedJarLocation, final String jsonConf, final FlinkTopology topology)
+		throws AlreadyAliveException, InvalidTopologyException {
+		this.submitTopologyWithOpts(name, uploadedJarLocation, jsonConf, topology, null);
+	}
+	
+	/**
+	 * Parameter {@code uploadedJarLocation} is actually used to point to the local jar, because Flink does not support
+	 * uploading a jar file beforehand. Jar files are always uploaded directly when a program is submitted.
+	 */
+	public void submitTopologyWithOpts(final String name, final String uploadedJarLocation, final String jsonConf, final FlinkTopology topology, final SubmitOptions options)
+		throws AlreadyAliveException, InvalidTopologyException {
+		
+		if(this.getTopologyJobId(name) != null) {
+			throw new AlreadyAliveException();
+		}
+		
+		final File uploadedJarFile = new File(uploadedJarLocation);
+		try {
+			JobWithJars.checkJarFile(uploadedJarFile);
+		} catch(final IOException e) {
+			throw new RuntimeException("Problem with jar file " + uploadedJarFile.getAbsolutePath(), e);
+		}
+		
+		final List<File> jarFiles = new ArrayList<File>();
+		jarFiles.add(uploadedJarFile);
+		
+		final JobGraph jobGraph = topology.getStreamGraph().getJobGraph(name);
+		jobGraph.addJar(new Path(uploadedJarFile.getAbsolutePath()));
+		
+		final Configuration configuration = jobGraph.getJobConfiguration();
+		
+		final Client client;
+		try {
+			client = new Client(new InetSocketAddress(this.jobManagerHost, this.jobManagerPort), configuration,
+				JobWithJars.buildUserCodeClassLoader(jarFiles, JobWithJars.class.getClassLoader()), -1);
+		} catch(final UnknownHostException e) {
+			throw new RuntimeException("Cannot execute job due to UnknownHostException", e);
+		}
+		
+		try {
+			client.run(jobGraph, false);
+		} catch(final ProgramInvocationException e) {
+			throw new RuntimeException("Cannot execute job due to ProgramInvocationException", e);
+		}
+	}
+	
+	public void killTopology(final String name) throws NotAliveException {
+		this.killTopologyWithOpts(name, null);
+	}
+	
+	public void killTopologyWithOpts(final String name, final KillOptions options) throws NotAliveException {
+		final JobID jobId = this.getTopologyJobId(name);
+		if(jobId == null) {
+			throw new NotAliveException();
+		}
+		
+		try {
+			final ActorRef jobManager = this.getJobManager();
+			
+			if(options != null) {
+				try {
+					Thread.sleep(1000 * options.get_wait_secs());
+				} catch(final InterruptedException e) {
+					throw new RuntimeException(e);
+				}
+			}
+			
+			final FiniteDuration askTimeout = this.getTimeout();
+			final Future<Object> response = Patterns.ask(jobManager, new CancelJob(jobId), new Timeout(askTimeout));
+			try {
+				Await.result(response, askTimeout);
+			} catch(final Exception e) {
+				throw new RuntimeException("Killing topology " + name + " with Flink job ID " + jobId + " failed.", e);
+			}
+		} catch(final IOException e) {
+			throw new RuntimeException("Could not connect to Flink JobManager with address " + this.jobManagerHost
+				+ ":" + this.jobManagerPort, e);
+		}
+	}
+	
+	/**
+	 * Package-internal method to get a Flink {@link JobID} from a Storm topology name.
+	 * 
+	 * @param id
+	 *            The Storm topology name.
+	 * 
+	 * @return Flink's internally used {@link JobID}.
+	 */
+	JobID getTopologyJobId(final String id) {
+		final Configuration configuration = GlobalConfiguration.getConfiguration();
+		if(this.timeout != null) {
+			configuration.setString(ConfigConstants.AKKA_ASK_TIMEOUT, this.timeout);
+		}
+		
+		try {
+			final ActorRef jobManager = this.getJobManager();
+			
+			final FiniteDuration askTimeout = this.getTimeout();
+			final Future<Object> response = Patterns.ask(jobManager, JobManagerMessages.getRequestRunningJobsStatus(),
+				new Timeout(askTimeout));
+			
+			Object result;
+			try {
+				result = Await.result(response, askTimeout);
+			} catch(final Exception e) {
+				throw new RuntimeException("Could not retrieve running jobs from the JobManager.", e);
+			}
+			
+			if(result instanceof RunningJobsStatus) {
+				final List<JobStatusMessage> jobs = ((RunningJobsStatus)result).getStatusMessages();
+				
+				for(final JobStatusMessage status : jobs) {
+					if(status.getJobName().equals(id)) {
+						return status.getJobId();
+					}
+				}
+			} else {
+				throw new RuntimeException("RequestRunningJobsStatus requires a response of type "
+					+ "RunningJobsStatus. Instead the response is of type " + result.getClass() + ".");
+			}
+		} catch(final IOException e) {
+			throw new RuntimeException("Could not connect to Flink JobManager with address " + this.jobManagerHost
+				+ ":" + this.jobManagerPort, e);
+		}
+		
+		return null;
+	}
+	
+	private FiniteDuration getTimeout() {
+		final Configuration configuration = GlobalConfiguration.getConfiguration();
+		if(this.timeout != null) {
+			configuration.setString(ConfigConstants.AKKA_ASK_TIMEOUT, this.timeout);
+		}
+		
+		return AkkaUtils.getTimeout(configuration);
+	}
+	
+	private ActorRef getJobManager() throws IOException {
+		final Configuration configuration = GlobalConfiguration.getConfiguration();
+		
+		ActorSystem actorSystem;
+		try {
+			final scala.Tuple2<String, Object> systemEndpoint = new scala.Tuple2<String, Object>("", new Integer(0));
+			actorSystem = AkkaUtils.createActorSystem(configuration, new Some<scala.Tuple2<String, Object>>(
+				systemEndpoint));
+		} catch(final Exception e) {
+			throw new RuntimeException("Could not start actor system to communicate with JobManager", e);
+		}
+		
+		return JobManager.getJobManagerRemoteReference(new InetSocketAddress(this.jobManagerHost, this.jobManagerPort),
+			actorSystem, AkkaUtils.getLookupTimeout(configuration));
+	}
+	
+}
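
For illustration, a minimal usage sketch of this client, written against the Storm-style API it mimics
(host, port, jar path, and topology name are hypothetical; `flinkTopology` is assumed to come from the
FlinkTopologyBuilder introduced below; exception handling and imports omitted):

    Map conf = Utils.readStormConfig();
    conf.put(Config.NIMBUS_HOST, "localhost");  // points at Flink's JobManager, not a real Nimbus
    conf.put(Config.NIMBUS_THRIFT_PORT, 6123);  // the JobManager's RPC port

    FlinkClient client = FlinkClient.getConfiguredClient(conf);
    client.submitTopology("word-count", "/path/to/topology.jar",
        JSONValue.toJSONString(conf), flinkTopology);
    // ...later, kill it through the same Storm-style API
    client.killTopology("word-count");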

http://git-wip-us.apache.org/repos/asf/flink/blob/2bf76be1/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkLocalCluster.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkLocalCluster.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkLocalCluster.java
new file mode 100644
index 0000000..fb88570
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkLocalCluster.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.stormcompatibility.api;
+
+import java.util.Map;
+
+import org.apache.flink.streaming.util.ClusterUtil;
+
+import backtype.storm.LocalCluster;
+import backtype.storm.generated.ClusterSummary;
+import backtype.storm.generated.KillOptions;
+import backtype.storm.generated.RebalanceOptions;
+import backtype.storm.generated.StormTopology;
+import backtype.storm.generated.SubmitOptions;
+import backtype.storm.generated.TopologyInfo;
+
+
+
+
+
+/**
+ * {@link FlinkLocalCluster} mimics a Storm {@link LocalCluster}.
+ */
+public class FlinkLocalCluster {
+	
+	public void submitTopology(final String topologyName, final Map<?, ?> conf, final FlinkTopology topology)
+		throws Exception {
+		this.submitTopologyWithOpts(topologyName, conf, topology, null);
+	}
+	
+	public void submitTopologyWithOpts(final String topologyName, final Map<?, ?> conf, final FlinkTopology topology, final SubmitOptions submitOpts)
+		throws Exception {
+		ClusterUtil
+			.startOnMiniCluster(topology.getStreamGraph().getJobGraph(topologyName), topology.getNumberOfTasks());
+	}
+	
+	public void killTopology(final String topologyName) {
+		this.killTopologyWithOpts(topologyName, null);
+	}
+	
+	public void killTopologyWithOpts(final String name, final KillOptions options) {
+		// TODO Auto-generated method stub
+	}
+	
+	public void activate(final String topologyName) {
+		// TODO Auto-generated method stub
+	}
+	
+	public void deactivate(final String topologyName) {
+		// TODO Auto-generated method stub
+	}
+	
+	public void rebalance(final String name, final RebalanceOptions options) {
+		// TODO Auto-generated method stub
+	}
+	
+	public void shutdown() {
+		ClusterUtil.stopOnMiniCluster();
+	}
+	
+	public String getTopologyConf(final String id) {
+		// TODO Auto-generated method stub
+		return null;
+	}
+	
+	public StormTopology getTopology(final String id) {
+		// TODO Auto-generated method stub
+		return null;
+	}
+	
+	public ClusterSummary getClusterInfo() {
+		// TODO Auto-generated method stub
+		return null;
+	}
+	
+	public TopologyInfo getTopologyInfo(final String id) {
+		// TODO Auto-generated method stub
+		return null;
+	}
+	
+	public Map<?, ?> getState() {
+		// TODO Auto-generated method stub
+		return null;
+	}
+	
+	
+	
+	// the following is used to set a different execution environment for ITCases
+	/**
+	 * A different {@link FlinkLocalCluster} to be used for execution.
+	 */
+	private static FlinkLocalCluster currentCluster = null;
+	
+	/**
+	 * Returns a {@link FlinkLocalCluster} that should be used for execution. If no cluster was set by
+	 * {@link #initialize(FlinkLocalCluster)} in advance, a new {@link FlinkLocalCluster} is returned.
+	 * 
+	 * @return a {@link FlinkLocalCluster} to be used for execution
+	 */
+	public static FlinkLocalCluster getLocalCluster() {
+		if(currentCluster == null) {
+			currentCluster = new FlinkLocalCluster();
+		}
+		
+		return currentCluster;
+	}
+	
+	/**
+	 * Sets a different {@link FlinkLocalCluster} to be used for execution.
+	 * 
+	 * @param cluster
+	 *            the {@link FlinkLocalCluster} to be used for execution
+	 */
+	public static void initialize(final FlinkLocalCluster cluster) {
+		currentCluster = cluster;
+	}
+	
+}
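
As a sketch, test code can follow Storm's LocalCluster pattern unchanged (topology name, conf, and builder
are hypothetical; note that killTopology is still a stub in this version, so shutdown() is what actually
stops the mini cluster):

    FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
    cluster.submitTopology("test-topology", conf, builder.createTopology());
    Utils.sleep(10 * 1000);  // let the topology run for ten seconds
    cluster.killTopology("test-topology");
    cluster.shutdown();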

http://git-wip-us.apache.org/repos/asf/flink/blob/2bf76be1/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkOutputFieldsDeclarer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkOutputFieldsDeclarer.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkOutputFieldsDeclarer.java
new file mode 100644
index 0000000..d371fc0
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkOutputFieldsDeclarer.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.stormcompatibility.api;
+
+import java.util.List;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+
+import backtype.storm.topology.IRichBolt;
+import backtype.storm.topology.IRichSpout;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+import backtype.storm.utils.Utils;
+
+
+
+
+
+/**
+ * {@link FlinkOutputFieldsDeclarer} is used to get the declared output schema of a {@link IRichSpout spout} or
+ * {@link IRichBolt bolt}.<br />
+ * <br />
+ * <strong>CAUTION: Currently, Flink only supports the default output stream. Furthermore, direct emit is not
+ * supported.</strong>
+ */
+final class FlinkOutputFieldsDeclarer implements OutputFieldsDeclarer {
+	private Fields outputSchema;
+	
+	@Override
+	public void declare(final Fields fields) {
+		this.declareStream(Utils.DEFAULT_STREAM_ID, false, fields);
+	}
+	
+	/**
+	 * {@inheritDoc}
+	 * 
+	 * Direct emit is not supported by Flink. Parameter {@code direct} must be {@code false}.
+	 * 
+	 * @throws UnsupportedOperationException
+	 *             if {@code direct} is {@code true}
+	 */
+	@Override
+	public void declare(final boolean direct, final Fields fields) {
+		this.declareStream(Utils.DEFAULT_STREAM_ID, direct, fields);
+	}
+	
+	/**
+	 * {@inheritDoc}
+	 * 
+	 * Currently, Flink only supports the default output stream. Thus, parameter {@code streamId} must be equal to
+	 * {@link Utils#DEFAULT_STREAM_ID}.
+	 * 
+	 * @throws UnsupportedOperationException
+	 *             if {@code streamId} is not equal to {@link Utils#DEFAULT_STREAM_ID}
+	 */
+	@Override
+	public void declareStream(final String streamId, final Fields fields) {
+		this.declareStream(streamId, false, fields);
+	}
+	
+	/**
+	 * {@inheritDoc}
+	 * 
+	 * Currently, Flink only supports the default output stream. Thus, parameter {@code streamId} must be equal to
+	 * {@link Utils#DEFAULT_STREAM_ID}. Furthermore, direct emit is not supported by Flink and parameter {@code direct}
+	 * must be {@code false}.
+	 * 
+	 * @throws UnsupportedOperationException
+	 *             if {@code streamId} is not equal to {@link Utils#DEFAULT_STREAM_ID} or {@code direct} is {@code true}
+	 */
+	@Override
+	public void declareStream(final String streamId, final boolean direct, final Fields fields) {
+		if(!Utils.DEFAULT_STREAM_ID.equals(streamId)) {
+			throw new UnsupportedOperationException("Currently, only the default output stream is supported by Flink");
+		}
+		if(direct) {
+			throw new UnsupportedOperationException("Direct emit is not supported by Flink");
+		}
+		
+		this.outputSchema = fields;
+	}
+	
+	/**
+	 * Returns {@link TypeInformation} for the declared output schema. If no or an empty output schema was declared,
+	 * {@code null} is returned.
+	 * 
+	 * @return output type information for the declared output schema; or {@code null} if no output schema was declared
+	 * 
+	 * @throws IllegalArgumentException
+	 *             if more than 25 attributes are declared
+	 */
+	public TypeInformation<?> getOutputType() throws IllegalArgumentException {
+		if((this.outputSchema == null) || (this.outputSchema.size() == 0)) {
+			return null;
+		}
+		
+		Tuple t;
+		final int numberOfAttributes = this.outputSchema.size();
+		
+		if(numberOfAttributes == 1) {
+			return TypeExtractor.getForClass(Object.class);
+		} else if(numberOfAttributes <= 25) {
+			try {
+				t = Tuple.getTupleClass(numberOfAttributes).newInstance();
+			} catch(final InstantiationException e) {
+				throw new RuntimeException(e);
+			} catch(final IllegalAccessException e) {
+				throw new RuntimeException(e);
+			}
+		} else {
+			throw new IllegalArgumentException("Flink supports a maximum of 25 attributes.");
+		}
+		
+		// TODO: declare only key fields as DefaultComparable
+		for(int i = 0; i < numberOfAttributes; ++i) {
+			t.setField(new DefaultComparable(), i);
+		}
+		
+		return TypeExtractor.getForObject(t);
+	}
+	
+	/**
+	 * {@link DefaultComparable} is a {@link Comparable} helper class that is used to get the correct
+	 * {@link TypeInformation} from {@link TypeExtractor} within {@link #getOutputType()}. If key fields are not
+	 * comparable, Flink cannot use them and will throw an exception.
+	 * 
+	 * @author mjsax
+	 */
+	private static class DefaultComparable implements Comparable<DefaultComparable> {
+		
+		public DefaultComparable() {}
+		
+		@Override
+		public int compareTo(final DefaultComparable o) {
+			return 0;
+		}
+	}
+	
+	/**
+	 * Computes the indexes within the declared output schema for a list of given field-grouping attributes.
+	 * 
+	 * @return array of {@code int}s that contains the index within the output schema for each attribute in the given
+	 *         list
+	 */
+	public int[] getGroupingFieldIndexes(final List<String> groupingFields) {
+		final int[] fieldIndexes = new int[groupingFields.size()];
+		
+		for(int i = 0; i < fieldIndexes.length; ++i) {
+			fieldIndexes[i] = this.outputSchema.fieldIndex(groupingFields.get(i));
+		}
+		
+		return fieldIndexes;
+	}
+	
+}
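
Roughly, the builder feeds a component's declared schema into this class and then asks it for Flink type
information, along the lines of the following sketch (imports omitted; the bolt and its field names are
hypothetical):

    FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer();
    bolt.declareOutputFields(declarer);  // e.g. declares new Fields("word", "count")

    TypeInformation<?> outType = declarer.getOutputType();  // Tuple2 type info; null for sinks
    int[] keys = declarer.getGroupingFieldIndexes(Arrays.asList("word"));  // -> {0}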

http://git-wip-us.apache.org/repos/asf/flink/blob/2bf76be1/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkSubmitter.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkSubmitter.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkSubmitter.java
new file mode 100644
index 0000000..efd0c46
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkSubmitter.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.stormcompatibility.api;
+
+import java.io.File;
+import java.util.Map;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.client.program.ContextEnvironment;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.json.simple.JSONValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.Config;
+import backtype.storm.StormSubmitter;
+import backtype.storm.generated.AlreadyAliveException;
+import backtype.storm.generated.InvalidTopologyException;
+import backtype.storm.generated.SubmitOptions;
+import backtype.storm.utils.Utils;
+
+
+
+
+
+/**
+ * {@link FlinkSubmitter} mimics a {@link StormSubmitter} to submit Storm topologies to a Flink cluster.
+ */
+public class FlinkSubmitter {
+	public static final Logger logger = LoggerFactory.getLogger(FlinkSubmitter.class);
+	
+	/**
+	 * Submits a topology to run on the cluster. A topology runs forever or until explicitly killed.
+	 * 
+	 * 
+	 * @param name
+	 *            the name of the storm topology.
+	 * @param stormConf
+	 *            the topology-specific configuration. See {@link Config}.
+	 * @param topology
+	 *            the processing to execute.
+	 * @throws AlreadyAliveException
+	 *             if a topology with this name is already running
+	 * @throws InvalidTopologyException
+	 *             if an invalid topology was submitted
+	 */
+	public static void submitTopology(final String name, final Map<?, ?> stormConf, final FlinkTopology topology)
+		throws AlreadyAliveException, InvalidTopologyException {
+		submitTopology(name, stormConf, topology, (SubmitOptions)null, (FlinkProgressListener)null);
+	}
+	
+	/**
+	 * Submits a topology to run on the cluster. A topology runs forever or until explicitly killed.
+	 * 
+	 * @param name
+	 *            the name of the storm topology.
+	 * @param stormConf
+	 *            the topology-specific configuration. See {@link Config}.
+	 * @param topology
+	 *            the processing to execute.
+	 * @param opts
+	 *            to manipulate the starting of the topology.
+	 * @throws AlreadyAliveException
+	 *             if a topology with this name is already running
+	 * @throws InvalidTopologyException
+	 *             if an invalid topology was submitted
+	 */
+	public static void submitTopology(final String name, final Map<?, ?> stormConf, final FlinkTopology topology, final SubmitOptions opts)
+		throws AlreadyAliveException, InvalidTopologyException {
+		submitTopology(name, stormConf, topology, opts, (FlinkProgressListener)null);
+	}
+	
+	/**
+	 * Submits a topology to run on the cluster. A topology runs forever or until explicitly killed. The given
+	 * {@link FlinkProgressListener} is ignored because progress bars are not supported by Flink.
+	 * 
+	 * 
+	 * @param name
+	 *            the name of the storm topology.
+	 * @param stormConf
+	 *            the topology-specific configuration. See {@link Config}.
+	 * @param topology
+	 *            the processing to execute.
+	 * @param opts
+	 *            to manipulate the starting of the topology
+	 * @param progressListener
+	 *            to track the progress of the jar upload process
+	 * @throws AlreadyAliveException
+	 *             if a topology with this name is already running
+	 * @throws InvalidTopologyException
+	 *             if an invalid topology was submitted
+	 */
+	@SuppressWarnings({"unchecked", "rawtypes"})
+	public static void submitTopology(final String name, final Map stormConf, final FlinkTopology topology, final SubmitOptions opts, final FlinkProgressListener progressListener)
+		throws AlreadyAliveException, InvalidTopologyException {
+		if(!Utils.isValidConf(stormConf)) {
+			throw new IllegalArgumentException("Storm conf is not valid. Must be json-serializable");
+		}
+		
+		final Configuration flinkConfig = GlobalConfiguration.getConfiguration();
+		if(!stormConf.containsKey(Config.NIMBUS_HOST)) {
+			stormConf.put(Config.NIMBUS_HOST,
+				flinkConfig.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost"));
+		}
+		if(!stormConf.containsKey(Config.NIMBUS_THRIFT_PORT)) {
+			stormConf.put(Config.NIMBUS_THRIFT_PORT,
+				new Integer(flinkConfig.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 6123)));
+		}
+		
+		final String serConf = JSONValue.toJSONString(stormConf);
+		
+		final FlinkClient client = FlinkClient.getConfiguredClient(stormConf);
+		if(client.getTopologyJobId(name) != null) {
+			throw new RuntimeException("Topology with name `" + name + "` already exists on cluster");
+		}
+		String localJar = System.getProperty("storm.jar");
+		if(localJar == null) {
+			try {
+				for(final File file : ((ContextEnvironment)ExecutionEnvironment.getExecutionEnvironment()).getJars()) {
+					// should only be one jar file...
+					// TODO verify above assumption
+					localJar = file.getAbsolutePath();
+				}
+			} catch(final ClassCastException e) {
+				// ignore
+			}
+		}
+		try {
+			logger.info("Submitting topology " + name + " in distributed mode with conf " + serConf);
+			client.submitTopologyWithOpts(name, localJar, serConf, topology, opts);
+		} catch(final InvalidTopologyException e) {
+			logger.warn("Topology submission exception: " + e.get_msg());
+			throw e;
+		} catch(final AlreadyAliveException e) {
+			logger.warn("Topology already alive exception", e);
+			throw e;
+		} finally {
+			client.close();
+		}
+		
+		logger.info("Finished submitting topology: " + name);
+	}
+	
+	/**
+	 * Same as {@link #submitTopology(String, Map, FlinkTopology)}. Progress bars are not supported by Flink.
+	 * 
+	 * @param name
+	 *            the name of the storm topology.
+	 * @param stormConf
+	 *            the topology-specific configuration. See {@link Config}.
+	 * @param topology
+	 *            the processing to execute.
+	 * @throws AlreadyAliveException
+	 *             if a topology with this name is already running
+	 * @throws InvalidTopologyException
+	 *             if an invalid topology was submitted
+	 */
+	
+	public static void submitTopologyWithProgressBar(final String name, final Map<?, ?> stormConf, final FlinkTopology topology)
+		throws AlreadyAliveException, InvalidTopologyException {
+		submitTopologyWithProgressBar(name, stormConf, topology, null);
+	}
+	
+	/**
+	 * Same as {@link #submitTopology(String, Map, FlinkTopology, SubmitOptions)}. Progress bars are not supported by
+	 * Flink.
+	 * 
+	 * @param name
+	 *            the name of the storm topology.
+	 * @param stormConf
+	 *            the topology-specific configuration. See {@link Config}.
+	 * @param topology
+	 *            the processing to execute.
+	 * @param opts
+	 *            to manipulate the starting of the topology
+	 * @throws AlreadyAliveException
+	 *             if a topology with this name is already running
+	 * @throws InvalidTopologyException
+	 *             if an invalid topology was submitted
+	 */
+	
+	public static void submitTopologyWithProgressBar(final String name, final Map<?, ?> stormConf, final FlinkTopology topology, final SubmitOptions opts)
+		throws AlreadyAliveException, InvalidTopologyException {
+		submitTopology(name, stormConf, topology, opts, null);
+	}
+	
+	/**
+	 * In Flink, jar files are submitted directly when a program is started. Thus, this method does nothing. The
+	 * returned value is parameter localJar, because this gives the best integration of Storm behavior within a Flink
+	 * environment.
+	 * 
+	 * @param conf
+	 *            the topology-specific configuration. See {@link Config}.
+	 * @param localJar
+	 *            file path of the jar file to submit
+	 * @return the value of parameter localJar
+	 */
+	public static String submitJar(@SuppressWarnings("rawtypes") final Map conf, final String localJar) {
+		return submitJar(conf, localJar, (FlinkProgressListener)null);
+	}
+	
+	/**
+	 * In Flink, jar files are submitted directly when a program is started. Thus, this method does nothing. The
+	 * returned value is parameter localJar, because this gives the best integration of Storm behavior within a Flink
+	 * environment.
+	 * 
+	 * @param conf
+	 *            the topology-specific configuration. See {@link Config}.
+	 * @param localJar
+	 *            file path of the jar file to submit
+	 * @param listener
+	 *            progress listener to track the jar file upload
+	 * @return the value of parameter localJar
+	 */
+	public static String submitJar(@SuppressWarnings("rawtypes") final Map conf, final String localJar, final FlinkProgressListener listener) {
+		if(localJar == null) {
+			throw new RuntimeException(
+				"Must submit topologies using the 'storm' client script so that StormSubmitter knows which jar to upload.");
+		}
+		
+		return localJar;
+	}
+	
+	/**
+	 * Dummy interface used to track the progress of the jar file upload. Does nothing; kept for compatibility.
+	 */
+	public interface FlinkProgressListener { /* empty */}
+	
+}
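
Submission then reads just like plain Storm code; a sketch with hypothetical names (when a program is not
started via a client script, the storm.jar property must point to the jar to ship):

    Config conf = new Config();
    System.setProperty("storm.jar", "/path/to/topology.jar");

    FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
    // ...assemble spouts and bolts, see FlinkTopologyBuilder below...
    FlinkSubmitter.submitTopology("word-count", conf, builder.createTopology());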

http://git-wip-us.apache.org/repos/asf/flink/blob/2bf76be1/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopology.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopology.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopology.java
new file mode 100644
index 0000000..5f4340d
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopology.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.stormcompatibility.api;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import backtype.storm.generated.StormTopology;
+
+
+
+
+
+/**
+ * {@link FlinkTopology} mimics a {@link StormTopology} and is implemented in terms of a
+ * {@link StreamExecutionEnvironment}. In contrast to a regular {@link StreamExecutionEnvironment}, a
+ * {@link FlinkTopology} cannot be executed directly, but must be handed over to a {@link FlinkLocalCluster},
+ * {@link FlinkSubmitter}, or {@link FlinkClient}.
+ */
+class FlinkTopology extends StreamExecutionEnvironment {
+	/**
+	 * The corresponding {@link StormTopology} that is mimicked by this {@link FlinkTopology}.
+	 */
+	private final StormTopology stormTopology;
+	/**
+	 * The number of declared tasks for the whole program (i.e., the sum over all dops).
+	 */
+	private int numberOfTasks = 0;
+	
+	
+	
+	/**
+	 * Instantiates a new {@link FlinkTopology}.
+	 */
+	public FlinkTopology(final StormTopology stormTopology) {
+		// set default parallelism to 1, to mirror Storm default behavior
+		super.setParallelism(1);
+		this.stormTopology = stormTopology;
+	}
+	
+	
+	
+	/**
+	 * Not supported. To execute, use {@link FlinkLocalCluster}, {@link FlinkSubmitter}, or
+	 * {@link FlinkClient}.
+	 * 
+	 * @throws UnsupportedOperationException
+	 *             at every invocation
+	 */
+	@Override
+	public JobExecutionResult execute() throws Exception {
+		throw new UnsupportedOperationException(
+			"A FlinkTopology cannot be executed directly. Use FlinkLocalCluster, FlinkSubmitter, or FlinkClient instead.");
+	}
+	
+	/**
+	 * Not supported. To execute, use {@link FlinkLocalCluster}, {@link FlinkSubmitter}, or
+	 * {@link FlinkClient}.
+	 * 
+	 * @throws UnsupportedOperationException
+	 *             at every invocation
+	 */
+	@Override
+	public JobExecutionResult execute(final String jobName) throws Exception {
+		throw new UnsupportedOperationException(
+			"A FlinkTopology cannot be executed directly. Use FlinkLocalCluster, FlinkSubmitter, or FlinkClient instead.");
+	}
+	
+	/**
+	 * Returns a string representation of the wrapped {@link StormTopology}.
+	 */
+	public String getStormTopologyAsString() {
+		return this.stormTopology.toString();
+	}
+	
+	/**
+	 * Increases the number of declared tasks of this program by the given value.
+	 * 
+	 * @param dop
+	 *            The dop of a new operator that increases the number of overall tasks.
+	 */
+	public void increaseNumberOfTasks(final int dop) {
+		assert (dop > 0);
+		this.numberOfTasks += dop;
+	}
+	
+	
+	/**
+	 * Returns the number of tasks required to execute this program.
+	 * 
+	 * @return the number of tasks required to execute this program
+	 */
+	public int getNumberOfTasks() {
+		return this.numberOfTasks;
+	}
+	
+}
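
In other words, the environment built by FlinkTopologyBuilder deliberately refuses direct execution; a short
sketch (builder and conf are hypothetical):

    FlinkTopology topology = builder.createTopology();
    // topology.execute();  // would throw UnsupportedOperationException
    FlinkLocalCluster.getLocalCluster().submitTopology("example", conf, topology);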

http://git-wip-us.apache.org/repos/asf/flink/blob/2bf76be1/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java
new file mode 100644
index 0000000..e8f3702
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.stormcompatibility.api;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.stormcompatibility.wrappers.StormBoltWrapper;
+import org.apache.flink.stormcompatibility.wrappers.StormSpoutWrapper;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+
+import backtype.storm.generated.ComponentCommon;
+import backtype.storm.generated.GlobalStreamId;
+import backtype.storm.generated.Grouping;
+import backtype.storm.generated.StormTopology;
+import backtype.storm.topology.BasicBoltExecutor;
+import backtype.storm.topology.BoltDeclarer;
+import backtype.storm.topology.IBasicBolt;
+import backtype.storm.topology.IRichBolt;
+import backtype.storm.topology.IRichSpout;
+import backtype.storm.topology.IRichStateSpout;
+import backtype.storm.topology.SpoutDeclarer;
+import backtype.storm.topology.TopologyBuilder;
+
+
+
+
+
+/**
+ * {@link FlinkTopologyBuilder} mimics a {@link TopologyBuilder}, but builds a Flink program instead of a Storm
+ * topology. Most methods (except {@link #createTopology()}) are copied from the original {@link TopologyBuilder}
+ * implementation to ensure equal behavior.<br />
+ * <br />
+ * <strong>CAUTION: {@link IRichStateSpout StateSpout}s and multiple output streams per spout/bolt are currently not
+ * supported.</strong>
+ */
+public class FlinkTopologyBuilder {
+	/**
+	 * A Storm {@link TopologyBuilder} to build a real Storm topology.
+	 */
+	private final TopologyBuilder stormBuilder = new TopologyBuilder();
+	/**
+	 * All user spouts by their ID.
+	 */
+	private final HashMap<String, IRichSpout> spouts = new HashMap<String, IRichSpout>();
+	/**
+	 * All user bolts by their ID.
+	 */
+	private final HashMap<String, IRichBolt> bolts = new HashMap<String, IRichBolt>();
+	
+	// TODO add StateSpout support (Storm 0.9.4 does not yet support StateSpouts itself)
+	
+	
+	
+	/**
+	 * Creates a Flink program that uses the specified spouts and bolts.
+	 */
+	@SuppressWarnings({"rawtypes", "unchecked"})
+	public FlinkTopology createTopology() {
+		final StormTopology stormTopology = this.stormBuilder.createTopology();
+		final FlinkTopology env = new FlinkTopology(stormTopology);
+		env.setParallelism(1);
+		
+		final HashMap<String, SingleOutputStreamOperator> availableOperators = new HashMap<String, SingleOutputStreamOperator>();
+		// remember each component's declarer; field groupings must be resolved against the *producer's* output schema
+		final HashMap<String, FlinkOutputFieldsDeclarer> declarers = new HashMap<String, FlinkOutputFieldsDeclarer>();
+		
+		for(final Entry<String, IRichSpout> spout : this.spouts.entrySet()) {
+			final String spoutId = spout.getKey();
+			final IRichSpout userSpout = spout.getValue();
+			
+			final FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer();
+			userSpout.declareOutputFields(declarer);
+			declarers.put(spoutId, declarer);
+			
+			// TODO in order to support multiple output streams, use an additional wrapper (or modify StormSpoutWrapper
+			// and StormCollector)
+			// -> add an additional output attribute tagging the output stream, and use .split() and .select() to split
+			// the streams
+			final DataStreamSource source = env.addSource(new StormSpoutWrapper(userSpout), declarer.getOutputType());
+			availableOperators.put(spoutId, source);
+			
+			int dop = 1;
+			final ComponentCommon common = stormTopology.get_spouts().get(spoutId).get_common();
+			if(common.is_set_parallelism_hint()) {
+				dop = common.get_parallelism_hint();
+				source.setParallelism(dop);
+			}
+			env.increaseNumberOfTasks(dop);
+		}
+		
+		
+		
+		final HashMap<String, IRichBolt> unprocessedBolts = new HashMap<String, IRichBolt>();
+		unprocessedBolts.putAll(this.bolts);
+		
+		final HashMap<String, Set<Entry<GlobalStreamId, Grouping>>> unprocessedInputsPerBolt = new HashMap<String, Set<Entry<GlobalStreamId, Grouping>>>();
+		
+		// because we do not know the order in which an iterator steps over a set, we might process a consumer before
+		// its producer
+		// -> thus, we might need to repeat multiple times
+		while(unprocessedBolts.size() > 0) {
+			
+			final Iterator<Entry<String, IRichBolt>> boltsIterator = unprocessedBolts.entrySet().iterator();
+			while(boltsIterator.hasNext()) {
+				
+				final Entry<String, IRichBolt> bolt = boltsIterator.next();
+				final String boltId = bolt.getKey();
+				final IRichBolt userBolt = bolt.getValue();
+				
+				final FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer();
+				userBolt.declareOutputFields(declarer);
+				declarers.put(boltId, declarer);
+				
+				final ComponentCommon common = stormTopology.get_bolts().get(boltId).get_common();
+				
+				Set<Entry<GlobalStreamId, Grouping>> unprocessedInputs = unprocessedInputsPerBolt.get(boltId);
+				if(unprocessedInputs == null) {
+					unprocessedInputs = new HashSet<Entry<GlobalStreamId, Grouping>>();
+					unprocessedInputs.addAll(common.get_inputs().entrySet());
+					unprocessedInputsPerBolt.put(boltId, unprocessedInputs);
+				}
+				
+				// connect each available producer to the current bolt
+				final Iterator<Entry<GlobalStreamId, Grouping>> inputStreamsIterator = unprocessedInputs.iterator();
+				while(inputStreamsIterator.hasNext()) {
+					
+					final Entry<GlobalStreamId, Grouping> inputStream = inputStreamsIterator.next();
+					final String producerId = inputStream.getKey().get_componentId();
+					
+					DataStream<?> inputDataStream = availableOperators.get(producerId);
+					
+					if(inputDataStream != null) { // if producer was processed already
+						final Grouping grouping = inputStream.getValue();
+						if(grouping.is_set_shuffle()) {
+							// Storm uses a round-robin shuffle strategy
+							inputDataStream = inputDataStream.distribute();
+						} else if(grouping.is_set_fields()) {
+							// global grouping is emulated in Storm via an empty fields grouping list
+							final List<String> fields = grouping.get_fields();
+							if(fields.size() > 0) {
+								inputDataStream = inputDataStream.groupBy(declarers.get(producerId)
+									.getGroupingFieldIndexes(fields));
+							} else {
+								inputDataStream = inputDataStream.global();
+							}
+						} else if(grouping.is_set_all()) {
+							inputDataStream = inputDataStream.broadcast();
+						} else if(grouping.is_set_local_or_shuffle()) {
+							// nothing to do
+						} else {
+							throw new UnsupportedOperationException(
+								"Flink only supports (local-or-)shuffle, fields, all, and global grouping");
+						}
+						
+						final TypeInformation<?> outType = declarer.getOutputType();
+						
+						@SuppressWarnings("null")
+						final SingleOutputStreamOperator operator = inputDataStream.transform(boltId, outType,
+							new StormBoltWrapper(userBolt));
+						if(outType != null) { // only for non-sink nodes
+							availableOperators.put(boltId, operator);
+						}
+						
+						int dop = 1;
+						if(common.is_set_parallelism_hint()) {
+							dop = common.get_parallelism_hint();
+							operator.setParallelism(dop);
+						}
+						env.increaseNumberOfTasks(dop);
+						
+						inputStreamsIterator.remove();
+					}
+				}
+				
+				if(unprocessedInputs.size() == 0) { // all inputs are connected; processing bolt completed
+					boltsIterator.remove();
+				}
+			}
+		}
+		return env;
+	}
+	
+	/**
+	 * Define a new bolt in this topology with parallelism of just one thread.
+	 * 
+	 * @param id
+	 *            the id of this component. This id is referenced by other components that want to consume this bolt's
+	 *            outputs.
+	 * @param bolt
+	 *            the bolt
+	 * 
+	 * @return use the returned object to declare the inputs to this component
+	 */
+	public BoltDeclarer setBolt(final String id, final IRichBolt bolt) {
+		return this.setBolt(id, bolt, null);
+	}
+	
+	/**
+	 * Define a new bolt in this topology with the specified amount of parallelism.
+	 * 
+	 * @param id
+	 *            the id of this component. This id is referenced by other components that want to consume this bolt's
+	 *            outputs.
+	 * @param bolt
+	 *            the bolt
+	 * @param parallelism_hint
+	 *            the number of tasks that should be assigned to execute this bolt. Each task will run on a thread in a
+	 *            process somewhere around the cluster.
+	 * 
+	 * @return use the returned object to declare the inputs to this component
+	 */
+	public BoltDeclarer setBolt(final String id, final IRichBolt bolt, final Number parallelism_hint) {
+		final BoltDeclarer declarer = this.stormBuilder.setBolt(id, bolt, parallelism_hint);
+		this.bolts.put(id, bolt);
+		return declarer;
+	}
+	
+	/**
+	 * Define a new bolt in this topology. This defines a basic bolt, which is a simpler to use but more restricted kind
+	 * of bolt. Basic bolts are intended for non-aggregation processing and automate the anchoring/acking process to
+	 * achieve proper reliability in the topology.
+	 * 
+	 * @param id
+	 *            the id of this component. This id is referenced by other components that want to consume this bolt's
+	 *            outputs.
+	 * @param bolt
+	 *            the basic bolt
+	 * 
+	 * @return use the returned object to declare the inputs to this component
+	 */
+	public BoltDeclarer setBolt(final String id, final IBasicBolt bolt) {
+		return this.setBolt(id, bolt, null);
+	}
+	
+	/**
+	 * Define a new bolt in this topology. This defines a basic bolt, which is a simpler to use but more restricted kind
+	 * of bolt. Basic bolts are intended for non-aggregation processing and automate the anchoring/acking process to
+	 * achieve proper reliability in the topology.
+	 * 
+	 * @param id
+	 *            the id of this component. This id is referenced by other components that want to consume this bolt's
+	 *            outputs.
+	 * @param bolt
+	 *            the basic bolt
+	 * @param parallelism_hint
+	 *            the number of tasks that should be assigned to execute this bolt. Each task will run on a thread in a
+	 *            process somewhere around the cluster.
+	 * 
+	 * @return use the returned object to declare the inputs to this component
+	 */
+	public BoltDeclarer setBolt(final String id, final IBasicBolt bolt, final Number parallelism_hint) {
+		return this.setBolt(id, new BasicBoltExecutor(bolt), parallelism_hint);
+	}
+	
+	/**
+	 * Define a new spout in this topology.
+	 * 
+	 * @param id
+	 *            the id of this component. This id is referenced by other components that want to consume this spout's
+	 *            outputs.
+	 * @param spout
+	 *            the spout
+	 */
+	public SpoutDeclarer setSpout(final String id, final IRichSpout spout) {
+		return this.setSpout(id, spout, null);
+	}
+	
+	/**
+	 * Define a new spout in this topology with the specified parallelism. If the spout declares itself as
+	 * non-distributed, the parallelism_hint will be ignored and only one task will be allocated to this component.
+	 * 
+	 * @param id
+	 *            the id of this component. This id is referenced by other components that want to consume this spout's
+	 *            outputs.
+	 * @param parallelism_hint
+	 *            the number of tasks that should be assigned to execute this spout. Each task will run on a thread in a
+	 *            process somewhere around the cluster.
+	 * @param spout
+	 *            the spout
+	 */
+	public SpoutDeclarer setSpout(final String id, final IRichSpout spout, final Number parallelism_hint) {
+		final SpoutDeclarer declarer = this.stormBuilder.setSpout(id, spout, parallelism_hint);
+		this.spouts.put(id, spout);
+		return declarer;
+	}
+	
+	// not implemented by Storm 0.9.4
+	// public void setStateSpout(final String id, final IRichStateSpout stateSpout) {
+	// this.stormBuilder.setStateSpout(id, stateSpout);
+	// }
+	// public void setStateSpout(final String id, final IRichStateSpout stateSpout, final Number parallelism_hint) {
+	// this.stormBuilder.setStateSpout(id, stateSpout, parallelism_hint);
+	// }
+	
+}
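
Putting the pieces together, a topology is assembled exactly as with Storm's TopologyBuilder; only the
builder class differs (the spout and bolt classes here are hypothetical):

    FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
    builder.setSpout("source", new SentenceSpout());
    builder.setBolt("tokenizer", new TokenizerBolt(), 4).shuffleGrouping("source");
    builder.setBolt("counter", new WordCountBolt(), 4).fieldsGrouping("tokenizer", new Fields("word"));
    FlinkTopology topology = builder.createTopology();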

