spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t...@apache.org
Subject spark git commit: [SPARK-16114][SQL] structured streaming event time window example
Date Tue, 12 Jul 2016 00:57:55 GMT
Repository: spark
Updated Branches:
  refs/heads/master b4fbe140b -> 9e2c763db


[SPARK-16114][SQL] structured streaming event time window example

## What changes were proposed in this pull request?

A structured streaming example with event time windowing.

## How was this patch tested?

Run locally

Author: James Thomas <jamesjoethomas@gmail.com>

Closes #13957 from jjthomas/current.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9e2c763d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9e2c763d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9e2c763d

Branch: refs/heads/master
Commit: 9e2c763dbb5ac6fc5d2eb0759402504d4b9073a4
Parents: b4fbe14
Author: James Thomas <jamesjoethomas@gmail.com>
Authored: Mon Jul 11 17:57:51 2016 -0700
Committer: Tathagata Das <tathagata.das1565@gmail.com>
Committed: Mon Jul 11 17:57:51 2016 -0700

----------------------------------------------------------------------
 .../JavaStructuredNetworkWordCount.java         |   4 +-
 .../JavaStructuredNetworkWordCountWindowed.java | 116 +++++++++++++++++++
 .../streaming/structured_network_wordcount.py   |   3 +-
 .../structured_network_wordcount_windowed.py    | 102 ++++++++++++++++
 .../streaming/StructuredNetworkWordCount.scala  |   2 +-
 .../StructuredNetworkWordCountWindowed.scala    | 103 ++++++++++++++++
 .../spark/sql/execution/streaming/socket.scala  |  47 ++++++--
 .../streaming/TextSocketStreamSuite.scala       |  52 ++++++++-
 8 files changed, 415 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9e2c763d/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java
