flink-commits mailing list archives

From gyf...@apache.org
Subject flink git commit: [FLINK-2305] Add documentation about Storm compatibility layer additional improvements - added RawOutputFormatter - bug fix in SimpleOutputFormatter - enable default output formatter for StormBoltFileSink
Date Sat, 11 Jul 2015 12:20:53 GMT
Repository: flink
Updated Branches:
  refs/heads/master 3f3aeb7e0 -> ea4f339d7


[FLINK-2305] Add documentation about Storm compatibility layer
additional improvements
- added RawOutputFormatter
- bug fix in SimpleOutputFormatter
- enable default output formatter for StormBoltFileSink

Closes #884


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ea4f339d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ea4f339d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ea4f339d

Branch: refs/heads/master
Commit: ea4f339d76574c8499f3588524bb174ec3283a2d
Parents: 3f3aeb7
Author: mjsax <mjsax@informatik.hu-berlin.de>
Authored: Thu Jul 2 02:22:08 2015 +0200
Committer: Gyula Fora <gyfora@apache.org>
Committed: Sat Jul 11 14:19:43 2015 +0200

----------------------------------------------------------------------
 docs/_includes/navbar.html                      |   1 +
 docs/apis/storm_compatibility.md                | 155 +++++++++++++++++++
 .../excamation/ExclamationTopology.java         |   4 +-
 .../util/RawOutputFormatter.java                |  32 ++++
 .../util/SimpleOutputFormatter.java             |   2 +-
 .../util/StormBoltFileSink.java                 |   4 +
 6 files changed, 195 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ea4f339d/docs/_includes/navbar.html
----------------------------------------------------------------------
diff --git a/docs/_includes/navbar.html b/docs/_includes/navbar.html
index dc7ef30..cdd801f 100644
--- a/docs/_includes/navbar.html
+++ b/docs/_includes/navbar.html
@@ -81,6 +81,7 @@ under the License.
                 <li><a href="{{ apis }}/iterations.html">Iterations</a></li>
                 <li><a href="{{ apis }}/java8.html">Java 8</a></li>
                 <li><a href="{{ apis }}/hadoop_compatibility.html">Hadoop Compatability
<span class="badge">Beta</span></a></li>
+                <li><a href="{{ apis }}/storm_compatibility.html">Storm Compatability
<span class="badge">Beta</span></a></li>
               </ul>
             </li>
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ea4f339d/docs/apis/storm_compatibility.md
----------------------------------------------------------------------
diff --git a/docs/apis/storm_compatibility.md b/docs/apis/storm_compatibility.md
new file mode 100644
index 0000000..0f6b17b
--- /dev/null
+++ b/docs/apis/storm_compatibility.md
@@ -0,0 +1,155 @@
+---
+title: "Storm Compatibility"
+is_beta: true
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+[Flink streaming](streaming_guide.html) is compatible with Apache Storm interfaces and therefore allows
+reusing code that was implemented for Storm.
+
+You can:
+
+- execute a whole Storm `Topology` in Flink.
+- use Storm `Spout`/`Bolt` as source/operator in Flink streaming programs.
+
+This document shows how to use existing Storm code with Flink.
+
+* This will be replaced by the TOC
+{:toc}
+
+### Project Configuration
+
+Support for Storm is contained in the `flink-storm-compatibility-core` Maven module.
+The code resides in the `org.apache.flink.stormcompatibility` package.
+
+Add the following dependency to your `pom.xml` if you want to execute Storm code in Flink.
+
+~~~xml
+<dependency>
+	<groupId>org.apache.flink</groupId>
+	<artifactId>flink-storm-compatibility-core</artifactId>
+	<version>{{site.version}}</version>
+</dependency>
+~~~
+
+**Please note**: `flink-storm-compatibility-core` is not part of the provided binary Flink distribution.
+Thus, you need to include the `flink-storm-compatibility-core` classes (and their dependencies) in your program jar that is submitted to Flink's JobManager.
+
+### Execute Storm Topologies
+
+Flink provides a Storm-compatible API (`org.apache.flink.stormcompatibility.api`) that offers replacements for the following classes:
+
+- `TopologyBuilder` replaced by `FlinkTopologyBuilder`
+- `StormSubmitter` replaced by `FlinkSubmitter`
+- `NimbusClient` and `Client` replaced by `FlinkClient`
+- `LocalCluster` replaced by `FlinkLocalCluster`
+
+In order to submit a Storm topology to Flink, it is sufficient to replace the used Storm classes with their Flink replacements in the original Storm client code that assembles the topology.
+If a topology is executed on a remote cluster, the parameters `nimbus.host` and `nimbus.thrift.port` are used as `jobmanager.rpc.address` and `jobmanager.rpc.port`, respectively.
+If a parameter is not specified, the value is taken from `flink-conf.yaml`.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+~~~java
+FlinkTopologyBuilder builder = new FlinkTopologyBuilder(); // replaces: TopologyBuilder builder = new TopologyBuilder();
+
+builder.setSpout("source", new StormFileSpout(inputFilePath));
+builder.setBolt("tokenizer", new StormBoltTokenizer()).shuffleGrouping("source");
+builder.setBolt("counter", new StormBoltCounter()).fieldsGrouping("tokenizer", new Fields("word"));
+builder.setBolt("sink", new StormBoltFileSink(outputFilePath)).shuffleGrouping("counter");
+
+Config conf = new Config();
+if(runLocal) { // submit to test cluster
+	FlinkLocalCluster cluster = new FlinkLocalCluster(); // replaces: LocalCluster cluster = new LocalCluster();
+	cluster.submitTopology("WordCount", conf, builder.createTopology());
+} else { // submit to remote cluster
+	// optional
+	// conf.put(Config.NIMBUS_HOST, "remoteHost");
+	// conf.put(Config.NIMBUS_THRIFT_PORT, 6123);
+	FlinkSubmitter.submitTopology("WordCount", conf, builder.createTopology()); // replaces: StormSubmitter.submitTopology(topologyId, conf, builder.createTopology());
+}
+~~~
+</div>
+</div>
+
+### Embed Storm Operators in Flink Streaming Programs 
+
+As an alternative, Spouts and Bolts can be embedded into regular streaming programs.
+The Storm compatibility layer offers a wrapper class for each, namely `StormSpoutWrapper` and `StormBoltWrapper` (`org.apache.flink.stormcompatibility.wrappers`).
+
+By default, both wrappers convert Storm output tuples to Flink's `Tuple` types (i.e., `Tuple1` to `Tuple25`, according to the number of fields of the Storm tuples).
+For single-field output tuples, a conversion to the field's data type is also possible (e.g., `String` instead of `Tuple1<String>`), as shown in the Spout sketches below.
+
+Because Flink cannot infer the output field types of Storm operators, it is required to specify the output type manually.
+In order to get the correct `TypeInformation` object, Flink's `TypeExtractor` can be used.
+#### Embed Spouts
+
+In order to use a Spout as a Flink source, use `StreamExecutionEnvironment.addSource(SourceFunction, TypeInformation)`.
+The Spout object is handed to the constructor of `StormSpoutWrapper<OUT>`, which serves as the first argument to `addSource(...)`.
+The generic type declaration `OUT` specifies the type of the source output stream.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+~~~java
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+// stream has `raw` type (single field output streams only)
+DataStream<String> rawInput = env.addSource(
+	new StormSpoutWrapper<String>(new StormFileSpout(localFilePath), true), // Spout source, 'true' for raw type
+	TypeExtractor.getForClass(String.class)); // output type
+
+// process data stream
+[...]
+~~~
+</div>
+</div>
+
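+If the Flink `Tuple` type should be kept for a single-field output stream, a minimal sketch could look as follows. It continues the example above and assumes that the second constructor argument can simply be set to `false`, since the raw-type conversion is the opt-in behavior; `Tuple1` is Flink's Java tuple type.
+
+~~~java
+// stream keeps Flink's Tuple1 type (default conversion, no raw type)
+DataStream<Tuple1<String>> tupleInput = env.addSource(
+	new StormSpoutWrapper<Tuple1<String>>(new StormFileSpout(localFilePath), false), // 'false': keep Tuple output (assumption)
+	TypeExtractor.getForObject(new Tuple1<String>(""))); // output type
+~~~
+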
+If a Spout emits a finite number of tuples, `StormFiniteSpoutWrapper` can be used instead of `StormSpoutWrapper`.
+Using `StormFiniteSpoutWrapper` allows the Flink program to shut down automatically after all data is processed.
+If `StormSpoutWrapper` is used, the program will run until it is [canceled](cli.html) manually.
+
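+A minimal sketch of the finite variant follows; the constructor is assumed to mirror the two-argument `StormSpoutWrapper` constructor from the example above, so check the `org.apache.flink.stormcompatibility.wrappers` package for the exact signatures.
+
+~~~java
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+// finite source: the program shuts down once the Spout has emitted all of its tuples
+DataStream<String> finiteInput = env.addSource(
+	new StormFiniteSpoutWrapper<String>(new StormFileSpout(localFilePath), true), // assumed two-argument constructor
+	TypeExtractor.getForClass(String.class)); // output type
+~~~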
+
+#### Embed Bolts
+
+In order to use a Bolt as a Flink operator, use `DataStream.transform(String, TypeInformation, OneInputStreamOperator)`.
+The Bolt object is handed to the constructor of `StormBoltWrapper<IN,OUT>`, which serves as the last argument to `transform(...)`.
+The generic type declarations `IN` and `OUT` specify the type of the operator's input and output stream, respectively.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+~~~java
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+DataStream<String> text = env.readTextFile(localFilePath);
+
+DataStream<Tuple2<String, Integer>> counts = text.transform(
+	"tokenizer", // operator name
+	TypeExtractor.getForObject(new Tuple2<String, Integer>("", 0)), // output type
+	new StormBoltWrapper<String, Tuple2<String, Integer>>(new StormBoltTokenizer())); // Bolt operator
+
+// do further processing
+[...]
+~~~
+</div>
+</div>
+
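+The Bolt's output can be processed further with the regular DataStream API. A minimal sketch of such a continuation (plain DataStream operations, not part of the compatibility layer; the word-count semantics are only illustrative):
+
+~~~java
+// group by the word field and sum the counts emitted by the Bolt
+DataStream<Tuple2<String, Integer>> wordCounts = counts.groupBy(0).sum(1);
+
+// print the result and run the program
+wordCounts.print();
+env.execute("Embedded Storm Bolt example");
+~~~
+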
+### Storm Compatibility Examples
+
+You can find more examples in the Maven module `flink-storm-compatibility-examples`.
+

http://git-wip-us.apache.org/repos/asf/flink/blob/ea4f339d/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationTopology.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationTopology.java
b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationTopology.java
index 9ff3fb7..a5bb571 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationTopology.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationTopology.java
@@ -21,7 +21,7 @@ import org.apache.flink.examples.java.wordcount.util.WordCountData;
 import org.apache.flink.stormcompatibility.api.FlinkTopologyBuilder;
 import org.apache.flink.stormcompatibility.excamation.stormoperators.ExclamationBolt;
 import org.apache.flink.stormcompatibility.util.OutputFormatter;
-import org.apache.flink.stormcompatibility.util.SimpleOutputFormatter;
+import org.apache.flink.stormcompatibility.util.RawOutputFormatter;
 import org.apache.flink.stormcompatibility.util.StormBoltFileSink;
 import org.apache.flink.stormcompatibility.util.StormBoltPrintSink;
 import org.apache.flink.stormcompatibility.util.StormFileSpout;
@@ -36,7 +36,7 @@ public class ExclamationTopology {
 	public final static String firstBoltId = "exclamation1";
 	public final static String secondBoltId = "exclamation2";
 	public final static String sinkId = "sink";
-	private final static OutputFormatter formatter = new SimpleOutputFormatter();
+	private final static OutputFormatter formatter = new RawOutputFormatter();
 
 	public static FlinkTopologyBuilder buildTopology() {
 		final FlinkTopologyBuilder builder = new FlinkTopologyBuilder();

http://git-wip-us.apache.org/repos/asf/flink/blob/ea4f339d/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/RawOutputFormatter.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/RawOutputFormatter.java
b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/RawOutputFormatter.java
new file mode 100644
index 0000000..7faf6cd
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/RawOutputFormatter.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.util;
+
+import backtype.storm.tuple.Tuple;
+
+public class RawOutputFormatter implements OutputFormatter {
+	private static final long serialVersionUID = 8685668993521259832L;
+
+	@Override
+	public String format(final Tuple input) {
+		assert (input.size() == 1);
+		return input.getValue(0).toString();
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ea4f339d/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/SimpleOutputFormatter.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/SimpleOutputFormatter.java
b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/SimpleOutputFormatter.java
index a9d72d9..ccb617b 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/SimpleOutputFormatter.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/SimpleOutputFormatter.java
@@ -25,7 +25,7 @@ public class SimpleOutputFormatter implements OutputFormatter {
 
 	@Override
 	public String format(final Tuple input) {
-		return input.getValue(0).toString();
+		return input.getValues().toString();
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ea4f339d/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormBoltFileSink.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormBoltFileSink.java
b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormBoltFileSink.java
index a92bc6a..ee8dca4 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormBoltFileSink.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormBoltFileSink.java
@@ -34,6 +34,10 @@ public final class StormBoltFileSink extends AbstractStormBoltSink {
 	private final String path;
 	private BufferedWriter writer;
 
+	public StormBoltFileSink(final String path) {
+		this(path, new SimpleOutputFormatter());
+	}
+
 	public StormBoltFileSink(final String path, final OutputFormatter formatter) {
 		super(formatter);
 		this.path = path;

