pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mme...@apache.org
Subject [pulsar] branch master updated: [Pulsar-Flink] Add Scala Examples (#3071)
Date Wed, 28 Nov 2018 01:46:16 GMT
This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new b413c19  [Pulsar-Flink] Add Scala Examples (#3071)
b413c19 is described below

commit b413c19aa205d38e57ebc1509099d57bb72fb844
Author: Eren Avsarogullari <erenavsarogullari@gmail.com>
AuthorDate: Wed Nov 28 01:46:11 2018 +0000

    [Pulsar-Flink] Add Scala Examples (#3071)
    
    * [Pulsar-Flink] Add Scala Examples
    
    * Line break is added for input text.
    
    * Adding ASF Header.
    
    * Fix License format
---
 pulsar-flink/pom.xml                               |  6 ++
 .../example/FlinkPulsarBatchSinkExample.java       | 10 +++
 .../FlinkPulsarBatchCsvSinkScalaExample.scala      | 78 ++++++++++++++++++++
 .../FlinkPulsarBatchJsonSinkScalaExample.scala     | 81 +++++++++++++++++++++
 .../example/FlinkPulsarBatchSinkScalaExample.scala | 85 ++++++++++++++++++++++
 5 files changed, 260 insertions(+)

diff --git a/pulsar-flink/pom.xml b/pulsar-flink/pom.xml
index b91af16..e48d213 100644
--- a/pulsar-flink/pom.xml
+++ b/pulsar-flink/pom.xml
@@ -52,6 +52,12 @@
     </dependency>
 
     <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-scala_${scala.binary.version}</artifactId>
+      <version>${flink.version}</version>
+    </dependency>
+
+    <dependency>
       <groupId>${project.groupId}</groupId>
       <artifactId>pulsar-client</artifactId>
       <version>${project.version}</version>
diff --git a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkExample.java
b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkExample.java
index 7b35065..6724c62 100644
--- a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkExample.java
+++ b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkExample.java
@@ -50,6 +50,7 @@ public class FlinkPulsarBatchSinkExample {
         // create DataSet
         DataSet<String> textDS = env.fromElements(EINSTEIN_QUOTE);
 
+        // convert sentences to words
         textDS.flatMap(new FlatMapFunction<String, WordWithCount>() {
             @Override
             public void flatMap(String value, Collector<WordWithCount> out) throws
Exception {
@@ -59,23 +60,32 @@ public class FlinkPulsarBatchSinkExample {
                 }
             }
         })
+
         // filter words which length is bigger than 4
         .filter(wordWithCount -> wordWithCount.word.length() > 4)
+
+        // group the words
         .groupBy(new KeySelector<WordWithCount, String>() {
             @Override
             public String getKey(WordWithCount wordWithCount) throws Exception {
                 return wordWithCount.word;
             }
         })
+
+        // sum the word counts
         .reduce(new ReduceFunction<WordWithCount>() {
             @Override
             public WordWithCount reduce(WordWithCount wordWithCount1, WordWithCount wordWithCount2)
throws Exception {
                 return  new WordWithCount(wordWithCount1.word, wordWithCount1.count + wordWithCount2.count);
             }
         })
+
         // write batch data to Pulsar
         .output(pulsarOutputFormat);
 
+        // set parallelism to write Pulsar in parallel (optional)
+        env.setParallelism(2);
+
         // execute program
         env.execute("Flink - Pulsar Batch WordCount");
 
diff --git a/pulsar-flink/src/test/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkScalaExample.scala
b/pulsar-flink/src/test/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkScalaExample.scala
new file mode 100644
index 0000000..7db844b
--- /dev/null
+++ b/pulsar-flink/src/test/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkScalaExample.scala
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flink.batch.connectors.pulsar.example
+
+import org.apache.flink.api.java.tuple.Tuple4
+import org.apache.flink.api.scala._
+import org.apache.flink.batch.connectors.pulsar.PulsarCsvOutputFormat
+
+/**
+  * Implements a batch Scala program on Pulsar topic by writing Flink DataSet as Csv.
+  */
+object FlinkPulsarBatchCsvSinkScalaExample {
+
+  /**
+    * NasaMission Model.
+    *
+    * Extends Flink's Java `Tuple4` so each mission is also a tuple; presumably this is what
+    * `PulsarCsvOutputFormat` needs in order to render the fields as CSV columns — verify.
+    */
+  private case class NasaMission(id: Int, missionName: String, startYear: Int, endYear: Int)
+    extends Tuple4(id, missionName, startYear, endYear)
+
+  private val SERVICE_URL = "pulsar://127.0.0.1:6650"
+  private val TOPIC_NAME = "my-flink-topic"
+
+  private val nasaMissions = List(
+    NasaMission(1, "Mercury program", 1959, 1963),
+    NasaMission(2, "Apollo program", 1961, 1972),
+    NasaMission(3, "Gemini program", 1963, 1966),
+    NasaMission(4, "Skylab", 1973, 1974),
+    NasaMission(5, "Apollo–Soyuz Test Project", 1975, 1975))
+
+  /**
+    * Upper-cases the mission names, keeps the missions that started after 1970 and writes
+    * them as CSV to `TOPIC_NAME` on the Pulsar service at `SERVICE_URL`.
+    *
+    * @param args command-line arguments (unused)
+    */
+  def main(args: Array[String]): Unit = {
+
+    // set up the execution environment
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    // set parallelism to write Pulsar in parallel (optional); configured up front,
+    // before any operator of the pipeline is defined
+    env.setParallelism(2)
+
+    // create PulsarCsvOutputFormat instance
+    val pulsarCsvOutputFormat =
+      new PulsarCsvOutputFormat[NasaMission](SERVICE_URL, TOPIC_NAME)
+
+    // create DataSet
+    val nasaMissionDS = env.fromCollection(nasaMissions)
+
+    // map nasa mission names to upper-case. Locale.ROOT keeps the result independent of the
+    // JVM default locale (e.g. Turkish dotted/dotless "i"). A new instance is built explicitly
+    // because Tuple4 already declares a copy() method, so Scala does not synthesize the
+    // case-class copy(...) for NasaMission.
+    nasaMissionDS.map(nasaMission => NasaMission(
+      nasaMission.id,
+      nasaMission.missionName.toUpperCase(java.util.Locale.ROOT),
+      nasaMission.startYear,
+      nasaMission.endYear))
+
+    // filter missions which started after 1970
+    .filter(_.startYear > 1970)
+
+    // write batch data to Pulsar as Csv
+    .output(pulsarCsvOutputFormat)
+
+    // execute program
+    env.execute("Flink - Pulsar Batch Csv")
+  }
+
+}
\ No newline at end of file
diff --git a/pulsar-flink/src/test/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkScalaExample.scala
b/pulsar-flink/src/test/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkScalaExample.scala
new file mode 100644
index 0000000..1f7fc19
--- /dev/null
+++ b/pulsar-flink/src/test/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkScalaExample.scala
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flink.batch.connectors.pulsar.example
+
+import org.apache.flink.api.scala._
+import org.apache.flink.batch.connectors.pulsar.PulsarJsonOutputFormat
+
+import scala.beans.BeanProperty
+
+/**
+  * Implements a batch Scala program on Pulsar topic by writing Flink DataSet as Json.
+  */
+object FlinkPulsarBatchJsonSinkScalaExample {
+
+  /**
+    * NasaMission Model.
+    *
+    * `@BeanProperty` generates Java-style getters (`getId`, `getMissionName`, ...);
+    * presumably the JSON serializer behind `PulsarJsonOutputFormat` discovers fields
+    * through them — verify before removing the annotations.
+    */
+  private case class NasaMission(@BeanProperty id: Int,
+                                 @BeanProperty missionName: String,
+                                 @BeanProperty startYear: Int,
+                                 @BeanProperty endYear: Int)
+
+  private val nasaMissions = List(
+    NasaMission(1, "Mercury program", 1959, 1963),
+    NasaMission(2, "Apollo program", 1961, 1972),
+    NasaMission(3, "Gemini program", 1963, 1966),
+    NasaMission(4, "Skylab", 1973, 1974),
+    NasaMission(5, "Apollo–Soyuz Test Project", 1975, 1975))
+
+  private val SERVICE_URL = "pulsar://127.0.0.1:6650"
+  private val TOPIC_NAME = "my-flink-topic"
+
+  /**
+    * Upper-cases the mission names, keeps the missions that started after 1970 and writes
+    * them as JSON to `TOPIC_NAME` on the Pulsar service at `SERVICE_URL`.
+    *
+    * @param args command-line arguments (unused)
+    */
+  def main(args: Array[String]): Unit = {
+
+    // set up the execution environment
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    // set parallelism to write Pulsar in parallel (optional); configured up front,
+    // before any operator of the pipeline is defined
+    env.setParallelism(2)
+
+    // create PulsarJsonOutputFormat instance
+    val pulsarJsonOutputFormat = new PulsarJsonOutputFormat[NasaMission](SERVICE_URL, TOPIC_NAME)
+
+    // create DataSet
+    val nasaMissionDS = env.fromCollection(nasaMissions)
+
+    // map nasa mission names to upper-case; Locale.ROOT keeps the result independent
+    // of the JVM default locale (e.g. Turkish dotted/dotless "i")
+    nasaMissionDS.map(nasaMission =>
+      nasaMission.copy(missionName = nasaMission.missionName.toUpperCase(java.util.Locale.ROOT)))
+
+    // filter missions which started after 1970
+    .filter(_.startYear > 1970)
+
+    // write batch data to Pulsar
+    .output(pulsarJsonOutputFormat)
+
+    // execute program
+    env.execute("Flink - Pulsar Batch Json")
+  }
+
+}
diff --git a/pulsar-flink/src/test/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkScalaExample.scala
b/pulsar-flink/src/test/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkScalaExample.scala
new file mode 100644
index 0000000..5e536cf
--- /dev/null
+++ b/pulsar-flink/src/test/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkScalaExample.scala
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flink.batch.connectors.pulsar.example
+
+import org.apache.flink.api.common.serialization.SerializationSchema
+import org.apache.flink.api.scala._
+import org.apache.flink.batch.connectors.pulsar.PulsarOutputFormat
+import org.apache.flink.util.Collector
+
+/**
+  * Data type for words with count.
+  *
+  * @param word  the (lower-cased, punctuation-stripped) word
+  * @param count how many times the word was seen
+  */
+case class WordWithCount(word: String, count: Long) {
+  override def toString: String = s"WordWithCount { word = $word, count = $count }"
+}
+
+/**
+  * Implements a batch word-count Scala program on Pulsar topic by writing Flink DataSet.
+  */
+object FlinkPulsarBatchSinkScalaExample {
+
+  private val EINSTEIN_QUOTE = "Imagination is more important than knowledge. " +
+    "Knowledge is limited. Imagination encircles the world."
+  private val SERVICE_URL = "pulsar://127.0.0.1:6650"
+  private val TOPIC_NAME = "my-flink-topic"
+
+  /**
+    * Counts the words longer than four characters in `EINSTEIN_QUOTE` and writes every
+    * `WordWithCount` as a UTF-8 encoded string to `TOPIC_NAME` on the Pulsar service at
+    * `SERVICE_URL`.
+    *
+    * @param args command-line arguments (unused)
+    */
+  def main(args: Array[String]): Unit = {
+
+    // set up the execution environment
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    // set parallelism to write Pulsar in parallel (optional); configured up front,
+    // before any operator of the pipeline is defined
+    env.setParallelism(2)
+
+    // create PulsarOutputFormat instance; records are encoded explicitly as UTF-8 so the
+    // bytes written to Pulsar do not depend on the platform-default charset
+    val pulsarOutputFormat =
+      new PulsarOutputFormat[WordWithCount](SERVICE_URL, TOPIC_NAME, new SerializationSchema[WordWithCount] {
+        override def serialize(wordWithCount: WordWithCount): Array[Byte] =
+          wordWithCount.toString.getBytes(java.nio.charset.StandardCharsets.UTF_8)
+      })
+
+    // create DataSet
+    val textDS = env.fromElements[String](EINSTEIN_QUOTE)
+
+    // convert sentence to words; Locale.ROOT avoids locale-specific case mapping
+    // (e.g. "I" -> dotless "ı" under a Turkish default locale)
+    textDS.flatMap((value: String, out: Collector[WordWithCount]) => {
+      for (word <- value.toLowerCase(java.util.Locale.ROOT).split(" ")) {
+        out.collect(WordWithCount(word.replace(".", ""), 1))
+      }
+    })
+
+    // filter words whose length is greater than 4
+    .filter((wordWithCount: WordWithCount) => wordWithCount.word.length > 4)
+
+    // group the words
+    .groupBy((wordWithCount: WordWithCount) => wordWithCount.word)
+
+    // sum the word counts
+    .reduce((wordWithCount1: WordWithCount, wordWithCount2: WordWithCount) =>
+      WordWithCount(wordWithCount1.word, wordWithCount1.count + wordWithCount2.count))
+
+    // write batch data to Pulsar
+    .output(pulsarOutputFormat)
+
+    // execute program
+    env.execute("Flink - Pulsar Batch WordCount")
+  }
+
+}
\ No newline at end of file


Mime
View raw message