drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From meh...@apache.org
Subject [3/3] drill git commit: DRILL-2107: Fix hash aggregate to return OK_NEW_SCHEMA only once. Existing tests will serve as unit tests once swap join rule is enabled
Date Thu, 12 Feb 2015 02:17:29 GMT
DRILL-2107: Fix hash aggregate to return OK_NEW_SCHEMA only once.
Existing tests will serve as unit tests once swap join rule is enabled


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/b79f7661
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/b79f7661
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/b79f7661

Branch: refs/heads/master
Commit: b79f76619b5290e465921219fcd09091a3d93607
Parents: 9f9fb6c
Author: Mehant Baid <mehantr@gmail.com>
Authored: Sat Feb 7 19:14:01 2015 -0800
Committer: Mehant Baid <mehantr@gmail.com>
Committed: Wed Feb 11 17:00:48 2015 -0800

----------------------------------------------------------------------
 .../physical/impl/aggregate/HashAggBatch.java   | 38 ++------------------
 .../impl/aggregate/HashAggTemplate.java         | 31 ++++++----------
 2 files changed, 13 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/b79f7661/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
index 0e2a017..5946d43 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
@@ -49,6 +49,7 @@ import org.apache.drill.exec.record.TypedFieldId;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
+import org.apache.drill.exec.vector.AllocationHelper;
 import org.apache.drill.exec.vector.ValueVector;
 
 import com.sun.codemodel.JExpr;
@@ -101,47 +102,12 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
       state = BatchState.DONE;
     }
     for (VectorWrapper w : container) {
-      w.getValueVector().allocateNew();
+      AllocationHelper.allocatePrecomputedChildCount(w.getValueVector(), 0, 0, 0);
     }
   }
 
   @Override
   public IterOutcome innerNext() {
-    // this is only called on the first batch. Beyond this, the aggregator manages batches.
-    if (aggregator == null || state == BatchState.FIRST) {
-      if (aggregator != null) {
-        aggregator.cleanup();
-      }
-      IterOutcome outcome;
-      if (state == BatchState.FIRST) {
-        state = BatchState.NOT_FIRST;
-        outcome = IterOutcome.OK;
-      } else {
-        outcome = next(incoming);
-      }
-      if (outcome == IterOutcome.OK) {
-        outcome = IterOutcome.OK_NEW_SCHEMA;
-      }
-      logger.debug("Next outcome of {}", outcome);
-      switch (outcome) {
-      case NONE:
-        //        throw new UnsupportedOperationException("Received NONE on first batch");
-        return outcome;
-      case NOT_YET:
-      case STOP:
-        return outcome;
-      case OK_NEW_SCHEMA:
-        if (!createAggregator()) {
-          state = BatchState.DONE;
-          return IterOutcome.STOP;
-        }
-        break;
-      case OK:
-        break;
-      default:
-        throw new IllegalStateException(String.format("unknown outcome %s", outcome));
-      }
-    }
 
     if (aggregator.allFlushed()) {
       return IterOutcome.NONE;

http://git-wip-us.apache.org/repos/asf/drill/blob/b79f7661/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
index 73cadb2..87cd4d6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
@@ -65,12 +65,12 @@ public abstract class HashAggTemplate implements HashAggregator {
 
   private static final long ALLOCATOR_INITIAL_RESERVATION = 1 * 1024 * 1024;
   private static final long ALLOCATOR_MAX_RESERVATION = 20L * 1000 * 1000 * 1000;
+  private static final int VARIABLE_WIDTH_VALUE_SIZE = 50;
 
   private static final boolean EXTRA_DEBUG_1 = false;
   private static final boolean EXTRA_DEBUG_2 = false;
   private static final String TOO_BIG_ERROR =
       "Couldn't add value to an empty batch.  This likely means that a single value is too long for a varlen field.";
-  private boolean first = true;
   private boolean newSchema = false;
   private int underlyingIndex = 0;
   private int currentIndex = 0;
@@ -370,9 +370,6 @@ public abstract class HashAggTemplate implements HashAggregator {
         }
       }
     } finally {
-      if (first) {
-        first = !first;
-      }
     }
   }
 
@@ -386,11 +383,13 @@ public abstract class HashAggTemplate implements HashAggregator {
     while (outgoingIter.hasNext()) {
       ValueVector vv = outgoingIter.next().getValueVector();
       MajorType type = vv.getField().getType();
-      if (!Types.isFixedWidthType(type) || Types.isRepeated(type)) {
-        vv.allocateNew();
-      } else {
-        AllocationHelper.allocate(vv, records, 1);
-      }
+
+      /*
+       * In build schema we use the allocation model that specifies exact record count
+       * so we need to stick with that allocation model until DRILL-2211 is resolved. Using
+       * 50 as the average bytes per value as is used in HashTable.
+       */
+      AllocationHelper.allocatePrecomputedChildCount(vv, records, VARIABLE_WIDTH_VALUE_SIZE, 0);
     }
   }
 
@@ -426,11 +425,7 @@ public abstract class HashAggTemplate implements HashAggregator {
   }
 
   private final AggOutcome setOkAndReturn() {
-    if (first) {
-      this.outcome = IterOutcome.OK_NEW_SCHEMA;
-    } else {
-      this.outcome = IterOutcome.OK;
-    }
+    this.outcome = IterOutcome.OK;
     for (VectorWrapper<?> v : outgoing) {
       v.getValueVector().getMutator().setValueCount(outputCount);
     }
@@ -471,7 +466,7 @@ public abstract class HashAggTemplate implements HashAggregator {
     // get the number of records in the batch holder that are pending output
     int numPendingOutput = batchHolders.get(outBatchIndex).getNumPendingOutput();
 
-    if (!first && numPendingOutput == 0) {
+    if (numPendingOutput == 0) {
       this.outcome = IterOutcome.NONE;
       return outcome;
     }
@@ -493,11 +488,7 @@ public abstract class HashAggTemplate implements HashAggregator {
 
     outputCount += numOutputRecords;
 
-    if (first) {
-      this.outcome = IterOutcome.OK_NEW_SCHEMA;
-    } else {
-      this.outcome = IterOutcome.OK;
-    }
+    this.outcome = IterOutcome.OK;
 
     logger.debug("HashAggregate: Output current batch index {} with {} records.", outBatchIndex, numOutputRecords);
 


Mime
View raw message