drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amansi...@apache.org
Subject [2/2] drill git commit: DRILL-3536: Add support for LEAD, LAG, NTILE, FIRST_VALUE and LAST_VALUE window functions
Date Tue, 18 Aug 2015 17:05:12 GMT
DRILL-3536: Add support for LEAD, LAG, NTILE, FIRST_VALUE and LAST_VALUE window functions


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/b55e2328
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/b55e2328
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/b55e2328

Branch: refs/heads/master
Commit: b55e2328d929df5d361c038f63fdeffadb0e544c
Parents: 6813b20
Author: adeneche <adeneche@gmail.com>
Authored: Thu Jul 23 16:38:44 2015 -0700
Committer: Aman Sinha <asinha@maprtech.com>
Committed: Tue Aug 18 08:54:56 2015 -0700

----------------------------------------------------------------------
 .../impl/window/DefaultFrameTemplate.java       | 177 ++++++--
 .../exec/physical/impl/window/Partition.java    |  30 +-
 .../physical/impl/window/WindowDataBatch.java   |  18 +-
 .../impl/window/WindowFrameRecordBatch.java     | 177 ++------
 .../exec/physical/impl/window/WindowFramer.java |   4 +-
 .../physical/impl/window/WindowFunction.java    | 428 +++++++++++++++++++
 .../sql/parser/UnsupportedOperatorsVisitor.java |  19 -
 .../apache/drill/exec/TestWindowFunctions.java  |  65 ---
 .../physical/impl/window/GenerateTestData.java  | 383 +++++++++++++++++
 .../physical/impl/window/TestWindowFrame.java   | 162 +++++++
 .../src/test/resources/window/3604.sql          |   4 +
 .../src/test/resources/window/3605.sql          |   5 +
 .../src/test/resources/window/3605.tsv          |  78 ++++
 .../src/test/resources/window/3606.sql          |   5 +
 .../src/test/resources/window/3606.tsv          |  78 ++++
 .../src/test/resources/window/3648.parquet      | Bin 0 -> 2364 bytes
 .../src/test/resources/window/3648.sql          |   5 +
 .../src/test/resources/window/3648.tsv          |  22 +
 .../src/test/resources/window/b2.p4.ntile.tsv   |  40 ++
 .../test/resources/window/b4.p4.fval.pby.tsv    |  80 ++++
 .../src/test/resources/window/b4.p4.lag.oby.tsv |  80 ++++
 .../test/resources/window/b4.p4.lag.pby.oby.tsv |  80 ++++
 .../test/resources/window/b4.p4.lead.oby.tsv    |  80 ++++
 .../resources/window/b4.p4.lead.pby.oby.tsv     |  80 ++++
 .../resources/window/b4.p4.lval.pby.oby.tsv     |  80 ++++
 .../src/test/resources/window/b4.p4.oby.tsv     | 154 +++----
 .../src/test/resources/window/b4.p4.pby.oby.tsv | 140 +++---
 .../src/test/resources/window/b4.p4.pby.tsv     | 140 +++---
 .../src/test/resources/window/b4.p4.tsv         | 160 +++----
 .../src/test/resources/window/b4.p4/0.data.json |  44 +-
 .../src/test/resources/window/b4.p4/1.data.json |  40 +-
 .../src/test/resources/window/b4.p4/2.data.json |  40 +-
 .../src/test/resources/window/b4.p4/3.data.json |  40 +-
 .../resources/window/fewRowsAllData.parquet     | Bin 0 -> 3990 bytes
 .../src/test/resources/window/fval.alltypes.sql |  14 +
 .../src/test/resources/window/fval.pby.sql      |   4 +
 .../src/test/resources/window/lag.oby.sql       |   3 +
 .../src/test/resources/window/lag.pby.oby.sql   |   3 +
 .../src/test/resources/window/lead.oby.sql      |   3 +
 .../src/test/resources/window/lead.pby.oby.sql  |   3 +
 .../src/test/resources/window/lval.alltypes.sql |  14 +
 .../src/test/resources/window/lval.pby.oby.sql  |   4 +
 .../src/test/resources/window/ntile.sql         |   4 +
 43 files changed, 2353 insertions(+), 637 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/b55e2328/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/DefaultFrameTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/DefaultFrameTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/DefaultFrameTemplate.java
index 535deaa..83e3754 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/DefaultFrameTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/DefaultFrameTemplate.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.physical.impl.window;
 
 import org.apache.drill.common.exceptions.DrillException;
 import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.record.TransferPair;
 import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.record.VectorContainer;
@@ -34,9 +35,13 @@ public abstract class DefaultFrameTemplate implements WindowFramer {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DefaultFrameTemplate.class);
 
   private VectorContainer container;
+  private VectorContainer internal;
+  private boolean lagCopiedToInternal;
   private List<WindowDataBatch> batches;
   private int outputCount; // number of rows in currently/last processed batch
 
+  private int frameLastRow;
+
   /**
    * current partition being processed.</p>
    * Can span over multiple batches, so we may need to keep it between calls to doWork()
@@ -44,10 +49,15 @@ public abstract class DefaultFrameTemplate implements WindowFramer {
   private Partition partition;
 
   @Override
-  public void setup(List<WindowDataBatch> batches, final VectorContainer container) throws SchemaChangeException {
+  public void setup(final List<WindowDataBatch> batches, final VectorContainer container, final OperatorContext oContext)
+      throws SchemaChangeException {
     this.container = container;
     this.batches = batches;
 
+    internal = new VectorContainer(oContext);
+    allocateInternal();
+    lagCopiedToInternal = false;
+
     outputCount = 0;
     partition = null;
   }
@@ -58,6 +68,14 @@ public abstract class DefaultFrameTemplate implements WindowFramer {
     }
   }
 
+  private void allocateInternal() {
+    // TODO we don't need to allocate all container's vectors, we can pass a specific list of vectors to allocate internally
+    for (VectorWrapper<?> w : container) {
+      ValueVector vv = internal.addOrGet(w.getField());
+      vv.allocateNew();
+    }
+  }
+
   /**
    * processes all rows of current batch:
    * <ul>
@@ -87,15 +105,12 @@ public abstract class DefaultFrameTemplate implements WindowFramer {
         // we have a pending window we need to handle from a previous call to doWork()
         logger.trace("we have a pending partition {}", partition);
       } else {
-        final int length = computePartitionSize(currentRow);
-        partition = new Partition(length);
-        setupWrite(current, container);
+        newPartition(current, currentRow);
       }
 
       currentRow = processPartition(currentRow);
       if (partition.isDone()) {
-        partition = null;
-        resetValues();
+        cleanPartition();
       }
     }
 
@@ -117,6 +132,21 @@ public abstract class DefaultFrameTemplate implements WindowFramer {
     logger.trace("WindowFramer.doWork() END");
   }
 
+  private void newPartition(final WindowDataBatch current, final int currentRow) throws SchemaChangeException {
+    final long length = computePartitionSize(currentRow);
+    partition = new Partition(length);
+    setupPartition(current, container);
+    setupCopyFirstValue(current, internal);
+    copyFirstValueToInternal(currentRow);
+  }
+
+  private void cleanPartition() {
+    partition = null;
+    resetValues();
+    internal.zeroVectors();
+    lagCopiedToInternal = false;
+  }
+
   /**
    * process all rows (computes and writes aggregation values) of current batch that are part of current partition.
    * @param currentRow first unprocessed row
@@ -126,33 +156,82 @@ public abstract class DefaultFrameTemplate implements WindowFramer {
   private int processPartition(final int currentRow) throws DrillException {
     logger.trace("process partition {}, currentRow: {}, outputCount: {}", partition, currentRow, outputCount);
 
+    final VectorAccessible current = getCurrent();
+    setupCopyNext(current, container);
+    setupPasteValues(internal, container);
+
+    copyPrevFromInternal();
+
+    // copy remaining from current
+    setupCopyPrev(current, container);
+
     int row = currentRow;
+
+    // process all rows except the last one of the batch/partition
     while (row < outputCount && !partition.isDone()) {
-      if (partition.isFrameDone()) {
-        // because all peer rows share the same frame, we only need to compute and aggregate the frame once
-        partition.newFrame(countPeers(row));
-        aggregatePeers(row);
+      if (row != currentRow) { // this is not the first row of the partition
+        copyPrev(row - 1, row);
       }
 
-      outputAggregatedValues(row, partition);
+      processRow(row);
+
+      if (row < outputCount - 1 && !partition.isDone()) {
+        copyNext(row + 1, row);
+      }
 
-      partition.rowAggregated();
       row++;
     }
 
+    // if we didn't reach the end of partition yet
+    if (!partition.isDone() && batches.size() > 1) {
+      // copy next value onto the current one
+      setupCopyNext(batches.get(1), container);
+      copyNext(0, row - 1);
+
+      copyPrevToInternal(current, row);
+    }
+
     return row;
   }
 
+  private void copyPrevToInternal(VectorAccessible current, int row) {
+    logger.trace("copying {} into internal", row - 1);
+    setupCopyPrev(current, internal);
+    copyPrev(row - 1, 0);
+    lagCopiedToInternal = true;
+  }
+
+  private void copyPrevFromInternal() {
+    if (lagCopiedToInternal) {
+      setupCopyFromInternal(internal, container);
+      copyFromInternal(0, 0);
+      lagCopiedToInternal = false;
+    }
+  }
+
+  private void processRow(final int row) throws DrillException {
+    if (partition.isFrameDone()) {
+      // because all peer rows share the same frame, we only need to compute and aggregate the frame once
+      partition.newFrame(countPeers(row));
+      aggregatePeers(row);
+    }
+
+    outputRow(row, partition);
+    writeLastValue(frameLastRow, row);
+
+    partition.rowAggregated();
+  }
+
   /**
    * @return number of rows that are part of the partition starting at row start of first batch
    */
