From: summerdaway
To: user@spark.apache.org
Date: Mon, 29 Jun 2015 03:17:35 -0700 (MST)
Message-ID: <1435573055977-23529.post@n3.nabble.com>
Subject: Spark Streaming - Receiver drops data

Hi,

I'm using Spark Streaming to process data. I do a simple flatMap on each record, as follows:

package bb;

import java.io.*;
import java.net.*;
import java.util.ArrayList;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import scala.Tuple2;

public class ReceiverTest {
    public static void main(String[] args) throws Exception {
        SparkConf sparkConf = new SparkConf().setAppName("Receiver Test");
        JavaSparkContext sc = new JavaSparkContext(sparkConf);
        JavaStreamingContext jssc = new JavaStreamingContext(sc, new Duration(2000));

        //JavaReceiverInputDStream<String> inputStream = jssc.receiverStream(new JavaCustomReceiver("localhost", 3081));
        JavaReceiverInputDStream<String> inputStream = jssc.socketTextStream("localhost", 3081);

        JavaPairDStream<String, String> bab = inputStream.flatMapToPair(
                new PairFlatMapFunction<String, String, String>() {
            @Override
            public Iterable<Tuple2<String, String>> call(String t) {
                ArrayList<Tuple2<String, String>> ret = new ArrayList<Tuple2<String, String>>();
                System.out.println(t);
                if (t.length() > 2) return ret;
                for (int i = 0; i < 5; ++i) {
                    String str = t + i;
                    ret.add(new Tuple2<String, String>("", str));
                    // send each new record back to the socket server
                    Socket socket = null;
                    PrintWriter pw = null;
                    try {
                        socket = new Socket("spark000", 3081);
                        pw = new PrintWriter(socket.getOutputStream());
                        pw.println(str);
                        pw.flush();
                    } catch (Exception e) {
                        e.printStackTrace();
                    } finally {
                        try {
                            pw.close();
                            socket.close();
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                }
                return ret;
            }
        });

        bab.print();
        jssc.start();
        jssc.awaitTermination();
        System.exit(0);
    }
}

For each record s with length(s) <= 2, the records {"s0", "s1", "s2", "s3", "s4"} are produced. The new records are sent back to the socket server to be processed further. At the start, I send a "1" to the socket server.

In the worker log (I'm using one machine, so there is only one worker), I expect to see the following result (the records may appear in a different order, I think):

1
10 11 12 13 14
100 101 102 103 104
110 111 112 ... 143 144

However, the actual result is as follows:

1
10 11 12
100 101 102 103 104
110 111 112 113 114
120 121 122 123 124

Namely, "13" and "14" are never processed in the flatMap.

I've also tried a custom receiver (the first sample in https://spark.apache.org/docs/1.4.0/streaming-custom-receivers.html) instead of socketTextStream(). The guide says that store(single-record) is not reliable but store(multiple-records) is reliable, so I changed that part to:

while (!isStopped() && (userInput = reader.readLine()) != null) {
    ArrayList<String> lt = new ArrayList<String>();
    lt.add(userInput);
    store(lt.iterator());
}

But I still see data loss. The socket server did receive all the data, but not all of it is stored into Spark by store(). Can anyone spot the problem? Thanks in advance.

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Receiver-drops-data-tp23529.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
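P.S. The expected output above can be sanity-checked without Spark: the flatMap expansion is deterministic, so a plain-Java model of it (class name ExpansionModel is mine, just for illustration) shows how many records should reach the worker:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class ExpansionModel {
    // Mirrors the flatMap logic above: a record t with length <= 2
    // expands to t+"0" .. t+"4"; longer records produce nothing.
    static List<String> expand(String t) {
        List<String> out = new ArrayList<>();
        if (t.length() > 2) return out;
        for (int i = 0; i < 5; ++i) {
            out.add(t + i);
        }
        return out;
    }

    public static void main(String[] args) {
        Deque<String> queue = new ArrayDeque<>();
        List<String> seen = new ArrayList<>();
        queue.add("1");                      // the initial record sent to the socket server
        while (!queue.isEmpty()) {
            String t = queue.poll();
            seen.add(t);                     // every record reaching the receiver gets printed
            queue.addAll(expand(t));         // children are fed back through the socket
        }
        System.out.println(seen.size());     // 1 + 5 + 25 = 31 records in total
        System.out.println(seen.get(seen.size() - 1));  // last record: 144
    }
}
```

So if nothing were dropped, all 31 records from "1" through "144" should appear in the log.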
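P.P.S. On the store(multiple-records) point: wrapping a single line in a one-element list may not change much, since each store() call still carries one record. One idea is to accumulate several lines and hand them over as a block. This is not Spark's API, just a Spark-free sketch of the batching shape (the LineBatcher name and the sink callback, which stands in for store(iterator), are both hypothetical):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper: collects incoming lines and flushes them in blocks,
// so a reliable receiver could call store(block.iterator()) once per block
// instead of once per line.
public class LineBatcher {
    private final int batchSize;
    private final Consumer<List<String>> sink;   // stands in for store(Iterator<String>)
    private List<String> buffer = new ArrayList<>();

    public LineBatcher(int batchSize, Consumer<List<String>> sink) {
        this.batchSize = batchSize;
        this.sink = sink;
    }

    public void add(String line) {
        buffer.add(line);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    public void flush() {
        if (buffer.isEmpty()) return;
        sink.accept(buffer);            // would block until the records are accepted
        buffer = new ArrayList<>();     // start a fresh block; the old one is handed off
    }

    public static void main(String[] args) {
        List<List<String>> batches = new ArrayList<>();
        LineBatcher b = new LineBatcher(2, batches::add);
        b.add("one");
        b.add("two");
        b.add("three");
        b.flush();
        System.out.println(batches);    // [[one, two], [three]]
    }
}
```

Whether batching like this actually avoids the loss in my setup, I don't know.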