flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mbala...@apache.org
Subject [2/2] flink git commit: [FLINK-2243] [storm-compat] Added finite spout functionality to Storm compatibility layer
Date Fri, 07 Aug 2015 12:51:03 GMT
[FLINK-2243] [storm-compat] Added finite spout functionality to Storm compatibility layer


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6d9eeb55
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6d9eeb55
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6d9eeb55

Branch: refs/heads/master
Commit: 6d9eeb559a895d92bb0a71c6535c55dfb49a16cb
Parents: f1dd914
Author: szape <nemderogatorius@gmail.com>
Authored: Fri Jun 19 10:22:04 2015 +0200
Committer: mbalassi <mbalassi@apache.org>
Committed: Fri Aug 7 14:50:02 2015 +0200

----------------------------------------------------------------------
 .../api/FlinkTopologyBuilder.java               | 13 ++-
 .../wrappers/AbstractStormSpoutWrapper.java     |  1 -
 .../wrappers/FiniteStormSpout.java              | 37 +++++++++
 .../wrappers/FiniteStormSpoutWrapper.java       | 87 ++++++++++++++++++++
 .../wrappers/FiniteStormSpoutWrapperTest.java   | 69 ++++++++++++++++
 5 files changed, 205 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6d9eeb55/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java
b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java
index 4ecf4a6..d146250 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java
@@ -33,6 +33,9 @@ import backtype.storm.topology.TopologyBuilder;
 import backtype.storm.tuple.Fields;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.stormcompatibility.wrappers.AbstractStormSpoutWrapper;
