Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 0CD54200B7F for ; Sun, 11 Sep 2016 15:56:22 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 0B812160AD4; Sun, 11 Sep 2016 13:56:22 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 58060160AAA for ; Sun, 11 Sep 2016 15:56:21 +0200 (CEST) Received: (qmail 43436 invoked by uid 500); 11 Sep 2016 13:56:20 -0000 Mailing-List: contact issues-help@spark.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list issues@spark.apache.org Received: (qmail 43423 invoked by uid 99); 11 Sep 2016 13:56:20 -0000 Received: from arcas.apache.org (HELO arcas) (140.211.11.28) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 11 Sep 2016 13:56:20 +0000 Received: from arcas.apache.org (localhost [127.0.0.1]) by arcas (Postfix) with ESMTP id 6CFD02C014F for ; Sun, 11 Sep 2016 13:56:20 +0000 (UTC) Date: Sun, 11 Sep 2016 13:56:20 +0000 (UTC) From: "Fridtjof Sander (JIRA)" To: issues@spark.apache.org Message-ID: In-Reply-To: References: Subject: [jira] [Updated] (SPARK-17497) Preserve order when scanning ordered buckets over multiple partitions MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 7bit X-JIRA-FingerPrint: 30527f35849b9dde25b450d4833f0394 archived-at: Sun, 11 Sep 2016 13:56:22 -0000 [ https://issues.apache.org/jira/browse/SPARK-17497?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fridtjof Sander updated SPARK-17497: ------------------------------------ Description: Non-associative aggregations (like ```collect_list```) require the data to be sorted on the grouping key in order to extract aggregation-groups. Let `table` be a Hive-table, that is partitioned on `p` and bucketed and sorted on `id`. Let `q` be a query, that executes a non-associative aggregation on `table.id` over multiple partitions `p`. Currently, when executing `q`, Spark creates as many RDD-partitions as there are buckets. Each RDD-partition is created in `FileScanRDD`, by fetching the associated buckets in all requested Hive-partitions. Because the buckets are read one-by-one, the resulting RDD-partition is no longer sorted on `id` and has to be explicitly sorted before performing the aggregation. Therefore an execution-pipeline-block is introduced. In this Jira I propose to offer an alternative bucket-fetching strategy to the optimizer, that preserves the internal sorting in a situation described above. One way to achieve this, is to open all buckets over all partitions simultaneously when fetching the data. Since each bucket is internally sorted, we can perform basically a merge-sort on the collection of bucket-iterators, and directly emit a sorted RDD-partition, that can be piped into the next operator. While there should be no question about the theoretical feasibility of this idea, there are some obvious implications i.e. with regards to IO-handling. I would like to investigate the practical feasibility, limits, gains and drawbacks of this optimization in my masters-thesis and, of course, contribute the implementation. Before I start, however, I wanted to kindly ask you, the community, for any thoughts, opinions, corrections or other kinds of feedback, which is much appreciated. was: Non-associative aggregations (like `collect_list`) require the data to be sorted on the grouping key in order to extract aggregation-groups. Let `table` be a Hive-table, that is partitioned on `p` and bucketed and sorted on `id`. Let `q` be a query, that executes a non-associative aggregation on `table.id` over multiple partitions `p`. Currently, when executing `q`, Spark creates as many RDD-partitions as there are buckets. Each RDD-partition is created in `FileScanRDD`, by fetching the associated buckets in all requested Hive-partitions. Because the buckets are read one-by-one, the resulting RDD-partition is no longer sorted on `id` and has to be explicitly sorted before performing the aggregation. Therefore an execution-pipeline-block is introduced. In this Jira I propose to offer an alternative bucket-fetching strategy to the optimizer, that preserves the internal sorting in a situation described above. One way to achieve this, is to open all buckets over all partitions simultaneously when fetching the data. Since each bucket is internally sorted, we can perform basically a merge-sort on the collection of bucket-iterators, and directly emit a sorted RDD-partition, that can be piped into the next operator. While there should be no question about the theoretical feasibility of this idea, there are some obvious implications i.e. with regards to IO-handling. I would like to investigate the practical feasibility, limits, gains and drawbacks of this optimization in my masters-thesis and, of course, contribute the implementation. Before I start, however, I wanted to kindly ask you, the community, for any thoughts, opinions, corrections or other kinds of feedback, which is much appreciated. > Preserve order when scanning ordered buckets over multiple partitions > --------------------------------------------------------------------- > > Key: SPARK-17497 > URL: https://issues.apache.org/jira/browse/SPARK-17497 > Project: Spark > Issue Type: Improvement > Components: SQL > Reporter: Fridtjof Sander > Priority: Minor > > Non-associative aggregations (like ```collect_list```) require the data to be sorted on the grouping key in order to extract aggregation-groups. > Let `table` be a Hive-table, that is partitioned on `p` and bucketed and sorted on `id`. Let `q` be a query, that executes a non-associative aggregation on `table.id` over multiple partitions `p`. > Currently, when executing `q`, Spark creates as many RDD-partitions as there are buckets. Each RDD-partition is created in `FileScanRDD`, by fetching the associated buckets in all requested Hive-partitions. Because the buckets are read one-by-one, the resulting RDD-partition is no longer sorted on `id` and has to be explicitly sorted before performing the aggregation. Therefore an execution-pipeline-block is introduced. > In this Jira I propose to offer an alternative bucket-fetching strategy to the optimizer, that preserves the internal sorting in a situation described above. > One way to achieve this, is to open all buckets over all partitions simultaneously when fetching the data. Since each bucket is internally sorted, we can perform basically a merge-sort on the collection of bucket-iterators, and directly emit a sorted RDD-partition, that can be piped into the next operator. > While there should be no question about the theoretical feasibility of this idea, there are some obvious implications i.e. with regards to IO-handling. > I would like to investigate the practical feasibility, limits, gains and drawbacks of this optimization in my masters-thesis and, of course, contribute the implementation. Before I start, however, I wanted to kindly ask you, the community, for any thoughts, opinions, corrections or other kinds of feedback, which is much appreciated. -- This message was sent by Atlassian JIRA (v6.3.4#6332) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org For additional commands, e-mail: issues-help@spark.apache.org