spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t...@apache.org
Subject spark git commit: [SPARK-16114][SQL] structured streaming network word count examples
Date Tue, 28 Jun 2016 23:12:52 GMT
Repository: spark
Updated Branches:
  refs/heads/master 8a977b065 -> 3554713a1


[SPARK-16114][SQL] structured streaming network word count examples

## What changes were proposed in this pull request?

Network word count example for structured streaming

## How was this patch tested?

Run locally

Author: James Thomas <jamesjoethomas@gmail.com>
Author: James Thomas <jamesthomas@Jamess-MacBook-Pro.local>

Closes #13816 from jjthomas/master.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3554713a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3554713a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3554713a

Branch: refs/heads/master
Commit: 3554713a163c58ca176ffde87d2c6e4a91bacb50
Parents: 8a977b0
Author: James Thomas <jamesjoethomas@gmail.com>
Authored: Tue Jun 28 16:12:48 2016 -0700
Committer: Tathagata Das <tathagata.das1565@gmail.com>
Committed: Tue Jun 28 16:12:48 2016 -0700

----------------------------------------------------------------------
 .../JavaStructuredNetworkWordCount.java         | 82 ++++++++++++++++++++
 .../streaming/structured_network_wordcount.py   | 76 ++++++++++++++++++
 .../streaming/StructuredNetworkWordCount.scala  | 76 ++++++++++++++++++
 3 files changed, 234 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3554713a/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java
