pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] sijie closed pull request #3265: [flink-consumer-source] fix up streaming to be consistent with batch
Date Mon, 31 Dec 2018 18:31:40 GMT
sijie closed pull request #3265:  [flink-consumer-source] fix up streaming to be consistent
with batch
URL: https://github.com/apache/pulsar/pull/3265
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkExample.java
b/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkExample.java
index 584d59fec1..1349dba4cc 100644
--- a/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkExample.java
+++ b/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkExample.java
@@ -84,5 +84,5 @@ public static void main(String[] args) throws Exception {
         // execute program
         env.execute("Flink - Pulsar Batch Avro");
     }
-
+  
 }
diff --git a/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/README.md
b/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/README.md
index 84c1bd8a52..29adc344e3 100644
--- a/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/README.md
+++ b/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/README.md
@@ -211,6 +211,10 @@ $ bin/pulsar-client consume -n 0 -s test test_flink_topic
 
 6. Please find sample output for above linked application as follows:
 ```
- "4,SKYLAB,1973,1974"
- "5,APOLLO–SOYUZ TEST PROJECT,1975,1975"
-```
+ ----- got message -----
+ 
+ Skylab��
+ ----- got message -----
+ 
+ 6Apollo–Soyuz Test Project��
+```
\ No newline at end of file
diff --git a/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCountToAvroTableSink.java
b/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCountToAvroTableSink.java
index 7b78da5149..4563b56696 100644
--- a/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCountToAvroTableSink.java
+++ b/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCountToAvroTableSink.java
@@ -42,16 +42,17 @@
  *
  * <p>Example usage:
  *   --service-url pulsar://localhost:6650 --input-topic test_topic --subscription test_sub
+ * or
+ *   --service-url pulsar://localhost:6650 --input-topic test_src --subscription test_sub
--output-topic test_dest
  */
 public class PulsarConsumerSourceWordCountToAvroTableSink {
-    private static final String SERVICE_URL = "pulsar://localhost:6650";
     private static final String ROUTING_KEY = "word";
 
     public static void main(String[] args) throws Exception {
         // parse input arguments
         final ParameterTool parameterTool = ParameterTool.fromArgs(args);
 
-        if (parameterTool.getNumberOfParameters() < 2) {
+        if (parameterTool.getNumberOfParameters() < 3) {
             System.out.println("Missing parameters!");
             System.out.println("Usage: pulsar --service-url <pulsar-service-url> --input-topic
<topic> --subscription <sub> --output-topic <topic>");
             return;
@@ -96,23 +97,23 @@ public static void main(String[] args) throws Exception {
                     }
                 })
                 .returns(WordWithCount.class)
-                .keyBy("word")
+                .keyBy(ROUTING_KEY)
                 .timeWindow(Time.seconds(5))
                 .reduce((ReduceFunction<WordWithCount>) (c1, c2) ->
                         WordWithCount.newBuilder().setWord(c1.getWord()).setCount(c1.getCount()
+ c2.getCount()).build()
                 );
 
         tableEnvironment.registerDataStream("wc",wc);
-
-        Table table = tableEnvironment.sqlQuery("select * from wc");
+        Table table = tableEnvironment.sqlQuery("select word, `count` from wc");
+        table.printSchema();
+        TableSink sink = null;
         if (null != outputTopic) {
-            PulsarAvroTableSink sink = new PulsarAvroTableSink(SERVICE_URL, outputTopic,
new ProducerConfiguration(), ROUTING_KEY,WordWithCount.class);
-            table.writeToSink(sink);
+            sink = new PulsarAvroTableSink(serviceUrl, outputTopic, new ProducerConfiguration(),
ROUTING_KEY, WordWithCount.class);
         } else {
-            TableSink sink = new CsvTableSink("./examples/file",  "|");
             // print the results with a csv file
-            table.writeToSink(sink);
+            sink = new CsvTableSink("./examples/file",  "|");
         }
+        table.writeToSink(sink);
 
         env.execute("Pulsar Stream WordCount");
     }
diff --git a/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCountToJsonTableSink.java
b/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCountToJsonTableSink.java
index 95b253675a..de09146e5a 100644
--- a/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCountToJsonTableSink.java
+++ b/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCountToJsonTableSink.java
@@ -44,16 +44,17 @@
  *
  * <p>Example usage:
  *   --service-url pulsar://localhost:6650 --input-topic test_topic --subscription test_sub
+ * or
+ *   --service-url pulsar://localhost:6650 --input-topic test_src --subscription test_sub
--output-topic test_dest
  */
 public class PulsarConsumerSourceWordCountToJsonTableSink {
-    private static final String SERVICE_URL = "pulsar://localhost:6650";
     private static final String ROUTING_KEY = "word";
 
     public static void main(String[] args) throws Exception {
         // parse input arguments
         final ParameterTool parameterTool = ParameterTool.fromArgs(args);
 
-        if (parameterTool.getNumberOfParameters() < 2) {
+        if (parameterTool.getNumberOfParameters() < 3) {
             System.out.println("Missing parameters!");
             System.out.println("Usage: pulsar --service-url <pulsar-service-url> --input-topic
<topic> --subscription <sub> --output-topic <topic>");
             return;
@@ -98,22 +99,22 @@ public static void main(String[] args) throws Exception {
                     }
                 })
                 .returns(WordWithCount.class)
-                .keyBy("word")
+                .keyBy(ROUTING_KEY)
                 .timeWindow(Time.seconds(5))
                 .reduce((ReduceFunction<WordWithCount>) (c1, c2) ->
                         new WordWithCount(c1.word, c1.count + c2.count));
 
         tableEnvironment.registerDataStream("wc",wc);
-
-        Table table = tableEnvironment.sqlQuery("select * from wc");
+        Table table = tableEnvironment.sqlQuery("select word, `count` from wc");
+        table.printSchema();
+        TableSink sink = null;
         if (null != outputTopic) {
-            PulsarJsonTableSink sink = new PulsarJsonTableSink(SERVICE_URL, outputTopic,
new ProducerConfiguration(), ROUTING_KEY);
-            table.writeToSink(sink);
+            sink = new PulsarJsonTableSink(serviceUrl, outputTopic, new ProducerConfiguration(),
ROUTING_KEY);
         } else {
-            TableSink sink = new CsvTableSink("./examples/file",  "|");
             // print the results with a csv file
-            table.writeToSink(sink);
+            sink = new CsvTableSink("./examples/file",  "|");
         }
+        table.writeToSink(sink);
 
         env.execute("Pulsar Stream WordCount");
     }
diff --git a/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/README.md
b/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/README.md
index 7c8c17687e..ac36eb5dbc 100644
--- a/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/README.md
+++ b/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/README.md
@@ -61,7 +61,7 @@ The steps to run the example:
 4. Run the word count example to print results to stdout.
 
     ```shell
