spark-dev mailing list archives

From Michael Allman <>
Subject Re: Scaling partitioned Hive table support
Date Mon, 08 Aug 2016 21:01:03 GMT
Hi Eric,

Thanks for your feedback. I'm rebasing my code for the first approach on a more recent Spark
master and am resolving some conflicts. I'll have a better understanding of the relationship
to your PR once my rebase is complete.



> On Aug 8, 2016, at 12:51 PM, Eric Liang <> wrote:
> I like the former approach -- it seems more generally applicable to other catalogs and
> IIUC would let you defer pruning until execution time. Pruning is work that should be done
> by the catalog anyway, as is the case when querying over an (unconverted) Hive table.
> You might also want to look at <>, which refactors some of the file scan execution to
> defer pruning.
> On Mon, Aug 8, 2016, 11:53 AM Michael Allman <> wrote:
> Hello,
> I'd like to propose a modification in the way Hive table partition metadata are loaded
> and cached. Currently, when a user reads from a partitioned Hive table whose metadata are
> not cached (and for which Hive table conversion is enabled and supported), all partition
> metadata are fetched from the metastore:
> This is highly inefficient in some scenarios. In the most extreme case, a user starts
> a new Spark app, runs a query which reads from a single partition in a table with a large
> number of partitions, and then terminates the app. All partition metadata are loaded and
> their files' schemas are merged, but only a single partition is read. Instead, I propose
> we load and cache partition metadata on demand, as needed to build query plans.
> We've long encountered this performance problem at VideoAmp and have taken different
> approaches to address it. In addition to the load time, we've found that loading all of a
> table's partition metadata can require a significant amount of JVM heap space. Our largest
> tables OOM our Spark drivers unless we allocate several GB of heap space.
> Certainly one could argue that our situation is pathological and rare, and that the problem
> in our scenario is with the design of our tables, not Spark. However, even in tables with
> more modest numbers of partitions, loading only the necessary partition metadata and file
> schemas can significantly reduce query planning time, and is definitely more memory
> efficient.
> I've written POCs for a couple of different implementation approaches. Though incomplete,
> both have been successful in their basic goal. The first extends
> `org.apache.spark.sql.catalyst.catalog.ExternalCatalog` and as such is more general. It
> requires some new abstractions and refactoring of `HadoopFsRelation` and `FileCatalog`,
> among others. It places a greater burden on other implementations of `ExternalCatalog`.
> Currently the only other implementation of `ExternalCatalog` is `InMemoryCatalog`, and my
> code throws an `UnsupportedOperationException` on that implementation.
> The other approach is simpler and only touches code in the codebase's `hive` project.
> Basically, conversion of `MetastoreRelation` to `HadoopFsRelation` is deferred to physical
> planning when the metastore relation is partitioned. During physical planning, the partition
> pruning filters in a logical query plan are used to select the required partition metadata
> and a `HadoopFsRelation` is built from those. The new logical plan is then re-injected into
> the planner.
> I'd like to get the community's thoughts on my proposal and implementation approaches.
> Thanks!
> Michael
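
To make the on-demand loading proposed in the quoted message concrete, here is a minimal
Python sketch. It is not Spark code: `FakeMetastore`, `LazyPartitionCache`, and their
methods are hypothetical names invented for illustration, and the sketch assumes that
listing partition specs is cheap while fetching a partition's full metadata is expensive.

```python
from typing import Callable, Dict, List, Tuple

# A partition is identified by its partition-column values, e.g. (("date", "2016-08-08"),).
PartitionSpec = Tuple[Tuple[str, str], ...]


class FakeMetastore:
    """Hypothetical stand-in for a metastore client; counts metadata fetches."""

    def __init__(self, locations: Dict[PartitionSpec, str]) -> None:
        self._locations = locations
        self.metadata_fetches = 0

    def list_partition_specs(self) -> List[PartitionSpec]:
        # Assumed cheap: returns partition specs only, no file listings or schemas.
        return list(self._locations)

    def fetch_metadata(self, spec: PartitionSpec) -> str:
        # Assumed expensive: stands in for loading a partition's location and files.
        self.metadata_fetches += 1
        return self._locations[spec]


class LazyPartitionCache:
    """Loads and caches metadata only for the partitions a query needs."""

    def __init__(self, metastore: FakeMetastore) -> None:
        self._metastore = metastore
        self._cache: Dict[PartitionSpec, str] = {}

    def prune(self, predicate: Callable[[Dict[str, str]], bool]) -> Dict[PartitionSpec, str]:
        selected: Dict[PartitionSpec, str] = {}
        for spec in self._metastore.list_partition_specs():
            if not predicate(dict(spec)):
                continue  # pruned: this partition's metadata is never fetched
            if spec not in self._cache:
                self._cache[spec] = self._metastore.fetch_metadata(spec)
            selected[spec] = self._cache[spec]
        return selected


# A table with 30 daily partitions; the query touches only one of them.
metastore = FakeMetastore({
    (("date", f"2016-08-{day:02d}"),): f"/warehouse/events/date=2016-08-{day:02d}"
    for day in range(1, 31)
})
cache = LazyPartitionCache(metastore)
one_day = cache.prune(lambda p: p["date"] == "2016-08-08")
```

In this toy model the single-partition query triggers one metadata fetch instead of thirty,
and repeating the same query is served from the cache. Both approaches described in the
quoted message aim for this effect inside Spark's planner; the sketch does not reflect how
either POC is actually implemented.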
