flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mbala...@apache.org
Subject [25/27] flink git commit: [storm-compat] Moved Storm-compatibility to flink-contrib and split flink-contrib into small sub-projects
Date Mon, 15 Jun 2015 09:33:15 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapper.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapper.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapper.java
new file mode 100644
index 0000000..f7ed8c5
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapper.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.wrappers;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.IRichBolt;
+
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple25;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
+
+
+
+
+
+/**
+ * A {@link StormBoltWrapper} wraps an {@link IRichBolt} in order to execute the Storm bolt within a Flink Streaming
+ * program. It takes the Flink input tuples of type {@code IN} and transforms them into {@link StormTuple}s that the
+ * bolt can process. Furthermore, it takes the bolt's output tuples and transforms them into Flink tuples of type
+ * {@code OUT} (see {@link AbstractStormCollector} for supported types).<br />
+ * <br />
+ * <strong>CAUTION: currently, only simple bolts are supported! (i.e., bolts that do not use the Storm configuration
+ * <code>Map</code> or <code>TopologyContext</code> that is provided by the bolt's <code>prepare(..)</code> method.)
+ * Furthermore, acking and failing of tuples as well as accessing tuple attributes by field names is not supported so
+ * far.</strong>
+ */
+public class StormBoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> implements OneInputStreamOperator<IN, OUT> {
+	private static final long serialVersionUID = -4788589118464155835L;
+
+	// The wrapped Storm {@link IRichBolt bolt}
+	private final IRichBolt bolt;
+	// Number of attributes of the bolt's output tuples; -1 if the bolt declared no output schema (i.e., it is a sink)
+	private final int numberOfAttributes;
+
+	/**
+	 * Instantiates a new {@link StormBoltWrapper} that wraps the given Storm {@link IRichBolt bolt}
+	 * such that it can be used within a Flink streaming program. The output type will be one of
+	 * {@link Tuple1} to {@link Tuple25} depending on the bolt's declared number of attributes.
+	 * 
+	 * @param bolt
+	 * 		The Storm {@link IRichBolt bolt} to be used.
+	 * @throws IllegalArgumentException
+	 * 		If the number of declared output attributes is not within range [1;25].
+	 */
+	public StormBoltWrapper(final IRichBolt bolt) throws IllegalArgumentException {
+		this(bolt, false);
+	}
+
+	/**
+	 * Instantiates a new {@link StormBoltWrapper} that wraps the given Storm {@link IRichBolt bolt}
+	 * such that it can be used within a Flink streaming program. The output type can be any type if
+	 * parameter {@code rawOutput} is {@code true} and the bolt's number of declared output tuples
+	 * is 1. If {@code rawOutput} is {@code false} the output type will be one of {@link Tuple1} to
+	 * {@link Tuple25} depending on the bolt's declared number of attributes.
+	 * 
+	 * @param bolt
+	 * 		The Storm {@link IRichBolt bolt} to be used.
+	 * @param rawOutput
+	 * 		Set to {@code true} if a single-attribute output stream should not be of type
+	 * 		{@link Tuple1} but be of a raw type.
+	 * @throws IllegalArgumentException
+	 * 		If {@code rawOutput} is {@code true} and the number of declared output attributes is
+	 * 		not 1 or if {@code rawOutput} is {@code false} and the number of declared output
+	 * 		attributes is not within range [1;25].
+	 */
+	public StormBoltWrapper(final IRichBolt bolt, final boolean rawOutput) throws IllegalArgumentException {
+		this.bolt = bolt;
+		this.numberOfAttributes = StormWrapperSetupHelper.getNumberOfAttributes(bolt, rawOutput);
+	}
+
+	@Override
+	public void open(final Configuration parameters) throws Exception {
+		super.open(parameters);
+
+		final TopologyContext topologyContext = StormWrapperSetupHelper.convertToTopologyContext(
+				(StreamingRuntimeContext)super.runtimeContext, false);
+		OutputCollector stormCollector = null;
+
+		if (this.numberOfAttributes != -1) { // -1 means no declared output schema, so no collector is needed
+			stormCollector = new OutputCollector(new StormBoltCollector<OUT>(
+					this.numberOfAttributes, super.output));
+		}
+
+		this.bolt.prepare(null, topologyContext, stormCollector); // Storm config Map is not supported yet, hence null
+	}
+
+	@Override
+	public void close() throws Exception {
+		super.close();
+		this.bolt.cleanup();
+	}
+
+	@Override
+	public void processElement(final IN element) throws Exception {
+		this.bolt.execute(new StormTuple<IN>(element));
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapper.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapper.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapper.java
new file mode 100644
index 0000000..6cf8c1b
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapper.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.wrappers;
+
+import backtype.storm.topology.IRichSpout;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple25;
+
+/**
+ * A {@link StormFiniteSpoutWrapper} is an {@link AbstractStormSpoutWrapper} that calls
+ * {@link IRichSpout#nextTuple() nextTuple()} for a finite number of times before
+ * {@link #run(org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext)}
+ * returns. The number of {@code nextTuple()} calls can be specified as a certain number of
+ * invocations or can be undefined. In the undefined case, the {@code run(...)} method returns the
+ * first time no record is emitted to the output collector.
+ */
+public class StormFiniteSpoutWrapper<OUT> extends AbstractStormSpoutWrapper<OUT> {
+	private static final long serialVersionUID = 3883246587044801286L;
+
+	// The remaining number of {@link IRichSpout#nextTuple()} calls; negative if undefined
+	private int numberOfInvocations;
+
+	/**
+	 * Instantiates a new {@link StormFiniteSpoutWrapper} that calls the {@link IRichSpout#nextTuple() nextTuple()}
+	 * method of the given Storm {@link IRichSpout spout} as long as it emits records to the output collector. The
+	 * output type will be one of {@link Tuple1} to {@link Tuple25} depending on the spout's declared number of
+	 * attributes.
+	 *
+	 * @param spout
+	 * 		The Storm {@link IRichSpout spout} to be used.
+	 * @throws IllegalArgumentException
+	 * 		If the number of declared output attributes is not within range [1;25].
+	 */
+	public StormFiniteSpoutWrapper(final IRichSpout spout) throws IllegalArgumentException {
+		this(spout, false, -1);
+	}
+
+	/**
+	 * Instantiates a new {@link StormFiniteSpoutWrapper} that calls the {@link IRichSpout#nextTuple() nextTuple()}
+	 * method of the given Storm {@link IRichSpout spout} {@code numberOfInvocations} times. The output type will be
+	 * one
+	 * of {@link Tuple1} to {@link Tuple25} depending on the spout's declared number of attributes.
+	 *
+	 * @param spout
+	 * 		The Storm {@link IRichSpout spout} to be used.
+	 * @param numberOfInvocations
+	 * 		The number of calls to {@link IRichSpout#nextTuple()}.
+	 * @throws IllegalArgumentException
+	 * 		If the number of declared output attributes is not within range [1;25].
+	 */
+	public StormFiniteSpoutWrapper(final IRichSpout spout, final int numberOfInvocations)
+			throws IllegalArgumentException {
+		this(spout, false, numberOfInvocations);
+	}
+
+	/**
+	 * Instantiates a new {@link StormFiniteSpoutWrapper} that calls the {@link IRichSpout#nextTuple() nextTuple()}
+	 * method of the given Storm {@link IRichSpout spout} as long as it emits records to the output collector. The
+	 * output type can be any type if parameter {@code rawOutput} is {@code true} and the spout's number of declared
+	 * output tuples is 1. If {@code rawOutput} is {@code false} the output type will be one of {@link Tuple1} to
+	 * {@link Tuple25} depending on the spout's declared number of attributes.
+	 *
+	 * @param spout
+	 * 		The Storm {@link IRichSpout spout} to be used.
+	 * @param rawOutput
+	 * 		Set to {@code true} if a single-attribute output stream should not be of type {@link Tuple1} but be
+	 * 		of a raw type.
+	 * @throws IllegalArgumentException
+	 * 		If {@code rawOutput} is {@code true} and the number of declared output attributes is not 1 or if
+	 * 		{@code rawOutput} is {@code false} and the number of declared output attributes is not within range
+	 * 		[1;25].
+	 */
+	public StormFiniteSpoutWrapper(final IRichSpout spout, final boolean rawOutput) throws IllegalArgumentException {
+		this(spout, rawOutput, -1);
+	}
+
+	/**
+	 * Instantiates a new {@link StormFiniteSpoutWrapper} that calls the {@link IRichSpout#nextTuple() nextTuple()}
+	 * method of the given Storm {@link IRichSpout spout} {@code numberOfInvocations} times. The output type can be any
+	 * type if parameter {@code rawOutput} is {@code true} and the spout's number of declared output tuples is 1. If
+	 * {@code rawOutput} is {@code false} the output type will be one of {@link Tuple1} to {@link Tuple25} depending on
+	 * the spout's declared number of attributes.
+	 *
+	 * @param spout
+	 * 		The Storm {@link IRichSpout spout} to be used.
+	 * @param rawOutput
+	 * 		Set to {@code true} if a single-attribute output stream should not be of type {@link Tuple1} but be
+	 * 		of a raw type.
+	 * @param numberOfInvocations
+	 * 		The number of calls to {@link IRichSpout#nextTuple()}.
+	 * @throws IllegalArgumentException
+	 * 		If {@code rawOutput} is {@code true} and the number of declared output attributes is not 1 or if
+	 * 		{@code rawOutput} is {@code false} and the number of declared output attributes is not within range
+	 * 		[1;25].
+	 */
+	public StormFiniteSpoutWrapper(final IRichSpout spout, final boolean rawOutput, final int numberOfInvocations)
+			throws IllegalArgumentException {
+		super(spout, rawOutput);
+		this.numberOfInvocations = numberOfInvocations;
+	}
+
+	/**
+	 * Calls {@link IRichSpout#nextTuple()} for the given number of times (or, if undefined, until no tuple is emitted).
+	 */
+	@Override
+	protected void execute() {
+		if (this.numberOfInvocations >= 0) { // a fixed number of calls was requested
+			while ((--this.numberOfInvocations >= 0) && super.isRunning) {
+				super.spout.nextTuple();
+			}
+		} else { // undefined: run until a nextTuple() call emits no record
+			do {
+				super.collector.tupleEmitted = false;
+				super.spout.nextTuple();
+			} while (super.collector.tupleEmitted && super.isRunning);
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormOutputFieldsDeclarer.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormOutputFieldsDeclarer.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormOutputFieldsDeclarer.java
new file mode 100644
index 0000000..c486237
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormOutputFieldsDeclarer.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.wrappers;
+
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+import backtype.storm.utils.Utils;
+
+/**
+ * {@link StormOutputFieldsDeclarer} is used by {@link StormWrapperSetupHelper} to determine the
+ * number of attributes declared by the wrapped spout's or bolt's {@code declare(...)} method.
+ */
+class StormOutputFieldsDeclarer implements OutputFieldsDeclarer {
+
+	// The output schema declared by the wrapped bolt; null if nothing was declared yet.
+	private Fields outputSchema = null;
+
+	@Override
+	public void declare(final Fields fields) {
+		this.declareStream(Utils.DEFAULT_STREAM_ID, false, fields);
+	}
+
+	@Override
+	public void declare(final boolean direct, final Fields fields) {
+		this.declareStream(Utils.DEFAULT_STREAM_ID, direct, fields);
+	}
+
+	@Override
+	public void declareStream(final String streamId, final Fields fields) {
+		this.declareStream(streamId, false, fields);
+	}
+
+	@Override
+	public void declareStream(final String streamId, final boolean direct, final Fields fields) {
+		if (!Utils.DEFAULT_STREAM_ID.equals(streamId)) {
+			throw new UnsupportedOperationException("Currently, only the default output stream is supported by Flink");
+		}
+		if (direct) {
+			throw new UnsupportedOperationException("Direct emit is not supported by Flink");
+		}
+
+		this.outputSchema = fields;
+	}
+
+	/**
+	 * Returns the number of attributes of the output schema declared by the wrapped bolt. If no output schema is
+	 * declared (e.g., for sink bolts), {@code -1} is returned.
+	 *
+	 * @return the number of attributes of the output schema declared by the wrapped bolt, or {@code -1} if none
+	 */
+	public int getNumberOfAttributes() {
+		if (this.outputSchema != null) {
+			return this.outputSchema.size();
+		}
+
+		return -1;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutCollector.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutCollector.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutCollector.java
new file mode 100644
index 0000000..01c980a
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutCollector.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.wrappers;
+
+import backtype.storm.spout.ISpoutOutputCollector;
+
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple25;
+import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
+import java.util.List;
+
+/**
+ * A {@link StormSpoutCollector} is used by {@link AbstractStormSpoutWrapper} to provide a Storm
+ * compatible output collector to the wrapped spout. It transforms the emitted Storm tuples into
+ * Flink tuples and emits them via the provided {@link SourceContext} object.
+ */
+class StormSpoutCollector<OUT> extends AbstractStormCollector<OUT> implements ISpoutOutputCollector {
+
+	// The Flink source context object used to emit the converted tuples
+	private final SourceContext<OUT> flinkContext;
+
+	/**
+	 * Instantiates a new {@link StormSpoutCollector} that emits Flink tuples to the given Flink
+	 * source context. If the number of attributes is specified as zero, any output type is
+	 * supported. If the number of attributes is between 1 to 25, the output type is {@link Tuple1}
+	 * to {@link Tuple25}.
+	 * 
+	 * @param numberOfAttributes
+	 *        The number of attributes of the emitted tuples.
+	 * @param flinkContext
+	 *        The Flink source context to be used.
+	 * @throws UnsupportedOperationException
+	 *         if the specified number of attributes is not in the valid range of [0,25]
+	 */
+	public StormSpoutCollector(final int numberOfAttributes, final SourceContext<OUT> flinkContext) throws UnsupportedOperationException {
+		super(numberOfAttributes);
+		assert (flinkContext != null);
+		this.flinkContext = flinkContext;
+	}
+
+	@Override
+	protected List<Integer> doEmit(final OUT flinkTuple) {
+		this.flinkContext.collect(flinkTuple);
+		// TODO: Storm expects the IDs of the tasks the tuple was sent to; not tracked yet, so null is returned
+		return null;
+	}
+
+	@Override
+	public void reportError(final Throwable error) {
+		// not sure, if Flink can support this
+		throw new UnsupportedOperationException("Not implemented yet");
+	}
+
+	@Override
+	public List<Integer> emit(final String streamId, final List<Object> tuple, final Object messageId) {
+		return this.tansformAndEmit(tuple);
+	}
+
+
+	@Override
+	public void emitDirect(final int taskId, final String streamId, final List<Object> tuple, final Object messageId) {
+		throw new UnsupportedOperationException("Direct emit is not supported by Flink");
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapper.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapper.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapper.java
new file mode 100644
index 0000000..f5e0733
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapper.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.wrappers;
+
+import backtype.storm.topology.IRichSpout;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple25;
+
+/**
+ * A {@link StormSpoutWrapper} is an {@link AbstractStormSpoutWrapper} that calls the wrapped spout's
+ * {@link IRichSpout#nextTuple() nextTuple()} method in an infinite loop.
+ */
+public class StormSpoutWrapper<OUT> extends AbstractStormSpoutWrapper<OUT> {
+	private static final long serialVersionUID = -218340336648247605L;
+
+	/**
+	 * Instantiates a new {@link StormSpoutWrapper} that wraps the given Storm {@link IRichSpout spout} such that it
+	 * can
+	 * be used within a Flink streaming program. The output type will be one of {@link Tuple1} to {@link Tuple25}
+	 * depending on the spout's declared number of attributes.
+	 *
+	 * @param spout
+	 * 		The Storm {@link IRichSpout spout} to be used.
+	 * @throws IllegalArgumentException
+	 * 		If the number of declared output attributes is not within range [1;25].
+	 */
+	public StormSpoutWrapper(final IRichSpout spout) throws IllegalArgumentException {
+		super(spout, false);
+	}
+
+	/**
+	 * Instantiates a new {@link StormSpoutWrapper} that wraps the given Storm {@link IRichSpout spout} such that it
+	 * can
+	 * be used within a Flink streaming program. The output type can be any type if parameter {@code rawOutput} is
+	 * {@code true} and the spout's number of declared output tuples is 1. If {@code rawOutput} is {@code false} the
+	 * output type will be one of {@link Tuple1} to {@link Tuple25} depending on the spout's declared number of
+	 * attributes.
+	 *
+	 * @param spout
+	 * 		The Storm {@link IRichSpout spout} to be used.
+	 * @param rawOutput
+	 * 		Set to {@code true} if a single-attribute output stream should not be of type {@link Tuple1} but be
+	 * 		of a raw type.
+	 * @throws IllegalArgumentException
+	 * 		If {@code rawOutput} is {@code true} and the number of declared output attributes is not 1 or if
+	 * 		{@code rawOutput} is {@code false} and the number of declared output attributes is not within range
+	 * 		[1;25].
+	 */
+	public StormSpoutWrapper(final IRichSpout spout, final boolean rawOutput) throws IllegalArgumentException {
+		super(spout, rawOutput);
+	}
+
+	/**
+	 * Calls {@link IRichSpout#nextTuple()} in an infinite loop until {@link #cancel()} is called.
+	 */
+	@Override
+	protected void execute() {
+		while (super.isRunning) {
+			super.spout.nextTuple();
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormTuple.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormTuple.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormTuple.java
new file mode 100644
index 0000000..3fb1b06
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormTuple.java
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.stormcompatibility.wrappers;
+
+/*
+ * We do neither import
+ * 		backtype.storm.tuple.Tuple;
+ * nor
+ * 		org.apache.flink.api.java.tuple.Tuple
+ * to avoid confusion
+ */
+
+import backtype.storm.generated.GlobalStreamId;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.MessageId;
+import backtype.storm.tuple.Values;
+
+import java.util.List;
+
+/**
+ * {@link StormTuple} converts a Flink tuple of type {@code IN} into a Storm tuple.
+ */
+class StormTuple<IN> implements backtype.storm.tuple.Tuple {
+
+	// The Storm representation of the original Flink tuple
+	private final Values stormTuple;
+
+	/**
+	 * Create a new Storm tuple from the given Flink tuple.
+	 *
+	 * @param flinkTuple
+	 * 		The Flink tuple to be converted.
+	 */
+	public StormTuple(final IN flinkTuple) {
+		if (flinkTuple instanceof org.apache.flink.api.java.tuple.Tuple) {
+			final org.apache.flink.api.java.tuple.Tuple t = (org.apache.flink.api.java.tuple.Tuple) flinkTuple;
+
+			final int numberOfAttributes = t.getArity();
+			this.stormTuple = new Values();
+			for (int i = 0; i < numberOfAttributes; ++i) {
+				this.stormTuple.add(t.getField(i));
+			}
+		} else {
+			this.stormTuple = new Values(flinkTuple); // raw (non-Tuple) input is wrapped as a single-attribute tuple
+		}
+	}
+
+	@Override
+	public int size() {
+		return this.stormTuple.size();
+	}
+
+	@Override
+	public boolean contains(final String field) {
+		throw new UnsupportedOperationException("Not implemented yet"); // no field-name schema is attached to the tuple
+	}
+
+	@Override
+	public Fields getFields() {
+		throw new UnsupportedOperationException("Not implemented yet");
+	}
+
+	@Override
+	public int fieldIndex(final String field) {
+		throw new UnsupportedOperationException("Not implemented yet");
+	}
+
+	@Override
+	public List<Object> select(final Fields selector) {
+		throw new UnsupportedOperationException("Not implemented yet");
+	}
+
+	@Override
+	public Object getValue(final int i) {
+		return this.stormTuple.get(i);
+	}
+
+	@Override
+	public String getString(final int i) {
+		return (String) this.stormTuple.get(i);
+	}
+
+	@Override
+	public Integer getInteger(final int i) {
+		return (Integer) this.stormTuple.get(i);
+	}
+
+	@Override
+	public Long getLong(final int i) {
+		return (Long) this.stormTuple.get(i);
+	}
+
+	@Override
+	public Boolean getBoolean(final int i) {
+		return (Boolean) this.stormTuple.get(i);
+	}
+
+	@Override
+	public Short getShort(final int i) {
+		return (Short) this.stormTuple.get(i);
+	}
+
+	@Override
+	public Byte getByte(final int i) {
+		return (Byte) this.stormTuple.get(i);
+	}
+
+	@Override
+	public Double getDouble(final int i) {
+		return (Double) this.stormTuple.get(i);
+	}
+
+	@Override
+	public Float getFloat(final int i) {
+		return (Float) this.stormTuple.get(i);
+	}
+
+	@Override
+	public byte[] getBinary(final int i) {
+		return (byte[]) this.stormTuple.get(i);
+	}
+
+	@Override
+	public Object getValueByField(final String field) {
+		throw new UnsupportedOperationException("Not implemented yet"); // access by field name needs a schema
+	}
+
+	@Override
+	public String getStringByField(final String field) {
+		throw new UnsupportedOperationException("Not implemented yet");
+	}
+
+	@Override
+	public Integer getIntegerByField(final String field) {
+		throw new UnsupportedOperationException("Not implemented yet");
+	}
+
+	@Override
+	public Long getLongByField(final String field) {
+		throw new UnsupportedOperationException("Not implemented yet");
+	}
+
+	@Override
+	public Boolean getBooleanByField(final String field) {
+		throw new UnsupportedOperationException("Not implemented yet");
+	}
+
+	@Override
+	public Short getShortByField(final String field) {
+		throw new UnsupportedOperationException("Not implemented yet");
+	}
+
+	@Override
+	public Byte getByteByField(final String field) {
+		throw new UnsupportedOperationException("Not implemented yet");
+	}
+
+	@Override
+	public Double getDoubleByField(final String field) {
+		throw new UnsupportedOperationException("Not implemented yet");
+	}
+
+	@Override
+	public Float getFloatByField(final String field) {
+		throw new UnsupportedOperationException("Not implemented yet");
+	}
+
+	@Override
+	public byte[] getBinaryByField(final String field) {
+		throw new UnsupportedOperationException("Not implemented yet");
+	}
+
+	@Override
+	public List<Object> getValues() {
+		return this.stormTuple; // NOTE(review): exposes the internal mutable list; callers could modify the tuple
+	}
+
+	@Override
+	public GlobalStreamId getSourceGlobalStreamid() {
+		// not sure if Flink can support this
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public String getSourceComponent() {
+		// not sure if Flink can support this
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public int getSourceTask() {
+		// not sure if Flink can support this
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public String getSourceStreamId() {
+		// not sure if Flink can support this
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public MessageId getMessageId() {
+		// not sure if Flink can support this
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public int hashCode() {
+		final int prime = 31;
+		int result = 1;
+		result = (prime * result) + ((this.stormTuple == null) ? 0 : this.stormTuple.hashCode());
+		return result;
+	}
+
+	@Override
+	public boolean equals(final Object obj) {
+		if (this == obj) {
+			return true;
+		}
+		if (obj == null) {
+			return false;
+		}
+		if (this.getClass() != obj.getClass()) {
+			return false;
+		}
+		final StormTuple<?> other = (StormTuple<?>) obj;
+		if (this.stormTuple == null) {
+			if (other.stormTuple != null) {
+				return false;
+			}
+		} else if (!this.stormTuple.equals(other.stormTuple)) {
+			return false;
+		}
+		return true;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormWrapperSetupHelper.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormWrapperSetupHelper.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormWrapperSetupHelper.java
new file mode 100644
index 0000000..a189fba
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormWrapperSetupHelper.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.wrappers;
+
+import backtype.storm.generated.Bolt;
+import backtype.storm.generated.ComponentCommon;
+import backtype.storm.generated.SpoutSpec;
+import backtype.storm.generated.StormTopology;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.IComponent;
+import backtype.storm.topology.IRichBolt;
+import backtype.storm.topology.IRichSpout;
+import org.apache.flink.stormcompatibility.api.FlinkTopologyContext;
+import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * {@link StormWrapperSetupHelper} is a helper class used by {@link AbstractStormSpoutWrapper} or
+ * {@link StormBoltWrapper}.
+ */
+class StormWrapperSetupHelper {
+
+	// Utility class with static helpers only; prevent instantiation.
+	private StormWrapperSetupHelper() {}
+
+	/**
+	 * Computes the number of output attributes used by a {@link AbstractStormSpoutWrapper} or
+	 * {@link StormBoltWrapper}. Returns zero for raw output type or a value within range [1;25] for
+	 * output type {@link org.apache.flink.api.java.tuple.Tuple1 Tuple1} to
+	 * {@link org.apache.flink.api.java.tuple.Tuple25 Tuple25}. In case of a data sink, {@code -1}
+	 * is returned.
+	 * 
+	 * @param spoutOrBolt
+	 * 		The Storm {@link IRichSpout spout} or {@link IRichBolt bolt} to be used.
+	 * @param rawOutput
+	 * 		Set to {@code true} if a single attribute output stream, should not be of type
+	 * 		{@link org.apache.flink.api.java.tuple.Tuple1 Tuple1} but be of a raw type.
+	 * @return The number of attributes to be used.
+	 * @throws IllegalArgumentException
+	 * 		If {@code rawOutput} is {@code true} and the number of declared output
+	 * 		attributes is not 1 or if {@code rawOutput} is {@code false} and the number
+	 * 		of declared output attributes is not within range [1;25].
+	 */
+	public static int getNumberOfAttributes(final IComponent spoutOrBolt, final boolean rawOutput)
+			throws IllegalArgumentException {
+		// let the component declare its output schema into a dummy declarer and inspect it
+		final StormOutputFieldsDeclarer declarer = new StormOutputFieldsDeclarer();
+		spoutOrBolt.declareOutputFields(declarer);
+
+		final int declaredNumberOfAttributes = declarer.getNumberOfAttributes();
+
+		// -1 indicates a data sink (no output schema was declared)
+		if (declaredNumberOfAttributes == -1) {
+			return -1;
+		}
+
+		if ((declaredNumberOfAttributes < 1) || (declaredNumberOfAttributes > 25)) {
+			throw new IllegalArgumentException(
+					"Provided bolt declares non supported number of output attributes. Must be in range [1;25] but " +
+							"was "
+							+ declaredNumberOfAttributes);
+		}
+
+		if (rawOutput) {
+			// raw mode bypasses Tuple1 wrapping and thus only works for exactly one attribute
+			if (declaredNumberOfAttributes > 1) {
+				throw new IllegalArgumentException(
+						"Output type is requested to be raw type, but provided bolt declares more than one output " +
+						"attribute.");
+
+			}
+			return 0;
+		}
+
+		return declaredNumberOfAttributes;
+	}
+
+	/**
+	 * Builds a minimal {@link TopologyContext} for a single Flink subtask so that Storm code can
+	 * run inside a Flink operator. The context describes only the wrapped component itself; Storm
+	 * task IDs are 1-based while Flink subtask indexes are 0-based.
+	 * TODO: expose the full topology instead of a single-component stub.
+	 * 
+	 * @param context
+	 * 		The Flink runtime context of the operator wrapping the Storm component.
+	 * @param spoutOrBolt
+	 * 		{@code true} if the wrapped component is a spout, {@code false} for a bolt.
+	 * @return A {@link FlinkTopologyContext} describing the single wrapped component.
+	 */
+	public static TopologyContext convertToTopologyContext(final StreamingRuntimeContext context,
+			final boolean spoutOrBolt) {
+		// shift to 1-based task IDs; avoid the deprecated Integer(int) constructor
+		final Integer taskId = Integer.valueOf(1 + context.getIndexOfThisSubtask());
+
+		final Map<Integer, String> taskToComponents = new HashMap<Integer, String>();
+		taskToComponents.put(taskId, context.getTaskName());
+
+		final ComponentCommon common = new ComponentCommon();
+		common.set_parallelism_hint(context.getNumberOfParallelSubtasks());
+
+		final Map<String, Bolt> bolts = new HashMap<String, Bolt>();
+		final Map<String, SpoutSpec> spoutSpecs = new HashMap<String, SpoutSpec>();
+
+		if (spoutOrBolt) {
+			spoutSpecs.put(context.getTaskName(), new SpoutSpec(null, common));
+		} else {
+			bolts.put(context.getTaskName(), new Bolt(null, common));
+		}
+
+		return new FlinkTopologyContext(new StormTopology(spoutSpecs, bolts, null), taskToComponents, taskId);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/FlinkOutputFieldsDeclarerTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/FlinkOutputFieldsDeclarerTest.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/FlinkOutputFieldsDeclarerTest.java
new file mode 100644
index 0000000..b55e1ef
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/FlinkOutputFieldsDeclarerTest.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.api;
+
+import backtype.storm.tuple.Fields;
+import backtype.storm.utils.Utils;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.stormcompatibility.util.AbstractTest;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.LinkedList;
+
+/** Tests for {@link FlinkOutputFieldsDeclarer}: declare variants, attribute limits, grouping. */
+public class FlinkOutputFieldsDeclarerTest extends AbstractTest {
+
+	/** Exercises all four declare variants for every legal attribute count [0;25]. */
+	@Test
+	public void testDeclare() {
+		for (int testCase = 0; testCase < 4; ++testCase) {
+			for (int numberOfAttributes = 0; numberOfAttributes <= 25; ++numberOfAttributes) {
+				this.runDeclareTest(testCase, numberOfAttributes);
+			}
+		}
+	}
+
+	/** 26 attributes must be rejected by declare(Fields). */
+	@Test(expected = IllegalArgumentException.class)
+	public void testDeclareSimpleToManyAttributes() {
+		this.runDeclareTest(0, 26);
+	}
+
+	/** 26 attributes must be rejected by declare(boolean, Fields). */
+	@Test(expected = IllegalArgumentException.class)
+	public void testDeclareNonDirectToManyAttributes() {
+		this.runDeclareTest(1, 26);
+	}
+
+	/** 26 attributes must be rejected by declareStream(String, Fields). */
+	@Test(expected = IllegalArgumentException.class)
+	public void testDeclareDefaultStreamToManyAttributes() {
+		this.runDeclareTest(2, 26);
+	}
+
+	/** 26 attributes must be rejected by declareStream(String, boolean, Fields). */
+	@Test(expected = IllegalArgumentException.class)
+	public void testDeclareFullToManyAttributes() {
+		this.runDeclareTest(3, 26);
+	}
+
+	/**
+	 * Declares {@code numberOfAttributes} attributes through the variant selected by
+	 * {@code testCase} and verifies the inferred output type: {@code null} for zero
+	 * attributes, a raw (non-tuple) type for one, and a tuple type otherwise.
+	 */
+	private void runDeclareTest(final int testCase, final int numberOfAttributes) {
+		final FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer();
+
+		final String[] attributes = createAttributes(numberOfAttributes);
+
+		if (testCase == 0) {
+			this.declareSimple(declarer, attributes);
+		} else if (testCase == 1) {
+			this.declareNonDirect(declarer, attributes);
+		} else if (testCase == 2) {
+			this.declareDefaultStream(declarer, attributes);
+		} else {
+			this.declareFull(declarer, attributes);
+		}
+
+		final TypeInformation<?> type = declarer.getOutputType();
+
+		if (numberOfAttributes == 0) {
+			Assert.assertNull(type);
+		} else {
+			Assert.assertEquals(numberOfAttributes, type.getArity());
+			if (numberOfAttributes == 1) {
+				// a single attribute is declared as a raw type, not as Tuple1
+				Assert.assertFalse(type.isTupleType());
+			} else {
+				Assert.assertTrue(type.isTupleType());
+			}
+		}
+	}
+
+	/** Generates attribute names a0, a1, ..., a(count-1). */
+	private static String[] createAttributes(final int count) {
+		final String[] attributes = new String[count];
+		for (int i = 0; i < count; ++i) {
+			attributes[i] = "a" + i;
+		}
+		return attributes;
+	}
+
+	private void declareSimple(final FlinkOutputFieldsDeclarer declarer, final String[] attributes) {
+		declarer.declare(new Fields(attributes));
+	}
+
+	private void declareNonDirect(final FlinkOutputFieldsDeclarer declarer, final String[] attributes) {
+		declarer.declare(false, new Fields(attributes));
+	}
+
+	private void declareDefaultStream(final FlinkOutputFieldsDeclarer declarer, final String[] attributes) {
+		declarer.declareStream(Utils.DEFAULT_STREAM_ID, new Fields(attributes));
+	}
+
+	private void declareFull(final FlinkOutputFieldsDeclarer declarer, final String[] attributes) {
+		declarer.declareStream(Utils.DEFAULT_STREAM_ID, false, new Fields(attributes));
+	}
+
+	@Test(expected = UnsupportedOperationException.class)
+	public void testDeclareDirect() {
+		new FlinkOutputFieldsDeclarer().declare(true, null);
+	}
+
+	@Test(expected = UnsupportedOperationException.class)
+	public void testDeclareNonDefaultStrem() {
+		new FlinkOutputFieldsDeclarer().declareStream("dummy", null);
+	}
+
+	@Test(expected = UnsupportedOperationException.class)
+	public void testDeclareDirect2() {
+		new FlinkOutputFieldsDeclarer().declareStream(Utils.DEFAULT_STREAM_ID, true, null);
+	}
+
+	@Test(expected = UnsupportedOperationException.class)
+	public void testDeclareNonDefaultStrem2() {
+		new FlinkOutputFieldsDeclarer().declareStream("dummy", this.r.nextBoolean(), null);
+	}
+
+	/**
+	 * Randomly selects a subset of the declared attributes as grouping fields and
+	 * checks that getGroupingFieldIndexes returns exactly their indexes, in
+	 * declaration order.
+	 */
+	@Test
+	public void testGetGroupingFieldIndexes() {
+		final int numberOfAttributes = 5 + this.r.nextInt(21);
+		final String[] attributes = createAttributes(numberOfAttributes);
+
+		final FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer();
+		declarer.declare(new Fields(attributes));
+
+		final int numberOfKeys = 1 + this.r.nextInt(25);
+		final LinkedList<String> groupingFields = new LinkedList<String>();
+		final boolean[] indexes = new boolean[numberOfAttributes];
+
+		for (int i = 0; i < numberOfAttributes; ++i) {
+			// select each attribute as a key with probability numberOfKeys/26
+			indexes[i] = this.r.nextInt(26) < numberOfKeys;
+			if (indexes[i]) {
+				groupingFields.add(attributes[i]);
+			}
+		}
+
+		final int[] expectedResult = new int[groupingFields.size()];
+		int j = 0;
+		for (int i = 0; i < numberOfAttributes; ++i) {
+			if (indexes[i]) {
+				expectedResult[j++] = i;
+			}
+		}
+
+		final int[] result = declarer.getGroupingFieldIndexes(groupingFields);
+
+		Assert.assertArrayEquals(expectedResult, result);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/FlinkTopologyContextTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/FlinkTopologyContextTest.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/FlinkTopologyContextTest.java
new file mode 100644
index 0000000..d214610
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/FlinkTopologyContextTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.api;
+
+import backtype.storm.metric.api.ICombiner;
+import backtype.storm.metric.api.IMetric;
+import backtype.storm.metric.api.IReducer;
+import org.junit.Test;
+
+/** Verifies that unsupported Storm API methods of {@link FlinkTopologyContext} throw. */
+public class FlinkTopologyContextTest {
+
+	/** Creates a context instance with dummy arguments for exercising unsupported methods. */
+	private static FlinkTopologyContext createContext() {
+		return new FlinkTopologyContext(null, null, null);
+	}
+
+	@Test(expected = UnsupportedOperationException.class)
+	public void testAddTaskHook() {
+		createContext().addTaskHook(null);
+	}
+
+	@Test(expected = UnsupportedOperationException.class)
+	public void testGetHooks() {
+		createContext().getHooks();
+	}
+
+	@SuppressWarnings("rawtypes")
+	@Test(expected = UnsupportedOperationException.class)
+	public void testRegisteredMetric1() {
+		createContext().registerMetric(null, (ICombiner) null, 0);
+	}
+
+	@SuppressWarnings("rawtypes")
+	@Test(expected = UnsupportedOperationException.class)
+	public void testRegisteredMetric2() {
+		createContext().registerMetric(null, (IReducer) null, 0);
+	}
+
+	@Test(expected = UnsupportedOperationException.class)
+	public void testRegisteredMetric3() {
+		createContext().registerMetric(null, (IMetric) null, 0);
+	}
+
+	@Test(expected = UnsupportedOperationException.class)
+	public void testGetRegisteredMetricByName() {
+		createContext().getRegisteredMetricByName(null);
+	}
+
+	@Test(expected = UnsupportedOperationException.class)
+	public void testSetAllSubscribedState() {
+		createContext().setAllSubscribedState(null);
+	}
+
+	@Test(expected = UnsupportedOperationException.class)
+	public void testSetSubscribedState1() {
+		createContext().setSubscribedState(null, null);
+	}
+
+	@Test(expected = UnsupportedOperationException.class)
+	public void testSetSubscribedState2() {
+		createContext().setSubscribedState(null, null, null);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/FlinkTopologyTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/FlinkTopologyTest.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/FlinkTopologyTest.java
new file mode 100644
index 0000000..f179919
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/FlinkTopologyTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.api;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class FlinkTopologyTest {
+
+	@Test
+	public void testDefaultParallelism() {
+		final FlinkTopology topology = new FlinkTopology(null);
+		Assert.assertEquals(1, topology.getParallelism());
+	}
+
+	@Test(expected = UnsupportedOperationException.class)
+	public void testExecute() throws Exception {
+		new FlinkTopology(null).execute();
+	}
+
+	@Test(expected = UnsupportedOperationException.class)
+	public void testExecuteWithName() throws Exception {
+		new FlinkTopology(null).execute(null);
+	}
+
+	@Test
+	public void testNumberOfTasks() {
+		final FlinkTopology topology = new FlinkTopology(null);
+
+		Assert.assertEquals(0, topology.getNumberOfTasks());
+
+		topology.increaseNumberOfTasks(3);
+		Assert.assertEquals(3, topology.getNumberOfTasks());
+
+		topology.increaseNumberOfTasks(2);
+		Assert.assertEquals(5, topology.getNumberOfTasks());
+
+		topology.increaseNumberOfTasks(8);
+		Assert.assertEquals(13, topology.getNumberOfTasks());
+	}
+
+	@Test(expected = AssertionError.class)
+	public void testAssert() {
+		new FlinkTopology(null).increaseNumberOfTasks(0);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/AbstractTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/AbstractTest.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/AbstractTest.java
new file mode 100644
index 0000000..94a50cf
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/AbstractTest.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.util;
+
+import org.junit.Before;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Random;
+
+/** Base class for randomized tests: provides a seeded {@link Random} and logs the seed. */
+public abstract class AbstractTest {
+	private static final Logger LOG = LoggerFactory.getLogger(AbstractTest.class);
+
+	// seed used for this test run; logged so failing randomized runs can be replayed
+	protected long seed;
+	// shared random source, initialized from seed before each test
+	protected Random r;
+
+	/**
+	 * Initializes the random source with a fresh time-based seed before each test
+	 * and logs the seed for reproducibility.
+	 */
+	@Before
+	public void prepare() {
+		this.seed = System.currentTimeMillis();
+		this.r = new Random(this.seed);
+		// rely on auto-boxing instead of the deprecated Long(long) constructor
+		LOG.info("Test seed: {}", this.seed);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/FiniteTestSpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/FiniteTestSpout.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/FiniteTestSpout.java
new file mode 100644
index 0000000..96b5aea
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/FiniteTestSpout.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.wrappers;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.IRichSpout;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+
+import java.util.Map;
+
+/**
+ * Simple test {@link IRichSpout} that emits a fixed number of decreasing
+ * {@link Integer} values (numberOfOutputTuples-1 down to 0) and then stays idle.
+ */
+class FiniteTestSpout implements IRichSpout {
+	private static final long serialVersionUID = 7992419478267824279L;
+
+	// remaining number of tuples to emit; decremented on each nextTuple() call
+	private int numberOfOutputTuples;
+	private SpoutOutputCollector collector;
+
+	public FiniteTestSpout(final int numberOfOutputTuples) {
+		this.numberOfOutputTuples = numberOfOutputTuples;
+	}
+
+	@SuppressWarnings("rawtypes")
+	@Override
+	public void open(final Map conf, final TopologyContext context,
+			@SuppressWarnings("hiding") final SpoutOutputCollector collector) {
+		this.collector = collector;
+	}
+
+	@Override
+	public void close() {/* nothing to do */}
+
+	@Override
+	public void activate() {/* nothing to do */}
+
+	@Override
+	public void deactivate() {/* nothing to do */}
+
+	@Override
+	public void nextTuple() {
+		if (--this.numberOfOutputTuples >= 0) {
+			// Integer.valueOf instead of the deprecated Integer(int) constructor
+			this.collector.emit(new Values(Integer.valueOf(this.numberOfOutputTuples)));
+		}
+	}
+
+	@Override
+	public void ack(final Object msgId) {/* nothing to do */}
+
+	@Override
+	public void fail(final Object msgId) {/* nothing to do */}
+
+	@Override
+	public void declareOutputFields(final OutputFieldsDeclarer declarer) {
+		declarer.declare(new Fields("dummy"));
+	}
+
+	@Override
+	public Map<String, Object> getComponentConfiguration() {
+		return null;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltCollectorTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltCollectorTest.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltCollectorTest.java
new file mode 100644
index 0000000..2789ce5
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltCollectorTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.wrappers;
+
+import backtype.storm.tuple.Values;
+
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.stormcompatibility.util.AbstractTest;
+import org.apache.flink.streaming.api.operators.Output;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Collection;
+import java.util.List;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+/** Tests for {@link StormBoltCollector}: tuple forwarding and unsupported Storm methods. */
+public class StormBoltCollectorTest extends AbstractTest {
+
+	/**
+	 * Checks that emitted Storm tuples are forwarded to the wrapped Flink collector:
+	 * as a raw value for zero declared attributes, and as a Flink Tuple otherwise.
+	 */
+	@SuppressWarnings({ "rawtypes", "unchecked" })
+	@Test
+	public void testBoltStormCollector() throws InstantiationException, IllegalAccessException {
+		for (int numberOfAttributes = 0; numberOfAttributes < 26; ++numberOfAttributes) {
+			final Output flinkCollector = mock(Output.class);
+			Tuple flinkTuple = null;
+			final Values tuple = new Values();
+
+			StormBoltCollector<?> collector;
+
+			if (numberOfAttributes == 0) {
+				// raw mode: a single value is forwarded without Tuple wrapping
+				collector = new StormBoltCollector(numberOfAttributes, flinkCollector);
+				tuple.add(Integer.valueOf(this.r.nextInt()));
+
+			} else {
+				collector = new StormBoltCollector(numberOfAttributes, flinkCollector);
+				flinkTuple = Tuple.getTupleClass(numberOfAttributes).newInstance();
+
+				for (int i = 0; i < numberOfAttributes; ++i) {
+					tuple.add(Integer.valueOf(this.r.nextInt()));
+					flinkTuple.setField(tuple.get(i), i);
+				}
+			}
+
+			final String streamId = "streamId";
+			final Collection anchors = mock(Collection.class);
+			final List<Integer> taskIds;
+			taskIds = collector.emit(streamId, anchors, tuple);
+
+			// Flink does not report receiver task IDs, so the Storm API returns null
+			Assert.assertNull(taskIds);
+
+			if (numberOfAttributes == 0) {
+				verify(flinkCollector).collect(tuple.get(0));
+			} else {
+				verify(flinkCollector).collect(flinkTuple);
+			}
+		}
+	}
+
+	// The following Storm collector methods are not supported by the Flink wrapper.
+
+	@SuppressWarnings("unchecked")
+	@Test(expected = UnsupportedOperationException.class)
+	public void testReportError() {
+		new StormBoltCollector<Object>(1, mock(Output.class)).reportError(null);
+	}
+
+	@SuppressWarnings({"rawtypes", "unchecked"})
+	@Test(expected = UnsupportedOperationException.class)
+	public void testEmitDirect() {
+		new StormBoltCollector<Object>(1, mock(Output.class)).emitDirect(0, null,
+				(Collection) null, null);
+	}
+
+	@SuppressWarnings("unchecked")
+	@Test(expected = UnsupportedOperationException.class)
+	public void testAck() {
+		new StormBoltCollector<Object>(1, mock(Output.class)).ack(null);
+	}
+
+	@SuppressWarnings("unchecked")
+	@Test(expected = UnsupportedOperationException.class)
+	public void testFail() {
+		new StormBoltCollector<Object>(1, mock(Output.class)).fail(null);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapperTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapperTest.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapperTest.java
new file mode 100644
index 0000000..780c75e
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapperTest.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.wrappers;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.IRichBolt;
+import backtype.storm.tuple.Fields;
+
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
+import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.Map;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.isNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({StreamRecordSerializer.class, StormWrapperSetupHelper.class})
+public class StormBoltWrapperTest {
+
+	// Declaring two attributes while requesting raw output must be rejected.
+	@SuppressWarnings("unused")
+	@Test(expected = IllegalArgumentException.class)
+	public void testWrapperRawType() throws Exception {
+		final StormOutputFieldsDeclarer declarer = new StormOutputFieldsDeclarer();
+		declarer.declare(new Fields("dummy1", "dummy2"));
+		// intercept the wrapper's internal declarer construction via PowerMock
+		PowerMockito.whenNew(StormOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
+
+		new StormBoltWrapper<Object, Object>(mock(IRichBolt.class), true);
+	}
+
+	// Declaring 26 attributes exceeds the Tuple25 limit (default constructor).
+	@SuppressWarnings("unused")
+	@Test(expected = IllegalArgumentException.class)
+	public void testWrapperToManyAttributes1() throws Exception {
+		final StormOutputFieldsDeclarer declarer = new StormOutputFieldsDeclarer();
+		final String[] schema = new String[26];
+		for (int i = 0; i < schema.length; ++i) {
+			schema[i] = "a" + i;
+		}
+		declarer.declare(new Fields(schema));
+		PowerMockito.whenNew(StormOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
+
+		new StormBoltWrapper<Object, Object>(mock(IRichBolt.class));
+	}
+
+	// Same attribute-limit check for the explicit non-raw constructor.
+	@SuppressWarnings("unused")
+	@Test(expected = IllegalArgumentException.class)
+	public void testWrapperToManyAttributes2() throws Exception {
+		final StormOutputFieldsDeclarer declarer = new StormOutputFieldsDeclarer();
+		final String[] schema = new String[26];
+		for (int i = 0; i < schema.length; ++i) {
+			schema[i] = "a" + i;
+		}
+		declarer.declare(new Fields(schema));
+		PowerMockito.whenNew(StormOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
+
+		new StormBoltWrapper<Object, Object>(mock(IRichBolt.class), false);
+	}
+
+	// Runs the forwarding check for every attribute count in [0;25].
+	@Test
+	public void testWrapper() throws Exception {
+		for (int i = 0; i < 26; ++i) {
+			this.testWrapper(i);
+		}
+	}
+
+	// Verifies that an incoming record (raw value for 0 attributes, Flink Tuple
+	// otherwise) is handed to the wrapped bolt as an equal StormTuple.
+	@SuppressWarnings({"rawtypes", "unchecked"})
+	private void testWrapper(final int numberOfAttributes) throws Exception {
+		assert ((0 <= numberOfAttributes) && (numberOfAttributes <= 25));
+		Tuple flinkTuple = null;
+		String rawTuple = null;
+
+		if (numberOfAttributes == 0) {
+			rawTuple = "test";
+		} else {
+			flinkTuple = Tuple.getTupleClass(numberOfAttributes).newInstance();
+		}
+
+		// raw mode still needs a one-attribute schema for the declarer
+		String[] schema = new String[numberOfAttributes];
+		if (numberOfAttributes == 0) {
+			schema = new String[1];
+		}
+		for (int i = 0; i < schema.length; ++i) {
+			schema[i] = "a" + i;
+		}
+
+		final StreamRecord record = mock(StreamRecord.class);
+		if (numberOfAttributes == 0) {
+			when(record.getObject()).thenReturn(rawTuple);
+		} else {
+			when(record.getObject()).thenReturn(flinkTuple);
+		}
+
+		final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
+
+		final IRichBolt bolt = mock(IRichBolt.class);
+
+		final StormOutputFieldsDeclarer declarer = new StormOutputFieldsDeclarer();
+		declarer.declare(new Fields(schema));
+		PowerMockito.whenNew(StormOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
+
+		final StormBoltWrapper wrapper = new StormBoltWrapper(bolt);
+		wrapper.setup(mock(Output.class), taskContext);
+
+		wrapper.processElement(record.getObject());
+		if (numberOfAttributes == 0) {
+			verify(bolt).execute(eq(new StormTuple<String>(rawTuple)));
+		} else {
+			verify(bolt).execute(eq(new StormTuple<Tuple>(flinkTuple)));
+		}
+	}
+
+	// open() must call the bolt's prepare() with a non-null output collector
+	// when an output schema is declared.
+	@SuppressWarnings("unchecked")
+	@Test
+	public void testOpen() throws Exception {
+		final IRichBolt bolt = mock(IRichBolt.class);
+
+		final StormOutputFieldsDeclarer declarer = new StormOutputFieldsDeclarer();
+		declarer.declare(new Fields("dummy"));
+		PowerMockito.whenNew(StormOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
+
+		final StormBoltWrapper<Object, Object> wrapper = new StormBoltWrapper<Object, Object>(bolt);
+		wrapper.setup(mock(Output.class), mock(StreamingRuntimeContext.class));
+
+		wrapper.open(mock(Configuration.class));
+
+		verify(bolt).prepare(any(Map.class), any(TopologyContext.class), any(OutputCollector.class));
+	}
+
+	// For a sink (no declared output) the collector passed to prepare() is null.
+	@SuppressWarnings("unchecked")
+	@Test
+	public void testOpenSink() throws Exception {
+		final IRichBolt bolt = mock(IRichBolt.class);
+		final StormBoltWrapper<Object, Object> wrapper = new StormBoltWrapper<Object, Object>(bolt);
+		wrapper.setup(mock(Output.class), mock(StreamingRuntimeContext.class));
+
+		wrapper.open(mock(Configuration.class));
+
+		verify(bolt).prepare(any(Map.class), any(TopologyContext.class), isNull(OutputCollector.class));
+	}
+
+	// close() must delegate to the bolt's cleanup().
+	@SuppressWarnings("unchecked")
+	@Test
+	public void testClose() throws Exception {
+		final IRichBolt bolt = mock(IRichBolt.class);
+
+		final StormOutputFieldsDeclarer declarer = new StormOutputFieldsDeclarer();
+		declarer.declare(new Fields("dummy"));
+		PowerMockito.whenNew(StormOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
+
+		final StormBoltWrapper<Object, Object> wrapper = new StormBoltWrapper<Object, Object>(bolt);
+
+		final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
+		// when(taskContext.getOutputCollector()).thenReturn(mock(Collector.class));
+		wrapper.setup(mock(Output.class), taskContext);
+
+		wrapper.close();
+		verify(bolt).cleanup();
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapperTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapperTest.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapperTest.java
new file mode 100644
index 0000000..c890ab1
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapperTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.wrappers;
+
+import backtype.storm.topology.IRichSpout;
+import backtype.storm.tuple.Fields;
+
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.stormcompatibility.util.AbstractTest;
+import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
+import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.LinkedList;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(StormWrapperSetupHelper.class)
+public class StormFiniteSpoutWrapperTest extends AbstractTest {
+
+	/**
+	 * The wrapper must call the spout's {@code nextTuple()} exactly the configured
+	 * number of times.
+	 */
+	@SuppressWarnings("unchecked")
+	@Test
+	public void testRunExecuteFixedNumber() throws Exception {
+		final StormOutputFieldsDeclarer declarer = new StormOutputFieldsDeclarer();
+		declarer.declare(new Fields("dummy"));
+		PowerMockito.whenNew(StormOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
+
+		final IRichSpout spout = mock(IRichSpout.class);
+		// at least one call, so the verification cannot be satisfied vacuously by zero calls
+		final int numberOfCalls = 1 + this.r.nextInt(50);
+		final StormFiniteSpoutWrapper<?> spoutWrapper = new StormFiniteSpoutWrapper<Object>(spout, numberOfCalls);
+		spoutWrapper.setRuntimeContext(mock(StreamingRuntimeContext.class));
+
+		spoutWrapper.run(mock(SourceContext.class));
+		verify(spout, times(numberOfCalls)).nextTuple();
+	}
+
+	/**
+	 * Running a finite spout must emit all tuples produced by the test spout, in order.
+	 */
+	@Test
+	public void testRunExecute() throws Exception {
+		// at least one tuple, so an empty result cannot pass trivially
+		final int numberOfCalls = 1 + this.r.nextInt(50);
+
+		// FiniteTestSpout emits descending counter values (see expected list below)
+		final LinkedList<Tuple1<Integer>> expectedResult = new LinkedList<Tuple1<Integer>>();
+		for (int i = numberOfCalls - 1; i >= 0; --i) {
+			expectedResult.add(new Tuple1<Integer>(Integer.valueOf(i)));
+		}
+
+		final IRichSpout spout = new FiniteTestSpout(numberOfCalls);
+		final StormFiniteSpoutWrapper<Tuple1<Integer>> spoutWrapper =
+				new StormFiniteSpoutWrapper<Tuple1<Integer>>(spout);
+		spoutWrapper.setRuntimeContext(mock(StreamingRuntimeContext.class));
+
+		final TestContext collector = new TestContext();
+		spoutWrapper.run(collector);
+
+		Assert.assertEquals(expectedResult, collector.result);
+	}
+
+	/**
+	 * After cancel(), run() must emit exactly one further tuple and then stop.
+	 */
+	@Test
+	public void testCancel() throws Exception {
+		final int numberOfCalls = 5 + this.r.nextInt(5);
+
+		// only the first tuple (value numberOfCalls - 1) is expected after cancellation
+		final LinkedList<Tuple1<Integer>> expectedResult = new LinkedList<Tuple1<Integer>>();
+		expectedResult.add(new Tuple1<Integer>(Integer.valueOf(numberOfCalls - 1)));
+
+		final IRichSpout spout = new FiniteTestSpout(numberOfCalls);
+		final StormFiniteSpoutWrapper<Tuple1<Integer>> spoutWrapper =
+				new StormFiniteSpoutWrapper<Tuple1<Integer>>(spout);
+		spoutWrapper.setRuntimeContext(mock(StreamingRuntimeContext.class));
+
+		spoutWrapper.cancel();
+		final TestContext collector = new TestContext();
+		spoutWrapper.run(collector);
+
+		Assert.assertEquals(expectedResult, collector.result);
+	}
+
+	/**
+	 * close() on the wrapper must delegate to the wrapped spout's close().
+	 */
+	@Test
+	public void testClose() throws Exception {
+		final IRichSpout spout = mock(IRichSpout.class);
+		final StormFiniteSpoutWrapper<Tuple1<Integer>> spoutWrapper =
+				new StormFiniteSpoutWrapper<Tuple1<Integer>>(spout);
+
+		spoutWrapper.close();
+
+		verify(spout).close();
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormOutputFieldsDeclarerTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormOutputFieldsDeclarerTest.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormOutputFieldsDeclarerTest.java
new file mode 100644
index 0000000..cfde770
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormOutputFieldsDeclarerTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.wrappers;
+
+import backtype.storm.tuple.Fields;
+import backtype.storm.utils.Utils;
+import org.apache.flink.stormcompatibility.util.AbstractTest;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+
+public class StormOutputFieldsDeclarerTest extends AbstractTest {
+
+	@Test
+	public void testDeclare() {
+		final StormOutputFieldsDeclarer declarer = new StormOutputFieldsDeclarer();
+
+		Assert.assertEquals(-1, declarer.getNumberOfAttributes());
+
+		final int numberOfAttributes = 1 + this.r.nextInt(25);
+		final ArrayList<String> schema = new ArrayList<String>(numberOfAttributes);
+		for (int i = 0; i < numberOfAttributes; ++i) {
+			schema.add("a" + i);
+		}
+		declarer.declare(new Fields(schema));
+		Assert.assertEquals(numberOfAttributes, declarer.getNumberOfAttributes());
+	}
+
+	@Test
+	public void testDeclareDirect() {
+		new StormOutputFieldsDeclarer().declare(false, null);
+	}
+
+	@Test(expected = UnsupportedOperationException.class)
+	public void testDeclareDirectFail() {
+		new StormOutputFieldsDeclarer().declare(true, null);
+	}
+
+	@Test
+	public void testDeclareStream() {
+		new StormOutputFieldsDeclarer().declareStream(Utils.DEFAULT_STREAM_ID, null);
+	}
+
+	@Test(expected = UnsupportedOperationException.class)
+	public void testDeclareStreamFail() {
+		new StormOutputFieldsDeclarer().declareStream(null, null);
+	}
+
+	@Test
+	public void testDeclareFullStream() {
+		new StormOutputFieldsDeclarer().declareStream(Utils.DEFAULT_STREAM_ID, false, null);
+	}
+
+	@Test(expected = UnsupportedOperationException.class)
+	public void testDeclareFullStreamFailNonDefaultStream() {
+		new StormOutputFieldsDeclarer().declareStream(null, false, null);
+	}
+
+	@Test(expected = UnsupportedOperationException.class)
+	public void testDeclareFullStreamFailDirect() {
+		new StormOutputFieldsDeclarer().declareStream(Utils.DEFAULT_STREAM_ID, true, null);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutCollectorTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutCollectorTest.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutCollectorTest.java
new file mode 100644
index 0000000..e4826bb
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutCollectorTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.wrappers;
+
+import backtype.storm.tuple.Values;
+
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.stormcompatibility.util.AbstractTest;
+import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.List;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+public class StormSpoutCollectorTest extends AbstractTest {
+
+	/**
+	 * For each supported arity (0..25), emitting a Storm tuple must forward the
+	 * corresponding value to the Flink source context: the single attribute itself
+	 * for arity 0, a Flink Tuple of matching arity otherwise. The returned task ID
+	 * list must always be null.
+	 */
+	@SuppressWarnings({ "rawtypes", "unchecked" })
+	@Test
+	public void testSpoutStormCollector() throws InstantiationException, IllegalAccessException {
+		for (int numberOfAttributes = 0; numberOfAttributes < 26; ++numberOfAttributes) {
+			final SourceContext flinkCollector = mock(SourceContext.class);
+			Tuple flinkTuple = null;
+			final Values tuple = new Values();
+
+			// identical in both branches of the original code; constructed once here
+			final StormSpoutCollector<?> collector =
+					new StormSpoutCollector(numberOfAttributes, flinkCollector);
+
+			if (numberOfAttributes == 0) {
+				// arity 0: the single value is expected to be forwarded unwrapped
+				tuple.add(Integer.valueOf(this.r.nextInt()));
+			} else {
+				flinkTuple = Tuple.getTupleClass(numberOfAttributes).newInstance();
+
+				for (int i = 0; i < numberOfAttributes; ++i) {
+					tuple.add(Integer.valueOf(this.r.nextInt()));
+					flinkTuple.setField(tuple.get(i), i);
+				}
+			}
+
+			final String streamId = "streamId";
+			final Object messageId = Integer.valueOf(this.r.nextInt());
+
+			final List<Integer> taskIds = collector.emit(streamId, tuple, messageId);
+
+			// no receiver task IDs are reported
+			Assert.assertNull(taskIds);
+
+			if (numberOfAttributes == 0) {
+				verify(flinkCollector).collect(tuple.get(0));
+			} else {
+				verify(flinkCollector).collect(flinkTuple);
+			}
+		}
+	}
+
+	/** reportError() is not supported and must throw. */
+	@SuppressWarnings("unchecked")
+	@Test(expected = UnsupportedOperationException.class)
+	public void testReportError() {
+		new StormSpoutCollector<Object>(1, mock(SourceContext.class)).reportError(null);
+	}
+
+	/** Direct emit is not supported and must throw. */
+	@SuppressWarnings("unchecked")
+	@Test(expected = UnsupportedOperationException.class)
+	public void testEmitDirect() {
+		new StormSpoutCollector<Object>(1, mock(SourceContext.class)).emitDirect(0, null, null,
+				(Object) null);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapperTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapperTest.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapperTest.java
new file mode 100644
index 0000000..6d2f196
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapperTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.wrappers;
+
+import backtype.storm.topology.IRichSpout;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.stormcompatibility.util.AbstractTest;
+import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.LinkedList;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(StormWrapperSetupHelper.class)
+public class StormSpoutWrapperTest extends AbstractTest {
+
+	@Test
+	public void testRunExecuteCancelInfinite() throws Exception {
+		final int sourceSize = 5 + this.r.nextInt(5);
+
+		final IRichSpout testSpout = new FiniteTestSpout(sourceSize);
+		final StormSpoutWrapper<Tuple1<Integer>> wrapper =
+				new StormSpoutWrapper<Tuple1<Integer>>(testSpout);
+		wrapper.setRuntimeContext(mock(StreamingRuntimeContext.class));
+
+		// cancelling before run() starts must suppress all output
+		wrapper.cancel();
+		final TestContext sourceContext = new TestContext();
+		wrapper.run(sourceContext);
+
+		Assert.assertEquals(new LinkedList<Tuple1<Integer>>(), sourceContext.result);
+	}
+
+	@Test
+	public void testClose() throws Exception {
+		final IRichSpout testSpout = mock(IRichSpout.class);
+		final StormSpoutWrapper<Tuple1<Integer>> wrapper =
+				new StormSpoutWrapper<Tuple1<Integer>>(testSpout);
+
+		// closing the wrapper must close the wrapped spout
+		wrapper.close();
+
+		verify(testSpout).close();
+	}
+
+}


Mime
View raw message