-    $ ./bin/flink run  ${PULSAR_HOME}/examples/flink/target/pulsar-flink-examples.jar --service-url
pulsar://localhost:6650 --input-topic test_src --subscription test_sub
+    $ ./bin/flink run -c org.apache.flink.streaming.connectors.pulsar.example.PulsarConsumerSourceWordCount
${PULSAR_HOME}/examples/flink/target/pulsar-flink-examples.jar --service-url pulsar://localhost:6650
--input-topic test_src --subscription test_sub
     ```
 
 5. Produce messages to topic `test_src`.
@@ -73,19 +73,16 @@ The steps to run the example:
 6. You can check the flink taskexecutor `.out` file. The `.out` file will print the counts
at the end of each time window as long as words are flowing in, e.g.:
 
     ```shell
-    PulsarConsumerSourceWordCount.WordWithCount(word=hello, count=200)
-    PulsarConsumerSourceWordCount.WordWithCount(word=again, count=200)
-    PulsarConsumerSourceWordCount.WordWithCount(word=test, count=200)
-    PulsarConsumerSourceWordCount.WordWithCount(word=world, count=200)
     PulsarConsumerSourceWordCount.WordWithCount(word=hello, count=100)
     PulsarConsumerSourceWordCount.WordWithCount(word=again, count=100)
     PulsarConsumerSourceWordCount.WordWithCount(word=test, count=100)
+    PulsarConsumerSourceWordCount.WordWithCount(word=world, count=100)  
     ```
 
 Alternatively, when you run the flink word count example at step 4, you can choose to dump the
result to another pulsar topic.
 
 ```shell
-$ ./bin/flink run  ${PULSAR_HOME}/examples/flink/target/pulsar-flink-examples.jar --service-url
pulsar://localhost:6650 --input-topic test_src --subscription test_sub --output-topic test_dest
+$ ./bin/flink run -c org.apache.flink.streaming.connectors.pulsar.example.PulsarConsumerSourceWordCount
${PULSAR_HOME}/examples/flink/target/pulsar-flink-examples.jar --service-url pulsar://localhost:6650
--input-topic test_src --subscription test_sub --output-topic test_dest
 ```
 
 Once the flink word count example is running, you can use `bin/pulsar-client` to tail the
results produced into topic `test_dest`.
@@ -95,3 +92,118 @@ $ bin/pulsar-client consume -n 0 -s test test_dest
 ```
 
 You will see similar results as what you see at step 6 when running the word count example
