spark-issues mailing list archives

From "Fridtjof Sander (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (SPARK-17497) Preserve order when scanning ordered buckets over multiple partitions
Date Sun, 11 Sep 2016 13:56:20 GMT

     [ https://issues.apache.org/jira/browse/SPARK-17497?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Fridtjof Sander updated SPARK-17497:
------------------------------------
    Description: 
Non-associative aggregations (like `collect_list`) require the data to be sorted on the
grouping key in order to extract the aggregation groups.
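
To illustrate why a streaming, sort-based aggregation needs sorted input, here is a minimal Python sketch. It is not Spark code: the sample rows and the helper `collect_list_streaming` are invented for illustration. The helper groups consecutive rows with equal keys, which only yields one group per key if the input is sorted on that key.

```python
from itertools import groupby
from operator import itemgetter


def collect_list_streaming(rows):
    """Group consecutive (id, value) rows by id, collecting values in order.

    Hypothetical helper: it mimics a streaming, sort-based aggregation that
    closes a group as soon as the key changes.
    """
    return [(key, [value for _, value in group])
            for key, group in groupby(rows, key=itemgetter(0))]


# Made-up sample rows of the form (id, value).
sorted_rows = [(1, "a"), (1, "b"), (2, "c")]
unsorted_rows = [(1, "a"), (2, "c"), (1, "b")]

print(collect_list_streaming(sorted_rows))    # one group per id
print(collect_list_streaming(unsorted_rows))  # id 1 is split into two groups
```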

Let `table` be a Hive table that is partitioned on `p` and bucketed and sorted on `id`. Let
`q` be a query that executes a non-associative aggregation on `table.id` over multiple partitions
`p`.

Currently, when executing `q`, Spark creates as many RDD-partitions as there are buckets.
Each RDD-partition is created in `FileScanRDD` by fetching the associated buckets in all
requested Hive-partitions. Because the buckets are read one by one, the resulting RDD-partition
is no longer sorted on `id` and has to be explicitly sorted before performing the aggregation.
This sort introduces a blocking step into the execution pipeline.
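
The effect can be reproduced with plain Python lists standing in for bucket files. This is an illustration only: the partition values and ids are invented, and nothing here calls Spark. Each list is sorted on `id`, but reading the lists one after another gives a sequence that is not.

```python
from itertools import chain

# Bucket 0 of `table` in two Hive partitions; each file is sorted on id.
# The partition values and ids are made-up sample data.
bucket0_by_partition = {
    "p=2016-09-10": [2, 4, 8],
    "p=2016-09-11": [2, 6, 10],
}

# Reading the files one by one corresponds to concatenating them.
rdd_partition = list(chain.from_iterable(bucket0_by_partition.values()))


def is_sorted(ids):
    """Return True if the ids are in non-decreasing order."""
    return all(a <= b for a, b in zip(ids, ids[1:]))


print(rdd_partition)             # [2, 4, 8, 2, 6, 10]
print(is_sorted(rdd_partition))  # False
```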

In this Jira I propose to offer the optimizer an alternative bucket-fetching strategy
that preserves the internal sorting in the situation described above.

One way to achieve this is to open all buckets over all partitions simultaneously when fetching
the data. Since each bucket is internally sorted, we can essentially perform the merge step of
a merge-sort on the collection of bucket-iterators and directly emit a sorted RDD-partition that
can be piped into the next operator.
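
As a rough sketch of the merge idea, the following Python example merges several individually sorted iterators into one sorted stream with the standard library's `heapq.merge`. It only illustrates the technique and is not the proposed Spark implementation: the helper `merge_sorted_buckets` and the sample rows are assumptions made for this example.

```python
import heapq


def merge_sorted_buckets(bucket_iterators, key=None):
    """Lazily merge several individually sorted iterators into one sorted stream.

    Hypothetical helper, not part of Spark. heapq.merge keeps one pending row
    per input, so memory stays proportional to the number of open buckets.
    """
    return heapq.merge(*bucket_iterators, key=key)


# The same bucket read from three Hive partitions (made-up rows: (id, value)).
bucket_files = [
    iter([(1, "a"), (4, "d")]),
    iter([(1, "b"), (3, "c")]),
    iter([(2, "e"), (4, "f")]),
]

merged = list(merge_sorted_buckets(bucket_files, key=lambda row: row[0]))
print(merged)
# [(1, 'a'), (1, 'b'), (2, 'e'), (3, 'c'), (4, 'd'), (4, 'f')]
```

Every input iterator stays open until it is exhausted, so the number of simultaneously open inputs grows with the number of partitions scanned.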

While the theoretical feasibility of this idea should not be in question, there are
some obvious implications, e.g. with regard to I/O handling.

I would like to investigate the practical feasibility, limits, gains and drawbacks of this
optimization in my master's thesis and, of course, contribute the implementation. Before I
start, however, I would like to ask you, the community, for any thoughts, opinions, corrections
or other kinds of feedback, which would be much appreciated.


> Preserve order when scanning ordered buckets over multiple partitions
> ---------------------------------------------------------------------
>
>                 Key: SPARK-17497
>                 URL: https://issues.apache.org/jira/browse/SPARK-17497
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>            Reporter: Fridtjof Sander
>            Priority: Minor



