spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t...@apache.org
Subject spark git commit: [SPARK-6274][Streaming][Examples] Added examples streaming + sql examples.
Date Wed, 11 Mar 2015 18:20:10 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-1.3 edbcb6ff3 -> ac61466f0


[SPARK-6274][Streaming][Examples] Added examples streaming + sql examples.

Added Scala, Java and Python streaming examples showing DataFrame and SQL operations within
streaming.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #4975 from tdas/streaming-sql-examples and squashes the following commits:

705cba1 [Tathagata Das] Fixed python lint error
75a3fad [Tathagata Das] Fixed python lint error
5fbf789 [Tathagata Das] Removed empty lines at the end
874b943 [Tathagata Das] Added examples streaming + sql examples.

(cherry picked from commit 51a79a770a8356bd0ed244af5ca7f1c44c9437d2)
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ac61466f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ac61466f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ac61466f

Branch: refs/heads/branch-1.3
Commit: ac61466f007723767bacb603a2d31bc6a2b178ce
Parents: edbcb6f
Author: Tathagata Das <tathagata.das1565@gmail.com>
Authored: Wed Mar 11 11:19:51 2015 -0700
Committer: Tathagata Das <tathagata.das1565@gmail.com>
Committed: Wed Mar 11 11:20:03 2015 -0700

----------------------------------------------------------------------
 .../spark/examples/streaming/JavaRecord.java    |  31 +++++
 .../streaming/JavaSqlNetworkWordCount.java      | 122 +++++++++++++++++++
 .../python/streaming/sql_network_wordcount.py   |  82 +++++++++++++
 .../streaming/SqlNetworkWordCount.scala         | 101 +++++++++++++++
 4 files changed, 336 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ac61466f/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecord.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecord.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecord.java