-  private int computePartitionSize(final int start) {
+  private long computePartitionSize(final int start) {
     logger.trace("compute partition size starting from {} on {} batches", start, batches.size());
 
     // current partition always starts from first batch
     final VectorAccessible first = getCurrent();
 
-    int length = 0;
+    long length = 0;
 
     // count all rows that are in the same partition of start
     // keep increasing length until we find first row of next partition or we reach the very
@@ -189,7 +268,7 @@ public abstract class DefaultFrameTemplate implements WindowFramer {
       final int recordCount = batch.getRecordCount();
 
       // for every remaining row in the partition, count it if it's a peer row
-      final int remaining = partition.getRemaining();
+      final long remaining = partition.getRemaining();
       for (int row = (batch == first) ? start : 0; row < recordCount && length < remaining; row++, length++) {
         if (!isPeer(start, first, row, batch)) {
           return length;
@@ -213,19 +292,24 @@ public abstract class DefaultFrameTemplate implements WindowFramer {
     // start processing first batch and, if necessary, move to next batches
     Iterator<WindowDataBatch> iterator = batches.iterator();
     WindowDataBatch current = iterator.next();
-    setupRead(current, container);
+    setupEvaluatePeer(current, container);
 
     final int peers = partition.getPeers();
-    for (int i = 0, row = currentRow; i < peers; i++, row++) {
+    int row = currentRow;
+    for (int i = 0; i < peers; i++, row++) {
       if (row >= current.getRecordCount()) {
         // we reached the end of the current batch, move to the next one
         current = iterator.next();
-        setupRead(current, container);
+        setupEvaluatePeer(current, container);
         row = 0;
       }
 
-      aggregateRecord(row);
+      evaluatePeer(row);
     }
+
+    // last row of current frame
+    setupReadLastValue(current, container);
+    frameLastRow = row - 1;
   }
 
   @Override
@@ -263,31 +347,66 @@ public abstract class DefaultFrameTemplate implements WindowFramer {
     return outputCount;
   }
 
+  // we need this abstract method for code generation
   @Override
   public void cleanup() {
+    logger.trace("clearing internal");
+    internal.clear();
   }
 
   /**
-   * setup incoming container for aggregateRecord()
+   * called once for each peer row of the current frame.
+   * @param index of row to aggregate
    */
-  public abstract void setupRead(@Named("incoming") VectorAccessible incoming, @Named("outgoing") VectorAccessible outgoing) throws SchemaChangeException;
+  public abstract void evaluatePeer(@Named("index") int index);
+  public abstract void setupEvaluatePeer(@Named("incoming") VectorAccessible incoming, @Named("outgoing") VectorAccessible outgoing) throws SchemaChangeException;
+
+  public abstract void setupReadLastValue(@Named("incoming") VectorAccessible incoming, @Named("outgoing") VectorAccessible outgoing) throws SchemaChangeException;
+  public abstract void writeLastValue(@Named("index") int index, @Named("outIndex") int outIndex);
+
+  public abstract void setupCopyFirstValue(@Named("incoming") VectorAccessible incoming, @Named("outgoing") VectorAccessible outgoing) throws SchemaChangeException;
+  public abstract void copyFirstValueToInternal(@Named("index") int index);
 
   /**
-   * setup outgoing container for outputAggregatedValues. This will also reset the aggregations in most cases.
+   * called once for each row after we evaluate all peer rows. Used to write a value in the row
+   *
+   * @param outIndex index of row
+   * @param partition object used by "computed" window functions
    */
-  public abstract void setupWrite(@Named("incoming") WindowDataBatch incoming, @Named("outgoing") VectorAccessible outgoing) throws SchemaChangeException;
+  public abstract void outputRow(@Named("outIndex") int outIndex, @Named("partition") Partition partition);
 
   /**
-   * aggregates a row from the incoming container
-   * @param index of row to aggregate
+   * Called once per partition, before processing the partition. Used to setup read/write vectors
+   * @param incoming batch we will read from
+   * @param outgoing batch we will be writing to
+   *
+   * @throws SchemaChangeException
    */
-  public abstract void aggregateRecord(@Named("index") int index);
+  public abstract void setupPartition(@Named("incoming") WindowDataBatch incoming,
+                                      @Named("outgoing") VectorAccessible outgoing) throws SchemaChangeException;
 
   /**
-   * writes aggregated values to row of outgoing container
-   * @param outIndex index of row
+   * copies value(s) from inIndex row to outIndex row. Mostly used by LEAD. inIndex always points to the row next to
+   * outIndex
+   * @param inIndex source row of the copy
+   * @param outIndex destination row of the copy.
    */
-  public abstract void outputAggregatedValues(@Named("outIndex") int outIndex, @Named("partition") Partition partition);
+  public abstract void copyNext(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex);
+  public abstract void setupCopyNext(@Named("incoming") VectorAccessible incoming, @Named("outgoing") VectorAccessible outgoing);
+
+  public abstract void setupPasteValues(@Named("incoming") VectorAccessible incoming, @Named("outgoing") VectorAccessible outgoing);
+
+  /**
+   * copies value(s) from inIndex row to outIndex row. Mostly used by LAG. inIndex always points to the previous row
+   *
+   * @param inIndex source row of the copy
+   * @param outIndex destination row of the copy.
+   */
+  public abstract void copyPrev(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex);
+  public abstract void setupCopyPrev(@Named("incoming") VectorAccessible incoming, @Named("outgoing") VectorAccessible outgoing);
+
+  public abstract void copyFromInternal(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex);
+  public abstract void setupCopyFromInternal(@Named("incoming") VectorAccessible incoming, @Named("outgoing") VectorAccessible outgoing);
 
   /**
    * reset all window functions

http://git-wip-us.apache.org/repos/asf/drill/blob/b55e2328/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/Partition.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/Partition.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/Partition.java
index 8d6728e..52c7790 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/Partition.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/Partition.java
@@ -21,9 +21,12 @@ package org.apache.drill.exec.physical.impl.window;
  * Used internally to keep track of partitions and frames
  */
 public class Partition {
-  private final int length; // size of this partition
-  private int remaining;
-  private int peers;
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Partition.class);
+
+  private final long length; // size of this partition
+  private long remaining;
+
+  private int peers; // remaining non-processed peers in current frame
 
   // we keep these attributes public because the generated code needs to access them
   public int row_number;
@@ -35,7 +38,7 @@ public class Partition {
   /**
    * @return number of rows not yet aggregated in this partition
    */
-  public int getRemaining() {
+  public long getRemaining() {
     return remaining;
   }
 
@@ -46,7 +49,7 @@ public class Partition {
     return peers;
   }
 
-  public Partition(int length) {
+  public Partition(long length) {
     this.length = length;
     remaining = length;
     row_number = 1;
@@ -61,6 +64,7 @@ public class Partition {
 
   public void newFrame(int peers) {
     this.peers = peers;
+
     rank = row_number; // rank = row number of 1st peer
     dense_rank++;
     percent_rank = length > 1 ? (double) (rank - 1) / (length - 1) : 0;
@@ -71,6 +75,22 @@ public class Partition {
     return remaining == 0;
   }
 
+  public int ntile(int numTiles) {
+    long mod = length % numTiles;
+    double ceil = Math.ceil((double) length / numTiles);
+
+    int out;
+    if (row_number <= mod * ceil) {
+      out = (int) Math.ceil(row_number / ceil);
+    } else {
+      double floor = Math.floor((double) length / numTiles);
+      out = (int) Math.ceil((row_number - mod) / floor);
+    }
+
+    logger.trace("NTILE(row_number = {}, nt = {}, ct = {}) = {}", row_number, numTiles, length, out);
+    return out;
+  }
+
   public boolean isFrameDone() {
     return peers == 0;
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/b55e2328/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowDataBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowDataBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowDataBatch.java
index 5045cb3..b2befa3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowDataBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowDataBatch.java
@@ -19,7 +19,7 @@ package org.apache.drill.exec.physical.impl.window;
 
 import com.google.common.collect.Lists;
 import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.TransferPair;
 import org.apache.drill.exec.record.TypedFieldId;
@@ -33,12 +33,12 @@ import java.util.List;
 
 public class WindowDataBatch implements VectorAccessible {
 
-  private final FragmentContext context;
+  private final OperatorContext oContext;
   private final VectorContainer container;
   private final int recordCount;
 
-  public WindowDataBatch(final VectorAccessible batch, final FragmentContext context) {
-    this.context = context;
+  public WindowDataBatch(final VectorAccessible batch, final OperatorContext oContext) {
+    this.oContext = oContext;
     recordCount = batch.getRecordCount();
 
     List<ValueVector> vectors = Lists.newArrayList();
@@ -52,14 +52,18 @@ public class WindowDataBatch implements VectorAccessible {
       vectors.add(tp.getTo());
     }
 
-    container = new VectorContainer();
+    container = new VectorContainer(oContext);
     container.addCollection(vectors);
     container.setRecordCount(recordCount);
     container.buildSchema(batch.getSchema().getSelectionVectorMode());
   }
 
-  public FragmentContext getContext() {
-    return context;
+  public OperatorContext getContext() {
+    return oContext;
+  }
+
+  public VectorContainer getContainer() {
+    return container;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/b55e2328/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java
index 9c8cfc0..f1da1db 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java
@@ -19,21 +19,13 @@ package org.apache.drill.exec.physical.impl.window;
 
 import java.io.IOException;
 import java.util.List;
-import java.util.Map;
 
-import com.google.common.collect.Maps;
-import com.sun.codemodel.JExpression;
-import com.sun.codemodel.JInvocation;
-import com.sun.codemodel.JVar;
 import org.apache.drill.common.exceptions.DrillException;
-import org.apache.drill.common.expression.ErrorCollector;
-import org.apache.drill.common.expression.ErrorCollectorImpl;
+import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.FunctionCall;
 import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.common.logical.data.NamedExpression;
 import org.apache.drill.common.logical.data.Order;
-import org.apache.drill.common.types.TypeProtos;
-import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.compile.sig.GeneratorMapping;
 import org.apache.drill.exec.compile.sig.MappingSet;
 import org.apache.drill.exec.exception.ClassTransformationException;
@@ -41,19 +33,15 @@ import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.expr.ClassGenerator;
 import org.apache.drill.exec.expr.CodeGenerator;
 import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
-import org.apache.drill.exec.expr.ValueVectorWriteExpression;
 import org.apache.drill.exec.expr.fn.FunctionGenerationHelper;
 import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.config.WindowPOP;
 import org.apache.drill.exec.record.AbstractRecordBatch;
 import org.apache.drill.exec.record.BatchSchema;
-import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.record.TypedFieldId;
 import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.record.VectorWrapper;
-import org.apache.drill.exec.vector.ValueVector;
 
 import com.google.common.collect.Lists;
 import com.sun.codemodel.JExpr;
@@ -61,7 +49,6 @@ import com.sun.codemodel.JExpr;
 /**
  * support for OVER(PARTITION BY expression1,expression2,... [ORDER BY expressionA, expressionB,...])
  *
- * Doesn't support distinct partitions: multiple window with different PARTITION BY clauses.
  */
 public class WindowFrameRecordBatch extends AbstractRecordBatch<WindowPOP> {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(WindowFrameRecordBatch.class);
@@ -73,45 +60,6 @@ public class WindowFrameRecordBatch extends AbstractRecordBatch<WindowPOP> {
   private boolean noMoreBatches;
   private BatchSchema schema;
 
-  /**
-   * Describes supported window functions and if they output FLOAT8 or BIGINT
-   */
-  private enum WindowFunction {
-    ROW_NUMBER(false),
-    RANK(false),
-    DENSE_RANK(false),
-    PERCENT_RANK(true),
-    CUME_DIST(true);
-
-    private final boolean useDouble;
-
-    WindowFunction(boolean useDouble) {
-      this.useDouble = useDouble;
-    }
-
-    public TypeProtos.MajorType getMajorType() {
-      return useDouble ? Types.required(TypeProtos.MinorType.FLOAT8) : Types.required(TypeProtos.MinorType.BIGINT);
-    }
-
-    /**
-     * Extract the WindowFunction corresponding to the logical expression
-     * @param expr logical expression
-     * @return WindowFunction or null if the logical expression is not a window function
-     */
-    public static WindowFunction fromExpression(final LogicalExpression expr) {
-      if (!(expr instanceof FunctionCall)) {
-        return null;
-      }
-
-      final String name = ((FunctionCall) expr).getName();
-      try {
-        return WindowFunction.valueOf(name.toUpperCase());
-      } catch (IllegalArgumentException e) {
-        return null; // not a window function
-      }
-    }
-  }
-
   public WindowFrameRecordBatch(WindowPOP popConfig, FragmentContext context, RecordBatch incoming) throws OutOfMemoryException {
     super(popConfig, context);
     this.incoming = incoming;
@@ -152,9 +100,6 @@ public class WindowFrameRecordBatch extends AbstractRecordBatch<WindowPOP> {
    * when innerNext() is called:
    *  we return NONE
    * </pre></p>
-   * The previous scenario applies when we don't have an ORDER BY clause, otherwise a batch can be processed
-   * as soon as we reach the final peer row of the batch's last row (we find the end of the last frame of the batch).
-   * </p>
    * Because we only support the default frame, we don't need to reset the aggregations until we reach the end of
    * a partition. We can safely free a batch as soon as it has been processed.
    */
@@ -191,7 +136,7 @@ public class WindowFrameRecordBatch extends AbstractRecordBatch<WindowPOP> {
           }
         case OK:
           if (incoming.getRecordCount() > 0) {
-            batches.add(new WindowDataBatch(incoming, context));
+            batches.add(new WindowDataBatch(incoming, oContext));
           } else {
             logger.trace("incoming has 0 records, it won't be added to batches");
           }
@@ -247,7 +192,7 @@ public class WindowFrameRecordBatch extends AbstractRecordBatch<WindowPOP> {
     }
 
     if (incoming.getRecordCount() > 0) {
-      batches.add(new WindowDataBatch(incoming, getContext()));
+      batches.add(new WindowDataBatch(incoming, oContext));
     }
   }
 
@@ -256,11 +201,9 @@ public class WindowFrameRecordBatch extends AbstractRecordBatch<WindowPOP> {
 
     logger.trace("creating framer");
 
-    final List<LogicalExpression> aggExprs = Lists.newArrayList();
-    final Map<WindowFunction, TypedFieldId> winExprs = Maps.newHashMap();
     final List<LogicalExpression> keyExprs = Lists.newArrayList();
     final List<LogicalExpression> orderExprs = Lists.newArrayList();
-    final ErrorCollector collector = new ErrorCollectorImpl();
+    final List<WindowFunction> functions = Lists.newArrayList();
 
     container.clear();
 
@@ -271,24 +214,16 @@ public class WindowFrameRecordBatch extends AbstractRecordBatch<WindowPOP> {
 
     // add aggregation vectors to the container, and materialize corresponding expressions
     for (final NamedExpression ne : popConfig.getAggregations()) {
-      final WindowFunction wf = WindowFunction.fromExpression(ne.getExpr());
-
-      if (wf != null) {
-        // add corresponding ValueVector to container
-        final MaterializedField outputField = MaterializedField.create(ne.getRef(), wf.getMajorType());
-        ValueVector vv = container.addOrGet(outputField);
-        vv.allocateNew();
-        winExprs.put(wf, container.getValueVectorId(ne.getRef()));
-      } else {
-        // evaluate expression over saved batch
-        final LogicalExpression expr = ExpressionTreeMaterializer.materialize(ne.getExpr(), batch, collector, context.getFunctionRegistry());
-
-        // add corresponding ValueVector to container
-        final MaterializedField outputField = MaterializedField.create(ne.getRef(), expr.getMajorType());
-        ValueVector vv = container.addOrGet(outputField);
-        vv.allocateNew();
-        TypedFieldId id = container.getValueVectorId(ne.getRef());
-        aggExprs.add(new ValueVectorWriteExpression(id, expr, true));
+      if (!(ne.getExpr() instanceof FunctionCall)) {
+        throw UserException.functionError()
+          .message("Unsupported window function '%s'", ne.getExpr())
+          .build(logger);
+      }
+
+      final FunctionCall call = (FunctionCall) ne.getExpr();
+      final WindowFunction winfun = WindowFunction.fromExpression(call);
+      if (winfun.materialize(ne, container, context.getFunctionRegistry())) {
+        functions.add(winfun);
       }
     }
 
@@ -298,42 +233,48 @@ public class WindowFrameRecordBatch extends AbstractRecordBatch<WindowPOP> {
 
     // materialize partition by expressions
     for (final NamedExpression ne : popConfig.getWithins()) {
-      keyExprs.add(
-        ExpressionTreeMaterializer.materialize(ne.getExpr(), batch, collector, context.getFunctionRegistry()));
+      keyExprs.add(ExpressionTreeMaterializer.materializeAndCheckErrors(ne.getExpr(), batch, context.getFunctionRegistry()));
     }
 
     // materialize order by expressions
     for (final Order.Ordering oe : popConfig.getOrderings()) {
-      orderExprs.add(
-        ExpressionTreeMaterializer.materialize(oe.getExpr(), batch, collector, context.getFunctionRegistry()));
+      orderExprs.add(ExpressionTreeMaterializer.materializeAndCheckErrors(oe.getExpr(), batch, context.getFunctionRegistry()));
     }
 
-    if (collector.hasErrors()) {
-      throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString());
-    }
+    final WindowFramer framer = generateFramer(keyExprs, orderExprs, functions);
+    framer.setup(batches, container, oContext);
 
-    // generate framer code
+    return framer;
+  }
+
+  private WindowFramer generateFramer(final List<LogicalExpression> keyExprs, final List<LogicalExpression> orderExprs,
+      final List<WindowFunction> functions) throws IOException, ClassTransformationException {
     final ClassGenerator<WindowFramer> cg = CodeGenerator.getRoot(WindowFramer.TEMPLATE_DEFINITION, context.getFunctionRegistry());
-    setupIsFunction(cg, keyExprs, isaB1, isaB2); // setup for isSamePartition()
-    setupIsFunction(cg, orderExprs, isaP1, isaP2); // setup for isPeer()
-    setupOutputAggregatedValues(cg, aggExprs);
-    setupAddWindowValue(cg, winExprs);
 
-    cg.getBlock("resetValues")._return(JExpr.TRUE);
+    {
+      // generating framer.isSamePartition()
+      final GeneratorMapping IS_SAME_PARTITION_READ = GeneratorMapping.create("isSamePartition", "isSamePartition", null, null);
+      final MappingSet isaB1 = new MappingSet("b1Index", null, "b1", null, IS_SAME_PARTITION_READ, IS_SAME_PARTITION_READ);
+      final MappingSet isaB2 = new MappingSet("b2Index", null, "b2", null, IS_SAME_PARTITION_READ, IS_SAME_PARTITION_READ);
+      setupIsFunction(cg, keyExprs, isaB1, isaB2);
+    }
 
-    WindowFramer framer = context.getImplementationClass(cg);
-    framer.setup(batches, container);
+    {
+      // generating framer.isPeer()
+      final GeneratorMapping IS_SAME_PEER_READ = GeneratorMapping.create("isPeer", "isPeer", null, null);
+      final MappingSet isaP1 = new MappingSet("b1Index", null, "b1", null, IS_SAME_PEER_READ, IS_SAME_PEER_READ);
+      final MappingSet isaP2 = new MappingSet("b2Index", null, "b2", null, IS_SAME_PEER_READ, IS_SAME_PEER_READ);
+      setupIsFunction(cg, orderExprs, isaP1, isaP2);
+    }
 
-    return framer;
-  }
+    for (final WindowFunction function : functions) {
+      function.generateCode(cg);
+    }
 
-  private static final GeneratorMapping IS_SAME_RECORD_BATCH_DATA_READ = GeneratorMapping.create("isSamePartition", "isSamePartition", null, null);
-  private final MappingSet isaB1 = new MappingSet("b1Index", null, "b1", null, IS_SAME_RECORD_BATCH_DATA_READ, IS_SAME_RECORD_BATCH_DATA_READ);
-  private final MappingSet isaB2 = new MappingSet("b2Index", null, "b2", null, IS_SAME_RECORD_BATCH_DATA_READ, IS_SAME_RECORD_BATCH_DATA_READ);
+    cg.getBlock("resetValues")._return(JExpr.TRUE);
 
-  private static final GeneratorMapping IS_SAME_PEER = GeneratorMapping.create("isPeer", "isPeer", null, null);
-  private final MappingSet isaP1 = new MappingSet("b1Index", null, "b1", null, IS_SAME_PEER, IS_SAME_PEER);
-  private final MappingSet isaP2 = new MappingSet("b2Index", null, "b2", null, IS_SAME_PEER, IS_SAME_PEER);
+    return context.getImplementationClass(cg);
+  }
 
   /**
    * setup comparison functions isSamePartition and isPeer
@@ -342,6 +283,10 @@ public class WindowFrameRecordBatch extends AbstractRecordBatch<WindowPOP> {
                                final MappingSet leftMapping, final MappingSet rightMapping) {
     cg.setMappingSet(leftMapping);
     for (LogicalExpression expr : exprs) {
+      if (expr == null) {
+        continue;
+      }
+
       cg.setMappingSet(leftMapping);
       ClassGenerator.HoldingContainer first = cg.addExpr(expr, false);
       cg.setMappingSet(rightMapping);
@@ -356,34 +301,6 @@ public class WindowFrameRecordBatch extends AbstractRecordBatch<WindowPOP> {
     cg.getEvalBlock()._return(JExpr.TRUE);
   }
 
-  private static final GeneratorMapping EVAL_INSIDE = GeneratorMapping.create("setupRead", "aggregateRecord", null, null);
-  private static final GeneratorMapping EVAL_OUTSIDE = GeneratorMapping.create("setupWrite", "outputAggregatedValues", "resetValues", "cleanup");
-  private final MappingSet eval = new MappingSet("index", "outIndex", EVAL_INSIDE, EVAL_OUTSIDE, EVAL_INSIDE);
-
-  /**
-   * setup for aggregateRecord() and outputAggregatedValues()
-   */
-  private void setupOutputAggregatedValues(ClassGenerator<WindowFramer> cg, List<LogicalExpression> valueExprs) {
-    cg.setMappingSet(eval);
-    for (LogicalExpression ex : valueExprs) {
-      cg.addExpr(ex);
-    }
-  }
-
-  /**
-   * generate code to write "computed" window function values into their respective value vectors
-   */
-  private void setupAddWindowValue(final ClassGenerator<WindowFramer> cg, final Map<WindowFunction, TypedFieldId> functions) {
-    cg.setMappingSet(eval);
-    for (WindowFunction function : functions.keySet()) {
-      final JVar vv = cg.declareVectorValueSetupAndMember(cg.getMappingSet().getOutgoing(), functions.get(function));
-      final JExpression outIndex = cg.getMappingSet().getValueWriteIndex();
-      JInvocation setMethod = vv.invoke("getMutator").invoke("setSafe").arg(outIndex).arg(
-        JExpr.direct("partition." + function.name().toLowerCase()));
-      cg.getEvalBlock().add(setMethod);
-    }
-  }
-
   private void cleanup() {
     if (framer != null) {
       framer.cleanup();

http://git-wip-us.apache.org/repos/asf/drill/blob/b55e2328/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFramer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFramer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFramer.java
index 69866af..1d1d746 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFramer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFramer.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.physical.impl.window;
 import org.apache.drill.common.exceptions.DrillException;
 import org.apache.drill.exec.compile.TemplateClassDefinition;
 import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.record.VectorContainer;
 
 import java.util.List;
@@ -27,7 +28,8 @@ import java.util.List;
 public interface WindowFramer {
   TemplateClassDefinition<WindowFramer> TEMPLATE_DEFINITION = new TemplateClassDefinition<>(WindowFramer.class, DefaultFrameTemplate.class);
 
-  void setup(List<WindowDataBatch> batches, final VectorContainer container) throws SchemaChangeException;
+  void setup(final List<WindowDataBatch> batches, final VectorContainer container, final OperatorContext operatorContext)
+    throws SchemaChangeException;
 
   /**
    * process the inner batch and write the aggregated values in the container

http://git-wip-us.apache.org/repos/asf/drill/blob/b55e2328/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFunction.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFunction.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFunction.java
new file mode 100644
index 0000000..c86cd15
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFunction.java
@@ -0,0 +1,428 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.window;
+
+import com.sun.codemodel.JExpr;
+import com.sun.codemodel.JExpression;
+import com.sun.codemodel.JInvocation;
+import com.sun.codemodel.JVar;
+import org.apache.drill.common.expression.FunctionCall;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.ValueExpressions;
+import org.apache.drill.common.logical.data.NamedExpression;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.compile.sig.GeneratorMapping;
+import org.apache.drill.exec.compile.sig.MappingSet;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.ClassGenerator;
+import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
+import org.apache.drill.exec.expr.ValueVectorReadExpression;
+import org.apache.drill.exec.expr.ValueVectorWriteExpression;
+import org.apache.drill.exec.expr.fn.FunctionLookupContext;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.record.VectorContainer;
+
+public abstract class WindowFunction {
+  public enum Type {
+    ROW_NUMBER,
+    RANK,
+    DENSE_RANK,
+    PERCENT_RANK,
+    CUME_DIST,
+    LEAD,
+    LAG,
+    FIRST_VALUE,
+    LAST_VALUE,
+    NTILE,
+    AGGREGATE
+  }
+
+  final Type type;
+
+  WindowFunction(Type type) {
+    this.type = type;
+  }
+
+  static WindowFunction fromExpression(final FunctionCall call) {
+    final String name = call.getName();
+    Type type;
+    try {
+      type = Type.valueOf(name.toUpperCase());
+    } catch (IllegalArgumentException e) {
+      type = Type.AGGREGATE;
+    }
+
+    switch (type) {
+      case AGGREGATE:
+        return new WindowAggregate();
+      case LEAD:
+        return new Lead();
+      case LAG:
+        return new Lag();
+      case FIRST_VALUE:
+        return new FirstValue();
+      case LAST_VALUE:
+        return new LastValue();
+      case NTILE:
+        return new Ntile();
+      default:
+        return new Ranking(type);
+    }
+  }
+
+  abstract void generateCode(final ClassGenerator<WindowFramer> cg);
+
+  abstract boolean materialize(final NamedExpression ne, final VectorContainer batch, final FunctionLookupContext registry)
+    throws SchemaChangeException;
+
+  static class WindowAggregate extends WindowFunction {
+
+    private ValueVectorWriteExpression writeAggregationToOutput;
+
+    WindowAggregate() {
+      super(Type.AGGREGATE);
+    }
+
+    @Override
+    boolean materialize(final NamedExpression ne, final VectorContainer batch, final FunctionLookupContext registry)
+        throws SchemaChangeException {
+      final LogicalExpression aggregate = ExpressionTreeMaterializer.materializeAndCheckErrors(ne.getExpr(), batch, registry);
+      if (aggregate == null) {
+        return false;
+      }
+
+      // add corresponding ValueVector to container
+      final MaterializedField output = MaterializedField.create(ne.getRef(), aggregate.getMajorType());
+      batch.addOrGet(output).allocateNew();
+      TypedFieldId outputId = batch.getValueVectorId(ne.getRef());
+      writeAggregationToOutput = new ValueVectorWriteExpression(outputId, aggregate, true);
+
+      return true;
+    }
+
+    @Override
+    void generateCode(ClassGenerator<WindowFramer> cg) {
+      final GeneratorMapping EVAL_INSIDE = GeneratorMapping.create("setupEvaluatePeer", "evaluatePeer", null, null);
+      final GeneratorMapping EVAL_OUTSIDE = GeneratorMapping.create("setupPartition", "outputRow", "resetValues", "cleanup");
+      final MappingSet mappingSet = new MappingSet("index", "outIndex", EVAL_INSIDE, EVAL_OUTSIDE, EVAL_INSIDE);
+
+      cg.setMappingSet(mappingSet);
+      cg.addExpr(writeAggregationToOutput);
+    }
+  }
+
+  static class Ranking extends WindowFunction {
+
+    protected TypedFieldId fieldId;
+
+    Ranking(final Type type) {
+      super(type);
+    }
+
+    private TypeProtos.MajorType getMajorType() {
+      if (type == Type.CUME_DIST || type == Type.PERCENT_RANK) {
+        return Types.required(TypeProtos.MinorType.FLOAT8);
+      }
+      return Types.required(TypeProtos.MinorType.BIGINT);
+    }
+
+    private String getName() {
+      return type.name().toLowerCase();
+    }
+
+    @Override
+    void generateCode(ClassGenerator<WindowFramer> cg) {
+      final GeneratorMapping mapping = GeneratorMapping.create("setupPartition", "outputRow", "resetValues", "cleanup");
+      final MappingSet mappingSet = new MappingSet(null, "outIndex", mapping, mapping);
+
+      cg.setMappingSet(mappingSet);
+      final JVar vv = cg.declareVectorValueSetupAndMember(cg.getMappingSet().getOutgoing(), fieldId);
+      final JExpression outIndex = cg.getMappingSet().getValueWriteIndex();
+      JInvocation setMethod = vv.invoke("getMutator").invoke("setSafe").arg(outIndex).arg(JExpr.direct("partition." + getName()));
+
+      cg.getEvalBlock().add(setMethod);
+    }
+
+    @Override
+    boolean materialize(final NamedExpression ne, final VectorContainer batch, FunctionLookupContext registry)
+        throws SchemaChangeException {
+      final MaterializedField outputField = MaterializedField.create(ne.getRef(), getMajorType());
+      batch.addOrGet(outputField).allocateNew();
+      fieldId = batch.getValueVectorId(ne.getRef());
+      return true;
+    }
+  }
+
+  static class Ntile extends Ranking {
+
+    private int numTiles;
+
+    public Ntile() {
+      super(Type.NTILE);
+    }
+
+    private int numTilesFromExpression(LogicalExpression numTilesExpr) {
+      if ((numTilesExpr instanceof ValueExpressions.IntExpression)) {
+        int nt = ((ValueExpressions.IntExpression) numTilesExpr).getInt();
+        if (nt >= 0) {
+          return nt;
+        }
+      }
+
+      throw new IllegalArgumentException("NTILE only accepts unsigned integer argument");
+    }
+
+    @Override
+    boolean materialize(final NamedExpression ne, final VectorContainer batch, final FunctionLookupContext registry)
+        throws SchemaChangeException {
+      final FunctionCall call = (FunctionCall) ne.getExpr();
+      final LogicalExpression argument = call.args.get(0);
+      final MaterializedField outputField = MaterializedField.create(ne.getRef(), argument.getMajorType());
+      batch.addOrGet(outputField).allocateNew();
+      fieldId = batch.getValueVectorId(ne.getRef());
+
+      numTiles = numTilesFromExpression(argument);
+      return true;
+    }
+
+    @Override
+    void generateCode(ClassGenerator<WindowFramer> cg) {
+      final GeneratorMapping mapping = GeneratorMapping.create("setupPartition", "outputRow", "resetValues", "cleanup");
+      final MappingSet mappingSet = new MappingSet(null, "outIndex", mapping, mapping);
+
+      cg.setMappingSet(mappingSet);
+      final JVar vv = cg.declareVectorValueSetupAndMember(cg.getMappingSet().getOutgoing(), fieldId);
+      final JExpression outIndex = cg.getMappingSet().getValueWriteIndex();
+      JInvocation setMethod = vv.invoke("getMutator").invoke("setSafe").arg(outIndex)
+        .arg(JExpr.direct("partition.ntile(" + numTiles + ")"));
+      cg.getEvalBlock().add(setMethod);
+    }
+  }
+
+  static class Lead extends WindowFunction {
+    private LogicalExpression writeInputToLead;
+
+    public Lead() {
+      super(Type.LEAD);
+    }
+
+    @Override
+    void generateCode(ClassGenerator<WindowFramer> cg) {
+      final GeneratorMapping mapping = GeneratorMapping.create("setupCopyNext", "copyNext", null, null);
+      final MappingSet eval = new MappingSet("inIndex", "outIndex", mapping, mapping);
+
+      cg.setMappingSet(eval);
+      cg.addExpr(writeInputToLead);
+    }
+
+    @Override
+    boolean materialize(final NamedExpression ne, final VectorContainer batch, final FunctionLookupContext registry)
+        throws SchemaChangeException {
+      final FunctionCall call = (FunctionCall) ne.getExpr();
+      final LogicalExpression input = ExpressionTreeMaterializer.materializeAndCheckErrors(call.args.get(0), batch, registry);
+      if (input == null) {
+        return false;
+      }
+
+      // make sure output vector type is Nullable, because we will write a null value in the first row of each partition
+      TypeProtos.MajorType majorType = input.getMajorType();
+      if (majorType.getMode() == TypeProtos.DataMode.REQUIRED) {
+        majorType = Types.optional(majorType.getMinorType());
+      }
+
+      // add corresponding ValueVector to container
+      final MaterializedField output = MaterializedField.create(ne.getRef(), majorType);
+      batch.addOrGet(output).allocateNew();
+      final TypedFieldId outputId =  batch.getValueVectorId(ne.getRef());
+
+      writeInputToLead = new ValueVectorWriteExpression(outputId, input, true);
+      return true;
+    }
+  }
+
+  static class Lag extends WindowFunction {
+    private LogicalExpression writeLagToLag;
+    private LogicalExpression writeInputToLag;
+
+    Lag() {
+      super(Type.LAG);
+    }
+
+    @Override
+    boolean materialize(final NamedExpression ne, final VectorContainer batch, final FunctionLookupContext registry)
+        throws SchemaChangeException {
+      final FunctionCall call = (FunctionCall) ne.getExpr();
+      final LogicalExpression input = ExpressionTreeMaterializer.materializeAndCheckErrors(call.args.get(0), batch, registry);
+      if (input == null) {
+        return false;
+      }
+
+      // make sure output vector type is Nullable, because we will write a null value in the first row of each partition
+      TypeProtos.MajorType majorType = input.getMajorType();
+      if (majorType.getMode() == TypeProtos.DataMode.REQUIRED) {
+        majorType = Types.optional(majorType.getMinorType());
+      }
+
+      // add lag output ValueVector to container
+      final MaterializedField output = MaterializedField.create(ne.getRef(), majorType);
+      batch.addOrGet(output).allocateNew();
+      final TypedFieldId outputId = batch.getValueVectorId(ne.getRef());
+
+      writeInputToLag = new ValueVectorWriteExpression(outputId, input, true);
+      writeLagToLag = new ValueVectorWriteExpression(outputId, new ValueVectorReadExpression(outputId), true);
+      return true;
+    }
+
+    @Override
+    void generateCode(ClassGenerator<WindowFramer> cg) {
+      {
+        // generating lag copyFromInternal
+        final GeneratorMapping mapping = GeneratorMapping.create("setupCopyFromInternal", "copyFromInternal", null, null);
+        final MappingSet mappingSet = new MappingSet("inIndex", "outIndex", mapping, mapping);
+
+        cg.setMappingSet(mappingSet);
+        cg.addExpr(writeLagToLag);
+      }
+
+      {
+        // generating lag copyPrev
+        final GeneratorMapping mapping = GeneratorMapping.create("setupCopyPrev", "copyPrev", null, null);
+        final MappingSet eval = new MappingSet("inIndex", "outIndex", mapping, mapping);
+
+        cg.setMappingSet(eval);
+        cg.addExpr(writeInputToLag);
+      }
+    }
+  }
+
+  static class LastValue extends WindowFunction {
+
+    private LogicalExpression writeSourceToLastValue;
+
+    LastValue() {
+      super(Type.LAST_VALUE);
+    }
+
+    @Override
+    boolean materialize(final NamedExpression ne, final VectorContainer batch, final FunctionLookupContext registry)
+        throws SchemaChangeException {
+      final FunctionCall call = (FunctionCall) ne.getExpr();
+      final LogicalExpression input = ExpressionTreeMaterializer.materializeAndCheckErrors(call.args.get(0), batch, registry);
+      if (input == null) {
+        return false;
+      }
+
+      final MaterializedField output = MaterializedField.create(ne.getRef(), input.getMajorType());
+      batch.addOrGet(output).allocateNew();
+      final TypedFieldId outputId = batch.getValueVectorId(ne.getRef());
+
+      // write incoming.source[inIndex] to outgoing.last_value[outIndex]
+      writeSourceToLastValue = new ValueVectorWriteExpression(outputId, input, true);
+      return true;
+    }
+
+    @Override
+    void generateCode(ClassGenerator<WindowFramer> cg) {
+      // in DefaultFrameTemplate we call setupReadLastValue:
+      //   setupReadLastValue(current, container)
+      // and readLastValue:
+      //   writeLastValue(frameLastRow, row)
+      //
+      // this will generate the following pseudo-code:
+      //   write current.source_last_value[frameLastRow] to container.last_value[row]
+
+      final GeneratorMapping EVAL_INSIDE = GeneratorMapping.create("setupReadLastValue", "readLastValue", null, null);
+      final GeneratorMapping EVAL_OUTSIDE = GeneratorMapping.create("setupReadLastValue", "writeLastValue", "resetValues", "cleanup");
+      final MappingSet mappingSet = new MappingSet("index", "outIndex", EVAL_INSIDE, EVAL_OUTSIDE, EVAL_INSIDE);
+
+      cg.setMappingSet(mappingSet);
+      cg.addExpr(writeSourceToLastValue);
+    }
+  }
+
+  static class FirstValue extends WindowFunction {
+
+    private LogicalExpression writeInputToFirstValue;
+
+    private LogicalExpression writeFirstValueToFirstValue;
+
+    FirstValue() {
+      super(Type.FIRST_VALUE);
+    }
+
+    @Override
+    boolean materialize(final NamedExpression ne, final VectorContainer batch, final FunctionLookupContext registry)
+        throws SchemaChangeException {
+      final FunctionCall call = (FunctionCall) ne.getExpr();
+      final LogicalExpression input = ExpressionTreeMaterializer.materializeAndCheckErrors(call.args.get(0), batch, registry);
+      if (input == null) {
+        return false;
+      }
+
+      final MaterializedField output = MaterializedField.create(ne.getRef(), input.getMajorType());
+      batch.addOrGet(output).allocateNew();
+      final TypedFieldId outputId = batch.getValueVectorId(ne.getRef());
+
+      // write incoming.first_value[inIndex] to outgoing.first_value[outIndex]
+      writeFirstValueToFirstValue = new ValueVectorWriteExpression(outputId, new ValueVectorReadExpression(outputId), true);
+      // write incoming.source[inIndex] to outgoing.first_value[outIndex]
+      writeInputToFirstValue = new ValueVectorWriteExpression(outputId, input, true);
+      return true;
+    }
+
+    @Override
+    void generateCode(final ClassGenerator<WindowFramer> cg) {
+      {
+        // in DefaultFrameTemplate we call setupCopyFirstValue:
+        //   setupCopyFirstValue(current, internal)
+        // and copyFirstValueToInternal:
+        //   copyFirstValueToInternal(currentRow, 0)
+        //
+        // this will generate the following pseudo-code:
+        //   write current.source[currentRow] to internal.first_value[0]
+        //
+        // so it basically copies the first value of current partition into the first row of internal.first_value
+        // this is especially useful when handling multiple batches for the same partition where we need to keep
+        // the first value of the partition somewhere after we release the first batch
+        final GeneratorMapping mapping = GeneratorMapping.create("setupCopyFirstValue", "copyFirstValueToInternal", null, null);
+        final MappingSet mappingSet = new MappingSet("index", "0", mapping, mapping);
+
+        cg.setMappingSet(mappingSet);
+        cg.addExpr(writeInputToFirstValue);
+      }
+
+      {
+        // in DefaultFrameTemplate we call setupPasteValues:
+        //   setupPasteValues(internal, container)
+        // and outputRow:
+        //   outputRow(outIndex)
+        //
+        // this will generate the following pseudo-code:
+        //   write internal.first_value[0] to container.first_value[outIndex]
+        //
+        // so it basically copies the value stored in internal.first_value's first row into all rows of container.first_value
+        final GeneratorMapping mapping = GeneratorMapping.create("setupPasteValues", "outputRow", "resetValues", "cleanup");
+        final MappingSet mappingSet = new MappingSet("0", "outIndex", mapping, mapping);
+        cg.setMappingSet(mappingSet);
+        cg.addExpr(writeFirstValueToFirstValue);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/b55e2328/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/UnsupportedOperatorsVisitor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/UnsupportedOperatorsVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/UnsupportedOperatorsVisitor.java
index 04d1231..67793bf 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/UnsupportedOperatorsVisitor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/UnsupportedOperatorsVisitor.java
@@ -109,25 +109,6 @@ public class UnsupportedOperatorsVisitor extends SqlShuttle {
           if(over.getOperandList().get(0) instanceof SqlCall) {
             SqlCall function = (SqlCall) over.getOperandList().get(0);
 
-            // DRILL-3195:
-            // The following window functions are temporarily disabled
-            // NTILE(), LAG(), LEAD(), FIRST_VALUE(), LAST_VALUE()
-            String functionName = function.getOperator().getName().toUpperCase();
-            switch(functionName) {
-              case "NTILE":
-              case "LAG":
-              case "LEAD":
-              case "FIRST_VALUE":
-              case "LAST_VALUE":
-                unsupportedOperatorCollector.setException(SqlUnsupportedException.ExceptionType.FUNCTION,
-                    "The window function " + functionName + " is not supported\n" +
-                    "See Apache Drill JIRA: DRILL-3195");
-                throw new UnsupportedOperationException();
-
-              default:
-                break;
-            }
-
             // DRILL-3182
             // Window function with DISTINCT qualifier is temporarily disabled
             if(function.getFunctionQuantifier() != null

http://git-wip-us.apache.org/repos/asf/drill/blob/b55e2328/exec/java-exec/src/test/java/org/apache/drill/exec/TestWindowFunctions.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/TestWindowFunctions.java b/exec/java-exec/src/test/java/org/apache/drill/exec/TestWindowFunctions.java
index 9e09106..ae5562f 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/TestWindowFunctions.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/TestWindowFunctions.java
@@ -66,71 +66,6 @@ public class TestWindowFunctions extends BaseTestQuery {
     }
   }
 
-  @Test(expected = UnsupportedFunctionException.class) // DRILL-3195
-  public void testWindowFunctionNTILE() throws Exception {
-    try {
-      final String query = "explain plan for select NTILE(1) over(partition by n_name order by n_name) \n" +
-          "from cp.`tpch/nation.parquet`";
-
-      test(query);
-    } catch(UserException ex) {
-      throwAsUnsupportedException(ex);
-      throw ex;
-    }
-  }
-
-  @Test(expected = UnsupportedFunctionException.class) // DRILL-3195
-  public void testWindowFunctionLAG() throws Exception {
-    try {
-      final String query = "explain plan for select LAG(n_nationKey, 1) over(partition by n_name order by n_name) \n" +
-          "from cp.`tpch/nation.parquet`";
-
-      test(query);
-    } catch(UserException ex) {
-      throwAsUnsupportedException(ex);
-      throw ex;
-    }
-  }
-
-  @Test(expected = UnsupportedFunctionException.class) // DRILL-3195
-  public void testWindowFunctionLEAD() throws Exception {
-    try {
-      final String query = "explain plan for select LEAD(n_nationKey, 1) over(partition by n_name order by n_name) \n" +
-          "from cp.`tpch/nation.parquet`";
-
-      test(query);
-    } catch(UserException ex) {
-      throwAsUnsupportedException(ex);
-      throw ex;
-    }
-  }
-
-  @Test(expected = UnsupportedFunctionException.class) // DRILL-3195
-  public void testWindowFunctionFIRST_VALUE() throws Exception {
-    try {
-      final String query = "explain plan for select FIRST_VALUE(n_nationKey) over(partition by n_name order by n_name) \n" +
-          "from cp.`tpch/nation.parquet`";
-
-      test(query);
-    } catch(UserException ex) {
-      throwAsUnsupportedException(ex);
-      throw ex;
-    }
-  }
-
-  @Test(expected = UnsupportedFunctionException.class) // DRILL-3195
-  public void testWindowFunctionLAST_VALUE() throws Exception {
-    try {
-      final String query = "explain plan for select LAST_VALUE(n_nationKey) over(partition by n_name order by n_name) \n" +
-          "from cp.`tpch/nation.parquet`";
-
-      test(query);
-    } catch(UserException ex) {
-      throwAsUnsupportedException(ex);
-      throw ex;
-    }
-  }
-
   @Test(expected = UnsupportedFunctionException.class) // DRILL-3188
   public void testWindowFrame() throws Exception {
     try {

http://git-wip-us.apache.org/repos/asf/drill/blob/b55e2328/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/window/GenerateTestData.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/window/GenerateTestData.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/window/GenerateTestData.java
new file mode 100644
index 0000000..c68cbe5
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/window/GenerateTestData.java
@@ -0,0 +1,383 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.window;
+
+import org.apache.drill.common.util.TestTools;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Standalone generator for the window-function test tables and their expected
+ * results under src/test/resources/window/.  Running {@link #main} (re)creates
+ * the randomized JSON data files and the tab-separated baseline files that
+ * TestWindowFrame compares query output against.
+ */
+public class GenerateTestData {
+  // rows written per JSON data file; a new file is started every BATCH_SIZE rows
+  private static final int BATCH_SIZE = 20;
+
+  /**
+   * Fluent helper that assembles an array of {@link Partition} descriptors.
+   * Usage: {@code new Builder().partition(n).sub(a).sub(b)...build()}.
+   */
+  private static class Builder {
+    List<Partition> partitions = new ArrayList<>();
+
+    // state of the partition currently being described
+    int cur_length;
+    List<Integer> cur_subs = new ArrayList<>();
+    List<Integer> cur_subs_size = new ArrayList<>();
+
+    // starts a new partition of the given row count, flushing the previous one
+    Builder partition(int length) {
+      if (cur_length > 0) {
+        addPartition();
+      }
+
+      cur_length = length;
+      cur_subs.clear();
+      cur_subs_size.clear();
+      return this;
+    }
+
+    // adds a sub-group whose size equals its id (the common case in the data sets)
+    Builder sub(int subId) {
+      return sub(subId, subId);
+    }
+
+    // adds a sub-group of explicit size; sub ids must be added in ascending
+    // order (getSubIndex relies on binarySearch over a sorted array)
+    Builder sub(int subId, int num) {
+      cur_subs.add(subId);
+      cur_subs_size.add(num);
+      return this;
+    }
+
+    // flushes the current partition description into the partitions list
+    void addPartition() {
+      partitions.add(
+        new Partition(cur_length,
+          cur_subs.toArray(new Integer[cur_subs.size()]),
+          cur_subs_size.toArray(new Integer[cur_subs_size.size()])));
+    }
+
+    Partition[] build() {
+      if (cur_length > 0) {
+        addPartition();
+      }
+
+      // set previous partitions
+      for (int i = 1; i < partitions.size(); i++) {
+        partitions.get(i).previous = partitions.get(i - 1);
+      }
+
+      return partitions.toArray(new Partition[partitions.size()]);
+    }
+  }
+
+  /**
+   * Describes one test partition: its total row count and its sub-groups
+   * (pairs of sub id / sub size).  Linked to the preceding partition so that
+   * absolute row numbers can be resolved.  Note: this is a private descriptor
+   * local to the generator, unrelated to the exec-side Partition class.
+   */
+  private static class Partition {
+    Partition previous;
+    final int length;
+    final Integer[] subs;
+    final Integer[] subs_sizes;
+
+    public Partition(int length, Integer[] subs, Integer[] sub_sizes) {
+      this.length = length;
+      this.subs = subs;
+      this.subs_sizes = sub_sizes;
+    }
+
+    /**
+     * @return total number of rows since first partition, this partition included
+     */
+    public int cumulLength() {
+      int prevLength = previous != null ? previous.cumulLength() : 0;
+      return length + prevLength;
+    }
+
+    // true if the absolute (0-based) row number falls inside this partition
+    public boolean isPartOf(int rowNumber) {
+      int prevLength = previous != null ? previous.cumulLength() : 0;
+      return rowNumber >= prevLength && rowNumber < cumulLength();
+    }
+
+    // index of the sub id within subs; requires subs sorted ascending
+    public int getSubIndex(final int sub) {
+      return Arrays.binarySearch(subs, sub);
+    }
+
+    // size of the given sub; the last sub absorbs whatever rows remain so the
+    // partition reaches exactly `length` rows
+    public int getSubSize(int sub) {
+      if (sub != subs[subs.length - 1]) {
+        return subs_sizes[getSubIndex(sub)];
+      } else {
+        //last sub has enough rows to reach partition length
+        int size = length;
+        for (int i = 0; i < subs.length - 1; i++) {
+          size -= subs_sizes[i];
+        }
+        return size;
+      }
+    }
+
+    /**
+     * @return sub id of the sub that contains rowNumber
+     */
+    public int getSubId(int rowNumber) {
+      assert isPartOf(rowNumber) : "row "+rowNumber+" isn't part of this partition";
+
+      int prevLength = previous != null ? previous.cumulLength() : 0;
+      rowNumber -= prevLength; // row num from start of this partition
+
+      for (int s : subs) {
+        if (rowNumber < subRunningCount(s)) {
+          return s;
+        }
+      }
+
+      throw new RuntimeException("should never happen!");
+    }
+
+    /**
+     * @return running count of rows from first row of the partition to current sub, this sub included
+     */
+    public int subRunningCount(int sub) {
+      int count = 0;
+      for (int s : subs) {
+        count += getSubSize(s);
+        if (s == sub) {
+          break;
+        }
+      }
+      return count;
+    }
+
+    /**
+     * @return running sum of salaries from first row of the partition to current sub, this sub included
+     */
+    public int subRunningSum(int sub) {
+      int sum = 0;
+      for (int s : subs) {
+        // every row of sub s is generated with salary == s + 10 (see writeData)
+        sum += (s+10) * getSubSize(s);
+        if (s == sub) {
+          break;
+        }
+      }
+      return sum;
+    }
+
+    /**
+     * @return sum of salaries for all rows of the partition
+     */
+    public int totalSalary() {
+      return subRunningSum(subs[subs.length-1]);
+    }
+
+  }
+
+  private static Partition[] dataB1P1() {
+    // partition rows 20, subs [1, 2, 3, 4, 5, 6]
+    return new Builder()
+      .partition(20).sub(1).sub(2).sub(3).sub(4).sub(5).sub(6)
+      .build();
+  }
+
+  // pby == true describes the data as seen with PARTITION BY; pby == false
+  // collapses the same rows into a single partition (no PARTITION BY clause)
+  private static Partition[] dataB1P2(boolean pby) {
+    // partition rows 10, subs [1, 2, 3, 4]
+    // partition rows 10, subs [4, 5, 6]
+    if (pby) {
+      return new Builder()
+        .partition(10).sub(1).sub(2).sub(3).sub(4)
+        .partition(10).sub(4).sub(5).sub(6)
+        .build();
+    } else {
+      return new Builder()
+        .partition(20).sub(1).sub(2).sub(3).sub(4, 8).sub(5).sub(6)
+        .build();
+    }
+  }
+
+  private static Partition[] dataB2P2(boolean pby) {
+    // partition rows 20, subs [3, 5, 9]
+    // partition rows 20, subs [9, 10]
+    if (pby) {
+      return new Builder()
+        .partition(20).sub(3).sub(5).sub(9)
+        .partition(20).sub(9).sub(10)
+        .build();
+    } else {
+      return new Builder()
+        .partition(40).sub(3).sub(5).sub(9, 12 + 9).sub(10)
+        .build();
+    }
+  }
+
+  private static Partition[] dataB2P4(boolean pby) {
+    // partition rows 5, subs [1, 2, 3]
+    // partition rows 10, subs [3, 4, 5]
+    // partition rows 15, subs [5, 6, 7]
+    // partition rows 10, subs [7, 8]
+    if (pby) {
+      return new Builder()
+        .partition(5).sub(1).sub(2).sub(3)
+        .partition(10).sub(3).sub(4).sub(5)
+        .partition(15).sub(5).sub(6).sub(7)
+        .partition(10).sub(7).sub(8)
+        .build();
+    } else {
+      return new Builder()
+        .partition(40).sub(1).sub(2).sub(3, 5).sub(4).sub(5, 8).sub(6).sub(7, 11).sub(8)
+        .build();
+    }
+  }
+
+  private static Partition[] dataB3P2(boolean pby) {
+    // partition rows 5, subs [1, 2, 3]
+    // partition rows 55, subs [4, 5, 7, 8, 9, 10, 11, 12]
+    if (pby) {
+      return new Builder()
+        .partition(5).sub(1).sub(2).sub(3)
+        .partition(55).sub(4).sub(5).sub(7).sub(8).sub(9).sub(10).sub(11).sub(12)
+        .build();
+    } else {
+      return new Builder()
+        .partition(60).sub(1).sub(2).sub(3, 2).sub(4).sub(5).sub(7).sub(8).sub(9).sub(10).sub(11).sub(12)
+        .build();
+    }
+  }
+
+  private static Partition[] dataB4P4(boolean pby) {
+    // partition rows 10, subs [1, 2, 3]
+    // partition rows 30, subs [3, 4, 5, 6, 7, 8]
+    // partition rows 20, subs [8, 9, 10]
+    // partition rows 20, subs [10, 11]
+    if (pby) {
+      return new Builder()
+        .partition(10).sub(1).sub(2).sub(3)
+        .partition(30).sub(3).sub(4).sub(5).sub(6).sub(7).sub(8)
+        .partition(20).sub(8).sub(9).sub(10)
+        .partition(20).sub(10).sub(11)
+        .build();
+    } else {
+      return new Builder()
+        .partition(80).sub(1).sub(2).sub(3, 10)
+        .sub(4).sub(5).sub(6).sub(7).sub(8, 13)
+        .sub(9).sub(10, 13).sub(11, 10)
+        .build();
+    }
+  }
+
+  /**
+   * Writes the rows of all partitions, in random order, as JSON files of
+   * BATCH_SIZE rows each under {@code path}.  Each row carries employee_id,
+   * position_id (1-based partition index), sub and salary (sub + 10), plus an
+   * optional line_no recording the emission order.
+   */
+  private static void writeData(final String path, final Partition[] partitions, final boolean addLineNo)
+      throws FileNotFoundException {
+
+    // total number of rows
+    int total = partitions[partitions.length - 1].cumulLength();
+
+    // create data rows in random order
+    List<Integer> emp_ids = new ArrayList<>(total);
+    for (int i = 0; i < total; i++) {
+      emp_ids.add(i);
+    }
+    Collections.shuffle(emp_ids);
+
+    // data file(s)
+    int fileId = 0;
+    PrintStream dataStream = new PrintStream(path + "/" + fileId + ".data.json");
+
+    int emp_idx = 0;
+    int lineNo = 0;
+    for (int id : emp_ids) {
+      // linear scan for the partition containing this row
+      int p = 0;
+      while (!partitions[p].isPartOf(id)) { // emp x is @ row x-1
+        p++;
+      }
+
+      int sub = partitions[p].getSubId(id);
+      int salary = 10 + sub;
+
+      if (addLineNo) {
+        dataStream.printf("{ \"employee_id\":%d, \"position_id\":%d, \"sub\":%d, \"salary\":%d, \"line_no\":%d }%n",
+          id, p + 1, sub, salary, lineNo);
+      } else {
+        dataStream.printf("{ \"employee_id\":%d, \"position_id\":%d, \"sub\":%d, \"salary\":%d }%n", id, p + 1, sub, salary);
+      }
+      emp_idx++;
+      // roll over to a new file every BATCH_SIZE rows (unless we just wrote the last row)
+      if ((emp_idx % BATCH_SIZE)==0 && emp_idx < total) {
+        System.out.printf("total: %d, emp_idx: %d, fileID: %d%n", total, emp_idx, fileId);
+        dataStream.close();
+        fileId++;
+        dataStream = new PrintStream(path + "/" + fileId + ".data.json");
+      }
+
+      lineNo++;
+    }
+
+    dataStream.close();
+  }
+
+  /**
+   * Writes the expected-result TSV baselines for the given partitions: one file
+   * for queries without ORDER BY ({@code <path><prefix>.tsv}) and one for
+   * queries with ORDER BY ({@code <path><prefix>.oby.tsv}).  Note the files are
+   * siblings of the data folder, not inside it.
+   */
+  private static void writeResults(final String path, final String prefix, final Partition[] partitions) throws FileNotFoundException {
+    // expected results for query without order by clause
+    final PrintStream resultStream = new PrintStream(path + prefix + ".tsv");
+    // expected results for query with order by clause
+    final PrintStream resultOrderStream = new PrintStream(path + prefix + ".oby.tsv");
+
+    int idx = 0;
+    for (final Partition partition : partitions) {
+      for (int i = 0; i < partition.length; i++, idx++) {
+
+        final int sub = partition.getSubId(idx);
+        final int rowNumber = i + 1;
+        final int rank = 1 + partition.subRunningCount(sub) - partition.getSubSize(sub);
+        final int denseRank = partition.getSubIndex(sub) + 1;
+        final double cumeDist = (double) partition.subRunningCount(sub) / partition.length;
+        final double percentRank = partition.length == 1 ? 0 : (double)(rank - 1)/(partition.length - 1);
+
+        // each line has: count(*)  sum(salary)  row_number()  rank()  dense_rank()  cume_dist()  percent_rank()
+        resultOrderStream.printf("%d\t%d\t%d\t%d\t%d\t%s\t%s%n",
+          partition.subRunningCount(sub), partition.subRunningSum(sub),
+          rowNumber, rank, denseRank, Double.toString(cumeDist), Double.toString(percentRank));
+
+        // each line has: count(*)  sum(salary)
+        resultStream.printf("%d\t%d%n", partition.length, partition.totalSalary());
+      }
+    }
+
+    resultStream.close();
+    resultOrderStream.close();
+  }
+
+  private static void generateData(final String tableName, final Partition[] pby_data, final Partition[] nopby_data)
+      throws FileNotFoundException {
+    generateData(tableName, pby_data, nopby_data, false);
+  }
+
+  // generates the data folder plus both baseline pairs (with/without PARTITION BY)
+  private static void generateData(final String tableName, final Partition[] pby_data, final Partition[] nopby_data,
+      final boolean addLineNo) throws FileNotFoundException {
+    final String WORKING_PATH = TestTools.getWorkingPath();
+    final String TEST_RES_PATH = WORKING_PATH + "/src/test/resources";
+    final String path = TEST_RES_PATH+"/window/" + tableName;
+
+    final File pathFolder = new File(path);
+    if (!pathFolder.exists()) {
+      if (!pathFolder.mkdirs()) {
+        // NOTE(review): the message says "exiting" but execution continues;
+        // the subsequent writes will then fail with FileNotFoundException
+        System.err.printf("Couldn't create folder %s, exiting%n", path);
+      }
+    }
+
+    writeData(path, pby_data, addLineNo);
+
+    writeResults(path, "", nopby_data);
+    writeResults(path, ".pby", pby_data);
+  }
+
+  public static void main(String[] args) throws FileNotFoundException {
+    generateData("b1.p1", dataB1P1(), dataB1P1());
+    generateData("b1.p2", dataB1P2(true), dataB1P2(false));
+    generateData("b2.p2", dataB2P2(true), dataB2P2(false));
+    generateData("b2.p4", dataB2P4(true), dataB2P4(false));
+    generateData("b3.p2", dataB3P2(true), dataB3P2(false));
+    generateData("b4.p4", dataB4P4(true), dataB4P4(false), true);
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/b55e2328/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/window/TestWindowFrame.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/window/TestWindowFrame.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/window/TestWindowFrame.java
index 553c4e8..d76420f 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/window/TestWindowFrame.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/window/TestWindowFrame.java
@@ -24,6 +24,7 @@ import org.apache.drill.DrillTestWrapper;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.util.TestTools;
 import org.apache.drill.exec.ExecConstants;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -146,4 +147,165 @@ public class TestWindowFrame extends BaseTestQuery {
     test(getFile("window/q3220.sql"), TEST_RES_PATH);
   }
 
+  @Test // DRILL-3604
+  public void testFix3604() throws Exception {
+    // smoke test: LEAD over a partitioned, ordered window;
+    // only checks the query completes without error (no baseline)
+    test(getFile("window/3604.sql"), TEST_RES_PATH);
+  }
+
+  @Test // DRILL-3605
+  public void testFix3605() throws Exception {
+    // LEAD(col2) with PARTITION BY col2: compare against the checked-in 3605.tsv baseline
+    testBuilder()
+      .sqlQuery(getFile("window/3605.sql"), TEST_RES_PATH)
+      .ordered()
+      .csvBaselineFile("window/3605.tsv")
+      .baselineColumns("col2", "lead_col2")
+      .build()
+      .run();
+  }
+
+  @Test // DRILL-3606
+  public void testFix3606() throws Exception {
+    // LEAD(col2) with ORDER BY only (no PARTITION BY): compare against 3606.tsv
+    testBuilder()
+      .sqlQuery(getFile("window/3606.sql"), TEST_RES_PATH)
+      .ordered()
+      .csvBaselineFile("window/3606.tsv")
+      .baselineColumns("col2", "lead_col2")
+      .build()
+      .run();
+  }
+
+  @Test
+  public void testLead() throws Exception {
+    // LEAD with ORDER BY only, checked against the generated b4.p4 baseline
+    testBuilder()
+      .sqlQuery(getFile("window/lead.oby.sql"), TEST_RES_PATH)
+      .ordered()
+      .csvBaselineFile("window/b4.p4.lead.oby.tsv")
+      .baselineColumns("lead")
+      .build()
+      .run();
+  }
+
+  @Test
+  public void testLagWithPby() throws Exception {
+    // LAG with PARTITION BY + ORDER BY; baseline has a null (empty) first row per partition
+    testBuilder()
+      .sqlQuery(getFile("window/lag.pby.oby.sql"), TEST_RES_PATH)
+      .ordered()
+      .csvBaselineFile("window/b4.p4.lag.pby.oby.tsv")
+      .baselineColumns("lag")
+      .build()
+      .run();
+  }
+
+
+  @Test
+  public void testLag() throws Exception {
+    // LAG with ORDER BY only, checked against the generated b4.p4 baseline
+    testBuilder()
+      .sqlQuery(getFile("window/lag.oby.sql"), TEST_RES_PATH)
+      .ordered()
+      .csvBaselineFile("window/b4.p4.lag.oby.tsv")
+      .baselineColumns("lag")
+      .build()
+      .run();
+  }
+
+  @Test
+  public void testLeadWithPby() throws Exception {
+    // LEAD with PARTITION BY + ORDER BY; baseline has a null (empty) last row per partition
+    testBuilder()
+      .sqlQuery(getFile("window/lead.pby.oby.sql"), TEST_RES_PATH)
+      .ordered()
+      .csvBaselineFile("window/b4.p4.lead.pby.oby.tsv")
+      .baselineColumns("lead")
+      .build()
+      .run();
+  }
+
+  @Test
+  public void testFirstValue() throws Exception {
+    // FIRST_VALUE with PARTITION BY, checked against the generated b4.p4 baseline
+    testBuilder()
+      .sqlQuery(getFile("window/fval.pby.sql"), TEST_RES_PATH)
+      .ordered()
+      .csvBaselineFile("window/b4.p4.fval.pby.tsv")
+      .baselineColumns("first_value")
+      .build()
+      .run();
+  }
+
+  @Test
+  public void testLastValue() throws Exception {
+    // LAST_VALUE with PARTITION BY + ORDER BY, checked against the generated b4.p4 baseline
+    testBuilder()
+      .sqlQuery(getFile("window/lval.pby.oby.sql"), TEST_RES_PATH)
+      .ordered()
+      .csvBaselineFile("window/b4.p4.lval.pby.oby.tsv")
+      .baselineColumns("last_value")
+      .build()
+      .run();
+  }
+
+  @Test
+  public void testFirstValueAllTypes() throws Exception {
+    // make sure all types are handled properly
+    // smoke test only: no baseline, just verifies the query runs
+    test(getFile("window/fval.alltypes.sql"), TEST_RES_PATH);
+  }
+
+  @Test
+  public void testLastValueAllTypes() throws Exception {
+    // make sure all types are handled properly
+    // fix: previously ran window/fval.alltypes.sql (copy-paste from
+    // testFirstValueAllTypes), so LAST_VALUE was never exercised here.
+    // If lval.alltypes.sql is missing, add it mirroring fval.alltypes.sql.
+    test(getFile("window/lval.alltypes.sql"), TEST_RES_PATH);
+  }
+
+  @Test
+  public void testNtile() throws Exception {
+    // NTILE, checked against the generated b2.p4 baseline
+    testBuilder()
+      .sqlQuery(getFile("window/ntile.sql"), TEST_RES_PATH)
+      .ordered()
+      .csvBaselineFile("window/b2.p4.ntile.tsv")
+      .baselineColumns("ntile")
+      .build()
+      .run();
+  }
+
+  @Test // DRILL-3648
+  public void test3648Fix() throws Exception {
+    // NTILE(5) over a dedicated parquet data set, checked against 3648.tsv
+    testBuilder()
+      .sqlQuery(getFile("window/3648.sql"), TEST_RES_PATH)
+      .ordered()
+      .csvBaselineFile("window/3648.tsv")
+      .baselineColumns("ntile")
+      .build()
+      .run();
+  }
+
+  @Test
+  public void testPartitionNtile() {
+    // Unit test of Partition.ntile(): 12 rows split into 5 tiles must yield
+    // tile sizes 3, 3, 2, 2, 2 (the first length % numTiles tiles get the
+    // extra row).  Advance one row at a time and check the tile number.
+    final Partition partition = new Partition(12);
+
+    final int[] expectedTile = {1, 1, 1, 2, 2, 2, 3, 3, 4, 4, 5, 5};
+    for (int row = 0; row < expectedTile.length; row++) {
+      if (row > 0) {
+        partition.rowAggregated();
+      }
+      Assert.assertEquals(expectedTile[row], partition.ntile(5));
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/b55e2328/exec/java-exec/src/test/resources/window/3604.sql
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/window/3604.sql b/exec/java-exec/src/test/resources/window/3604.sql
new file mode 100644
index 0000000..f88beaf
--- /dev/null
+++ b/exec/java-exec/src/test/resources/window/3604.sql
@@ -0,0 +1,4 @@
+select
+  lead(col3) over(partition by col2 order by col0) lead_col0
+from
+  dfs_test.`%s/window/fewRowsAllData.parquet`
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/b55e2328/exec/java-exec/src/test/resources/window/3605.sql
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/window/3605.sql b/exec/java-exec/src/test/resources/window/3605.sql
new file mode 100644
index 0000000..1e5d54a
--- /dev/null
+++ b/exec/java-exec/src/test/resources/window/3605.sql
@@ -0,0 +1,5 @@
+select
+  col2,
+  lead(col2) over(partition by col2 order by col0) as lead_col2
+from
+  dfs_test.`%s/window/fewRowsAllData.parquet`
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/b55e2328/exec/java-exec/src/test/resources/window/3605.tsv
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/window/3605.tsv b/exec/java-exec/src/test/resources/window/3605.tsv
new file mode 100644
index 0000000..fccadbc
--- /dev/null
+++ b/exec/java-exec/src/test/resources/window/3605.tsv
@@ -0,0 +1,78 @@
+AK
+AZ	AZ
+AZ
+CA
+CO	CO
+CO	CO
+CO
+DE
+GA	GA
+GA	GA
+GA	GA
+GA
+HI
+IA	IA
+IA	IA
+IA	IA
+IA
+IN	IN
+IN	IN
+IN	IN
+IN
+KS
+LA
+MA	MA
+MA	MA
+MA
+MD	MD
+MD
+ME	ME
+ME
+MI
+MN	MN
+MN	MN
+MN	MN
+MN	MN
+MN	MN
+MN
+MO	MO
+MO	MO
+MO
+NC
+ND	ND
+ND
+NE
+NH	NH
+NH	NH
+NH
+NY	NY
+NY	NY
+NY
+OH	OH
+OH
+OR	OR
+OR
+PA
+RI	RI
+RI	RI
+RI	RI
+RI
+SC	SC
+SC
+SD	SD
+SD	SD
+SD
+UT	UT
+UT	UT
+UT
+VT	VT
+VT	VT
+VT	VT
+VT
+WI	WI
+WI
+WV
+WY	WY
+WY	WY
+WY	WY
+WY

http://git-wip-us.apache.org/repos/asf/drill/blob/b55e2328/exec/java-exec/src/test/resources/window/3606.sql
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/window/3606.sql b/exec/java-exec/src/test/resources/window/3606.sql
new file mode 100644
index 0000000..d96469e
--- /dev/null
+++ b/exec/java-exec/src/test/resources/window/3606.sql
@@ -0,0 +1,5 @@
+select
+  col2,
+  lead(col2) over(order by col0) as lead_col2
+from
+  dfs_test.`%s/window/fewRowsAllData.parquet`
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/b55e2328/exec/java-exec/src/test/resources/window/3606.tsv
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/window/3606.tsv b/exec/java-exec/src/test/resources/window/3606.tsv
new file mode 100644
index 0000000..c92a523
--- /dev/null
+++ b/exec/java-exec/src/test/resources/window/3606.tsv
@@ -0,0 +1,78 @@
+NE	NH
+NH	IN
+IN	CA
+CA	CO
+CO	SC
+SC	SD
+SD	WY
+WY	LA
+LA	KS
+KS	CO
+CO	NY
+NY	NY
+NY	SD
+SD	GA
+GA	MO
+MO	IN
+IN	MN
+MN	IA
+IA	GA
+GA	MN
+MN	MN
+MN	MI
+MI	RI
+RI	SD
+SD	IN
+IN	WI
+WI	MA
+MA	IA
+IA	ND
+ND	MA
+MA	RI
+RI	ME
+ME	MN
+MN	CO
+CO	OH
+OH	MO
+MO	GA
+GA	VT
+VT	ND
+ND	NH
+NH	RI
+RI	OR
+OR	NC
+NC	AZ
+AZ	OR
+OR	MD
+MD	HI
+HI	MA
+MA	NY
+NY	UT
+UT	DE
+DE	WY
+WY	OH
+OH	WY
+WY	NH
+NH	AK
+AK	MD
+MD	PA
+PA	MN
+MN	GA
+GA	MO
+MO	VT
+VT	UT
+UT	IN
+IN	WY
+WY	WV
+WV	IA
+IA	MN
+MN	AZ
+AZ	VT
+VT	IA
+IA	UT
+UT	WI
+WI	VT
+VT	RI
+RI	SC
+SC	ME
+ME
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/b55e2328/exec/java-exec/src/test/resources/window/3648.parquet
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/window/3648.parquet b/exec/java-exec/src/test/resources/window/3648.parquet
new file mode 100644
index 0000000..3819b21
Binary files /dev/null and b/exec/java-exec/src/test/resources/window/3648.parquet differ

http://git-wip-us.apache.org/repos/asf/drill/blob/b55e2328/exec/java-exec/src/test/resources/window/3648.sql
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/window/3648.sql b/exec/java-exec/src/test/resources/window/3648.sql
new file mode 100644
index 0000000..c63ba17
--- /dev/null
+++ b/exec/java-exec/src/test/resources/window/3648.sql
@@ -0,0 +1,5 @@
+select
+  ntile(5)
+    over(partition by col7 order by col0) as `ntile`
+from
+  dfs_test.`%s/window/3648.parquet`
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/b55e2328/exec/java-exec/src/test/resources/window/3648.tsv
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/window/3648.tsv b/exec/java-exec/src/test/resources/window/3648.tsv
new file mode 100644
index 0000000..761676a
--- /dev/null
+++ b/exec/java-exec/src/test/resources/window/3648.tsv
@@ -0,0 +1,22 @@
+1
+1
+1
+2
+2
+3
+3
+4
+4
+5
+5
+1
+1
+1
+2
+2
+3
+3
+4
+4
+5
+5
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/b55e2328/exec/java-exec/src/test/resources/window/b2.p4.ntile.tsv
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/window/b2.p4.ntile.tsv b/exec/java-exec/src/test/resources/window/b2.p4.ntile.tsv
new file mode 100644
index 0000000..7063e22
--- /dev/null
+++ b/exec/java-exec/src/test/resources/window/b2.p4.ntile.tsv
@@ -0,0 +1,40 @@
+1
+1
+2
+2
+3
+1
+1
+1
+1
+2
+2
+2
+3
+3
+3
+1
+1
+1
+1
+1
+2
+2
+2
+2
+2
+3
+3
+3
+3
+3
+1
+1
+1
+1
+2
+2
+2
+3
+3
+3
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/b55e2328/exec/java-exec/src/test/resources/window/b4.p4.fval.pby.tsv
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/window/b4.p4.fval.pby.tsv b/exec/java-exec/src/test/resources/window/b4.p4.fval.pby.tsv
new file mode 100644
index 0000000..f39bd2b
--- /dev/null
+++ b/exec/java-exec/src/test/resources/window/b4.p4.fval.pby.tsv
@@ -0,0 +1,80 @@
+6
+6
+6
+6
+6
+6
+6
+6
+6
+6
+11
+11
+11
+11
+11
+11
+11
+11
+11
+11
+11
+11
+11
+11
+11
+11
+11
+11
+11
+11
+11
+11
+11
+11
+11
+11
+11
+11
+11
+11
+58
+58
+58
+58
+58
+58
+58
+58
+58
+58
+58
+58
+58
+58
+58
+58
+58
+58
+58
+58
+68
+68
+68
+68
+68
+68
+68
+68
+68
+68
+68
+68
+68
+68
+68
+68
+68
+68
+68
+68
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/b55e2328/exec/java-exec/src/test/resources/window/b4.p4.lag.oby.tsv
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/window/b4.p4.lag.oby.tsv b/exec/java-exec/src/test/resources/window/b4.p4.lag.oby.tsv
new file mode 100644
index 0000000..a669832
--- /dev/null
+++ b/exec/java-exec/src/test/resources/window/b4.p4.lag.oby.tsv
@@ -0,0 +1,80 @@
+
+15
+77
+44
+69
+74
+17
+5
+60
+48
+47
+76
+3
+28
+64
+39
+71
+10
+67
+59
+36
+70
+7
+25
+72
+42
+35
+58
+63
+50
+26
+16
+12
+45
+61
+20
+51
+18
+19
+9
+22
+75
+68
+53
+55
+79
+56
+46
+32
+54
+11
+4
+57
+13
+62
+33
+43
+65
+52
+1
+27
+30
+38
+40
+2
+41
+66
+34
+49
+0
+29
+23
+14
+24
+21
+8
+31
+6
+37
+73
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/b55e2328/exec/java-exec/src/test/resources/window/b4.p4.lag.pby.oby.tsv
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/window/b4.p4.lag.pby.oby.tsv b/exec/java-exec/src/test/resources/window/b4.p4.lag.pby.oby.tsv
new file mode 100644
index 0000000..cf0de85
--- /dev/null
+++ b/exec/java-exec/src/test/resources/window/b4.p4.lag.pby.oby.tsv
@@ -0,0 +1,80 @@
+
+15
+77
+44
+69
+74
+17
+5
+60
+48
+
+76
+3
+28
+64
+39
+71
+10
+67
+59
+36
+70
+7
+25
+72
+42
+35
+58
+63
+50
+26
+16
+12
+45
+61
+20
+51
+18
+19
+9
+
+75
+68
+53
+55
+79
+56
+46
+32
+54
+11
+4
+57
+13
+62
+33
+43
+65
+52
+1
+
+30
+38
+40
+2
+41
+66
+34
+49
+0
+29
+23
+14
+24
+21
+8
+31
+6
+37
+73
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/b55e2328/exec/java-exec/src/test/resources/window/b4.p4.lead.oby.tsv
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/window/b4.p4.lead.oby.tsv b/exec/java-exec/src/test/resources/window/b4.p4.lead.oby.tsv
new file mode 100644
index 0000000..050c6ac
--- /dev/null
+++ b/exec/java-exec/src/test/resources/window/b4.p4.lead.oby.tsv
@@ -0,0 +1,80 @@
+77
+44
+69
+74
+17
+5
+60
+48
+47
+76
+3
+28
+64
+39
+71
+10
+67
+59
+36
+70
+7
+25
+72
+42
+35
+58
+63
+50
+26
+16
+12
+45
+61
+20
+51
+18
+19
+9
+22
+75
+68
+53
+55
+79
+56
+46
+32
+54
+11
+4
+57
+13
+62
+33
+43
+65
+52
+1
+27
+30
+38
+40
+2
+41
+66
+34
+49
+0
+29
+23
+14
+24
+21
+8
+31
+6
+37
+73
+78
+

http://git-wip-us.apache.org/repos/asf/drill/blob/b55e2328/exec/java-exec/src/test/resources/window/b4.p4.lead.pby.oby.tsv
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/window/b4.p4.lead.pby.oby.tsv b/exec/java-exec/src/test/resources/window/b4.p4.lead.pby.oby.tsv
new file mode 100644
index 0000000..0a6094c
--- /dev/null
+++ b/exec/java-exec/src/test/resources/window/b4.p4.lead.pby.oby.tsv
@@ -0,0 +1,80 @@
+77
+44
+69
+74
+17
+5
+60
+48
+47
+
+3
+28
+64
+39
+71
+10
+67
+59
+36
+70
+7
+25
+72
+42
+35
+58
+63
+50
+26
+16
+12
+45
+61
+20
+51
+18
+19
+9
+22
+
+68
+53
+55
+79
+56
+46
+32
+54
+11
+4
+57
+13
+62
+33
+43
+65
+52
+1
+27
+
+38
+40
+2
+41
+66
+34
+49
+0
+29
+23
+14
+24
+21
+8
+31
+6
+37
+73
+78
+

http://git-wip-us.apache.org/repos/asf/drill/blob/b55e2328/exec/java-exec/src/test/resources/window/b4.p4.lval.pby.oby.tsv
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/window/b4.p4.lval.pby.oby.tsv b/exec/java-exec/src/test/resources/window/b4.p4.lval.pby.oby.tsv
new file mode 100644
index 0000000..9c8a0c6
--- /dev/null
+++ b/exec/java-exec/src/test/resources/window/b4.p4.lval.pby.oby.tsv
@@ -0,0 +1,80 @@
+6
+0
+5
+2
+9
+8
+7
+3
+4
+1
+11
+21
+38
+16
+31
+30
+36
+37
+34
+39
+22
+29
+12
+25
+19
+14
+24
+32
+28
+35
+26
+18
+33
+27
+13
+17
+20
+15
+23
+10
+58
+50
+49
+52
+59
+47
+54
+55
+46
+57
+42
+48
+43
+45
+51
+53
+56
+41
+40
+44
+68
+63
+76
+74
+71
+73
+70
+72
+69
+60
+75
+66
+77
+61
+62
+64
+67
+65
+78
+79
\ No newline at end of file


Mime
View raw message