to print results to stdout.
+
+
+### PulsarConsumerSourceWordCountToAvroTableSink
+
+This Flink streaming job consumes from a Pulsar topic and counts words in a
streaming fashion. The job can write the word count results
+to a CSV file or to another Pulsar topic in Avro format.
+
+The steps to run the example:
+
+Steps 1, 2 and 3 are the same as above.
+
+4. Run the word count example to write results to a CSV file.
+
+    ```shell
+    $ ./bin/flink run -c org.apache.flink.streaming.connectors.pulsar.example.PulsarConsumerSourceWordCountToAvroTableSink
${PULSAR_HOME}/examples/flink/target/pulsar-flink-examples.jar --service-url pulsar://localhost:6650
--input-topic test_src --subscription test_sub
+    ```
+
+5. Produce messages to topic `test_src`.
+
+    ```shell
+    $ bin/pulsar-client produce -m "hello world again" -n 100 test_src
+    ```
+
+6. You can check the file ${FLINK_HOME}/examples/file. It contains the counts at the end
of each time window as long as words are flowing in, e.g.:
+
+    ```file
+    hello|100
+    again|100
+    test|100
+    world|100
+    ```
+
+Alternatively, when you run the flink word count example at step 4, you can choose to dump the
result to another pulsar topic.
+
+```shell
+$ ./bin/flink run -c org.apache.flink.streaming.connectors.pulsar.example.PulsarConsumerSourceWordCountToAvroTableSink
${PULSAR_HOME}/examples/flink/target/pulsar-flink-examples.jar --service-url pulsar://localhost:6650
--input-topic test_src --subscription test_sub --output-topic test_dest
+```
+
+Once the flink word count example is running, you can use `bin/pulsar-client` to tail the
results produced into topic `test_dest`.
+
+```shell
+$ bin/pulsar-client consume -n 0 -s test test_dest
+```
+
+You will see sample output for the above application as follows:
+```
+----- got message -----
+
+hello�
+----- got message -----
+
+again�
+----- got message -----
+test�
+----- got message -----
+
+world�
+
+```
+
+### PulsarConsumerSourceWordCountToJsonTableSink
+
+This Flink streaming job consumes from a Pulsar topic and counts words in a
streaming fashion. The job can write the word count results
+to a CSV file or to another Pulsar topic in JSON format.
+
+The steps to run the example:
+
+Steps 1, 2 and 3 are the same as above.
+
+4. Run the word count example to write results to a CSV file.
+
+    ```shell
+    $ ./bin/flink run -c org.apache.flink.streaming.connectors.pulsar.example.PulsarConsumerSourceWordCountToJsonTableSink
${PULSAR_HOME}/examples/flink/target/pulsar-flink-examples.jar --service-url pulsar://localhost:6650
--input-topic test_src --subscription test_sub
+    ```
+
+If you encounter java.lang.ClassNotFoundException: org.apache.flink.table.sinks.TableSink or java.lang.NoClassDefFoundError:
org/apache/flink/formats/json/JsonRowSerializationSchema, you need to build Apache Flink from
source, then copy flink-table_{version}.jar and flink-json_{version}.jar to ${FLINK_HOME}/lib
and restart the Flink cluster.
+
+5. Produce messages to topic `test_src`.
+
+    ```shell
+    $ bin/pulsar-client produce -m "hello world again" -n 100 test_src
+    ```
+
+6. You can check the file ${FLINK_HOME}/examples/file. It contains the counts at the end
of each time window as long as words are flowing in, e.g.:
+
+    ```file
+    hello|100
+    again|100
+    test|100
+    world|100
+    ```
+
+Alternatively, when you run the flink word count example at step 4, you can choose to dump the
result to another pulsar topic.
+
+```shell
+$ ./bin/flink run -c org.apache.flink.streaming.connectors.pulsar.example.PulsarConsumerSourceWordCountToJsonTableSink
${PULSAR_HOME}/examples/flink/target/pulsar-flink-examples.jar --service-url pulsar://localhost:6650
--input-topic test_src --subscription test_sub --output-topic test_dest
+```
+
+Once the flink word count example is running, you can use `bin/pulsar-client` to tail the
results produced into topic `test_dest`.
+
+```shell
+$ bin/pulsar-client consume -n 0 -s test test_dest
+```
+
+You will see sample output for the above application as follows:
+```
+----- got message -----
+{"word":"hello","count":100}
+----- got message -----
+{"word":"again","count":100}
+----- got message -----
+{"word":"test","count":100}
+----- got message -----
+{"word":"world","count":100}
+```
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSink.java
b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSink.java
index 9187a0d71c..a7a44121c9 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSink.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSink.java
@@ -99,7 +99,8 @@ public void emitDataStream(DataStream<Row> dataStream) {
 
     @Override
     public TypeInformation<Row> getOutputType() {
-        return new RowTypeInfo(fieldTypes, fieldNames);
+        RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes, fieldNames);
+        return rowTypeInfo;
     }
 
     @Override
@@ -162,7 +163,7 @@ public AvroKeyExtractor(
 
         @Override
         public String getKey(Row event) {
-            return (String) event.getField(keyIndex);
+            return event.getField(keyIndex).toString();
         }
     }
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message