drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From adene...@apache.org
Subject drill git commit: DRILL-4270: Create a separate WindowFramer that supports the FRAME clause
Date Thu, 21 Jan 2016 20:37:01 GMT
Repository: drill
Updated Branches:
  refs/heads/master 2f0e3f27e -> 3d0b4b025


DRILL-4270: Create a separate WindowFramer that supports the FRAME clause

separate DefaultFrameTemplate into 2 implementations: one that supports custom frames (aggregations, first_value, last_value) and one that doesn't

this closes #322


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/3d0b4b02
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/3d0b4b02
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/3d0b4b02

Branch: refs/heads/master
Commit: 3d0b4b02521f12e3871d6060c8f9bfce73b218a0
Parents: 2f0e3f2
Author: adeneche <adeneche@gmail.com>
Authored: Mon Jan 11 14:47:32 2016 -0800
Committer: adeneche <adeneche@gmail.com>
Committed: Thu Jan 21 12:23:04 2016 -0800

----------------------------------------------------------------------
 .../impl/window/DefaultFrameTemplate.java       | 425 -------------------
 .../impl/window/FrameSupportTemplate.java       | 316 ++++++++++++++
 .../impl/window/NoFrameSupportTemplate.java     | 347 +++++++++++++++
 .../impl/window/WindowFrameRecordBatch.java     | 153 ++++---
 .../exec/physical/impl/window/WindowFramer.java |   3 +-
 .../physical/impl/window/WindowFunction.java    |  53 ++-
 .../physical/impl/window/TestWindowFrame.java   |   8 +
 7 files changed, 807 insertions(+), 498 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/3d0b4b02/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/DefaultFrameTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/DefaultFrameTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/DefaultFrameTemplate.java
