spark-dev mailing list archives

From Liang-Chi Hsieh
Subject Re: Limit Query Performance Suggestion
Date Mon, 16 Jan 2017 03:36:21 GMT

Hi Sujith,

Thanks for the suggestion.

The code you quoted is from `CollectLimitExec`, which will be in the plan
if a logical `Limit` is the final operator in a logical plan. But in the
physical plan you showed, there are `GlobalLimit` and `LocalLimit` for the
logical `Limit` operation, so the `doExecute` method of `CollectLimitExec`
will not be executed.

In the case where `CollectLimitExec` is the final physical operator, its
`executeCollect` will be executed and will delegate to
`SparkPlan.executeTake`, which is optimized to retrieve only the required
number of rows back to the driver. So using `limit n` with a huge number of
partitions should not be a problem.
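
To illustrate the idea, here is a rough sketch in plain Scala, with no Spark
dependency. It is not the actual `executeTake` code; it only models the
strategy of scanning partitions in growing batches and stopping once enough
rows have arrived. The object name, the `Seq`-based partitions, and the
`scaleUpFactor` default of 4 are assumptions made for the example.

```scala
import scala.collection.mutable.ArrayBuffer

object IncrementalTakeSketch {
  /** Simplified model of an incremental take.
    * `parts` stands in for the partitions of a dataset (hypothetical model).
    * Returns the first `n` rows and the number of partitions that were scanned.
    */
  def take(parts: IndexedSeq[Seq[Int]], n: Int, scaleUpFactor: Int = 4): (List[Int], Int) = {
    val buf = ArrayBuffer.empty[Int]
    var scanned = 0 // partitions visited so far
    var batch = 1   // partitions to visit in the next round
    while (buf.size < n && scanned < parts.length) {
      val end = math.min(scanned + batch, parts.length)
      val left = n - buf.size
      // Each simulated task returns at most `left` rows from its partition.
      for (p <- scanned until end) buf ++= parts(p).take(left)
      scanned = end
      batch *= scaleUpFactor // scan more partitions next time if still short
    }
    (buf.take(n).toList, scanned)
  }
}
```

With 100 partitions of 10 rows each, `IncrementalTakeSketch.take(parts, 5)`
touches only the first partition, so the partition count does not matter when
`n` is small.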

In the case where `GlobalLimit` and `LocalLimit` are the final physical
operators, your concern is that when returning `n` rows from each of `N`
partitions and `N` is huge, the total of `n * N` rows will cause heavy memory
pressure on the driver. I am not sure whether you have really observed this
problem or just think it might be a problem. In this case, there will be a
shuffle exchange between `GlobalLimit` and `LocalLimit` to retrieve data from
all partitions into one partition. In `GlobalLimit` we only take the required
number of rows from the input iterator, which is what actually pulls data from
local blocks and remote blocks. Because of this iterator approach, I think
that once we get enough rows in `GlobalLimit`, we won't continue to consume
the input iterator and pull more data back. So I don't think your concern
will be a problem.
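
A small sketch of the iterator behaviour I mean, again in plain Scala rather
than real Spark code. It models only the iterator contract (rows are pulled
one at a time, on demand), not how shuffle blocks are actually fetched. The
object and method names are made up for the example.

```scala
object LazyGlobalLimitSketch {
  /** `localOutputs` stands in for the rows of each upstream partition
    * (hypothetical model). Returns the limited rows and the number of rows
    * that were actually pulled through the simulated shuffle iterator.
    */
  def globalLimit(localOutputs: Seq[Seq[Int]], limit: Int): (List[Int], Int) = {
    var pulled = 0
    val shuffled: Iterator[Int] =
      localOutputs.iterator
        .flatMap(_.iterator.take(limit))  // per-partition limit (LocalLimit)
        .map { row => pulled += 1; row }  // count rows only when they are pulled
    // Final limit (GlobalLimit): stops consuming after `limit` rows.
    val rows = shuffled.take(limit).toList
    (rows, pulled)
  }
}
```

For 1000 partitions of 100 rows each and a limit of 2, only 2 rows are pulled
through the iterator; the remaining rows are never consumed.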

sujith71955 wrote
> When a limit is the terminal operator of the physical plan, there is a
> possibility of a memory bottleneck if the limit value is too large, because
> the system will try to aggregate the limited rows of all partitions into a
> single partition.
> Description:
> Eg:
> create table src_temp as select * from src limit n;
> == Physical Plan ==
> ExecutedCommand
>    +- CreateHiveTableAsSelectCommand [Database:spark}, TableName: t2, InsertIntoHiveTable]
>          +- GlobalLimit 2
>             +- LocalLimit 2
>                +- Project [imei#101, age#102, task#103L, num#104, level#105, productdate#106, name#107, point#108]
>                   +- SubqueryAlias hive
>                      +- Relation[imei#101,age#102,task#103L,num#104,level#105,productdate#106,name#107,point#108] csv  |
> As shown in the plan above, when the limit is the terminal operator there
> can be two types of performance bottleneck:
> scenario 1: when the partition count is very high and the limit value is small
> scenario 2: when the limit value is very large
>   protected override def doExecute(): RDD[InternalRow] = {
>     val locallyLimited = child.execute().mapPartitionsInternal(_.take(limit))
>     val shuffled = new ShuffledRowRDD(
>       ShuffleExchange.prepareShuffleDependency(
>         locallyLimited, child.output, SinglePartition, serializer))
>     shuffled.mapPartitionsInternal(_.take(limit))
>   }
> }
> As per my understanding, the current algorithm first creates a
> MapPartitionsRDD by applying the limit to each partition; then a
> ShuffledRowRDD is created by grouping the data from all partitions into a
> single partition. This can create overhead, since every partition will
> return up to n rows, so while grouping there will be N partitions * limit n
> rows, which can be very large. In both scenarios mentioned above this logic
> can be a bottleneck.
> My suggestion for handling scenario 1, where the number of partitions is
> large and the limit value is small: the driver can create an accumulator and
> send it to all partitions, and every executor updates the accumulator based
> on the number of rows it fetched.
> eg: number of partitions = 100, number of cores = 10
> tasks will be launched in groups of 10 (10 * 10 = 100); once the first group
> of tasks finishes, the driver checks whether the accumulator value has
> reached the limit value.
> If it has, no further tasks will be launched on the executors and the
> result will be returned.
> Let me know if you have any further suggestions or solutions.
> Thanks in advance,
> Sujith

Liang-Chi Hsieh | @viirya 
Spark Technology Center 