drill-dev mailing list archives

From James Taylor <jamestay...@apache.org>
Subject Re: Partial aggregation in Drill-on-Phoenix
Date Fri, 09 Oct 2015 17:30:17 GMT
Thanks for the updates to the patch, Maryann. It's looking very good - this
will perform better I believe. I made a few comments on the pull request.

FYI, I filed PHOENIX-2316 to add the missing information (namely the region
server that the parallelized scan will go to) so that I can improve the
assignment logic.

     James

On Wed, Oct 7, 2015 at 1:11 PM, Maryann Xue <maryann.xue@gmail.com> wrote:

> Made another checkin for the pull request. All good now.
>
> In order to compile and run, be sure to update the Phoenix project under
> Julian's branch.
>
>
> Thanks,
> Maryann
>
> On Wed, Oct 7, 2015 at 12:19 PM, Jacques Nadeau <jacques@dremio.com>
> wrote:
>
>> I just filed a jira for the merge issue:
>>
>> https://issues.apache.org/jira/browse/DRILL-3907
>>
>> --
>> Jacques Nadeau
>> CTO and Co-Founder, Dremio
>>
>> On Wed, Oct 7, 2015 at 8:54 AM, Jacques Nadeau <jacques@dremio.com>
>> wrote:
>>
>>> Drill doesn't currently have a merge-sort operation available outside
>>> the context of an exchange. See here:
>>>
>>>
>>> https://github.com/apache/drill/tree/master/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver
>>>
>>> We'll need to do a bit of refactoring to provide this functionality
>>> outside the context of an exchange. The one other thing we'll have to think
>>> about in this context is how we avoid doing an n-way merge in the case
>>> that we're not using the collation.
>>>
>>> --
>>> Jacques Nadeau
>>> CTO and Co-Founder, Dremio
>>>
>>> On Wed, Oct 7, 2015 at 8:18 AM, Maryann Xue <maryann.xue@gmail.com>
>>> wrote:
>>>
>>>> One thing from what I asked James offline yesterday, and maybe we can
>>>> discuss a little bit in today's meeting:
>>>>
>>>> Phoenix uses a list of lists of Scan objects to indicate Region
>>>> boundaries and guideposts, and if the top-level list contains more than one
>>>> element it means that the results from different Scanner/ResultIterator
>>>> should be merge-sorted. We now use this list in Drill integration to
>>>> generate different batches or slices. I see from the Drill plan of a simple
>>>> select like "SELECT * FROM A.BEER" that a Drill Sort node sits on top of
>>>> the PhoenixTableScan. I guess this is a real sort rather than a merge-sort.
>>>> So optimally,
>>>> 1) this should be a merge-sort (to be more accurate, a merge)
>>>> 2) furthermore, if Drill has something to indicate the order among
>>>> slices and batches, we could even turn it into a concat.
>>>>
>>>> The structure of this Scan list might be helpful for 2), or we may have
>>>> some Logical representation for this. Otherwise, we can simply flatten this
>>>> list to a one-dimensional list as we do now (in my ci yesterday).
>>>>
>>>>
>>>>
>>>> Thanks,
>>>> Maryann
>>>>
>>>> On Tue, Oct 6, 2015 at 9:52 PM, Maryann Xue <maryann.xue@gmail.com>
>>>> wrote:
>>>>
>>>>> Yes, but the partially aggregated results will not contain any
>>>>> duplicate rowkeys, since they are also group-by keys. What we need is the
>>>>> aggregators and call aggregate for each row. We can write a new simpler
>>>>> ResultIterator to replace this, but for now it should work correctly.
>>>>>
>>>>> On Tue, Oct 6, 2015 at 9:45 PM, James Taylor <jamestaylor@apache.org>
>>>>> wrote:
>>>>>
>>>>>> The results we get back from the server-side scan are already the
>>>>>> partial aggregated values we need. GroupedAggregatingResultIterator
>>>>>> will collapse adjacent Tuples together which happen to have the same row
>>>>>> key. I'm not sure we want/need this to happen. Instead I think we just need
>>>>>> to decode the aggregated values directly from the result of the scan.
>>>>>>
>>>>>> On Tue, Oct 6, 2015 at 6:07 PM, Maryann Xue <maryann.xue@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi James,
>>>>>>>
>>>>>>> bq. A few questions for you: not sure I understand the changes you
>>>>>>> made to PhoenixRecordReader. Is it necessary to wrap the server-side scan
>>>>>>> results in a GroupedAggregatingResultIterator? Each server-side scan will
>>>>>>> produce results with a single tuple per group by key. In Phoenix, the
>>>>>>> GroupedAggregatingResultIterator's function in life is to do the final
>>>>>>> merge. Note too that the results aren't sorted that come back from the
>>>>>>> aggregated scan (while GroupedAggregatingResultIterator needs tuples sorted
>>>>>>> by the group by key). Or is this just to help in decoding the values coming
>>>>>>> back from the scan?
>>>>>>>
>>>>>>> It is necessary. I suppose what we should return as a partial result
>>>>>>> from PhoenixRecordReader is exactly the same as what we do in standalone
>>>>>>> Phoenix+Calcite, except that the result is partial, or say incomplete. For
>>>>>>> example, given "select a, count(*) from t group by a", we should return
>>>>>>> rows that have "a" as the first expression value, and "count(*)" as the
>>>>>>> second expression value. This "count" expression actually needs a
>>>>>>> ClientAggregator for evaluation, and that's what this
>>>>>>> GroupedAggregatingResultIterator is used for.
>>>>>>> Since "each server-side scan will produce results with a single
>>>>>>> tuple per group by key", and PhoenixRecordReader is only dealing with one
>>>>>>> server-side result each time, we don't care how the group-by keys are
>>>>>>> arranged (ordered or unordered). Actually
>>>>>>> GroupedAggregatingResultIterator is not the group-by iterator we
>>>>>>> use for AggregatePlan. It does not "combine". It treats every row as a
>>>>>>> different group, by returning its rowkey as the group-by key
>>>>>>> (GroupedAggregatingResultIterator.java:56).
>>>>>>>
>>>>>>> In short, this iterator is for decoding the server-side values. So
>>>>>>> we may want to optimize this logic by removing this serialization and
>>>>>>> deserialization and having only one set of aggregators in future.
>>>>>>>
>>>>>>> bq. Also, not sure what impact it has in the way we "combine" the
>>>>>>> scans in our Drill parallelization code (PhoenixGroupScan.applyAssignments()),
>>>>>>> as each of our scans could include duplicate group by keys. Is it ok to
>>>>>>> combine them in this case?
>>>>>>>
>>>>>>> It should not matter, or at least is not related to the problem I'm
>>>>>>> now having.
>>>>>>>
>>>>>>> bq. One more question: how is the group by key communicated back to
>>>>>>> Drill?
>>>>>>>
>>>>>>> According to the HashAggPrule, if it decides to create a two-phase
>>>>>>> aggregate, the first phase is now handled by Phoenix (after applying the
>>>>>>> PhoenixHashAggPrule). I assume the partial results then get shuffled based
>>>>>>> on the hash of their group-by keys (returned by PhoenixRecordReader). The
>>>>>>> final step is the Drill hash aggregation.
>>>>>>>
>>>>>>>
>>>>>>> This is my test table "A.BEER", which has four columns: "B", "E1",
>>>>>>> "E2", "R", all of INTEGER type. The data is generated like this:
>>>>>>> for (x=1 to N) { //currently N=1000
>>>>>>>  UPSERT INTO A.BEER VALUES (x, x % 10, x % 100, x);
>>>>>>> }
>>>>>>>
>>>>>>> The group-by query for testing is "SELECT e1, count(*) FROM a.beer
>>>>>>> GROUP BY e1".
>>>>>>> The expected result should be:
>>>>>>> 0 100
>>>>>>> 1 100
>>>>>>> 2 100
>>>>>>> 3 100
>>>>>>> 4 100
>>>>>>> 5 100
>>>>>>> 6 100
>>>>>>> 7 100
>>>>>>> 8 100
>>>>>>> 9 100
>>>>>>> The actual result was:
>>>>>>> 6 0
>>>>>>> 7 0
>>>>>>> 8 0
>>>>>>> 9 0
>>>>>>> 0 0
>>>>>>> 1 100
>>>>>>> 2 100
>>>>>>> 3 100
>>>>>>> 4 100
>>>>>>> 5 100
>>>>>>>
>>>>>>> Here I just tried another one "SELECT e2, count(*) FROM a.beer
>>>>>>> GROUP BY e2".
>>>>>>> Similarly, the expected result should have group-by keys from 0 to
>>>>>>> 99, each having a value of 10 as the count, while the actual result was:
>>>>>>> from group-by key 86 to 99, together with 0, their count values were
>>>>>>> all 0; the rest (1 to 85) all had the correct value 10.
>>>>>>>
>>>>>>> Looks to me that the scans were good but there was a problem with
>>>>>>> one of the hash buckets.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Maryann
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Oct 6, 2015 at 6:45 PM, James Taylor <jamestaylor@apache.org>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Nice progress, Maryann.
>>>>>>>>
>>>>>>>> A few questions for you: not sure I understand the changes you made
>>>>>>>> to PhoenixRecordReader. Is it necessary to wrap the server-side scan
>>>>>>>> results in a GroupedAggregatingResultIterator? Each server-side scan will
>>>>>>>> produce results with a single tuple per group by key. In Phoenix, the
>>>>>>>> GroupedAggregatingResultIterator's function in life is to do the final
>>>>>>>> merge. Note too that the results aren't sorted that come back from the
>>>>>>>> aggregated scan (while GroupedAggregatingResultIterator needs tuples sorted
>>>>>>>> by the group by key). Or is this just to help in decoding the values coming
>>>>>>>> back from the scan?
>>>>>>>>
>>>>>>>> Also, not sure what impact it has in the way we "combine" the scans
>>>>>>>> in our Drill parallelization code (PhoenixGroupScan.applyAssignments()), as
>>>>>>>> each of our scans could include duplicate group by keys. Is it ok to
>>>>>>>> combine them in this case?
>>>>>>>>
>>>>>>>> One more question: how is the group by key communicated back to
>>>>>>>> Drill?
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> James
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, Oct 6, 2015 at 2:10 PM, Maryann Xue <maryann.xue@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Added a few fixes in the pull request. Tested with two regions,
>>>>>>>>> turned out that half of the result is empty (count = 0).
>>>>>>>>> Not sure if there's anything wrong with
>>>>>>>>> https://github.com/maryannxue/drill/blob/phoenix_plugin/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/rel/PhoenixHashAggPrule.java
>>>>>>>>> .
>>>>>>>>> Like Julian said, this rule looks a bit hacky.
>>>>>>>>>
>>>>>>>>> To force a 2-phase HashAgg, I made a temporary change as well:
>>>>>>>>>
>>>>>>>>> diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
>>>>>>>>> index b911f6b..58bc918 100644
>>>>>>>>> --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
>>>>>>>>> +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
>>>>>>>>> @@ -60,12 +60,12 @@ public abstract class AggPruleBase extends Prule {
>>>>>>>>>    // If any of the aggregate functions are not one of these, then we
>>>>>>>>>    // currently won't generate a 2 phase plan.
>>>>>>>>>    protected boolean create2PhasePlan(RelOptRuleCall call, DrillAggregateRel aggregate) {
>>>>>>>>> -    PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner());
>>>>>>>>> -    RelNode child = call.rel(0).getInputs().get(0);
>>>>>>>>> -    boolean smallInput = child.getRows() < settings.getSliceTarget();
>>>>>>>>> -    if (! settings.isMultiPhaseAggEnabled() || settings.isSingleMode() || smallInput) {
>>>>>>>>> -      return false;
>>>>>>>>> -    }
>>>>>>>>> +//    PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner());
>>>>>>>>> +//    RelNode child = call.rel(0).getInputs().get(0);
>>>>>>>>> +//    boolean smallInput = child.getRows() < settings.getSliceTarget();
>>>>>>>>> +//    if (! settings.isMultiPhaseAggEnabled() || settings.isSingleMode() || smallInput) {
>>>>>>>>> +//      return false;
>>>>>>>>> +//    }
>>>>>>>>>
>>>>>>>>>      for (AggregateCall aggCall : aggregate.getAggCallList()) {
>>>>>>>>>        String name = aggCall.getAggregation().getName();
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Maryann
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Tue, Oct 6, 2015 at 2:31 PM, Julian Hyde <jhyde@apache.org>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Drill's current approach seems adequate for Drill alone but extending
>>>>>>>>>> it to a heterogeneous system that includes Phoenix seems like a hack.
>>>>>>>>>>
>>>>>>>>>> I think you should only create Prels for algebra nodes that you know
>>>>>>>>>> for sure are going to run on the Drill engine. If there's a
>>>>>>>>>> possibility that it would run in another engine such as Phoenix then
>>>>>>>>>> they should still be logical.
>>>>>>>>>>
>>>>>>>>>> On Tue, Oct 6, 2015 at 11:03 AM, Maryann Xue <maryann.xue@gmail.com> wrote:
>>>>>>>>>> > The partial aggregate seems to be working now, with one interface extension
>>>>>>>>>> > and one bug fix in the Phoenix project. Will do some code cleanup and
>>>>>>>>>> > create a pull request soon.
>>>>>>>>>> >
>>>>>>>>>> > Still there was a hack in the Drill project which I made to force 2-phase
>>>>>>>>>> > aggregation. I'll try to fix that.
>>>>>>>>>> >
>>>>>>>>>> > Jacques, I have one question though, how can I verify that there is more
>>>>>>>>>> > than one slice and the shuffle happens?
>>>>>>>>>> >
>>>>>>>>>> >
>>>>>>>>>> > Thanks,
>>>>>>>>>> > Maryann
>>>>>>>>>> >
>>>>>>>>>> > On Mon, Oct 5, 2015 at 2:03 PM, James Taylor <jamestaylor@apache.org> wrote:
>>>>>>>>>> >
>>>>>>>>>> >> Maryann,
>>>>>>>>>> >> I believe Jacques mentioned that a little bit of refactoring is required
>>>>>>>>>> >> for a merge sort to occur - there's something that does that, but it's not
>>>>>>>>>> >> expected to be used in this context currently.
>>>>>>>>>> >>
>>>>>>>>>> >> IMHO, there's more of a clear value in getting the aggregation to use
>>>>>>>>>> >> Phoenix first, so I'd recommend going down that road as Jacques mentioned
>>>>>>>>>> >> above if possible. Once that's working, we can circle back to the partial
>>>>>>>>>> >> sort.
>>>>>>>>>> >>
>>>>>>>>>> >> Thoughts?
>>>>>>>>>> >> James
>>>>>>>>>> >>
>>>>>>>>>> >> On Mon, Oct 5, 2015 at 10:40 AM, Maryann Xue <maryann.xue@gmail.com>
>>>>>>>>>> >> wrote:
>>>>>>>>>> >>
>>>>>>>>>> >>> I actually tried implementing partial sort with
>>>>>>>>>> >>> https://github.com/jacques-n/drill/pull/4, which I figured might be a
>>>>>>>>>> >>> little easier to start with than partial aggregation. But I found that even
>>>>>>>>>> >>> though the code worked (returned the right results), the Drill side sort
>>>>>>>>>> >>> turned out to be an ordinary sort instead of a merge which it should have
>>>>>>>>>> >>> been. Any idea of how to fix that?
>>>>>>>>>> >>>
>>>>>>>>>> >>>
>>>>>>>>>> >>> Thanks,
>>>>>>>>>> >>> Maryann
>>>>>>>>>> >>>
>>>>>>>>>> >>> On Mon, Oct 5, 2015 at 12:52 PM, Jacques Nadeau <jacques@dremio.com>
>>>>>>>>>> >>> wrote:
>>>>>>>>>> >>>
>>>>>>>>>> >>>> Right now this type of work is done here:
>>>>>>>>>> >>>>
>>>>>>>>>> >>>> https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java
>>>>>>>>>> >>>>
>>>>>>>>>> >>>> https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
>>>>>>>>>> >>>>
>>>>>>>>>> >>>> With Distribution Trait application here:
>>>>>>>>>> >>>>
>>>>>>>>>> >>>> https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillDistributionTraitDef.java
>>>>>>>>>> >>>>
>>>>>>>>>> >>>> To me, the easiest way to solve the Phoenix issue is by providing a rule
>>>>>>>>>> >>>> that matches HashAgg and StreamAgg but requires Phoenix convention as
>>>>>>>>>> >>>> input. It would replace everywhere but would only be plannable when it is
>>>>>>>>>> >>>> the first phase of aggregation.
>>>>>>>>>> >>>>
>>>>>>>>>> >>>> Thoughts?
>>>>>>>>>> >>>>
>>>>>>>>>> >>>>
>>>>>>>>>> >>>>
>>>>>>>>>> >>>> --
>>>>>>>>>> >>>> Jacques Nadeau
>>>>>>>>>> >>>> CTO and Co-Founder, Dremio
>>>>>>>>>> >>>>
>>>>>>>>>> >>>> On Thu, Oct 1, 2015 at 2:30 PM, Julian Hyde <jhyde@apache.org> wrote:
>>>>>>>>>> >>>>
>>>>>>>>>> >>>>> Phoenix is able to perform quite a few relational operations on the
>>>>>>>>>> >>>>> region server: scan, filter, project, aggregate, sort (optionally with
>>>>>>>>>> >>>>> limit). However, the sort and aggregate are necessarily "local". They
>>>>>>>>>> >>>>> can only deal with data on that region server, and there needs to be a
>>>>>>>>>> >>>>> further operation to combine the results from the region servers.
>>>>>>>>>> >>>>>
>>>>>>>>>> >>>>> The question is how to plan such queries. I think the answer is an
>>>>>>>>>> >>>>> AggregateExchangeTransposeRule.
>>>>>>>>>> >>>>>
>>>>>>>>>> >>>>> The rule would spot an Aggregate on a data source that is split into
>>>>>>>>>> >>>>> multiple locations (partitions) and split it into a partial Aggregate
>>>>>>>>>> >>>>> that computes sub-totals and a summarizing Aggregate that combines
>>>>>>>>>> >>>>> those totals.
>>>>>>>>>> >>>>>
>>>>>>>>>> >>>>> How does the planner know that the Aggregate needs to be split? Since
>>>>>>>>>> >>>>> the data's distribution has changed, there would need to be an
>>>>>>>>>> >>>>> Exchange operator. It is the Exchange operator that triggers the rule
>>>>>>>>>> >>>>> to fire.
>>>>>>>>>> >>>>>
>>>>>>>>>> >>>>> There are some special cases. If the data is sorted as well as
>>>>>>>>>> >>>>> partitioned (say because the local aggregate uses a sort-based
>>>>>>>>>> >>>>> algorithm) we could maybe use a more efficient plan. And if the
>>>>>>>>>> >>>>> partition key is the same as the aggregation key we don't need a
>>>>>>>>>> >>>>> summarizing Aggregate, just a Union.
>>>>>>>>>> >>>>>
>>>>>>>>>> >>>>> It turns out not to be very Phoenix-specific. In the Drill-on-Phoenix
>>>>>>>>>> >>>>> scenario, once the Aggregate has been pushed through the Exchange
>>>>>>>>>> >>>>> (i.e. onto the drill-bit residing on the region server) we can then
>>>>>>>>>> >>>>> push the DrillAggregate across the drill-to-phoenix membrane and make
>>>>>>>>>> >>>>> it into a PhoenixServerAggregate that executes in the region server.
>>>>>>>>>> >>>>> Related issues:
>>>>>>>>>> >>>>> * https://issues.apache.org/jira/browse/DRILL-3840
>>>>>>>>>> >>>>> * https://issues.apache.org/jira/browse/CALCITE-751
>>>>>>>>>> >>>>>
>>>>>>>>>> >>>>> Julian
>>>>>>>>>> >>>>>
>>>>>>>>>> >>>>
>>>>>>>>>> >>>>
>>>>>>>>>> >>>
>>>>>>>>>> >>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
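
Note on the A.BEER test quoted in the thread: as a rough, self-contained
sketch (this is not the Drill or Phoenix code), the two-phase aggregation being
debugged can be modeled in plain Java. The class and method names below, and
the split of rows 1..1000 into two equal "regions", are assumptions made purely
for illustration. Each region produces one partial count per group key, and the
final phase sums those partial counts rather than counting rows again.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class TwoPhaseCountSketch {
    // Phase 1 (hypothetical stand-in for one region's server-side scan):
    // one partial COUNT(*) per group key for rows lo..hi inclusive,
    // where the group key is x % modulus, as in the thread's test data.
    static Map<Integer, Long> partialCount(int lo, int hi, int modulus) {
        Map<Integer, Long> partial = new TreeMap<>();
        for (int x = lo; x <= hi; x++) {
            partial.merge(x % modulus, 1L, Long::sum);
        }
        return partial;
    }

    // Phase 2: the final aggregate adds up the partial counts for each key.
    static Map<Integer, Long> finalCount(List<Map<Integer, Long>> partials) {
        Map<Integer, Long> total = new TreeMap<>();
        for (Map<Integer, Long> partial : partials) {
            partial.forEach((key, count) -> total.merge(key, count, Long::sum));
        }
        return total;
    }

    public static void main(String[] args) {
        // Assumed split: two regions holding rows 1..500 and 501..1000.
        Map<Integer, Long> region1 = partialCount(1, 500, 10);
        Map<Integer, Long> region2 = partialCount(501, 1000, 10);
        finalCount(Arrays.asList(region1, region2))
            .forEach((key, count) -> System.out.println(key + " " + count));
    }
}
```

Running main prints the keys 0 through 9, each with a count of 100, which
matches the expected result quoted in the thread. The sketch says nothing about
why the actual run returned zero counts for some keys.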

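Note on the merge versus sort discussion in the thread: when each scan already
returns rows in order, combining them only needs a merge, and if the slices are
also ordered relative to one another a plain concatenation is enough. The
sketch below illustrates that difference with integers standing in for sorted
row keys. It is a minimal illustration under those assumptions, with
hypothetical class and method names, and does not use Drill's merge receiver
or Phoenix's iterators.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.PriorityQueue;

public class SortedScanMergeSketch {
    // k-way merge of inputs that are each already sorted ascending.
    static List<Integer> merge(List<List<Integer>> sortedInputs) {
        // Heap entries are {value, inputIndex, positionInInput}, ordered by value.
        PriorityQueue<int[]> heap =
            new PriorityQueue<>((a, b) -> Integer.compare(a[0], b[0]));
        for (int i = 0; i < sortedInputs.size(); i++) {
            if (!sortedInputs.get(i).isEmpty()) {
                heap.add(new int[] {sortedInputs.get(i).get(0), i, 0});
            }
        }
        List<Integer> out = new ArrayList<>();
        while (!heap.isEmpty()) {
            int[] head = heap.poll();
            out.add(head[0]);
            List<Integer> source = sortedInputs.get(head[1]);
            int next = head[2] + 1;
            if (next < source.size()) {
                heap.add(new int[] {source.get(next), head[1], next});
            }
        }
        return out;
    }

    // When the inputs are sorted and also ordered relative to each other
    // (every value in input i precedes every value in input i+1),
    // appending them in order gives the same result as a merge.
    static List<Integer> concat(List<List<Integer>> orderedInputs) {
        List<Integer> out = new ArrayList<>();
        for (List<Integer> input : orderedInputs) {
            out.addAll(input);
        }
        return out;
    }

    public static void main(String[] args) {
        List<List<Integer>> interleaved = Arrays.asList(
            Arrays.asList(1, 4, 7), Arrays.asList(2, 5, 8), Arrays.asList(3, 6, 9));
        System.out.println(merge(interleaved));

        List<List<Integer>> disjoint = Arrays.asList(
            Arrays.asList(1, 2, 3), Arrays.asList(4, 5, 6));
        System.out.println(concat(disjoint));
    }
}
```

The merge keeps one heap entry per input, so it avoids re-sorting rows that the
scans have already ordered; the concat path skips the comparisons entirely but
is only valid when the order among the inputs is known.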