index a2cf938..346d218 100644
--- a/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java
@@ -24,7 +24,7 @@ import java.util.Arrays;
 import java.util.Iterator;
 
 /**
- * Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
+ * Counts words in UTF8 encoded, '\n' delimited text received from the network.
  *
  * Usage: JavaStructuredNetworkWordCount <hostname> <port>
  * <hostname> and <port> describe the TCP server that Structured Streaming
@@ -40,7 +40,7 @@ public final class JavaStructuredNetworkWordCount {
 
   public static void main(String[] args) throws Exception {
     if (args.length < 2) {
-      System.err.println("Usage: JavaNetworkWordCount <hostname> <port>");
+      System.err.println("Usage: JavaStructuredNetworkWordCount <hostname> <port>");
       System.exit(1);
     }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/9e2c763d/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCountWindowed.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCountWindowed.java b/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCountWindowed.java
new file mode 100644
index 0000000..557d36c
--- /dev/null
+++ b/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCountWindowed.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.examples.sql.streaming;
+
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.sql.*;
+import org.apache.spark.sql.functions;
+import org.apache.spark.sql.streaming.StreamingQuery;
+import scala.Tuple2;
+
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Counts words in UTF8 encoded, '\n' delimited text received from the network over a
+ * sliding window of configurable duration. Each line from the network is tagged
+ * with a timestamp that is used to determine the windows into which it falls.
+ *
+ * Usage: JavaStructuredNetworkWordCountWindowed <hostname> <port> <window duration>
+ *   [<slide duration>]
+ * <hostname> and <port> describe the TCP server that Structured Streaming
+ * would connect to receive data.
+ * <window duration> gives the size of window, specified as integer number of seconds
+ * <slide duration> gives the amount of time successive windows are offset from one another,
+ * given in the same units as above. <slide duration> should be less than or equal to
+ * <window duration>. If the two are equal, successive windows have no overlap. If
+ * <slide duration> is not provided, it defaults to <window duration>.
+ *
+ * To run this on your local machine, you need to first run a Netcat server
+ *    `$ nc -lk 9999`
+ * and then run the example
+ *    `$ bin/run-example sql.streaming.JavaStructuredNetworkWordCountWindowed
+ *    localhost 9999 <window duration in seconds> [<slide duration in seconds>]`
+ *
+ * One recommended <window duration>, <slide duration> pair is 10, 5
+ */
+public final class JavaStructuredNetworkWordCountWindowed {
+
+  public static void main(String[] args) throws Exception {
+    if (args.length < 3) {
+      System.err.println("Usage: JavaStructuredNetworkWordCountWindowed <hostname> <port>" +
+        " <window duration in seconds> [<slide duration in seconds>]");
+      System.exit(1);
+    }
+
+    String host = args[0];
+    int port = Integer.parseInt(args[1]);
+    int windowSize = Integer.parseInt(args[2]);
+    int slideSize = (args.length == 3) ? windowSize : Integer.parseInt(args[3]);
+    if (slideSize > windowSize) {
+      System.err.println("<slide duration> must be less than or equal to <window duration>");
+    }
+    String windowDuration = windowSize + " seconds";
+    String slideDuration = slideSize + " seconds";
+
+    SparkSession spark = SparkSession
+      .builder()
+      .appName("JavaStructuredNetworkWordCountWindowed")
+      .getOrCreate();
+
+    // Create DataFrame representing the stream of input lines from connection to host:port
+    Dataset<Tuple2<String, Timestamp>> lines = spark
+      .readStream()
+      .format("socket")
+      .option("host", host)
+      .option("port", port)
+      .option("includeTimestamp", true)
+      .load().as(Encoders.tuple(Encoders.STRING(), Encoders.TIMESTAMP()));
+
+    // Split the lines into words, retaining timestamps
+    Dataset<Row> words = lines.flatMap(
+      new FlatMapFunction<Tuple2<String, Timestamp>, Tuple2<String, Timestamp>>() {
+        @Override
+        public Iterator<Tuple2<String, Timestamp>> call(Tuple2<String, Timestamp> t) {
+          List<Tuple2<String, Timestamp>> result = new ArrayList<>();
+          for (String word : t._1.split(" ")) {
+            result.add(new Tuple2<>(word, t._2));
+          }
+          return result.iterator();
+        }
+      },
+      Encoders.tuple(Encoders.STRING(), Encoders.TIMESTAMP())
+    ).toDF("word", "timestamp");
+
+    // Group the data by window and word and compute the count of each group
+    Dataset<Row> windowedCounts = words.groupBy(
+      functions.window(words.col("timestamp"), windowDuration, slideDuration),
+      words.col("word")
+    ).count().orderBy("window");
+
+    // Start running the query that prints the windowed word counts to the console
+    StreamingQuery query = windowedCounts.writeStream()
+      .outputMode("complete")
+      .format("console")
+      .option("truncate", "false")
+      .start();
+
+    query.awaitTermination();
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/9e2c763d/examples/src/main/python/sql/streaming/structured_network_wordcount.py
----------------------------------------------------------------------
diff --git a/examples/src/main/python/sql/streaming/structured_network_wordcount.py b/examples/src/main/python/sql/streaming/structured_network_wordcount.py
index 32d63c5..afde255 100644
--- a/examples/src/main/python/sql/streaming/structured_network_wordcount.py
+++ b/examples/src/main/python/sql/streaming/structured_network_wordcount.py
@@ -16,7 +16,7 @@
 #
 
 """
- Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
+ Counts words in UTF8 encoded, '\n' delimited text received from the network.
  Usage: structured_network_wordcount.py <hostname> <port>
    <hostname> and <port> describe the TCP server that Structured Streaming
    would connect to receive data.
@@ -58,6 +58,7 @@ if __name__ == "__main__":
 
     # Split the lines into words
     words = lines.select(
+        # explode turns each item in an array into a separate row
         explode(
             split(lines.value, ' ')
         ).alias('word')

http://git-wip-us.apache.org/repos/asf/spark/blob/9e2c763d/examples/src/main/python/sql/streaming/structured_network_wordcount_windowed.py
----------------------------------------------------------------------
diff --git a/examples/src/main/python/sql/streaming/structured_network_wordcount_windowed.py b/examples/src/main/python/sql/streaming/structured_network_wordcount_windowed.py
new file mode 100644
index 0000000..02a7d33
--- /dev/null
+++ b/examples/src/main/python/sql/streaming/structured_network_wordcount_windowed.py
@@ -0,0 +1,102 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+ Counts words in UTF8 encoded, '\n' delimited text received from the network over a
+ sliding window of configurable duration. Each line from the network is tagged
+ with a timestamp that is used to determine the windows into which it falls.
+
+ Usage: structured_network_wordcount_windowed.py <hostname> <port> <window duration>
+   [<slide duration>]
+ <hostname> and <port> describe the TCP server that Structured Streaming
+ would connect to receive data.
+ <window duration> gives the size of window, specified as integer number of seconds
+ <slide duration> gives the amount of time successive windows are offset from one another,
+ given in the same units as above. <slide duration> should be less than or equal to
+ <window duration>. If the two are equal, successive windows have no overlap. If
+ <slide duration> is not provided, it defaults to <window duration>.
+
+ To run this on your local machine, you need to first run a Netcat server
+    `$ nc -lk 9999`
+ and then run the example
+    `$ bin/spark-submit
+    examples/src/main/python/sql/streaming/structured_network_wordcount_windowed.py
+    localhost 9999 <window duration> [<slide duration>]`
+
+ One recommended <window duration>, <slide duration> pair is 10, 5
+"""
+from __future__ import print_function
+
+import sys
+
+from pyspark.sql import SparkSession
+from pyspark.sql.functions import explode
+from pyspark.sql.functions import split
+from pyspark.sql.functions import window
+
+if __name__ == "__main__":
+    if len(sys.argv) != 5 and len(sys.argv) != 4:
+        msg = ("Usage: structured_network_wordcount_windowed.py <hostname> <port> "
+               "<window duration in seconds> [<slide duration in seconds>]")
+        print(msg, file=sys.stderr)
+        exit(-1)
+
+    host = sys.argv[1]
+    port = int(sys.argv[2])
+    windowSize = int(sys.argv[3])
+    slideSize = int(sys.argv[4]) if (len(sys.argv) == 5) else windowSize
+    if slideSize > windowSize:
+        print("<slide duration> must be less than or equal to <window duration>", file=sys.stderr)
+    windowDuration = '{} seconds'.format(windowSize)
+    slideDuration = '{} seconds'.format(slideSize)
+
+    spark = SparkSession\
+        .builder\
+        .appName("StructuredNetworkWordCountWindowed")\
+        .getOrCreate()
+
+    # Create DataFrame representing the stream of input lines from connection to host:port
+    lines = spark\
+        .readStream\
+        .format('socket')\
+        .option('host', host)\
+        .option('port', port)\
+        .option('includeTimestamp', 'true')\
+        .load()
+
+    # Split the lines into words, retaining timestamps
+    # split() splits each line into an array, and explode() turns the array into multiple rows
+    words = lines.select(
+        explode(split(lines.value, ' ')).alias('word'),
+        lines.timestamp
+    )
+
+    # Group the data by window and word and compute the count of each group
+    windowedCounts = words.groupBy(
+        window(words.timestamp, windowDuration, slideDuration),
+        words.word
+    ).count().orderBy('window')
+
+    # Start running the query that prints the windowed word counts to the console
+    query = windowedCounts\
+        .writeStream\
+        .outputMode('complete')\
+        .format('console')\
+        .option('truncate', 'false')\
+        .start()
+
+    query.awaitTermination()

http://git-wip-us.apache.org/repos/asf/spark/blob/9e2c763d/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCount.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCount.scala
index 433f7a1..364bff2 100644
--- a/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCount.scala
@@ -22,7 +22,7 @@ import org.apache.spark.sql.functions._
 import org.apache.spark.sql.SparkSession
 
 /**
- * Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
+ * Counts words in UTF8 encoded, '\n' delimited text received from the network.
  *
  * Usage: StructuredNetworkWordCount <hostname> <port>
  * <hostname> and <port> describe the TCP server that Structured Streaming

http://git-wip-us.apache.org/repos/asf/spark/blob/9e2c763d/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCountWindowed.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCountWindowed.scala b/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCountWindowed.scala
new file mode 100644
index 0000000..333b0a9
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCountWindowed.scala
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// scalastyle:off println
+package org.apache.spark.examples.sql.streaming
+
+import java.sql.Timestamp
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.functions._
+
+/**
+ * Counts words in UTF8 encoded, '\n' delimited text received from the network over a
+ * sliding window of configurable duration. Each line from the network is tagged
+ * with a timestamp that is used to determine the windows into which it falls.
+ *
+ * Usage: StructuredNetworkWordCountWindowed <hostname> <port> <window duration>
+ *   [<slide duration>]
+ * <hostname> and <port> describe the TCP server that Structured Streaming
+ * would connect to receive data.
+ * <window duration> gives the size of window, specified as integer number of seconds
+ * <slide duration> gives the amount of time successive windows are offset from one another,
+ * given in the same units as above. <slide duration> should be less than or equal to
+ * <window duration>. If the two are equal, successive windows have no overlap. If
+ * <slide duration> is not provided, it defaults to <window duration>.
+ *
+ * To run this on your local machine, you need to first run a Netcat server
+ *    `$ nc -lk 9999`
+ * and then run the example
+ *    `$ bin/run-example sql.streaming.StructuredNetworkWordCountWindowed
+ *    localhost 9999 <window duration in seconds> [<slide duration in seconds>]`
+ *
+ * One recommended <window duration>, <slide duration> pair is 10, 5
+ */
+object StructuredNetworkWordCountWindowed {
+
+  def main(args: Array[String]) {
+    if (args.length < 3) {
+      System.err.println("Usage: StructuredNetworkWordCountWindowed <hostname> <port>" +
+        " <window duration in seconds> [<slide duration in seconds>]")
+      System.exit(1)
+    }
+
+    val host = args(0)
+    val port = args(1).toInt
+    val windowSize = args(2).toInt
+    val slideSize = if (args.length == 3) windowSize else args(3).toInt
+    if (slideSize > windowSize) {
+      System.err.println("<slide duration> must be less than or equal to <window duration>")
+    }
+    val windowDuration = s"$windowSize seconds"
+    val slideDuration = s"$slideSize seconds"
+
+    val spark = SparkSession
+      .builder
+      .appName("StructuredNetworkWordCountWindowed")
+      .getOrCreate()
+
+    import spark.implicits._
+
+    // Create DataFrame representing the stream of input lines from connection to host:port
+    val lines = spark.readStream
+      .format("socket")
+      .option("host", host)
+      .option("port", port)
+      .option("includeTimestamp", true)
+      .load().as[(String, Timestamp)]
+
+    // Split the lines into words, retaining timestamps
+    val words = lines.flatMap(line =>
+      line._1.split(" ").map(word => (word, line._2))
+    ).toDF("word", "timestamp")
+
+    // Group the data by window and word and compute the count of each group
+    val windowedCounts = words.groupBy(
+      window($"timestamp", windowDuration, slideDuration), $"word"
+    ).count().orderBy("window")
+
+    // Start running the query that prints the windowed word counts to the console
+    val query = windowedCounts.writeStream
+      .outputMode("complete")
+      .format("console")
+      .option("truncate", "false")
+      .start()
+
+    query.awaitTermination()
+  }
+}
+// scalastyle:on println

http://git-wip-us.apache.org/repos/asf/spark/blob/9e2c763d/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala
index d07d88d..fb15239 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala
@@ -19,17 +19,24 @@ package org.apache.spark.sql.execution.streaming
 
 import java.io.{BufferedReader, InputStreamReader, IOException}
 import java.net.Socket
+import java.sql.Timestamp
+import java.text.SimpleDateFormat
+import java.util.Calendar
 import javax.annotation.concurrent.GuardedBy
 
 import scala.collection.mutable.ArrayBuffer
+import scala.util.{Failure, Success, Try}
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{AnalysisException, DataFrame, SQLContext}
 import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider}
-import org.apache.spark.sql.types.{StringType, StructField, StructType}
+import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
 
 object TextSocketSource {
-  val SCHEMA = StructType(StructField("value", StringType) :: Nil)
+  val SCHEMA_REGULAR = StructType(StructField("value", StringType) :: Nil)
+  val SCHEMA_TIMESTAMP = StructType(StructField("value", StringType) ::
+    StructField("timestamp", TimestampType) :: Nil)
+  val DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
 }
 
 /**
@@ -37,7 +44,7 @@ object TextSocketSource {
  * This source will *not* work in production applications due to multiple reasons, including no
  * support for fault recovery and keeping all of the text read in memory forever.
  */
-class TextSocketSource(host: String, port: Int, sqlContext: SQLContext)
+class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, sqlContext: SQLContext)
   extends Source with Logging
 {
   @GuardedBy("this")
@@ -47,7 +54,7 @@ class TextSocketSource(host: String, port: Int, sqlContext: SQLContext)
   private var readThread: Thread = null
 
   @GuardedBy("this")
-  private var lines = new ArrayBuffer[String]
+  private var lines = new ArrayBuffer[(String, Timestamp)]
 
   initialize()
 
@@ -67,7 +74,10 @@ class TextSocketSource(host: String, port: Int, sqlContext: SQLContext)
               return
             }
             TextSocketSource.this.synchronized {
-              lines += line
+              lines += ((line,
+                Timestamp.valueOf(
+                  TextSocketSource.DATE_FORMAT.format(Calendar.getInstance().getTime()))
+                ))
             }
           }
         } catch {
@@ -79,7 +89,8 @@ class TextSocketSource(host: String, port: Int, sqlContext: SQLContext)
   }
 
   /** Returns the schema of the data from this source */
