drill-commits mailing list archives

From adene...@apache.org
Subject drill git commit: DRILL-3952: Improve Window Functions performance when not all batches are required to process the current batch
Date Thu, 05 Nov 2015 04:24:46 GMT
Repository: drill
Updated Branches:
  refs/heads/master 4be8369cf -> ef1cb7288


DRILL-3952: Improve Window Functions performance when not all batches are required to process the current batch

this closes #222


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/ef1cb728
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/ef1cb728
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/ef1cb728

Branch: refs/heads/master
Commit: ef1cb728853603414665280a389d8861c22a940b
Parents: 4be8369
Author: adeneche <adeneche@gmail.com>
Authored: Thu Oct 22 14:08:10 2015 -0700
Committer: adeneche <hakim@ADENECHE.local>
Committed: Wed Nov 4 20:24:31 2015 -0800

----------------------------------------------------------------------
 .../impl/window/DefaultFrameTemplate.java       | 143 ++++++++-----------
 .../exec/physical/impl/window/Partition.java    |  33 ++---
 .../impl/window/WindowFrameRecordBatch.java     |  68 ++++++++-
 .../exec/physical/impl/window/WindowFramer.java |  40 ++++--
 .../physical/impl/window/WindowFunction.java    |  89 ++++++++++++
 .../physical/impl/window/TestWindowFrame.java   |   3 +-
 6 files changed, 258 insertions(+), 118 deletions(-)
----------------------------------------------------------------------
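
For readers skimming the patch: the core idea is that only some window functions (CUME_DIST, PERCENT_RANK, NTILE, and aggregations/LAST_VALUE without an ORDER BY) need the whole partition buffered before the first batch can be processed, so the operator now ORs that requirement across the functions of the window and releases batches earlier when it can. A minimal, self-contained sketch of that decision; the names here are illustrative, not the committed API:

    import java.util.Arrays;
    import java.util.List;

    // Illustrative sketch only (not the committed classes): the operator asks each window
    // function whether it needs the complete partition buffered, and requires the full
    // partition only if at least one function says yes.
    public class RequireFullPartitionSketch {

      interface WindowFunc {
        boolean requiresFullPartition(boolean hasOrderBy);
      }

      static boolean requireFullPartition(List<WindowFunc> functions, boolean hasOrderBy) {
        boolean require = false;
        for (WindowFunc f : functions) {
          require |= f.requiresFullPartition(hasOrderBy);   // one "true" is enough
        }
        return require;
      }

      public static void main(String[] args) {
        WindowFunc rowNumber = hasOrderBy -> false;         // never needs the whole partition
        WindowFunc ntile     = hasOrderBy -> true;          // needs the total partition length
        WindowFunc sum       = hasOrderBy -> !hasOrderBy;   // without ORDER BY the frame is the whole partition

        System.out.println(requireFullPartition(Arrays.asList(rowNumber, sum), true));    // false
        System.out.println(requireFullPartition(Arrays.asList(rowNumber, ntile), true));  // true
      }
    }
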


http://git-wip-us.apache.org/repos/asf/drill/blob/ef1cb728/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/DefaultFrameTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/DefaultFrameTemplate.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/DefaultFrameTemplate.java
index 9bde6a5..d2ee9f1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/DefaultFrameTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/DefaultFrameTemplate.java
@@ -25,11 +25,9 @@ import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.vector.BaseDataValueVector;
-import org.apache.drill.exec.vector.BaseValueVector;
 import org.apache.drill.exec.vector.ValueVector;
 
 import javax.inject.Named;
-import java.util.Iterator;
 import java.util.List;
 
 
@@ -44,6 +42,9 @@ public abstract class DefaultFrameTemplate implements WindowFramer {
 
   private int frameLastRow;
 
+  // true when at least one window function needs to process all batches of a partition before passing any batch downstream
+  private boolean requireFullPartition;
+
   /**
    * current partition being processed.</p>
    * Can span over multiple batches, so we may need to keep it between calls to doWork()
@@ -51,8 +52,8 @@ public abstract class DefaultFrameTemplate implements WindowFramer {
   private Partition partition;
 
   @Override
-  public void setup(final List<WindowDataBatch> batches, final VectorContainer container,
final OperatorContext oContext)
-      throws SchemaChangeException {
+  public void setup(final List<WindowDataBatch> batches, final VectorContainer container,
final OperatorContext oContext,
+                    final boolean requireFullPartition) throws SchemaChangeException {
     this.container = container;
     this.batches = batches;
 
@@ -62,6 +63,8 @@ public abstract class DefaultFrameTemplate implements WindowFramer {
 
     outputCount = 0;
     partition = null;
+
+    this.requireFullPartition = requireFullPartition;
   }
 
   private void allocateOutgoing() {
@@ -108,6 +111,11 @@ public abstract class DefaultFrameTemplate implements WindowFramer {
 
         // we have a pending window we need to handle from a previous call to doWork()
         logger.trace("we have a pending partition {}", partition);
+
+        if (!requireFullPartition) {
+          // we didn't compute the whole partition length in the previous partition, we need to update the length now
+          updatePartitionSize(partition, currentRow);
+        }
       } else {
         newPartition(current, currentRow);
       }
@@ -137,8 +145,9 @@ public abstract class DefaultFrameTemplate implements WindowFramer {
   }
 
   private void newPartition(final WindowDataBatch current, final int currentRow) throws SchemaChangeException
{
-    final long length = computePartitionSize(currentRow);
-    partition = new Partition(length);
+    partition = new Partition();
+    updatePartitionSize(partition, currentRow);
+
     setupPartition(current, container);
     copyFirstValueToInternal(currentRow);
   }
@@ -219,8 +228,8 @@ public abstract class DefaultFrameTemplate implements WindowFramer {
   private void processRow(final int row) throws DrillException {
     if (partition.isFrameDone()) {
      // because all peer rows share the same frame, we only need to compute and aggregate the frame once
-      partition.newFrame(countPeers(row));
-      aggregatePeers(row);
+      final long peers = aggregatePeers(row);
+      partition.newFrame(peers);
     }
 
     outputRow(row, partition);
@@ -230,116 +239,86 @@ public abstract class DefaultFrameTemplate implements WindowFramer
{
   }
 
   /**
-   * @return number of rows that are part of the partition starting at row start of first batch
+   * updates partition's length after computing the number of rows for the current partition starting at the specified
+   * row of the first batch. If !requiresFullPartition, this method will only count the rows in the current batch
    */
-  private long computePartitionSize(final int start) {
+  private void updatePartitionSize(final Partition partition, final int start) {
     logger.trace("compute partition size starting from {} on {} batches", start, batches.size());
 
     // current partition always starts from first batch
     final VectorAccessible first = getCurrent();
 
     long length = 0;
+    boolean lastBatch = false;
+    int row = start;
 
     // count all rows that are in the same partition of start
-    // keep increasing length until we find first row of next partition or we reach the very
-    // last batch
+    // keep increasing length until we find first row of next partition or we reach the very last batch
+
+    outer:
     for (WindowDataBatch batch : batches) {
       final int recordCount = batch.getRecordCount();
 
       // check first container from start row, and subsequent containers from first row
-      for (int row = (batch == first) ? start : 0; row < recordCount; row++, length++)
{
+      for (; row < recordCount; row++, length++) {
         if (!isSamePartition(start, first, row, batch)) {
-          return length;
+          break outer;
         }
       }
+
+      if (!requireFullPartition) {
+        // we are only interested in the first batch's records
+        break;
+      }
+
+      row = 0;
     }
 
-    return length;
+    if (!requireFullPartition) {
+      // this is the last batch of current partition if
+      lastBatch = row < outputCount                           // partition ends before the end of the batch
+        || batches.size() == 1                                // it's the last available batch
+        || !isSamePartition(start, first, 0, batches.get(1)); // next batch contains a different partition
+    }
+
+    partition.updateLength(length, !(requireFullPartition || lastBatch));
   }
 
   /**
-   * Counts how many rows are peer with the first row of the current frame
-   * @param start first row of current frame
-   * @return number of peer rows
+   * aggregates all peer rows of current row
+   * @param start starting row of the current frame
+   * @return num peer rows for current row
+   * @throws SchemaChangeException
    */
-  private int countPeers(final int start) {
+  private long aggregatePeers(final int start) throws SchemaChangeException {
+    logger.trace("aggregating rows starting from {}", start);
+
     // current frame always starts from first batch
     final VectorAccessible first = getCurrent();
+    VectorAccessible last = first;
+    long length = 0;
 
-    int length = 0;
-
-    // count all rows that are in the same frame of starting row
-    // keep increasing length until we find first non peer row we reach the very
-    // last batch
+    // a single frame can include rows from multiple batches
+    // start processing first batch and, if necessary, move to next batches
     for (WindowDataBatch batch : batches) {
+      setupEvaluatePeer(batch, container);
       final int recordCount = batch.getRecordCount();
 
       // for every remaining row in the partition, count it if it's a peer row
-      final long remaining = partition.getRemaining();
-      for (int row = (batch == first) ? start : 0; row < recordCount && length
< remaining; row++, length++) {
+      for (int row = (batch == first) ? start : 0; row < recordCount; row++, length++)
{
         if (!isPeer(start, first, row, batch)) {
-          return length;
+          break;
         }
-      }
-    }
-
-    return length;
-  }
 
-  /**
-   * aggregates all peer rows of current row
-   * @param currentRow starting row of the current frame
-   * @throws SchemaChangeException
-   */
-  private void aggregatePeers(final int currentRow) throws SchemaChangeException {
-    logger.trace("aggregating {} rows starting from {}", partition.getPeers(), currentRow);
-    assert !partition.isFrameDone() : "frame is empty!";
-
-    // a single frame can include rows from multiple batches
-    // start processing first batch and, if necessary, move to next batches
-    Iterator<WindowDataBatch> iterator = batches.iterator();
-    WindowDataBatch current = iterator.next();
-    setupEvaluatePeer(current, container);
-
-    final int peers = partition.getPeers();
-    int row = currentRow;
-    for (int i = 0; i < peers; i++, row++) {
-      if (row >= current.getRecordCount()) {
-        // we reached the end of the current batch, move to the next one
-        current = iterator.next();
-        setupEvaluatePeer(current, container);
-        row = 0;
+        evaluatePeer(row);
+        last = batch;
+        frameLastRow = row;
       }
-
-      evaluatePeer(row);
     }
 
-    // last row of current frame
-    setupReadLastValue(current, container);
-    frameLastRow = row - 1;
-  }
-
-  @Override
-  public boolean canDoWork() {
-    // check if we can process a saved batch
-    if (batches.size() < 2) {
-      logger.trace("we don't have enough batches to proceed, fetch next batch");
-      return false;
-    }
+    setupReadLastValue(last, container);
 
-    final VectorAccessible current = getCurrent();
-    final int currentSize = current.getRecordCount();
-    final VectorAccessible last = batches.get(batches.size() - 1);
-    final int lastSize = last.getRecordCount();
-
-    if (!isSamePartition(currentSize - 1, current, lastSize - 1, last)
-        /*|| !isPeer(currentSize - 1, current, lastSize - 1, last)*/) {
-      logger.trace("partition changed, we are ready to process first saved batch");
-      return true;
-    } else {
-      logger.trace("partition didn't change, fetch next batch");
-      return false;
-    }
+    return length;
   }
 
   /**

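
The size computation above no longer always walks every buffered batch: when no function needs the full partition, only the rows of the first held batch are counted, and the code also works out whether that batch closes the partition. Below is a rough, standalone model of the same control flow, with batches reduced to arrays of partition keys and the boundary test simplified; the names are illustrative and this is not Drill's API:

    import java.util.Arrays;
    import java.util.List;

    // Simplified model of the "partial partition length" idea from DefaultFrameTemplate.
    public class PartialPartitionCount {

      static final class Result {
        final long length;       // rows of the partition found so far
        final boolean partial;   // true if more rows may follow in later batches
        Result(long length, boolean partial) { this.length = length; this.partial = partial; }
      }

      static Result countPartition(List<int[]> batches, int start, boolean requireFullPartition) {
        int[] first = batches.get(0);
        int key = first[start];
        long length = 0;
        int row = start;

        outer:
        for (int[] batch : batches) {
          for (; row < batch.length; row++, length++) {
            if (batch[row] != key) {
              break outer;                       // found the first row of the next partition
            }
          }
          if (!requireFullPartition) {
            break;                               // only the first batch's rows matter
          }
          row = 0;                               // subsequent batches are scanned from row 0
        }

        boolean lastBatch = !requireFullPartition
            && (row < first.length               // partition ends inside the first batch
                || batches.size() == 1           // no more batches buffered
                || batches.get(1)[0] != key);    // next batch starts a new partition
        return new Result(length, !(requireFullPartition || lastBatch));
      }

      public static void main(String[] args) {
        List<int[]> batches = Arrays.asList(new int[]{7, 7, 7}, new int[]{7, 8, 8});
        Result r = countPartition(batches, 0, false);
        System.out.println(r.length + " rows counted, partial=" + r.partial);  // 3 rows counted, partial=true
      }
    }
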
http://git-wip-us.apache.org/repos/asf/drill/blob/ef1cb728/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/Partition.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/Partition.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/Partition.java
index 52c7790..66cf720 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/Partition.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/Partition.java
@@ -18,18 +18,21 @@
 package org.apache.drill.exec.physical.impl.window;
 
 /**
- * Used internally to keep track of partitions and frames
+ * Used internally to keep track of partitions and frames.<br>
+ * A partition can be partial, which means we don't know "yet" the total number of records that are part of this partition.
+ * Even for partial partitions, we know the number of rows that are part of current frame
  */
 public class Partition {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Partition.class);
 
-  private final long length; // size of this partition
-  private long remaining;
+  private boolean partial; // true if we don't know yet the full length of this partition
+  private long length; // size of this partition (if partial is true, then this is a partial length of the partition)
+  private long remaining; // remaining non-processed rows in this partition
 
-  private int peers; // remaining non-processed peers in current frame
+  private long peers; // remaining non-processed peers in current frame
 
   // we keep these attributes public because the generated code needs to access them
-  public int row_number;
+  public int row_number = 1;
   public int rank;
   public int dense_rank;
   public double percent_rank;
@@ -43,16 +46,14 @@ public class Partition {
   }
 
   /**
-   * @return peer rows not yet aggregated in current frame
+   * @param length number of rows in this partition
+   * @param partial if true, then length is not the full length of the partition but just the number of rows in the
+   *                current batch
    */
-  public int getPeers() {
-    return peers;
-  }
-
-  public Partition(long length) {
-    this.length = length;
-    remaining = length;
-    row_number = 1;
+  public void updateLength(long length, boolean partial) {
+    this.length += length;
+    this.partial = partial;
+    remaining += length;
   }
 
   public void rowAggregated() {
@@ -62,7 +63,7 @@ public class Partition {
     row_number++;
   }
 
-  public void newFrame(int peers) {
+  public void newFrame(long peers) {
     this.peers = peers;
 
     rank = row_number; // rank = row number of 1st peer
@@ -72,7 +73,7 @@ public class Partition {
   }
 
   public boolean isDone() {
-    return remaining == 0;
+    return !partial && remaining == 0;
   }
 
   public int ntile(int numTiles) {

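
Partition now accumulates its length incrementally instead of receiving it once in the constructor. A compact stand-in showing the partial/complete bookkeeping (method names shortened, purely illustrative):

    // Minimal stand-in for the reworked Partition bookkeeping: the length is fed in
    // increments, and the partition only counts as done once a non-partial update has
    // been applied and every known row has been processed.
    public class PartialPartition {

      private boolean partial;   // true while the total length is still unknown
      private long length;       // rows known to belong to the partition so far
      private long remaining;    // known rows not yet processed

      public void updateLength(long rows, boolean partial) {
        this.length += rows;     // lengths accumulate across calls
        this.remaining += rows;
        this.partial = partial;
      }

      public void rowProcessed() {
        remaining--;
      }

      public boolean isDone() {
        return !partial && remaining == 0;
      }

      public static void main(String[] args) {
        PartialPartition p = new PartialPartition();
        p.updateLength(3, true);                 // first batch: 3 rows, more may follow
        for (int i = 0; i < 3; i++) { p.rowProcessed(); }
        System.out.println(p.length + " rows known, done=" + p.isDone());  // 3 rows known, done=false
        p.updateLength(2, false);                // later call: 2 more rows, length now final
        for (int i = 0; i < 2; i++) { p.rowProcessed(); }
        System.out.println(p.length + " rows known, done=" + p.isDone());  // 5 rows known, done=true
      }
    }
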
http://git-wip-us.apache.org/repos/asf/drill/blob/ef1cb728/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java
index 701ead5..da13669 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.physical.impl.window;
 import java.io.IOException;
 import java.util.List;
 
+import com.google.common.collect.Iterables;
 import org.apache.drill.common.exceptions.DrillException;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.FunctionCall;
@@ -55,11 +56,16 @@ public class WindowFrameRecordBatch extends AbstractRecordBatch<WindowPOP>
{
 
   private final RecordBatch incoming;
   private List<WindowDataBatch> batches;
+
   private WindowFramer framer;
+  private boolean hasOrderBy; // true if window definition contains an order-by clause
+  private final List<WindowFunction> functions = Lists.newArrayList();
 
   private boolean noMoreBatches;
   private BatchSchema schema;
 
+  private boolean shouldStop;
+
   public WindowFrameRecordBatch(WindowPOP popConfig, FragmentContext context, RecordBatch
incoming) throws OutOfMemoryException {
     super(popConfig, context);
     this.incoming = incoming;
@@ -112,8 +118,23 @@ public class WindowFrameRecordBatch extends AbstractRecordBatch<WindowPOP>
{
       return IterOutcome.NONE;
     }
 
+    if (shouldStop) {
+      if (!noMoreBatches) {
+        IterOutcome upstream = next(incoming);
+        while (upstream == IterOutcome.OK || upstream == IterOutcome.OK_NEW_SCHEMA) {
+          // Clear the memory for the incoming batch
+          for (VectorWrapper<?> wrapper : incoming) {
+            wrapper.getValueVector().clear();
+          }
+          upstream = next(incoming);
+        }
+      }
+
+      return IterOutcome.NONE;
+    }
+
    // keep saving incoming batches until the first unprocessed batch can be processed, or upstream == NONE
-    while (!noMoreBatches && !framer.canDoWork()) {
+    while (!noMoreBatches && !canDoWork()) {
       IterOutcome upstream = next(incoming);
       logger.trace("next(incoming) returned {}", upstream);
 
@@ -137,8 +158,6 @@ public class WindowFrameRecordBatch extends AbstractRecordBatch<WindowPOP>
{
         case OK:
           if (incoming.getRecordCount() > 0) {
             batches.add(new WindowDataBatch(incoming, oContext));
-          } else {
-            logger.trace("incoming has 0 records, it won't be added to batches");
           }
           break;
         default:
@@ -168,6 +187,34 @@ public class WindowFrameRecordBatch extends AbstractRecordBatch<WindowPOP>
{
     return IterOutcome.OK;
   }
 
+  /**
+   * @return true when all window functions are ready to process the current batch (it's the first batch currently
+   * held in memory)
+   */
+  private boolean canDoWork() {
+    if (batches.size() < 2) {
+      // we need at least 2 batches even when window functions only need one batch, so we can detect the end of the
+      // current partition
+      return false;
+    }
+
+    final VectorAccessible current = batches.get(0);
+    final int currentSize = current.getRecordCount();
+    final VectorAccessible last = batches.get(batches.size() - 1);
+    final int lastSize = last.getRecordCount();
+
+    final boolean partitionEndReached = !framer.isSamePartition(currentSize - 1, current,
lastSize - 1, last);
+    final boolean frameEndReached = partitionEndReached || !framer.isPeer(currentSize - 1,
current, lastSize - 1, last);
+
+    for (final WindowFunction function : functions) {
+      if (!function.canDoWork(batches.size(), hasOrderBy, frameEndReached, partitionEndReached))
{
+        return false;
+      }
+    }
+
+    return true;
+  }
+
   @Override
   protected void buildSchema() throws SchemaChangeException {
     logger.trace("buildSchema()");
@@ -203,10 +250,14 @@ public class WindowFrameRecordBatch extends AbstractRecordBatch<WindowPOP>
{
 
     final List<LogicalExpression> keyExprs = Lists.newArrayList();
     final List<LogicalExpression> orderExprs = Lists.newArrayList();
-    final List<WindowFunction> functions = Lists.newArrayList();
+    boolean requireFullPartition = false;
 
     container.clear();
 
+    functions.clear();
+
+    hasOrderBy = popConfig.getOrderings().length > 0;
+
     // all existing vectors will be transferred to the outgoing container in framer.doWork()
     for (final VectorWrapper<?> wrapper : batch) {
       container.addOrGet(wrapper.getField());
@@ -224,6 +275,7 @@ public class WindowFrameRecordBatch extends AbstractRecordBatch<WindowPOP>
{
       final WindowFunction winfun = WindowFunction.fromExpression(call);
       if (winfun.materialize(ne, container, context.getFunctionRegistry())) {
         functions.add(winfun);
+        requireFullPartition |= winfun.requiresFullPartition(hasOrderBy);
       }
     }
 
@@ -242,7 +294,7 @@ public class WindowFrameRecordBatch extends AbstractRecordBatch<WindowPOP>
{
     }
 
     final WindowFramer framer = generateFramer(keyExprs, orderExprs, functions);
-    framer.setup(batches, container, oContext);
+    framer.setup(batches, container, oContext, requireFullPartition);
 
     return framer;
   }
@@ -264,7 +316,8 @@ public class WindowFrameRecordBatch extends AbstractRecordBatch<WindowPOP>
{
       final GeneratorMapping IS_SAME_PEER_READ = GeneratorMapping.create("isPeer", "isPeer",
null, null);
       final MappingSet isaP1 = new MappingSet("b1Index", null, "b1", null, IS_SAME_PEER_READ,
IS_SAME_PEER_READ);
       final MappingSet isaP2 = new MappingSet("b2Index", null, "b2", null, IS_SAME_PEER_READ,
IS_SAME_PEER_READ);
-      setupIsFunction(cg, orderExprs, isaP1, isaP2);
+      // isPeer also checks if it's the same partition
+      setupIsFunction(cg, Iterables.concat(keyExprs, orderExprs), isaP1, isaP2);
     }
 
     for (final WindowFunction function : functions) {
@@ -279,7 +332,7 @@ public class WindowFrameRecordBatch extends AbstractRecordBatch<WindowPOP>
{
   /**
    * setup comparison functions isSamePartition and isPeer
    */
-  private void setupIsFunction(final ClassGenerator<WindowFramer> cg, final List<LogicalExpression>
exprs,
+  private void setupIsFunction(final ClassGenerator<WindowFramer> cg, final Iterable<LogicalExpression>
exprs,
                                final MappingSet leftMapping, final MappingSet rightMapping)
{
     cg.setMappingSet(leftMapping);
     for (LogicalExpression expr : exprs) {
@@ -323,6 +376,7 @@ public class WindowFrameRecordBatch extends AbstractRecordBatch<WindowPOP>
{
 
   @Override
   protected void killIncoming(boolean sendUpstream) {
+    shouldStop = true;
     incoming.kill(sendUpstream);
   }
 

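
The canDoWork() gate moved from the generated framer into the record batch so it can consult every window function. A rough model of that gate over toy data, where a row is just {partitionKey, orderKey} and the two policies in main() are illustrative stand-ins for the committed WindowFunction subclasses:

    import java.util.Arrays;
    import java.util.List;

    // Rough model of the new WindowFrameRecordBatch.canDoWork() gate: at least two batches
    // must be buffered, partition/frame boundaries are detected by comparing the last row of
    // the first batch with the last row of the most recent batch, and every window function
    // must agree before the first batch is processed.
    public class CanDoWorkSketch {

      interface WindowFunc {
        boolean canDoWork(int batchesAvailable, boolean hasOrderBy,
                          boolean frameEndReached, boolean partitionEndReached);
      }

      static boolean canDoWork(List<long[][]> batches, boolean hasOrderBy, List<WindowFunc> funcs) {
        if (batches.size() < 2) {
          return false;                                // always keep one batch of look-ahead
        }
        long[][] current = batches.get(0);
        long[][] last = batches.get(batches.size() - 1);
        long[] a = current[current.length - 1];
        long[] b = last[last.length - 1];

        boolean partitionEnd = a[0] != b[0];
        boolean frameEnd = partitionEnd || a[1] != b[1];

        for (WindowFunc f : funcs) {
          if (!f.canDoWork(batches.size(), hasOrderBy, frameEnd, partitionEnd)) {
            return false;
          }
        }
        return true;
      }

      public static void main(String[] args) {
        // ROW_NUMBER-like policy: ready as soon as two batches are buffered
        WindowFunc rowNumber = (n, ob, fe, pe) -> true;
        // aggregation-with-ORDER-BY-like policy: ready once the first frame has closed
        WindowFunc runningSum = (n, ob, fe, pe) -> pe || (ob && fe);

        long[][] b0 = {{1, 10}, {1, 10}};
        long[][] b1 = {{1, 20}};                       // same partition, new order-by value
        System.out.println(canDoWork(Arrays.asList(b0, b1), true,
            Arrays.asList(rowNumber, runningSum)));    // true: frame ended, partition still open
      }
    }
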
http://git-wip-us.apache.org/repos/asf/drill/blob/ef1cb728/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFramer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFramer.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFramer.java
index 1d1d746..719d5a8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFramer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFramer.java
@@ -21,15 +21,17 @@ import org.apache.drill.common.exceptions.DrillException;
 import org.apache.drill.exec.compile.TemplateClassDefinition;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.record.VectorContainer;
 
+import javax.inject.Named;
 import java.util.List;
 
 public interface WindowFramer {
   TemplateClassDefinition<WindowFramer> TEMPLATE_DEFINITION = new TemplateClassDefinition<>(WindowFramer.class,
DefaultFrameTemplate.class);
 
-  void setup(final List<WindowDataBatch> batches, final VectorContainer container,
final OperatorContext operatorContext)
-    throws SchemaChangeException;
+  void setup(final List<WindowDataBatch> batches, final VectorContainer container,
final OperatorContext operatorContext,
+             final boolean requireFullPartition) throws SchemaChangeException;
 
   /**
    * process the inner batch and write the aggregated values in the container
@@ -38,19 +40,33 @@ public interface WindowFramer {
   void doWork() throws DrillException;
 
   /**
-   * check if current batch can be processed:
-   * <ol>
-   *   <li>we have at least 2 saved batches</li>
-   *   <li>last partition of current batch ended</li>
-   * </ol>
-   * @return true if current batch can be processed, false otherwise
-   */
-  boolean canDoWork();
-
-  /**
    * @return number rows processed in last batch
    */
   int getOutputCount();
 
   void cleanup();
+
+  /**
+   * compares two rows from different batches (can be the same), if they have the same value for the partition by
+   * expression
+   * @param b1Index index of first row
+   * @param b1 batch for first row
+   * @param b2Index index of second row
+   * @param b2 batch for second row
+   * @return true if the rows are in the same partition
+   */
+  boolean isSamePartition(@Named("b1Index") int b1Index, @Named("b1") VectorAccessible b1,
+                                          @Named("b2Index") int b2Index, @Named("b2") VectorAccessible
b2);
+
+  /**
+   * compares two rows from different batches (can be the same), if they have the same value for the order by
+   * expression
+   * @param b1Index index of first row
+   * @param b1 batch for first row
+   * @param b2Index index of second row
+   * @param b2 batch for second row
+   * @return true if the rows are peers (same partition and same order by values)
+   */
+  boolean isPeer(@Named("b1Index") int b1Index, @Named("b1") VectorAccessible b1,
+                                 @Named("b2Index") int b2Index, @Named("b2") VectorAccessible
b2);
 }

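
isSamePartition() and isPeer() are now exposed on the WindowFramer interface because the record batch needs them to detect partition and frame boundaries; their bodies are code-generated from the PARTITION BY and ORDER BY expressions, and the peer check now also covers the partition keys. A toy, hand-written equivalent over {partitionKey, orderKey} rows, for illustration only:

    // Toy implementation of the two comparison hooks that the generated framer provides.
    // Real instances are code-generated; here a row is simply long[]{partitionKey, orderKey}.
    public class ComparisonSketch {

      static boolean isSamePartition(long[] row1, long[] row2) {
        return row1[0] == row2[0];
      }

      static boolean isPeer(long[] row1, long[] row2) {
        // note: includes the partition key, mirroring the concatenated expression lists
        return row1[0] == row2[0] && row1[1] == row2[1];
      }

      public static void main(String[] args) {
        long[] a = {1, 10};
        long[] b = {1, 20};
        System.out.println(isSamePartition(a, b)); // true: same partition
        System.out.println(isPeer(a, b));          // false: different order-by value
      }
    }
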
http://git-wip-us.apache.org/repos/asf/drill/blob/ef1cb728/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFunction.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFunction.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFunction.java
index 5e65340..548809b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFunction.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFunction.java
@@ -92,6 +92,28 @@ public abstract class WindowFunction {
 
   abstract void generateCode(final ClassGenerator<WindowFramer> cg);
 
+  /**
+   * @param hasOrderBy window definition contains an ORDER BY clause
+   * @return true if this window function requires all batches of current partition to be available before processing
+   * the first batch
+   */
+  public boolean requiresFullPartition(final boolean hasOrderBy) {
+    return true;
+  }
+
+  /**
+   * @param numBatchesAvailable number of batches available for current partition
+   * @param hasOrderBy window definition contains an ORDER BY clause
+   * @param frameEndReached we found the last row of the first batch's frame
+   * @param partitionEndReached all batches of current partition are available
+   *
+   * @return true if this window function can process the first batch immediately
+   */
+  public boolean canDoWork(final int numBatchesAvailable, final boolean hasOrderBy, final
boolean frameEndReached,
+                           final boolean partitionEndReached) {
+    return partitionEndReached;
+  }
+
   abstract boolean materialize(final NamedExpression ne, final VectorContainer batch, final
FunctionLookupContext registry)
     throws SchemaChangeException;
 
@@ -129,6 +151,16 @@ public abstract class WindowFunction {
       cg.setMappingSet(mappingSet);
       cg.addExpr(writeAggregationToOutput);
     }
+
+    @Override
+    public boolean requiresFullPartition(final boolean hasOrderBy) {
+      return !hasOrderBy;
+    }
+
+    @Override
+    public boolean canDoWork(int numBatchesAvailable, boolean hasOrderBy, boolean frameEndReached,
boolean partitionEndReached) {
+      return partitionEndReached || (hasOrderBy && frameEndReached);
+    }
   }
 
   static class Ranking extends WindowFunction {
@@ -171,6 +203,21 @@ public abstract class WindowFunction {
       fieldId = batch.getValueVectorId(ne.getRef());
       return true;
     }
+
+    @Override
+    public boolean requiresFullPartition(final boolean hasOrderBy) {
+      // CUME_DIST, PERCENT_RANK and NTILE require the length of current partition before processing its first batch
+      return type == Type.CUME_DIST || type == Type.PERCENT_RANK || type == Type.NTILE;
+    }
+
+    @Override
+    public boolean canDoWork(int numBatchesAvailable, final boolean hasOrderBy, boolean frameEndReached,
boolean partitionEndReached) {
+      assert numBatchesAvailable > 0 : "canDoWork() should not be called when numBatchesAvailable
== 0";
+
+      // for CUME_DIST, PERCENT_RANK and NTILE we need the full partition
+      // otherwise we can process the first batch immediately
+      return partitionEndReached || ! requiresFullPartition(hasOrderBy);
+    }
   }
 
   static class Ntile extends Ranking {
@@ -258,6 +305,16 @@ public abstract class WindowFunction {
       writeInputToLead = new ValueVectorWriteExpression(outputId, input, true);
       return true;
     }
+
+    @Override
+    public boolean requiresFullPartition(final boolean hasOrderBy) {
+      return false;
+    }
+
+    @Override
+    public boolean canDoWork(int numBatchesAvailable, final boolean hasOrderBy, boolean frameEndReached,
boolean partitionEndReached) {
+      return partitionEndReached || numBatchesAvailable > 1;
+    }
   }
 
   static class Lag extends WindowFunction {
@@ -313,6 +370,17 @@ public abstract class WindowFunction {
         cg.addExpr(writeInputToLag);
       }
     }
+
+    @Override
+    public boolean requiresFullPartition(final boolean hasOrderBy) {
+      return false;
+    }
+
+    @Override
+    public boolean canDoWork(int numBatchesAvailable, final boolean hasOrderBy, boolean frameEndReached,
boolean partitionEndReached) {
+      assert numBatchesAvailable > 0 : "canDoWork() should not be called when numBatchesAvailable
== 0";
+      return true;
+    }
   }
 
   static class LastValue extends WindowFunction {
@@ -358,6 +426,16 @@ public abstract class WindowFunction {
       cg.setMappingSet(mappingSet);
       cg.addExpr(writeSourceToLastValue);
     }
+
+    @Override
+    public boolean requiresFullPartition(final boolean hasOrderBy) {
+      return !hasOrderBy;
+    }
+
+    @Override
+    public boolean canDoWork(int numBatchesAvailable, boolean hasOrderBy, boolean frameEndReached,
boolean partitionEndReached) {
+      return partitionEndReached || (hasOrderBy && frameEndReached);
+    }
   }
 
   static class FirstValue extends WindowFunction {
@@ -427,5 +505,16 @@ public abstract class WindowFunction {
         cg.addExpr(writeFirstValueToFirstValue);
       }
     }
+
+    @Override
+    public boolean requiresFullPartition(boolean hasOrderBy) {
+      return false;
+    }
+
+    @Override
+    public boolean canDoWork(int numBatchesAvailable, boolean hasOrderBy, boolean frameEndReached,
boolean partitionEndReached) {
+      assert numBatchesAvailable > 0 : "canDoWork() should not be called when numBatchesAvailable
== 0";
+      return true;
+    }
   }
 }

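
For reference, the per-function policies introduced above, condensed into one place. This is a paraphrase of the new requiresFullPartition()/canDoWork() overrides, not the committed class hierarchy:

    // Condensed, illustrative view of the per-function rules: whether a function needs the
    // full partition buffered, and when it may process the first buffered batch.
    public class FunctionPolicySketch {

      enum Kind { AGGREGATE, ROW_NUMBER, RANK, DENSE_RANK, CUME_DIST, PERCENT_RANK, NTILE,
                  LEAD, LAG, FIRST_VALUE, LAST_VALUE }

      static boolean requiresFullPartition(Kind k, boolean hasOrderBy) {
        switch (k) {
          case CUME_DIST:
          case PERCENT_RANK:
          case NTILE:
            return true;                        // need the partition length up front
          case AGGREGATE:
          case LAST_VALUE:
            return !hasOrderBy;                 // without ORDER BY the frame is the whole partition
          default:
            return false;                       // ROW_NUMBER, RANK, DENSE_RANK, LEAD, LAG, FIRST_VALUE
        }
      }

      static boolean canDoWork(Kind k, int batches, boolean hasOrderBy,
                               boolean frameEnd, boolean partitionEnd) {
        switch (k) {
          case AGGREGATE:
          case LAST_VALUE:
            return partitionEnd || (hasOrderBy && frameEnd);
          case LEAD:
            return partitionEnd || batches > 1; // needs one batch of look-ahead
          case LAG:
          case FIRST_VALUE:
            return true;                        // the first batch alone is enough
          default:                              // ranking functions
            return partitionEnd || !requiresFullPartition(k, hasOrderBy);
        }
      }

      public static void main(String[] args) {
        System.out.println(canDoWork(Kind.ROW_NUMBER, 2, true, false, false)); // true
        System.out.println(canDoWork(Kind.NTILE, 2, true, true, false));       // false
      }
    }
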
http://git-wip-us.apache.org/repos/asf/drill/blob/ef1cb728/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/window/TestWindowFrame.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/window/TestWindowFrame.java
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/window/TestWindowFrame.java
index 92c27d7..6cb0f4a 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/window/TestWindowFrame.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/window/TestWindowFrame.java
@@ -330,7 +330,8 @@ public class TestWindowFrame extends BaseTestQuery {
 
   @Test
   public void testPartitionNtile() {
-    Partition partition = new Partition(12);
+    Partition partition = new Partition();
+    partition.updateLength(12, false);
 
     assertEquals(1, partition.ntile(5));
     partition.rowAggregated();

