spark-user mailing list archives

From "Qiao, Richard" <Richard.Q...@capitalone.com>
Subject Re: Do I need to do .collect inside forEachRDD
Date Thu, 07 Dec 2017 10:46:00 GMT
Thanks for sharing the code.
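The first problem in your first version is that the map is allocated in the driver, but the code tries to
put data into it on the executors and then read it back in the driver to send to Kafka.
You are using this map like an accumulator, but it doesn't work that way: the closure passed to
foreach() is serialized and shipped to the executors, so each task fills its own copy of the map
and the driver's map stays empty, which is why you always see "{}".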
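To make the "each task gets its own copy" point concrete, here is a plain-Java simulation with no Spark involved. It assumes that shipping a closure to a task amounts to Java-serializing the captured objects and deserializing them on the executor. The class and the shipToExecutor helper are hypothetical names used only for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;

public class ClosureCopyDemo {
    // Hypothetical helper: simulates sending a captured object to a task by
    // serializing it and deserializing an independent copy.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T shipToExecutor(T obj) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        HashMap<String, Long> driverMap = new HashMap<>();           // allocated "in the driver"
        HashMap<String, Long> taskCopy = shipToExecutor(driverMap);  // what a task actually sees
        taskCopy.put("AAPL", 42L);                                   // the task fills its own copy
        System.out.println(taskCopy);   // {AAPL=42}
        System.out.println(driverMap);  // {}
    }
}
```

The driver-side map never sees the task's writes. Spark's real mechanism for sending values back to the driver is an accumulator or an action such as collect().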

The second problem is that both versions try to gather RDD-level data into a single JSON
string and then send it to Kafka, which can produce a very long JSON string if your TPS is very high.
If possible, send smaller JSON strings at the task level, for example:
    .foreachRDD(stringLongJavaPairRDD -> {
      stringLongJavaPairRDD.foreachPartition(partition -> {
          Map<String, Long> map = new HashMap<>(); // defined inside the task
          partition.forEachRemaining(stringLongTuple2 ->
              map.put(stringLongTuple2._1(), stringLongTuple2._2()));
          // send a smaller JSON message per task
          producer.send(new ProducerRecord<>("topicA", gson.toJson(map)));
      });
    });
When you do this, make sure the Kafka producer (look up the "Kafka sink" pattern for it) and Gson
are set up correctly on the executors.
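Below is a minimal sketch of the usual idea behind a "Kafka sink". Instead of shipping a driver-side producer inside the closure, each executor JVM lazily creates one instance the first time a task asks for it and then reuses it. This is an illustration under assumptions. LazySink, SINK and CREATED are hypothetical names, and a StringBuilder stands in for the client so the example runs without Kafka. In real code the factory would be something like `() -> new KafkaProducer<>(props)`, built from your own configuration.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical helper: creates an expensive, possibly non-serializable client lazily,
// at most once per JVM (i.e. once per executor when held in a static field), and reuses it.
public final class LazySink<T> {
    private final Supplier<T> factory;
    private volatile T instance;

    public LazySink(Supplier<T> factory) {
        this.factory = factory;
    }

    // Double-checked locking so concurrent tasks on one executor share a single instance.
    public T get() {
        T local = instance;
        if (local == null) {
            synchronized (this) {
                local = instance;
                if (local == null) {
                    local = factory.get();
                    instance = local;
                }
            }
        }
        return local;
    }

    // Demo only: counts how many times the factory runs.
    static final AtomicInteger CREATED = new AtomicInteger();
    static final LazySink<StringBuilder> SINK = new LazySink<>(() -> {
        CREATED.incrementAndGet();
        return new StringBuilder(); // stand-in for a producer
    });

    public static void main(String[] args) {
        SINK.get().append("msg1;"); // first task on this JVM creates the client
        SINK.get().append("msg2;"); // later tasks reuse it
        System.out.println(SINK.get() + " created=" + CREATED.get()); // msg1;msg2; created=1
    }
}
```

Because a static field is initialized once per class loader, a task running on an executor would reach the executor's own instance rather than a serialized copy from the driver.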

If you still get an OOM after this, let's discuss further.

Best Regards
Richard


From: kant kodali <kanth909@gmail.com>
Date: Thursday, December 7, 2017 at 2:30 AM
To: Gerard Maas <gerard.maas@gmail.com>
Cc: "Qiao, Richard" <Richard.Qiao@capitalone.com>, "user @spark" <user@spark.apache.org>
Subject: Re: Do I need to do .collect inside forEachRDD

@Richard, I have pasted the two versions of the code below, and I still can't figure out why
it doesn't work without .collect(). Any help would be great.


The code below doesn't work, and sometimes I also run into an OutOfMemoryError.

jsonMessagesDStream
    .window(new Duration(60000), new Duration(1000))
    .mapToPair(val -> {
      JsonParser parser = new JsonParser();
      JsonObject jsonObj = parser.parse(val).getAsJsonObject();
      if (jsonObj.has("key4")) {
        return new Tuple2<>("", 0L);
      }
      String symbol = jsonObj.get("key1").getAsString();
      long quantity = jsonObj.get("key2").getAsLong();
      return new Tuple2<>(symbol, quantity);
    })
    .reduceByKey((v1, v2) -> v1 + v2)
    .foreachRDD(stringLongJavaPairRDD -> {
        Map<String, Long> map = new HashMap<>();
        stringLongJavaPairRDD.foreach(stringLongTuple2 -> {

            System.out.println(stringLongTuple2._1()); // Works: I can see values getting printed out

            System.out.println(stringLongTuple2._2()); // Works: I can see values getting printed out

            map.put(stringLongTuple2._1(), stringLongTuple2._2());

        });

        // Prints an empty JSON document "{}" every time. But why, especially
        // when the map is getting filled with values, as confirmed by the print statements above?
        System.out.println(gson.toJson(map));

        producer.send(new ProducerRecord<>("topicA", gson.toJson(map)));
    });

    jssc.start();

    jssc.awaitTermination();



                          VS

The code below works, but it is slow because of .collectAsMap().



jsonMessagesDStream
    .window(new Duration(60000), new Duration(1000))
    .mapToPair(val -> {
      JsonParser parser = new JsonParser();
      JsonObject jsonObj = parser.parse(val).getAsJsonObject();
      if (jsonObj.has("key4")) {
        return new Tuple2<>("", 0L);
      }
      String symbol = jsonObj.get("key1").getAsString();
      long quantity = jsonObj.get("key2").getAsLong();
      return new Tuple2<>(symbol, quantity);
    })
    .reduceByKey((v1, v2) -> v1 + v2)
    .foreachRDD(stringLongJavaPairRDD -> {

        LinkedHashMap<String, Long> map = new LinkedHashMap<>(stringLongJavaPairRDD.collectAsMap());

        producer.send(new ProducerRecord<>("topicA", gson.toJson(map)));

    });

    jssc.start();

    jssc.awaitTermination();





On Wed, Dec 6, 2017 at 1:43 AM, Gerard Maas <gerard.maas@gmail.com> wrote:
Hi Kant,

> but would your answer on .collect() change depending on running the spark app in client vs cluster mode?

No, it should make no difference.

-kr, Gerard.

On Tue, Dec 5, 2017 at 11:34 PM, kant kodali <kanth909@gmail.com> wrote:
@Richard, I don't see any errors in the executor logs, but let me run it again to make sure.

@Gerard, thanks much! But would your answer on .collect() change depending on whether the
Spark app runs in client or cluster mode?

Thanks!

On Tue, Dec 5, 2017 at 1:54 PM, Gerard Maas <gerard.maas@gmail.com> wrote:
The general answer to your initial question is "it depends". If the operation in the
rdd.foreach() closure can be parallelized, then you don't need to collect first. If it needs
some local context (e.g. a socket connection), then you need to do rdd.collect() first to bring
the data to the driver, which has a performance penalty and is also limited by the memory of the
driver process.
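The "local context" case can be illustrated without Spark. An object like a socket connection cannot travel inside a closure, because closures are shipped to executors by serialization, and java.net.Socket is not Serializable. The plain-Java sketch below checks whether an object could be shipped. CapturingClosure and canShip are hypothetical names. In Spark itself, capturing such an object would typically surface as a "Task not serializable" error.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.net.Socket;

public class LocalContextDemo {
    // Hypothetical stand-in for a closure that captures driver-local state.
    static class CapturingClosure implements Serializable {
        final Socket connection = new Socket(); // unconnected; java.net.Socket is not Serializable
    }

    // Returns true if the object can be serialized, i.e. could be shipped to an executor.
    static boolean canShip(Object closure) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(closure);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(canShip(new CapturingClosure()));     // false
        System.out.println(canShip(new java.util.HashMap<>())); // true
    }
}
```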

Given the further clarification:
>Reads from Kafka and outputs to Kafka. so I check the output from Kafka.

If it's writing to Kafka, that operation can be done in a distributed form.

You could use this lib: https://github.com/BenFradet/spark-kafka-writer

Or, if you can upgrade to Spark 2.2, you can pave the way for migrating to Structured
Streaming by already adopting the 'structured' APIs within Spark Streaming:

case class KV(key: String, value: String)

dstream.map(...).reduce(...).foreachRDD { rdd =>
    import spark.implicits._
    // needs to be in a (key, value) shape
    val kv = rdd.map { e => KV(extractKey(e), extractValue(e)) }
    val dataFrame = kv.toDF()
    dataFrame.write
             .format("kafka")
             .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
             .option("topic", "topic1")
             .save()
}

-kr, Gerard.



On Tue, Dec 5, 2017 at 10:38 PM, kant kodali <kanth909@gmail.com> wrote:
Reads from Kafka and outputs to Kafka. so I check the output from Kafka.

On Tue, Dec 5, 2017 at 1:26 PM, Qiao, Richard <Richard.Qiao@capitalone.com> wrote:
Where do you check the output in both cases?



> On Dec 5, 2017, at 15:36, kant kodali <kanth909@gmail.com> wrote:
>
> Hi All,
>
> I have a simple stateless transformation using DStreams (I'm stuck with the old API for one
> of our applications). The pseudocode is roughly like this:
>
> dstream.map().reduce().foreachRDD(rdd -> {
>      rdd.collect().forEach(...); // Is this necessary? It executes fine but is a bit slow
> })
>
> I understand that collect() brings the results back to the driver, but is that necessary? Can
> I just do something like the code below? I believe I tried both, and somehow the code below
> didn't output any results (it could be an issue with my environment, I am not entirely sure),
> but I would just like some clarification on .collect(), since it seems to slow things down for me.
>
> dstream.map().reduce().foreachRDD(rdd -> {
>      rdd.foreach(record -> { ... });
> })
>
> Thanks!
>
>
________________________________________________________

The information contained in this e-mail is confidential and/or proprietary to Capital One
and/or its affiliates and may only be used solely in performance of work or services for Capital
One. The information transmitted herewith is intended only for use by the individual or entity
to which it is addressed. If the reader of this message is not the intended recipient, you
are hereby notified that any review, retransmission, dissemination, distribution, copying
or other use of, or taking of any action in reliance upon this information is strictly prohibited.
If you have received this communication in error, please contact the sender and delete the
material from your computer.




