spark-user mailing list archives

From Kartik Mathur <kar...@bluedata.com>
Subject Re: Problem understanding spark word count execution
Date Fri, 02 Oct 2015 07:50:24 GMT
Thanks Yong ,

That was a good explanation, and the one I was looking for. However, I have
one doubt. You write: "Imagine that you have 2 mappers to read the data,
then each mapper will generate the (word, count) tuple output in segments.
Spark always writes that output to a local file. (In fact, one file with
different segments to represent different partitions.)" If this is true,
then Spark is very similar to Hadoop MapReduce (disk I/O between phases).
With so many I/Os after each stage, how does Spark achieve the performance
it does compared to MapReduce? Another doubt is: "The 2000 bytes sent to
the driver is the final output aggregated on the reducers' end, and merged
back to the driver." Which part of our word count code takes care of this?
And yes, there are only 273 distinct words in the text, so that's not a
surprise.
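(For what it's worth, the merge-back-to-the-driver step corresponds to the
reduceByKey plus collect at the end of the usual word count. A minimal
pure-Python sketch of that final aggregation, with made-up per-reducer
outputs and no Spark required:)

```python
# Hypothetical per-reducer outputs: each reducer holds the final
# (word, count) pairs for the partitions it owns. The words and
# counts here are illustrative, not from the actual job.
reducer_0 = {"spark": 120, "shuffle": 45}
reducer_1 = {"driver": 30, "executor": 78}

# collect() conceptually pulls each reducer's result to the driver
# and merges them; the keys are disjoint because each word hashes
# to exactly one reducer.
driver_result = {}
for partition in (reducer_0, reducer_1):
    driver_result.update(partition)
```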

Thanks again,

Hope to get a reply.

--Kartik

On Thu, Oct 1, 2015 at 5:49 PM, java8964 <java8964@hotmail.com> wrote:

> I am not sure about the original explanation of the shuffle write.
>
> In the word count example, the shuffle is needed, as Spark has to group by
> the word (reduceBy is more accurate here). Imagine that you have 2 mappers to
> read the data; then each mapper will generate the (word, count) tuple
> output in segments. Spark always writes that output to a local file. (In fact,
> one file with different segments to represent different partitions.)
>
> As you can imagine, the output of these segments will be small, as it only
> contains (word, count of word) tuples. After each mapper generates this
> segmented file for the different partitions, the reducers will fetch the
> partitions belonging to them.
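The two steps described here (map-side (word, count) segments per partition,
then reducers fetching their own partitions) can be sketched in plain Python.
This is a conceptual simulation, not Spark's actual shuffle code; the input
lines and the 2-mapper/2-reducer layout are illustrative:

```python
from collections import Counter

NUM_REDUCERS = 2

def map_side(lines):
    """One mapper: count words locally, then bucket the (word, count)
    tuples into one segment per reducer partition (hash partitioning).
    In Spark, these segments live in one local shuffle file."""
    counts = Counter(w for line in lines for w in line.split())
    segments = [dict() for _ in range(NUM_REDUCERS)]
    for word, n in counts.items():
        segments[hash(word) % NUM_REDUCERS][word] = n
    return segments

# Two mappers, each reading its own partition of the input.
mapper_outputs = [
    map_side(["to be or not to be"]),
    map_side(["to see or not to see"]),
]

# Reduce side: reducer r fetches segment r from every mapper and merges.
final = []
for r in range(NUM_REDUCERS):
    merged = Counter()
    for segments in mapper_outputs:
        merged.update(segments[r])
    final.append(dict(merged))
```

Each word lands on exactly one reducer, so merging the reducers' outputs
gives the global counts.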
>
> In your job summary, since your source is a text file, your data corresponds
> to 2 HDFS blocks, or 2x256M. There are 2 tasks concurrently reading these 2
> partitions, with about 2.5M lines of data in each partition being processed.
>
> The output of each partition is a shuffle write of 2.7K of data, which is the
> size of the segment file generated, corresponding to all the unique words
> and their counts for this partition. So the size is reasonable, at least to
> me.
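(As a rough sanity check on that size, using the numbers from the stage
table: 2.7 KB over 273 records is about 10 bytes per (word, count) tuple,
which is plausible for a short word plus a serialized count. The per-record
arithmetic below is my own back-of-the-envelope estimate, not something
Spark reports:)

```python
# Numbers taken from the stage UI in this thread.
shuffle_bytes = 2.7 * 1024   # 2.7 KB shuffle write per task
records = 273                # shuffle write records per task

# Average serialized size of one (word, count) tuple.
bytes_per_record = shuffle_bytes / records  # roughly 10 bytes
```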
>
> The interesting number is the 273 shuffle write records. I am not 100% sure
> of its meaning. Does it mean that this partition has 273 unique words from
> these 2.5M lines of data? That is kind of low, but I really don't have
> another explanation of its meaning.
>
> If your final output shows hundreds of unique words, then that is what it means.
>
> The 2000 bytes sent to driver is the final output aggregated on the
> reducers end, and merged back to the driver.
>
> Yong
>
>
> ------------------------------
> Date: Thu, 1 Oct 2015 13:33:59 -0700
> Subject: Re: Problem understanding spark word count execution
> From: kartik@bluedata.com
> To: nicolae.marasoiu@adswizz.com
> CC: user@spark.apache.org
>
>
> Hi Nicolae,
> Thanks for the reply. To further clarify things -
>
> sc.textFile is reading from HDFS. Now, shouldn't the file be read in a way
> such that EACH executor works only on the locally available part of the file?
> In this case it's a ~4.64 GB file and the block size is 256 MB, so approx. 19
> partitions will be created and each task will run on 1 partition (which is
> what I am seeing in the stage logs). I also assume it will read the file
> in a way that each executor gets roughly the same amount of data, so there
> shouldn't be any shuffling during the read, at least.
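(The partition arithmetic above checks out; file size and block size as
stated in the mail, one HDFS block per input partition, one task per
partition:)

```python
import math

file_size_gb = 4.64      # input file size from the thread
block_size_mb = 256      # HDFS block size from the thread

file_size_mb = file_size_gb * 1024            # ~4751 MB
partitions = math.ceil(file_size_mb / block_size_mb)
# The last, partial block still gets its own partition, hence the ceil.
```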
>
> During the stage 0 (sc.textFile -> flatMap -> Map) for every task this is
> the output I am seeing
>
> Index  ID  Attempt  Status   Locality Level  Executor ID / Host  Launch Time          Duration  GC Time  Input Size / Records         Shuffle Write Size / Records
> 0      44  0        SUCCESS  NODE_LOCAL      1 / 10.35.244.10    2015/09/29 13:57:24  14 s      0.2 s    256.0 MB (hadoop) / 2595161  2.7 KB / 273
> 1      45  0        SUCCESS  NODE_LOCAL      2 / 10.35.244.11    2015/09/29 13:57:24  13 s      0.2 s    256.0 MB (hadoop) / 2595176  2.7 KB / 273
> I have following questions -
>
> 1) What exactly is the 2.7 KB of shuffle write?
> 2) Is this 2.7 KB of shuffle write local to that executor?
> 3) In the executor's log I am seeing 2000 bytes of results sent to the driver.
> If instead this number were much, much greater than 2000 bytes, such that it
> did not fit in the executor's memory, would the shuffle write increase?
> 4) For word count, 256 MB of data is a substantial amount of text. How come
> the result for this stage is only 2000 bytes? It should send every word with
> its respective count; for a 256 MB input, shouldn't this result be much bigger?
>
> I hope I am clear this time.
>
> Hope to get a reply,
>
> Thanks
> Kartik
>
>
>
> On Thu, Oct 1, 2015 at 12:38 PM, Nicolae Marasoiu <
> nicolae.marasoiu@adswizz.com> wrote:
>
> Hi,
>
> So you say "sc.textFile -> flatMap -> Map".
>
> My understanding is like this:
> First, a number of partitions is determined, p of them. You can
> give a hint on this.
> Then the nodes which will load the partitions are determined: n nodes (where
> n <= p).
>
> Relatively at the same time or not, the n nodes start opening different
> sections of the file - the physical equivalent of the partitions: for
> instance in HDFS they would do an open and a seek, I guess, and just read
> from the stream there, converting to whatever the InputFormat dictates.
>
> The shuffle can only be the part where a node opens an HDFS file, for
> instance, but the node does not have a local replica of the blocks which it
> needs to read (those pertaining to its assigned partitions). So it needs to
> pick them up from remote nodes which do have replicas of that data.
>
> After the blocks are read into memory, flatMap and Map are local computations
> generating new RDDs, and in the end the result is sent to the driver
> (whatever the terminal computation does on the RDD, like the result of
> reduce, or the side effects of rdd.foreach, etc.).
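The "local computations" point can be illustrated without Spark; in this
sketch a plain list stands in for one partition of an RDD, and the input
line is made up:

```python
# One partition's worth of input lines (illustrative data).
lines = ["to be or not to be"]

# flatMap: each line expands into many words.
words = [w for line in lines for w in line.split()]

# map: each word becomes a (word, 1) pair. Both steps are purely
# local to the partition; no data crosses node boundaries until
# the shuffle that groups equal words together.
pairs = [(w, 1) for w in words]
```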
>
> Maybe you can share more of your context if still unclear.
> I just made assumptions to give clarity on a similar thing.
>
> Nicu
> ------------------------------
> *From:* Kartik Mathur <kartik@bluedata.com>
> *Sent:* Thursday, October 1, 2015 10:25 PM
> *To:* Nicolae Marasoiu
> *Cc:* user
> *Subject:* Re: Problem understanding spark word count execution
>
> Thanks Nicolae,
> So in my case all executors are sending results back to the driver, and
> "shuffle is just sending out the textFile to distribute the
> partitions". Could you please elaborate on this? What exactly is in
> this file?
>
> On Wed, Sep 30, 2015 at 9:57 PM, Nicolae Marasoiu <
> nicolae.marasoiu@adswizz.com> wrote:
>
>
>
> Hi,
>
> 2 - the end results are sent back to the driver; the shuffles are
> transmissions of intermediate results between nodes, at the "->" boundaries
> between the intermediate transformations.
>
> More precisely, since flatMap and map are narrow dependencies, meaning
> they can usually happen on the local node, I bet the shuffle is just sending
> out the textFile to a few nodes to distribute the partitions.
>
>
> ------------------------------
> *From:* Kartik Mathur <kartik@bluedata.com>
> *Sent:* Thursday, October 1, 2015 12:42 AM
> *To:* user
> *Subject:* Problem understanding spark word count execution
>
> Hi All,
>
> I tried running the Spark word count and I have a couple of questions -
>
> I am analyzing stage 0 , i.e
>  *sc.textFile -> flatMap -> Map (Word count example)*
>
> 1) In the *Stage logs* under the Application UI details, for every task I am
> seeing a shuffle write of 2.7 KB. *Question - how can I know where this task
> wrote? Like how many bytes went to which executor?*
>
> 2) In the executor's log, when I look for the same task, it says 2000 bytes
> of results were sent to the driver. My question is, *if the results were
> directly sent to the driver, what is this shuffle write?*
>
> Thanks,
> Kartik
>
>
>
>
