spark-commits mailing list archives

From: t...@apache.org
Subject: git commit: [SPARK-1633][Streaming] Java API unit test and example for custom streaming receiver in Java
Date: Mon, 28 Apr 2014 20:58:16 GMT
Repository: spark
Updated Branches:
  refs/heads/master f73588441 -> 1d84964bf


[SPARK-1633][Streaming] Java API unit test and example for custom streaming receiver in Java

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #558 from tdas/more-fixes and squashes the following commits:

c0c84e6 [Tathagata Das] Removing extra println()
d8a8cf4 [Tathagata Das] More tweaks to make unit test work in Jenkins.
b7caa98 [Tathagata Das] More tweaks.
d337367 [Tathagata Das] More tweaks
22d6f2d [Tathagata Das] Merge remote-tracking branch 'apache/master' into more-fixes
40a961b [Tathagata Das] Modified java test to reduce flakiness.
9410ca6 [Tathagata Das] Merge remote-tracking branch 'apache/master' into more-fixes
86d9147 [Tathagata Das] scala style fix
2f3d7b1 [Tathagata Das] Added Scala custom receiver example.
d677611 [Tathagata Das] Merge remote-tracking branch 'apache/master' into more-fixes
bec3fc2 [Tathagata Das] Added license.
51d6514 [Tathagata Das] Fixed docs on receiver.
81aafa0 [Tathagata Das] Added Java test for Receiver API, and added JavaCustomReceiver example.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1d84964b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1d84964b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1d84964b

Branch: refs/heads/master
Commit: 1d84964bf80f4e69e54d62286c3861c2362342d0
Parents: f735884
Author: Tathagata Das <tathagata.das1565@gmail.com>
Authored: Mon Apr 28 13:58:09 2014 -0700
Committer: Tathagata Das <tathagata.das1565@gmail.com>
Committed: Mon Apr 28 13:58:09 2014 -0700

----------------------------------------------------------------------
 .../streaming/examples/JavaCustomReceiver.java  | 152 +++++++++++++++++++
 .../examples/JavaNetworkWordCount.java          |   5 +-
 .../streaming/examples/CustomReceiver.scala     | 108 +++++++++++++
 .../api/java/JavaStreamingContext.scala         |   2 +-
 .../spark/streaming/receiver/Receiver.scala     |  90 +++++++----
 .../spark/streaming/JavaReceiverAPISuite.java   | 144 ++++++++++++++++++
 .../apache/spark/streaming/JavaTestUtils.scala  |   4 +-
 .../spark/streaming/InputStreamsSuite.scala     |   3 +-
 .../spark/streaming/StreamingContextSuite.scala |   2 +-
 .../apache/spark/streaming/TestSuiteBase.scala  |   1 +
 10 files changed, 476 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
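For orientation, the new Java-facing API boils down to passing a custom Receiver to JavaStreamingContext.receiverStream(). A minimal sketch of the intended usage, assuming the JavaCustomReceiver class added below is on the classpath (master URL, host, and port are illustrative):

import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.examples.JavaCustomReceiver;

public class ReceiverSketch {
  public static void main(String[] args) {
    // Local context with a 1 second batch interval (values illustrative)
    JavaStreamingContext ssc =
        new JavaStreamingContext("local[2]", "ReceiverSketch", new Duration(1000));

    // Plug the custom receiver added by this commit into the context
    JavaReceiverInputDStream<String> lines =
        ssc.receiverStream(new JavaCustomReceiver("localhost", 9999));

    lines.print();
    ssc.start();
    ssc.awaitTermination();
  }
}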


http://git-wip-us.apache.org/repos/asf/spark/blob/1d84964b/examples/src/main/java/org/apache/spark/streaming/examples/JavaCustomReceiver.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaCustomReceiver.java
b/examples/src/main/java/org/apache/spark/streaming/examples/JavaCustomReceiver.java
new file mode 100644
index 0000000..a94fa62
--- /dev/null
+++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaCustomReceiver.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.examples;
+
+import com.google.common.collect.Lists;
+
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.Duration;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaPairDStream;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.apache.spark.streaming.receiver.Receiver;
+import scala.Tuple2;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.net.ConnectException;
+import java.net.Socket;
+import java.util.regex.Pattern;
+
+/**
+ * Custom Receiver that receives data over a socket. Received bytes are interpreted as
+ * text and \n delimited lines are considered as records. They are then counted and printed.
+ *
+ * Usage: JavaCustomReceiver <master> <hostname> <port>
+ *   <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1.
+ *   <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
+ *
+ * To run this on your local machine, you need to first run a Netcat server
+ *    `$ nc -lk 9999`
+ * and then run the example
+ *    `$ ./run org.apache.spark.streaming.examples.JavaCustomReceiver local[2] localhost 9999`
+ */
+
+public class JavaCustomReceiver extends Receiver<String> {
+  private static final Pattern SPACE = Pattern.compile(" ");
+
+  public static void main(String[] args) {
+    if (args.length < 3) {
+      System.err.println("Usage: JavaCustomReceiver <master> <hostname> <port>\n" +
+          "In local mode, <master> should be 'local[n]' with n > 1");
+      System.exit(1);
+    }
+
+    StreamingExamples.setStreamingLogLevels();
+
+    // Create the context with a 1 second batch size
+    JavaStreamingContext ssc = new JavaStreamingContext(args[0], "JavaCustomReceiver",
+            new Duration(1000), System.getenv("SPARK_HOME"),
+            JavaStreamingContext.jarOfClass(JavaCustomReceiver.class));
+
+    // Create an input stream with the custom receiver on target ip:port and count the
+    // words in the input stream of \n delimited text (e.g. generated by 'nc')
+    JavaDStream<String> lines = ssc.receiverStream(
+      new JavaCustomReceiver(args[1], Integer.parseInt(args[2])));
+    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
+      @Override
+      public Iterable<String> call(String x) {
+        return Lists.newArrayList(SPACE.split(x));
+      }
+    });
+    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
+      new PairFunction<String, String, Integer>() {
+        @Override public Tuple2<String, Integer> call(String s) {
+          return new Tuple2<String, Integer>(s, 1);
+        }
+      }).reduceByKey(new Function2<Integer, Integer, Integer>() {
+        @Override
+        public Integer call(Integer i1, Integer i2) {
+          return i1 + i2;
+        }
+      });
+
+    wordCounts.print();
+    ssc.start();
+    ssc.awaitTermination();
+  }
+
+  // ============= Receiver code that receives data over a socket ==============
+
+  String host = null;
+  int port = -1;
+
+  public JavaCustomReceiver(String host_ , int port_) {
+    super(StorageLevel.MEMORY_AND_DISK_2());
+    host = host_;
+    port = port_;
+  }
+
+  public void onStart() {
+    // Start the thread that receives data over a connection
+    new Thread()  {
+      @Override public void run() {
+        receive();
+      }
+    }.start();
+  }
+
+  public void onStop() {
+    // There is nothing much to do, as the thread calling receive()
+    // is designed to stop by itself once isStopped() returns true
+  }
+
+  /** Create a socket connection and receive data until receiver is stopped */
+  private void receive() {
+    Socket socket = null;
+    String userInput = null;
+
+    try {
+      // connect to the server
+      socket = new Socket(host, port);
+
+      BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
+
+      // Until stopped or the connection is broken, continue reading
+      while (!isStopped() && (userInput = reader.readLine()) != null) {
+        System.out.println("Received data '" + userInput + "'");
+        store(userInput);
+      }
+      reader.close();
+      socket.close();
+
+      // Restart in an attempt to connect again when the server is active again
+      restart("Trying to connect again");
+    } catch(ConnectException ce) {
+      // restart if could not connect to server
+      restart("Could not connect", ce);
+    } catch(Throwable t) {
+      restart("Error receiving data", t);
+    }
+  }
+}
+
+

http://git-wip-us.apache.org/repos/asf/spark/blob/1d84964b/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java
b/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java
index 7f68d45..0cc9d0a 100644
--- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java
+++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java
@@ -31,7 +31,7 @@ import java.util.regex.Pattern;
 
 /**
  * Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
- * Usage: NetworkWordCount <master> <hostname> <port>
+ * Usage: JavaNetworkWordCount <master> <hostname> <port>
 *   <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1.
 *   <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
  *
@@ -43,9 +43,6 @@ import java.util.regex.Pattern;
 public final class JavaNetworkWordCount {
   private static final Pattern SPACE = Pattern.compile(" ");
 
-  private JavaNetworkWordCount() {
-  }
-
   public static void main(String[] args) {
     if (args.length < 3) {
      System.err.println("Usage: JavaNetworkWordCount <master> <hostname> <port>\n" +

http://git-wip-us.apache.org/repos/asf/spark/blob/1d84964b/examples/src/main/scala/org/apache/spark/streaming/examples/CustomReceiver.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/CustomReceiver.scala
b/examples/src/main/scala/org/apache/spark/streaming/examples/CustomReceiver.scala
new file mode 100644
index 0000000..eebffd8
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/CustomReceiver.scala
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.examples
+
+import java.io.{InputStreamReader, BufferedReader, InputStream}
+import java.net.Socket
+
+import org.apache.spark.Logging
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{Seconds, StreamingContext}
+import org.apache.spark.streaming.StreamingContext._
+import org.apache.spark.streaming.receiver.Receiver
+
+/**
+ * Custom Receiver that receives data over a socket. Received bytes are interpreted as
+ * text and \n delimited lines are considered as records. They are then counted and printed.
+ *
+ * Usage: CustomReceiver <master> <hostname> <port>
+ *   <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1.
+ *   <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
+ *
+ * To run this on your local machine, you need to first run a Netcat server
+ *    `$ nc -lk 9999`
+ * and then run the example
+ *    `$ ./run org.apache.spark.streaming.examples.CustomReceiver local[2] localhost 9999`
+ */
+object CustomReceiver {
+  def main(args: Array[String]) {
+    if (args.length < 3) {
+      System.err.println("Usage: CustomReceiver <master> <hostname> <port>\n" +
+        "In local mode, <master> should be 'local[n]' with n > 1")
+      System.exit(1)
+    }
+
+    StreamingExamples.setStreamingLogLevels()
+
+    // Create the context with a 1 second batch size
+    val ssc = new StreamingContext(args(0), "CustomReceiver", Seconds(1),
+      System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass).toSeq)
+
+    // Create an input stream with the custom receiver on target ip:port and count the
+    // words in the input stream of \n delimited text (e.g. generated by 'nc')
+    val lines = ssc.receiverStream(new CustomReceiver(args(1), args(2).toInt))
+    val words = lines.flatMap(_.split(" "))
+    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
+    wordCounts.print()
+    ssc.start()
+    ssc.awaitTermination()
+  }
+}
+
+
+class CustomReceiver(host: String, port: Int)
+  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging {
+
+  def onStart() {
+    // Start the thread that receives data over a connection
+    new Thread("Socket Receiver") {
+      override def run() { receive() }
+    }.start()
+  }
+
+  def onStop() {
+   // There is nothing much to do, as the thread calling receive()
+   // is designed to stop by itself once isStopped() returns true
+  }
+
+  /** Create a socket connection and receive data until receiver is stopped */
+  private def receive() {
+   var socket: Socket = null
+   var userInput: String = null
+   try {
+     logInfo("Connecting to " + host + ":" + port)
+     socket = new Socket(host, port)
+     logInfo("Connected to " + host + ":" + port)
+     val reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), "UTF-8"))
+     userInput = reader.readLine()
+     while(!isStopped && userInput != null) {
+       store(userInput)
+       userInput = reader.readLine()
+     }
+     reader.close()
+     socket.close()
+     logInfo("Stopped receiving")
+     restart("Trying to connect again")
+   } catch {
+     case e: java.net.ConnectException =>
+       restart("Error connecting to " + host + ":" + port, e)
+     case t: Throwable =>
+       restart("Error receiving data", t)
+   }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/1d84964b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index fbb2e9f..75a3e93 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -390,7 +390,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
      * Find more details at: http://spark.apache.org/docs/latest/streaming-custom-receivers.html
      * @param receiver Custom implementation of Receiver
      */
-  def receiverStream[T](receiver: Receiver[T]): ReceiverInputDStream[T] = {
+  def receiverStream[T](receiver: Receiver[T]): JavaReceiverInputDStream[T] = {
     implicit val cm: ClassTag[T] =
       implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
     ssc.receiverStream(receiver)
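For context on the one-line change above: returning JavaReceiverInputDStream (rather than the Scala-facing ReceiverInputDStream) lets Java call sites bind the result directly, without a cast. A hedged sketch of such a call site, assuming ssc is a JavaStreamingContext and using the JavaCustomReceiver from this commit:

// After this change, no cast from ReceiverInputDStream is needed:
JavaReceiverInputDStream<String> input =
    ssc.receiverStream(new JavaCustomReceiver("localhost", 9999));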

http://git-wip-us.apache.org/repos/asf/spark/blob/1d84964b/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala
index 524c1b8..b310c22 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala
@@ -30,28 +30,55 @@ import org.apache.spark.annotation.DeveloperApi
 * Abstract class of a receiver that can be run on worker nodes to receive external data. A
  * custom receiver can be defined by defining the functions onStart() and onStop(). onStart()
  * should define the setup steps necessary to start receiving data,
- * and onStop() should define the cleanup steps necessary to stop receiving data. A custom
- * receiver would look something like this.
+ * and onStop() should define the cleanup steps necessary to stop receiving data.
  *
- * @example {{{
+ * A custom receiver in Scala would look like this.
+ *
+ * {{{
 *  class MyReceiver(storageLevel: StorageLevel) extends NetworkReceiver[String](storageLevel) {
- *    def onStart() {
- *      // Setup stuff (start threads, open sockets, etc.) to start receiving data.
- *      // Must start new thread to receive data, as onStart() must be non-blocking.
+ *      def onStart() {
+ *          // Setup stuff (start threads, open sockets, etc.) to start receiving data.
+ *          // Must start new thread to receive data, as onStart() must be non-blocking.
  *
- *      // Call store(...) in those threads to store received data into Spark's memory.
+ *          // Call store(...) in those threads to store received data into Spark's memory.
  *
- *      // Call stop(...), restart() or reportError(...) on any thread based on how
- *      // different errors should be handled.
+ *          // Call stop(...), restart(...) or reportError(...) on any thread, based on how
+ *          // different errors need to be handled.
  *
- *      // See corresponding method documentation for more details
- *    }
+ *          // See corresponding method documentation for more details
+ *      }
  *
- *    def onStop() {
- *      // Cleanup stuff (stop threads, close sockets, etc.) to stop receiving data.
- *    }
+ *      def onStop() {
+ *          // Cleanup stuff (stop threads, close sockets, etc.) to stop receiving data.
+ *      }
  *  }
  * }}}
+ *
+ * A custom receiver in Java would look like this.
+ *
+ * {{{
+ * class MyReceiver extends Receiver<String> {
+ *     public MyReceiver(StorageLevel storageLevel) {
+ *         super(storageLevel);
+ *     }
+ *
+ *     public void onStart() {
+ *          // Setup stuff (start threads, open sockets, etc.) to start receiving data.
+ *          // Must start new thread to receive data, as onStart() must be non-blocking.
+ *
+ *          // Call store(...) in those threads to store received data into Spark's memory.
+ *
+ *          // Call stop(...), restart(...) or reportError(...) on any thread, based on how
+ *          // different errors need to be handled.
+ *
+ *          // See corresponding method documentation for more details
+ *     }
+ *
+ *     public void onStop() {
+ *          // Cleanup stuff (stop threads, close sockets, etc.) to stop receiving data.
+ *     }
+ * }
+ * }}}
  */
 @DeveloperApi
 abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable {
@@ -156,30 +183,34 @@ abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable
   }
 
   /**
-   * Restart the receiver. This will call `onStop()` immediately and return.
-   * Asynchronously, after a delay, `onStart()` will be called.
+   * Restart the receiver. This method schedules the restart and returns
+   * immediately. The stopping and subsequent starting of the receiver
+   * (by calling `onStop()` and `onStart()`) is performed asynchronously
+   * in a background thread. The delay between the stopping and the starting
+   * is defined by the Spark configuration `spark.streaming.receiverRestartDelay`.
    * The `message` will be reported to the driver.
-   * The delay is defined by the Spark configuration
-   * `spark.streaming.receiverRestartDelay`.
    */
   def restart(message: String) {
     executor.restartReceiver(message)
   }
 
   /**
-   * Restart the receiver. This will call `onStop()` immediately and return.
-   * Asynchronously, after a delay, `onStart()` will be called.
+   * Restart the receiver. This method schedules the restart and returns
+   * immediately. The stopping and subsequent starting of the receiver
+   * (by calling `onStop()` and `onStart()`) is performed asynchronously
+   * in a background thread. The delay between the stopping and the starting
+   * is defined by the Spark configuration `spark.streaming.receiverRestartDelay`.
    * The `message` and `exception` will be reported to the driver.
-   * The delay is defined by the Spark configuration
-   * `spark.streaming.receiverRestartDelay`.
    */
   def restart(message: String, error: Throwable) {
     executor.restartReceiver(message, Some(error))
   }
 
   /**
-   * Restart the receiver. This will call `onStop()` immediately and return.
-   * Asynchronously, after the given delay, `onStart()` will be called.
+   * Restart the receiver. This method schedules the restart and returns
+   * immediately. The stopping and subsequent starting of the receiver
+   * (by calling `onStop()` and `onStart()`) is performed asynchronously
+   * in a background thread, after the given delay.
    */
   def restart(message: String, error: Throwable, millisecond: Int) {
     executor.restartReceiver(message, Some(error), millisecond)
@@ -195,16 +226,23 @@ abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable
     executor.stop(message, Some(error))
   }
 
+  /** Check if the receiver has started or not. */
   def isStarted(): Boolean = {
     executor.isReceiverStarted()
   }
 
-  /** Check if receiver has been marked for stopping. */
+  /**
+   * Check if receiver has been marked for stopping. Use this to identify when
+   * the receiving of data should be stopped.
+   */
   def isStopped(): Boolean = {
     executor.isReceiverStopped()
   }
 
-  /** Get unique identifier of this receiver. */
+  /**
+   * Get the unique identifier of the receiver input stream that this
+   * receiver is associated with.
+   */
   def streamId = id
 
   /*
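Usage note on the reworded restart() contract above: all three overloads schedule the stop/start on a background thread and return immediately; only the third takes an explicit delay, while the other two read spark.streaming.receiverRestartDelay. A minimal sketch from inside a Receiver<String> subclass (messages, the exception variable, and the delay value are illustrative):

// Delay taken from spark.streaming.receiverRestartDelay:
restart("Could not connect");
restart("Could not connect", exception);

// Explicit delay of 2000 ms between onStop() and onStart():
restart("Error receiving data", exception, 2000);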

http://git-wip-us.apache.org/repos/asf/spark/blob/1d84964b/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java
b/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java
new file mode 100644
index 0000000..1b0787f
--- /dev/null
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import static org.junit.Assert.*;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.receiver.Receiver;
+import org.apache.spark.api.java.function.Function;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.io.Serializable;
+import java.net.ConnectException;
+import java.net.Socket;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class JavaReceiverAPISuite implements Serializable {
+
+  @Before
+  public void setUp() {
+    System.clearProperty("spark.streaming.clock");
+  }
+
+  @After
+  public void tearDown() {
+    System.clearProperty("spark.streaming.clock");
+  }
+
+  @Test
+  public void testReceiver() throws InterruptedException {
+    TestServer server = new TestServer(0);
+    server.start();
+
+    final AtomicLong dataCounter = new AtomicLong(0);
+
+    try {
+      JavaStreamingContext ssc = new JavaStreamingContext("local[2]", "test", new Duration(200));
+      JavaReceiverInputDStream<String> input =
+        ssc.receiverStream(new JavaSocketReceiver("localhost", server.port()));
+      JavaDStream<String> mapped = input.map(new Function<String, String>() {
+        @Override
+        public String call(String v1) throws Exception {
+          return v1 + ".";
+        }
+      });
+      mapped.foreachRDD(new Function<JavaRDD<String>, Void>() {
+        @Override
+        public Void call(JavaRDD<String> rdd) throws Exception {
+          long count = rdd.count();
+          dataCounter.addAndGet(count);
+          return null;
+        }
+      });
+
+      ssc.start();
+      long startTime = System.currentTimeMillis();
+      long timeout = 10000;
+
+      Thread.sleep(200);
+      for (int i = 0; i < 6; i++) {
+        server.send("" + i + "\n"); // \n to make sure these are separate lines
+        Thread.sleep(100);
+      }
+      while (dataCounter.get() == 0 && System.currentTimeMillis() - startTime < timeout) {
+        Thread.sleep(100);
+      }
+      ssc.stop();
+      assertTrue(dataCounter.get() > 0);
+    } finally {
+      server.stop();
+    }
+  }
+}
+
+class JavaSocketReceiver extends Receiver<String> {
+
+  String host = null;
+  int port = -1;
+
+  public JavaSocketReceiver(String host_ , int port_) {
+    super(StorageLevel.MEMORY_AND_DISK());
+    host = host_;
+    port = port_;
+  }
+
+  @Override
+  public void onStart() {
+    new Thread()  {
+      @Override public void run() {
+        receive();
+      }
+    }.start();
+  }
+
+  @Override
+  public void onStop() {
+  }
+
+  private void receive() {
+    Socket socket = null;
+    try {
+      socket = new Socket(host, port);
+      BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
+      String userInput;
+      while ((userInput = in.readLine()) != null) {
+        store(userInput);
+      }
+      in.close();
+      socket.close();
+    } catch(ConnectException ce) {
+      ce.printStackTrace();
+      restart("Could not connect", ce);
+    } catch(Throwable t) {
+      t.printStackTrace();
+      restart("Error receiving data", t);
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/spark/blob/1d84964b/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala b/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala
index 33f6df8..c0ea049 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala
@@ -26,6 +26,7 @@ import org.apache.spark.streaming._
 import java.util.ArrayList
 import collection.JavaConversions._
 import org.apache.spark.api.java.JavaRDDLike
+import org.apache.spark.streaming.dstream.DStream
 
 /** Exposes streaming test functionality in a Java-friendly way. */
 trait JavaTestBase extends TestSuiteBase {
@@ -51,8 +52,7 @@ trait JavaTestBase extends TestSuiteBase {
    * [[org.apache.spark.streaming.TestOutputStream]].
    **/
  def attachTestOutputStream[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T, R]](
-      dstream: JavaDStreamLike[T, This, R]) =
-  {
+      dstream: JavaDStreamLike[T, This, R]) = {
     implicit val cm: ClassTag[T] =
       implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
     val ostream = new TestOutputStreamWithPartitions(dstream.dstream)

http://git-wip-us.apache.org/repos/asf/spark/blob/1d84964b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index b55b783..3fa2540 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -49,7 +49,8 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
 
     // Set up the streaming context and input streams
     val ssc = new StreamingContext(conf, batchDuration)
-    val networkStream = ssc.socketTextStream("localhost", testServer.port, StorageLevel.MEMORY_AND_DISK)
+    val networkStream = ssc.socketTextStream(
+      "localhost", testServer.port, StorageLevel.MEMORY_AND_DISK)
     val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
     val outputStream = new TestOutputStream(networkStream, outputBuffer)
     def output = outputBuffer.flatMap(x => x)

http://git-wip-us.apache.org/repos/asf/spark/blob/1d84964b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index 3e2b25a..ee0bc8b 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -165,7 +165,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
       ssc = new StreamingContext(sc, Milliseconds(100))
       var runningCount = 0
       TestReceiver.counter.set(1)
-      val input = ssc.networkStream(new TestReceiver)
+      val input = ssc.receiverStream(new TestReceiver)
       input.count.foreachRDD(rdd => {
         val count = rdd.first()
         runningCount += count.toInt

http://git-wip-us.apache.org/repos/asf/spark/blob/1d84964b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
index 4f63fd3..8036f77 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
@@ -155,6 +155,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
   def afterFunction() {
     // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
     System.clearProperty("spark.driver.port")
+    System.clearProperty("spark.streaming.clock")
   }
 
   before(beforeFunction)

