spark-user mailing list archives

From: summerdaway <summerda...@gmail.com>
Subject: Spark Streaming - Receiver drops data
Date: Mon, 29 Jun 2015 10:17:35 GMT
Hi, I'm using Spark Streaming to process data. I do a simple flatMap on each
record, as follows:

package bb;

import java.io.PrintWriter;
import java.net.Socket;
import java.util.ArrayList;


import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import scala.Tuple2;

import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;


public class ReceiverTest {

    public static void main(String[] args) throws Exception {

        SparkConf sparkConf = new SparkConf().setAppName("Receiver Test");
        JavaSparkContext sc = new JavaSparkContext(sparkConf);
        // 2-second batch interval
        JavaStreamingContext jssc = new JavaStreamingContext(sc, new Duration(2000));

        // Alternative I also tried: a custom receiver (see below)
        //JavaReceiverInputDStream<String> inputStream =
        //        jssc.receiverStream(new JavaCustomReceiver("localhost", 3081));
        JavaReceiverInputDStream<String> inputStream =
                jssc.socketTextStream("localhost", 3081);


        JavaPairDStream<String, String> bab = inputStream.flatMapToPair(
                new PairFlatMapFunction<String, String, String>() {
            @Override
            public Iterable<Tuple2<String, String>> call(String t) {

                ArrayList<Tuple2<String, String>> ret =
                        new ArrayList<Tuple2<String, String>>();
                System.out.println(t);
                // Stop expanding once a record is longer than 2 characters
                if (t.length() > 2) return ret;
                for (int i = 0; i < 5; ++i) {
                    String str = t + i;
                    ret.add(new Tuple2<String, String>("", str));

                    // Send the derived record back to the socket server
                    // so it gets processed again
                    Socket socket = null;
                    PrintWriter pw = null;
                    try {
                        socket = new Socket("spark000", 3081);
                        pw = new PrintWriter(socket.getOutputStream());
                        pw.println(str);
                        pw.flush();
                    } catch (Exception e) {
                        e.printStackTrace();
                    } finally {
                        try {
                            if (pw != null) pw.close();
                            if (socket != null) socket.close();
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                }
                return ret;
            }
        });

        bab.print();

        jssc.start();
        jssc.awaitTermination();
        System.exit(0);
    }
}

For each record s with length(s) <= 2, the flatMap produces {"s0", "s1",
"s2", "s3", "s4"}. The new records are sent back to the socket server to be
further processed.
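
The socket server itself isn't shown here; it is essentially a relay that
forwards every line it receives to the connected Spark receiver. A minimal
sketch of the behavior my job assumes (not my actual server), reusing the
spark000 hostname and port 3081 from the code above:

import java.io.*;
import java.net.*;

// Hypothetical relay sketch: the first client to connect is treated as
// the Spark receiver; every line arriving on later connections is
// forwarded to it.
public class RelayServer {
    public static void main(String[] args) throws Exception {
        ServerSocket server = new ServerSocket(3081);
        Socket receiver = server.accept();   // Spark's socketTextStream connects first
        final PrintWriter out = new PrintWriter(receiver.getOutputStream(), true);
        while (true) {
            final Socket producer = server.accept();   // a flatMap task connecting back
            new Thread(new Runnable() {
                public void run() {
                    try {
                        BufferedReader in = new BufferedReader(
                                new InputStreamReader(producer.getInputStream()));
                        String line;
                        while ((line = in.readLine()) != null) {
                            synchronized (out) { out.println(line); }   // forward to Spark
                        }
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }
    }
}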

At the start, I send a "1" to the socket server. Since "1" expands to the
five length-2 strings "10".."14", and each of those expands to five length-3
strings that expand no further, in the worker log (I'm using one machine, so
there is only one worker) I expect to see the following result (possibly in
a different order): 1 10 11 12 13 14
100 101 102 103 104 110 111 112 ... 143 144

However, the actual result is:
1 10 11 12 100 101 102 103 104 110 111 112 113 114 120 121 122 123 124

Namely "13" and "14" are not processed in flatMap. I've also tried a custom
receiver(the first sample in
https://spark.apache.org/docs/1.4.0/streaming-custom-receivers.html) instead
of socketTextStream(). The guide says that store(single-record) is not
reliable but store(multiple-records) is reliable, thus I changed that part
to

String userInput;
while (!isStopped() && (userInput = reader.readLine()) != null) {
    // Use the multi-record store(), which the guide calls reliable,
    // even though each call stores just one line here
    ArrayList<String> lt = new ArrayList<String>();
    lt.add(userInput);
    store(lt.iterator());
}
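
For what it's worth, a variant that batches several lines per store() call,
so that each call really is a multi-record store, would look roughly like
this sketch; the flush threshold of 100 is an arbitrary choice:

String userInput;
ArrayList<String> batch = new ArrayList<String>();
while (!isStopped() && (userInput = reader.readLine()) != null) {
    batch.add(userInput);
    if (batch.size() >= 100) {       // arbitrary flush threshold
        store(batch.iterator());     // blocks until Spark has stored the batch
        batch = new ArrayList<String>();
    }
}
if (!batch.isEmpty()) store(batch.iterator());   // flush the remainder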

But I still see data loss. The socket server did receive all the data, yet
not all of it was stored into Spark by store().

Can anyone spot the problem? Thanks in advance.



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Receiver-drops-data-tp23529.html
