tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hyun...@apache.org
Subject [3/4] TAJO-774: Implement logical plan part and physical executor for window function.
Date Wed, 09 Jul 2014 04:24:41 GMT
http://git-wip-us.apache.org/repos/asf/tajo/blob/d3ee50a9/tajo-core/src/main/java/org/apache/tajo/engine/eval/AlgebraicUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/eval/AlgebraicUtil.java b/tajo-core/src/main/java/org/apache/tajo/engine/eval/AlgebraicUtil.java
index d993b27..a72e2a8 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/eval/AlgebraicUtil.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/eval/AlgebraicUtil.java
@@ -37,7 +37,7 @@ public class AlgebraicUtil {
    * @return Transposed expression
    */
   public static EvalNode transpose(EvalNode evalNode, Column target) {
-    BinaryEval commutated = null;
+    BinaryEval commutated;
 
     if (evalNode instanceof BinaryEval) { // if it is binary
       BinaryEval binaryEval = (BinaryEval) evalNode;
@@ -168,19 +168,22 @@ public class AlgebraicUtil {
       return unaryEval;
     }
 
-    public EvalNode visitFuncCall(Object context, GeneralFunctionEval evalNode, Stack<EvalNode> stack) {
-      boolean constant = true;
+    @Override
+    public EvalNode visitFuncCall(Object context, FunctionEval evalNode, Stack<EvalNode> stack) {
+      boolean constantOfAllDescendents = true;
 
       if ("sleep".equals(evalNode.funcDesc.getSignature())) {
-        constant = false;
+        constantOfAllDescendents = false;
       } else {
-        for (EvalNode arg : evalNode.getArgs()) {
-          arg = visit(context, arg, stack);
-          constant &= (arg.getType() == EvalType.CONST);
+        if (evalNode.getArgs() != null) {
+          for (EvalNode arg : evalNode.getArgs()) {
+            arg = visit(context, arg, stack);
+            constantOfAllDescendents &= (arg.getType() == EvalType.CONST);
+          }
         }
       }
 
-      if (constant) {
+      if (constantOfAllDescendents && evalNode.getType() == EvalType.FUNCTION) {
         return new ConstEval(evalNode.eval(null, null));
       } else {
         return evalNode;

http://git-wip-us.apache.org/repos/asf/tajo/blob/d3ee50a9/tajo-core/src/main/java/org/apache/tajo/engine/eval/BasicEvalNodeVisitor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/eval/BasicEvalNodeVisitor.java b/tajo-core/src/main/java/org/apache/tajo/engine/eval/BasicEvalNodeVisitor.java
index 6e83c70..3b94cc9 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/eval/BasicEvalNodeVisitor.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/eval/BasicEvalNodeVisitor.java
@@ -125,6 +125,9 @@ public class BasicEvalNodeVisitor<CONTEXT, RESULT> implements EvalNodeVisitor2<C
       case AGG_FUNCTION:
         result = visitAggrFuncCall(context, (AggregationFunctionCallEval) evalNode, stack);
         break;
+      case WINDOW_FUNCTION:
+        result = visitWindowFunc(context, (WindowFunctionEval) evalNode, stack);
+      break;
 
       case SIGNED:
         result = visitSigned(context, (SignedEval) evalNode, stack);
@@ -326,6 +329,11 @@ public class BasicEvalNodeVisitor<CONTEXT, RESULT> implements EvalNodeVisitor2<C
   }
 
   @Override
+  public RESULT visitWindowFunc(CONTEXT context, WindowFunctionEval evalNode, Stack<EvalNode> stack) {
+    return visitDefaultFunctionEval(context, evalNode, stack);
+  }
+
+  @Override
   public RESULT visitSigned(CONTEXT context, SignedEval signedEval, Stack<EvalNode> stack) {
     return visitDefaultUnaryEval(context, signedEval, stack);
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/d3ee50a9/tajo-core/src/main/java/org/apache/tajo/engine/eval/BinaryEval.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/eval/BinaryEval.java b/tajo-core/src/main/java/org/apache/tajo/engine/eval/BinaryEval.java
index bafce91..70d6bb1 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/eval/BinaryEval.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/eval/BinaryEval.java
@@ -26,6 +26,7 @@ import org.apache.tajo.common.TajoDataTypes.DataType;
 import org.apache.tajo.datum.Datum;
 import org.apache.tajo.datum.DatumFactory;
 import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.engine.utils.DataTypeUtil;
 import org.apache.tajo.exception.InvalidOperationException;
 import org.apache.tajo.storage.Tuple;
 
@@ -60,7 +61,7 @@ public class BinaryEval extends EvalNode implements Cloneable {
             type == EvalType.MULTIPLY ||
             type == EvalType.DIVIDE ||
             type == EvalType.MODULAR ) {
-      this.returnType = determineType(left.getValueType(), right.getValueType());
+      this.returnType = DataTypeUtil.determineType(left.getValueType(), right.getValueType());
 
     } else if (type == EvalType.CONCATENATE) {
       this.returnType = CatalogUtil.newSimpleDataType(Type.TEXT);
@@ -97,102 +98,6 @@ public class BinaryEval extends EvalNode implements Cloneable {
     }
   }
 
-  /**
-   * This is verified by ExprsVerifier.checkArithmeticOperand().
-   */
-  private DataType determineType(DataType left, DataType right) throws InvalidEvalException {
-    switch (left.getType()) {
-
-    case INT1:
-    case INT2:
-    case INT4: {
-      switch(right.getType()) {
-      case INT1:
-      case INT2:
-      case INT4: return CatalogUtil.newSimpleDataType(Type.INT4);
-      case INT8: return CatalogUtil.newSimpleDataType(Type.INT8);
-      case FLOAT4: return CatalogUtil.newSimpleDataType(Type.FLOAT4);
-      case FLOAT8: return CatalogUtil.newSimpleDataType(Type.FLOAT8);
-      case DATE: return CatalogUtil.newSimpleDataType(Type.DATE);
-      case INTERVAL: return CatalogUtil.newSimpleDataType(Type.INTERVAL);
-      }
-    }
-
-    case INT8: {
-      switch(right.getType()) {
-      case INT1:
-      case INT2:
-      case INT4:
-      case INT8: return CatalogUtil.newSimpleDataType(Type.INT8);
-      case FLOAT4: return CatalogUtil.newSimpleDataType(Type.FLOAT4);
-      case FLOAT8: return CatalogUtil.newSimpleDataType(Type.FLOAT8);
-      case DATE: return CatalogUtil.newSimpleDataType(Type.DATE);
-      case INTERVAL: return CatalogUtil.newSimpleDataType(Type.INTERVAL);
-      }
-    }
-
-    case FLOAT4: {
-      switch(right.getType()) {
-      case INT1:
-      case INT2:
-      case INT4: return CatalogUtil.newSimpleDataType(Type.FLOAT4);
-      case INT8: return CatalogUtil.newSimpleDataType(Type.FLOAT4);
-      case FLOAT4: return CatalogUtil.newSimpleDataType(Type.FLOAT4);
-      case FLOAT8: return CatalogUtil.newSimpleDataType(Type.FLOAT8);
-      case INTERVAL: return CatalogUtil.newSimpleDataType(Type.INTERVAL);
-      }
-    }
-
-    case FLOAT8: {
-      switch(right.getType()) {
-      case INT1:
-      case INT2:
-      case INT4:
-      case INT8:
-      case FLOAT4:
-      case FLOAT8: return CatalogUtil.newSimpleDataType(Type.FLOAT8);
-      case INTERVAL: return CatalogUtil.newSimpleDataType(Type.INTERVAL);
-      }
-    }
-
-    case DATE: {
-      switch(right.getType()) {
-      case INT2:
-      case INT4:
-      case INT8: return CatalogUtil.newSimpleDataType(Type.DATE);
-      case INTERVAL:
-      case TIME: return CatalogUtil.newSimpleDataType(Type.TIMESTAMP);
-      case DATE: return CatalogUtil.newSimpleDataType(Type.INT4);
-      }
-    }
-
-    case TIME: {
-      switch(right.getType()) {
-      case INTERVAL: return CatalogUtil.newSimpleDataType(Type.TIME);
-      case TIME: return CatalogUtil.newSimpleDataType(Type.INTERVAL);
-      case DATE: return CatalogUtil.newSimpleDataType(Type.INT4);
-      }
-    }
-
-    case TIMESTAMP: {
-      switch (right.getType()) {
-      case INTERVAL: return CatalogUtil.newSimpleDataType(Type.TIMESTAMP);
-      case TIMESTAMP: return CatalogUtil.newSimpleDataType(Type.INTERVAL);
-      }
-    }
-
-    case INTERVAL: {
-      switch (right.getType()) {
-      case INTERVAL:
-      case FLOAT4:
-      case FLOAT8: return CatalogUtil.newSimpleDataType(Type.INTERVAL);
-      }
-    }
-
-    default: return left;
-    }
-  }
-
   @Override
   public Datum eval(Schema schema, Tuple tuple) {
     Datum lhs = leftExpr.eval(schema, tuple);

http://git-wip-us.apache.org/repos/asf/tajo/blob/d3ee50a9/tajo-core/src/main/java/org/apache/tajo/engine/eval/EvalNodeVisitor2.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/eval/EvalNodeVisitor2.java b/tajo-core/src/main/java/org/apache/tajo/engine/eval/EvalNodeVisitor2.java
index 024a988..e85984e 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/eval/EvalNodeVisitor2.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/eval/EvalNodeVisitor2.java
@@ -64,6 +64,7 @@ public interface EvalNodeVisitor2<CONTEXT, RESULT> {
   // Functions
   RESULT visitFuncCall(CONTEXT context, GeneralFunctionEval evalNode, Stack<EvalNode> stack);
   RESULT visitAggrFuncCall(CONTEXT context, AggregationFunctionCallEval evalNode, Stack<EvalNode> stack);
+  RESULT visitWindowFunc(CONTEXT context, WindowFunctionEval evalNode, Stack<EvalNode> stack);
 
   RESULT visitSigned(CONTEXT context, SignedEval signedEval, Stack<EvalNode> stack);
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/d3ee50a9/tajo-core/src/main/java/org/apache/tajo/engine/eval/EvalType.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/eval/EvalType.java b/tajo-core/src/main/java/org/apache/tajo/engine/eval/EvalType.java
index 14367f4..549f8d0 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/eval/EvalType.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/eval/EvalType.java
@@ -44,6 +44,7 @@ public enum EvalType {
   BIT_XOR(BinaryEval.class, "|"),
 
   // Function
+  WINDOW_FUNCTION(WindowFunctionEval.class),
   AGG_FUNCTION(AggregationFunctionCallEval.class),
   FUNCTION(GeneralFunctionEval.class),
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/d3ee50a9/tajo-core/src/main/java/org/apache/tajo/engine/eval/FunctionEval.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/eval/FunctionEval.java b/tajo-core/src/main/java/org/apache/tajo/engine/eval/FunctionEval.java
index 6781c34..0cc8d98 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/eval/FunctionEval.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/eval/FunctionEval.java
@@ -19,6 +19,7 @@
 package org.apache.tajo.engine.eval;
 
 import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
 import com.google.gson.annotations.Expose;
 import org.apache.tajo.catalog.FunctionDesc;
 import org.apache.tajo.catalog.Schema;
@@ -42,7 +43,8 @@ public abstract class FunctionEval extends EvalNode implements Cloneable {
 	public FunctionEval(EvalType type, FunctionDesc funcDesc, EvalNode[] argEvals) {
 		super(type);
 		this.funcDesc = funcDesc;
-		this.argEvals = argEvals;
+    Preconditions.checkArgument(argEvals != null, "argEvals cannot be null");
+    this.argEvals = argEvals;
 	}
 
   public ParamType [] getParamType() {
@@ -85,6 +87,10 @@ public abstract class FunctionEval extends EvalNode implements Cloneable {
 		return funcDesc.getSignature();
 	}
 
+  public FunctionDesc getFuncDesc() {
+    return this.funcDesc;
+  }
+
   @Override
 	public String toString() {
 		StringBuilder sb = new StringBuilder();
@@ -119,26 +125,32 @@ public abstract class FunctionEval extends EvalNode implements Cloneable {
   public Object clone() throws CloneNotSupportedException {
     FunctionEval eval = (FunctionEval) super.clone();
     eval.funcDesc = (FunctionDesc) funcDesc.clone();
-    eval.argEvals = new EvalNode[argEvals.length];
-    for (int i = 0; i < argEvals.length; i++) {
-      eval.argEvals[i] = (EvalNode) argEvals[i].clone();
-    }    
+    if (argEvals != null) {
+      eval.argEvals = new EvalNode[argEvals.length];
+      for (int i = 0; i < argEvals.length; i++) {
+        eval.argEvals[i] = (EvalNode) argEvals[i].clone();
+      }
+    }
     return eval;
   }
 	
 	@Override
   public void preOrder(EvalNodeVisitor visitor) {
-    for (EvalNode eval : argEvals) {
-      eval.postOrder(visitor);
+    if (argEvals != null) {
+      for (EvalNode eval : argEvals) {
+        eval.postOrder(visitor);
+      }
     }
     visitor.visit(this);
   }
 	
 	@Override
 	public void postOrder(EvalNodeVisitor visitor) {
-	  for (EvalNode eval : argEvals) {
-	    eval.postOrder(visitor);
-	  }
+    if (argEvals != null) {
+      for (EvalNode eval : argEvals) {
+        eval.postOrder(visitor);
+      }
+    }
 	  visitor.visit(this);
 	}
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/d3ee50a9/tajo-core/src/main/java/org/apache/tajo/engine/eval/SimpleEvalNodeVisitor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/eval/SimpleEvalNodeVisitor.java b/tajo-core/src/main/java/org/apache/tajo/engine/eval/SimpleEvalNodeVisitor.java
index e4503e2..93f1f74 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/eval/SimpleEvalNodeVisitor.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/eval/SimpleEvalNodeVisitor.java
@@ -66,10 +66,13 @@ public class SimpleEvalNodeVisitor<CONTEXT> {
 
       // Functions
       case FUNCTION:
-        result = visitFuncCall(context, (GeneralFunctionEval) evalNode, stack);
+        result = visitFuncCall(context, (FunctionEval) evalNode, stack);
         break;
       case AGG_FUNCTION:
-        result = visitAggrFuncCall(context, (AggregationFunctionCallEval) evalNode, stack);
+        result = visitFuncCall(context, (FunctionEval) evalNode, stack);
+        break;
+      case WINDOW_FUNCTION:
+        result = visitFuncCall(context, (FunctionEval) evalNode, stack);
         break;
 
       default:
@@ -163,11 +166,7 @@ public class SimpleEvalNodeVisitor<CONTEXT> {
   // Functions
   ///////////////////////////////////////////////////////////////////////////////////////////////
 
-  public EvalNode visitFuncCall(CONTEXT context, GeneralFunctionEval evalNode, Stack<EvalNode> stack) {
-    return visitDefaultFunctionEval(context, stack, evalNode);
-  }
-
-  public EvalNode visitAggrFuncCall(CONTEXT context, AggregationFunctionCallEval evalNode, Stack<EvalNode> stack) {
+  public EvalNode visitFuncCall(CONTEXT context, FunctionEval evalNode, Stack<EvalNode> stack) {
     return visitDefaultFunctionEval(context, stack, evalNode);
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/d3ee50a9/tajo-core/src/main/java/org/apache/tajo/engine/eval/WindowFunctionEval.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/eval/WindowFunctionEval.java b/tajo-core/src/main/java/org/apache/tajo/engine/eval/WindowFunctionEval.java
new file mode 100644
index 0000000..fb4eede
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/eval/WindowFunctionEval.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.eval;
+
+import com.google.gson.annotations.Expose;
+import org.apache.tajo.catalog.FunctionDesc;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.common.TajoDataTypes.DataType;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.engine.function.AggFunction;
+import org.apache.tajo.engine.function.FunctionContext;
+import org.apache.tajo.engine.planner.logical.WindowSpec;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.util.TUtil;
+
+public class WindowFunctionEval extends AggregationFunctionCallEval implements Cloneable {
+  @Expose private SortSpec [] sortSpecs;
+  @Expose WindowSpec.WindowFrame windowFrame;
+  private Tuple params;
+
+  public WindowFunctionEval(FunctionDesc desc, AggFunction instance, EvalNode[] givenArgs,
+                            WindowSpec.WindowFrame windowFrame) {
+    super(EvalType.WINDOW_FUNCTION, desc, instance, givenArgs);
+    this.windowFrame = windowFrame;
+  }
+
+  public boolean hasSortSpecs() {
+    return sortSpecs != null;
+  }
+
+  public void setSortSpecs(SortSpec [] sortSpecs) {
+    this.sortSpecs = sortSpecs;
+  }
+
+  public SortSpec [] getSortSpecs() {
+    return sortSpecs;
+  }
+
+  public WindowSpec.WindowFrame getWindowFrame() {
+    return windowFrame;
+  }
+
+  @Override
+  public Datum eval(Schema schema, Tuple tuple) {
+    throw new UnsupportedOperationException("Cannot execute eval() of aggregation function");
+  }
+
+  public void merge(FunctionContext context, Schema schema, Tuple tuple) {
+    if (params == null) {
+      this.params = new VTuple(argEvals.length);
+    }
+
+    if (argEvals != null) {
+      for (int i = 0; i < argEvals.length; i++) {
+        params.put(i, argEvals[i].eval(schema, tuple));
+      }
+    }
+
+    instance.eval(context, params);
+  }
+
+  public Datum terminate(FunctionContext context) {
+    return instance.terminate(context);
+  }
+
+  @Override
+  public DataType getValueType() {
+    return funcDesc.getReturnType();
+  }
+
+  @Override
+  public Object clone() throws CloneNotSupportedException {
+    WindowFunctionEval windowFunctionEval = (WindowFunctionEval) super.clone();
+    if (sortSpecs != null) {
+      windowFunctionEval.sortSpecs = new SortSpec[sortSpecs.length];
+      for (int i = 0; i < sortSpecs.length; i++) {
+        windowFunctionEval.sortSpecs[i] = (SortSpec) sortSpecs[i].clone();
+      }
+    }
+    return windowFunctionEval;
+  }
+
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    if (argEvals != null) {
+      for(int i=0; i < argEvals.length; i++) {
+        sb.append(argEvals[i]);
+        if(i+1 < argEvals.length)
+          sb.append(",");
+      }
+    }
+    sb.append(funcDesc.getSignature()).append("(").append(isDistinct() ? " distinct" : "").append(sb)
+        .append(")");
+    if (hasSortSpecs()) {
+      sb.append("ORDER BY ").append(TUtil.arrayToString(sortSpecs));
+    }
+    return sb.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/d3ee50a9/tajo-core/src/main/java/org/apache/tajo/engine/function/WindowAggFunc.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/function/WindowAggFunc.java b/tajo-core/src/main/java/org/apache/tajo/engine/function/WindowAggFunc.java
new file mode 100644
index 0000000..164738a
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/WindowAggFunc.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.function;
+
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.json.CatalogGsonHelper;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.common.TajoDataTypes.DataType;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.exception.InvalidOperationException;
+import org.apache.tajo.storage.Tuple;
+
+public abstract class WindowAggFunc<T extends Datum> extends AggFunction<T> {
+
+  public WindowAggFunc(Column[] definedArgs) {
+    super(definedArgs);
+  }
+
+  public abstract FunctionContext newContext();
+
+  public abstract void eval(FunctionContext ctx, Tuple params);
+
+  public void merge(FunctionContext ctx, Tuple part) {
+    throw new InvalidOperationException("Window function does not support getPartialResult()");
+  }
+
+  public Datum getPartialResult(FunctionContext ctx) {
+    throw new InvalidOperationException("Window function does not support getPartialResult()");
+  }
+
+  public DataType getPartialResultType() {
+    throw new InvalidOperationException("Window function does not support getPartialResultType()");
+  }
+
+  public abstract T terminate(FunctionContext ctx);
+
+  @Override
+  public String toJson() {
+    return CatalogGsonHelper.toJson(this, WindowAggFunc.class);
+  }
+
+  @Override
+  public CatalogProtos.FunctionType getFunctionType() {
+    return CatalogProtos.FunctionType.WINDOW;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/d3ee50a9/tajo-core/src/main/java/org/apache/tajo/engine/function/window/Rank.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/function/window/Rank.java b/tajo-core/src/main/java/org/apache/tajo/engine/function/window/Rank.java
new file mode 100644
index 0000000..e461f9d
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/window/Rank.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.function.window;
+
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.datum.Int8Datum;
+import org.apache.tajo.engine.function.FunctionContext;
+import org.apache.tajo.engine.function.WindowAggFunc;
+import org.apache.tajo.engine.function.annotation.Description;
+import org.apache.tajo.engine.function.annotation.ParamTypes;
+import org.apache.tajo.storage.Tuple;
+
+@Description(
+    functionName = "rank",
+    description = " The number of rows for "
+        + "which the supplied expressions are unique and non-NULL.",
+    example = "> SELECT rank() OVER (ORDER BY x) FROM ...;",
+    returnType = TajoDataTypes.Type.INT8,
+    paramTypes = {@ParamTypes(paramTypes = {})}
+)
+public final class Rank extends WindowAggFunc {
+
+  public Rank() {
+    super(new Column[] {
+        new Column("expr", TajoDataTypes.Type.ANY)
+    });
+  }
+
+  public static boolean checkIfDistinctValue(RankContext context, Tuple params) {
+    for (int i = 0; i < context.latest.length; i++) {
+      if (!context.latest[i].equalsTo(params.get(i)).isTrue()) {
+        return true;
+      }
+    }
+
+    return false;
+  }
+
+  @Override
+  public void eval(FunctionContext context, Tuple params) {
+    RankContext ctx = (RankContext) context;
+
+    if ((ctx.latest == null || checkIfDistinctValue(ctx, params))) {
+      ctx.rank = ctx.accumulatedCount;
+      ctx.latest = params.getValues().clone();
+    }
+    ctx.accumulatedCount++;
+  }
+
+  @Override
+  public Int8Datum terminate(FunctionContext ctx) {
+    return DatumFactory.createInt8(((RankContext) ctx).rank);
+  }
+
+  @Override
+  public FunctionContext newContext() {
+    return new RankContext();
+  }
+
+  private class RankContext implements FunctionContext {
+    long rank = 0;
+    long accumulatedCount = 1;
+    Datum [] latest = null;
+  }
+
+  @Override
+  public CatalogProtos.FunctionType getFunctionType() {
+    return CatalogProtos.FunctionType.WINDOW;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/d3ee50a9/tajo-core/src/main/java/org/apache/tajo/engine/function/window/RowNumber.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/function/window/RowNumber.java b/tajo-core/src/main/java/org/apache/tajo/engine/function/window/RowNumber.java
new file mode 100644
index 0000000..8b0943f
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/window/RowNumber.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.function.window;
+
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.datum.Int8Datum;
+import org.apache.tajo.engine.function.FunctionContext;
+import org.apache.tajo.engine.function.WindowAggFunc;
+import org.apache.tajo.engine.function.annotation.Description;
+import org.apache.tajo.engine.function.annotation.ParamTypes;
+import org.apache.tajo.storage.Tuple;
+
+@Description(
+  functionName = "row_number",
+  description = "the total number of retrieved rows",
+  example = "> SELECT row_number() OVER ();",
+  returnType = Type.INT8,
+  paramTypes = {@ParamTypes(paramTypes = {})}
+)
+public class RowNumber extends WindowAggFunc<Datum> {
+
+  public RowNumber() {
+    super(NoArgs);
+  }
+
+  protected RowNumber(Column[] columns) {
+    super(columns);
+  }
+
+  @Override
+  public FunctionContext newContext() {
+    return new RowNumberContext();
+  }
+
+  @Override
+  public void eval(FunctionContext ctx, Tuple params) {
+    ((RowNumberContext) ctx).count++;
+  }
+
+  @Override
+  public Int8Datum terminate(FunctionContext ctx) {
+    return DatumFactory.createInt8(((RowNumberContext) ctx).count);
+  }
+
+  protected class RowNumberContext implements FunctionContext {
+    long count = 0;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/d3ee50a9/tajo-core/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java b/tajo-core/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java
index 59a9b46..580ec61 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java
@@ -36,8 +36,8 @@ import java.util.*;
 
 import static org.apache.tajo.algebra.Aggregation.GroupElement;
 import static org.apache.tajo.algebra.CreateTable.*;
-import static org.apache.tajo.algebra.WindowSpecExpr.WindowFrameEndBoundType;
-import static org.apache.tajo.algebra.WindowSpecExpr.WindowFrameStartBoundType;
+import static org.apache.tajo.algebra.WindowSpec.WindowFrameEndBoundType;
+import static org.apache.tajo.algebra.WindowSpec.WindowFrameStartBoundType;
 import static org.apache.tajo.common.TajoDataTypes.Type;
 import static org.apache.tajo.engine.parser.SQLParser.*;
 
@@ -347,7 +347,7 @@ public class SQLAnalyzer extends SQLParserBaseVisitor<Expr> {
     if (checkIfExist(windowNameOrSpec.window_name())) {
       windowFunction.setWindowName(windowNameOrSpec.window_name().getText());
     } else {
-      windowFunction.setWindowSpec(visitWindow_specification(windowNameOrSpec.window_specification()));
+      windowFunction.setWindowSpec(buildWindowSpec(windowNameOrSpec.window_specification()));
     }
 
     return windowFunction;
@@ -360,14 +360,14 @@ public class SQLAnalyzer extends SQLParserBaseVisitor<Expr> {
     for (int i = 0; i < definitions.length; i++) {
       Window_definitionContext windowDefinitionContext = ctx.window_definition_list().window_definition(i);
       String windowName = windowDefinitionContext.window_name().identifier().getText();
-      WindowSpecExpr windowSpec = visitWindow_specification(windowDefinitionContext.window_specification());
+      WindowSpec windowSpec = buildWindowSpec(windowDefinitionContext.window_specification());
       definitions[i] = new Window.WindowDefinition(windowName, windowSpec);
     }
     return new Window(definitions);
   }
 
-  @Override public WindowSpecExpr visitWindow_specification(@NotNull SQLParser.Window_specificationContext ctx) {
-    WindowSpecExpr windowSpec = new WindowSpecExpr();
+  public WindowSpec buildWindowSpec(SQLParser.Window_specificationContext ctx) {
+    WindowSpec windowSpec = new WindowSpec();
     if (checkIfExist(ctx.window_specification_details())) {
       Window_specification_detailsContext windowSpecDetail = ctx.window_specification_details();
 
@@ -388,26 +388,26 @@ public class SQLAnalyzer extends SQLParserBaseVisitor<Expr> {
       if (checkIfExist(windowSpecDetail.window_frame_clause())) {
         Window_frame_clauseContext frameContext = windowSpecDetail.window_frame_clause();
 
-        WindowSpecExpr.WindowFrameUnit unit;
+        WindowSpec.WindowFrameUnit unit;
         // frame unit - there are only two cases: RANGE and ROW
         if (checkIfExist(frameContext.window_frame_units().RANGE())) {
-          unit = WindowSpecExpr.WindowFrameUnit.RANGE;
+          unit = WindowSpec.WindowFrameUnit.RANGE;
         } else {
-          unit = WindowSpecExpr.WindowFrameUnit.ROW;
+          unit = WindowSpec.WindowFrameUnit.ROW;
         }
 
-        WindowSpecExpr.WindowFrame windowFrame;
+        WindowSpec.WindowFrame windowFrame;
 
         if (checkIfExist(frameContext.window_frame_extent().window_frame_between())) { // when 'between' is given
           Window_frame_betweenContext between = frameContext.window_frame_extent().window_frame_between();
-          WindowSpecExpr.WindowStartBound startBound = buildWindowStartBound(between.window_frame_start_bound());
-          WindowSpecExpr.WindowEndBound endBound = buildWindowEndBound(between.window_frame_end_bound());
+          WindowSpec.WindowStartBound startBound = buildWindowStartBound(between.window_frame_start_bound());
+          WindowSpec.WindowEndBound endBound = buildWindowEndBound(between.window_frame_end_bound());
 
-          windowFrame = new WindowSpecExpr.WindowFrame(unit, startBound, endBound);
+          windowFrame = new WindowSpec.WindowFrame(unit, startBound, endBound);
         } else { // if there is only start bound
-          WindowSpecExpr.WindowStartBound startBound =
+          WindowSpec.WindowStartBound startBound =
               buildWindowStartBound(frameContext.window_frame_extent().window_frame_start_bound());
-          windowFrame = new WindowSpecExpr.WindowFrame(unit, startBound);
+          windowFrame = new WindowSpec.WindowFrame(unit, startBound);
         }
 
         windowSpec.setWindowFrame(windowFrame);
@@ -416,7 +416,7 @@ public class SQLAnalyzer extends SQLParserBaseVisitor<Expr> {
     return windowSpec;
   }
 
-  public WindowSpecExpr.WindowStartBound buildWindowStartBound(Window_frame_start_boundContext context) {
+  public WindowSpec.WindowStartBound buildWindowStartBound(Window_frame_start_boundContext context) {
     WindowFrameStartBoundType boundType = null;
     if (checkIfExist(context.UNBOUNDED())) {
       boundType = WindowFrameStartBoundType.UNBOUNDED_PRECEDING;
@@ -426,7 +426,7 @@ public class SQLAnalyzer extends SQLParserBaseVisitor<Expr> {
       boundType = WindowFrameStartBoundType.CURRENT_ROW;
     }
 
-    WindowSpecExpr.WindowStartBound bound = new WindowSpecExpr.WindowStartBound(boundType);
+    WindowSpec.WindowStartBound bound = new WindowSpec.WindowStartBound(boundType);
     if (boundType == WindowFrameStartBoundType.PRECEDING) {
       bound.setNumber(visitUnsigned_value_specification(context.unsigned_value_specification()));
     }
@@ -434,7 +434,7 @@ public class SQLAnalyzer extends SQLParserBaseVisitor<Expr> {
     return bound;
   }
 
-  public WindowSpecExpr.WindowEndBound buildWindowEndBound(Window_frame_end_boundContext context) {
+  public WindowSpec.WindowEndBound buildWindowEndBound(Window_frame_end_boundContext context) {
     WindowFrameEndBoundType boundType;
     if (checkIfExist(context.UNBOUNDED())) {
       boundType = WindowFrameEndBoundType.UNBOUNDED_FOLLOWING;
@@ -444,7 +444,7 @@ public class SQLAnalyzer extends SQLParserBaseVisitor<Expr> {
       boundType = WindowFrameEndBoundType.CURRENT_ROW;
     }
 
-    WindowSpecExpr.WindowEndBound endBound = new WindowSpecExpr.WindowEndBound(boundType);
+    WindowSpec.WindowEndBound endBound = new WindowSpec.WindowEndBound(boundType);
     if (boundType == WindowFrameEndBoundType.FOLLOWING) {
       endBound.setNumber(visitUnsigned_value_specification(context.unsigned_value_specification()));
     }
@@ -1275,7 +1275,7 @@ public class SQLAnalyzer extends SQLParserBaseVisitor<Expr> {
       return new ListPartition(buildColumnReferenceList(ctx.list_partitions().column_reference_list()), specifiers);
 
     } else if (checkIfExist(ctx.column_partitions())) { // For Column Partition (Hive Style)
-      return new CreateTable.ColumnPartition(getDefinitions(ctx.column_partitions().table_elements()), true);
+      return new CreateTable.ColumnPartition(getDefinitions(ctx.column_partitions().table_elements()));
     } else {
       throw new SQLSyntaxError("Invalid Partition Type: " + ctx.toStringTree());
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/d3ee50a9/tajo-core/src/main/java/org/apache/tajo/engine/planner/AlgebraVisitor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/AlgebraVisitor.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/AlgebraVisitor.java
index 5811d36..8c3e606 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/AlgebraVisitor.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/AlgebraVisitor.java
@@ -91,13 +91,14 @@ public interface AlgebraVisitor<CONTEXT, RESULT> {
   RESULT visitSign(CONTEXT ctx, Stack<Expr> stack, SignedExpr expr) throws PlanningException;
   RESULT visitColumnReference(CONTEXT ctx, Stack<Expr> stack, ColumnReferenceExpr expr) throws PlanningException;
   RESULT visitTargetExpr(CONTEXT ctx, Stack<Expr> stack, NamedExpr expr) throws PlanningException;
-  RESULT visitFunction(CONTEXT ctx, Stack<Expr> stack, FunctionExpr expr) throws PlanningException;
   RESULT visitQualifiedAsterisk(CONTEXT ctx, Stack<Expr> stack, QualifiedAsteriskExpr expr) throws PlanningException;
 
-  // set functions
-  RESULT visitCountRowsFunction(CONTEXT ctx, Stack<Expr> stack, CountRowsFunctionExpr expr) throws PlanningException;
+  // functions
+  RESULT visitFunction(CONTEXT ctx, Stack<Expr> stack, FunctionExpr expr) throws PlanningException;
   RESULT visitGeneralSetFunction(CONTEXT ctx, Stack<Expr> stack, GeneralSetFunctionExpr expr)
       throws PlanningException;
+  RESULT visitCountRowsFunction(CONTEXT ctx, Stack<Expr> stack, CountRowsFunctionExpr expr) throws PlanningException;
+  RESULT visitWindowFunction(CONTEXT ctx, Stack<Expr> stack, WindowFunctionExpr expr) throws PlanningException;
 
   // Literal
   RESULT visitCastExpr(CONTEXT ctx, Stack<Expr> stack, CastExpr expr) throws PlanningException;

http://git-wip-us.apache.org/repos/asf/tajo/blob/d3ee50a9/tajo-core/src/main/java/org/apache/tajo/engine/planner/BaseAlgebraVisitor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/BaseAlgebraVisitor.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/BaseAlgebraVisitor.java
index 907042a..24ff2e4 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/BaseAlgebraVisitor.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/BaseAlgebraVisitor.java
@@ -230,6 +230,10 @@ public class BaseAlgebraVisitor<CONTEXT, RESULT> implements AlgebraVisitor<CONTE
     case GeneralSetFunction:
       current = visitGeneralSetFunction(ctx, stack, (GeneralSetFunctionExpr) expr);
       break;
+    case WindowFunction:
+      current = visitWindowFunction(ctx, stack, (WindowFunctionExpr) expr);
+      break;
+
 
     case DataType:
       current = visitDataType(ctx, stack, (DataTypeExpr) expr);
@@ -703,6 +707,39 @@ public class BaseAlgebraVisitor<CONTEXT, RESULT> implements AlgebraVisitor<CONTE
     return result;
   }
 
+  @Override
+  public RESULT visitWindowFunction(CONTEXT ctx, Stack<Expr> stack, WindowFunctionExpr expr) throws PlanningException {
+    stack.push(expr);
+    RESULT result = null;
+    for (Expr param : expr.getParams()) {
+      result = visit(ctx, stack, param);
+    }
+
+    WindowSpec windowSpec = expr.getWindowSpec();
+
+    if (windowSpec.hasPartitionBy()) {
+      for (Expr partitionKey : windowSpec.getPartitionKeys()) {
+        visit(ctx, stack, partitionKey);
+      }
+    }
+    if (windowSpec.hasOrderBy()) {
+      for (Sort.SortSpec sortKey : windowSpec.getSortSpecs()) {
+        visit(ctx, stack, sortKey.getKey());
+      }
+    }
+    if (windowSpec.hasWindowFrame()) {
+      if (windowSpec.getWindowFrame().getStartBound().hasNumber()) {
+        visit(ctx, stack, windowSpec.getWindowFrame().getStartBound().getNumber());
+      }
+      if (windowSpec.getWindowFrame().getEndBound().hasNumber()) {
+        visit(ctx, stack, windowSpec.getWindowFrame().getEndBound().getNumber());
+      }
+    }
+
+    stack.pop();
+    return result;
+  }
+
   ///////////////////////////////////////////////////////////////////////////////////////////////////////////
   // Literal Section
   ///////////////////////////////////////////////////////////////////////////////////////////////////////////

http://git-wip-us.apache.org/repos/asf/tajo/blob/d3ee50a9/tajo-core/src/main/java/org/apache/tajo/engine/planner/BasicLogicalPlanVisitor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/BasicLogicalPlanVisitor.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/BasicLogicalPlanVisitor.java
index 2112615..a4e90b4 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/BasicLogicalPlanVisitor.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/BasicLogicalPlanVisitor.java
@@ -74,6 +74,9 @@ public class BasicLogicalPlanVisitor<CONTEXT, RESULT> implements LogicalPlanVisi
       case GROUP_BY:
         current = visitGroupBy(context, plan, block, (GroupbyNode) node, stack);
         break;
+      case WINDOW_AGG:
+        current = visitWindowAgg(context, plan, block, (WindowAggNode) node, stack);
+        break;
       case DISTINCT_GROUP_BY:
         current = visitDistinct(context, plan, block, (DistinctGroupbyNode) node, stack);
         break;
@@ -191,6 +194,14 @@ public class BasicLogicalPlanVisitor<CONTEXT, RESULT> implements LogicalPlanVisi
   }
 
   @Override
+  public RESULT visitWindowAgg(CONTEXT context, LogicalPlan plan, LogicalPlan.QueryBlock block, WindowAggNode node,
+                               Stack<LogicalNode> stack) throws PlanningException {
+    stack.push(node);
+    RESULT result = visit(context, plan, block, node.getChild(), stack);
+    stack.pop();
+    return result;
+  }
+
   public RESULT visitDistinct(CONTEXT context, LogicalPlan plan, LogicalPlan.QueryBlock block, DistinctGroupbyNode node,
                              Stack<LogicalNode> stack) throws PlanningException {
     stack.push(node);

http://git-wip-us.apache.org/repos/asf/tajo/blob/d3ee50a9/tajo-core/src/main/java/org/apache/tajo/engine/planner/ExplainLogicalPlanVisitor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/ExplainLogicalPlanVisitor.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/ExplainLogicalPlanVisitor.java
index ad9bdf1..a7e5375 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/ExplainLogicalPlanVisitor.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/ExplainLogicalPlanVisitor.java
@@ -109,6 +109,11 @@ public class ExplainLogicalPlanVisitor extends BasicLogicalPlanVisitor<ExplainLo
   }
 
   @Override
+  public LogicalNode visitWindowAgg(Context context, LogicalPlan plan, LogicalPlan.QueryBlock block, WindowAggNode node,
+                                    Stack<LogicalNode> stack) throws PlanningException {
+    return visitUnaryNode(context, plan, block, node, stack);
+  }
+
   public LogicalNode visitDistinct(Context context, LogicalPlan plan, LogicalPlan.QueryBlock block, DistinctGroupbyNode node,
                                   Stack<LogicalNode> stack) throws PlanningException {
     return visitUnaryNode(context, plan, block, node, stack);

http://git-wip-us.apache.org/repos/asf/tajo/blob/d3ee50a9/tajo-core/src/main/java/org/apache/tajo/engine/planner/ExprAnnotator.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/ExprAnnotator.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/ExprAnnotator.java
index e143823..2c386b2 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/ExprAnnotator.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/ExprAnnotator.java
@@ -19,13 +19,14 @@
 package org.apache.tajo.engine.planner;
 
 import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 import org.apache.tajo.algebra.*;
 import org.apache.tajo.catalog.CatalogService;
 import org.apache.tajo.catalog.CatalogUtil;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.FunctionDesc;
 import org.apache.tajo.catalog.exception.NoSuchFunctionException;
-import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.common.TajoDataTypes;
 import org.apache.tajo.datum.*;
 import org.apache.tajo.engine.eval.*;
 import org.apache.tajo.engine.function.AggFunction;
@@ -38,10 +39,17 @@ import org.apache.tajo.util.datetime.DateTimeUtil;
 import org.apache.tajo.util.datetime.TimeMeta;
 
 import java.util.Map;
+import java.util.Set;
 import java.util.Stack;
 
+import static org.apache.tajo.algebra.WindowSpec.WindowFrameEndBoundType;
+import static org.apache.tajo.algebra.WindowSpec.WindowFrameStartBoundType;
+import static org.apache.tajo.catalog.proto.CatalogProtos.FunctionType;
 import static org.apache.tajo.common.TajoDataTypes.DataType;
 import static org.apache.tajo.common.TajoDataTypes.Type;
+import static org.apache.tajo.engine.planner.logical.WindowSpec.WindowEndBound;
+import static org.apache.tajo.engine.planner.logical.WindowSpec.WindowFrame;
+import static org.apache.tajo.engine.planner.logical.WindowSpec.WindowStartBound;
 
 /**
  * <code>ExprAnnotator</code> makes an annotated expression called <code>EvalNode</code> from an
@@ -122,7 +130,7 @@ public class ExprAnnotator extends BaseAlgebraVisitor<ExprAnnotator.Context, Eva
    * @return The widest DataType
    * @throws PlanningException when types are not compatible, it throws the exception.
    */
-  static DataType getWidestType(DataType...types) throws PlanningException {
+  public static DataType getWidestType(DataType...types) throws PlanningException {
     DataType widest = types[0];
     for (int i = 1; i < types.length; i++) {
 
@@ -541,6 +549,10 @@ public class ExprAnnotator extends BaseAlgebraVisitor<ExprAnnotator.Context, Eva
     throw new PlanningException("ExprAnnotator cannot take NamedExpr");
   }
 
+  ///////////////////////////////////////////////////////////////////////////////////////////////////////////
+  // Functions and General Set Functions Section
+  ///////////////////////////////////////////////////////////////////////////////////////////////////////////
+
   @Override
   public EvalNode visitFunction(Context ctx, Stack<Expr> stack, FunctionExpr expr) throws PlanningException {
     stack.push(expr); // <--- Push
@@ -589,18 +601,18 @@ public class ExprAnnotator extends BaseAlgebraVisitor<ExprAnnotator.Context, Eva
 
 
     try {
-      CatalogProtos.FunctionType functionType = funcDesc.getFuncType();
-      if (functionType == CatalogProtos.FunctionType.GENERAL
-          || functionType == CatalogProtos.FunctionType.UDF) {
+      FunctionType functionType = funcDesc.getFuncType();
+      if (functionType == FunctionType.GENERAL
+          || functionType == FunctionType.UDF) {
         return new GeneralFunctionEval(funcDesc, (GeneralFunction) funcDesc.newInstance(), givenArgs);
-      } else if (functionType == CatalogProtos.FunctionType.AGGREGATION
-          || functionType == CatalogProtos.FunctionType.UDA) {
+      } else if (functionType == FunctionType.AGGREGATION
+          || functionType == FunctionType.UDA) {
         if (!ctx.currentBlock.hasNode(NodeType.GROUP_BY)) {
           ctx.currentBlock.setAggregationRequire();
         }
         return new AggregationFunctionCallEval(funcDesc, (AggFunction) funcDesc.newInstance(), givenArgs);
-      } else if (functionType == CatalogProtos.FunctionType.DISTINCT_AGGREGATION
-          || functionType == CatalogProtos.FunctionType.DISTINCT_UDA) {
+      } else if (functionType == FunctionType.DISTINCT_AGGREGATION
+          || functionType == FunctionType.DISTINCT_UDA) {
         throw new PlanningException("Unsupported function: " + funcDesc.toString());
       } else {
         throw new PlanningException("Unsupported Function Type: " + functionType.name());
@@ -610,14 +622,10 @@ public class ExprAnnotator extends BaseAlgebraVisitor<ExprAnnotator.Context, Eva
     }
   }
 
-  ///////////////////////////////////////////////////////////////////////////////////////////////////////////
-  // General Set Section
-  ///////////////////////////////////////////////////////////////////////////////////////////////////////////
-
   @Override
   public EvalNode visitCountRowsFunction(Context ctx, Stack<Expr> stack, CountRowsFunctionExpr expr)
       throws PlanningException {
-    FunctionDesc countRows = catalog.getFunction("count", CatalogProtos.FunctionType.AGGREGATION,
+    FunctionDesc countRows = catalog.getFunction("count", FunctionType.AGGREGATION,
         new DataType[] {});
     if (countRows == null) {
       throw new NoSuchFunctionException(expr.getSignature(), new DataType[]{});
@@ -641,8 +649,8 @@ public class ExprAnnotator extends BaseAlgebraVisitor<ExprAnnotator.Context, Eva
     EvalNode[] givenArgs = new EvalNode[params.length];
     DataType[] paramTypes = new DataType[params.length];
 
-    CatalogProtos.FunctionType functionType = setFunction.isDistinct() ?
-        CatalogProtos.FunctionType.DISTINCT_AGGREGATION : CatalogProtos.FunctionType.AGGREGATION;
+    FunctionType functionType = setFunction.isDistinct() ?
+        FunctionType.DISTINCT_AGGREGATION : FunctionType.AGGREGATION;
     givenArgs[0] = visit(ctx, stack, params[0]);
     if (setFunction.getSignature().equalsIgnoreCase("count")) {
       paramTypes[0] = CatalogUtil.newSimpleDataType(Type.ANY);
@@ -666,6 +674,91 @@ public class ExprAnnotator extends BaseAlgebraVisitor<ExprAnnotator.Context, Eva
     }
   }
 
+  public static final Set<String> WINDOW_FUNCTIONS =
+      Sets.newHashSet("row_number", "rank", "dense_rank", "percent_rank", "cume_dist");
+
+  public EvalNode visitWindowFunction(Context ctx, Stack<Expr> stack, WindowFunctionExpr windowFunc)
+      throws PlanningException {
+
+    WindowSpec windowSpec = windowFunc.getWindowSpec();
+
+    Expr key;
+    if (windowSpec.hasPartitionBy()) {
+      for (int i = 0; i < windowSpec.getPartitionKeys().length; i++) {
+        key = windowSpec.getPartitionKeys()[i];
+        visit(ctx, stack, key);
+      }
+    }
+
+    EvalNode [] sortKeys = null;
+    if (windowSpec.hasOrderBy()) {
+      sortKeys = new EvalNode[windowSpec.getSortSpecs().length];
+      for (int i = 0; i < windowSpec.getSortSpecs().length; i++) {
+        key = windowSpec.getSortSpecs()[i].getKey();
+        sortKeys[i] = visit(ctx, stack, key);
+      }
+    }
+
+    String funcName = windowFunc.getSignature();
+    boolean distinct = windowFunc.isDistinct();
+    Expr[] params = windowFunc.getParams();
+    EvalNode[] givenArgs = new EvalNode[params.length];
+    TajoDataTypes.DataType[] paramTypes = new TajoDataTypes.DataType[params.length];
+    FunctionType functionType;
+
+    WindowFrame frame = null;
+
+    if (params.length > 0) {
+      givenArgs[0] = visit(ctx, stack, params[0]);
+      if (windowFunc.getSignature().equalsIgnoreCase("count")) {
+        paramTypes[0] = CatalogUtil.newSimpleDataType(TajoDataTypes.Type.ANY);
+      } else if (windowFunc.getSignature().equalsIgnoreCase("row_number")) {
+        paramTypes[0] = CatalogUtil.newSimpleDataType(Type.INT8);
+      } else {
+        paramTypes[0] = givenArgs[0].getValueType();
+      }
+    } else {
+      if (windowFunc.getSignature().equalsIgnoreCase("rank")) {
+        givenArgs = sortKeys != null ? sortKeys : new EvalNode[0];
+      }
+    }
+
+    if (frame == null) {
+      if (windowSpec.hasOrderBy()) {
+        frame = new WindowFrame(new WindowStartBound(WindowFrameStartBoundType.UNBOUNDED_PRECEDING),
+            new WindowEndBound(WindowFrameEndBoundType.CURRENT_ROW));
+      } else if (windowFunc.getSignature().equalsIgnoreCase("row_number")) {
+        frame = new WindowFrame(new WindowStartBound(WindowFrameStartBoundType.UNBOUNDED_PRECEDING),
+            new WindowEndBound(WindowFrameEndBoundType.UNBOUNDED_FOLLOWING));
+      } else {
+        frame = new WindowFrame();
+      }
+    }
+
+    // TODO - containFunction and getFunction should support the function type mask which provides ORing multiple types.
+    // the below checking against WINDOW_FUNCTIONS is a workaround code for the above problem.
+    if (WINDOW_FUNCTIONS.contains(funcName.toLowerCase())) {
+      if (distinct) {
+        throw new NoSuchFunctionException("row_number() does not support distinct keyword.");
+      }
+      functionType = FunctionType.WINDOW;
+    } else {
+      functionType = distinct ? FunctionType.DISTINCT_AGGREGATION : FunctionType.AGGREGATION;
+    }
+
+    if (!catalog.containFunction(windowFunc.getSignature(), functionType, paramTypes)) {
+      throw new NoSuchFunctionException(funcName, paramTypes);
+    }
+
+    FunctionDesc funcDesc = catalog.getFunction(funcName, functionType, paramTypes);
+
+    try {
+      return new WindowFunctionEval(funcDesc, (AggFunction) funcDesc.newInstance(), givenArgs, frame);
+    } catch (InternalException e) {
+      throw new PlanningException(e);
+    }
+  }
+
   ///////////////////////////////////////////////////////////////////////////////////////////////////////////
   // Literal Section
   ///////////////////////////////////////////////////////////////////////////////////////////////////////////

http://git-wip-us.apache.org/repos/asf/tajo/blob/d3ee50a9/tajo-core/src/main/java/org/apache/tajo/engine/planner/ExprNormalizer.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/ExprNormalizer.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/ExprNormalizer.java
index 75b2b95..81bbd41 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/ExprNormalizer.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/ExprNormalizer.java
@@ -18,12 +18,16 @@
 
 package org.apache.tajo.engine.planner;
 
+import com.google.common.collect.Sets;
+import com.google.common.collect.Sets;
 import org.apache.tajo.algebra.*;
 import org.apache.tajo.catalog.CatalogUtil;
 import org.apache.tajo.engine.exception.NoSuchColumnException;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Set;
+import java.util.Set;
 import java.util.Stack;
 
 /**
@@ -87,6 +91,8 @@ class ExprNormalizer extends SimpleAlgebraVisitor<ExprNormalizer.ExprNormalizedR
                    // function.
     List<NamedExpr> aggExprs = new ArrayList<NamedExpr>(); // aggregation functions
     List<NamedExpr> scalarExprs = new ArrayList<NamedExpr>(); // scalar expressions which can be referred
+    List<NamedExpr> windowAggExprs = new ArrayList<NamedExpr>(); // window expressions which can be referred
+    Set<WindowSpecReferences> windowSpecs = Sets.newLinkedHashSet();
 
     private ExprNormalizedResult(LogicalPlanner.PlanContext context, boolean tryBinaryCommonTermsElimination) {
       this.plan = context.plan;
@@ -248,7 +254,7 @@ class ExprNormalizer extends SimpleAlgebraVisitor<ExprNormalizer.ExprNormalizedR
 
       // If parameters are all constants, we don't need to dissect an aggregation expression into two parts:
       // function and parameter parts.
-      if (!OpType.isLiteral(param.getType()) && param.getType() != OpType.Column) {
+      if (!OpType.isLiteralType(param.getType()) && param.getType() != OpType.Column) {
         String referenceName = ctx.block.namedExprsMgr.addExpr(param);
         ctx.scalarExprs.add(new NamedExpr(param, referenceName));
         expr.getParams()[i] = new ColumnReferenceExpr(referenceName);
@@ -258,6 +264,54 @@ class ExprNormalizer extends SimpleAlgebraVisitor<ExprNormalizer.ExprNormalizedR
     return expr;
   }
 
+  public Expr visitWindowFunction(ExprNormalizedResult ctx, Stack<Expr> stack, WindowFunctionExpr expr)
+      throws PlanningException {
+    stack.push(expr);
+
+    WindowSpec windowSpec = expr.getWindowSpec();
+    Expr key;
+
+    WindowSpecReferences windowSpecReferences;
+    if (windowSpec.hasWindowName()) {
+      windowSpecReferences = new WindowSpecReferences(windowSpec.getWindowName());
+    } else {
+      String [] partitionKeyReferenceNames = null;
+      if (windowSpec.hasPartitionBy()) {
+        partitionKeyReferenceNames = new String [windowSpec.getPartitionKeys().length];
+        for (int i = 0; i < windowSpec.getPartitionKeys().length; i++) {
+          key = windowSpec.getPartitionKeys()[i];
+          visit(ctx, stack, key);
+          partitionKeyReferenceNames[i] = ctx.block.namedExprsMgr.addExpr(key);
+        }
+      }
+
+      String [] orderKeyReferenceNames = null;
+      if (windowSpec.hasOrderBy()) {
+        orderKeyReferenceNames = new String[windowSpec.getSortSpecs().length];
+        for (int i = 0; i < windowSpec.getSortSpecs().length; i++) {
+          key = windowSpec.getSortSpecs()[i].getKey();
+          visit(ctx, stack, key);
+          String referenceName = ctx.block.namedExprsMgr.addExpr(key);
+          if (OpType.isAggregationFunction(key.getType())) {
+            ctx.aggExprs.add(new NamedExpr(key, referenceName));
+            windowSpec.getSortSpecs()[i].setKey(new ColumnReferenceExpr(referenceName));
+          }
+          orderKeyReferenceNames[i] = referenceName;
+        }
+      }
+      windowSpecReferences =
+          new WindowSpecReferences(partitionKeyReferenceNames,orderKeyReferenceNames);
+    }
+    ctx.windowSpecs.add(windowSpecReferences);
+
+    String funcExprRef = ctx.block.namedExprsMgr.addExpr(expr);
+    ctx.windowAggExprs.add(new NamedExpr(expr, funcExprRef));
+    stack.pop();
+
+    ctx.block.setHasWindowFunction();
+    return expr;
+  }
+
   ///////////////////////////////////////////////////////////////////////////////////////////////////////////
   // Literal Section
   ///////////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -288,4 +342,40 @@ class ExprNormalizer extends SimpleAlgebraVisitor<ExprNormalizer.ExprNormalizedR
     }
     return expr;
   }
+
+  public static class WindowSpecReferences {
+    String windowName;
+
+    String [] partitionKeys;
+    String [] orderKeys;
+
+    public WindowSpecReferences(String windowName) {
+      this.windowName = windowName;
+    }
+
+    public WindowSpecReferences(String [] partitionKeys, String [] orderKeys) {
+      this.partitionKeys = partitionKeys;
+      this.orderKeys = orderKeys;
+    }
+
+    public String getWindowName() {
+      return windowName;
+    }
+
+    public boolean hasPartitionKeys() {
+      return partitionKeys != null;
+    }
+
+    public String [] getPartitionKeys() {
+      return partitionKeys;
+    }
+
+    public boolean hasOrderBy() {
+      return orderKeys != null;
+    }
+
+    public String [] getOrderKeys() {
+      return orderKeys;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/d3ee50a9/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java
index 92df760..4e1d313 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java
@@ -19,6 +19,7 @@
 package org.apache.tajo.engine.planner;
 
 import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 import org.apache.commons.lang.ObjectUtils;
 import org.apache.tajo.algebra.*;
 import org.apache.tajo.annotation.NotThreadSafe;
@@ -48,6 +49,8 @@ public class LogicalPlan {
   /** the prefix character for virtual tables */
   public static final char VIRTUAL_TABLE_PREFIX='#';
   public static final char NONAMED_COLUMN_PREFIX='?';
+  public static final char NONAMED_WINDOW_PREFIX='^';
+
   /** it indicates the root block */
   public static final String ROOT_BLOCK = VIRTUAL_TABLE_PREFIX + "ROOT";
   public static final String NONAME_BLOCK_PREFIX = VIRTUAL_TABLE_PREFIX + "QB_";
@@ -55,6 +58,7 @@ public class LogicalPlan {
   private int nextPid = 0;
   private Integer noNameBlockId = 0;
   private Integer noNameColumnId = 0;
+  private Integer noNameWindowId = 0;
 
   /** a map from between a block name to a block plan */
   private Map<String, QueryBlock> queryBlocks = new LinkedHashMap<String, QueryBlock>();
@@ -134,6 +138,10 @@ public class LogicalPlan {
     return newAndGetBlock(NONAME_BLOCK_PREFIX + (noNameBlockId++));
   }
 
+  public void resetGeneratedId() {
+    noNameColumnId = 0;
+  }
+
   /**
    * It generates an unique column name from EvalNode. It is usually used for an expression or predicate without
    * a specified name (i.e., alias).
@@ -596,6 +604,8 @@ public class LogicalPlan {
     private final Map<String, List<String>> aliasMap = TUtil.newHashMap();
     private final Map<OpType, List<Expr>> operatorToExprMap = TUtil.newHashMap();
     private final List<RelationNode> relationList = TUtil.newList();
+    private boolean hasWindowFunction = false;
+
     /**
      * It's a map between nodetype and node. node types can be duplicated. So, latest node type is only kept.
      */
@@ -779,6 +789,14 @@ public class LogicalPlan {
       return (T) exprToNodeMap.get(ObjectUtils.identityToString(expr));
     }
 
+    public void setHasWindowFunction() {
+      hasWindowFunction = true;
+    }
+
+    public boolean hasWindowSpecs() {
+      return hasWindowFunction;
+    }
+
     /**
      * This flag can be changed as a plan is generated.
      *
@@ -823,4 +841,4 @@ public class LogicalPlan {
       return blockName;
     }
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/d3ee50a9/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlanPreprocessor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlanPreprocessor.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlanPreprocessor.java
index 2de96c4..4f1218f 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlanPreprocessor.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlanPreprocessor.java
@@ -20,8 +20,8 @@ package org.apache.tajo.engine.planner;
 
 import org.apache.tajo.algebra.*;
 import org.apache.tajo.catalog.*;
+import org.apache.tajo.common.TajoDataTypes;
 import org.apache.tajo.engine.eval.EvalNode;
-import org.apache.tajo.engine.eval.EvalType;
 import org.apache.tajo.engine.eval.FieldEval;
 import org.apache.tajo.engine.exception.NoSuchColumnException;
 import org.apache.tajo.engine.planner.LogicalPlan.QueryBlock;
@@ -35,13 +35,14 @@ import java.util.*;
 /**
  * It finds all relations for each block and builds base schema information.
  */
-class LogicalPlanPreprocessor extends BaseAlgebraVisitor<LogicalPlanPreprocessor.PreprocessContext, LogicalNode> {
+public class LogicalPlanPreprocessor extends BaseAlgebraVisitor<LogicalPlanPreprocessor.PreprocessContext, LogicalNode> {
+  private TypeDeterminant typeDeterminant;
   private ExprAnnotator annotator;
 
-  static class PreprocessContext {
-    Session session;
-    LogicalPlan plan;
-    LogicalPlan.QueryBlock currentBlock;
+  public static class PreprocessContext {
+    public Session session;
+    public LogicalPlan plan;
+    public LogicalPlan.QueryBlock currentBlock;
 
     public PreprocessContext(Session session, LogicalPlan plan, LogicalPlan.QueryBlock currentBlock) {
       this.session = session;
@@ -62,6 +63,7 @@ class LogicalPlanPreprocessor extends BaseAlgebraVisitor<LogicalPlanPreprocessor
   LogicalPlanPreprocessor(CatalogService catalog, ExprAnnotator annotator) {
     this.catalog = catalog;
     this.annotator = annotator;
+    this.typeDeterminant = new TypeDeterminant(catalog);
   }
 
   @Override
@@ -201,16 +203,13 @@ class LogicalPlanPreprocessor extends BaseAlgebraVisitor<LogicalPlanPreprocessor
 
     for (int i = 0; i < expr.getNamedExprs().length; i++) {
       NamedExpr namedExpr = expr.getNamedExprs()[i];
-      EvalNode evalNode = annotator.createEvalNode(ctx.plan, ctx.currentBlock, namedExpr.getExpr());
+      TajoDataTypes.DataType dataType = typeDeterminant.determineDataType(ctx, namedExpr.getExpr());
 
       if (namedExpr.hasAlias()) {
-        targets[i] = new Target(evalNode, namedExpr.getAlias());
-      } else if (evalNode.getType() == EvalType.FIELD) {
-        targets[i] = new Target((FieldEval) evalNode);
+        targets[i] = new Target(new FieldEval(new Column(namedExpr.getAlias(), dataType)));
       } else {
         String generatedName = ctx.plan.generateUniqueColumnName(namedExpr.getExpr());
-        targets[i] = new Target(evalNode, generatedName);
-        namedExpr.setAlias(generatedName);
+        targets[i] = new Target(new FieldEval(new Column(generatedName, dataType)));
       }
     }
     stack.pop(); // <--- Pop

http://git-wip-us.apache.org/repos/asf/tajo/blob/d3ee50a9/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlanVisitor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlanVisitor.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlanVisitor.java
index 963e9f1..0a36610 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlanVisitor.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlanVisitor.java
@@ -40,7 +40,8 @@ public interface LogicalPlanVisitor<CONTEXT, RESULT> {
 
   RESULT visitGroupBy(CONTEXT context, LogicalPlan plan, LogicalPlan.QueryBlock block, GroupbyNode node,
                       Stack<LogicalNode> stack) throws PlanningException;
-
+  RESULT visitWindowAgg(CONTEXT context, LogicalPlan plan, LogicalPlan.QueryBlock block, WindowAggNode node,
+                      Stack<LogicalNode> stack) throws PlanningException;
   RESULT visitDistinct(CONTEXT context, LogicalPlan plan, LogicalPlan.QueryBlock block, DistinctGroupbyNode node,
                                 Stack<LogicalNode> stack) throws PlanningException;
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/d3ee50a9/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
index be7bce6..80390d3 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.tajo.algebra.*;
+import org.apache.tajo.algebra.WindowSpec;
 import org.apache.tajo.catalog.*;
 import org.apache.tajo.catalog.partition.PartitionMethodDesc;
 import org.apache.tajo.catalog.proto.CatalogProtos;
@@ -43,6 +44,7 @@ import org.apache.tajo.engine.planner.rewrite.ProjectionPushDownRule;
 import org.apache.tajo.engine.utils.SchemaUtil;
 import org.apache.tajo.master.session.Session;
 import org.apache.tajo.storage.StorageUtil;
+import org.apache.tajo.util.Pair;
 import org.apache.tajo.util.KeyValueSet;
 import org.apache.tajo.util.TUtil;
 
@@ -117,6 +119,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
     QueryBlock rootBlock = plan.newAndGetBlock(LogicalPlan.ROOT_BLOCK);
     PreprocessContext preProcessorCtx = new PreprocessContext(session, plan, rootBlock);
     preprocessor.visit(preProcessorCtx, new Stack<Expr>(), expr);
+    plan.resetGeneratedId();
 
     PlanContext context = new PlanContext(session, plan, plan.getRootBlock(), debug);
     LogicalNode topMostNode = this.visit(context, new Stack<Expr>(), expr);
@@ -196,7 +199,8 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
     String [] referenceNames;
     // in prephase, insert all target list into NamedExprManagers.
     // Then it gets reference names, each of which points an expression in target list.
-    referenceNames = doProjectionPrephase(context, projection);
+    Pair<String [], ExprNormalizer.WindowSpecReferences []> referencesPair = doProjectionPrephase(context, projection);
+    referenceNames = referencesPair.getFirst();
 
     ////////////////////////////////////////////////////////
     // Visit and Build Child Plan
@@ -208,6 +212,14 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
     if (block.isAggregationRequired()) {
       child = insertGroupbyNode(context, child, stack);
     }
+
+    if (block.hasWindowSpecs()) {
+      LogicalNode windowAggNode =
+          insertWindowAggNode(context, child, stack, referenceNames, referencesPair.getSecond());
+      if (windowAggNode != null) {
+        child = windowAggNode;
+      }
+    }
     stack.pop();
     ////////////////////////////////////////////////////////
 
@@ -274,30 +286,73 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
     projectionNode.setInSchema(dupRemoval.getOutSchema());
   }
 
-  private String [] doProjectionPrephase(PlanContext context, Projection projection) throws PlanningException {
+  private Pair<String [], ExprNormalizer.WindowSpecReferences []> doProjectionPrephase(PlanContext context,
+                                                                                    Projection projection)
+      throws PlanningException {
+
     QueryBlock block = context.queryBlock;
 
     int finalTargetNum = projection.size();
     String [] referenceNames = new String[finalTargetNum];
     ExprNormalizedResult [] normalizedExprList = new ExprNormalizedResult[finalTargetNum];
-    NamedExpr namedExpr;
-    for (int i = 0; i < finalTargetNum; i++) {
-      namedExpr = projection.getNamedExprs()[i];
 
-      if (PlannerUtil.existsAggregationFunction(namedExpr)) {
-        block.setAggregationRequire();
+    List<ExprNormalizer.WindowSpecReferences> windowSpecReferencesList = TUtil.newList();
+
+    List<Integer> targetsIds = normalize(context, projection, normalizedExprList, new Matcher() {
+      @Override
+      public boolean isMatch(Expr expr) {
+        return ExprFinder.finds(expr, OpType.WindowFunction).size() == 0;
       }
-      // dissect an expression into multiple parts (at most dissected into three parts)
-      normalizedExprList[i] = normalizer.normalize(context, namedExpr.getExpr());
-    }
+    });
 
     // Note: Why separate normalization and add(Named)Expr?
     //
     // ExprNormalizer internally makes use of the named exprs in NamedExprsManager.
     // If we don't separate normalization work and addExprWithName, addExprWithName will find named exprs evaluated
     // the same logical node. It will cause impossible evaluation in physical executors.
-    for (int i = 0; i < finalTargetNum; i++) {
-      namedExpr = projection.getNamedExprs()[i];
+    addNamedExprs(block, referenceNames, normalizedExprList, windowSpecReferencesList, projection, targetsIds);
+
+    targetsIds = normalize(context, projection, normalizedExprList, new Matcher() {
+      @Override
+      public boolean isMatch(Expr expr) {
+        return ExprFinder.finds(expr, OpType.WindowFunction).size() > 0;
+      }
+    });
+    addNamedExprs(block, referenceNames, normalizedExprList, windowSpecReferencesList, projection, targetsIds);
+
+    return new Pair<String[], ExprNormalizer.WindowSpecReferences []>(referenceNames,
+        windowSpecReferencesList.toArray(new ExprNormalizer.WindowSpecReferences[windowSpecReferencesList.size()]));
+  }
+
+  private interface Matcher {
+    public boolean isMatch(Expr expr);
+  }
+
+  public List<Integer> normalize(PlanContext context, Projection projection, ExprNormalizedResult [] normalizedExprList,
+                                 Matcher matcher) throws PlanningException {
+    List<Integer> targetIds = new ArrayList<Integer>();
+    for (int i = 0; i < projection.size(); i++) {
+      NamedExpr namedExpr = projection.getNamedExprs()[i];
+
+      if (PlannerUtil.existsAggregationFunction(namedExpr)) {
+        context.queryBlock.setAggregationRequire();
+      }
+
+      if (matcher.isMatch(namedExpr.getExpr())) {
+        // dissect an expression into multiple parts (at most dissected into three parts)
+        normalizedExprList[i] = normalizer.normalize(context, namedExpr.getExpr());
+        targetIds.add(i);
+      }
+    }
+
+    return targetIds;
+  }
+
+  private void addNamedExprs(QueryBlock block, String [] referenceNames, ExprNormalizedResult [] normalizedExprList,
+                             List<ExprNormalizer.WindowSpecReferences> windowSpecReferencesList, Projection projection,
+                             List<Integer> targetIds) throws PlanningException {
+    for (int i : targetIds) {
+      NamedExpr namedExpr = projection.getNamedExprs()[i];
       // Get all projecting references
       if (namedExpr.hasAlias()) {
         NamedExpr aliasedExpr = new NamedExpr(normalizedExprList[i].baseExpr, namedExpr.getAlias());
@@ -309,9 +364,10 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
       // Add sub-expressions (i.e., aggregation part and scalar part) from dissected parts.
       block.namedExprsMgr.addNamedExprArray(normalizedExprList[i].aggExprs);
       block.namedExprsMgr.addNamedExprArray(normalizedExprList[i].scalarExprs);
-    }
+      block.namedExprsMgr.addNamedExprArray(normalizedExprList[i].windowAggExprs);
 
-    return referenceNames;
+      windowSpecReferencesList.addAll(normalizedExprList[i].windowSpecs);
+    }
   }
 
   /**
@@ -359,62 +415,244 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
     return targets;
   }
 
+  /**
+   * It checks if all targets of Projectable plan node can be evaluated from the child node.
+   * It can avoid potential errors which possibly occur in physical operators.
+   *
+   * @param block QueryBlock which includes the Projectable node
+   * @param projectable Projectable node to be valid
+   * @throws PlanningException
+   */
   public static void verifyProjectedFields(QueryBlock block, Projectable projectable) throws PlanningException {
-    if (projectable instanceof ProjectionNode && block.hasNode(NodeType.GROUP_BY)) {
-      for (Target target : projectable.getTargets()) {
-        Set<Column> columns = EvalTreeUtil.findUniqueColumns(target.getEvalTree());
-        for (Column c : columns) {
-          if (!projectable.getInSchema().contains(c)) {
-            throw new PlanningException(c.getQualifiedName()
-                + " must appear in the GROUP BY clause or be used in an aggregate function at node ("
-                + projectable.getPID() + ")" );
-          }
-        }
-      }
-    } else  if (projectable instanceof GroupbyNode) {
+    if (projectable instanceof GroupbyNode) {
       GroupbyNode groupbyNode = (GroupbyNode) projectable;
-      // It checks if all column references within each target can be evaluated with the input schema.
-      int groupingColumnNum = groupbyNode.getGroupingColumns().length;
-      for (int i = 0; i < groupingColumnNum; i++) {
-        Set<Column> columns = EvalTreeUtil.findUniqueColumns(groupbyNode.getTargets()[i].getEvalTree());
-        if (!projectable.getInSchema().containsAll(columns)) {
-          throw new PlanningException(String.format("Cannot get the field(s) \"%s\" at node (%d)",
-              TUtil.collectionToString(columns), projectable.getPID()));
+
+      if (!groupbyNode.isEmptyGrouping()) { // it should be targets instead of
+        int groupingKeyNum = groupbyNode.getGroupingColumns().length;
+
+        for (int i = 0; i < groupingKeyNum; i++) {
+          Target target = groupbyNode.getTargets()[i];
+          if (groupbyNode.getTargets()[i].getEvalTree().getType() == EvalType.FIELD) {
+            FieldEval grpKeyEvalNode = target.getEvalTree();
+            if (!groupbyNode.getInSchema().contains(grpKeyEvalNode.getColumnRef())) {
+              throwCannotEvaluateException(projectable, grpKeyEvalNode.getName());
+            }
+          }
         }
       }
+
       if (groupbyNode.hasAggFunctions()) {
-        for (AggregationFunctionCallEval f : groupbyNode.getAggFunctions()) {
-          Set<Column> columns = EvalTreeUtil.findUniqueColumns(f);
-          for (Column c : columns) {
-            if (!projectable.getInSchema().contains(c)) {
-              throw new PlanningException(String.format("Cannot get the field \"%s\" at node (%d)",
-                  c, projectable.getPID()));
-            }
+        verifyIfEvalNodesCanBeEvaluated(projectable, groupbyNode.getAggFunctions());
+      }
+
+    } else if (projectable instanceof WindowAggNode) {
+      WindowAggNode windowAggNode = (WindowAggNode) projectable;
+
+      if (windowAggNode.hasPartitionKeys()) {
+        verifyIfColumnCanBeEvaluated(projectable.getInSchema(), projectable, windowAggNode.getPartitionKeys());
+      }
+
+      if (windowAggNode.hasAggFunctions()) {
+        verifyIfEvalNodesCanBeEvaluated(projectable, windowAggNode.getWindowFunctions());
+      }
+
+      if (windowAggNode.hasSortSpecs()) {
+        Column [] sortKeys = PlannerUtil.sortSpecsToSchema(windowAggNode.getSortSpecs()).toArray();
+        verifyIfColumnCanBeEvaluated(projectable.getInSchema(), projectable, sortKeys);
+      }
+
+      // verify targets except for function slots
+      for (int i = 0; i < windowAggNode.getTargets().length - windowAggNode.getWindowFunctions().length; i++) {
+        Target target = windowAggNode.getTargets()[i];
+        Set<Column> columns = EvalTreeUtil.findUniqueColumns(target.getEvalTree());
+        for (Column c : columns) {
+          if (!windowAggNode.getInSchema().contains(c)) {
+            throwCannotEvaluateException(projectable, c.getQualifiedName());
           }
         }
       }
+
     } else if (projectable instanceof RelationNode) {
       RelationNode relationNode = (RelationNode) projectable;
-      for (Target target : projectable.getTargets()) {
-        Set<Column> columns = EvalTreeUtil.findUniqueColumns(target.getEvalTree());
-        for (Column c : columns) {
-          if (!relationNode.getTableSchema().contains(c)) {
-            throw new PlanningException(String.format("Cannot get the field \"%s\" at node (%d)",
-                c, projectable.getPID()));
-          }
+      verifyIfTargetsCanBeEvaluated(relationNode.getTableSchema(), (Projectable) relationNode);
+
+    } else {
+      verifyIfTargetsCanBeEvaluated(projectable.getInSchema(), projectable);
+    }
+  }
+
+  public static void verifyIfEvalNodesCanBeEvaluated(Projectable projectable, EvalNode[] evalNodes)
+      throws PlanningException {
+    for (EvalNode e : evalNodes) {
+      Set<Column> columns = EvalTreeUtil.findUniqueColumns(e);
+      for (Column c : columns) {
+        if (!projectable.getInSchema().contains(c)) {
+          throwCannotEvaluateException(projectable, c.getQualifiedName());
         }
       }
+    }
+  }
+
+  public static void verifyIfTargetsCanBeEvaluated(Schema baseSchema, Projectable projectable)
+      throws PlanningException {
+    for (Target target : projectable.getTargets()) {
+      Set<Column> columns = EvalTreeUtil.findUniqueColumns(target.getEvalTree());
+      for (Column c : columns) {
+        if (!baseSchema.contains(c)) {
+          throwCannotEvaluateException(projectable, c.getQualifiedName());
+        }
+      }
+    }
+  }
+
+  public static void verifyIfColumnCanBeEvaluated(Schema baseSchema, Projectable projectable, Column [] columns)
+      throws PlanningException {
+    for (Column c : columns) {
+      if (!baseSchema.contains(c)) {
+        throwCannotEvaluateException(projectable, c.getQualifiedName());
+      }
+    }
+  }
+
+  public static void throwCannotEvaluateException(Projectable projectable, String columnName) throws PlanningException {
+    if (projectable instanceof UnaryNode && ((UnaryNode) projectable).getChild().getType() == NodeType.GROUP_BY) {
+      throw new PlanningException(columnName
+          + " must appear in the GROUP BY clause or be used in an aggregate function at node ("
+          + projectable.getPID() + ")");
     } else {
-      for (Target target : projectable.getTargets()) {
-        Set<Column> columns = EvalTreeUtil.findUniqueColumns(target.getEvalTree());
-        for (Column c : columns) {
-          if (!projectable.getInSchema().contains(c)) {
-            throw new PlanningException(String.format("Cannot get the field \"%s\" at node (%d)",
-                c, projectable.getPID()));
+      throw new PlanningException(String.format("Cannot evaluate the field \"%s\" at node (%d)",
+          columnName, projectable.getPID()));
+    }
+  }
+
+  private LogicalNode insertWindowAggNode(PlanContext context, LogicalNode child, Stack<Expr> stack,
+                                          String [] referenceNames,
+                                          ExprNormalizer.WindowSpecReferences [] windowSpecReferenceses)
+      throws PlanningException {
+    LogicalPlan plan = context.plan;
+    QueryBlock block = context.queryBlock;
+    WindowAggNode windowAggNode = context.plan.createNode(WindowAggNode.class);
+    if (child.getType() == NodeType.LIMIT) {
+      LimitNode limitNode = (LimitNode) child;
+      windowAggNode.setChild(limitNode.getChild());
+      windowAggNode.setInSchema(limitNode.getChild().getOutSchema());
+      limitNode.setChild(windowAggNode);
+    } else if (child.getType() == NodeType.SORT) {
+      SortNode sortNode = (SortNode) child;
+      windowAggNode.setChild(sortNode.getChild());
+      windowAggNode.setInSchema(sortNode.getChild().getOutSchema());
+      sortNode.setChild(windowAggNode);
+    } else {
+      windowAggNode.setChild(child);
+      windowAggNode.setInSchema(child.getOutSchema());
+    }
+
+    List<String> winFuncRefs = new ArrayList<String>();
+    List<WindowFunctionEval> winFuncs = new ArrayList<WindowFunctionEval>();
+    List<WindowSpec> rawWindowSpecs = Lists.newArrayList();
+    for (Iterator<NamedExpr> it = block.namedExprsMgr.getIteratorForUnevaluatedExprs(); it.hasNext();) {
+      NamedExpr rawTarget = it.next();
+      try {
+        EvalNode evalNode = exprAnnotator.createEvalNode(context.plan, context.queryBlock, rawTarget.getExpr());
+        if (evalNode.getType() == EvalType.WINDOW_FUNCTION) {
+          winFuncRefs.add(rawTarget.getAlias());
+          winFuncs.add((WindowFunctionEval) evalNode);
+          block.namedExprsMgr.markAsEvaluated(rawTarget.getAlias(), evalNode);
+
+          // TODO - Later, we also consider the possibility that a window function contains only a window name.
+          rawWindowSpecs.add(((WindowFunctionExpr) (rawTarget.getExpr())).getWindowSpec());
+        }
+      } catch (VerifyException ve) {
+      }
+    }
+
+    // we only consider one window definition.
+    if (windowSpecReferenceses[0].hasPartitionKeys()) {
+      Column [] partitionKeyColumns = new Column[windowSpecReferenceses[0].getPartitionKeys().length];
+      int i = 0;
+      for (String partitionKey : windowSpecReferenceses[0].getPartitionKeys()) {
+        if (block.namedExprsMgr.isEvaluated(partitionKey)) {
+          partitionKeyColumns[i++] = block.namedExprsMgr.getTarget(partitionKey).getNamedColumn();
+        } else {
+          throw new PlanningException("Each grouping column expression must be a scalar expression.");
+        }
+      }
+      windowAggNode.setPartitionKeys(partitionKeyColumns);
+    }
+
+    SortSpec [][] sortGroups = new SortSpec[rawWindowSpecs.size()][];
+
+    for (int winSpecIdx = 0; winSpecIdx < rawWindowSpecs.size(); winSpecIdx++) {
+      WindowSpec spec = rawWindowSpecs.get(winSpecIdx);
+      if (spec.hasOrderBy()) {
+        Sort.SortSpec [] sortSpecs = spec.getSortSpecs();
+        int sortNum = sortSpecs.length;
+        String [] sortKeyRefNames = windowSpecReferenceses[winSpecIdx].getOrderKeys();
+        SortSpec [] annotatedSortSpecs = new SortSpec[sortNum];
+
+        Column column;
+        for (int i = 0; i < sortNum; i++) {
+          if (block.namedExprsMgr.isEvaluated(sortKeyRefNames[i])) {
+            column = block.namedExprsMgr.getTarget(sortKeyRefNames[i]).getNamedColumn();
+          } else {
+            throw new IllegalStateException("Unexpected State: " + TUtil.arrayToString(sortSpecs));
           }
+          annotatedSortSpecs[i] = new SortSpec(column, sortSpecs[i].isAscending(), sortSpecs[i].isNullFirst());
         }
+
+        sortGroups[winSpecIdx] = annotatedSortSpecs;
+      } else {
+        sortGroups[winSpecIdx] = null;
       }
     }
+
+    for (int i = 0; i < winFuncRefs.size(); i++) {
+      WindowFunctionEval winFunc = winFuncs.get(i);
+      if (sortGroups[i] != null) {
+        winFunc.setSortSpecs(sortGroups[i]);
+      }
+    }
+
+    Target [] targets = new Target[referenceNames.length];
+    List<Integer> windowFuncIndices = Lists.newArrayList();
+    Projection projection = (Projection) stack.peek();
+    int windowFuncIdx = 0;
+    for (NamedExpr expr : projection.getNamedExprs()) {
+      if (expr.getExpr().getType() == OpType.WindowFunction) {
+        windowFuncIndices.add(windowFuncIdx);
+      }
+      windowFuncIdx++;
+    }
+    windowAggNode.setWindowFunctions(winFuncs.toArray(new WindowFunctionEval[winFuncs.size()]));
+
+    int targetIdx = 0;
+    for (int i = 0; i < referenceNames.length ; i++) {
+      if (!windowFuncIndices.contains(i)) {
+        targets[targetIdx++] = block.namedExprsMgr.getTarget(referenceNames[i]);
+      }
+    }
+    for (int i = 0; i < winFuncRefs.size(); i++) {
+      targets[targetIdx++] = block.namedExprsMgr.getTarget(winFuncRefs.get(i));
+    }
+    windowAggNode.setTargets(targets);
+    verifyProjectedFields(block, windowAggNode);
+
+    block.registerNode(windowAggNode);
+    postHook(context, stack, null, windowAggNode);
+
+    if (child.getType() == NodeType.LIMIT) {
+      LimitNode limitNode = (LimitNode) child;
+      limitNode.setInSchema(windowAggNode.getOutSchema());
+      limitNode.setOutSchema(windowAggNode.getOutSchema());
+      return null;
+    } else if (child.getType() == NodeType.SORT) {
+      SortNode sortNode = (SortNode) child;
+      sortNode.setInSchema(windowAggNode.getOutSchema());
+      sortNode.setOutSchema(windowAggNode.getOutSchema());
+      return null;
+    } else {
+      return windowAggNode;
+    }
   }
 
   /**
@@ -458,6 +696,8 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
     // this inserted group-by node doesn't pass through preprocessor. So manually added.
     block.registerNode(groupbyNode);
     postHook(context, stack, null, groupbyNode);
+
+    verifyProjectedFields(block, groupbyNode);
     return groupbyNode;
   }
 
@@ -1541,7 +1781,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
     return new Column(columnDefinition.getColumnName(), convertDataType(columnDefinition));
   }
 
-  static TajoDataTypes.DataType convertDataType(DataTypeExpr dataType) {
+  public static TajoDataTypes.DataType convertDataType(DataTypeExpr dataType) {
     TajoDataTypes.Type type = TajoDataTypes.Type.valueOf(dataType.getTypeName());
 
     TajoDataTypes.DataType.Builder builder = TajoDataTypes.DataType.newBuilder();
@@ -1600,6 +1840,20 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
     Util SECTION
   ===============================================================================================*/
 
+  public static boolean checkIfBeEvaluatedAtWindowAgg(EvalNode evalNode, WindowAggNode node) {
+    Set<Column> columnRefs = EvalTreeUtil.findUniqueColumns(evalNode);
+
+    if (columnRefs.size() > 0 && !node.getInSchema().containsAll(columnRefs)) {
+      return false;
+    }
+
+    if (EvalTreeUtil.findDistinctAggFunction(evalNode).size() > 0) {
+      return false;
+    }
+
+    return true;
+  }
+
   public static boolean checkIfBeEvaluatedAtGroupBy(EvalNode evalNode, GroupbyNode node) {
     Set<Column> columnRefs = EvalTreeUtil.findUniqueColumns(evalNode);
 
@@ -1607,6 +1861,10 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
       return false;
     }
 
+    if (EvalTreeUtil.findEvalsByType(evalNode, EvalType.WINDOW_FUNCTION).size() > 0) {
+      return false;
+    }
+
     return true;
   }
 
@@ -1618,6 +1876,10 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
       return false;
     }
 
+    if (EvalTreeUtil.findEvalsByType(evalNode, EvalType.WINDOW_FUNCTION).size() > 0) {
+      return false;
+    }
+
     if (columnRefs.size() > 0 && !node.getInSchema().containsAll(columnRefs)) {
       return false;
     }
@@ -1658,6 +1920,11 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
       return false;
     }
 
+    // aggregation functions cannot be evaluated in scan node
+    if (EvalTreeUtil.findEvalsByType(evalNode, EvalType.WINDOW_FUNCTION).size() > 0) {
+      return false;
+    }
+
     if (columnRefs.size() > 0 && !node.getTableSchema().containsAll(columnRefs)) {
       return false;
     }


Mime
View raw message