spark-issues mailing list archives

From "Hyukjin Kwon (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (SPARK-19222) Limit Query Performance issue
Date Mon, 16 Jan 2017 03:53:26 GMT

     [ https://issues.apache.org/jira/browse/SPARK-19222?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hyukjin Kwon updated SPARK-19222:
---------------------------------
    Description: 
A performance/memory bottleneck occurs in the queries below.
case 1:
create table t1 as select * from dest1 limit 10000000;
case 2:
create table t1 as select * from dest1 limit 1000;
pre-condition: partition count >= 10000
(It'd be great if the code blocks were wrapped with {{ {code} {code} }}.)
In the above cases the limit is added as the final step of the physical plan.

== Physical Plan ==
ExecutedCommand
   +- CreateHiveTableAsSelectCommand [Database:spark}, TableName: t2, InsertIntoHiveTable]
         +- GlobalLimit 10000000
            +- LocalLimit 10000000
               +- Project [imei#101, age#102, task#103L, num#104, level#105, productdate#106, name#107, point#108]
                  +- SubqueryAlias hive
                     +- Relation[imei#101,age#102,task#103L,num#104,level#105,productdate#106,name#107,point#108] csv
Issue Hints: 

Possible bottleneck: the following snippet in the limit.scala file of the spark-sql package.
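  protected override def doExecute(): RDD[InternalRow] = {
    // Take at most `limit` rows from each child partition.
    val locallyLimited = child.execute().mapPartitionsInternal(_.take(limit))
    // Shuffle every locally limited partition into one single partition.
    val shuffled = new ShuffledRowRDD(
      ShuffleExchange.prepareShuffleDependency(
        locallyLimited, child.output, SinglePartition, serializer))
    // Apply the limit once more on the single merged partition.
    shuffled.mapPartitionsInternal(_.take(limit))
  }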

In case 1 (where the limit value is 10000000 or the partition count is > 10000) and in
case 2 (where the limit value is small, around 1000), the above snippet creates the
ShuffledRowRDD by grouping the limited data from all partitions into a single partition on
one executor. A memory issue occurs because the limited data of every partition is collected
and grouped in that single partition for processing. In both the former and the latter case
the row count can become very high, which can create the memory bottleneck.
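
To make the scale concrete, here is a rough worst-case estimate in plain Scala (no Spark
required). It assumes every partition contributes min(limit, rows it holds) to the single
shuffled partition, which is what the local limit in the snippet implies. The object name,
the helper name, and the 5000 rows per partition are hypothetical values chosen only for
illustration.

```scala
object LimitShuffleEstimate {
  // Worst-case number of rows shuffled into the single partition:
  // each partition contributes at most `limit` rows after the local limit.
  def rowsShuffled(rowsPerPartition: Seq[Long], limit: Long): Long =
    rowsPerPartition.map(rows => math.min(rows, limit)).sum

  def main(args: Array[String]): Unit = {
    // Hypothetical table: 10000 partitions holding 5000 rows each.
    val partitions = Seq.fill(10000)(5000L)
    println(rowsShuffled(partitions, 1000L))     // case 2: small limit
    println(rowsShuffled(partitions, 10000000L)) // case 1: large limit
  }
}
```

Under these assumptions a limit of 1000 can still move 10,000,000 rows into one partition,
even though only 1000 of them are kept by the final take.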

Proposed solution for case 2:
An accumulator can be sent to all partitions, and every executor updates the accumulator
value based on the number of rows fetched.
e.g.: number of partitions = 100, number of cores = 10
Ideally tasks are launched in groups of 10 (one task per core). Once the first group of
tasks finishes, the driver checks whether the accumulator value has reached the limit
value. If it has, no further tasks are launched to the executors and the result after
applying the limit is returned.
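
The proposal can be sketched in plain Scala as a simulation, with no Spark scheduler
involved. Partitions are modelled as in-memory sequences, a local counter stands in for the
accumulator, and the names BatchedLimit and take are hypothetical. This only illustrates the
idea of stopping early and is not how Spark implements limits.

```scala
object BatchedLimit {
  // Scan partitions in groups of `cores` and stop launching further groups
  // once at least `limit` rows have been fetched.
  // Returns the limited rows and the number of partitions actually scanned.
  def take[T](partitions: Seq[Seq[T]], limit: Int, cores: Int): (Seq[T], Int) = {
    val collected = Seq.newBuilder[T]
    var fetched = 0L // stands in for the accumulator value checked by the driver
    var scanned = 0
    val groups = partitions.grouped(cores)
    while (fetched < limit && groups.hasNext) {
      val group = groups.next()
      group.foreach { partition =>
        val rows = partition.take(limit) // local limit inside each simulated task
        collected ++= rows
        fetched += rows.size
      }
      scanned += group.size
    }
    // Final limit over whatever was gathered before stopping.
    (collected.result().take(limit), scanned)
  }
}
```

With 100 partitions of 50 rows each, a limit of 1000, and 10 cores, the loop stops after the
second group, so only 20 of the 100 partitions are read.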

Please let me know of any suggestions or solutions for the problems mentioned above.

Thanks,
Sujith


> Limit Query Performance issue
> -----------------------------
>
>                 Key: SPARK-19222
>                 URL: https://issues.apache.org/jira/browse/SPARK-19222
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.1.0
>         Environment: Linux/Windows
>            Reporter: Sujith
>            Priority: Minor




