spark-issues mailing list archives

From "Nicolas PHUNG (JIRA)" <j...@apache.org>
Subject [jira] [Comment Edited] (SPARK-7122) KafkaUtils.createDirectStream - unreasonable processing time in absence of load
Date Wed, 03 Jun 2015 13:54:38 GMT

    [ https://issues.apache.org/jira/browse/SPARK-7122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14570847#comment-14570847
] 

Nicolas PHUNG edited comment on SPARK-7122 at 6/3/15 1:53 PM:
--------------------------------------------------------------

I've tried _KafkaUtils.createDirectStream_ without the print, like this:

{code}
    analyticEventStream.foreachRDD(rdd => {
      if (!rdd.isEmpty()) {
        EsSpark.saveJsonToEs(rdd, esIndex)
      }
    })
{code}

And unfortunately it's still falling behind (after a while, it's still 30 minutes behind). However, the
processing time stays at 4s max. I'll try a 10-second batch to see if that's better; otherwise
I think I'll keep using _KafkaUtils.createStream_. I don't understand why it can't keep up.
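For reference, a minimal sketch of the 10-second-batch variant mentioned above. The Kafka parameters, topic name, and Elasticsearch index are assumptions, not taken from the original job:

{code}
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.elasticsearch.spark.rdd.EsSpark

val conf = new SparkConf().setAppName("AnalyticEvents")
// 10-second batches instead of 5, to give each batch more headroom
val ssc = new StreamingContext(conf, Seconds(10))

// Assumed broker/topic/index values - replace with the real ones
val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
val topics = Set("raw")
val esIndex = "analytics/events"

val analyticEventStream = KafkaUtils
  .createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
  .map(_._2) // keep only the message payload

analyticEventStream.foreachRDD { rdd =>
  if (!rdd.isEmpty()) EsSpark.saveJsonToEs(rdd, esIndex)
}

ssc.start()
ssc.awaitTermination()
{code}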



> KafkaUtils.createDirectStream - unreasonable processing time in absence of load
> -------------------------------------------------------------------------------
>
>                 Key: SPARK-7122
>                 URL: https://issues.apache.org/jira/browse/SPARK-7122
>             Project: Spark
>          Issue Type: Question
>          Components: Streaming
>    Affects Versions: 1.3.1
>         Environment: Spark Streaming 1.3.1, standalone mode running on just 1 box: Ubuntu 14.04.2 LTS, 4 cores, 8GB RAM, java version "1.8.0_40"
>            Reporter: Platon Potapov
>            Priority: Minor
>         Attachments: 10.second.window.fast.job.txt, 5.second.window.slow.job.txt, SparkStreamingJob.scala
>
>
> Attached is the complete source code of a test Spark job. No external data generators are run; just the presence of a Kafka topic named "raw" suffices.
> The Spark job is run with no load whatsoever; http://localhost:4040/streaming is checked to obtain the job processing duration.
> * in case the test contains the following transformation:
> {code}
>     // dummy transformation
>     val temperature = bytes.filter(_._1 == "abc")
>     val abc = temperature.window(Seconds(40), Seconds(5))
>     abc.print()
> {code}
> the median processing time is 3 seconds 80 ms
> * in case the test contains the following transformation:
> {code}
>     // dummy transformation
>     val temperature = bytes.filter(_._1 == "abc")
>     val abc = temperature.map(x => (1, x))
>     abc.print()
> {code}
> the median processing time is just 50 ms
> Please explain why the "window" transformation introduces such a growth in job duration.
> Note: the result is the same regardless of the number of Kafka topic partitions (I've tried 1 and 8).
> Note 2: the result is the same regardless of the window parameters (I've tried (20, 2) and (40, 5)).
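One rough illustration of why the window adds work even at zero load (an assumption about the cause, not a confirmed diagnosis): a windowed DStream unions the parent batch RDDs falling inside the window, so each output action spans many parents.

{code}
import org.apache.spark.streaming.Seconds

// Illustration only: how many parent batch RDDs one window output spans.
// With a 5-second batch interval, window(Seconds(40), Seconds(5)) unions the
// last windowDuration / batchInterval batch RDDs, so each print() schedules
// tasks over the partitions of all of them - even when every batch is empty.
val batchInterval = Seconds(5)
val windowDuration = Seconds(40)
val parentsPerWindow = windowDuration.milliseconds / batchInterval.milliseconds // 8
{code}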



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

