hadoop-pig-dev mailing list archives

From "Pradeep Kamath (JIRA)" <j...@apache.org>
Subject [jira] Commented: (PIG-1038) Optimize nested distinct/sort to use secondary key
Date Tue, 10 Nov 2009 00:55:32 GMT

    [ https://issues.apache.org/jira/browse/PIG-1038?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=12775214#action_12775214 ]

Pradeep Kamath commented on PIG-1038:
-------------------------------------

Review comments:
In JobControlCompiler:
======================
The OutputValueGroupingComparator is a RawComparator - 
how are we ensuring that compare(WritableComparable a, WritableComparable b) is called?
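
To make the question concrete: as far as I understand the sort path, the framework calls the byte-level compare of a RawComparator, so the object-level compare runs only if the byte-level method deserializes both keys and delegates to it. The sketch below illustrates that delegation pattern with stand-in types. RawBytesComparator and DelegatingRawComparatorSketch are hypothetical names, not Hadoop or Pig classes, and 4-byte integers stand in for the real keys.

```java
import java.nio.ByteBuffer;
import java.util.Comparator;

final class DelegatingRawComparatorSketch implements RawBytesComparator<Integer> {
    // Counts how often the object-level compare was reached (illustration only).
    int objectCompareCalls = 0;

    // Byte-level entry point: decode both keys, then delegate to the object-level compare.
    @Override
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        Integer a = ByteBuffer.wrap(b1, s1, l1).getInt();
        Integer b = ByteBuffer.wrap(b2, s2, l2).getInt();
        return compare(a, b);
    }

    // Object-level compare: only reached through the delegation above.
    @Override
    public int compare(Integer a, Integer b) {
        objectCompareCalls++;
        return Integer.compare(a, b);
    }

    // Serializes an int key as 4 big-endian bytes.
    static byte[] encode(int v) {
        return ByteBuffer.allocate(4).putInt(v).array();
    }

    public static void main(String[] args) {
        DelegatingRawComparatorSketch cmp = new DelegatingRawComparatorSketch();
        int result = cmp.compare(encode(3), 0, 4, encode(7), 0, 4);
        System.out.println("3 sorts before 7: " + (result < 0));
        System.out.println("object-level calls: " + cmp.objectCompareCalls);
    }
}

// Hypothetical stand-in for a byte-level comparator interface; not Hadoop's actual type.
interface RawBytesComparator<T> extends Comparator<T> {
    int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2);
}
```

If the byte-level method compared the raw bytes directly instead, the object-level logic would never run, which is the situation the question is probing.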

{code}
622             if ((wa.getIndex() & PigNullableWritable.mqFlag) != 0) { // this is a multi-query index
{code}
Why do we compare on the index only when this is true? The if-else in this block does not
consider the case where both indices are the same - is that by design?

{code}
653             } else if (wa.isNull() && wb.isNull()) {      
{code}
In this block the case where both indices are the same is not considered - is that by design?

The change in src/org/apache/pig/impl/io/PigNullableWritable.java seems unrelated to the patch.

In SecondaryKeyOptimizer.java:
==============================
{code}
154             else if (mapLeaf instanceof POUnion || mapLeaf instanceof POSplit) {
155                 List<PhysicalOperator> preds = mr.mapPlan.getPredecessors(mapLeaf);
156                 for (PhysicalOperator pred:preds)
157                 {
158                     if (pred instanceof POLocalRearrange)
159                     {
160                         SortKeyInfo sortKeyInfo = getSortKeyInfo((POLocalRearrange)pred);
161                         sortKeyInfos.add(sortKeyInfo);
162                     }
163                 }
{code}

If mapLeaf is a POSplit, the POSplit may have a POLocalRearrange as its leaf (in multi-query
optimized queries) - should we be handling those?
Also, getSortKeyInfo() can return null, so the return value should be checked for null
everywhere getSortKeyInfo() is called.
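
A minimal sketch of the null check suggested above. The SortKeyInfo and Rearrange classes here are simplified, hypothetical stand-ins for the Pig types, and treating a null result as "skip the optimization" is an assumption about the intended behavior, not something the patch confirms.

```java
import java.util.ArrayList;
import java.util.List;

final class SortKeyNullCheckSketch {
    // Hypothetical stand-in for SortKeyInfo.
    static final class SortKeyInfo {
        final int column;
        SortKeyInfo(int column) { this.column = column; }
    }

    // Hypothetical stand-in for POLocalRearrange; a null sortColumn models "no usable sort key".
    static final class Rearrange {
        final Integer sortColumn;
        Rearrange(Integer sortColumn) { this.sortColumn = sortColumn; }
    }

    // Stand-in for getSortKeyInfo(): may return null.
    static SortKeyInfo getSortKeyInfo(Rearrange rearrange) {
        return rearrange.sortColumn == null ? null : new SortKeyInfo(rearrange.sortColumn);
    }

    // Returns null as soon as any predecessor yields no key info
    // (assumed meaning: the caller should not apply the optimization).
    static List<SortKeyInfo> collectSortKeyInfos(List<Rearrange> preds) {
        List<SortKeyInfo> sortKeyInfos = new ArrayList<SortKeyInfo>();
        for (Rearrange pred : preds) {
            SortKeyInfo sortKeyInfo = getSortKeyInfo(pred);
            if (sortKeyInfo == null) {
                return null;
            }
            sortKeyInfos.add(sortKeyInfo);
        }
        return sortKeyInfos;
    }

    public static void main(String[] args) {
        List<Rearrange> preds = new ArrayList<Rearrange>();
        preds.add(new Rearrange(1));
        preds.add(new Rearrange(2));
        System.out.println("collected: " + collectSortKeyInfos(preds).size());
        preds.add(new Rearrange(null));
        System.out.println("bails out: " + (collectSortKeyInfos(preds) == null));
    }
}
```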
{code}
 98                 List<Integer> columns = new ArrayList<Integer>();
 99                 columns.add(rearrange.getIndex()&PigNullableWritable.idxSpace);
100                 columnChainInfo.insert(false, columns, DataType.TUPLE);
{code}
Why does the column chain start with the index of LocalRearrange and why is the type tuple?


{code}
102                 PhysicalOperator node = plan.getRoots().get(0);
103                 while (node!=null)
104                 {
105                     if (node instanceof POProject) {
106                         POProject project = (POProject)node;
107 
108                         columnChainInfo.insert(project.isStar(), project.getColumns(), project.getResultType());
109 
110                         if (plan.getPredecessors(node)==null)
111                             node = null;
{code}
If node is initially the root, wouldn't plan.getPredecessors(node) always == null?

{code}
175         List<PhysicalOperator> reduceRoots = mr.reducePlan.getRoots();
176         if (reduceRoots.size() != 1) {
177             log.debug("Expected reduce to have single leaf");
178             return;
179         }
{code}
Did you mean to say "Expected reduce to have single root" ?

{code}
209             // Removed POSort, if the predecessor require a databag, we need to add a PORelationToExprProject
{code}
Should the above comment read "if the successor requires ..."?

{code}
247                     throw new VisitorException("Sort on columns from different inputs.");
{code}
Should this exception follow the Error Handling guidelines and include an error code and error source?

{code}
253             } else if (mapLeaf instanceof POUnion || mapLeaf instanceof POSplit){
254                 List<PhysicalOperator> preds = mr.mapPlan.getPredecessors(mapLeaf);
255                 for (PhysicalOperator pred:preds) {
256                     POLocalRearrange rearrange = (POLocalRearrange)pred;
257                     rearrange.setUseSecondaryKey(true);
258                     if (rearrange.getIndex()==indexOfRearrangeToChange)
259                         setSecondaryPlan(mr.mapPlan, rearrange, secondarySortKeyInfo);
260                 }
261             }
{code}
If mapLeaf is a POSplit, the POSplit may have a POLocalRearrange as its leaf (in multi-query
optimized queries) - should we be handling those?
Also, in the if statement on line 258, what if the condition evaluates to false - should we
throw an Exception like earlier in the same method?

{code}
274                 for (int i=1;i<columnChainInfo.size();i++) {
{code}
A comment explaining why this loop starts from 1 would be useful.

{code}
286                 if (secondaryPlan.isEmpty())
287                 {
{code}
A comment explaining when this would happen would be useful.

There is a ForEachVisitor which visits *all* ForEach ops in the reduce plan - is a visitor needed?
Shouldn't only the foreach following the POPackage be processed?

{code}
372         SecondaryKeyDiscoverVisitor(PhysicalPlan plan, List<SortKeyInfo> sortKeyInfos, SortKeyInfo secondarySortKeyInfo) {
{code}
It seems like the secondarySortKeyInfo passed in the constructor call is always null - is
that argument needed in the constructor?

{code}
431                 throw new VisitorException("POForEach has more than 1 input plans");
{code}
Should this exception follow the Error Handling guidelines and include an error code and error source?

A test case should be added for a non-Project group-by key, such as group by $0 + $1.
I did not follow the code path for this case; we should ensure it works with a nested
sort in the foreach.

In ColumnChainInfo.java:
=========================

A comment on ColumnChainInfo and SortKeyInfo explaining how they map to the POProjects in the
plan would be useful.

{code}
42         ColumnInfo newColumnChainInfo = new ColumnInfo(star, columns, type);
{code}
I think it is better to name this variable newColumnInfo, since the name 'newColumnChainInfo'
seems to suggest the object is of type ColumnChainInfo.

{code}
 52             newColumns.add(columns.get(0)-1);
{code}
The above code suggests that we rely on columns having only 1 element, at position 0. If
that is not true, should the code handle the case where columns has > 1 element?
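
If columns can hold more than one element, one possible way to handle it is sketched below. It assumes the "-1" adjustment should apply uniformly to every column index, which the patch does not confirm; shiftColumns is a hypothetical helper name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class ColumnShiftSketch {
    // Shifts every column index down by one instead of only the element at position 0.
    // Assumption: the same "-1" adjustment is correct for all columns.
    static List<Integer> shiftColumns(List<Integer> columns) {
        List<Integer> newColumns = new ArrayList<Integer>(columns.size());
        for (int column : columns) {
            newColumns.add(column - 1);
        }
        return newColumns;
    }

    public static void main(String[] args) {
        System.out.println(shiftColumns(Arrays.asList(1, 3, 4)));
    }
}
```

If only a single column is ever valid here, an explicit size check that fails loudly would document that constraint better than silently reading position 0.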

{code}
 53             ColumnInfo newColumnChainInfo = new ColumnInfo(star, newColumns, type);
{code}
I think it is better to name this variable newColumnInfo, since the name 'newColumnChainInfo'
seems to suggest the object is of type ColumnChainInfo.

{code}
 85     public void normalize() {
 86         int i=size()-1;
 87         while (i>=0 && getColumnInfo(i).star) {
 88             columnInfos.remove(i);
 89             i--;
 90         }
 91     }
{code}
What is the above method used for?

In SortKeyInfo.java:
=====================
{code}
   public void insertColumnInfo(int index, ColumnChainInfo columnChainInfo)
    {
        if (columnChains.size()<=index)
        {
            columnChains.add(columnChainInfo);
            return;
        }
        
        ColumnChainInfo chain = columnChains.get(index);
        chain.insertColumnChainInfo(columnChainInfo);
    }
{code}
Should this method be named "insertColumnChainInfo" instead? When would the if condition
evaluate to false?

{code}
    public void normalize() {
        if (columnChains.size()==1)
            columnChains.get(0).normalize();
    }
{code}
What does the above do?

In POSortedDistinct.java:
=========================
{code}
 75             if (in.returnStatus == POStatus.STATUS_EOP) {
 76                 if (!isAccumulative() || !isAccumStarted()) {
 77                     lastTuple = null;
 78                 }
 79                 return in;
 80             }
{code}
What do the checks for isAccumulative() and isAccumStarted() do?

In POLocalRearrange.java:
=========================
Is there a way to refactor the code so that setPlans() and setSecondaryPlans() can share common
code?

In POMultiQueryPackage.java:
============================
In setUseSecondaryKey(), the code sets the flag to true in all underlying packages, irrespective
of whether the argument to the method is true or false. Also, I am wondering whether the case
where some subqueries in a multiquery optimization need a secondary key and some don't
is handled.
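
A minimal sketch of forwarding the argument instead of a hard-coded true. MultiQueryPackageSketch and Pkg are hypothetical stand-ins for POMultiQueryPackage and its underlying packages; the sketch does not address the mixed case where only some subqueries need a secondary key.

```java
import java.util.ArrayList;
import java.util.List;

final class MultiQueryPackageSketch {
    // Hypothetical stand-in for an underlying package operator.
    static final class Pkg {
        private boolean useSecondaryKey;
        void setUseSecondaryKey(boolean useSecondaryKey) { this.useSecondaryKey = useSecondaryKey; }
        boolean isUseSecondaryKey() { return useSecondaryKey; }
    }

    private final List<Pkg> packages = new ArrayList<Pkg>();

    void addPackage(Pkg pkg) {
        packages.add(pkg);
    }

    // Forwards the caller's value to every underlying package rather than always passing true.
    void setUseSecondaryKey(boolean useSecondaryKey) {
        for (Pkg pkg : packages) {
            pkg.setUseSecondaryKey(useSecondaryKey);
        }
    }

    public static void main(String[] args) {
        MultiQueryPackageSketch multi = new MultiQueryPackageSketch();
        Pkg first = new Pkg();
        Pkg second = new Pkg();
        multi.addPackage(first);
        multi.addPackage(second);
        multi.setUseSecondaryKey(true);
        multi.setUseSecondaryKey(false);
        System.out.println(first.isUseSecondaryKey() + " " + second.isUseSecondaryKey());
    }
}
```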





> Optimize nested distinct/sort to use secondary key
> --------------------------------------------------
>
>                 Key: PIG-1038
>                 URL: https://issues.apache.org/jira/browse/PIG-1038
>             Project: Pig
>          Issue Type: Improvement
>          Components: impl
>    Affects Versions: 0.4.0
>            Reporter: Olga Natkovich
>            Assignee: Daniel Dai
>             Fix For: 0.6.0
>
>         Attachments: PIG-1038-1.patch, PIG-1038-2.patch
>
>
> If nested foreach plan contains sort/distinct, it is possible to use hadoop secondary
> sort instead of SortedDataBag and DistinctDataBag to optimize the query.
> Eg1:
> A = load 'mydata';
> B = group A by $0;
> C = foreach B {
>     D = order A by $1;
>     generate group, D;
> }
> store C into 'myresult';
> We can specify a secondary sort on A.$1, and drop "order A by $1".
> Eg2:
> A = load 'mydata';
> B = group A by $0;
> C = foreach B {
>     D = A.$1;
>     E = distinct D;
>     generate group, E;
> }
> store C into 'myresult';
> We can specify a secondary sort key on A.$1, and simplify "D=A.$1; E=distinct D" to a
> special version of distinct, which does not do the sorting.
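
The idea in the issue description can be illustrated with a small standalone sketch. It is not Pig or Hadoop code: SecondarySortSketch, Row, and groupWithSortedDistinct are hypothetical names, and an in-memory sort stands in for the shuffle. Rows are ordered by ($0, $1), grouped on $0 alone, and duplicates within a group are dropped by comparing each value with the previous one, so no per-group sort is needed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class SecondarySortSketch {
    // Hypothetical row type: key plays the role of $0, value the role of $1.
    static final class Row {
        final String key;
        final int value;
        Row(String key, int value) { this.key = key; this.value = value; }
    }

    static Map<String, List<Integer>> groupWithSortedDistinct(List<Row> rows) {
        // Composite ordering: primary key first, then secondary key (stand-in for the shuffle sort).
        List<Row> sorted = new ArrayList<Row>(rows);
        sorted.sort(Comparator.comparing((Row r) -> r.key).thenComparingInt(r -> r.value));

        Map<String, List<Integer>> result = new LinkedHashMap<String, List<Integer>>();
        String currentKey = null;
        Integer last = null; // previous value seen within the current group
        for (Row row : sorted) {
            if (!row.key.equals(currentKey)) {
                // Group boundary is decided on the primary key only.
                currentKey = row.key;
                last = null;
                result.put(currentKey, new ArrayList<Integer>());
            }
            // Values arrive sorted, so a duplicate can only equal the previous value.
            if (last == null || last.intValue() != row.value) {
                result.get(currentKey).add(row.value);
                last = row.value;
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Row> rows = Arrays.asList(
            new Row("a", 2), new Row("b", 1), new Row("a", 1), new Row("a", 2), new Row("b", 1));
        System.out.println(groupWithSortedDistinct(rows));
    }
}
```

Dropping the duplicate check in the loop gives the Eg1 behavior (each group's values in $1 order); keeping it gives the Eg2 behavior.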

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.