new file mode 100644
index 0000000..e63697a
--- /dev/null
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecord.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.streaming;
+
+/** Java Bean class to be used with the example JavaSqlNetworkWordCount. */
+public class JavaRecord implements java.io.Serializable {
+  private String word;
+
+  public String getWord() {
+    return word;
+  }
+
+  public void setWord(String word) {
+    this.word = word;
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/ac61466f/examples/src/main/java/org/apache/spark/examples/streaming/JavaSqlNetworkWordCount.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaSqlNetworkWordCount.java
b/examples/src/main/java/org/apache/spark/examples/streaming/JavaSqlNetworkWordCount.java
new file mode 100644
index 0000000..46562dd
--- /dev/null
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaSqlNetworkWordCount.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.streaming;
+
+import java.util.regex.Pattern;
+
+import com.google.common.collect.Lists;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.SparkContext;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.DataFrame;
+import org.apache.spark.api.java.StorageLevels;
+import org.apache.spark.streaming.Durations;
+import org.apache.spark.streaming.Time;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+
+/**
+ * Use DataFrames and SQL to count words in UTF8 encoded, '\n' delimited text received from
the
+ * network every second.
+ *
+ * Usage: JavaSqlNetworkWordCount <hostname> <port>
+ * <hostname> and <port> describe the TCP server that Spark Streaming would connect
to receive data.
+ *
+ * To run this on your local machine, you need to first run a Netcat server
+ *    `$ nc -lk 9999`
+ * and then run the example
+ *    `$ bin/run-example org.apache.spark.examples.streaming.JavaSqlNetworkWordCount localhost
9999`
+ */
+
+public final class JavaSqlNetworkWordCount {
+  private static final Pattern SPACE = Pattern.compile(" ");
+
+  public static void main(String[] args) {
+    if (args.length < 2) {
+      System.err.println("Usage: JavaNetworkWordCount <hostname> <port>");
+      System.exit(1);
+    }
+
+    StreamingExamples.setStreamingLogLevels();
+
+    // Create the context with a 1 second batch size
+    SparkConf sparkConf = new SparkConf().setAppName("JavaSqlNetworkWordCount");
+    JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));
+
+    // Create a JavaReceiverInputDStream on target ip:port and count the
+    // words in input stream of \n delimited text (eg. generated by 'nc')
+    // Note that no duplication in storage level only for running locally.
+    // Replication necessary in distributed scenario for fault tolerance.
+    JavaReceiverInputDStream<String> lines = ssc.socketTextStream(
+        args[0], Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER);
+    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>()
{
+      @Override
+      public Iterable<String> call(String x) {
+        return Lists.newArrayList(SPACE.split(x));
+      }
+    });
+
+    // Convert RDDs of the words DStream to DataFrame and run SQL query
+    words.foreachRDD(new Function2<JavaRDD<String>, Time, Void>() {
+      @Override
+      public Void call(JavaRDD<String> rdd, Time time) {
+        SQLContext sqlContext = JavaSQLContextSingleton.getInstance(rdd.context());
+
+        // Convert JavaRDD[String] to JavaRDD[bean class] to DataFrame
+        JavaRDD<JavaRecord> rowRDD = rdd.map(new Function<String, JavaRecord>()
{
+          public JavaRecord call(String word) {
+            JavaRecord record = new JavaRecord();
+            record.setWord(word);
+            return record;
+          }
+        });
+        DataFrame wordsDataFrame = sqlContext.createDataFrame(rowRDD, JavaRecord.class);
+
+        // Register as table
+        wordsDataFrame.registerTempTable("words");
+
+        // Do word count on table using SQL and print it
+        DataFrame wordCountsDataFrame =
+            sqlContext.sql("select word, count(*) as total from words group by word");
+        System.out.println("========= " + time + "=========");
+        wordCountsDataFrame.show();
+        return null;
+      }
+    });
+
+    ssc.start();
+    ssc.awaitTermination();
+  }
+}
+
+/** Lazily instantiated singleton instance of SQLContext */
+class JavaSQLContextSingleton {
+  static private transient SQLContext instance = null;
+  static public SQLContext getInstance(SparkContext sparkContext) {
+    if (instance == null) {
+      instance = new SQLContext(sparkContext);
+    }
+    return instance;
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/ac61466f/examples/src/main/python/streaming/sql_network_wordcount.py
----------------------------------------------------------------------
diff --git a/examples/src/main/python/streaming/sql_network_wordcount.py b/examples/src/main/python/streaming/sql_network_wordcount.py
new file mode 100644
index 0000000..f89bc56
--- /dev/null
+++ b/examples/src/main/python/streaming/sql_network_wordcount.py
@@ -0,0 +1,82 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+ Use DataFrames and SQL to count words in UTF8 encoded, '\n' delimited text received from
the
+ network every second.
+
+ Usage: sql_network_wordcount.py <hostname> <port>
+   <hostname> and <port> describe the TCP server that Spark Streaming would connect
to receive data.
+
+ To run this on your local machine, you need to first run a Netcat server
+    `$ nc -lk 9999`
+ and then run the example
+    `$ bin/spark-submit examples/src/main/python/streaming/sql_network_wordcount.py localhost
9999`
+"""
+
+import os
+import sys
+
+from pyspark import SparkContext
+from pyspark.streaming import StreamingContext
+from pyspark.sql import SQLContext, Row
+
+
+def getSqlContextInstance(sparkContext):
+    if ('sqlContextSingletonInstance' not in globals()):
+        globals()['sqlContextSingletonInstance'] = SQLContext(sparkContext)
+    return globals()['sqlContextSingletonInstance']
+
+
+if __name__ == "__main__":
+    if len(sys.argv) != 3:
+        print >> sys.stderr, "Usage: sql_network_wordcount.py <hostname> <port>
"
+        exit(-1)
+    host, port = sys.argv[1:]
+    sc = SparkContext(appName="PythonSqlNetworkWordCount")
+    ssc = StreamingContext(sc, 1)
+
+    # Create a socket stream on target ip:port and count the
+    # words in input stream of \n delimited text (eg. generated by 'nc')
+    lines = ssc.socketTextStream(host, int(port))
+    words = lines.flatMap(lambda line: line.split(" "))
+
+    # Convert RDDs of the words DStream to DataFrame and run SQL query
+    def process(time, rdd):
+        print "========= %s =========" % str(time)
+
+        try:
+            # Get the singleton instance of SQLContext
+            sqlContext = getSqlContextInstance(rdd.context)
+
+            # Convert RDD[String] to RDD[Row] to DataFrame
+            rowRdd = rdd.map(lambda w: Row(word=w))
+            wordsDataFrame = sqlContext.createDataFrame(rowRdd)
+
+            # Register as table
+            wordsDataFrame.registerTempTable("words")
+
+            # Do word count on table using SQL and print it
+            wordCountsDataFrame = \
+                sqlContext.sql("select word, count(*) as total from words group by word")
+            wordCountsDataFrame.show()
+        except:
+            pass
+
+    words.foreachRDD(process)
+    ssc.start()
+    ssc.awaitTermination()

http://git-wip-us.apache.org/repos/asf/spark/blob/ac61466f/examples/src/main/scala/org/apache/spark/examples/streaming/SqlNetworkWordCount.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/SqlNetworkWordCount.scala
b/examples/src/main/scala/org/apache/spark/examples/streaming/SqlNetworkWordCount.scala
new file mode 100644
index 0000000..5a6b921
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/SqlNetworkWordCount.scala
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.streaming
+
+import org.apache.spark.SparkConf
+import org.apache.spark.SparkContext
+import org.apache.spark.rdd.RDD
+import org.apache.spark.streaming.{Time, Seconds, StreamingContext}
+import org.apache.spark.util.IntParam
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.storage.StorageLevel
+
+/**
+ * Use DataFrames and SQL to count words in UTF8 encoded, '\n' delimited text received from
the
+ * network every second.
+ *
+ * Usage: SqlNetworkWordCount <hostname> <port>
+ * <hostname> and <port> describe the TCP server that Spark Streaming would connect
to receive data.
+ *
+ * To run this on your local machine, you need to first run a Netcat server
+ *    `$ nc -lk 9999`
+ * and then run the example
+ *    `$ bin/run-example org.apache.spark.examples.streaming.SqlNetworkWordCount localhost
9999`
+ */
+
+object SqlNetworkWordCount {
+  def main(args: Array[String]) {
+    if (args.length < 2) {
+      System.err.println("Usage: NetworkWordCount <hostname> <port>")
+      System.exit(1)
+    }
+
+    StreamingExamples.setStreamingLogLevels()
+
+    // Create the context with a 2 second batch size
+    val sparkConf = new SparkConf().setAppName("SqlNetworkWordCount")
+    val ssc = new StreamingContext(sparkConf, Seconds(2))
+
+    // Create a socket stream on target ip:port and count the
+    // words in input stream of \n delimited text (eg. generated by 'nc')
+    // Note that no duplication in storage level only for running locally.
+    // Replication necessary in distributed scenario for fault tolerance.
+    val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
+    val words = lines.flatMap(_.split(" "))
+
+    // Convert RDDs of the words DStream to DataFrame and run SQL query
+    words.foreachRDD((rdd: RDD[String], time: Time) => {
+      // Get the singleton instance of SQLContext
+      val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
+      import sqlContext.implicits._
+
+      // Convert RDD[String] to RDD[case class] to DataFrame
+      val wordsDataFrame = rdd.map(w => Record(w)).toDF()
+
+      // Register as table
+      wordsDataFrame.registerTempTable("words")
+
+      // Do word count on table using SQL and print it
+      val wordCountsDataFrame =
+        sqlContext.sql("select word, count(*) as total from words group by word")
+      println(s"========= $time =========")
+      wordCountsDataFrame.show()
+    })
+
+    ssc.start()
+    ssc.awaitTermination()
+  }
+}
+
+
+/** Case class for converting RDD to DataFrame */
+case class Record(word: String)
+
+
+/** Lazily instantiated singleton instance of SQLContext */
+object SQLContextSingleton {
+
+  @transient  private var instance: SQLContext = _
+
+  def getInstance(sparkContext: SparkContext): SQLContext = {
+    if (instance == null) {
+      instance = new SQLContext(sparkContext)
+    }
+    instance
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message