new file mode 100644
index 0000000..a2cf938
--- /dev/null
+++ b/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.examples.sql.streaming;
+
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.sql.*;
+import org.apache.spark.sql.streaming.StreamingQuery;
+
+import java.util.Arrays;
+import java.util.Iterator;
+
+/**
+ * Counts words in UTF8 encoded, '\n' delimited text received from the network.
+ *
+ * Usage: JavaStructuredNetworkWordCount <hostname> <port>
+ * <hostname> and <port> describe the TCP server that Structured Streaming
+ * would connect to receive data.
+ *
+ * To run this on your local machine, you need to first run a Netcat server
+ *    `$ nc -lk 9999`
+ * and then run the example
+ *    `$ bin/run-example sql.streaming.JavaStructuredNetworkWordCount
+ *    localhost 9999`
+ */
+public final class JavaStructuredNetworkWordCount {
+
+  public static void main(String[] args) throws Exception {
+    if (args.length < 2) {
+      System.err.println("Usage: JavaStructuredNetworkWordCount <hostname> <port>");
+      System.exit(1);
+    }
+
+    String host = args[0];
+    int port = Integer.parseInt(args[1]);
+
+    SparkSession spark = SparkSession
+      .builder()
+      .appName("JavaStructuredNetworkWordCount")
+      .getOrCreate();
+
+    // Create a Dataset<String> of the text lines streamed from the socket at host:port
+    Dataset<String> lines = spark
+      .readStream()
+      .format("socket")
+      .option("host", host)
+      .option("port", port)
+      .load().as(Encoders.STRING());
+
+    // Split each line into words on single spaces
+    Dataset<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
+      @Override
+      public Iterator<String> call(String x) {
+        return Arrays.asList(x.split(" ")).iterator();
+      }
+    }, Encoders.STRING());
+
+    // Generate running word count, grouping on the "value" column of the words Dataset
+    Dataset<Row> wordCounts = words.groupBy("value").count();
+
+    // Start the query; "complete" mode prints the whole running-count table to the console
+    StreamingQuery query = wordCounts.writeStream()
+      .outputMode("complete")
+      .format("console")
+      .start();
+
+    // Block the main thread until the streaming query terminates
+    query.awaitTermination();
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/3554713a/examples/src/main/python/sql/streaming/structured_network_wordcount.py
----------------------------------------------------------------------
diff --git a/examples/src/main/python/sql/streaming/structured_network_wordcount.py b/examples/src/main/python/sql/streaming/structured_network_wordcount.py
new file mode 100644
index 0000000..32d63c5
--- /dev/null
+++ b/examples/src/main/python/sql/streaming/structured_network_wordcount.py
@@ -0,0 +1,76 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+ Counts words in UTF8 encoded, '\n' delimited text received from the network.
+ Usage: structured_network_wordcount.py <hostname> <port>
+   <hostname> and <port> describe the TCP server that Structured Streaming
+   would connect to receive data.
+
+ To run this on your local machine, you need to first run a Netcat server
+    `$ nc -lk 9999`
+ and then run the example
+    `$ bin/spark-submit examples/src/main/python/sql/streaming/structured_network_wordcount.py
+    localhost 9999`
+"""
+from __future__ import print_function
+
+import sys
+
+from pyspark.sql import SparkSession
+from pyspark.sql.functions import explode
+from pyspark.sql.functions import split
+
+if __name__ == "__main__":
+    if len(sys.argv) != 3:
+        print("Usage: structured_network_wordcount.py <hostname> <port>", file=sys.stderr)
+        exit(-1)
+
+    host = sys.argv[1]
+    port = int(sys.argv[2])
+
+    spark = SparkSession\
+        .builder\
+        .appName("StructuredNetworkWordCount")\
+        .getOrCreate()
+
+    # Create DataFrame representing the stream of input lines from connection to host:port
+    lines = spark\
+        .readStream\
+        .format('socket')\
+        .option('host', host)\
+        .option('port', port)\
+        .load()
+
+    # Split the lines into words
+    words = lines.select(
+        explode(
+            split(lines.value, ' ')
+        ).alias('word')
+    )
+
+    # Generate running word count
+    wordCounts = words.groupBy('word').count()
+
+    # Start running the query that prints the running counts to the console
+    query = wordCounts\
+        .writeStream\
+        .outputMode('complete')\
+        .format('console')\
+        .start()
+
+    query.awaitTermination()

http://git-wip-us.apache.org/repos/asf/spark/blob/3554713a/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCount.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCount.scala
new file mode 100644
index 0000000..433f7a1
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCount.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// scalastyle:off println
+package org.apache.spark.examples.sql.streaming
+
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.SparkSession
+
+/**
+ * Counts words in UTF8 encoded, '\n' delimited text received from the network.
+ *
+ * Usage: StructuredNetworkWordCount <hostname> <port>
+ * <hostname> and <port> describe the TCP server that Structured Streaming
+ * would connect to receive data.
+ *
+ * To run this on your local machine, you need to first run a Netcat server
+ *    `$ nc -lk 9999`
+ * and then run the example
+ *    `$ bin/run-example sql.streaming.StructuredNetworkWordCount
+ *    localhost 9999`
+ */
+object StructuredNetworkWordCount {
+  def main(args: Array[String]): Unit = {
+    if (args.length < 2) {
+      System.err.println("Usage: StructuredNetworkWordCount <hostname> <port>")
+      System.exit(1)
+    }
+
+    val host = args(0)
+    val port = args(1).toInt
+
+    val spark = SparkSession
+      .builder
+      .appName("StructuredNetworkWordCount")
+      .getOrCreate()
+
+    import spark.implicits._
+
+    // Create a Dataset[String] of the text lines streamed from the socket at host:port
+    val lines = spark.readStream
+      .format("socket")
+      .option("host", host)
+      .option("port", port)
+      .load().as[String]
+
+    // Split each line into words on single spaces
+    val words = lines.flatMap(_.split(" "))
+
+    // Generate running word count, grouping on the "value" column of the words Dataset
+    val wordCounts = words.groupBy("value").count()
+
+    // Start the query; "complete" mode prints the whole running-count table to the console
+    val query = wordCounts.writeStream
+      .outputMode("complete")
+      .format("console")
+      .start()
+
+    query.awaitTermination()
+  }
+}
+// scalastyle:on println


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message