+import org.apache.flink.stormcompatibility.wrappers.FiniteStormSpout;
+import org.apache.flink.stormcompatibility.wrappers.FiniteStormSpoutWrapper;
 import org.apache.flink.stormcompatibility.wrappers.StormBoltWrapper;
 import org.apache.flink.stormcompatibility.wrappers.StormSpoutWrapper;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -93,7 +96,15 @@ public class FlinkTopologyBuilder {
 			 * -> add an additional output attribute tagging the output stream, and use .split() and .select() to split
 			 * the streams
 			 */
-			final DataStreamSource source = env.addSource(new StormSpoutWrapper(userSpout), declarer.getOutputType());
+			AbstractStormSpoutWrapper spoutWrapper;
+
+			if (userSpout instanceof FiniteStormSpout) {
+				spoutWrapper = new FiniteStormSpoutWrapper((FiniteStormSpout) userSpout);
+			} else {
+				spoutWrapper = new StormSpoutWrapper(userSpout);
+			}
+
+			final DataStreamSource source = env.addSource(spoutWrapper, declarer.getOutputType());
 			availableOperators.put(spoutId, source);
 
 			int dop = 1;

http://git-wip-us.apache.org/repos/asf/flink/blob/6d9eeb55/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormSpoutWrapper.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormSpoutWrapper.java
b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormSpoutWrapper.java
index 3021bcb..4e43a8a 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormSpoutWrapper.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormSpoutWrapper.java
@@ -19,7 +19,6 @@ package org.apache.flink.stormcompatibility.wrappers;
 
 import backtype.storm.spout.SpoutOutputCollector;
 import backtype.storm.topology.IRichSpout;
-
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple25;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

http://git-wip-us.apache.org/repos/asf/flink/blob/6d9eeb55/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpout.java
b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpout.java
new file mode 100644
index 0000000..58a4f7a
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpout.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.wrappers;
+
+import backtype.storm.topology.IRichSpout;
+
+/**
+ * This interface represents a Storm spout that emits a finite number of records. Common Storm
+ * spouts emit infinite streams by default. To change this behaviour and take advantage of
+ * Flink's finite-source capabilities, the spout should implement this interface. To wrap
+ * {@link FiniteStormSpout} separately, use {@link FiniteStormSpoutWrapper}.
+ */
+public interface FiniteStormSpout extends IRichSpout {
+
+	/**
+	 * When this method returns true, the spout has reached the end of the stream.
+	 *
+	 * @return true, if the spout's stream reached its end, false otherwise
+	 */
+	public boolean reachedEnd();
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6d9eeb55/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpoutWrapper.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpoutWrapper.java
b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpoutWrapper.java
new file mode 100644
index 0000000..7913510
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpoutWrapper.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.wrappers;
+
+/**
+ * A {@link FiniteStormSpoutWrapper} is an {@link AbstractStormSpoutWrapper} that calls the wrapped
+ * {@link FiniteStormSpout}'s {@link FiniteStormSpout#nextTuple()} method until {@link
+ * FiniteStormSpout#reachedEnd()} is true.
+ */
+public class FiniteStormSpoutWrapper<OUT> extends AbstractStormSpoutWrapper<OUT>
{
+	private static final long serialVersionUID = -218340336648247605L;
+
+	private FiniteStormSpout finiteSpout;
+
+	/**
+	 * Instantiates a new {@link FiniteStormSpoutWrapper} that wraps the given Storm {@link
+	 * FiniteStormSpout spout} such that it can be used within a Flink streaming program. The
+	 * output
+	 * type will be one of {@link Tuple1} to {@link Tuple25} depending on the spout's declared
+	 * number of attributes.
+	 *
+	 * @param spout
+	 * 		The Storm {@link FiniteStormSpout spout} to be used.
+	 * @throws IllegalArgumentException
+	 * 		If the number of declared output attributes is not within range [1;25].
+	 */
+	public FiniteStormSpoutWrapper(FiniteStormSpout spout)
+			throws IllegalArgumentException {
+		super(spout);
+		this.finiteSpout = spout;
+	}
+
+	/**
+	 * Instantiates a new {@link FiniteStormSpoutWrapper} that wraps the given Storm {@link
+	 * FiniteStormSpout spout} such that it can be used within a Flink streaming program. The
+	 * output
+	 * type can be any type if parameter {@code rawOutput} is {@code true} and the spout's
+	 * number of
+	 * declared output tuples is 1. If {@code rawOutput} is {@code false} the output type will be
+	 * one of {@link Tuple1} to {@link Tuple25} depending on the spout's declared number of
+	 * attributes.
+	 *
+	 * @param spout
+	 * 		The Storm {@link FiniteStormSpout spout} to be used.
+	 * @param rawOutput
+	 * 		Set to {@code true} if a single attribute output stream should not be of type {@link
+	 * 		Tuple1} but be of a raw type.
+	 * @throws IllegalArgumentException
+	 * 		If {@code rawOutput} is {@code true} and the number of declared output attributes is
+	 * 		not 1
+	 * 		or if {@code rawOutput} is {@code false} and the number of declared output attributes
+	 * 		is not
+	 * 		within range [1;25].
+	 */
+	public FiniteStormSpoutWrapper(final FiniteStormSpout spout, final boolean rawOutput)
+			throws IllegalArgumentException {
+		super(spout, rawOutput);
+		this.finiteSpout = spout;
+	}
+
+	/**
+	 * Calls the {@link FiniteStormSpout#nextTuple()} method until {@link
+	 * FiniteStormSpout#reachedEnd()} is true or this wrapper stops running ({@code isRunning} is false).
+	 */
+	@Override
+	protected void execute() {
+		while (super.isRunning && !finiteSpout.reachedEnd()) {
+			finiteSpout.nextTuple();
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6d9eeb55/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpoutWrapperTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpoutWrapperTest.java
b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpoutWrapperTest.java
new file mode 100644
index 0000000..776e65d
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpoutWrapperTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.wrappers;
+
+import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
+import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(StormWrapperSetupHelper.class)
+public class FiniteStormSpoutWrapperTest {
+
+	@SuppressWarnings("unchecked")
+	@Test
+	public void runAndExecuteTest1() throws Exception {
+
+		FiniteStormSpout stormSpout =
+				mock(FiniteStormSpout.class);
+		when(stormSpout.reachedEnd()).thenReturn(false, false, false, true, false, false, true);
+
+		FiniteStormSpoutWrapper<?> wrapper =
+				new FiniteStormSpoutWrapper<Object>(stormSpout);
+		wrapper.setRuntimeContext(mock(StreamingRuntimeContext.class));
+
+		wrapper.run(mock(SourceContext.class));
+		verify(stormSpout, times(3)).nextTuple();
+	}
+
+	@SuppressWarnings("unchecked")
+	@Test
+	public void runAndExecuteTest2() throws Exception {
+
+		FiniteStormSpout stormSpout =
+				mock(FiniteStormSpout.class);
+		when(stormSpout.reachedEnd()).thenReturn(true, false, true, false, true, false, true);
+
+		FiniteStormSpoutWrapper<?> wrapper =
+				new FiniteStormSpoutWrapper<Object>(stormSpout);
+		wrapper.setRuntimeContext(mock(StreamingRuntimeContext.class));
+
+		wrapper.run(mock(SourceContext.class));
+		verify(stormSpout, never()).nextTuple();
+	}
+
+}


Mime
View raw message