-  override def schema: StructType = TextSocketSource.SCHEMA
+  override def schema: StructType = if (includeTimestamp) TextSocketSource.SCHEMA_TIMESTAMP
+  else TextSocketSource.SCHEMA_REGULAR
 
   /** Returns the maximum available offset for this source. */
   override def getOffset: Option[Offset] = synchronized {
@@ -92,7 +103,11 @@ class TextSocketSource(host: String, port: Int, sqlContext: SQLContext)
     val endIdx = end.asInstanceOf[LongOffset].offset.toInt + 1
     val data = synchronized { lines.slice(startIdx, endIdx) }
     import sqlContext.implicits._
-    data.toDF("value")
+    if (includeTimestamp) {
+      data.toDF("value", "timestamp")
+    } else {
+      data.map(_._1).toDF("value")
+    }
   }
 
   /** Stop this source. */
@@ -111,6 +126,14 @@ class TextSocketSource(host: String, port: Int, sqlContext: SQLContext)
 }
 
 class TextSocketSourceProvider extends StreamSourceProvider with DataSourceRegister with Logging {
+  private def parseIncludeTimestamp(params: Map[String, String]): Boolean = {
+    Try(params.getOrElse("includeTimestamp", "false").toBoolean) match {
+      case Success(bool) => bool
+      case Failure(_) =>
+        throw new AnalysisException("includeTimestamp must be set to either \"true\" or \"false\"")
+    }
+  }
+
   /** Returns the name and schema of the source that can be used to continually read data. */
   override def sourceSchema(
       sqlContext: SQLContext,
@@ -125,7 +148,13 @@ class TextSocketSourceProvider extends StreamSourceProvider with DataSourceRegis
     if (!parameters.contains("port")) {
       throw new AnalysisException("Set a port to read from with option(\"port\", ...).")
     }
-    ("textSocket", TextSocketSource.SCHEMA)
+    val schema =
+      if (parseIncludeTimestamp(parameters)) {
+        TextSocketSource.SCHEMA_TIMESTAMP
+      } else {
+        TextSocketSource.SCHEMA_REGULAR
+      }
+    ("textSocket", schema)
   }
 
   override def createSource(
@@ -136,7 +165,7 @@ class TextSocketSourceProvider extends StreamSourceProvider with DataSourceRegis
       parameters: Map[String, String]): Source = {
     val host = parameters("host")
     val port = parameters("port").toInt
-    new TextSocketSource(host, port, sqlContext)
+    new TextSocketSource(host, port, parseIncludeTimestamp(parameters), sqlContext)
   }
 
   /** String that represents the format that this data source provider uses. */

http://git-wip-us.apache.org/repos/asf/spark/blob/9e2c763d/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/TextSocketStreamSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/TextSocketStreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/TextSocketStreamSuite.scala
index ca57763..6b0ba7a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/TextSocketStreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/TextSocketStreamSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.streaming
 
 import java.io.{IOException, OutputStreamWriter}
 import java.net.ServerSocket
+import java.sql.Timestamp
 import java.util.concurrent.LinkedBlockingQueue
 
 import org.scalatest.BeforeAndAfterEach
@@ -27,7 +28,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.streaming.StreamTest
 import org.apache.spark.sql.test.SharedSQLContext
-import org.apache.spark.sql.types.{StringType, StructField, StructType}
+import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
 
 class TextSocketStreamSuite extends StreamTest with SharedSQLContext with BeforeAndAfterEach {
   import testImplicits._
@@ -85,6 +86,47 @@ class TextSocketStreamSuite extends StreamTest with SharedSQLContext with Before
     }
   }
 
