pulsar-commits mailing list archives

From si...@apache.org
Subject [incubator-pulsar] branch master updated: [documentation][example] Flink Source & Sink Connector (#2561)
Date Wed, 12 Sep 2018 08:38:59 GMT
This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 71003e2   [documentation][example] Flink Source & Sink Connector (#2561)
71003e2 is described below

commit 71003e20e6606527c2f538ba2059850d9539d878
Author: Sijie Guo <guosijie@gmail.com>
AuthorDate: Wed Sep 12 01:38:56 2018 -0700

     [documentation][example] Flink Source & Sink Connector (#2561)
    
    ### Motivation
    
    We added the Flink source connector (#2441) and sink connector (#2434). It would be great to have an example that shows how to use them.
    
    ### Modifications
    
    - introduce an `examples` module
    - introduce an `examples/flink-consumer-source` module
    - add a word count example to use flink source and sink connector
    
    ### Result
    
    Users can see how to use the Flink source and sink connectors.
---
 examples/flink-consumer-source/README.md           |  76 +++++++++++++
 examples/flink-consumer-source/pom.xml             |  97 ++++++++++++++++
 .../flink/PulsarConsumerSourceWordCount.java       | 126 +++++++++++++++++++++
 examples/pom.xml                                   |  39 +++++++
 pom.xml                                            |   3 +
 .../connectors/pulsar/FlinkPulsarProducer.java     |  10 +-
 .../connectors/pulsar/PulsarConsumerSource.java    |   1 -
 .../pulsar/partitioner/PulsarKeyExtractor.java     |   4 +-
 8 files changed, 350 insertions(+), 6 deletions(-)
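
A note on the `FlinkPulsarProducer` change in this diff: `successCallback` is a `transient` field, and the commit moves its initialization from the field declaration into a method body. A likely reason (an assumption; the commit message does not say) is that transient field initializers do not run again when Java deserializes an object, so the callback would be null on a deserialized copy of the sink. The sketch below illustrates that behavior with plain Java serialization. `Sink`, `inlineCallback`, `lateCallback`, and `open()` are hypothetical names for illustration, not the connector's code.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Function;

public class TransientCallbackDemo {

    /** Hypothetical stand-in for a sink that is serialized before it runs. */
    static class Sink implements Serializable {
        private static final long serialVersionUID = 1L;

        // Initialized inline: set by the constructor, skipped by deserialization.
        transient Function<String, String> inlineCallback = id -> id;

        // Initialized in open(), which can be called on the deserialized copy.
        transient Function<String, String> lateCallback;

        void open() {
            this.lateCallback = id -> id;
        }
    }

    /** Serializes and deserializes a value with plain Java serialization. */
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T value) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        Sink copy = roundTrip(new Sink());
        // The transient field is not restored, and its initializer does not re-run.
        System.out.println("inline callback after round trip: " + copy.inlineCallback); // prints null
        copy.open();
        System.out.println("late callback set after open(): " + (copy.lateCallback != null)); // prints true
    }
}
```

Assigning the callback in a method that runs on the deserialized instance avoids depending on constructor-time state.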

diff --git a/examples/flink-consumer-source/README.md b/examples/flink-consumer-source/README.md
new file mode 100644
index 0000000..38b75b6
--- /dev/null
+++ b/examples/flink-consumer-source/README.md
@@ -0,0 +1,76 @@
+## Apache Flink Connectors for Pulsar
+
+This page describes how to use the connectors to read and write Pulsar topics with [Apache Flink](https://flink.apache.org/) stream processing applications.
+
+Build end-to-end stream processing pipelines that use Pulsar as the stream storage and message bus, and Apache Flink for computation over the streams.
+See the [Pulsar Concepts](https://pulsar.incubator.apache.org/docs/en/concepts-overview/) page for more information.
+
+## Example
+
+### PulsarConsumerSourceWordCount
+
+This Flink streaming job consumes messages from a Pulsar topic and counts words in a streaming fashion. The job can write the word count results
+to stdout or to another Pulsar topic.
+
+The steps to run the example:
+
+1. Start Pulsar Standalone.
+
+    You can follow the [instructions](https://pulsar.incubator.apache.org/docs/en/standalone/) to start a Pulsar standalone locally.
+
+    ```shell
+    $ bin/pulsar standalone
+    ```
+
+2. Start Flink locally.
+
+    You can follow the [instructions](https://ci.apache.org/projects/flink/flink-docs-release-1.6/quickstart/setup_quickstart.html) to download and start Flink.
+
+    ```shell
+    $ ./bin/start-cluster.sh
+    ```
+
+3. Build the examples.
+
+    ```shell
+    $ cd ${PULSAR_HOME}
+    $ mvn clean install -DskipTests
+    ```
+
+4. Run the word count example to print results to stdout.
+
+    ```shell
+    $ ./bin/flink run ${PULSAR_HOME}/examples/flink-consumer-source/target/pulsar-flink-streaming-wordcount.jar --service-url pulsar://localhost:6650 --input-topic test_src --subscription test_sub
+    ```
+
+5. Produce messages to topic `test_src`.
+
+    ```shell
+    $ bin/pulsar-client produce -m "hello world test again" -n 100 test_src
+    ```
+
+6. Check the Flink taskexecutor `.out` file. It shows the counts emitted at the end of each time window for as long as words keep flowing in, e.g.:
+
+    ```shell
+    PulsarConsumerSourceWordCount.WordWithCount(word=hello, count=200)
+    PulsarConsumerSourceWordCount.WordWithCount(word=again, count=200)
+    PulsarConsumerSourceWordCount.WordWithCount(word=test, count=200)
+    PulsarConsumerSourceWordCount.WordWithCount(word=world, count=200)
+    PulsarConsumerSourceWordCount.WordWithCount(word=hello, count=100)
+    PulsarConsumerSourceWordCount.WordWithCount(word=again, count=100)
+    PulsarConsumerSourceWordCount.WordWithCount(word=test, count=100)
+    ```
+
+Alternatively, when you run the Flink word count example at step 4, you can choose to write the results to another Pulsar topic.
+
+```shell
+$ ./bin/flink run ${PULSAR_HOME}/examples/flink-consumer-source/target/pulsar-flink-streaming-wordcount.jar --service-url pulsar://localhost:6650 --input-topic test_src --subscription test_sub --output-topic test_dest
+```
+
+Once the Flink word count example is running, you can use `bin/pulsar-client` to tail the results produced into topic `test_dest`.
+
+```shell
+$ bin/pulsar-client consume -n 0 -s test test_dest
+```
+
+You will see results similar to those at step 6, where the word count example prints to stdout.
diff --git a/examples/flink-consumer-source/pom.xml b/examples/flink-consumer-source/pom.xml
new file mode 100644
index 0000000..f7ed5d0
--- /dev/null
+++ b/examples/flink-consumer-source/pom.xml
@@ -0,0 +1,97 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.pulsar.examples</groupId>
+    <artifactId>pulsar-examples</artifactId>
+    <version>2.2.0-incubating-SNAPSHOT</version>
+  </parent>
+
+  <groupId>org.apache.pulsar.examples</groupId>
+  <artifactId>flink-consumer-source</artifactId>
+  <name>Pulsar Examples :: Flink Consumer Source</name>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+      <version>${flink.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.pulsar</groupId>
+      <artifactId>pulsar-client-schema</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.pulsar</groupId>
+      <artifactId>pulsar-flink</artifactId>
+      <version>${project.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.pulsar</groupId>
+          <artifactId>pulsar-client-original</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-shade-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>pulsar-streaming-wordcount</id>
+            <phase>package</phase>
+            <goals>
+              <goal>shade</goal>
+            </goals>
+            <configuration>
+              <shadeTestJar>false</shadeTestJar>
+              <shadedArtifactAttached>false</shadedArtifactAttached>
+              <createDependencyReducedPom>false</createDependencyReducedPom>
+              <transformers>
+                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                  <mainClass>org.apache.pulsar.examples.flink.PulsarConsumerSourceWordCount</mainClass>
+                </transformer>
+              </transformers>
+              <finalName>pulsar-flink-streaming-wordcount</finalName>
+              <filters>
+                <filter>
+                  <artifact>*</artifact>
+                  <includes>
+                    <include>org/apache/flink/streaming/examples/kafka/**</include>
+                    <include>org/apache/flink/streaming/**</include>
+                    <include>org/apache/pulsar/**</include>
+                  </includes>
+                </filter>
+              </filters>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+
+</project>
diff --git a/examples/flink-consumer-source/src/main/java/org/apache/pulsar/examples/flink/PulsarConsumerSourceWordCount.java b/examples/flink-consumer-source/src/main/java/org/apache/pulsar/examples/flink/PulsarConsumerSourceWordCount.java
new file mode 100644
index 0000000..e163f60
--- /dev/null
+++ b/examples/flink-consumer-source/src/main/java/org/apache/pulsar/examples/flink/PulsarConsumerSourceWordCount.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.examples.flink;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import lombok.AllArgsConstructor;
+import lombok.NoArgsConstructor;
+import lombok.ToString;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.connectors.pulsar.FlinkPulsarProducer;
+import org.apache.flink.streaming.connectors.pulsar.PulsarSourceBuilder;
+import org.apache.pulsar.client.api.ProducerConfiguration;
+
+/**
+ * Implements a streaming wordcount program on pulsar topics.
+ *
+ * <p>Example usage:
+ *   --service-url pulsar://localhost:6650 --input-topic test_src --subscription test_sub
+ */
+public class PulsarConsumerSourceWordCount {
+
+    public static void main(String[] args) throws Exception {
+        // parse input arguments
+        final ParameterTool parameterTool = ParameterTool.fromArgs(args);
+
+        if (parameterTool.getNumberOfParameters() < 2) {
+            System.out.println("Missing parameters!");
+            System.out.println("Usage: pulsar --service-url <pulsar-service-url> --input-topic <topic> --subscription <sub> --output-topic <topic>");
+            return;
+        }
+
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.getConfig().disableSysoutLogging();
+        env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));
+        env.enableCheckpointing(5000);
+        env.getConfig().setGlobalJobParameters(parameterTool);
+        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+
+        String serviceUrl = parameterTool.getRequired("service-url");
+        String inputTopic = parameterTool.getRequired("input-topic");
+        String subscription = parameterTool.get("subscription", "flink-examples");
+        String outputTopic = parameterTool.get("output-topic", null);
+        int parallelism = parameterTool.getInt("parallelism", 1);
+
+        System.out.println("Parameters:");
+        System.out.println("\tServiceUrl:\t" + serviceUrl);
+        System.out.println("\tInputTopic:\t" + inputTopic);
+        System.out.println("\tSubscription:\t" + subscription);
+        System.out.println("\tOutputTopic:\t" + outputTopic);
+        System.out.println("\tParallelism:\t" + parallelism);
+
+        PulsarSourceBuilder<String> builder = PulsarSourceBuilder.builder(new SimpleStringSchema())
+            .serviceUrl(serviceUrl)
+            .topic(inputTopic)
+            .subscriptionName(subscription);
+        SourceFunction<String> src = builder.build();
+        DataStream<String> input = env.addSource(src);
+
+        DataStream<WordWithCount> wc = input
+            .flatMap((FlatMapFunction<String, WordWithCount>) (line, collector) -> {
+                for (String word : line.split("\\s")) {
+                    collector.collect(new WordWithCount(word, 1));
+                }
+            })
+            .returns(WordWithCount.class)
+            .keyBy("word")
+            .timeWindow(Time.seconds(5))
+            .reduce((ReduceFunction<WordWithCount>) (c1, c2) ->
+                new WordWithCount(c1.word, c1.count + c2.count));
+
+        if (null != outputTopic) {
+            wc.addSink(new FlinkPulsarProducer<>(
+                serviceUrl,
+                outputTopic,
+                wordWithCount -> wordWithCount.toString().getBytes(UTF_8),
+                new ProducerConfiguration(),
+                wordWithCount -> wordWithCount.word
+            )).setParallelism(parallelism);
+        } else {
+            // print the results with a single thread, rather than in parallel
+            wc.print().setParallelism(1);
+        }
+
+        env.execute("Pulsar Stream WordCount");
+    }
+
+    /**
+     * Data type for words with count.
+     */
+    @AllArgsConstructor
+    @NoArgsConstructor
+    @ToString
+    public static class WordWithCount {
+
+        public String word;
+        public long count;
+
+    }
+
+}
diff --git a/examples/pom.xml b/examples/pom.xml
new file mode 100644
index 0000000..753d0ad
--- /dev/null
+++ b/examples/pom.xml
@@ -0,0 +1,39 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <packaging>pom</packaging>
+  <parent>
+    <groupId>org.apache.pulsar</groupId>
+    <artifactId>pulsar</artifactId>
+    <version>2.2.0-incubating-SNAPSHOT</version>
+  </parent>
+
+  <groupId>org.apache.pulsar.examples</groupId>
+  <artifactId>pulsar-examples</artifactId>
+  <name>Pulsar Examples :: Parent</name>
+
+  <modules>
+    <module>flink-consumer-source</module>
+  </modules>
+
+</project>
diff --git a/pom.xml b/pom.xml
index b349925..6d45a00 100644
--- a/pom.xml
+++ b/pom.xml
@@ -112,6 +112,9 @@ flexible messaging model and an intuitive client API.</description>
     <!-- connector-related modules -->
     <module>pulsar-io</module>
 
+    <!-- examples -->
+    <module>examples</module>
+
     <!-- all these 3 modules should be put at the end in this exact sequence -->
     <module>distribution</module>
     <module>docker</module>
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java
index bddfee4..2324c55 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java
@@ -97,10 +97,7 @@ public class FlinkPulsarProducer<IN>
     /**
      * The callback than handles error propagation or logging callbacks.
      */
-    protected transient Function<MessageId, MessageId> successCallback = msgId -> {
-        acknowledgeMessage();
-        return msgId;
-    };
+    protected transient Function<MessageId, MessageId> successCallback;
 
     protected transient Function<Throwable, MessageId> failureCallback;
 
@@ -205,6 +202,11 @@ public class FlinkPulsarProducer<IN>
             flushOnCheckpoint = false;
         }
 
+        this.successCallback =  msgId -> {
+            acknowledgeMessage();
+            return msgId;
+        };
+
         if (PulsarProduceMode.AT_MOST_ONCE == produceMode) {
             this.failureCallback = cause -> {
                 LOG.error("Error while sending record to Pulsar : " + cause.getMessage(), cause);
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java
index f1b2595..0d01def 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java
@@ -127,7 +127,6 @@ class PulsarConsumerSource<T> extends MessageAcknowledgingSourceBase<T, MessageI
         while (isRunning) {
             message = consumer.receive(messageReceiveTimeoutMs, TimeUnit.MILLISECONDS);
             if (message == null) {
-                LOG.info("unexpected null message");
                 continue;
             }
 
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/partitioner/PulsarKeyExtractor.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/partitioner/PulsarKeyExtractor.java
index 90dc21c..270892e 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/partitioner/PulsarKeyExtractor.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/partitioner/PulsarKeyExtractor.java
@@ -18,10 +18,12 @@
  */
 package org.apache.flink.streaming.connectors.pulsar.partitioner;
 
+import java.io.Serializable;
+
 /**
  * Extract key from a value.
  */
-public interface PulsarKeyExtractor<IN> {
+public interface PulsarKeyExtractor<IN> extends Serializable {
 
     PulsarKeyExtractor NULL = in -> null;
 

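The `PulsarKeyExtractor` change above makes the interface extend `Serializable`. One plausible motivation (an assumption, not stated in the commit) is that the word count example passes the key extractor as a lambda, and a lambda can only be serialized when its target interface is serializable. A minimal sketch with two hypothetical interfaces, `PlainExtractor` and `SerializableExtractor`, shows the difference using plain Java serialization:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializableLambdaDemo {

    /** Hypothetical extractor interface that does not extend Serializable. */
    interface PlainExtractor<IN> {
        String getKey(IN in);
    }

    /** Hypothetical extractor interface that extends Serializable. */
    interface SerializableExtractor<IN> extends Serializable {
        String getKey(IN in);
    }

    /** Returns true if Java serialization accepts the object. */
    static boolean canSerialize(Object value) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(value);
            return true;
        } catch (IOException e) {
            // NotSerializableException is a subclass of IOException.
            return false;
        }
    }

    public static void main(String[] args) {
        PlainExtractor<String> plain = word -> word;
        SerializableExtractor<String> serializable = word -> word;

        System.out.println("plain lambda serializable: " + canSerialize(plain)); // prints false
        System.out.println("serializable lambda serializable: " + canSerialize(serializable)); // prints true
    }
}
```

The two lambdas have identical bodies; only the target interface decides whether the resulting object can be written to an `ObjectOutputStream`.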
