drill-dev mailing list archives

From Maryann Xue <maryann....@gmail.com>
Subject Re: Partial aggregation in Drill-on-Phoenix
Date Wed, 07 Oct 2015 20:11:43 GMT
Made another checkin for the pull request. All good now.

In order to compile and run, be sure to update the Phoenix project under
Julian's branch.


Thanks,
Maryann

On Wed, Oct 7, 2015 at 12:19 PM, Jacques Nadeau <jacques@dremio.com> wrote:

> I just filed a jira for the merge issue:
>
> https://issues.apache.org/jira/browse/DRILL-3907
>
> --
> Jacques Nadeau
> CTO and Co-Founder, Dremio
>
> On Wed, Oct 7, 2015 at 8:54 AM, Jacques Nadeau <jacques@dremio.com> wrote:
>
>> Drill doesn't currently have a merge-sort operation available outside the
>> context of an exchange. See here:
>>
>>
>> https://github.com/apache/drill/tree/master/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver
>>
>> We'll need to do a bit of refactoring to provide this functionality
>> outside the context of an exchange. The one other thing we'll have to think
>> about in this context is how we avoid doing an n-way merge in the case
>> where we're not using the collation.
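A minimal illustration of the kind of merge being discussed: a k-way merge over already-sorted inputs using a priority queue. This is only a sketch assuming row-at-a-time iterators and a caller-supplied comparator; it is not the MergingReceiver code linked above, which works on record batches.

import java.util.*;

// Sketch only: merge k already-sorted iterators into one sorted stream with a
// priority queue keyed on each iterator's current element. A real Drill
// operator would work on record batches and value vectors, not single rows.
final class KWayMerge<T> implements Iterator<T> {
  private final PriorityQueue<Map.Entry<T, Iterator<T>>> heap;

  KWayMerge(List<Iterator<T>> sortedInputs, Comparator<T> cmp) {
    heap = new PriorityQueue<>((a, b) -> cmp.compare(a.getKey(), b.getKey()));
    for (Iterator<T> it : sortedInputs) {
      if (it.hasNext()) {
        heap.add(new AbstractMap.SimpleEntry<>(it.next(), it));
      }
    }
  }

  @Override public boolean hasNext() {
    return !heap.isEmpty();
  }

  @Override public T next() {
    Map.Entry<T, Iterator<T>> head = heap.poll();  // smallest current element
    Iterator<T> source = head.getValue();
    if (source.hasNext()) {                        // refill from the same input
      heap.add(new AbstractMap.SimpleEntry<>(source.next(), source));
    }
    return head.getKey();
  }
}

When the downstream plan does not actually rely on the collation, the heap buys nothing and the inputs could simply be concatenated, which is the n-way-merge question raised above.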
>>
>> --
>> Jacques Nadeau
>> CTO and Co-Founder, Dremio
>>
>> On Wed, Oct 7, 2015 at 8:18 AM, Maryann Xue <maryann.xue@gmail.com>
>> wrote:
>>
>>> One thing from what I asked James offline yesterday, and maybe we can
>>> discuss a little bit in today's meeting:
>>>
>>> Phoenix uses a list of lists of Scan objects to indicate Region
>>> boundaries and guideposts, and if the top-level list contains more than one
>>> element, it means that the results from the different Scanners/ResultIterators
>>> should be merge-sorted. We now use this list in the Drill integration to
>>> generate different batches or slices. I see from the Drill plan of a simple
>>> select like "SELECT * FROM A.BEER" that a Drill Sort node sits on top of
>>> the PhoenixTableScan. I guess this is a real sort rather than a merge-sort.
>>> So optimally,
>>> 1) this should be a merge-sort (to be more accurate, a merge)
>>> 2) furthermore, if Drill has something to indicate the order among
>>> slices and batches, we could even turn it into a concat.
>>>
>>> The structure of this Scan list might be helpful for 2), or we may have
>>> some Logical representation for this. Otherwise, we can simply flatten this
>>> list to a one-dimensional list as we do now (in my check-in yesterday).
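As an illustration of the structure in question: the nested list is roughly a List<List<Scan>>, where each inner list is already ordered (region boundaries and guideposts) and only the outer streams need merging. A small sketch of the two ways it could be consumed, assuming HBase's Scan class; the helper names are made up, not plugin code:

import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.client.Scan;

// Sketch only: two ways the nested scan list could be consumed on the Drill side.
class ScanListSketch {

  // What the integration does today: flatten to one dimension and let Drill
  // treat every Scan as an independent slice, re-sorting downstream if needed.
  static List<Scan> flatten(List<List<Scan>> nested) {
    List<Scan> flat = new ArrayList<>();
    for (List<Scan> group : nested) {
      flat.addAll(group);
    }
    return flat;
  }

  // Hypothetical alternative: keep the outer grouping, so the planner can see
  // that each inner list is already ordered and only the outer streams need a
  // merge (or, if order doesn't matter, a simple concat).
  static int mergeWidth(List<List<Scan>> nested) {
    return nested.size();  // > 1 means a merge rather than a full sort
  }
}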
>>>
>>>
>>>
>>> Thanks,
>>> Maryann
>>>
>>> On Tue, Oct 6, 2015 at 9:52 PM, Maryann Xue <maryann.xue@gmail.com>
>>> wrote:
>>>
>>>> Yes, but the partially aggregated results will not contain any
>>>> duplicate rowkeys, since they are also group-by keys. What we need is the
>>>> aggregators, and to call aggregate for each row. We can write a new, simpler
>>>> ResultIterator to replace this, but for now it should work correctly.
>>>>
>>>> On Tue, Oct 6, 2015 at 9:45 PM, James Taylor <jamestaylor@apache.org>
>>>> wrote:
>>>>
>>>>> The results we get back from the server-side scan are already the
>>>>> partially aggregated values we need. GroupedAggregatingResultIterator
>>>>> will collapse adjacent Tuples together which happen to have the same row
>>>>> key. I'm not sure we want/need this to happen. Instead I think we just need
>>>>> to decode the aggregated values directly from the result of the scan.
>>>>>
>>>>> On Tue, Oct 6, 2015 at 6:07 PM, Maryann Xue <maryann.xue@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi James,
>>>>>>
>>>>>> bq. A few questions for you: not sure I understand the changes you
>>>>>> made to PhoenixRecordReader. Is it necessary to wrap the server-side scan
>>>>>> results in a GroupedAggregatingResultIterator? Each server-side scan will
>>>>>> produce results with a single tuple per group by key. In Phoenix, the
>>>>>> GroupedAggregatingResultIterator's function in life is to do the final
>>>>>> merge. Note too that the results that come back from the aggregated scan
>>>>>> aren't sorted (while GroupedAggregatingResultIterator needs tuples sorted
>>>>>> by the group by key). Or is this just to help in decoding the values coming
>>>>>> back from the scan?
>>>>>>
>>>>>> It is necessary. I suppose what we should return as a partial result
>>>>>> from PhoenixRecordReader is exactly the same as what we do in standalone
>>>>>> Phoenix+Calcite, except that the result is partial or, say, incomplete. For
>>>>>> example, if we have "select a, count(*) from t group by a", we should return
>>>>>> rows that have "a" as the first expression value, and "count(*)" as the
>>>>>> second expression value. For this "count" expression, it actually needs a
>>>>>> ClientAggregator for evaluation, and that's what this
>>>>>> GroupedAggregatingResultIterator is used for.
>>>>>> Since "each server-side scan will produce results with a single
>>>>>> tuple per group by key", and PhoenixRecordReader is only dealing with one
>>>>>> server-side result each time, we don't care how the group-by keys are
>>>>>> arranged (ordered or unordered). Actually
>>>>>> GroupedAggregatingResultIterator is not the group-by iterator we use
>>>>>> for AggregatePlan. It does not "combine". It treats every row as a
>>>>>> different group, by returning its rowkey as the group-by key
>>>>>> (GroupedAggregatingResultIterator.java:56).
>>>>>>
>>>>>> In short, this iterator is for decoding the server-side values. So we
>>>>>> may want to optimize this logic by removing this serialization and
>>>>>> deserialization and having only one set of aggregators in future.
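To illustrate the decoding step described above, here is a rough sketch with placeholder types standing in for the actual Phoenix classes; ServerRow, ClientAggregator and their methods are made-up names, not the real API:

// Sketch only: the point is that we decode each row once and never combine
// rows on this side of the exchange.
interface ServerRow {
  Object groupKey();          // e.g. the value of "a"
  byte[] aggregateState();    // serialized partial-aggregate state from the scan
}

interface ClientAggregator {
  void reset();
  void aggregate(byte[] serializedState);  // absorb the server-side state
  Object getValue();                       // e.g. the partial count(*)
}

final class PartialAggDecoder {
  // One output row per input row: group key first, decoded aggregates after.
  static Object[] decodeRow(ServerRow row, ClientAggregator[] aggs) {
    Object[] out = new Object[1 + aggs.length];
    out[0] = row.groupKey();
    for (int i = 0; i < aggs.length; i++) {
      aggs[i].reset();                        // every row is its own "group"
      aggs[i].aggregate(row.aggregateState());
      out[i + 1] = aggs[i].getValue();
    }
    return out;
  }
}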
>>>>>>
>>>>>> bq. Also, not sure what impact it has in the way we "combine" the
>>>>>> scans in our Drill parallelization code (PhoenixGroupScan.applyAssignments()),
>>>>>> as each of our scans could include duplicate group by keys. Is it ok to
>>>>>> combine them in this case?
>>>>>>
>>>>>> It should not matter, or at least is not related to the problem I'm
>>>>>> now having.
>>>>>>
>>>>>> bq. One more question: how is the group by key communicated back to
>>>>>> Drill?
>>>>>>
>>>>>> According to the HashAggPrule, if it decides to create a two-phase
>>>>>> aggregate, the first phase is now handled by Phoenix (after applying the
>>>>>> PhoenixHashAggPrule). I assume the partial results then get shuffled based
>>>>>> on the hash of their group-by keys (returned by PhoenixRecordReader). The
>>>>>> final step is the Drill hash aggregation.
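For illustration, a toy sketch of that two-phase flow for "SELECT e1, count(*) FROM a.beer GROUP BY e1", assuming hash partitioning on the group key between the two phases; this is not the actual Drill operators:

import java.util.*;

// Sketch only: phase 1 produces partial counts per slice (what Phoenix returns
// through PhoenixRecordReader), rows are then hash-partitioned on the group
// key, and phase 2 sums the partial counts per key (Drill's final HashAgg).
final class TwoPhaseCountSketch {

  static Map<Integer, Long> phase1(List<Integer> sliceGroupKeys) {
    Map<Integer, Long> partial = new HashMap<>();
    for (int key : sliceGroupKeys) {
      partial.merge(key, 1L, Long::sum);   // partial count(*) per group key
    }
    return partial;
  }

  static int partitionOf(int groupKey, int numBuckets) {
    return Math.floorMod(Objects.hashCode(groupKey), numBuckets);
  }

  static Map<Integer, Long> phase2(List<Map<Integer, Long>> shuffledPartials) {
    Map<Integer, Long> total = new HashMap<>();
    for (Map<Integer, Long> partial : shuffledPartials) {
      partial.forEach((k, v) -> total.merge(k, v, Long::sum));
    }
    return total;
  }
}

Under this model a mishandled hash bucket would show up as a block of group keys whose counts all come back as 0, which matches the symptom described below.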
>>>>>>
>>>>>>
>>>>>> This is my test table "A.BEER", which has four columns: "B", "E1",
>>>>>> "E2", "R", all of INTEGER type. The data is generated like this:
>>>>>> for (x=1 to N) { //currently N=1000
>>>>>>  UPSERT INTO A.BEER VALUES (x, x % 10, x % 100, x);
>>>>>> }
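For reference, the equivalent data load as a hedged JDBC sketch against Phoenix; the connection URL and the assumption that B is the primary key are mine, not from the thread:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

// Sketch of the data load described above via the Phoenix JDBC driver.
public class LoadBeer {
  public static void main(String[] args) throws Exception {
    try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost");
         PreparedStatement ps =
             conn.prepareStatement("UPSERT INTO A.BEER VALUES (?, ?, ?, ?)")) {
      for (int x = 1; x <= 1000; x++) {   // N = 1000
        ps.setInt(1, x);         // B
        ps.setInt(2, x % 10);    // E1
        ps.setInt(3, x % 100);   // E2
        ps.setInt(4, x);         // R
        ps.executeUpdate();
      }
      conn.commit();             // Phoenix buffers upserts until commit
    }
  }
}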
>>>>>>
>>>>>> The group-by query for testing is "SELECT e1, count(*) FROM a.beer
>>>>>> GROUP BY e1".
>>>>>> The expected result should be:
>>>>>> 0 100
>>>>>> 1 100
>>>>>> 2 100
>>>>>> 3 100
>>>>>> 4 100
>>>>>> 5 100
>>>>>> 6 100
>>>>>> 7 100
>>>>>> 8 100
>>>>>> 9 100
>>>>>> The actual result was:
>>>>>> 6 0
>>>>>> 7 0
>>>>>> 8 0
>>>>>> 9 0
>>>>>> 0 0
>>>>>> 1 100
>>>>>> 2 100
>>>>>> 3 100
>>>>>> 4 100
>>>>>> 5 100
>>>>>>
>>>>>> Here I just tried another one "SELECT e2, count(*) FROM a.beer GROUP
>>>>>> BY e2".
>>>>>> Similarly, the expected result should have group-by keys from 0 to
>>>>>> 99, each having a value of 10 as the count, while the actual result was:
>>>>>> from group-by key 86 to 99, together with 0, their count values were
>>>>>> all 0; the rest (1 to 85) all had the correct value 10.
>>>>>>
>>>>>> Looks to me that the scans were good but there was a problem with one
>>>>>> of the hash buckets.
>>>>>>
>>>>>> Thanks,
>>>>>> Maryann
>>>>>>
>>>>>>
>>>>>> On Tue, Oct 6, 2015 at 6:45 PM, James Taylor <jamestaylor@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>> Nice progress, Maryann.
>>>>>>>
>>>>>>> A few questions for you: not sure I understand the changes you made
>>>>>>> to PhoenixRecordReader. Is it necessary to wrap the server-side scan
>>>>>>> results in a GroupedAggregatingResultIterator? Each server-side scan will
>>>>>>> produce results with a single tuple per group by key. In Phoenix, the
>>>>>>> GroupedAggregatingResultIterator's function in life is to do the final
>>>>>>> merge. Note too that the results that come back from the aggregated scan
>>>>>>> aren't sorted (while GroupedAggregatingResultIterator needs tuples sorted
>>>>>>> by the group by key). Or is this just to help in decoding the values coming
>>>>>>> back from the scan?
>>>>>>>
>>>>>>> Also, not sure what impact it has in the way we "combine" the scans
>>>>>>> in our Drill parallelization code (PhoenixGroupScan.applyAssignments()), as
>>>>>>> each of our scans could include duplicate group by keys. Is it ok to
>>>>>>> combine them in this case?
>>>>>>>
>>>>>>> One more question: how is the group by key communicated back to
>>>>>>> Drill?
>>>>>>>
>>>>>>> Thanks,
>>>>>>> James
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Oct 6, 2015 at 2:10 PM, Maryann Xue <maryann.xue@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Added a few fixes in the pull request. Tested with two regions; it
>>>>>>>> turned out that half of the result is empty (count = 0).
>>>>>>>> Not sure if there's anything wrong with
>>>>>>>> https://github.com/maryannxue/drill/blob/phoenix_plugin/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/rel/PhoenixHashAggPrule.java
>>>>>>>> .
>>>>>>>> Like Julian said, this rule looks a bit hacky.
>>>>>>>>
>>>>>>>> To force a 2-phase HashAgg, I made a temporary change as well:
>>>>>>>>
>>>>>>>> diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
>>>>>>>> index b911f6b..58bc918 100644
>>>>>>>> --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
>>>>>>>> +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
>>>>>>>> @@ -60,12 +60,12 @@ public abstract class AggPruleBase extends Prule {
>>>>>>>>    // If any of the aggregate functions are not one of these, then we
>>>>>>>>    // currently won't generate a 2 phase plan.
>>>>>>>>    protected boolean create2PhasePlan(RelOptRuleCall call, DrillAggregateRel aggregate) {
>>>>>>>> -    PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner());
>>>>>>>> -    RelNode child = call.rel(0).getInputs().get(0);
>>>>>>>> -    boolean smallInput = child.getRows() < settings.getSliceTarget();
>>>>>>>> -    if (! settings.isMultiPhaseAggEnabled() || settings.isSingleMode() || smallInput) {
>>>>>>>> -      return false;
>>>>>>>> -    }
>>>>>>>> +//    PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner());
>>>>>>>> +//    RelNode child = call.rel(0).getInputs().get(0);
>>>>>>>> +//    boolean smallInput = child.getRows() < settings.getSliceTarget();
>>>>>>>> +//    if (! settings.isMultiPhaseAggEnabled() || settings.isSingleMode() || smallInput) {
>>>>>>>> +//      return false;
>>>>>>>> +//    }
>>>>>>>>
>>>>>>>>      for (AggregateCall aggCall : aggregate.getAggCallList()) {
>>>>>>>>        String name = aggCall.getAggregation().getName();
>>>>>>>>
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Maryann
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, Oct 6, 2015 at 2:31 PM, Julian Hyde <jhyde@apache.org>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Drill's current approach seems adequate for Drill alone, but extending
>>>>>>>>> it to a heterogeneous system that includes Phoenix seems like a hack.
>>>>>>>>>
>>>>>>>>> I think you should only create Prels for algebra nodes that you know
>>>>>>>>> for sure are going to run on the Drill engine. If there's a possibility
>>>>>>>>> that they would run in another engine such as Phoenix, then they should
>>>>>>>>> still be logical.
>>>>>>>>>
>>>>>>>>> On Tue, Oct 6, 2015 at 11:03 AM, Maryann Xue <
>>>>>>>>> maryann.xue@gmail.com> wrote:
>>>>>>>>> > The partial aggregate seems to be working now, with one interface
>>>>>>>>> > extension and one bug fix in the Phoenix project. Will do some code
>>>>>>>>> > cleanup and create a pull request soon.
>>>>>>>>> >
>>>>>>>>> > Still there was a hack in the Drill project which I made to force
>>>>>>>>> > 2-phase aggregation. I'll try to fix that.
>>>>>>>>> >
>>>>>>>>> > Jacques, I have one question though: how can I verify that there is
>>>>>>>>> > more than one slice and that the shuffle happens?
>>>>>>>>> >
>>>>>>>>> >
>>>>>>>>> > Thanks,
>>>>>>>>> > Maryann
>>>>>>>>> >
>>>>>>>>> > On Mon, Oct 5, 2015 at 2:03 PM, James Taylor <jamestaylor@apache.org>
>>>>>>>>> > wrote:
>>>>>>>>> >
>>>>>>>>> >> Maryann,
>>>>>>>>> >> I believe Jacques mentioned that a little bit of refactoring is
>>>>>>>>> >> required for a merge sort to occur - there's something that does
>>>>>>>>> >> that, but it's not expected to be used in this context currently.
>>>>>>>>> >>
>>>>>>>>> >> IMHO, there's more of a clear value in getting the aggregation to
>>>>>>>>> >> use Phoenix first, so I'd recommend going down that road as Jacques
>>>>>>>>> >> mentioned above if possible. Once that's working, we can circle back
>>>>>>>>> >> to the partial sort.
>>>>>>>>> >>
>>>>>>>>> >> Thoughts?
>>>>>>>>> >> James
>>>>>>>>> >>
>>>>>>>>> >> On Mon, Oct 5, 2015 at 10:40 AM, Maryann Xue <maryann.xue@gmail.com>
>>>>>>>>> >> wrote:
>>>>>>>>> >>
>>>>>>>>> >>> I actually tried implementing partial sort with
>>>>>>>>> >>> https://github.com/jacques-n/drill/pull/4, which I figured might be
>>>>>>>>> >>> a little easier to start with than partial aggregation. But I found
>>>>>>>>> >>> that even though the code worked (returned the right results), the
>>>>>>>>> >>> Drill-side sort turned out to be an ordinary sort instead of a merge,
>>>>>>>>> >>> which it should have been. Any idea of how to fix that?
>>>>>>>>> >>>
>>>>>>>>> >>>
>>>>>>>>> >>> Thanks,
>>>>>>>>> >>> Maryann
>>>>>>>>> >>>
>>>>>>>>> >>> On Mon, Oct 5, 2015 at 12:52 PM, Jacques Nadeau <jacques@dremio.com>
>>>>>>>>> >>> wrote:
>>>>>>>>> >>>
>>>>>>>>> >>>> Right now this type of work is done here:
>>>>>>>>> >>>>
>>>>>>>>> >>>> https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java
>>>>>>>>> >>>>
>>>>>>>>> >>>> https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
>>>>>>>>> >>>>
>>>>>>>>> >>>> With Distribution Trait application here:
>>>>>>>>> >>>>
>>>>>>>>> >>>> https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillDistributionTraitDef.java
>>>>>>>>> >>>>
>>>>>>>>> >>>> To me, the easiest way to solve the Phoenix issue is by providing
>>>>>>>>> >>>> a rule that matches HashAgg and StreamAgg but requires Phoenix
>>>>>>>>> >>>> convention as input. It would replace everywhere but would only be
>>>>>>>>> >>>> plannable when it is the first phase of aggregation.
>>>>>>>>> >>>>
>>>>>>>>> >>>> Thoughts?
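A rough sketch of the shape such a rule could take, in plain Calcite terms; the convention object and the commented-out transform are placeholders, not the actual Drill or Phoenix plugin classes:

import org.apache.calcite.plan.Convention;
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Aggregate;

// Sketch only: match an Aggregate whose input is already in the Phoenix
// convention and, when it would be the first (partial) phase, replace it with
// a Phoenix-side aggregate.
public class PhoenixPartialAggRule extends RelOptRule {
  static final Convention PHOENIX = new Convention.Impl("PHOENIX", RelNode.class);

  public PhoenixPartialAggRule() {
    super(operand(Aggregate.class, operand(RelNode.class, any())),
        "PhoenixPartialAggRule");
  }

  @Override public void onMatch(RelOptRuleCall call) {
    Aggregate agg = call.rel(0);
    RelNode input = call.rel(1);
    if (input.getConvention() != PHOENIX || agg.containsDistinctCall()) {
      // Only fire when the input can run in Phoenix, i.e. when this would be
      // the first phase of a two-phase aggregation; keep the sketch simple by
      // skipping DISTINCT aggregates.
      return;
    }
    // call.transformTo(new PhoenixServerAggregate(..., agg, input, ...));  // placeholder
  }
}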
>>>>>>>>> >>>>
>>>>>>>>> >>>>
>>>>>>>>> >>>>
>>>>>>>>> >>>> --
>>>>>>>>> >>>> Jacques Nadeau
>>>>>>>>> >>>> CTO and Co-Founder, Dremio
>>>>>>>>> >>>>
>>>>>>>>> >>>> On Thu, Oct 1, 2015 at 2:30 PM, Julian Hyde <jhyde@apache.org>
>>>>>>>>> >>>> wrote:
>>>>>>>>> >>>>
>>>>>>>>> >>>>> Phoenix is able to perform quite a few relational operations on
>>>>>>>>> >>>>> the region server: scan, filter, project, aggregate, sort
>>>>>>>>> >>>>> (optionally with limit). However, the sort and aggregate are
>>>>>>>>> >>>>> necessarily "local". They can only deal with data on that region
>>>>>>>>> >>>>> server, and there needs to be a further operation to combine the
>>>>>>>>> >>>>> results from the region servers.
>>>>>>>>> >>>>>
>>>>>>>>> >>>>> The question is how to plan such queries. I think the answer is
>>>>>>>>> >>>>> an AggregateExchangeTransposeRule.
>>>>>>>>> >>>>>
>>>>>>>>> >>>>> The rule would spot an Aggregate on a data source that is split
>>>>>>>>> >>>>> into multiple locations (partitions) and split it into a partial
>>>>>>>>> >>>>> Aggregate that computes sub-totals and a summarizing Aggregate
>>>>>>>>> >>>>> that combines those totals.
>>>>>>>>> >>>>>
>>>>>>>>> >>>>> How does the planner know that the Aggregate needs to be split?
>>>>>>>>> >>>>> Since the data's distribution has changed, there would need to be
>>>>>>>>> >>>>> an Exchange operator. It is the Exchange operator that triggers
>>>>>>>>> >>>>> the rule to fire.
>>>>>>>>> >>>>>
>>>>>>>>> >>>>> There are some special cases. If the data is sorted as well as
>>>>>>>>> >>>>> partitioned (say because the local aggregate uses a sort-based
>>>>>>>>> >>>>> algorithm) we could maybe use a more efficient plan. And if the
>>>>>>>>> >>>>> partition key is the same as the aggregation key we don't need a
>>>>>>>>> >>>>> summarizing Aggregate, just a Union.
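To make the decomposition concrete, a small sketch of the usual mapping from the original aggregate call to the summarizing call above the Exchange, plus the Union special case; this is illustrative only, not the CALCITE-751/DRILL-3840 code:

import java.util.List;

// Sketch only: which call the summarizing Aggregate applies to the partial
// results produced below the Exchange.
final class AggregateSplitSketch {

  static String summarizingCall(String originalCall) {
    switch (originalCall) {
      case "COUNT": return "SUM";   // sum up the per-partition counts
      case "SUM":   return "SUM";
      case "MIN":   return "MIN";
      case "MAX":   return "MAX";
      // AVG would be rewritten as SUM/COUNT before splitting; other functions
      // may not be decomposable at all.
      default: throw new IllegalArgumentException("not splittable: " + originalCall);
    }
  }

  // The special case above: if the partition key equals the aggregation key,
  // every group lives entirely in one partition, so the summarizing Aggregate
  // can be replaced by a plain Union of the partial results.
  static boolean needsSummarizingAggregate(List<String> partitionKeys,
                                           List<String> groupKeys) {
    return !partitionKeys.equals(groupKeys);
  }
}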
>>>>>>>>> >>>>>
>>>>>>>>> >>>>> It turns out not to be very Phoenix-specific. In the
>>>>>>>>> >>>>> Drill-on-Phoenix scenario, once the Aggregate has been pushed
>>>>>>>>> >>>>> through the Exchange (i.e. onto the drill-bit residing on the
>>>>>>>>> >>>>> region server), we can then push the DrillAggregate across the
>>>>>>>>> >>>>> drill-to-phoenix membrane and make it into a PhoenixServerAggregate
>>>>>>>>> >>>>> that executes in the region server.
>>>>>>>>> >>>>>
>>>>>>>>> >>>>> Related issues:
>>>>>>>>> >>>>> * https://issues.apache.org/jira/browse/DRILL-3840
>>>>>>>>> >>>>> * https://issues.apache.org/jira/browse/CALCITE-751
>>>>>>>>> >>>>>
>>>>>>>>> >>>>> Julian
>>>>>>>>> >>>>>
>>>>>>>>> >>>>
>>>>>>>>> >>>>
>>>>>>>>> >>>
>>>>>>>>> >>
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
