Date: Mon, 16 Jan 2017 03:58:26 +0000 (UTC)
From: "Hyukjin Kwon (JIRA)"
To: issues@spark.apache.org
Subject: [jira] [Updated] (SPARK-19222) Limit Query Performance issue

[ https://issues.apache.org/jira/browse/SPARK-19222?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hyukjin Kwon updated SPARK-19222:
---------------------------------
Description:

A performance/memory bottleneck occurs in the queries below.

case 1:
{code}
create table t1 as select * from dest1 limit 10000000;
{code}

case 2 (pre-condition: partition count >= 10000):
{code}
create table t1 as select * from dest1 limit 1000;
{code}

In both cases the limit is added at the end of the physical plan:

{code}
== Physical Plan ==
ExecutedCommand
   +- CreateHiveTableAsSelectCommand [Database:spark}, TableName: t2, InsertIntoHiveTable]
         +- GlobalLimit 10000000
            +- LocalLimit 10000000
               +- Project [imei#101, age#102, task#103L, num#104, level#105, productdate#106, name#107, point#108]
                  +- SubqueryAlias hive
                     +- Relation[imei#101,age#102,task#103L,num#104,level#105,productdate#106,name#107,point#108] csv
{code}

Issue hints:
A possible bottleneck is the following snippet in limit.scala, under the spark-sql package:
{code}
protected override def doExecute(): RDD[InternalRow] = {
  val locallyLimited = child.execute().mapPartitionsInternal(_.take(limit))
  val shuffled = new ShuffledRowRDD(
    ShuffleExchange.prepareShuffleDependency(
      locallyLimited, child.output, SinglePartition, serializer))
  shuffled.mapPartitionsInternal(_.take(limit))
}
{code}

In both case 1 (where the limit value is large, e.g. 10000000, or the partition count is > 10000) and case 2 (where the limit value is small, around 1000, but the partition count is high), the snippet above builds the {{ShuffledRowRDD}} by shuffling the locally limited rows from every partition into a single partition on one executor. Because all of that data is collected and processed in a single partition, the row count can grow very large, which creates the memory bottleneck.

Proposed solution for case 2:
An accumulator can be shared with all partitions; each executor updates it with the number of rows it has fetched.
e.g. number of partitions = 100, number of cores = 10.
Tasks would be launched in waves of 10 (one task per core). Once the first wave of tasks finishes, the driver checks whether the accumulator has reached the limit value; if it has, no further tasks are launched, and the result after applying the limit is returned.
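The wave-based early-stop idea can be sketched outside Spark. The following is a hypothetical simulation, not Spark's scheduler API: `collectWithEarlyStop` plays the driver, launching `cores` tasks per wave and re-checking a shared row counter (a plain `AtomicLong` standing in for a Spark accumulator) between waves, so later waves are skipped once the limit is reached. All names here (`LimitAccumulatorSketch`, `runLocalLimit`, `collectWithEarlyStop`) are illustrative only.

```scala
import java.util.concurrent.atomic.AtomicLong

object LimitAccumulatorSketch {
  // One "task": take up to `limit` rows from its partition (like LocalLimit)
  // and record how many rows were fetched in the shared counter.
  def runLocalLimit(partition: Seq[Int], limit: Int, acc: AtomicLong): Seq[Int] = {
    val taken = partition.take(limit)
    acc.addAndGet(taken.size.toLong)
    taken
  }

  // "Driver" loop: launch waves of `cores` tasks; stop launching new waves
  // once the counter has reached the limit. Returns the limited rows and
  // the number of tasks actually launched.
  def collectWithEarlyStop(partitions: Seq[Seq[Int]],
                           limit: Int,
                           cores: Int): (Seq[Int], Int) = {
    val acc = new AtomicLong(0L)
    val buf = scala.collection.mutable.ArrayBuffer.empty[Int]
    var launched = 0
    val waves = partitions.grouped(cores)
    while (waves.hasNext && acc.get() < limit) {
      val wave = waves.next()
      wave.foreach(p => buf ++= runLocalLimit(p, limit, acc))
      launched += wave.size
    }
    (buf.take(limit).toSeq, launched)
  }
}
```

With 100 partitions of 50 rows each, cores = 10 and limit = 1000, the first wave fetches 500 rows and the second reaches 1000, so only 20 tasks run instead of 100.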
Please let me know of any suggestions or solutions for the above mentioned problems.

Thanks,
Sujith

was: (previous description omitted: same content without {code} formatting)

> Limit Query Performance issue
> -----------------------------
>
>                 Key: SPARK-19222
>                 URL: https://issues.apache.org/jira/browse/SPARK-19222
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.1.0
>        Environment: Linux/Windows
>           Reporter: Sujith
>           Priority: Minor
>
> (issue description quoted above)

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)