deleted file mode 100644
index d2ee9f1..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/DefaultFrameTemplate.java
+++ /dev/null
@@ -1,425 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.physical.impl.window;
-
-import org.apache.drill.common.exceptions.DrillException;
-import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.ops.OperatorContext;
-import org.apache.drill.exec.record.TransferPair;
-import org.apache.drill.exec.record.VectorAccessible;
-import org.apache.drill.exec.record.VectorContainer;
-import org.apache.drill.exec.record.VectorWrapper;
-import org.apache.drill.exec.vector.BaseDataValueVector;
-import org.apache.drill.exec.vector.ValueVector;
-
-import javax.inject.Named;
-import java.util.List;
-
-
-public abstract class DefaultFrameTemplate implements WindowFramer {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DefaultFrameTemplate.class);
-
-  private VectorContainer container;
-  private VectorContainer internal;
-  private boolean lagCopiedToInternal;
-  private List<WindowDataBatch> batches;
-  private int outputCount; // number of rows in currently/last processed batch
-
-  private int frameLastRow;
-
-  // true when at least one window function needs to process all batches of a partition before passing any batch downstream
-  private boolean requireFullPartition;
-
-  /**
-   * current partition being processed.</p>
-   * Can span over multiple batches, so we may need to keep it between calls to doWork()
-   */
-  private Partition partition;
-
-  @Override
-  public void setup(final List<WindowDataBatch> batches, final VectorContainer container, final OperatorContext oContext,
-                    final boolean requireFullPartition) throws SchemaChangeException {
-    this.container = container;
-    this.batches = batches;
-
-    internal = new VectorContainer(oContext);
-    allocateInternal();
-    lagCopiedToInternal = false;
-
-    outputCount = 0;
-    partition = null;
-
-    this.requireFullPartition = requireFullPartition;
-  }
-
-  private void allocateOutgoing() {
-    for (VectorWrapper<?> w : container) {
-      w.getValueVector().allocateNew();
-    }
-  }
-
-  private void allocateInternal() {
-    // TODO we don't need to allocate all container's vectors, we can pass a specific list of vectors to allocate internally
-    for (VectorWrapper<?> w : container) {
-      ValueVector vv = internal.addOrGet(w.getField());
-      vv.allocateNew();
-    }
-  }
-
-  /**
-   * processes all rows of current batch:
-   * <ul>
-   *   <li>compute aggregations</li>
-   *   <li>compute window functions</li>
-   *   <li>transfer remaining vectors from current batch to container</li>
-   * </ul>
-   */
-  @Override
-  public void doWork() throws DrillException {
-    int currentRow = 0;
-
-    logger.trace("WindowFramer.doWork() START, num batches {}, current batch has {} rows",
-      batches.size(), batches.get(0).getRecordCount());
-
-    allocateOutgoing();
-
-    final WindowDataBatch current = batches.get(0);
-
-    setupCopyFirstValue(current, internal);
-
-    // we need to store the record count explicitly, because we release current batch at the end of this call
-    outputCount = current.getRecordCount();
-
-    while (currentRow < outputCount) {
-      if (partition != null) {
-        assert currentRow == 0 : "pending windows are only expected at the start of the batch";
-
-        // we have a pending window we need to handle from a previous call to doWork()
-        logger.trace("we have a pending partition {}", partition);
-
-        if (!requireFullPartition) {
-          // we didn't compute the whole partition length in the previous partition, we need to update the length now
-          updatePartitionSize(partition, currentRow);
-        }
-      } else {
-        newPartition(current, currentRow);
-      }
-
-      currentRow = processPartition(currentRow);
-      if (partition.isDone()) {
-        cleanPartition();
-      }
-    }
-
-    // transfer "non aggregated" vectors
-    for (VectorWrapper<?> vw : current) {
-      ValueVector v = container.addOrGet(vw.getField());
-      TransferPair tp = vw.getValueVector().makeTransferPair(v);
-      tp.transfer();
-    }
-
-    for (VectorWrapper<?> v : container) {
-      v.getValueVector().getMutator().setValueCount(outputCount);
-    }
-
-    // because we are using the default frame, and we keep the aggregated value until we start a new frame
-    // we can safely free the current batch
-    batches.remove(0).clear();
-
-    logger.trace("WindowFramer.doWork() END");
-  }
-
-  private void newPartition(final WindowDataBatch current, final int currentRow) throws SchemaChangeException {
-    partition = new Partition();
-    updatePartitionSize(partition, currentRow);
-
-    setupPartition(current, container);
-    copyFirstValueToInternal(currentRow);
-  }
-
-  private void cleanPartition() {
-    partition = null;
-    resetValues();
-    for (VectorWrapper<?> vw : internal) {
-      if ((vw.getValueVector() instanceof BaseDataValueVector)) {
-        ((BaseDataValueVector) vw.getValueVector()).reset();
-      }
-    }
-    lagCopiedToInternal = false;
-  }
-
-  /**
-   * process all rows (computes and writes aggregation values) of current batch that are part of current partition.
-   * @param currentRow first unprocessed row
-   * @return index of next unprocessed row
-   * @throws DrillException if it can't write into the container
-   */
-  private int processPartition(final int currentRow) throws DrillException {
-    logger.trace("process partition {}, currentRow: {}, outputCount: {}", partition, currentRow, outputCount);
-
-    final VectorAccessible current = getCurrent();
-    setupCopyNext(current, container);
-    setupPasteValues(internal, container);
-
-    copyPrevFromInternal();
-
-    // copy remaining from current
-    setupCopyPrev(current, container);
-
-    int row = currentRow;
-
-    // process all rows except the last one of the batch/partition
-    while (row < outputCount && !partition.isDone()) {
-      if (row != currentRow) { // this is not the first row of the partition
-        copyPrev(row - 1, row);
-      }
-
-      processRow(row);
-
-      if (row < outputCount - 1 && !partition.isDone()) {
-        copyNext(row + 1, row);
-      }
-
-      row++;
-    }
-
-    // if we didn't reach the end of partition yet
-    if (!partition.isDone() && batches.size() > 1) {
-      // copy next value onto the current one
-      setupCopyNext(batches.get(1), container);
-      copyNext(0, row - 1);
-
-      copyPrevToInternal(current, row);
-    }
-
-    return row;
-  }
-
-  private void copyPrevToInternal(VectorAccessible current, int row) {
-    logger.trace("copying {} into internal", row - 1);
-    setupCopyPrev(current, internal);
-    copyPrev(row - 1, 0);
-    lagCopiedToInternal = true;
-  }
-
-  private void copyPrevFromInternal() {
-    if (lagCopiedToInternal) {
-      setupCopyFromInternal(internal, container);
-      copyFromInternal(0, 0);
-      lagCopiedToInternal = false;
-    }
-  }
-
-  private void processRow(final int row) throws DrillException {
-    if (partition.isFrameDone()) {
-      // because all peer rows share the same frame, we only need to compute and aggregate the frame once
-      final long peers = aggregatePeers(row);
-      partition.newFrame(peers);
-    }
-
-    outputRow(row, partition);
-    writeLastValue(frameLastRow, row);
-
-    partition.rowAggregated();
-  }
-
-  /**
-   * updates partition's length after computing the number of rows for the current the partition starting at the specified
-   * row of the first batch. If !requiresFullPartition, this method will only count the rows in the current batch
-   */
-  private void updatePartitionSize(final Partition partition, final int start) {
-    logger.trace("compute partition size starting from {} on {} batches", start, batches.size());
-
-    // current partition always starts from first batch
-    final VectorAccessible first = getCurrent();
-
-    long length = 0;
-    boolean lastBatch = false;
-    int row = start;
-
-    // count all rows that are in the same partition of start
-    // keep increasing length until we find first row of next partition or we reach the very last batch
-
-    outer:
-    for (WindowDataBatch batch : batches) {
-      final int recordCount = batch.getRecordCount();
-
-      // check first container from start row, and subsequent containers from first row
-      for (; row < recordCount; row++, length++) {
-        if (!isSamePartition(start, first, row, batch)) {
-          break outer;
-        }
-      }
-
-      if (!requireFullPartition) {
-        // we are only interested in the first batch's records
-        break;
-      }
-
-      row = 0;
-    }
-
-    if (!requireFullPartition) {
-      // this is the last batch of current partition if
-      lastBatch = row < outputCount                           // partition ends before the end of the batch
-        || batches.size() == 1                                // it's the last available batch
-        || !isSamePartition(start, first, 0, batches.get(1)); // next batch contains a different partition
-    }
-
-    partition.updateLength(length, !(requireFullPartition || lastBatch));
-  }
-
-  /**
-   * aggregates all peer rows of current row
-   * @param start starting row of the current frame
-   * @return num peer rows for current row
-   * @throws SchemaChangeException
-   */
-  private long aggregatePeers(final int start) throws SchemaChangeException {
-    logger.trace("aggregating rows starting from {}", start);
-
-    // current frame always starts from first batch
-    final VectorAccessible first = getCurrent();
-    VectorAccessible last = first;
-    long length = 0;
-
-    // a single frame can include rows from multiple batches
-    // start processing first batch and, if necessary, move to next batches
-    for (WindowDataBatch batch : batches) {
-      setupEvaluatePeer(batch, container);
-      final int recordCount = batch.getRecordCount();
-
-      // for every remaining row in the partition, count it if it's a peer row
-      for (int row = (batch == first) ? start : 0; row < recordCount; row++, length++) {
-        if (!isPeer(start, first, row, batch)) {
-          break;
-        }
-
-        evaluatePeer(row);
-        last = batch;
-        frameLastRow = row;
-      }
-    }
-
-    setupReadLastValue(last, container);
-
-    return length;
-  }
-
-  /**
-   * @return saved batch that will be processed in doWork()
-   */
-  private VectorAccessible getCurrent() {
-    return batches.get(0);
-  }
-
-  @Override
-  public int getOutputCount() {
-    return outputCount;
-  }
-
-  // we need this abstract method for code generation
-  @Override
-  public void cleanup() {
-    logger.trace("clearing internal");
-    internal.clear();
-  }
-
-  /**
-   * called once for each peer row of the current frame.
-   * @param index of row to aggregate
-   */
-  public abstract void evaluatePeer(@Named("index") int index);
-  public abstract void setupEvaluatePeer(@Named("incoming") VectorAccessible incoming, @Named("outgoing") VectorAccessible outgoing) throws SchemaChangeException;
-
-  public abstract void setupReadLastValue(@Named("incoming") VectorAccessible incoming, @Named("outgoing") VectorAccessible outgoing) throws SchemaChangeException;
-  public abstract void writeLastValue(@Named("index") int index, @Named("outIndex") int outIndex);
-
-  public abstract void setupCopyFirstValue(@Named("incoming") VectorAccessible incoming, @Named("outgoing") VectorAccessible outgoing) throws SchemaChangeException;
-  public abstract void copyFirstValueToInternal(@Named("index") int index);
-
-  /**
-   * called once for each row after we evaluate all peer rows. Used to write a value in the row
-   *
-   * @param outIndex index of row
-   * @param partition object used by "computed" window functions
-   */
-  public abstract void outputRow(@Named("outIndex") int outIndex, @Named("partition") Partition partition);
-
-  /**
-   * Called once per partition, before processing the partition. Used to setup read/write vectors
-   * @param incoming batch we will read from
-   * @param outgoing batch we will be writing to
-   *
-   * @throws SchemaChangeException
-   */
-  public abstract void setupPartition(@Named("incoming") WindowDataBatch incoming,
-                                      @Named("outgoing") VectorAccessible outgoing) throws SchemaChangeException;
-
-  /**
-   * copies value(s) from inIndex row to outIndex row. Mostly used by LEAD. inIndex always points to the row next to
-   * outIndex
-   * @param inIndex source row of the copy
-   * @param outIndex destination row of the copy.
-   */
-  public abstract void copyNext(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex);
-  public abstract void setupCopyNext(@Named("incoming") VectorAccessible incoming, @Named("outgoing") VectorAccessible outgoing);
-
-  public abstract void setupPasteValues(@Named("incoming") VectorAccessible incoming, @Named("outgoing") VectorAccessible outgoing);
-
-  /**
-   * copies value(s) from inIndex row to outIndex row. Mostly used by LAG. inIndex always points to the previous row
-   *
-   * @param inIndex source row of the copy
-   * @param outIndex destination row of the copy.
-   */
-  public abstract void copyPrev(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex);
-  public abstract void setupCopyPrev(@Named("incoming") VectorAccessible incoming, @Named("outgoing") VectorAccessible outgoing);
-
-  public abstract void copyFromInternal(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex);
-  public abstract void setupCopyFromInternal(@Named("incoming") VectorAccessible incoming, @Named("outgoing") VectorAccessible outgoing);
-
-  /**
-   * reset all window functions
-   */
-  public abstract boolean resetValues();
-
-  /**
-   * compares two rows from different batches (can be the same), if they have the same value for the partition by
-   * expression
-   * @param b1Index index of first row
-   * @param b1 batch for first row
-   * @param b2Index index of second row
-   * @param b2 batch for second row
-   * @return true if the rows are in the same partition
-   */
-  public abstract boolean isSamePartition(@Named("b1Index") int b1Index, @Named("b1") VectorAccessible b1,
-                                          @Named("b2Index") int b2Index, @Named("b2") VectorAccessible b2);
-
-  /**
-   * compares two rows from different batches (can be the same), if they have the same value for the order by
-   * expression
-   * @param b1Index index of first row
-   * @param b1 batch for first row
-   * @param b2Index index of second row
-   * @param b2 batch for second row
-   * @return true if the rows are in the same partition
-   */
-  public abstract boolean isPeer(@Named("b1Index") int b1Index, @Named("b1") VectorAccessible b1,
-                                 @Named("b2Index") int b2Index, @Named("b2") VectorAccessible b2);
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/3d0b4b02/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/FrameSupportTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/FrameSupportTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/FrameSupportTemplate.java
new file mode 100644
index 0000000..16c7513
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/FrameSupportTemplate.java
@@ -0,0 +1,316 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.window;
+
+import org.apache.drill.common.exceptions.DrillException;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.record.VectorAccessible;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.vector.BaseDataValueVector;
+import org.apache.drill.exec.vector.ValueVector;
+
+import javax.inject.Named;
+import java.util.List;
+
+
+/**
+ * WindowFramer implementation that supports the FRAME clause.
+ * <br>According to the SQL specification, FIRST_VALUE, LAST_VALUE and all aggregate functions support the FRAME clause.
+ * This class will handle such functions even if the FRAME clause is not present.
+ */
+public abstract class FrameSupportTemplate implements WindowFramer {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FrameSupportTemplate.class);
+
+  private VectorContainer container;
+  private VectorContainer internal;
+  private List<WindowDataBatch> batches;
+  private int outputCount; // number of rows in currently/last processed batch
+
+  private WindowDataBatch current;
+
+  private int frameLastRow;
+
+  // true when at least one window function needs to process all batches of a partition before passing any batch downstream
+  private boolean requireFullPartition;
+
+  private Partition partition;
+
+  @Override
+  public void setup(final List<WindowDataBatch> batches, final VectorContainer container, final OperatorContext oContext,
+                    final boolean requireFullPartition) throws SchemaChangeException {
+    this.container = container;
+    this.batches = batches;
+
+    internal = new VectorContainer(oContext);
+    allocateInternal();
+
+    outputCount = 0;
+    partition = null;
+
+    this.requireFullPartition = requireFullPartition;
+  }
+
+  private void allocateInternal() {
+    for (VectorWrapper<?> w : container) {
+      ValueVector vv = internal.addOrGet(w.getField());
+      vv.allocateNew();
+    }
+  }
+
+  /**
+   * processes all rows of the first batch.
+   */
+  @Override
+  public void doWork() throws DrillException {
+    int currentRow = 0;
+
+    this.current = batches.get(0);
+
+    setupSaveFirstValue(current, internal);
+
+    outputCount = current.getRecordCount();
+
+    while (currentRow < outputCount) {
+      if (partition != null) {
+        assert currentRow == 0 : "pending windows are only expected at the start of the batch";
+
+        // we have a pending window we need to handle from a previous call to doWork()
+        logger.trace("we have a pending partition {}", partition);
+
+        if (!requireFullPartition) {
+          // we didn't compute the whole partition length in the previous partition, we need to update the length now
+          updatePartitionSize(partition, currentRow);
+        }
+      } else {
+        newPartition(current, currentRow);
+      }
+
+      currentRow = processPartition(currentRow);
+      if (partition.isDone()) {
+        cleanPartition();
+      }
+    }
+  }
+
+  private void newPartition(final WindowDataBatch current, final int currentRow) throws SchemaChangeException {
+    partition = new Partition();
+    updatePartitionSize(partition, currentRow);
+
+    setupPartition(current, container);
+    saveFirstValue(currentRow);
+  }
+
+  private void cleanPartition() {
+    partition = null;
+    resetValues();
+    for (VectorWrapper<?> vw : internal) {
+      if ((vw.getValueVector() instanceof BaseDataValueVector)) {
+        ((BaseDataValueVector) vw.getValueVector()).reset();
+      }
+    }
+  }
+
+  /**
+   * process all rows (computes and writes aggregation values) of current batch that are part of current partition.
+   * @param currentRow first unprocessed row
+   * @return index of next unprocessed row
+   * @throws DrillException if it can't write into the container
+   */
+  private int processPartition(final int currentRow) throws DrillException {
+    logger.trace("process partition {}, currentRow: {}, outputCount: {}", partition, currentRow, outputCount);
+
+    setupWriteFirstValue(internal, container);
+
+    int row = currentRow;
+
+    // process all rows except the last one of the batch/partition
+    while (row < outputCount && !partition.isDone()) {
+      processRow(row);
+
+      row++;
+    }
+
+    return row;
+  }
+
+  private void processRow(final int row) throws DrillException {
+    if (partition.isFrameDone()) {
+      // because all peer rows share the same frame, we only need to compute and aggregate the frame once
+      final long peers = aggregatePeers(row);
+      partition.newFrame(peers);
+    }
+
+    outputRow(row, partition);
+    writeLastValue(frameLastRow, row);
+
+    partition.rowAggregated();
+  }
+
+  /**
+   * updates partition's length after computing the number of rows for the current partition starting at the specified
+   * row of the first batch. If !requireFullPartition, this method will only count the rows in the current batch
+   */
+  private void updatePartitionSize(final Partition partition, final int start) {
+    logger.trace("compute partition size starting from {} on {} batches", start, batches.size());
+
+    long length = 0;
+    boolean lastBatch = false;
+    int row = start;
+
+    // count all rows that are in the same partition of start
+    // keep increasing length until we find first row of next partition or we reach the very last batch
+
+    outer:
+    for (WindowDataBatch batch : batches) {
+      final int recordCount = batch.getRecordCount();
+
+      // check first container from start row, and subsequent containers from first row
+      for (; row < recordCount; row++, length++) {
+        if (!isSamePartition(start, current, row, batch)) {
+          break outer;
+        }
+      }
+
+      if (!requireFullPartition) {
+        // we are only interested in the first batch's records
+        break;
+      }
+
+      row = 0;
+    }
+
+    if (!requireFullPartition) {
+      // this is the last batch of current partition if
+      lastBatch = row < outputCount                           // partition ends before the end of the batch
+        || batches.size() == 1                                // it's the last available batch
+        || !isSamePartition(start, current, 0, batches.get(1)); // next batch contains a different partition
+    }
+
+    partition.updateLength(length, !(requireFullPartition || lastBatch));
+  }
+
+  /**
+   * aggregates all peer rows of current row
+   * @param start starting row of the current frame
+   * @return num peer rows for current row
+   * @throws SchemaChangeException
+   */
+  private long aggregatePeers(final int start) throws SchemaChangeException {
+    logger.trace("aggregating rows starting from {}", start);
+
+    VectorAccessible last = current;
+    long length = 0;
+
+    // a single frame can include rows from multiple batches
+    // start processing first batch and, if necessary, move to next batches
+    for (WindowDataBatch batch : batches) {
+      setupEvaluatePeer(batch, container);
+      final int recordCount = batch.getRecordCount();
+
+      // for every remaining row in the partition, count it if it's a peer row
+      for (int row = (batch == current) ? start : 0; row < recordCount; row++, length++) {
+        if (!isPeer(start, current, row, batch)) {
+          break;
+        }
+
+        evaluatePeer(row);
+        last = batch;
+        frameLastRow = row;
+      }
+    }
+
+    setupReadLastValue(last, container);
+
+    return length;
+  }
+
+  @Override
+  public int getOutputCount() {
+    return outputCount;
+  }
+
+  // we need this abstract method for code generation
+  @Override
+  public void cleanup() {
+    logger.trace("clearing internal");
+    internal.clear();
+  }
+
+  /**
+   * called once for each peer row of the current frame.
+   * @param index of row to aggregate
+   */
+  public abstract void evaluatePeer(@Named("index") int index);
+  public abstract void setupEvaluatePeer(@Named("incoming") VectorAccessible incoming, @Named("outgoing") VectorAccessible outgoing) throws SchemaChangeException;
+
+  public abstract void setupReadLastValue(@Named("incoming") VectorAccessible incoming, @Named("outgoing") VectorAccessible outgoing) throws SchemaChangeException;
+  public abstract void writeLastValue(@Named("index") int index, @Named("outIndex") int outIndex);
+
+  public abstract void setupSaveFirstValue(@Named("incoming") VectorAccessible incoming, @Named("outgoing") VectorAccessible outgoing) throws SchemaChangeException;
+  public abstract void saveFirstValue(@Named("index") int index);
+  public abstract void setupWriteFirstValue(@Named("incoming") VectorAccessible incoming, @Named("outgoing") VectorAccessible outgoing);
+
+  /**
+   * called once for each row after we evaluate all peer rows. Used to write a value in the row
+   *
+   * @param outIndex index of row
+   * @param partition object used by "computed" window functions
+   */
+  public abstract void outputRow(@Named("outIndex") int outIndex, @Named("partition") Partition partition);
+
+  /**
+   * Called once per partition, before processing the partition. Used to setup read/write vectors
+   * @param incoming batch we will read from
+   * @param outgoing batch we will be writing to
+   *
+   * @throws SchemaChangeException
+   */
+  public abstract void setupPartition(@Named("incoming") WindowDataBatch incoming,
+                                      @Named("outgoing") VectorAccessible outgoing) throws SchemaChangeException;
+
+  /**
+   * reset all window functions
+   */
+  public abstract boolean resetValues();
+
+  /**
+   * compares two rows from different batches (can be the same), if they have the same value for the partition by
+   * expression
+   * @param b1Index index of first row
+   * @param b1 batch for first row
+   * @param b2Index index of second row
+   * @param b2 batch for second row
+   * @return true if the rows are in the same partition
+   */
+  public abstract boolean isSamePartition(@Named("b1Index") int b1Index, @Named("b1") VectorAccessible b1,
+                                          @Named("b2Index") int b2Index, @Named("b2") VectorAccessible b2);
+
+  /**
+   * compares two rows from different batches (can be the same), if they have the same value for the order by
+   * expression
+   * @param b1Index index of first row
+   * @param b1 batch for first row
+   * @param b2Index index of second row
+   * @param b2 batch for second row
+   * @return true if the rows are peers (same value for the order by expression)
+   */
+  public abstract boolean isPeer(@Named("b1Index") int b1Index, @Named("b1") VectorAccessible b1,
+                                 @Named("b2Index") int b2Index, @Named("b2") VectorAccessible b2);
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/3d0b4b02/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/NoFrameSupportTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/NoFrameSupportTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/NoFrameSupportTemplate.java
new file mode 100644
index 0000000..ac1eefc
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/NoFrameSupportTemplate.java
@@ -0,0 +1,347 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.window;
+
+import org.apache.drill.common.exceptions.DrillException;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.record.VectorAccessible;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.vector.BaseDataValueVector;
+import org.apache.drill.exec.vector.ValueVector;
+
+import javax.inject.Named;
+import java.util.List;
+
+
+/**
+ * WindowFramer implementation that doesn't support the FRAME clause (will assume the default frame).
+ * <br>According to the SQL standard, LEAD, LAG, ROW_NUMBER, NTILE and all ranking functions don't support the FRAME clause.
+ * This class will handle such functions.
+ */
+public abstract class NoFrameSupportTemplate implements WindowFramer {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(NoFrameSupportTemplate.class);
+
+  private VectorContainer container;
+  private VectorContainer internal;
+  private boolean lagCopiedToInternal;
+  private List<WindowDataBatch> batches;
+  private int outputCount; // number of rows in currently/last processed batch
+
+  private WindowDataBatch current;
+
+  // true when at least one window function needs to process all batches of a partition before passing any batch downstream
+  private boolean requireFullPartition;
+
+  private Partition partition; // current partition being processed
+
+  @Override
+  public void setup(final List<WindowDataBatch> batches, final VectorContainer container, final OperatorContext oContext,
+                    final boolean requireFullPartition) throws SchemaChangeException {
+    this.container = container;
+    this.batches = batches;
+
+    internal = new VectorContainer(oContext);
+    allocateInternal();
+    lagCopiedToInternal = false;
+
+    outputCount = 0;
+    partition = null;
+
+    this.requireFullPartition = requireFullPartition;
+  }
+
+  private void allocateInternal() {
+    for (VectorWrapper<?> w : container) {
+      ValueVector vv = internal.addOrGet(w.getField());
+      vv.allocateNew();
+    }
+  }
+
+  /**
+   * processes all rows of the first batch.
+   */
+  @Override
+  public void doWork() throws DrillException {
+    int currentRow = 0;
+
+    this.current = batches.get(0);
+
+    outputCount = current.getRecordCount();
+
+    while (currentRow < outputCount) {
+      if (partition != null) {
+        assert currentRow == 0 : "pending windows are only expected at the start of the batch";
+
+        // we have a pending window we need to handle from a previous call to doWork()
+        logger.trace("we have a pending partition {}", partition);
+
+        if (!requireFullPartition) {
+          // we didn't compute the whole partition length in the previous partition, we need to update the length now
+          updatePartitionSize(partition, currentRow);
+        }
+      } else {
+        newPartition(current, currentRow);
+      }
+
+      currentRow = processPartition(currentRow);
+      if (partition.isDone()) {
+        cleanPartition();
+      }
+    }
+  }
+
+  private void newPartition(final WindowDataBatch current, final int currentRow) throws SchemaChangeException {
+    partition = new Partition();
+    updatePartitionSize(partition, currentRow);
+
+    setupPartition(current, container);
+  }
+
+  private void cleanPartition() {
+    partition = null;
+    resetValues();
+    for (VectorWrapper<?> vw : internal) {
+      if ((vw.getValueVector() instanceof BaseDataValueVector)) {
+        ((BaseDataValueVector) vw.getValueVector()).reset();
+      }
+    }
+    lagCopiedToInternal = false;
+  }
+
+  /**
+   * process all rows (computes and writes function values) of current batch that are part of current partition.
+   * @param currentRow first unprocessed row
+   * @return index of next unprocessed row
+   * @throws DrillException if it can't write into the container
+   */
+  private int processPartition(final int currentRow) throws DrillException {
+    logger.trace("process partition {}, currentRow: {}, outputCount: {}", partition, currentRow, outputCount);
+
+    setupCopyNext(current, container);
+
+    copyPrevFromInternal();
+
+    // copy remaining from current
+    setupCopyPrev(current, container);
+
+    int row = currentRow;
+
+    // process all rows except the last one of the batch/partition
+    while (row < outputCount && !partition.isDone()) {
+      if (row != currentRow) { // this is not the first row of the partition
+        copyPrev(row - 1, row);
+      }
+
+      processRow(row);
+
+      if (row < outputCount - 1 && !partition.isDone()) {
+        copyNext(row + 1, row);
+      }
+
+      row++;
+    }
+
+    // if we didn't reach the end of partition yet
+    if (!partition.isDone() && batches.size() > 1) {
+      // copy next value onto the current one
+      setupCopyNext(batches.get(1), container);
+      copyNext(0, row - 1);
+
+      copyPrevToInternal(current, row);
+    }
+
+    return row;
+  }
+
+  private void copyPrevToInternal(VectorAccessible current, int row) {
+    logger.trace("copying {} into internal", row - 1);
+    setupCopyPrev(current, internal);
+    copyPrev(row - 1, 0);
+    lagCopiedToInternal = true;
+  }
+
+  private void copyPrevFromInternal() {
+    if (lagCopiedToInternal) {
+      setupCopyFromInternal(internal, container);
+      copyFromInternal(0, 0);
+      lagCopiedToInternal = false;
+    }
+  }
+
+  private void processRow(final int row) throws DrillException {
+    if (partition.isFrameDone()) {
+      // because all peer rows share the same frame, we only need to compute and aggregate the frame once
+      final long peers = countPeers(row);
+      partition.newFrame(peers);
+    }
+
+    outputRow(row, partition);
+
+    partition.rowAggregated();
+  }
+
+  /**
+   * updates partition's length after computing the number of rows for the current partition starting at the specified
+   * row of the first batch. If !requireFullPartition, this method will only count the rows in the current batch
+   */
+  private void updatePartitionSize(final Partition partition, final int start) {
+    logger.trace("compute partition size starting from {} on {} batches", start, batches.size());
+
+    long length = 0;
+    boolean lastBatch = false;
+    int row = start;
+
+    // count all rows that are in the same partition of start
+    // keep increasing length until we find first row of next partition or we reach the very last batch
+
+    outer:
+    for (WindowDataBatch batch : batches) {
+      final int recordCount = batch.getRecordCount();
+
+      // check first container from start row, and subsequent containers from first row
+      for (; row < recordCount; row++, length++) {
+        if (!isSamePartition(start, current, row, batch)) {
+          break outer;
+        }
+      }
+
+      if (!requireFullPartition) {
+        // we are only interested in the first batch's records
+        break;
+      }
+
+      row = 0;
+    }
+
+    if (!requireFullPartition) {
+      // this is the last batch of current partition if
+      lastBatch = row < outputCount                           // partition ends before the end of the batch
+        || batches.size() == 1                                // it's the last available batch
+        || !isSamePartition(start, current, 0, batches.get(1)); // next batch contains a different partition
+    }
+
+    partition.updateLength(length, !(requireFullPartition || lastBatch));
+  }
+
+  /**
+   * count number of peer rows for current row
+   * @param start starting row of the current frame
+   * @return num peer rows for current row
+   * @throws SchemaChangeException
+   */
+  private long countPeers(final int start) throws SchemaChangeException {
+    long length = 0;
+
+    // a single frame can include rows from multiple batches
+    // start processing first batch and, if necessary, move to next batches
+    for (WindowDataBatch batch : batches) {
+      final int recordCount = batch.getRecordCount();
+
+      // for every remaining row in the partition, count it if it's a peer row
+      for (int row = (batch == current) ? start : 0; row < recordCount; row++, length++) {
+        if (!isPeer(start, current, row, batch)) {
+          break;
+        }
+      }
+    }
+
+    return length;
+  }
+
+  @Override
+  public int getOutputCount() {
+    return outputCount;
+  }
+
+  // we need this abstract method for code generation
+  @Override
+  public void cleanup() {
+    logger.trace("clearing internal");
+    internal.clear();
+  }
+
+  /**
+   * called once for each row after we evaluate all peer rows. Used to write a value in the row
+   *
+   * @param outIndex index of row
+   * @param partition object used by "computed" window functions
+   */
+  public abstract void outputRow(@Named("outIndex") int outIndex, @Named("partition") Partition partition);
+
+  /**
+   * Called once per partition, before processing the partition. Used to setup read/write vectors
+   * @param incoming batch we will read from
+   * @param outgoing batch we will be writing to
+   *
+   * @throws SchemaChangeException
+   */
+  public abstract void setupPartition(@Named("incoming") WindowDataBatch incoming,
+                                      @Named("outgoing") VectorAccessible outgoing) throws SchemaChangeException;
+
+  /**
+   * copies value(s) from inIndex row to outIndex row. Mostly used by LEAD. inIndex always points to the row next to
+   * outIndex
+   * @param inIndex source row of the copy
+   * @param outIndex destination row of the copy.
+   */
+  public abstract void copyNext(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex);
+  public abstract void setupCopyNext(@Named("incoming") VectorAccessible incoming, @Named("outgoing") VectorAccessible outgoing);
+
+  /**
+   * copies value(s) from inIndex row to outIndex row. Mostly used by LAG. inIndex always points to the previous row
+   *
+   * @param inIndex source row of the copy
+   * @param outIndex destination row of the copy.
+   */
+  public abstract void copyPrev(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex);
+  public abstract void setupCopyPrev(@Named("incoming") VectorAccessible incoming, @Named("outgoing") VectorAccessible outgoing);
+
+  public abstract void copyFromInternal(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex);
+  public abstract void setupCopyFromInternal(@Named("incoming") VectorAccessible incoming, @Named("outgoing") VectorAccessible outgoing);
+
+  /**
+   * reset all window functions
+   */
+  public abstract boolean resetValues();
+
+  /**
+   * compares two rows from different batches (can be the same), if they have the same value for the partition by
+   * expression
+   * @param b1Index index of first row
+   * @param b1 batch for first row
+   * @param b2Index index of second row
+   * @param b2 batch for second row
+   * @return true if the rows are in the same partition
+   */
+  public abstract boolean isSamePartition(@Named("b1Index") int b1Index, @Named("b1") VectorAccessible b1,
+                                          @Named("b2Index") int b2Index, @Named("b2") VectorAccessible b2);
+
+  /**
+   * compares two rows from different batches (can be the same), if they have the same value for the order by
+   * expression
+   * @param b1Index index of first row
+   * @param b1 batch for first row
+   * @param b2Index index of second row
+   * @param b2 batch for second row
+   * @return true if the rows are peers, i.e. have the same order-by values
+   */
+  public abstract boolean isPeer(@Named("b1Index") int b1Index, @Named("b1") VectorAccessible b1,
+                                 @Named("b2Index") int b2Index, @Named("b2") VectorAccessible b2);
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/3d0b4b02/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java
index 2261734..d6be1eb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java
@@ -28,6 +28,7 @@ import org.apache.drill.common.expression.FunctionCall;
 import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.common.logical.data.NamedExpression;
 import org.apache.drill.common.logical.data.Order;
+import org.apache.drill.exec.compile.TemplateClassDefinition;
 import org.apache.drill.exec.compile.sig.GeneratorMapping;
 import org.apache.drill.exec.compile.sig.MappingSet;
 import org.apache.drill.exec.exception.ClassTransformationException;
@@ -42,11 +43,13 @@ import org.apache.drill.exec.physical.config.WindowPOP;
 import org.apache.drill.exec.record.AbstractRecordBatch;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.TransferPair;
 import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.record.VectorWrapper;
 
 import com.google.common.collect.Lists;
 import com.sun.codemodel.JExpr;
+import org.apache.drill.exec.vector.ValueVector;
 
 /**
  * support for OVER(PARTITION BY expression1,expression2,... [ORDER BY expressionA, expressionB,...])
@@ -58,14 +61,14 @@ public class WindowFrameRecordBatch extends AbstractRecordBatch<WindowPOP> {
   private final RecordBatch incoming;
   private List<WindowDataBatch> batches;
 
-  private WindowFramer framer;
+  private WindowFramer[] framers;
   private boolean hasOrderBy; // true if window definition contains an order-by clause
   private final List<WindowFunction> functions = Lists.newArrayList();
 
-  private boolean noMoreBatches;
+  private boolean noMoreBatches; // true when downstream returns NONE
   private BatchSchema schema;
 
-  private boolean shouldStop;
+  private boolean shouldStop; // true if we received an early termination request
 
   public WindowFrameRecordBatch(WindowPOP popConfig, FragmentContext context, RecordBatch incoming) throws OutOfMemoryException {
     super(popConfig, context);
@@ -74,41 +77,7 @@ public class WindowFrameRecordBatch extends AbstractRecordBatch<WindowPOP> {
   }
 
   /**
-   * Let's assume we have the following 3 batches of data:
-   * <p><pre>
-   * +---------+--------+--------------+--------+
-   * |   b0    |   b1   |      b2      |   b3   |
-   * +----+----+--------+----+----+----+--------+
-   * | p0 | p1 |   p1   | p2 | p3 | p4 |   p5   |
-   * +----+----+--------+----+----+----+--------+
-   * </pre></p>
-   *
-   * batch b0 contains partitions p0 and p1
-   * batch b1 contains partition p1
-   * batch b2 contains partitions p2 p3 and p4
-   * batch b3 contains partition p5
-   *
-   * <p><pre>
-   * when innerNext() is called:
-   *   call next(incoming), we receive and save b0 in a list of WindowDataBatch
-   *     we can't process b0 yet because we don't know if p1 has more rows upstream
-   *   call next(incoming), we receive and save b1
-   *     we can't process b0 yet for the same reason previously stated
-   *   call next(incoming), we receive and save b2
-   *   we process b0 (using the framer) and pass the container downstream
-   * when innerNext() is called:
-   *   we process b1 and pass the container downstream, b0 and b1 are released from memory
-   * when innerNext() is called:
-   *   call next(incoming), we receive and save b3
-   *   we process b2 and pass the container downstream, b2 is released from memory
-   * when innerNext() is called:
-   *   call next(incoming) and receive NONE
-   *   we process b3 and pass the container downstream, b3 is released from memory
-   * when innerNext() is called:
-   *  we return NONE
-   * </pre></p>
-   * Because we only support the default frame, we don't need to reset the aggregations until we reach the end of
-   * a partition. We can safely free a batch as soon as it has been processed.
+   * Hold incoming batches in memory until all window functions are ready to process the batch on top of the queue
    */
   @Override
   public IterOutcome innerNext() {
@@ -174,7 +143,7 @@ public class WindowFrameRecordBatch extends AbstractRecordBatch<WindowPOP> {
 
     // process first saved batch, then release it
     try {
-      framer.doWork();
+      doWork();
     } catch (DrillException e) {
       context.fail(e);
       cleanup();
@@ -188,6 +157,41 @@ public class WindowFrameRecordBatch extends AbstractRecordBatch<WindowPOP> {
     return IterOutcome.OK;
   }
 
+  private void doWork() throws DrillException {
+
+    final WindowDataBatch current = batches.get(0);
+    final int recordCount = current.getRecordCount();
+
+    logger.trace("WindowFramer.doWork() START, num batches {}, current batch has {} rows", batches.size(), recordCount);
+
+    // allocate outgoing vectors
+    for (VectorWrapper<?> w : container) {
+      w.getValueVector().allocateNew();
+    }
+
+    for (WindowFramer framer : framers) {
+      framer.doWork();
+    }
+
+    // transfer "non aggregated" vectors
+    for (VectorWrapper<?> vw : current) {
+      ValueVector v = container.addOrGet(vw.getField());
+      TransferPair tp = vw.getValueVector().makeTransferPair(v);
+      tp.transfer();
+    }
+
+    container.setRecordCount(recordCount);
+    for (VectorWrapper<?> v : container) {
+      v.getValueVector().getMutator().setValueCount(recordCount);
+    }
+
+    // we can safely free the current batch
+    current.clear();
+    batches.remove(0);
+
+    logger.trace("doWork() END");
+  }
+
   /**
    * @return true when all window functions are ready to process the current batch (it's the first batch currently
    * held in memory)
@@ -204,8 +208,8 @@ public class WindowFrameRecordBatch extends AbstractRecordBatch<WindowPOP> {
     final VectorAccessible last = batches.get(batches.size() - 1);
     final int lastSize = last.getRecordCount();
 
-    final boolean partitionEndReached = !framer.isSamePartition(currentSize - 1, current, lastSize - 1, last);
-    final boolean frameEndReached = partitionEndReached || !framer.isPeer(currentSize - 1, current, lastSize - 1, last);
+    final boolean partitionEndReached = !framers[0].isSamePartition(currentSize - 1, current, lastSize - 1, last);
+    final boolean frameEndReached = partitionEndReached || !framers[0].isPeer(currentSize - 1, current, lastSize - 1, last);
 
     for (final WindowFunction function : functions) {
       if (!function.canDoWork(batches.size(), hasOrderBy, frameEndReached, partitionEndReached)) {
@@ -234,7 +238,7 @@ public class WindowFrameRecordBatch extends AbstractRecordBatch<WindowPOP> {
     }
 
     try {
-      framer = createFramer(incoming);
+      createFramers(incoming);
     } catch (IOException | ClassTransformationException e) {
       throw new SchemaChangeException("Exception when creating the schema", e);
     }
@@ -244,18 +248,17 @@ public class WindowFrameRecordBatch extends AbstractRecordBatch<WindowPOP> {
     }
   }
 
-  private WindowFramer createFramer(VectorAccessible batch) throws SchemaChangeException, IOException, ClassTransformationException {
-    assert framer == null : "createFramer should only be called once";
+  private void createFramers(VectorAccessible batch) throws SchemaChangeException, IOException, ClassTransformationException {
+    assert framers == null : "createFramer should only be called once";
 
-    logger.trace("creating framer");
+    logger.trace("creating framer(s)");
 
     final List<LogicalExpression> keyExprs = Lists.newArrayList();
     final List<LogicalExpression> orderExprs = Lists.newArrayList();
     boolean requireFullPartition = false;
 
-    container.clear();
-
-    functions.clear();
+    boolean useDefaultFrame = false; // at least one window function uses the DefaultFrameTemplate
+    boolean useCustomFrame = false; // at least one window function uses the CustomFrameTemplate
 
     hasOrderBy = popConfig.getOrderings().length > 0;
 
@@ -277,12 +280,17 @@ public class WindowFrameRecordBatch extends AbstractRecordBatch<WindowPOP> {
       if (winfun.materialize(ne, container, context.getFunctionRegistry())) {
         functions.add(winfun);
         requireFullPartition |= winfun.requiresFullPartition(hasOrderBy);
+
+        if (winfun.supportsCustomFrames()) {
+          useCustomFrame = true;
+        } else {
+          useDefaultFrame = true;
+        }
       }
     }
 
-    if (container.isSchemaChanged()) {
-      container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
-    }
+    container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
+    container.setRecordCount(0);
 
     // materialize partition by expressions
     for (final NamedExpression ne : popConfig.getWithins()) {
@@ -294,15 +302,31 @@ public class WindowFrameRecordBatch extends AbstractRecordBatch<WindowPOP> {
       orderExprs.add(ExpressionTreeMaterializer.materializeAndCheckErrors(oe.getExpr(), batch, context.getFunctionRegistry()));
     }
 
-    final WindowFramer framer = generateFramer(keyExprs, orderExprs, functions);
-    framer.setup(batches, container, oContext, requireFullPartition);
+    // count how many framers we need
+    int numFramers = useDefaultFrame ? 1 : 0;
+    numFramers += useCustomFrame ? 1 : 0;
+    assert numFramers > 0 : "No framer was needed!";
+
+    framers = new WindowFramer[numFramers];
+    int index = 0;
+    if (useDefaultFrame) {
+      framers[index] = generateFramer(keyExprs, orderExprs, functions, false);
+      framers[index].setup(batches, container, oContext, requireFullPartition);
+      index++;
+    }
 
-    return framer;
+    if (useCustomFrame) {
+      framers[index] = generateFramer(keyExprs, orderExprs, functions, true);
+      framers[index].setup(batches, container, oContext, requireFullPartition);
+    }
   }
 
   private WindowFramer generateFramer(final List<LogicalExpression> keyExprs, final List<LogicalExpression> orderExprs,
-      final List<WindowFunction> functions) throws IOException, ClassTransformationException {
-    final ClassGenerator<WindowFramer> cg = CodeGenerator.getRoot(WindowFramer.TEMPLATE_DEFINITION, context.getFunctionRegistry());
+      final List<WindowFunction> functions, boolean useCustomFrame) throws IOException, ClassTransformationException {
+
+    TemplateClassDefinition<WindowFramer> definition = useCustomFrame ?
+      WindowFramer.FRAME_TEMPLATE_DEFINITION : WindowFramer.NOFRAME_TEMPLATE_DEFINITION;
+    final ClassGenerator<WindowFramer> cg = CodeGenerator.getRoot(definition, context.getFunctionRegistry());
 
     {
       // generating framer.isSamePartition()
@@ -322,7 +346,10 @@ public class WindowFrameRecordBatch extends AbstractRecordBatch<WindowPOP> {
     }
 
     for (final WindowFunction function : functions) {
-      function.generateCode(cg);
+      // only generate code for the proper window functions
+      if (function.supportsCustomFrames() == useCustomFrame) {
+        function.generateCode(cg);
+      }
     }
 
     cg.getBlock("resetValues")._return(JExpr.TRUE);
@@ -356,9 +383,13 @@ public class WindowFrameRecordBatch extends AbstractRecordBatch<WindowPOP> {
   }
 
   private void cleanup() {
-    if (framer != null) {
-      framer.cleanup();
-      framer = null;
+
+    if (framers != null) {
+      for (WindowFramer framer : framers) {
+        framer.cleanup();
+      }
+
+      framers = null;
     }
 
     if (batches != null) {
@@ -383,6 +414,6 @@ public class WindowFrameRecordBatch extends AbstractRecordBatch<WindowPOP> {
 
   @Override
   public int getRecordCount() {
-    return framer.getOutputCount();
+    return framers[0].getOutputCount();
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/3d0b4b02/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFramer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFramer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFramer.java
index 719d5a8..9b985c0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFramer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFramer.java
@@ -28,7 +28,8 @@ import javax.inject.Named;
 import java.util.List;
 
 public interface WindowFramer {
-  TemplateClassDefinition<WindowFramer> TEMPLATE_DEFINITION = new TemplateClassDefinition<>(WindowFramer.class, DefaultFrameTemplate.class);
+  TemplateClassDefinition<WindowFramer> NOFRAME_TEMPLATE_DEFINITION = new TemplateClassDefinition<>(WindowFramer.class, NoFrameSupportTemplate.class);
+  TemplateClassDefinition<WindowFramer> FRAME_TEMPLATE_DEFINITION = new TemplateClassDefinition<>(WindowFramer.class, FrameSupportTemplate.class);
 
   void setup(final List<WindowDataBatch> batches, final VectorContainer container, final OperatorContext operatorContext,
              final boolean requireFullPartition) throws SchemaChangeException;

http://git-wip-us.apache.org/repos/asf/drill/blob/3d0b4b02/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFunction.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFunction.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFunction.java
index 548809b..1c71297 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFunction.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFunction.java
@@ -92,6 +92,8 @@ public abstract class WindowFunction {
 
   abstract void generateCode(final ClassGenerator<WindowFramer> cg);
 
+  abstract boolean supportsCustomFrames();
+
   /**
    * @param hasOrderBy window definition contains an ORDER BY clause
    * @return true if this window function requires all batches of current partition to be available before processing
@@ -161,6 +163,11 @@ public abstract class WindowFunction {
     public boolean canDoWork(int numBatchesAvailable, boolean hasOrderBy, boolean frameEndReached, boolean partitionEndReached) {
       return partitionEndReached || (hasOrderBy && frameEndReached);
     }
+
+    @Override
+    boolean supportsCustomFrames() {
+      return true;
+    }
   }
 
   static class Ranking extends WindowFunction {
@@ -218,6 +225,11 @@ public abstract class WindowFunction {
       // otherwise we can process the first batch immediately
       return partitionEndReached || ! requiresFullPartition(hasOrderBy);
     }
+
+    @Override
+    boolean supportsCustomFrames() {
+      return false;
+    }
   }
 
   static class Ntile extends Ranking {
@@ -315,6 +327,11 @@ public abstract class WindowFunction {
     public boolean canDoWork(int numBatchesAvailable, final boolean hasOrderBy, boolean frameEndReached, boolean partitionEndReached) {
       return partitionEndReached || numBatchesAvailable > 1;
     }
+
+    @Override
+    boolean supportsCustomFrames() {
+      return false;
+    }
   }
 
   static class Lag extends WindowFunction {
@@ -381,6 +398,11 @@ public abstract class WindowFunction {
       assert numBatchesAvailable > 0 : "canDoWork() should not be called when numBatchesAvailable == 0";
       return true;
     }
+
+    @Override
+    boolean supportsCustomFrames() {
+      return false;
+    }
   }
 
   static class LastValue extends WindowFunction {
@@ -419,9 +441,8 @@ public abstract class WindowFunction {
       // this will generate the following, pseudo, code:
       //   write current.source_last_value[frameLastRow] to container.last_value[row]
 
-      final GeneratorMapping EVAL_INSIDE = GeneratorMapping.create("setupReadLastValue", "readLastValue", null, null);
-      final GeneratorMapping EVAL_OUTSIDE = GeneratorMapping.create("setupReadLastValue", "writeLastValue", "resetValues", "cleanup");
-      final MappingSet mappingSet = new MappingSet("index", "outIndex", EVAL_INSIDE, EVAL_OUTSIDE, EVAL_INSIDE);
+      final GeneratorMapping mapping = GeneratorMapping.create("setupReadLastValue", "writeLastValue", "resetValues", "cleanup");
+      final MappingSet mappingSet = new MappingSet("index", "outIndex", mapping, mapping);
 
       cg.setMappingSet(mappingSet);
       cg.addExpr(writeSourceToLastValue);
@@ -436,6 +457,11 @@ public abstract class WindowFunction {
     public boolean canDoWork(int numBatchesAvailable, boolean hasOrderBy, boolean frameEndReached, boolean partitionEndReached) {
       return partitionEndReached || (hasOrderBy && frameEndReached);
     }
+
+    @Override
+    boolean supportsCustomFrames() {
+      return true;
+    }
   }
 
   static class FirstValue extends WindowFunction {
@@ -471,10 +497,10 @@ public abstract class WindowFunction {
     @Override
     void generateCode(final ClassGenerator<WindowFramer> cg) {
       {
-        // in DefaultFrameTemplate we call setupCopyFirstValue:
-        //   setupCopyFirstValue(current, internal)
-        // and copyFirstValueToInternal:
-        //   copyFirstValueToInternal(currentRow, 0)
+        // in DefaultFrameTemplate we call setupSaveFirstValue:
+        //   setupSaveFirstValue(current, internal)
+        // and saveFirstValue:
+        //   saveFirstValue(currentRow, 0)
         //
+        // this will generate the following, pseudo, code:
         //   write current.source[currentRow] to internal.first_value[0]
@@ -482,7 +508,7 @@ public abstract class WindowFunction {
         // so it basically copies the first value of current partition into the first row of internal.first_value
         // this is especially useful when handling multiple batches for the same partition where we need to keep
         // the first value of the partition somewhere after we release the first batch
-        final GeneratorMapping mapping = GeneratorMapping.create("setupCopyFirstValue", "copyFirstValueToInternal", null, null);
+        final GeneratorMapping mapping = GeneratorMapping.create("setupSaveFirstValue", "saveFirstValue", null, null);
         final MappingSet mappingSet = new MappingSet("index", "0", mapping, mapping);
 
         cg.setMappingSet(mappingSet);
@@ -490,8 +516,8 @@ public abstract class WindowFunction {
       }
 
       {
-        // in DefaultFrameTemplate we call setupPasteValues:
-        //   setupPasteValues(internal, container)
+        // in DefaultFrameTemplate we call setupWriteFirstValue:
+        //   setupWriteFirstValue(internal, container)
         // and outputRow:
         //   outputRow(outIndex)
         //
@@ -499,7 +525,7 @@ public abstract class WindowFunction {
         //   write internal.first_value[0] to container.first_value[outIndex]
         //
         // so it basically copies the value stored in internal.first_value's first row into all rows of container.first_value
-        final GeneratorMapping mapping = GeneratorMapping.create("setupPasteValues", "outputRow", "resetValues", "cleanup");
+        final GeneratorMapping mapping = GeneratorMapping.create("setupWriteFirstValue", "outputRow", "resetValues", "cleanup");
         final MappingSet mappingSet = new MappingSet("0", "outIndex", mapping, mapping);
         cg.setMappingSet(mappingSet);
         cg.addExpr(writeFirstValueToFirstValue);
@@ -516,5 +542,10 @@ public abstract class WindowFunction {
       assert numBatchesAvailable > 0 : "canDoWork() should not be called when numBatchesAvailable == 0";
       return true;
     }
+
+    @Override
+    boolean supportsCustomFrames() {
+      return true;
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/3d0b4b02/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/window/TestWindowFrame.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/window/TestWindowFrame.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/window/TestWindowFrame.java
index 6cb0f4a..10abbff 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/window/TestWindowFrame.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/window/TestWindowFrame.java
@@ -98,6 +98,14 @@ public class TestWindowFrame extends BaseTestQuery {
     runTest("b1.p2", 1);
   }
 
+  @Test
+  public void testMultipleFramers() throws Exception {
+    final String window = " OVER(PARTITION BY position_id ORDER by sub)";
+    test("SELECT COUNT(*)"+window+", SUM(salary)"+window+", ROW_NUMBER()"+window+", RANK()"+window+" " +
+      "FROM dfs_test.`"+TEST_RES_PATH+"/window/b1.p1`"
+    );
+  }
+
   /**
    * 2 batches with 2 partitions (position_id column), each batch contains a different partition
    */


Mime
View raw message