Return-Path: X-Original-To: apmail-flink-commits-archive@minotaur.apache.org Delivered-To: apmail-flink-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 8040918DF9 for ; Mon, 15 Jun 2015 09:32:52 +0000 (UTC) Received: (qmail 90649 invoked by uid 500); 15 Jun 2015 09:32:52 -0000 Delivered-To: apmail-flink-commits-archive@flink.apache.org Received: (qmail 90534 invoked by uid 500); 15 Jun 2015 09:32:52 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 90142 invoked by uid 99); 15 Jun 2015 09:32:52 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 15 Jun 2015 09:32:52 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id F1DE9E042F; Mon, 15 Jun 2015 09:32:51 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: mbalassi@apache.org To: commits@flink.apache.org Date: Mon, 15 Jun 2015 09:33:01 -0000 Message-Id: <12e2d551fb824ce596c5c1c21d6de54c@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [11/27] flink git commit: [storm-compat] Storm compatibility code cleanup http://git-wip-us.apache.org/repos/asf/flink/blob/e497a831/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapper.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapper.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapper.java index d322cc5..6a5efdf 100644 --- a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapper.java +++ b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapper.java @@ -14,22 +14,18 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.stormcompatibility.wrappers; +import backtype.storm.task.OutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.topology.IRichBolt; import org.apache.flink.api.java.tuple.Tuple1; import org.apache.flink.api.java.tuple.Tuple25; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.operators.StreamOperator; import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext; -import backtype.storm.task.OutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.IRichBolt; - - - - - /** * A {@link StormBoltWrapper} wraps an {@link IRichBolt} in order to execute the Storm bolt within a Flink Streaming * program. It takes the Flink input tuples of type {@code IN} and transforms them into a {@link StormTuple}s that the @@ -43,95 +39,88 @@ import backtype.storm.topology.IRichBolt; */ public class StormBoltWrapper extends StreamOperator { private static final long serialVersionUID = -4788589118464155835L; - - /** - * The wrapped Storm {@link IRichBolt bolt}. - */ + + // The wrapped Storm {@link IRichBolt bolt} private final IRichBolt bolt; - /** - * Number of attributes of the bolt's output tuples. - */ + // Number of attributes of the bolt's output tuples private final int numberOfAttributes; - - - + /** - * Instantiates a new {@link StormBoltWrapper} that wraps the given Storm {@link IRichBolt bolt} such that it can be - * used within a Flink streaming program. The output type will be one of {@link Tuple1} to {@link Tuple25} depending + * Instantiates a new {@link StormBoltWrapper} that wraps the given Storm {@link IRichBolt bolt} such that it + * can be + * used within a Flink streaming program. The output type will be one of {@link Tuple1} to {@link Tuple25} + * depending * on the bolt's declared number of attributes. - * + * * @param bolt - * The Storm {@link IRichBolt bolt} to be used. + * The Storm {@link IRichBolt bolt} to be used. * @throws IllegalArgumentException - * If the number of declared output attributes is not with range [1;25]. + * If the number of declared output attributes is not with range [1;25]. */ public StormBoltWrapper(final IRichBolt bolt) throws IllegalArgumentException { this(bolt, false); } - + /** - * Instantiates a new {@link StormBoltWrapper} that wraps the given Storm {@link IRichBolt bolt} such that it can be + * Instantiates a new {@link StormBoltWrapper} that wraps the given Storm {@link IRichBolt bolt} such that it + * can be * used within a Flink streaming program. The output type can be any type if parameter {@code rawOutput} is * {@code true} and the bolt's number of declared output tuples is 1. If {@code rawOutput} is {@code false} the * output type will be one of {@link Tuple1} to {@link Tuple25} depending on the bolt's declared number of * attributes. - * + * * @param bolt - * The Storm {@link IRichBolt bolt} to be used. + * The Storm {@link IRichBolt bolt} to be used. * @param rawOutput - * Set to {@code true} if a single attribute output stream, should not be of type {@link Tuple1} but be - * of a raw type. + * Set to {@code true} if a single attribute output stream, should not be of type {@link Tuple1} but be + * of a raw type. * @throws IllegalArgumentException - * If {@code rawOuput} is {@code true} and the number of declared output attributes is not 1 or if - * {@code rawOuput} is {@code false} and the number of declared output attributes is not with range - * [1;25]. + * If {@code rawOuput} is {@code true} and the number of declared output attributes is not 1 or if + * {@code rawOuput} is {@code false} and the number of declared output attributes is not with range + * [1;25]. */ public StormBoltWrapper(final IRichBolt bolt, final boolean rawOutput) throws IllegalArgumentException { super(new FlinkDummyRichFunction()); this.bolt = bolt; this.numberOfAttributes = StormWrapperSetupHelper.getNumberOfAttributes(bolt, rawOutput); } - - - + @Override public void open(final Configuration parameters) throws Exception { super.open(parameters); - - final StreamingRuntimeContext flinkContext = (StreamingRuntimeContext)((FlinkDummyRichFunction)super - .getUserFunction()).getRuntimeContext(); - + + final StreamingRuntimeContext flinkContext = (StreamingRuntimeContext) ((FlinkDummyRichFunction) super + .getUserFunction()).getRuntimeContext(); + final TopologyContext topologyContext = StormWrapperSetupHelper.convertToTopologyContext(flinkContext, false); OutputCollector stormCollector = null; - - if(this.numberOfAttributes != -1) { + + if (this.numberOfAttributes != -1) { stormCollector = new OutputCollector(new StormCollector(this.numberOfAttributes, super.collector)); } - + this.bolt.prepare(null, topologyContext, stormCollector); } - + @Override public void close() { super.close(); this.bolt.cleanup(); } - + /** - * {@inheritDoc} - * * Transforms a Flink tuple into a Storm tuple and calls the bolt's {@code execute} method. */ @Override protected void callUserFunction() throws Exception { this.bolt.execute(new StormTuple(this.nextRecord.getObject())); } - + @Override public void run() throws Exception { - while(this.readNext() != null) { + while (this.readNext() != null) { this.callUserFunctionAndLogException(); } } - + } http://git-wip-us.apache.org/repos/asf/flink/blob/e497a831/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormCollector.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormCollector.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormCollector.java index ed333b8..1c13e88 100644 --- a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormCollector.java +++ b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormCollector.java @@ -14,22 +14,25 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.stormcompatibility.wrappers; -import java.util.Collection; -import java.util.List; +/* we do not import + * --> "org.apache.flink.api.java.tuple.Tuple" + * or + * --> "backtype.storm.tuple.Tuple" + * to avoid confusion + */ +import backtype.storm.spout.ISpoutOutputCollector; +import backtype.storm.task.IOutputCollector; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple1; import org.apache.flink.api.java.tuple.Tuple25; import org.apache.flink.util.Collector; -import backtype.storm.spout.ISpoutOutputCollector; -import backtype.storm.task.IOutputCollector; - - - - +import java.util.Collection; +import java.util.List; /** * A {@link StormCollector} is used by {@link AbstractStormSpoutWrapper} and {@link StormBoltWrapper} to provided an @@ -38,119 +41,105 @@ import backtype.storm.task.IOutputCollector; * {@link Collector}. */ class StormCollector implements ISpoutOutputCollector, IOutputCollector { - // we do not import - // --> "org.apache.flink.api.java.tuple.Tuple" - // or - // --> "backtype.storm.tuple.Tuple" - // to avoid confusion - - /** - * The Flink collector. - */ + + // The Flink collector private final Collector flinkCollector; - /** - * The Flink output tuple of concrete type {@link Tuple1} to {@link Tuple25}. - */ + // The Flink output tuple of concrete type {@link Tuple1} to {@link Tuple25} private final org.apache.flink.api.java.tuple.Tuple outputTuple; - /** - * The number of attributes of the output tuples. (Determines the concrete type of {@link #outputTuple}). - */ + // The number of attributes of the output tuples. (Determines the concrete type of {@link #outputTuple}) private final int numberOfAttributes; - /** - * Is set to {@code true} each time a tuple is emitted. - */ + // Is set to {@code true} each time a tuple is emitted boolean tupleEmitted = false; - - - + /** * Instantiates a new {@link StormCollector} that emits Flink tuples to the given Flink collector. If the number of * attributes is specified as zero, any output type is supported. If the number of attributes is between 1 to 25, * the output type is {@link Tuple1} to {@link Tuple25}. - * + * * @param numberOfAttributes - * The number of attributes of the emitted tuples. + * The number of attributes of the emitted tuples. * @param flinkCollector - * The Flink collector to be used. - * + * The Flink collector to be used. * @throws UnsupportedOperationException - * if the specified number of attributes is not in the valid range of [0,25] + * if the specified number of attributes is not in the valid range of [0,25] */ public StormCollector(final int numberOfAttributes, final Collector flinkCollector) - throws UnsupportedOperationException { + throws UnsupportedOperationException { this.numberOfAttributes = numberOfAttributes; this.flinkCollector = flinkCollector; - - if(this.numberOfAttributes <= 0) { + + if (this.numberOfAttributes <= 0) { this.outputTuple = null; - } else if(this.numberOfAttributes <= 25) { + } else if (this.numberOfAttributes <= 25) { try { this.outputTuple = Tuple.getTupleClass(this.numberOfAttributes).newInstance(); - } catch(final InstantiationException e) { + } catch (final InstantiationException e) { throw new RuntimeException(e); - } catch(final IllegalAccessException e) { + } catch (final IllegalAccessException e) { throw new RuntimeException(e); } } else { throw new UnsupportedOperationException( - "SimpleStormBoltWrapper can handle not more then 25 attributes, but " + this.numberOfAttributes - + " are declared by the given bolt."); + "SimpleStormBoltWrapper can handle not more then 25 attributes, but " + this.numberOfAttributes + + " are declared by the given bolt"); } } - - - + @Override public void reportError(final Throwable error) { // not sure, if Flink can support this - throw new UnsupportedOperationException("Not implemented yet."); + throw new UnsupportedOperationException("Not implemented yet"); } - + @Override public List emit(final String streamId, final List tuple, final Object messageId) { - return this.emitImpl(streamId, null, tuple, messageId); + return this.emitImpl(tuple); } - + @Override - public List emit(final String streamId, final Collection anchors, final List tuple) { - return this.emitImpl(streamId, anchors, tuple, null); + public List emit(final String streamId, final Collection anchors, + final List tuple) { + return this.emitImpl(tuple); } - + @SuppressWarnings("unchecked") - public List emitImpl(final String streamId, final Collection anchors, final List tuple, final Object messageId) { - if(this.numberOfAttributes > 0) { + public List emitImpl( + final List tuple) { + if (this.numberOfAttributes > 0) { assert (tuple.size() == this.numberOfAttributes); - for(int i = 0; i < this.numberOfAttributes; ++i) { + for (int i = 0; i < this.numberOfAttributes; ++i) { this.outputTuple.setField(tuple.get(i), i); } - this.flinkCollector.collect((OUT)this.outputTuple); + this.flinkCollector.collect((OUT) this.outputTuple); } else { assert (tuple.size() == 1); - this.flinkCollector.collect((OUT)tuple.get(0)); + this.flinkCollector.collect((OUT) tuple.get(0)); } this.tupleEmitted = true; - - return null; // TODO + + // TODO + return null; } - + @Override public void emitDirect(final int taskId, final String streamId, final List tuple, final Object messageId) { throw new UnsupportedOperationException("Direct emit is not supported by Flink"); } - + @Override - public void emitDirect(final int taskId, final String streamId, final Collection anchors, final List tuple) { + public void emitDirect(final int taskId, final String streamId, + final Collection anchors, final List tuple) { throw new UnsupportedOperationException("Direct emit is not supported by Flink"); } - + @Override public void ack(final backtype.storm.tuple.Tuple input) { - throw new UnsupportedOperationException("Currently, acking/failing is not supported by Flink."); + throw new UnsupportedOperationException("Currently, acking/failing is not supported by Flink"); } - + @Override public void fail(final backtype.storm.tuple.Tuple input) { - throw new UnsupportedOperationException("Currently, acking/failing is not supported by Flink."); + throw new UnsupportedOperationException("Currently, acking/failing is not supported by Flink"); } - + } http://git-wip-us.apache.org/repos/asf/flink/blob/e497a831/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapper.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapper.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapper.java index f72fcee..ebbf80f 100644 --- a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapper.java +++ b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapper.java @@ -14,17 +14,13 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.stormcompatibility.wrappers; +import backtype.storm.topology.IRichSpout; import org.apache.flink.api.java.tuple.Tuple1; import org.apache.flink.api.java.tuple.Tuple25; -import backtype.storm.topology.IRichSpout; - - - - - /** * A {@link StormFiniteSpoutWrapper} is an {@link AbstractStormSpoutWrapper} that calles {@link IRichSpout#nextTuple() * nextTuple()} for finite number of times before {@link #run(org.apache.flink.util.Collector)} returns. The number of @@ -33,113 +29,104 @@ import backtype.storm.topology.IRichSpout; */ public class StormFiniteSpoutWrapper extends AbstractStormSpoutWrapper { private static final long serialVersionUID = 3883246587044801286L; - - /** - * The number of {@link IRichSpout#nextTuple()} calls. - */ + + // The number of {@link IRichSpout#nextTuple()} calls private int numberOfInvocations; - - - + /** * Instantiates a new {@link StormFiniteSpoutWrapper} that calls the {@link IRichSpout#nextTuple() nextTuple()} * method of the given Storm {@link IRichSpout spout} as long as it emits records to the output collector. The * output type will be one of {@link Tuple1} to {@link Tuple25} depending on the spout's declared number of * attributes. - * + * * @param spout - * The Storm {@link IRichSpout spout} to be used. - * + * The Storm {@link IRichSpout spout} to be used. * @throws IllegalArgumentException - * If the number of declared output attributes is not with range [1;25]. + * If the number of declared output attributes is not with range [1;25]. */ public StormFiniteSpoutWrapper(final IRichSpout spout) throws IllegalArgumentException { this(spout, false, -1); } - + /** * Instantiates a new {@link StormFiniteSpoutWrapper} that calls the {@link IRichSpout#nextTuple() nextTuple()} - * method of the given Storm {@link IRichSpout spout} {@code numberOfInvocations} times. The output type will be one + * method of the given Storm {@link IRichSpout spout} {@code numberOfInvocations} times. The output type will be + * one * of {@link Tuple1} to {@link Tuple25} depending on the spout's declared number of attributes. - * + * * @param spout - * The Storm {@link IRichSpout spout} to be used. + * The Storm {@link IRichSpout spout} to be used. * @param numberOfInvocations - * The number of calls to {@link IRichSpout#nextTuple()}. - * + * The number of calls to {@link IRichSpout#nextTuple()}. * @throws IllegalArgumentException - * If the number of declared output attributes is not with range [1;25]. + * If the number of declared output attributes is not with range [1;25]. */ public StormFiniteSpoutWrapper(final IRichSpout spout, final int numberOfInvocations) - throws IllegalArgumentException { + throws IllegalArgumentException { this(spout, false, numberOfInvocations); } - + /** * Instantiates a new {@link StormFiniteSpoutWrapper} that calls the {@link IRichSpout#nextTuple() nextTuple()} * method of the given Storm {@link IRichSpout spout} as long as it emits records to the output collector. The * output type can be any type if parameter {@code rawOutput} is {@code true} and the spout's number of declared * output tuples is 1. If {@code rawOutput} is {@code false} the output type will be one of {@link Tuple1} to * {@link Tuple25} depending on the spout's declared number of attributes. - * + * * @param spout - * The Storm {@link IRichSpout spout} to be used. + * The Storm {@link IRichSpout spout} to be used. * @param rawOutput - * Set to {@code true} if a single attribute output stream, should not be of type {@link Tuple1} but be - * of a raw type. - * + * Set to {@code true} if a single attribute output stream, should not be of type {@link Tuple1} but be + * of a raw type. * @throws IllegalArgumentException - * If {@code rawOuput} is {@code true} and the number of declared output attributes is not 1 or if - * {@code rawOuput} is {@code false} and the number of declared output attributes is not with range - * [1;25]. + * If {@code rawOuput} is {@code true} and the number of declared output attributes is not 1 or if + * {@code rawOuput} is {@code false} and the number of declared output attributes is not with range + * [1;25]. */ public StormFiniteSpoutWrapper(final IRichSpout spout, final boolean rawOutput) throws IllegalArgumentException { this(spout, rawOutput, -1); } - + /** * Instantiates a new {@link StormFiniteSpoutWrapper} that calls the {@link IRichSpout#nextTuple() nextTuple()} * method of the given Storm {@link IRichSpout spout} {@code numberOfInvocations} times. The output type can be any * type if parameter {@code rawOutput} is {@code true} and the spout's number of declared output tuples is 1. If * {@code rawOutput} is {@code false} the output type will be one of {@link Tuple1} to {@link Tuple25} depending on * the spout's declared number of attributes. - * + * * @param spout - * The Storm {@link IRichSpout spout} to be used. + * The Storm {@link IRichSpout spout} to be used. * @param rawOutput - * Set to {@code true} if a single attribute output stream, should not be of type {@link Tuple1} but be - * of a raw type. + * Set to {@code true} if a single attribute output stream, should not be of type {@link Tuple1} but be + * of a raw type. * @param numberOfInvocations - * The number of calls to {@link IRichSpout#nextTuple()}. - * + * The number of calls to {@link IRichSpout#nextTuple()}. * @throws IllegalArgumentException - * If {@code rawOuput} is {@code true} and the number of declared output attributes is not 1 or if - * {@code rawOuput} is {@code false} and the number of declared output attributes is not with range - * [1;25]. + * If {@code rawOuput} is {@code true} and the number of declared output attributes is not 1 or if + * {@code rawOuput} is {@code false} and the number of declared output attributes is not with range + * [1;25]. */ public StormFiniteSpoutWrapper(final IRichSpout spout, final boolean rawOutput, final int numberOfInvocations) - throws IllegalArgumentException { + throws IllegalArgumentException { super(spout, rawOutput); this.numberOfInvocations = numberOfInvocations; } - - - + /** * Calls {@link IRichSpout#nextTuple()} for the given number of times. */ @Override protected void execute() { - if(this.numberOfInvocations >= 0) { - while((--this.numberOfInvocations >= 0) && super.isRunning) { + if (this.numberOfInvocations >= 0) { + while ((--this.numberOfInvocations >= 0) && super.isRunning) { super.spout.nextTuple(); } } else { do { super.collector.tupleEmitted = false; super.spout.nextTuple(); - } while(super.collector.tupleEmitted && super.isRunning); + } while (super.collector.tupleEmitted && super.isRunning); } } - + } http://git-wip-us.apache.org/repos/asf/flink/blob/e497a831/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormOutputFieldsDeclarer.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormOutputFieldsDeclarer.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormOutputFieldsDeclarer.java index a9c66ee..6005d6d 100644 --- a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormOutputFieldsDeclarer.java +++ b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormOutputFieldsDeclarer.java @@ -21,61 +21,54 @@ import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Fields; import backtype.storm.utils.Utils; - - - - /** * {@link StormOutputFieldsDeclarer} is used by {@link StormBoltWrapper} to determine the number of attributes declared * by the wrapped bolt's {@code declare(...)} method. */ class StormOutputFieldsDeclarer implements OutputFieldsDeclarer { - /** - * The output schema declared by the wrapped bolt. - */ + + // The output schema declared by the wrapped bolt. private Fields outputSchema = null; - - - + @Override public void declare(final Fields fields) { this.declareStream(Utils.DEFAULT_STREAM_ID, false, fields); } - + @Override public void declare(final boolean direct, final Fields fields) { this.declareStream(Utils.DEFAULT_STREAM_ID, direct, fields); } - + @Override public void declareStream(final String streamId, final Fields fields) { this.declareStream(streamId, false, fields); } - + @Override public void declareStream(final String streamId, final boolean direct, final Fields fields) { - if(!Utils.DEFAULT_STREAM_ID.equals(streamId)) { - throw new UnsupportedOperationException("Currently, only the default output stream is supported by Flink."); + if (!Utils.DEFAULT_STREAM_ID.equals(streamId)) { + throw new UnsupportedOperationException("Currently, only the default output stream is supported by Flink"); } - if(direct) { + if (direct) { throw new UnsupportedOperationException("Direct emit is not supported by Flink"); } - + this.outputSchema = fields; } - + /** * Returns the number of attributes of the output schema declare by the wrapped bolt. If no output schema is * declared (eg, for sink bolts), {@code -1} is returned. - * + * * @return the number of attributes of the output schema declare by the wrapped bolt */ public int getNumberOfAttributes() { - if(this.outputSchema != null) { + if (this.outputSchema != null) { return this.outputSchema.size(); } - + return -1; } - + } http://git-wip-us.apache.org/repos/asf/flink/blob/e497a831/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapper.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapper.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapper.java index 0c7d62d..f5e0733 100644 --- a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapper.java +++ b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapper.java @@ -14,70 +14,65 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.stormcompatibility.wrappers; +import backtype.storm.topology.IRichSpout; import org.apache.flink.api.java.tuple.Tuple1; import org.apache.flink.api.java.tuple.Tuple25; -import backtype.storm.topology.IRichSpout; - - - - - /** * A {@link StormSpoutWrapper} is an {@link AbstractStormSpoutWrapper} that calls the wrapped spout's * {@link IRichSpout#nextTuple() nextTuple()} method in in infinite loop. */ public class StormSpoutWrapper extends AbstractStormSpoutWrapper { private static final long serialVersionUID = -218340336648247605L; - + /** - * Instantiates a new {@link StormSpoutWrapper} that wraps the given Storm {@link IRichSpout spout} such that it can + * Instantiates a new {@link StormSpoutWrapper} that wraps the given Storm {@link IRichSpout spout} such that it + * can * be used within a Flink streaming program. The output type will be one of {@link Tuple1} to {@link Tuple25} * depending on the spout's declared number of attributes. - * + * * @param spout - * The Storm {@link IRichSpout spout} to be used. - * + * The Storm {@link IRichSpout spout} to be used. * @throws IllegalArgumentException - * If the number of declared output attributes is not with range [1;25]. + * If the number of declared output attributes is not with range [1;25]. */ public StormSpoutWrapper(final IRichSpout spout) throws IllegalArgumentException { super(spout, false); } - + /** - * Instantiates a new {@link StormSpoutWrapper} that wraps the given Storm {@link IRichSpout spout} such that it can + * Instantiates a new {@link StormSpoutWrapper} that wraps the given Storm {@link IRichSpout spout} such that it + * can * be used within a Flink streaming program. The output type can be any type if parameter {@code rawOutput} is * {@code true} and the spout's number of declared output tuples is 1. If {@code rawOutput} is {@code false} the * output type will be one of {@link Tuple1} to {@link Tuple25} depending on the spout's declared number of * attributes. - * + * * @param spout - * The Storm {@link IRichSpout spout} to be used. - * + * The Storm {@link IRichSpout spout} to be used. * @param rawOutput - * Set to {@code true} if a single attribute output stream, should not be of type {@link Tuple1} but be - * of a raw type. - * + * Set to {@code true} if a single attribute output stream, should not be of type {@link Tuple1} but be + * of a raw type. * @throws IllegalArgumentException - * If {@code rawOuput} is {@code true} and the number of declared output attributes is not 1 or if - * {@code rawOuput} is {@code false} and the number of declared output attributes is not with range - * [1;25]. + * If {@code rawOuput} is {@code true} and the number of declared output attributes is not 1 or if + * {@code rawOuput} is {@code false} and the number of declared output attributes is not with range + * [1;25]. */ public StormSpoutWrapper(final IRichSpout spout, final boolean rawOutput) throws IllegalArgumentException { super(spout, rawOutput); } - + /** * Calls {@link IRichSpout#nextTuple()} in an infinite loop until {@link #cancel()} is called. */ @Override protected void execute() { - while(super.isRunning) { + while (super.isRunning) { super.spout.nextTuple(); } } - + } http://git-wip-us.apache.org/repos/asf/flink/blob/e497a831/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormTuple.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormTuple.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormTuple.java index 21f8de4..8019d7d 100644 --- a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormTuple.java +++ b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormTuple.java @@ -18,209 +18,202 @@ package org.apache.flink.stormcompatibility.wrappers; -import java.util.List; - import backtype.storm.generated.GlobalStreamId; import backtype.storm.tuple.Fields; import backtype.storm.tuple.MessageId; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; - - - +import java.util.List; /** * {@link StormTuple} converts a Flink tuple of type {@code IN} into a Storm tuple. */ class StormTuple implements Tuple { - /** - * The storm representation of the original Flink tuple. - */ + + // The storm representation of the original Flink tuple private final Values stormTuple; - - - + /** * Create a new Storm tuple from the given Flink tuple. - * + * * @param flinkTuple - * The Flink tuple to be converted. + * The Flink tuple to be converted. */ public StormTuple(final IN flinkTuple) { - if(flinkTuple instanceof org.apache.flink.api.java.tuple.Tuple) { - final org.apache.flink.api.java.tuple.Tuple t = (org.apache.flink.api.java.tuple.Tuple)flinkTuple; - + if (flinkTuple instanceof org.apache.flink.api.java.tuple.Tuple) { + final org.apache.flink.api.java.tuple.Tuple t = (org.apache.flink.api.java.tuple.Tuple) flinkTuple; + final int numberOfAttributes = t.getArity(); this.stormTuple = new Values(); - for(int i = 0; i < numberOfAttributes; ++i) { + for (int i = 0; i < numberOfAttributes; ++i) { this.stormTuple.add(t.getField(i)); } } else { this.stormTuple = new Values(flinkTuple); } } - + @Override public int size() { return this.stormTuple.size(); } - + @Override public boolean contains(final String field) { throw new UnsupportedOperationException("Not implemented yet"); } - + @Override public Fields getFields() { throw new UnsupportedOperationException("Not implemented yet"); } - + @Override public int fieldIndex(final String field) { throw new UnsupportedOperationException("Not implemented yet"); } - + @Override public List select(final Fields selector) { throw new UnsupportedOperationException("Not implemented yet"); } - + @Override public Object getValue(final int i) { return this.stormTuple.get(i); } - + @Override public String getString(final int i) { - return (String)this.stormTuple.get(i); + return (String) this.stormTuple.get(i); } - + @Override public Integer getInteger(final int i) { - return (Integer)this.stormTuple.get(i); + return (Integer) this.stormTuple.get(i); } - + @Override public Long getLong(final int i) { - return (Long)this.stormTuple.get(i); + return (Long) this.stormTuple.get(i); } - + @Override public Boolean getBoolean(final int i) { - return (Boolean)this.stormTuple.get(i); + return (Boolean) this.stormTuple.get(i); } - + @Override public Short getShort(final int i) { - return (Short)this.stormTuple.get(i); + return (Short) this.stormTuple.get(i); } - + @Override public Byte getByte(final int i) { - return (Byte)this.stormTuple.get(i); + return (Byte) this.stormTuple.get(i); } - + @Override public Double getDouble(final int i) { - return (Double)this.stormTuple.get(i); + return (Double) this.stormTuple.get(i); } - + @Override public Float getFloat(final int i) { - return (Float)this.stormTuple.get(i); + return (Float) this.stormTuple.get(i); } - + @Override public byte[] getBinary(final int i) { - return (byte[])this.stormTuple.get(i); + return (byte[]) this.stormTuple.get(i); } - + @Override public Object getValueByField(final String field) { throw new UnsupportedOperationException("Not implemented yet"); } - + @Override public String getStringByField(final String field) { throw new UnsupportedOperationException("Not implemented yet"); } - + @Override public Integer getIntegerByField(final String field) { throw new UnsupportedOperationException("Not implemented yet"); } - + @Override public Long getLongByField(final String field) { throw new UnsupportedOperationException("Not implemented yet"); } - + @Override public Boolean getBooleanByField(final String field) { throw new UnsupportedOperationException("Not implemented yet"); } - + @Override public Short getShortByField(final String field) { throw new UnsupportedOperationException("Not implemented yet"); } - + @Override public Byte getByteByField(final String field) { throw new UnsupportedOperationException("Not implemented yet"); } - + @Override public Double getDoubleByField(final String field) { throw new UnsupportedOperationException("Not implemented yet"); } - + @Override public Float getFloatByField(final String field) { throw new UnsupportedOperationException("Not implemented yet"); } - + @Override public byte[] getBinaryByField(final String field) { throw new UnsupportedOperationException("Not implemented yet"); } - + @Override public List getValues() { return this.stormTuple; } - + @Override public GlobalStreamId getSourceGlobalStreamid() { // not sure if Flink can support this throw new UnsupportedOperationException(); } - + @Override public String getSourceComponent() { // not sure if Flink can support this throw new UnsupportedOperationException(); } - + @Override public int getSourceTask() { // not sure if Flink can support this throw new UnsupportedOperationException(); } - + @Override public String getSourceStreamId() { // not sure if Flink can support this throw new UnsupportedOperationException(); } - + @Override public MessageId getMessageId() { // not sure if Flink can support this throw new UnsupportedOperationException(); } - + @Override public int hashCode() { final int prime = 31; @@ -228,27 +221,27 @@ class StormTuple implements Tuple { result = (prime * result) + ((this.stormTuple == null) ? 0 : this.stormTuple.hashCode()); return result; } - + @Override public boolean equals(final Object obj) { - if(this == obj) { + if (this == obj) { return true; } - if(obj == null) { + if (obj == null) { return false; } - if(this.getClass() != obj.getClass()) { + if (this.getClass() != obj.getClass()) { return false; } - final StormTuple other = (StormTuple)obj; - if(this.stormTuple == null) { - if(other.stormTuple != null) { + final StormTuple other = (StormTuple) obj; + if (this.stormTuple == null) { + if (other.stormTuple != null) { return false; } - } else if(!this.stormTuple.equals(other.stormTuple)) { + } else if (!this.stormTuple.equals(other.stormTuple)) { return false; } return true; } - + } http://git-wip-us.apache.org/repos/asf/flink/blob/e497a831/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormWrapperSetupHelper.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormWrapperSetupHelper.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormWrapperSetupHelper.java index ecbbe5c..a8f99e6 100644 --- a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormWrapperSetupHelper.java +++ b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormWrapperSetupHelper.java @@ -14,15 +14,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.flink.stormcompatibility.wrappers; -import java.util.HashMap; -import java.util.Map; - -import org.apache.flink.api.java.tuple.Tuple1; -import org.apache.flink.api.java.tuple.Tuple25; -import org.apache.flink.stormcompatibility.api.FlinkTopologyContext; -import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext; +package org.apache.flink.stormcompatibility.wrappers; import backtype.storm.generated.Bolt; import backtype.storm.generated.ComponentCommon; @@ -32,84 +25,90 @@ import backtype.storm.task.TopologyContext; import backtype.storm.topology.IComponent; import backtype.storm.topology.IRichBolt; import backtype.storm.topology.IRichSpout; +import org.apache.flink.api.java.tuple.Tuple1; +import org.apache.flink.api.java.tuple.Tuple25; +import org.apache.flink.stormcompatibility.api.FlinkTopologyContext; +import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext; - - - +import java.util.HashMap; +import java.util.Map; /** * {@link StormWrapperSetupHelper} is an helper class used by {@link AbstractStormSpoutWrapper} or * {@link StormBoltWrapper}. */ class StormWrapperSetupHelper { - + /** - * Computes the number of output attributes used by a {@link AbstractStormSpoutWrapper} or {@link StormBoltWrapper}. - * Returns zero for raw output type or a value within range [1;25] for output type {@link Tuple1} to {@link Tuple25} + * Computes the number of output attributes used by a {@link AbstractStormSpoutWrapper} or {@link + * StormBoltWrapper}. + * Returns zero for raw output type or a value within range [1;25] for output type {@link Tuple1} to {@link + * Tuple25} * . In case of a data sink, {@code -1} is returned. . - * + * * @param spoutOrBolt - * The Storm {@link IRichSpout spout} or {@link IRichBolt bolt} to be used. + * The Storm {@link IRichSpout spout} or {@link IRichBolt bolt} to be used. * @param rawOutput - * Set to {@code true} if a single attribute output stream, should not be of type {@link Tuple1} but be - * of a raw type. + * Set to {@code true} if a single attribute output stream, should not be of type {@link Tuple1} but be + * of a raw type. * @return The number of attributes to be used. * @throws IllegalArgumentException - * If {@code rawOuput} is {@code true} and the number of declared output attributes is not 1 or if - * {@code rawOuput} is {@code false} and the number of declared output attributes is not with range - * [1;25]. + * If {@code rawOuput} is {@code true} and the number of declared output attributes is not 1 or if + * {@code rawOuput} is {@code false} and the number of declared output attributes is not with range + * [1;25]. */ public static int getNumberOfAttributes(final IComponent spoutOrBolt, final boolean rawOutput) - throws IllegalArgumentException { + throws IllegalArgumentException { final StormOutputFieldsDeclarer declarer = new StormOutputFieldsDeclarer(); spoutOrBolt.declareOutputFields(declarer); - + final int declaredNumberOfAttributes = declarer.getNumberOfAttributes(); - - if(declaredNumberOfAttributes == -1) { + + if (declaredNumberOfAttributes == -1) { return -1; } - - if((declaredNumberOfAttributes < 1) || (declaredNumberOfAttributes > 25)) { + + if ((declaredNumberOfAttributes < 1) || (declaredNumberOfAttributes > 25)) { throw new IllegalArgumentException( - "Provided bolt declares non supported number of output attributes. Must be in range [1;25] but was " - + declaredNumberOfAttributes); + "Provided bolt declares non supported number of output attributes. Must be in range [1;25] but " + + "was " + + declaredNumberOfAttributes); } - - if(rawOutput) { - if(declaredNumberOfAttributes > 1) { + + if (rawOutput) { + if (declaredNumberOfAttributes > 1) { throw new IllegalArgumentException( - "Ouput type is requested to be raw type, but provided bolt declares more then one output attribute."); - + "Ouput type is requested to be raw type, but provided bolt declares more then one output " + + "attribute."); + } return 0; } - + return declaredNumberOfAttributes; } - - /** - * TODO - */ - public static TopologyContext convertToTopologyContext(final StreamingRuntimeContext context, final boolean spoutOrBolt) { - final Integer taskId = new Integer(1 + context.getIndexOfThisSubtask()); - + + // TODO + public static TopologyContext convertToTopologyContext(final StreamingRuntimeContext context, + final boolean spoutOrBolt) { + final Integer taskId = 1 + context.getIndexOfThisSubtask(); + final Map taskToComponents = new HashMap(); taskToComponents.put(taskId, context.getTaskName()); - + final ComponentCommon common = new ComponentCommon(); common.set_parallelism_hint(context.getNumberOfParallelSubtasks()); - + final Map bolts = new HashMap(); final Map spoutSpecs = new HashMap(); - - if(spoutOrBolt) { + + if (spoutOrBolt) { spoutSpecs.put(context.getTaskName(), new SpoutSpec(null, common)); } else { bolts.put(context.getTaskName(), new Bolt(null, common)); } - + return new FlinkTopologyContext(new StormTopology(spoutSpecs, bolts, null), taskToComponents, taskId); } - + } http://git-wip-us.apache.org/repos/asf/flink/blob/e497a831/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/api/FlinkOutputFieldsDeclarerTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/api/FlinkOutputFieldsDeclarerTest.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/api/FlinkOutputFieldsDeclarerTest.java index 1902835..b55e1ef 100644 --- a/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/api/FlinkOutputFieldsDeclarerTest.java +++ b/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/api/FlinkOutputFieldsDeclarerTest.java @@ -14,164 +14,159 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.flink.stormcompatibility.api; -import java.util.LinkedList; +package org.apache.flink.stormcompatibility.api; +import backtype.storm.tuple.Fields; +import backtype.storm.utils.Utils; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.stormcompatibility.util.AbstractTest; import org.junit.Assert; import org.junit.Test; -import backtype.storm.tuple.Fields; -import backtype.storm.utils.Utils; - - - - +import java.util.LinkedList; public class FlinkOutputFieldsDeclarerTest extends AbstractTest { - + @Test public void testDeclare() { - for(int i = 0; i < 4; ++i) { - for(int j = 0; j <= 25; ++j) { + for (int i = 0; i < 4; ++i) { + for (int j = 0; j <= 25; ++j) { this.runDeclareTest(i, j); } } } - + @Test(expected = IllegalArgumentException.class) public void testDeclareSimpleToManyAttributes() { this.runDeclareTest(0, 26); } - + @Test(expected = IllegalArgumentException.class) public void testDeclareNonDirectToManyAttributes() { this.runDeclareTest(1, 26); } - + @Test(expected = IllegalArgumentException.class) public void testDeclareDefaultStreamToManyAttributes() { this.runDeclareTest(2, 26); } - + @Test(expected = IllegalArgumentException.class) public void testDeclareFullToManyAttributes() { this.runDeclareTest(3, 26); } - + private void runDeclareTest(final int testCase, final int numberOfAttributes) { final FlinkOutputFieldsDeclarer declarere = new FlinkOutputFieldsDeclarer(); - + final String[] attributes = new String[numberOfAttributes]; - for(int i = 0; i < numberOfAttributes; ++i) { + for (int i = 0; i < numberOfAttributes; ++i) { attributes[i] = "a" + i; } - - switch(testCase) { - case 0: - this.declareSimple(declarere, attributes); - break; - case 1: - this.declareNonDirect(declarere, attributes); - break; - case 2: - this.declareDefaultStream(declarere, attributes); - break; - default: - this.declareFull(declarere, attributes); + + switch (testCase) { + case 0: + this.declareSimple(declarere, attributes); + break; + case 1: + this.declareNonDirect(declarere, attributes); + break; + case 2: + this.declareDefaultStream(declarere, attributes); + break; + default: + this.declareFull(declarere, attributes); } - - + final TypeInformation type = declarere.getOutputType(); - - if(numberOfAttributes == 0) { + + if (numberOfAttributes == 0) { Assert.assertNull(type); } else { Assert.assertEquals(numberOfAttributes, type.getArity()); - if(numberOfAttributes == 1) { + if (numberOfAttributes == 1) { Assert.assertFalse(type.isTupleType()); } else { Assert.assertTrue(type.isTupleType()); } } } - + private void declareSimple(final FlinkOutputFieldsDeclarer declarere, final String[] attributes) { declarere.declare(new Fields(attributes)); } - + private void declareNonDirect(final FlinkOutputFieldsDeclarer declarere, final String[] attributes) { declarere.declare(false, new Fields(attributes)); } - + private void declareDefaultStream(final FlinkOutputFieldsDeclarer declarere, final String[] attributes) { declarere.declareStream(Utils.DEFAULT_STREAM_ID, new Fields(attributes)); } - + private void declareFull(final FlinkOutputFieldsDeclarer declarere, final String[] attributes) { declarere.declareStream(Utils.DEFAULT_STREAM_ID, false, new Fields(attributes)); } - + @Test(expected = UnsupportedOperationException.class) public void testDeclareDirect() { new FlinkOutputFieldsDeclarer().declare(true, null); } - + @Test(expected = UnsupportedOperationException.class) public void testDeclareNonDefaultStrem() { new FlinkOutputFieldsDeclarer().declareStream("dummy", null); } - + @Test(expected = UnsupportedOperationException.class) public void testDeclareDirect2() { new FlinkOutputFieldsDeclarer().declareStream(Utils.DEFAULT_STREAM_ID, true, null); } - + @Test(expected = UnsupportedOperationException.class) public void testDeclareNonDefaultStrem2() { new FlinkOutputFieldsDeclarer().declareStream("dummy", this.r.nextBoolean(), null); } - + @Test public void testGetGroupingFieldIndexes() { final int numberOfAttributes = 5 + this.r.nextInt(21); final String[] attributes = new String[numberOfAttributes]; - for(int i = 0; i < numberOfAttributes; ++i) { + for (int i = 0; i < numberOfAttributes; ++i) { attributes[i] = "a" + i; } - + final FlinkOutputFieldsDeclarer declarere = new FlinkOutputFieldsDeclarer(); declarere.declare(new Fields(attributes)); - + final int numberOfKeys = 1 + this.r.nextInt(25); final LinkedList groupingFields = new LinkedList(); final boolean[] indexes = new boolean[numberOfAttributes]; - - for(int i = 0; i < numberOfAttributes; ++i) { - if(this.r.nextInt(26) < numberOfKeys) { + + for (int i = 0; i < numberOfAttributes; ++i) { + if (this.r.nextInt(26) < numberOfKeys) { groupingFields.add(attributes[i]); indexes[i] = true; } else { indexes[i] = false; } } - + final int[] expectedResult = new int[groupingFields.size()]; int j = 0; - for(int i = 0; i < numberOfAttributes; ++i) { - if(indexes[i]) { + for (int i = 0; i < numberOfAttributes; ++i) { + if (indexes[i]) { expectedResult[j++] = i; } } - + final int[] result = declarere.getGroupingFieldIndexes(groupingFields); - + Assert.assertEquals(expectedResult.length, result.length); - for(int i = 0; i < expectedResult.length; ++i) { + for (int i = 0; i < expectedResult.length; ++i) { Assert.assertEquals(expectedResult[i], result[i]); } } - + } http://git-wip-us.apache.org/repos/asf/flink/blob/e497a831/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/api/FlinkTopologyContextTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/api/FlinkTopologyContextTest.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/api/FlinkTopologyContextTest.java index 029a400..d214610 100644 --- a/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/api/FlinkTopologyContextTest.java +++ b/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/api/FlinkTopologyContextTest.java @@ -14,65 +14,61 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.flink.stormcompatibility.api; -import org.junit.Test; +package org.apache.flink.stormcompatibility.api; import backtype.storm.metric.api.ICombiner; import backtype.storm.metric.api.IMetric; import backtype.storm.metric.api.IReducer; - - - - +import org.junit.Test; public class FlinkTopologyContextTest { - + @Test(expected = UnsupportedOperationException.class) public void testAddTaskHook() { new FlinkTopologyContext(null, null, null).addTaskHook(null); } - + @Test(expected = UnsupportedOperationException.class) public void testGetHooks() { new FlinkTopologyContext(null, null, null).getHooks(); } - + @SuppressWarnings("rawtypes") @Test(expected = UnsupportedOperationException.class) public void testRegisteredMetric1() { - new FlinkTopologyContext(null, null, null).registerMetric(null, (ICombiner)null, 0); + new FlinkTopologyContext(null, null, null).registerMetric(null, (ICombiner) null, 0); } - + @SuppressWarnings("rawtypes") @Test(expected = UnsupportedOperationException.class) public void testRegisteredMetric2() { - new FlinkTopologyContext(null, null, null).registerMetric(null, (IReducer)null, 0); + new FlinkTopologyContext(null, null, null).registerMetric(null, (IReducer) null, 0); } - + @Test(expected = UnsupportedOperationException.class) public void testRegisteredMetric3() { - new FlinkTopologyContext(null, null, null).registerMetric(null, (IMetric)null, 0); + new FlinkTopologyContext(null, null, null).registerMetric(null, (IMetric) null, 0); } - + @Test(expected = UnsupportedOperationException.class) public void testGetRegisteredMetricByName() { new FlinkTopologyContext(null, null, null).getRegisteredMetricByName(null); } - + @Test(expected = UnsupportedOperationException.class) public void testSetAllSubscribedState() { new FlinkTopologyContext(null, null, null).setAllSubscribedState(null); } - + @Test(expected = UnsupportedOperationException.class) public void testSetSubscribedState1() { new FlinkTopologyContext(null, null, null).setSubscribedState(null, null); } - + @Test(expected = UnsupportedOperationException.class) public void testSetSubscribedState2() { new FlinkTopologyContext(null, null, null).setSubscribedState(null, null, null); } - + } http://git-wip-us.apache.org/repos/asf/flink/blob/e497a831/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/api/FlinkTopologyTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/api/FlinkTopologyTest.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/api/FlinkTopologyTest.java index 4f1f40f..f179919 100644 --- a/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/api/FlinkTopologyTest.java +++ b/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/api/FlinkTopologyTest.java @@ -14,52 +14,49 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.stormcompatibility.api; import org.junit.Assert; import org.junit.Test; - - - - public class FlinkTopologyTest { - + @Test public void testDefaultParallelism() { final FlinkTopology topology = new FlinkTopology(null); Assert.assertEquals(1, topology.getParallelism()); } - + @Test(expected = UnsupportedOperationException.class) public void testExecute() throws Exception { new FlinkTopology(null).execute(); } - + @Test(expected = UnsupportedOperationException.class) public void testExecuteWithName() throws Exception { new FlinkTopology(null).execute(null); } - + @Test public void testNumberOfTasks() { final FlinkTopology topology = new FlinkTopology(null); - + Assert.assertEquals(0, topology.getNumberOfTasks()); - + topology.increaseNumberOfTasks(3); Assert.assertEquals(3, topology.getNumberOfTasks()); - + topology.increaseNumberOfTasks(2); Assert.assertEquals(5, topology.getNumberOfTasks()); - + topology.increaseNumberOfTasks(8); Assert.assertEquals(13, topology.getNumberOfTasks()); } - + @Test(expected = AssertionError.class) public void testAssert() { new FlinkTopology(null).increaseNumberOfTasks(0); } - + } http://git-wip-us.apache.org/repos/asf/flink/blob/e497a831/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/util/AbstractTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/util/AbstractTest.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/util/AbstractTest.java index 100ca87..94a50cf 100644 --- a/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/util/AbstractTest.java +++ b/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/util/AbstractTest.java @@ -14,29 +14,26 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.flink.stormcompatibility.util; -import java.util.Random; +package org.apache.flink.stormcompatibility.util; import org.junit.Before; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - - - +import java.util.Random; public abstract class AbstractTest { private static final Logger LOG = LoggerFactory.getLogger(AbstractTest.class); - + protected long seed; protected Random r; - + @Before public void prepare() { this.seed = System.currentTimeMillis(); this.r = new Random(this.seed); LOG.info("Test seed: {}", new Long(this.seed)); } - + } http://git-wip-us.apache.org/repos/asf/flink/blob/e497a831/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/FiniteTestSpout.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/FiniteTestSpout.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/FiniteTestSpout.java index 52136a9..4182ad0 100644 --- a/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/FiniteTestSpout.java +++ b/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/FiniteTestSpout.java @@ -14,9 +14,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.flink.stormcompatibility.wrappers; -import java.util.Map; +package org.apache.flink.stormcompatibility.wrappers; import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; @@ -25,59 +24,55 @@ import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; - - - +import java.util.Map; class FiniteTestSpout implements IRichSpout { private static final long serialVersionUID = 7992419478267824279L; - + private int numberOfOutputTuples; private SpoutOutputCollector collector; - - - + public FiniteTestSpout(final int numberOfOutputTuples) { this.numberOfOutputTuples = numberOfOutputTuples; } - - - + + @SuppressWarnings("rawtypes") @Override - public void open(@SuppressWarnings("rawtypes") final Map conf, final TopologyContext context, @SuppressWarnings("hiding") final SpoutOutputCollector collector) { + public void open(final Map conf, final TopologyContext context, + final SpoutOutputCollector collector) { this.collector = collector; } - + @Override public void close() {/* nothing to do */} - + @Override public void activate() {/* nothing to do */} - + @Override public void deactivate() {/* nothing to do */} - + @Override public void nextTuple() { - if(--this.numberOfOutputTuples >= 0) { - this.collector.emit(new Values(new Integer(this.numberOfOutputTuples))); + if (--this.numberOfOutputTuples >= 0) { + this.collector.emit(new Values(this.numberOfOutputTuples)); } } - + @Override public void ack(final Object msgId) {/* nothing to do */} - + @Override public void fail(final Object msgId) {/* nothing to do */} - + @Override public void declareOutputFields(final OutputFieldsDeclarer declarer) { declarer.declare(new Fields("dummy")); } - + @Override public Map getComponentConfiguration() { return null; } - + } http://git-wip-us.apache.org/repos/asf/flink/blob/e497a831/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/FlinkDummyRichFunctionTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/FlinkDummyRichFunctionTest.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/FlinkDummyRichFunctionTest.java index 7ecc98c..2c2a221 100644 --- a/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/FlinkDummyRichFunctionTest.java +++ b/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/FlinkDummyRichFunctionTest.java @@ -14,28 +14,25 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.flink.stormcompatibility.wrappers; -import static org.mockito.Mockito.mock; +package org.apache.flink.stormcompatibility.wrappers; import org.apache.flink.api.common.functions.RuntimeContext; import org.junit.Assert; import org.junit.Test; - - - +import static org.mockito.Mockito.mock; public class FlinkDummyRichFunctionTest { - + @Test public void testRuntimeContext() { final FlinkDummyRichFunction dummy = new FlinkDummyRichFunction(); - + final RuntimeContext context = mock(RuntimeContext.class); dummy.setRuntimeContext(context); - + Assert.assertSame(context, dummy.getRuntimeContext()); } - + } http://git-wip-us.apache.org/repos/asf/flink/blob/e497a831/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapperTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapperTest.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapperTest.java index 50ae763..f2cfe59 100644 --- a/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapperTest.java +++ b/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapperTest.java @@ -17,16 +17,10 @@ package org.apache.flink.stormcompatibility.wrappers; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.eq; -import static org.mockito.Matchers.isNull; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -import java.util.Map; - +import backtype.storm.task.OutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.topology.IRichBolt; +import backtype.storm.tuple.Fields; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.runtime.io.IndexedReaderIterator; @@ -41,177 +35,168 @@ import org.powermock.api.mockito.PowerMockito; import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; -import backtype.storm.task.OutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.IRichBolt; -import backtype.storm.tuple.Fields; - - - +import java.util.Map; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Matchers.isNull; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; @RunWith(PowerMockRunner.class) @PrepareForTest({StreamRecordSerializer.class, StormWrapperSetupHelper.class}) public class StormBoltWrapperTest { - - @SuppressWarnings("unused") + @Test(expected = IllegalArgumentException.class) public void testWrapperRawType() throws Exception { final StormOutputFieldsDeclarer declarer = new StormOutputFieldsDeclarer(); declarer.declare(new Fields("dummy1", "dummy2")); PowerMockito.whenNew(StormOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer); - + new StormBoltWrapper(mock(IRichBolt.class), true); } - - @SuppressWarnings("unused") + @Test(expected = IllegalArgumentException.class) public void testWrapperToManyAttributes1() throws Exception { final StormOutputFieldsDeclarer declarer = new StormOutputFieldsDeclarer(); final String[] schema = new String[26]; - for(int i = 0; i < schema.length; ++i) { + for (int i = 0; i < schema.length; ++i) { schema[i] = "a" + i; } declarer.declare(new Fields(schema)); PowerMockito.whenNew(StormOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer); - + new StormBoltWrapper(mock(IRichBolt.class)); } - - @SuppressWarnings("unused") + @Test(expected = IllegalArgumentException.class) public void testWrapperToManyAttributes2() throws Exception { final StormOutputFieldsDeclarer declarer = new StormOutputFieldsDeclarer(); final String[] schema = new String[26]; - for(int i = 0; i < schema.length; ++i) { + for (int i = 0; i < schema.length; ++i) { schema[i] = "a" + i; } declarer.declare(new Fields(schema)); PowerMockito.whenNew(StormOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer); - + new StormBoltWrapper(mock(IRichBolt.class), false); } - + @Test public void testWrapper() throws Exception { - for(int i = 0; i < 26; ++i) { + for (int i = 0; i < 26; ++i) { this.testWrapper(i); } } - + @SuppressWarnings({"rawtypes", "unchecked"}) private void testWrapper(final int numberOfAttributes) throws Exception { assert ((0 <= numberOfAttributes) && (numberOfAttributes <= 25)); - + Tuple flinkTuple = null; String rawTuple = null; - - if(numberOfAttributes == 0) { - rawTuple = new String("test"); + + if (numberOfAttributes == 0) { + rawTuple = "test"; } else { flinkTuple = Tuple.getTupleClass(numberOfAttributes).newInstance(); } - + String[] schema = new String[numberOfAttributes]; - if(numberOfAttributes == 0) { + if (numberOfAttributes == 0) { schema = new String[1]; } - for(int i = 0; i < schema.length; ++i) { + for (int i = 0; i < schema.length; ++i) { schema[i] = "a" + i; } - - + final StreamRecord record = mock(StreamRecord.class); - if(numberOfAttributes == 0) { + if (numberOfAttributes == 0) { when(record.getObject()).thenReturn(rawTuple); } else { when(record.getObject()).thenReturn(flinkTuple); } - + final StreamRecordSerializer serializer = mock(StreamRecordSerializer.class); when(serializer.createInstance()).thenReturn(record); - + final IndexedReaderIterator reader = mock(IndexedReaderIterator.class); when(reader.next(record)).thenReturn(record).thenReturn(null); - + final StreamTaskContext taskContext = mock(StreamTaskContext.class); when(taskContext.getInputSerializer(0)).thenReturn(serializer); when(taskContext.getIndexedInput(0)).thenReturn(reader); - - - + final IRichBolt bolt = mock(IRichBolt.class); - + final StormOutputFieldsDeclarer declarer = new StormOutputFieldsDeclarer(); declarer.declare(new Fields(schema)); PowerMockito.whenNew(StormOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer); - + final StormBoltWrapper wrapper = new StormBoltWrapper(bolt); wrapper.setup(taskContext); - - - + wrapper.callUserFunction(); - if(numberOfAttributes == 0) { + if (numberOfAttributes == 0) { verify(bolt).execute(eq(new StormTuple(rawTuple))); } else { verify(bolt).execute(eq(new StormTuple(flinkTuple))); } - - - + wrapper.run(); - if(numberOfAttributes == 0) { + if (numberOfAttributes == 0) { verify(bolt, times(2)).execute(eq(new StormTuple(rawTuple))); } else { verify(bolt, times(2)).execute(eq(new StormTuple(flinkTuple))); } } - + @Test public void testOpen() throws Exception { final IRichBolt bolt = mock(IRichBolt.class); - + final StormOutputFieldsDeclarer declarer = new StormOutputFieldsDeclarer(); declarer.declare(new Fields("dummy")); PowerMockito.whenNew(StormOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer); - + final StormBoltWrapper wrapper = new StormBoltWrapper(bolt); wrapper.setRuntimeContext(mock(StreamingRuntimeContext.class)); - + wrapper.open(mock(Configuration.class)); - + verify(bolt).prepare(any(Map.class), any(TopologyContext.class), any(OutputCollector.class)); } - + @Test public void testOpenSink() throws Exception { final IRichBolt bolt = mock(IRichBolt.class); final StormBoltWrapper wrapper = new StormBoltWrapper(bolt); wrapper.setRuntimeContext(mock(StreamingRuntimeContext.class)); - + wrapper.open(mock(Configuration.class)); - + verify(bolt).prepare(any(Map.class), any(TopologyContext.class), isNull(OutputCollector.class)); } - + @SuppressWarnings("unchecked") @Test public void testClose() throws Exception { final IRichBolt bolt = mock(IRichBolt.class); - + final StormOutputFieldsDeclarer declarer = new StormOutputFieldsDeclarer(); declarer.declare(new Fields("dummy")); PowerMockito.whenNew(StormOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer); - + final StormBoltWrapper wrapper = new StormBoltWrapper(bolt); - + final StreamTaskContext taskContext = mock(StreamTaskContext.class); when(taskContext.getOutputCollector()).thenReturn(mock(Collector.class)); wrapper.setup(taskContext); - + wrapper.close(); verify(bolt).cleanup(); } - + } http://git-wip-us.apache.org/repos/asf/flink/blob/e497a831/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormCollectorTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormCollectorTest.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormCollectorTest.java index 660fcea..925da04 100644 --- a/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormCollectorTest.java +++ b/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormCollectorTest.java @@ -17,109 +17,104 @@ package org.apache.flink.stormcompatibility.wrappers; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; - -import java.util.Collection; -import java.util.List; - +import backtype.storm.tuple.Values; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.stormcompatibility.util.AbstractTest; import org.apache.flink.util.Collector; import org.junit.Assert; import org.junit.Test; -import backtype.storm.tuple.Values; - - - +import java.util.Collection; +import java.util.List; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; public class StormCollectorTest extends AbstractTest { - + @Test public void testSpoutStormCollector() throws InstantiationException, IllegalAccessException { - for(int i = 0; i < 26; ++i) { + for (int i = 0; i < 26; ++i) { this.testStromCollector(true, i); } } - + @Test public void testBoltStormCollector() throws InstantiationException, IllegalAccessException { - for(int i = 0; i < 26; ++i) { + for (int i = 0; i < 26; ++i) { this.testStromCollector(false, i); } } - - @SuppressWarnings({"unchecked", "rawtypes"}) + + @SuppressWarnings({"rawtypes", "unchecked"}) private void testStromCollector(final boolean spoutTest, final int numberOfAttributes) - throws InstantiationException, IllegalAccessException { + throws InstantiationException, IllegalAccessException { assert ((0 <= numberOfAttributes) && (numberOfAttributes <= 25)); - + final Collector flinkCollector = mock(Collector.class); Tuple flinkTuple = null; final Values tuple = new Values(); - - StormCollector collector = null; - - if(numberOfAttributes == 0) { + + StormCollector collector; + + if (numberOfAttributes == 0) { collector = new StormCollector(numberOfAttributes, flinkCollector); - tuple.add(new Integer(this.r.nextInt())); - + tuple.add(this.r.nextInt()); + } else { collector = new StormCollector(numberOfAttributes, flinkCollector); flinkTuple = Tuple.getTupleClass(numberOfAttributes).newInstance(); - - for(int i = 0; i < numberOfAttributes; ++i) { - tuple.add(new Integer(this.r.nextInt())); + + for (int i = 0; i < numberOfAttributes; ++i) { + tuple.add(this.r.nextInt()); flinkTuple.setField(tuple.get(i), i); } } - + final String streamId = "streamId"; final Collection anchors = mock(Collection.class); final List taskIds; - final Object messageId = new Integer(this.r.nextInt()); - if(spoutTest) { + final Object messageId = this.r.nextInt(); + if (spoutTest) { taskIds = collector.emit(streamId, tuple, messageId); } else { taskIds = collector.emit(streamId, anchors, tuple); } - + Assert.assertNull(taskIds); - - if(numberOfAttributes == 0) { + + if (numberOfAttributes == 0) { verify(flinkCollector).collect(tuple.get(0)); } else { verify(flinkCollector).collect(flinkTuple); } } - + @Test(expected = UnsupportedOperationException.class) public void testReportError() { new StormCollector(1, null).reportError(null); } - + @SuppressWarnings({"rawtypes", "unchecked"}) @Test(expected = UnsupportedOperationException.class) public void testBoltEmitDirect() { - new StormCollector(1, null).emitDirect(0, (String)null, (Collection)null, (List)null); + new StormCollector(1, null).emitDirect(0, null, (Collection) null, null); } - - @SuppressWarnings({"rawtypes", "unchecked"}) + + @SuppressWarnings("unchecked") @Test(expected = UnsupportedOperationException.class) public void testSpoutEmitDirect() { - new StormCollector(1, null).emitDirect(0, (String)null, (List)null, (Object)null); + new StormCollector(1, null).emitDirect(0, null, null, (Object) null); } - + @Test(expected = UnsupportedOperationException.class) public void testAck() { new StormCollector(1, null).ack(null); } - + @Test(expected = UnsupportedOperationException.class) public void testFail() { new StormCollector(1, null).fail(null); } - + } http://git-wip-us.apache.org/repos/asf/flink/blob/e497a831/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapperTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapperTest.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapperTest.java index 3c4f5aa..0c5b124 100644 --- a/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapperTest.java +++ b/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapperTest.java @@ -14,14 +14,11 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.flink.stormcompatibility.wrappers; - -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import java.util.LinkedList; +package org.apache.flink.stormcompatibility.wrappers; +import backtype.storm.topology.IRichSpout; +import backtype.storm.tuple.Fields; import org.apache.flink.api.java.tuple.Tuple1; import org.apache.flink.stormcompatibility.util.AbstractTest; import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext; @@ -32,80 +29,79 @@ import org.powermock.api.mockito.PowerMockito; import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; -import backtype.storm.topology.IRichSpout; -import backtype.storm.tuple.Fields; - - - +import java.util.LinkedList; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; @RunWith(PowerMockRunner.class) @PrepareForTest(StormWrapperSetupHelper.class) public class StormFiniteSpoutWrapperTest extends AbstractTest { - + @Test public void testRunExecuteFixedNumber() throws Exception { final StormOutputFieldsDeclarer declarer = new StormOutputFieldsDeclarer(); declarer.declare(new Fields("dummy")); PowerMockito.whenNew(StormOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer); - + final IRichSpout spout = mock(IRichSpout.class); final int numberOfCalls = this.r.nextInt(50); final StormFiniteSpoutWrapper spoutWrapper = new StormFiniteSpoutWrapper(spout, numberOfCalls); spoutWrapper.setRuntimeContext(mock(StreamingRuntimeContext.class)); - + spoutWrapper.run(null); verify(spout, times(numberOfCalls)).nextTuple(); } - + @Test public void testRunExecute() throws Exception { final int numberOfCalls = this.r.nextInt(50); - + final LinkedList> expectedResult = new LinkedList>(); - for(int i = numberOfCalls - 1; i >= 0; --i) { - expectedResult.add(new Tuple1(new Integer(i))); + for (int i = numberOfCalls - 1; i >= 0; --i) { + expectedResult.add(new Tuple1(i)); } - + final IRichSpout spout = new FiniteTestSpout(numberOfCalls); final StormFiniteSpoutWrapper> spoutWrapper = new StormFiniteSpoutWrapper>( - spout); + spout); spoutWrapper.setRuntimeContext(mock(StreamingRuntimeContext.class)); - + final TestCollector collector = new TestCollector(); spoutWrapper.run(collector); - + Assert.assertEquals(expectedResult, collector.result); } - + @Test public void testCancel() throws Exception { final int numberOfCalls = 5 + this.r.nextInt(5); - + final LinkedList> expectedResult = new LinkedList>(); - expectedResult.add(new Tuple1(new Integer(numberOfCalls - 1))); - + expectedResult.add(new Tuple1(numberOfCalls - 1)); + final IRichSpout spout = new FiniteTestSpout(numberOfCalls); final StormFiniteSpoutWrapper> spoutWrapper = new StormFiniteSpoutWrapper>( - spout); + spout); spoutWrapper.setRuntimeContext(mock(StreamingRuntimeContext.class)); - + spoutWrapper.cancel(); final TestCollector collector = new TestCollector(); spoutWrapper.run(collector); - + Assert.assertEquals(expectedResult, collector.result); } - + @Test public void testClose() throws Exception { final IRichSpout spout = mock(IRichSpout.class); final StormFiniteSpoutWrapper> spoutWrapper = new StormFiniteSpoutWrapper>( - spout); - + spout); + spoutWrapper.close(); - + verify(spout).close(); } - + } http://git-wip-us.apache.org/repos/asf/flink/blob/e497a831/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormOutputFieldsDeclarerTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormOutputFieldsDeclarerTest.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormOutputFieldsDeclarerTest.java index 5a4c05b..a72eb19 100644 --- a/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormOutputFieldsDeclarerTest.java +++ b/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormOutputFieldsDeclarerTest.java @@ -17,66 +17,64 @@ package org.apache.flink.stormcompatibility.wrappers; -import java.util.ArrayList; - +import backtype.storm.tuple.Fields; +import backtype.storm.utils.Utils; import org.apache.flink.stormcompatibility.util.AbstractTest; import org.junit.Assert; import org.junit.Test; -import backtype.storm.tuple.Fields; -import backtype.storm.utils.Utils; - - - - +import java.util.ArrayList; public class StormOutputFieldsDeclarerTest extends AbstractTest { - + @Test public void testDeclare() { final StormOutputFieldsDeclarer declarer = new StormOutputFieldsDeclarer(); - + Assert.assertEquals(-1, declarer.getNumberOfAttributes()); - + final int numberOfAttributes = 1 + this.r.nextInt(25); final ArrayList schema = new ArrayList(numberOfAttributes); - for(int i = 0; i < numberOfAttributes; ++i) { + for (int i = 0; i < numberOfAttributes; ++i) { schema.add("a" + i); } declarer.declare(new Fields(schema)); Assert.assertEquals(numberOfAttributes, declarer.getNumberOfAttributes()); } - + + @SuppressWarnings("unused") public void testDeclareDirect() { new StormOutputFieldsDeclarer().declare(false, null); } - + @Test(expected = UnsupportedOperationException.class) public void testDeclareDirectFail() { new StormOutputFieldsDeclarer().declare(true, null); } - + + @SuppressWarnings("unused") public void testDeclareStream() { new StormOutputFieldsDeclarer().declareStream(Utils.DEFAULT_STREAM_ID, null); } - + @Test(expected = UnsupportedOperationException.class) public void testDeclareStreamFail() { new StormOutputFieldsDeclarer().declareStream(null, null); } - + + @SuppressWarnings("unused") public void testDeclareFullStream() { new StormOutputFieldsDeclarer().declareStream(Utils.DEFAULT_STREAM_ID, false, null); } - + @Test(expected = UnsupportedOperationException.class) public void testDeclareFullStreamFailNonDefaultStream() { new StormOutputFieldsDeclarer().declareStream(null, false, null); } - + @Test(expected = UnsupportedOperationException.class) public void testDeclareFullStreamFailDirect() { new StormOutputFieldsDeclarer().declareStream(Utils.DEFAULT_STREAM_ID, true, null); } - + } http://git-wip-us.apache.org/repos/asf/flink/blob/e497a831/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapperTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapperTest.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapperTest.java index 88d855c..48f680a 100644 --- a/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapperTest.java +++ b/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapperTest.java @@ -14,13 +14,10 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.flink.stormcompatibility.wrappers; - -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import java.util.LinkedList; +package org.apache.flink.stormcompatibility.wrappers; +import backtype.storm.topology.IRichSpout; import org.apache.flink.api.java.tuple.Tuple1; import org.apache.flink.stormcompatibility.util.AbstractTest; import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext; @@ -30,39 +27,38 @@ import org.junit.runner.RunWith; import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; -import backtype.storm.topology.IRichSpout; - - - +import java.util.LinkedList; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; @RunWith(PowerMockRunner.class) @PrepareForTest(StormWrapperSetupHelper.class) public class StormSpoutWrapperTest extends AbstractTest { - + @Test public void testRunExecuteCancelInfinite() throws Exception { final int numberOfCalls = 5 + this.r.nextInt(5); - + final IRichSpout spout = new FiniteTestSpout(numberOfCalls); final StormSpoutWrapper> spoutWrapper = new StormSpoutWrapper>(spout); spoutWrapper.setRuntimeContext(mock(StreamingRuntimeContext.class)); - + spoutWrapper.cancel(); final TestCollector collector = new TestCollector(); spoutWrapper.run(collector); - + Assert.assertEquals(new LinkedList>(), collector.result); } - + @Test public void testClose() throws Exception { final IRichSpout spout = mock(IRichSpout.class); final StormSpoutWrapper> spoutWrapper = new StormSpoutWrapper>(spout); - + spoutWrapper.close(); - + verify(spout).close(); } - + }