+  test("timestamped usage") {
+    serverThread = new ServerThread()
+    serverThread.start()
+
+    val provider = new TextSocketSourceProvider
+    val parameters = Map("host" -> "localhost", "port" -> serverThread.port.toString,
+      "includeTimestamp" -> "true")
+    val schema = provider.sourceSchema(sqlContext, None, "", parameters)._2
+    assert(schema === StructType(StructField("value", StringType) ::
+      StructField("timestamp", TimestampType) :: Nil))
+
+    source = provider.createSource(sqlContext, "", None, "", parameters)
+
+    failAfter(streamingTimeout) {
+      serverThread.enqueue("hello")
+      while (source.getOffset.isEmpty) {
+        Thread.sleep(10)
+      }
+      val offset1 = source.getOffset.get
+      val batch1 = source.getBatch(None, offset1)
+      val batch1Seq = batch1.as[(String, Timestamp)].collect().toSeq
+      assert(batch1Seq.map(_._1) === Seq("hello"))
+      val batch1Stamp = batch1Seq(0)._2
+
+      serverThread.enqueue("world")
+      while (source.getOffset.get === offset1) {
+        Thread.sleep(10)
+      }
+      val offset2 = source.getOffset.get
+      val batch2 = source.getBatch(Some(offset1), offset2)
+      val batch2Seq = batch2.as[(String, Timestamp)].collect().toSeq
+      assert(batch2Seq.map(_._1) === Seq("world"))
+      val batch2Stamp = batch2Seq(0)._2
+      assert(!batch2Stamp.before(batch1Stamp))
+
+      // Try stopping the source to make sure this does not block forever.
+      source.stop()
+      source = null
+    }
+  }
+
   test("params not given") {
     val provider = new TextSocketSourceProvider
     intercept[AnalysisException] {
@@ -98,6 +140,14 @@ class TextSocketStreamSuite extends StreamTest with SharedSQLContext with Before
     }
   }
 
+  test("non-boolean includeTimestamp") {
+    val provider = new TextSocketSourceProvider
+    intercept[AnalysisException] {
+      provider.sourceSchema(sqlContext, None, "", Map("host" -> "localhost",
+      "port" -> "1234", "includeTimestamp" -> "fasle"))
+    }
+  }
+
   test("no server up") {
     val provider = new TextSocketSourceProvider
     val parameters = Map("host" -> "localhost", "port" -> "0")


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message