tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hyun...@apache.org
Subject [12/28] TAJO-1125: Separate logical plan and optimizer into a maven module.
Date Sat, 25 Oct 2014 18:18:01 GMT
http://git-wip-us.apache.org/repos/asf/tajo/blob/b143f991/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
index 63e1899..3ed5997 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
@@ -33,8 +33,10 @@ import org.apache.tajo.datum.DatumFactory;
 import org.apache.tajo.engine.parser.SQLAnalyzer;
 import org.apache.tajo.engine.planner.*;
 import org.apache.tajo.engine.planner.enforce.Enforcer;
-import org.apache.tajo.engine.planner.logical.LogicalNode;
+import org.apache.tajo.plan.*;
+import org.apache.tajo.plan.logical.LogicalNode;
 import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.plan.util.PlannerUtil;
 import org.apache.tajo.storage.*;
 import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.util.CommonTestingUtil;

http://git-wip-us.apache.org/repos/asf/tajo/blob/b143f991/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java
index b95ae41..222b508 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java
@@ -30,7 +30,7 @@ import org.apache.tajo.datum.Int4Datum;
 import org.apache.tajo.datum.TextDatum;
 import org.apache.tajo.engine.planner.global.ExecutionBlock;
 import org.apache.tajo.engine.planner.global.MasterPlan;
-import org.apache.tajo.engine.planner.logical.NodeType;
+import org.apache.tajo.plan.logical.NodeType;
 import org.apache.tajo.jdbc.TajoResultSet;
 import org.apache.tajo.master.querymaster.QueryMasterTask;
 import org.apache.tajo.storage.*;

http://git-wip-us.apache.org/repos/asf/tajo/blob/b143f991/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java
index da09129..828d2a3 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java
@@ -25,7 +25,10 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
 import org.apache.hadoop.io.compress.DeflateCodec;
-import org.apache.tajo.*;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.QueryTestCaseBase;
+import org.apache.tajo.TajoConstants;
+import org.apache.tajo.TajoTestingCluster;
 import org.apache.tajo.catalog.CatalogService;
 import org.apache.tajo.catalog.CatalogUtil;
 import org.apache.tajo.catalog.Schema;
@@ -35,16 +38,12 @@ import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.planner.global.DataChannel;
 import org.apache.tajo.engine.planner.global.ExecutionBlock;
 import org.apache.tajo.engine.planner.global.MasterPlan;
-import org.apache.tajo.engine.planner.logical.NodeType;
 import org.apache.tajo.ipc.ClientProtos;
 import org.apache.tajo.jdbc.TajoResultSet;
-import org.apache.tajo.master.querymaster.Query;
 import org.apache.tajo.master.querymaster.QueryMasterTask;
-import org.apache.tajo.master.querymaster.QueryUnit;
-import org.apache.tajo.master.querymaster.SubQuery;
+import org.apache.tajo.plan.logical.NodeType;
 import org.apache.tajo.storage.StorageConstants;
 import org.apache.tajo.util.KeyValueSet;
-import org.apache.tajo.util.TUtil;
 import org.apache.tajo.worker.TajoWorker;
 import org.junit.Test;
 
@@ -56,9 +55,8 @@ import java.util.Map;
 import java.util.Random;
 
 import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME;
-import static org.apache.tajo.ipc.TajoWorkerProtocol.ShuffleType.SCATTERED_HASH_SHUFFLE;
+import static org.apache.tajo.plan.serder.PlanProto.ShuffleType.SCATTERED_HASH_SHUFFLE;
 import static org.junit.Assert.*;
-import static org.junit.Assert.assertEquals;
 
 public class TestTablePartitions extends QueryTestCaseBase {
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/b143f991/tajo-core/src/test/java/org/apache/tajo/engine/util/TestTupleUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/util/TestTupleUtil.java b/tajo-core/src/test/java/org/apache/tajo/engine/util/TestTupleUtil.java
index cecb281..62c959c 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/util/TestTupleUtil.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/util/TestTupleUtil.java
@@ -24,10 +24,10 @@ import org.apache.tajo.catalog.SortSpec;
 import org.apache.tajo.common.TajoDataTypes.Type;
 import org.apache.tajo.datum.Datum;
 import org.apache.tajo.datum.DatumFactory;
-import org.apache.tajo.engine.planner.PlannerUtil;
+import org.apache.tajo.plan.util.PlannerUtil;
 import org.apache.tajo.engine.planner.RangePartitionAlgorithm;
 import org.apache.tajo.engine.planner.UniformRangePartition;
-import org.apache.tajo.engine.utils.TupleUtil;
+import org.apache.tajo.plan.rewrite.rules.PartitionedTableRewriter;
 import org.apache.tajo.storage.*;
 import org.apache.tajo.storage.RowStoreUtil.RowStoreDecoder;
 import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder;
@@ -129,41 +129,41 @@ public class TestTupleUtil {
     schema.addColumn("key2", Type.TEXT);
 
     Path path = new Path("hdfs://tajo/warehouse/partition_test/");
-    Tuple tuple = TupleUtil.buildTupleFromPartitionPath(schema, path, true);
+    Tuple tuple = PartitionedTableRewriter.buildTupleFromPartitionPath(schema, path, true);
     assertNull(tuple);
-    tuple = TupleUtil.buildTupleFromPartitionPath(schema, path, false);
+    tuple = PartitionedTableRewriter.buildTupleFromPartitionPath(schema, path, false);
     assertNull(tuple);
 
     path = new Path("hdfs://tajo/warehouse/partition_test/key1=123");
-    tuple = TupleUtil.buildTupleFromPartitionPath(schema, path, true);
+    tuple = PartitionedTableRewriter.buildTupleFromPartitionPath(schema, path, true);
     assertNotNull(tuple);
     assertEquals(DatumFactory.createInt8(123), tuple.get(0));
-    tuple = TupleUtil.buildTupleFromPartitionPath(schema, path, false);
+    tuple = PartitionedTableRewriter.buildTupleFromPartitionPath(schema, path, false);
     assertNotNull(tuple);
     assertEquals(DatumFactory.createInt8(123), tuple.get(0));
 
     path = new Path("hdfs://tajo/warehouse/partition_test/key1=123/part-0000"); // wrong cases;
-    tuple = TupleUtil.buildTupleFromPartitionPath(schema, path, true);
+    tuple = PartitionedTableRewriter.buildTupleFromPartitionPath(schema, path, true);
     assertNull(tuple);
-    tuple = TupleUtil.buildTupleFromPartitionPath(schema, path, false);
+    tuple = PartitionedTableRewriter.buildTupleFromPartitionPath(schema, path, false);
     assertNull(tuple);
 
     path = new Path("hdfs://tajo/warehouse/partition_test/key1=123/key2=abc");
-    tuple = TupleUtil.buildTupleFromPartitionPath(schema, path, true);
+    tuple = PartitionedTableRewriter.buildTupleFromPartitionPath(schema, path, true);
     assertNotNull(tuple);
     assertEquals(DatumFactory.createInt8(123), tuple.get(0));
     assertEquals(DatumFactory.createText("abc"), tuple.get(1));
-    tuple = TupleUtil.buildTupleFromPartitionPath(schema, path, false);
+    tuple = PartitionedTableRewriter.buildTupleFromPartitionPath(schema, path, false);
     assertNotNull(tuple);
     assertEquals(DatumFactory.createInt8(123), tuple.get(0));
     assertEquals(DatumFactory.createText("abc"), tuple.get(1));
 
 
     path = new Path("hdfs://tajo/warehouse/partition_test/key1=123/key2=abc/part-0001");
-    tuple = TupleUtil.buildTupleFromPartitionPath(schema, path, true);
+    tuple = PartitionedTableRewriter.buildTupleFromPartitionPath(schema, path, true);
     assertNull(tuple);
 
-    tuple = TupleUtil.buildTupleFromPartitionPath(schema, path, false);
+    tuple = PartitionedTableRewriter.buildTupleFromPartitionPath(schema, path, false);
     assertNotNull(tuple);
     assertEquals(DatumFactory.createInt8(123), tuple.get(0));
     assertEquals(DatumFactory.createText("abc"), tuple.get(1));

http://git-wip-us.apache.org/repos/asf/tajo/blob/b143f991/tajo-core/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java b/tajo-core/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java
index 234c58e..067c6c8 100644
--- a/tajo-core/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java
+++ b/tajo-core/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java
@@ -27,9 +27,9 @@ import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.parser.SQLAnalyzer;
-import org.apache.tajo.engine.planner.LogicalOptimizer;
-import org.apache.tajo.engine.planner.LogicalPlan;
-import org.apache.tajo.engine.planner.LogicalPlanner;
+import org.apache.tajo.plan.LogicalOptimizer;
+import org.apache.tajo.plan.LogicalPlan;
+import org.apache.tajo.plan.LogicalPlanner;
 import org.apache.tajo.engine.planner.global.ExecutionBlockCursor;
 import org.apache.tajo.engine.planner.global.GlobalPlanner;
 import org.apache.tajo.engine.planner.global.MasterPlan;

http://git-wip-us.apache.org/repos/asf/tajo/blob/b143f991/tajo-core/src/test/java/org/apache/tajo/master/TestGlobalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/master/TestGlobalPlanner.java b/tajo-core/src/test/java/org/apache/tajo/master/TestGlobalPlanner.java
index 167c1f6..c908737 100644
--- a/tajo-core/src/test/java/org/apache/tajo/master/TestGlobalPlanner.java
+++ b/tajo-core/src/test/java/org/apache/tajo/master/TestGlobalPlanner.java
@@ -29,22 +29,22 @@ import org.apache.tajo.catalog.partition.PartitionMethodDesc;
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.common.TajoDataTypes;
-import org.apache.tajo.engine.eval.BinaryEval;
-import org.apache.tajo.engine.eval.EvalType;
-import org.apache.tajo.engine.eval.FieldEval;
 import org.apache.tajo.engine.function.FunctionLoader;
 import org.apache.tajo.engine.parser.SQLAnalyzer;
-import org.apache.tajo.engine.planner.*;
 import org.apache.tajo.engine.planner.global.DataChannel;
 import org.apache.tajo.engine.planner.global.ExecutionBlock;
 import org.apache.tajo.engine.planner.global.GlobalPlanner;
 import org.apache.tajo.engine.planner.global.MasterPlan;
 import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.plan.*;
+import org.apache.tajo.plan.expr.BinaryEval;
+import org.apache.tajo.plan.expr.EvalType;
+import org.apache.tajo.plan.expr.FieldEval;
+import org.apache.tajo.plan.util.PlannerUtil;
 import org.apache.tajo.util.CommonTestingUtil;
 import org.apache.tajo.util.FileUtil;
 import org.apache.tajo.util.TUtil;
 import org.junit.AfterClass;
-import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/b143f991/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java b/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java
index afa330e..3c96770 100644
--- a/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java
+++ b/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java
@@ -28,6 +28,7 @@ import org.apache.tajo.ipc.TajoWorkerProtocol;
 import org.apache.tajo.master.querymaster.QueryUnit;
 import org.apache.tajo.master.querymaster.QueryUnit.IntermediateEntry;
 import org.apache.tajo.master.querymaster.Repartitioner;
+import org.apache.tajo.plan.serder.PlanProto;
 import org.apache.tajo.util.Pair;
 import org.apache.tajo.util.TUtil;
 import org.apache.tajo.worker.FetchImpl;
@@ -38,8 +39,10 @@ import java.net.URI;
 import java.util.*;
 
 import static junit.framework.Assert.assertEquals;
-import static org.apache.tajo.ipc.TajoWorkerProtocol.ShuffleType;
 import static org.apache.tajo.master.querymaster.Repartitioner.FetchGroupMeta;
+import static org.apache.tajo.plan.serder.PlanProto.ShuffleType;
+import static org.apache.tajo.plan.serder.PlanProto.ShuffleType.HASH_SHUFFLE;
+import static org.apache.tajo.plan.serder.PlanProto.ShuffleType.SCATTERED_HASH_SHUFFLE;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -68,7 +71,7 @@ public class TestRepartitioner {
         new HashMap<Integer, Map<ExecutionBlockId, List<IntermediateEntry>>>();
 
     for (Map.Entry<Integer, List<IntermediateEntry>> eachEntry: intermediateEntries.entrySet()) {
-      FetchImpl fetch = new FetchImpl(new QueryUnit.PullHost(hostName, port), TajoWorkerProtocol.ShuffleType.HASH_SHUFFLE,
+      FetchImpl fetch = new FetchImpl(new QueryUnit.PullHost(hostName, port), ShuffleType.HASH_SHUFFLE,
           sid, eachEntry.getKey(), eachEntry.getValue());
 
       fetch.setName(sid.toString());
@@ -117,7 +120,7 @@ public class TestRepartitioner {
     ExecutionBlockId ebId = new ExecutionBlockId(LocalTajoTestingUtility.newQueryId(), 0);
     FetchImpl [] fetches = new FetchImpl[12];
     for (int i = 0; i < 12; i++) {
-      fetches[i] = new FetchImpl(new QueryUnit.PullHost("localhost", 10000 + i), ShuffleType.HASH_SHUFFLE, ebId, i / 2);
+      fetches[i] = new FetchImpl(new QueryUnit.PullHost("localhost", 10000 + i), HASH_SHUFFLE, ebId, i / 2);
     }
 
     int [] VOLUMES = {100, 80, 70, 30, 10, 5};
@@ -480,8 +483,8 @@ public class TestRepartitioner {
     ExecutionBlockId ebId = new ExecutionBlockId(LocalTajoTestingUtility.newQueryId(), 0);
     QueryUnit.PullHost pullHost = new QueryUnit.PullHost("localhost", 0);
 
-    FetchImpl expected = new FetchImpl(pullHost, ShuffleType.SCATTERED_HASH_SHUFFLE, ebId, 1);
-    FetchImpl fetch2 = new FetchImpl(pullHost, ShuffleType.SCATTERED_HASH_SHUFFLE, ebId, 1);
+    FetchImpl expected = new FetchImpl(pullHost, SCATTERED_HASH_SHUFFLE, ebId, 1);
+    FetchImpl fetch2 = new FetchImpl(pullHost, SCATTERED_HASH_SHUFFLE, ebId, 1);
     assertEquals(expected, fetch2);
     fetch2.setOffset(5);
     fetch2.setLength(10);

http://git-wip-us.apache.org/repos/asf/tajo/blob/b143f991/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestKillQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestKillQuery.java b/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestKillQuery.java
index 9eebfcd..37ee402 100644
--- a/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestKillQuery.java
+++ b/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestKillQuery.java
@@ -23,9 +23,9 @@ import org.apache.tajo.algebra.Expr;
 import org.apache.tajo.catalog.CatalogService;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.parser.SQLAnalyzer;
-import org.apache.tajo.engine.planner.LogicalOptimizer;
-import org.apache.tajo.engine.planner.LogicalPlan;
-import org.apache.tajo.engine.planner.LogicalPlanner;
+import org.apache.tajo.plan.LogicalOptimizer;
+import org.apache.tajo.plan.LogicalPlan;
+import org.apache.tajo.plan.LogicalPlanner;
 import org.apache.tajo.engine.planner.global.GlobalPlanner;
 import org.apache.tajo.engine.planner.global.MasterPlan;
 import org.apache.tajo.engine.query.QueryContext;

http://git-wip-us.apache.org/repos/asf/tajo/blob/b143f991/tajo-core/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java b/tajo-core/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
index aff5d73..124976a 100644
--- a/tajo-core/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
+++ b/tajo-core/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
@@ -36,7 +36,11 @@ import org.apache.tajo.datum.DatumFactory;
 import org.apache.tajo.engine.parser.SQLAnalyzer;
 import org.apache.tajo.engine.planner.*;
 import org.apache.tajo.engine.planner.enforce.Enforcer;
-import org.apache.tajo.engine.planner.logical.LogicalNode;
+import org.apache.tajo.plan.LogicalOptimizer;
+import org.apache.tajo.plan.LogicalPlan;
+import org.apache.tajo.plan.LogicalPlanner;
+import org.apache.tajo.plan.util.PlannerUtil;
+import org.apache.tajo.plan.logical.LogicalNode;
 import org.apache.tajo.engine.planner.physical.ExternalSortExec;
 import org.apache.tajo.engine.planner.physical.PhysicalExec;
 import org.apache.tajo.engine.planner.physical.ProjectionExec;

http://git-wip-us.apache.org/repos/asf/tajo/blob/b143f991/tajo-plan/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-plan/pom.xml b/tajo-plan/pom.xml
new file mode 100644
index 0000000..6a52e9b
--- /dev/null
+++ b/tajo-plan/pom.xml
@@ -0,0 +1,288 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Copyright 2012 Database Lab., Korea Univ.
+
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <artifactId>tajo-project</artifactId>
+    <groupId>org.apache.tajo</groupId>
+    <version>0.9.1-SNAPSHOT</version>
+    <relativePath>../tajo-project</relativePath>
+  </parent>
+  <artifactId>tajo-plan</artifactId>
+  <packaging>jar</packaging>
+  <name>Tajo Plan</name>
+  <description>This module contains logical plan and query optimization parts.</description>
+  <properties>
+    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+  </properties>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <configuration>
+          <source>1.6</source>
+          <target>1.6</target>
+          <encoding>${project.build.sourceEncoding}</encoding>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.rat</groupId>
+        <artifactId>apache-rat-plugin</artifactId>
+        <executions>
+          <execution>
+            <phase>verify</phase>
+            <goals>
+              <goal>check</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-antrun-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>create-protobuf-generated-sources-directory</id>
+            <phase>initialize</phase>
+            <configuration>
+              <target>
+                <mkdir dir="target/generated-sources/proto" />
+              </target>
+            </configuration>
+            <goals>
+              <goal>run</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>exec-maven-plugin</artifactId>
+        <version>1.2</version>
+        <executions>
+          <execution>
+            <id>generate-sources</id>
+            <phase>generate-sources</phase>
+            <configuration>
+              <executable>protoc</executable>
+              <arguments>
+                <argument>-Isrc/main/proto/</argument>
+                <argument>--proto_path=../tajo-common/src/main/proto</argument>
+                <argument>--proto_path=../tajo-catalog/tajo-catalog-common/src/main/proto</argument>
+                <argument>--proto_path=../tajo-client/src/main/proto</argument>
+                <argument>--java_out=target/generated-sources/proto</argument>
+                <argument>src/main/proto/Plan.proto</argument>
+              </arguments>
+            </configuration>
+            <goals>
+              <goal>exec</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>build-helper-maven-plugin</artifactId>
+        <version>1.5</version>
+        <executions>
+          <execution>
+            <id>add-source</id>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>add-source</goal>
+            </goals>
+            <configuration>
+              <sources>
+                <source>target/generated-sources/proto</source>
+              </sources>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <version>2.2</version>
+        <executions>
+          <execution>
+            <phase>package</phase>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-dependency-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>copy-dependencies</id>
+            <phase>package</phase>
+            <goals>
+              <goal>copy-dependencies</goal>
+            </goals>
+            <configuration>
+              <includeScope>runtime</includeScope>
+              <outputDirectory>${project.build.directory}/lib</outputDirectory>
+              <overWriteReleases>false</overWriteReleases>
+              <overWriteSnapshots>false</overWriteSnapshots>
+              <overWriteIfNewer>true</overWriteIfNewer>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-report-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
+
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.tajo</groupId>
+      <artifactId>tajo-common</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tajo</groupId>
+      <artifactId>tajo-algebra</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tajo</groupId>
+      <artifactId>tajo-catalog-common</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <profiles>
+    <profile>
+      <id>docs</id>
+      <activation>
+        <activeByDefault>false</activeByDefault>
+      </activation>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-javadoc-plugin</artifactId>
+            <executions>
+              <execution>
+                <!-- build javadoc jars per jar for publishing to maven -->
+                <id>module-javadocs</id>
+                <phase>package</phase>
+                <goals>
+                  <goal>jar</goal>
+                </goals>
+                <configuration>
+                  <destDir>${project.build.directory}</destDir>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+    <profile>
+      <id>dist</id>
+      <activation>
+        <activeByDefault>false</activeByDefault>
+        <property>
+          <name>tar|rpm|deb</name>
+        </property>
+      </activation>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-antrun-plugin</artifactId>
+            <executions>
+              <execution>
+                <id>dist</id>
+                <phase>package</phase>
+                <goals>
+                  <goal>run</goal>
+                </goals>
+                <configuration>
+                  <target>
+                    <echo file="${project.build.directory}/dist-layout-stitching.sh">
+                      run() {
+                      echo "\$ ${@}"
+                      "${@}"
+                      res=$?
+                      if [ $res != 0 ]; then
+                      echo
+                      echo "Failed!"
+                      echo
+                      exit $res
+                      fi
+                      }
+
+                      ROOT=`cd ${basedir}/..;pwd`
+                      echo
+                      echo "Current directory `pwd`"
+                      echo
+                      run rm -rf ${project.artifactId}-${project.version}
+                      run mkdir ${project.artifactId}-${project.version}
+                      run cd ${project.artifactId}-${project.version}
+                      run cp -r ${basedir}/target/${project.artifactId}-${project.version}*.jar .
+                      echo
+                      echo "Tajo Algebra dist layout available at: ${project.build.directory}/${project.artifactId}-${project.version}"
+                      echo
+                    </echo>
+                    <exec executable="sh" dir="${project.build.directory}" failonerror="true">
+                      <arg line="./dist-layout-stitching.sh" />
+                    </exec>
+                  </target>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+  </profiles>
+
+  <reporting>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-report-plugin</artifactId>
+        <version>2.15</version>
+      </plugin>
+    </plugins>
+  </reporting>
+</project>
+

http://git-wip-us.apache.org/repos/asf/tajo/blob/b143f991/tajo-plan/src/main/java/org/apache/tajo/plan/ExprAnnotator.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/ExprAnnotator.java b/tajo-plan/src/main/java/org/apache/tajo/plan/ExprAnnotator.java
new file mode 100644
index 0000000..406cdfc
--- /dev/null
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/ExprAnnotator.java
@@ -0,0 +1,934 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.plan;
+
+import com.google.common.collect.Sets;
+import org.apache.tajo.algebra.*;
+import org.apache.tajo.catalog.CatalogService;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.FunctionDesc;
+import org.apache.tajo.catalog.exception.NoSuchFunctionException;
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.datum.*;
+import org.apache.tajo.exception.InternalException;
+import org.apache.tajo.exception.InvalidOperationException;
+import org.apache.tajo.plan.algebra.BaseAlgebraVisitor;
+import org.apache.tajo.plan.expr.*;
+import org.apache.tajo.plan.function.AggFunction;
+import org.apache.tajo.plan.function.GeneralFunction;
+import org.apache.tajo.plan.logical.NodeType;
+import org.apache.tajo.plan.nameresolver.NameResolver;
+import org.apache.tajo.plan.nameresolver.NameResolvingMode;
+import org.apache.tajo.util.Pair;
+import org.apache.tajo.util.TUtil;
+import org.apache.tajo.util.datetime.DateTimeUtil;
+import org.apache.tajo.util.datetime.TimeMeta;
+
+import java.util.Set;
+import java.util.Stack;
+
+import static org.apache.tajo.algebra.WindowSpec.WindowFrameEndBoundType;
+import static org.apache.tajo.algebra.WindowSpec.WindowFrameStartBoundType;
+import static org.apache.tajo.catalog.proto.CatalogProtos.FunctionType;
+import static org.apache.tajo.common.TajoDataTypes.DataType;
+import static org.apache.tajo.common.TajoDataTypes.Type;
+import static org.apache.tajo.plan.logical.WindowSpec.*;
+
+/**
+ * <code>ExprAnnotator</code> makes an annotated expression called <code>EvalNode</code> from an
+ * {@link org.apache.tajo.algebra.Expr}. It visits descendants recursively from a given expression, and finally
+ * it returns an EvalNode.
+ */
+public class ExprAnnotator extends BaseAlgebraVisitor<ExprAnnotator.Context, EvalNode> {
+  private CatalogService catalog;
+
+  public ExprAnnotator(CatalogService catalog) {
+    this.catalog = catalog;
+  }
+
+  static class Context {
+    LogicalPlan plan;
+    LogicalPlan.QueryBlock currentBlock;
+    NameResolvingMode columnRsvLevel;
+
+    public Context(LogicalPlanner.PlanContext planContext, NameResolvingMode colRsvLevel) {
+      this.plan = planContext.plan;
+      this.currentBlock = planContext.queryBlock;
+      this.columnRsvLevel = colRsvLevel;
+    }
+  }
+
+  public EvalNode createEvalNode(LogicalPlanner.PlanContext planContext, Expr expr,
+                                 NameResolvingMode colRsvLevel)
+      throws PlanningException {
+    Context context = new Context(planContext, colRsvLevel);
+    return planContext.evalOptimizer.optimize(planContext, visit(context, new Stack<Expr>(), expr));
+  }
+
+  public static void assertEval(boolean condition, String message) throws PlanningException {
+    if (!condition) {
+      throw new PlanningException(message);
+    }
+  }
+
+  /**
+   * It checks both terms in binary expression. If one of both needs type conversion, it inserts a cast expression.
+   *
+   * @param lhs left hand side term
+   * @param rhs right hand side term
+   * @return a pair including left/right hand side terms
+   */
+  public static Pair<EvalNode, EvalNode> convertTypesIfNecessary(EvalNode lhs, EvalNode rhs) {
+    Type lhsType = lhs.getValueType().getType();
+    Type rhsType = rhs.getValueType().getType();
+
+    // If one of both is NULL, it just returns the original types without casting.
+    if (lhsType == Type.NULL_TYPE || rhsType == Type.NULL_TYPE) {
+      return new Pair<EvalNode, EvalNode>(lhs, rhs);
+    }
+
+    Type toBeCasted = TUtil.getFromNestedMap(CatalogUtil.OPERATION_CASTING_MAP, lhsType, rhsType);
+    if (toBeCasted != null) { // if not null, one of either should be converted to another type.
+      // Overwrite lhs, rhs, or both with cast expression.
+      if (lhsType != toBeCasted) {
+        lhs = convertType(lhs, CatalogUtil.newSimpleDataType(toBeCasted));
+      }
+      if (rhsType != toBeCasted) {
+        rhs = convertType(rhs, CatalogUtil.newSimpleDataType(toBeCasted));
+      }
+    }
+
+    return new Pair<EvalNode, EvalNode>(lhs, rhs);
+  }
+
+  /**
+   * Insert a type conversion expression to a given expression.
+   * If the type of expression and <code>toType</code> is already the same, it just returns the original expression.
+   *
+   * @param evalNode an expression
+   * @param toType target type
+   * @return type converted expression.
+   */
+  private static EvalNode convertType(EvalNode evalNode, DataType toType) {
+
+    // if original and toType is the same, we don't need type conversion.
+    if (evalNode.getValueType().equals(toType)) {
+      return evalNode;
+    }
+    // the conversion to null is not allowed.
+    if (evalNode.getValueType().getType() == Type.NULL_TYPE || toType.getType() == Type.NULL_TYPE) {
+      return evalNode;
+    }
+
+    if (evalNode.getType() == EvalType.BETWEEN) {
+      BetweenPredicateEval between = (BetweenPredicateEval) evalNode;
+
+      between.setPredicand(convertType(between.getPredicand(), toType));
+      between.setBegin(convertType(between.getBegin(), toType));
+      between.setEnd(convertType(between.getEnd(), toType));
+
+      return between;
+
+    } else if (evalNode.getType() == EvalType.CASE) {
+
+      CaseWhenEval caseWhenEval = (CaseWhenEval) evalNode;
+      for (CaseWhenEval.IfThenEval ifThen : caseWhenEval.getIfThenEvals()) {
+        ifThen.setResult(convertType(ifThen.getResult(), toType));
+      }
+
+      if (caseWhenEval.hasElse()) {
+        caseWhenEval.setElseResult(convertType(caseWhenEval.getElse(), toType));
+      }
+
+      return caseWhenEval;
+
+    } else if (evalNode.getType() == EvalType.ROW_CONSTANT) {
+      RowConstantEval original = (RowConstantEval) evalNode;
+
+      Datum[] datums = original.getValues();
+      Datum[] convertedDatum = new Datum[datums.length];
+
+      for (int i = 0; i < datums.length; i++) {
+        convertedDatum[i] = DatumFactory.cast(datums[i], toType);
+      }
+
+      RowConstantEval convertedRowConstant = new RowConstantEval(convertedDatum);
+
+      return convertedRowConstant;
+
+    } else if (evalNode.getType() == EvalType.CONST) {
+      ConstEval original = (ConstEval) evalNode;
+      ConstEval newConst = new ConstEval(DatumFactory.cast(original.getValue(), toType));
+      return newConst;
+
+    } else {
+      return new CastEval(evalNode, toType);
+    }
+  }
+
+  ///////////////////////////////////////////////////////////////////////////////////////////////////////////
+  // Logical Operator Section
+  ///////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+  @Override
+  public EvalNode visitAnd(Context ctx, Stack<Expr> stack, BinaryOperator expr) throws PlanningException {
+    stack.push(expr);
+    EvalNode left = visit(ctx, stack, expr.getLeft());
+    EvalNode right = visit(ctx, stack, expr.getRight());
+    stack.pop();
+
+    return new BinaryEval(EvalType.AND, left, right);
+  }
+
+  @Override
+  public EvalNode visitOr(Context ctx, Stack<Expr> stack, BinaryOperator expr) throws PlanningException {
+    stack.push(expr);
+    EvalNode left = visit(ctx, stack, expr.getLeft());
+    EvalNode right = visit(ctx, stack, expr.getRight());
+    stack.pop();
+
+    return new BinaryEval(EvalType.OR, left, right);
+  }
+
+  @Override
+  public EvalNode visitNot(Context ctx, Stack<Expr> stack, NotExpr expr) throws PlanningException {
+    stack.push(expr);
+    EvalNode child = visit(ctx, stack, expr.getChild());
+    stack.pop();
+    return new NotEval(child);
+  }
+
+  ///////////////////////////////////////////////////////////////////////////////////////////////////////////
+  // Comparison Predicates Section
+  ///////////////////////////////////////////////////////////////////////////////////////////////////////////
+  @Override
+  public EvalNode visitEquals(Context ctx, Stack<Expr> stack, BinaryOperator expr) throws PlanningException {
+    return visitCommonComparison(ctx, stack, expr);
+  }
+
+  @Override
+  public EvalNode visitNotEquals(Context ctx, Stack<Expr> stack, BinaryOperator expr) throws PlanningException {
+    return visitCommonComparison(ctx, stack, expr);
+  }
+
+  @Override
+  public EvalNode visitLessThan(Context ctx, Stack<Expr> stack, BinaryOperator expr) throws PlanningException {
+    return visitCommonComparison(ctx, stack, expr);
+  }
+
+  @Override
+  public EvalNode visitLessThanOrEquals(Context ctx, Stack<Expr> stack, BinaryOperator expr) throws PlanningException {
+    return visitCommonComparison(ctx, stack, expr);
+  }
+
+  @Override
+  public EvalNode visitGreaterThan(Context ctx, Stack<Expr> stack, BinaryOperator expr) throws PlanningException {
+    return visitCommonComparison(ctx, stack, expr);
+  }
+
+  @Override
+  public EvalNode visitGreaterThanOrEquals(Context ctx, Stack<Expr> stack, BinaryOperator expr)
+      throws PlanningException {
+    return visitCommonComparison(ctx, stack, expr);
+  }
+
+  public EvalNode visitCommonComparison(Context ctx, Stack<Expr> stack, BinaryOperator expr) throws PlanningException {
+    stack.push(expr);
+    EvalNode left = visit(ctx, stack, expr.getLeft());
+    EvalNode right = visit(ctx, stack, expr.getRight());
+    stack.pop();
+
+    EvalType evalType;
+    switch (expr.getType()) {
+      case Equals:
+        evalType = EvalType.EQUAL;
+        break;
+      case NotEquals:
+        evalType = EvalType.NOT_EQUAL;
+        break;
+      case LessThan:
+        evalType = EvalType.LTH;
+        break;
+      case LessThanOrEquals:
+        evalType = EvalType.LEQ;
+        break;
+      case GreaterThan:
+        evalType = EvalType.GTH;
+        break;
+      case GreaterThanOrEquals:
+        evalType = EvalType.GEQ;
+        break;
+      default:
+      throw new IllegalStateException("Wrong Expr Type: " + expr.getType());
+    }
+
+    return createBinaryNode(evalType, left, right);
+  }
+
+  ///////////////////////////////////////////////////////////////////////////////////////////////////////////
+  // Other Predicates Section
+  ///////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+  @Override
+  public EvalNode visitBetween(Context ctx, Stack<Expr> stack, BetweenPredicate between) throws PlanningException {
+    stack.push(between);
+    EvalNode predicand = visit(ctx, stack, between.predicand());
+    EvalNode begin = visit(ctx, stack, between.begin());
+    EvalNode end = visit(ctx, stack, between.end());
+    stack.pop();
+
+    // implicit type conversion
+    DataType widestType = null;
+
+    try {
+      widestType = CatalogUtil.getWidestType(predicand.getValueType(), begin.getValueType(), end.getValueType());
+    } catch (InvalidOperationException ioe) {
+      throw new PlanningException(ioe);
+    }
+
+    BetweenPredicateEval betweenEval = new BetweenPredicateEval(
+        between.isNot(),
+        between.isSymmetric(),
+        predicand, begin, end);
+
+    betweenEval = (BetweenPredicateEval) convertType(betweenEval, widestType);
+    return betweenEval;
+  }
+
+  @Override
+  public EvalNode visitCaseWhen(Context ctx, Stack<Expr> stack, CaseWhenPredicate caseWhen) throws PlanningException {
+    CaseWhenEval caseWhenEval = new CaseWhenEval();
+
+    EvalNode condition;
+    EvalNode result;
+
+    for (CaseWhenPredicate.WhenExpr when : caseWhen.getWhens()) {
+      condition = visit(ctx, stack, when.getCondition());
+      result = visit(ctx, stack, when.getResult());
+      caseWhenEval.addIfCond(condition, result);
+    }
+
+    if (caseWhen.hasElseResult()) {
+      caseWhenEval.setElseResult(visit(ctx, stack, caseWhen.getElseResult()));
+    }
+
+    // Getting the widest type from all if-then expressions and else expression.
+    DataType widestType = caseWhenEval.getIfThenEvals().get(0).getResult().getValueType();
+    for (int i = 1; i < caseWhenEval.getIfThenEvals().size(); i++) {
+      widestType = CatalogUtil.getWidestType(caseWhenEval.getIfThenEvals().get(i).getResult().getValueType(),
+          widestType);
+    }
+    if (caseWhen.hasElseResult()) {
+      widestType = CatalogUtil.getWidestType(widestType, caseWhenEval.getElse().getValueType());
+    }
+
+    assertEval(widestType != null, "Invalid Type Conversion for CaseWhen");
+
+    // implicit type conversion
+    caseWhenEval = (CaseWhenEval) convertType(caseWhenEval, widestType);
+
+    return caseWhenEval;
+  }
+
+  @Override
+  public EvalNode visitIsNullPredicate(Context ctx, Stack<Expr> stack, IsNullPredicate expr) throws PlanningException {
+    stack.push(expr);
+    EvalNode child = visit(ctx, stack, expr.getPredicand());
+    stack.pop();
+    return new IsNullEval(expr.isNot(), child);
+  }
+
+  @Override
+  public EvalNode visitInPredicate(Context ctx, Stack<Expr> stack, InPredicate expr) throws PlanningException {
+    stack.push(expr);
+    EvalNode lhs = visit(ctx, stack, expr.getLeft());
+    RowConstantEval rowConstantEval = (RowConstantEval) visit(ctx, stack, expr.getInValue());
+    stack.pop();
+
+    Pair<EvalNode, EvalNode> pair = convertTypesIfNecessary(lhs, rowConstantEval);
+
+    return new InEval(pair.getFirst(), (RowConstantEval) pair.getSecond(), expr.isNot());
+  }
+
+  @Override
+  public EvalNode visitValueListExpr(Context ctx, Stack<Expr> stack, ValueListExpr expr) throws PlanningException {
+    Datum[] values = new Datum[expr.getValues().length];
+    EvalNode [] evalNodes = new EvalNode[expr.getValues().length];
+    for (int i = 0; i < expr.getValues().length; i++) {
+      evalNodes[i] = visit(ctx, stack, expr.getValues()[i]);
+      if (!EvalTreeUtil.checkIfCanBeConstant(evalNodes[i])) {
+        throw new PlanningException("Non constant values cannot be included in IN PREDICATE.");
+      }
+      values[i] = EvalTreeUtil.evaluateImmediately(evalNodes[i]);
+    }
+    return new RowConstantEval(values);
+  }
+
+  @Override
+  public EvalNode visitExistsPredicate(Context ctx, Stack<Expr> stack, ExistsPredicate expr) throws PlanningException {
+    throw new PlanningException("Cannot support EXISTS clause yet");
+  }
+
+  ///////////////////////////////////////////////////////////////////////////////////////////////////////////
+  // String Operator or Pattern Matching Predicates Section
+  ///////////////////////////////////////////////////////////////////////////////////////////////////////////
+  @Override
+  public EvalNode visitLikePredicate(Context ctx, Stack<Expr> stack, PatternMatchPredicate expr)
+      throws PlanningException {
+    return visitPatternMatchPredicate(ctx, stack, expr);
+  }
+
+  @Override
+  public EvalNode visitSimilarToPredicate(Context ctx, Stack<Expr> stack, PatternMatchPredicate expr)
+      throws PlanningException {
+    return visitPatternMatchPredicate(ctx, stack, expr);
+  }
+
+  @Override
+  public EvalNode visitRegexpPredicate(Context ctx, Stack<Expr> stack, PatternMatchPredicate expr)
+      throws PlanningException {
+    return visitPatternMatchPredicate(ctx, stack, expr);
+  }
+
+  @Override
+  public EvalNode visitConcatenate(Context ctx, Stack<Expr> stack, BinaryOperator expr) throws PlanningException {
+    stack.push(expr);
+    EvalNode lhs = visit(ctx, stack, expr.getLeft());
+    EvalNode rhs = visit(ctx, stack, expr.getRight());
+    stack.pop();
+
+    if (lhs.getValueType().getType() != Type.TEXT) {
+      lhs = convertType(lhs, CatalogUtil.newSimpleDataType(Type.TEXT));
+    }
+    if (rhs.getValueType().getType() != Type.TEXT) {
+      rhs = convertType(rhs, CatalogUtil.newSimpleDataType(Type.TEXT));
+    }
+
+    return new BinaryEval(EvalType.CONCATENATE, lhs, rhs);
+  }
+
+  private EvalNode visitPatternMatchPredicate(Context ctx, Stack<Expr> stack, PatternMatchPredicate expr)
+      throws PlanningException {
+    EvalNode field = visit(ctx, stack, expr.getPredicand());
+    ConstEval pattern = (ConstEval) visit(ctx, stack, expr.getPattern());
+
+    // A pattern is a const value in pattern matching predicates.
+    // In a binary expression, the result is always null if a const value in left or right side is null.
+    if (pattern.getValue() instanceof NullDatum) {
+      return new ConstEval(NullDatum.get());
+    } else {
+      if (expr.getType() == OpType.LikePredicate) {
+        return new LikePredicateEval(expr.isNot(), field, pattern, expr.isCaseInsensitive());
+      } else if (expr.getType() == OpType.SimilarToPredicate) {
+        return new SimilarToPredicateEval(expr.isNot(), field, pattern);
+      } else {
+        return new RegexPredicateEval(expr.isNot(), field, pattern, expr.isCaseInsensitive());
+      }
+    }
+  }
+
+  ///////////////////////////////////////////////////////////////////////////////////////////////////////////
+  // Arithmetic Operators
+  ///////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+  private static BinaryEval createBinaryNode(EvalType type, EvalNode lhs, EvalNode rhs) {
+    Pair<EvalNode, EvalNode> pair = convertTypesIfNecessary(lhs, rhs); // implicit type conversion if necessary
+    return new BinaryEval(type, pair.getFirst(), pair.getSecond());
+  }
+
+  @Override
+  public EvalNode visitPlus(Context ctx, Stack<Expr> stack, BinaryOperator expr) throws PlanningException {
+    stack.push(expr);
+    EvalNode left = visit(ctx, stack, expr.getLeft());
+    EvalNode right = visit(ctx, stack, expr.getRight());
+    stack.pop();
+
+    return createBinaryNode(EvalType.PLUS, left, right);
+  }
+
+  @Override
+  public EvalNode visitMinus(Context ctx, Stack<Expr> stack, BinaryOperator expr) throws PlanningException {
+    stack.push(expr);
+    EvalNode left = visit(ctx, stack, expr.getLeft());
+    EvalNode right = visit(ctx, stack, expr.getRight());
+    stack.pop();
+
+    return createBinaryNode(EvalType.MINUS, left, right);
+  }
+
+  @Override
+  public EvalNode visitMultiply(Context ctx, Stack<Expr> stack, BinaryOperator expr) throws PlanningException {
+    stack.push(expr);
+    EvalNode left = visit(ctx, stack, expr.getLeft());
+    EvalNode right = visit(ctx, stack, expr.getRight());
+    stack.pop();
+
+    return createBinaryNode(EvalType.MULTIPLY, left, right);
+  }
+
+  @Override
+  public EvalNode visitDivide(Context ctx, Stack<Expr> stack, BinaryOperator expr) throws PlanningException {
+    stack.push(expr);
+    EvalNode left = visit(ctx, stack, expr.getLeft());
+    EvalNode right = visit(ctx, stack, expr.getRight());
+    stack.pop();
+
+    return createBinaryNode(EvalType.DIVIDE, left, right);
+  }
+
+  @Override
+  public EvalNode visitModular(Context ctx, Stack<Expr> stack, BinaryOperator expr) throws PlanningException {
+    stack.push(expr);
+    EvalNode left = visit(ctx, stack, expr.getLeft());
+    EvalNode right = visit(ctx, stack, expr.getRight());
+    stack.pop();
+
+    return createBinaryNode(EvalType.MODULAR, left, right);
+  }
+
+  ///////////////////////////////////////////////////////////////////////////////////////////////////////////
+  // Other Expressions
+  ///////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+  @Override
+  public EvalNode visitSign(Context ctx, Stack<Expr> stack, SignedExpr expr) throws PlanningException {
+    stack.push(expr);
+    EvalNode numericExpr = visit(ctx, stack, expr.getChild());
+    stack.pop();
+
+    if (expr.isNegative()) {
+      return new SignedEval(expr.isNegative(), numericExpr);
+    } else {
+      return numericExpr;
+    }
+  }
+
+  @Override
+  public EvalNode visitColumnReference(Context ctx, Stack<Expr> stack, ColumnReferenceExpr expr)
+      throws PlanningException {
+    Column column;
+
+    switch (ctx.columnRsvLevel) {
+    case LEGACY:
+      column = ctx.plan.resolveColumn(ctx.currentBlock, expr);
+      break;
+    case RELS_ONLY:
+    case RELS_AND_SUBEXPRS:
+    case SUBEXPRS_AND_RELS:
+      column = NameResolver.resolve(ctx.plan, ctx.currentBlock, expr, ctx.columnRsvLevel);
+      break;
+    default:
+      throw new PlanningException("Unsupported column resolving level: " + ctx.columnRsvLevel.name());
+    }
+    return new FieldEval(column);
+  }
+
+  @Override
+  public EvalNode visitTargetExpr(Context ctx, Stack<Expr> stack, NamedExpr expr) throws PlanningException {
+    throw new PlanningException("ExprAnnotator cannot take NamedExpr");
+  }
+
+  ///////////////////////////////////////////////////////////////////////////////////////////////////////////
+  // Functions and General Set Functions Section
+  ///////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+  @Override
+  public EvalNode visitFunction(Context ctx, Stack<Expr> stack, FunctionExpr expr) throws PlanningException {
+    stack.push(expr); // <--- Push
+
+    // Given parameters
+    Expr[] params = expr.getParams();
+    if (params == null) {
+      params = new Expr[0];
+    }
+
+    EvalNode[] givenArgs = new EvalNode[params.length];
+    DataType[] paramTypes = new DataType[params.length];
+
+    for (int i = 0; i < params.length; i++) {
+      givenArgs[i] = visit(ctx, stack, params[i]);
+      paramTypes[i] = givenArgs[i].getValueType();
+    }
+
+    stack.pop(); // <--- Pop
+
+    if (!catalog.containFunction(expr.getSignature(), paramTypes)) {
+      throw new NoSuchFunctionException(expr.getSignature(), paramTypes);
+    }
+
+    FunctionDesc funcDesc = catalog.getFunction(expr.getSignature(), paramTypes);
+
+    // trying the implicit type conversion between actual parameter types and the definition types.
+    if (CatalogUtil.checkIfVariableLengthParamDefinition(TUtil.newList(funcDesc.getParamTypes()))) {
+      DataType lastDataType = funcDesc.getParamTypes()[0];
+      for (int i = 0; i < givenArgs.length; i++) {
+        if (i < (funcDesc.getParamTypes().length - 1)) { // variable length
+          lastDataType = funcDesc.getParamTypes()[i];
+        } else {
+          lastDataType = CatalogUtil.newSimpleDataType(CatalogUtil.getPrimitiveTypeOf(lastDataType.getType()));
+        }
+        givenArgs[i] = convertType(givenArgs[i], lastDataType);
+      }
+    } else {
+      assertEval(funcDesc.getParamTypes().length == givenArgs.length,
+          "The number of parameters is mismatched to the function definition: " + funcDesc.toString());
+      // According to our function matching method, each given argument can be casted to the definition parameter.
+      for (int i = 0; i < givenArgs.length; i++) {
+        givenArgs[i] = convertType(givenArgs[i], funcDesc.getParamTypes()[i]);
+      }
+    }
+
+
+    try {
+      FunctionType functionType = funcDesc.getFuncType();
+      if (functionType == FunctionType.GENERAL
+          || functionType == FunctionType.UDF) {
+        return new GeneralFunctionEval(funcDesc, (GeneralFunction) funcDesc.newInstance(), givenArgs);
+      } else if (functionType == FunctionType.AGGREGATION
+          || functionType == FunctionType.UDA) {
+        if (!ctx.currentBlock.hasNode(NodeType.GROUP_BY)) {
+          ctx.currentBlock.setAggregationRequire();
+        }
+        return new AggregationFunctionCallEval(funcDesc, (AggFunction) funcDesc.newInstance(), givenArgs);
+      } else if (functionType == FunctionType.DISTINCT_AGGREGATION
+          || functionType == FunctionType.DISTINCT_UDA) {
+        throw new PlanningException("Unsupported function: " + funcDesc.toString());
+      } else {
+        throw new PlanningException("Unsupported Function Type: " + functionType.name());
+      }
+    } catch (InternalException e) {
+      throw new PlanningException(e);
+    }
+  }
+
+  @Override
+  public EvalNode visitCountRowsFunction(Context ctx, Stack<Expr> stack, CountRowsFunctionExpr expr)
+      throws PlanningException {
+    FunctionDesc countRows = catalog.getFunction("count", FunctionType.AGGREGATION,
+        new DataType[] {});
+    if (countRows == null) {
+      throw new NoSuchFunctionException(expr.getSignature(), new DataType[]{});
+    }
+
+    try {
+      ctx.currentBlock.setAggregationRequire();
+
+      return new AggregationFunctionCallEval(countRows, (AggFunction) countRows.newInstance(),
+          new EvalNode[] {});
+    } catch (InternalException e) {
+      throw new NoSuchFunctionException(countRows.getFunctionName(), new DataType[]{});
+    }
+  }
+
+  @Override
+  public EvalNode visitGeneralSetFunction(Context ctx, Stack<Expr> stack, GeneralSetFunctionExpr setFunction)
+      throws PlanningException {
+
+    Expr[] params = setFunction.getParams();
+    EvalNode[] givenArgs = new EvalNode[params.length];
+    DataType[] paramTypes = new DataType[params.length];
+
+    FunctionType functionType = setFunction.isDistinct() ?
+        FunctionType.DISTINCT_AGGREGATION : FunctionType.AGGREGATION;
+    givenArgs[0] = visit(ctx, stack, params[0]);
+    if (setFunction.getSignature().equalsIgnoreCase("count")) {
+      paramTypes[0] = CatalogUtil.newSimpleDataType(Type.ANY);
+    } else {
+      paramTypes[0] = givenArgs[0].getValueType();
+    }
+
+    if (!catalog.containFunction(setFunction.getSignature(), functionType, paramTypes)) {
+      throw new NoSuchFunctionException(setFunction.getSignature(), paramTypes);
+    }
+
+    FunctionDesc funcDesc = catalog.getFunction(setFunction.getSignature(), functionType, paramTypes);
+    if (!ctx.currentBlock.hasNode(NodeType.GROUP_BY)) {
+      ctx.currentBlock.setAggregationRequire();
+    }
+
+    try {
+      return new AggregationFunctionCallEval(funcDesc, (AggFunction) funcDesc.newInstance(), givenArgs);
+    } catch (InternalException e) {
+      throw new PlanningException(e);
+    }
+  }
+
+  public static final Set<String> WINDOW_FUNCTIONS =
+      Sets.newHashSet("row_number", "rank", "dense_rank", "percent_rank", "cume_dist");
+
+  public EvalNode visitWindowFunction(Context ctx, Stack<Expr> stack, WindowFunctionExpr windowFunc)
+      throws PlanningException {
+
+    WindowSpec windowSpec = windowFunc.getWindowSpec();
+
+    Expr key;
+    if (windowSpec.hasPartitionBy()) {
+      for (int i = 0; i < windowSpec.getPartitionKeys().length; i++) {
+        key = windowSpec.getPartitionKeys()[i];
+        visit(ctx, stack, key);
+      }
+    }
+
+    EvalNode [] sortKeys = null;
+    if (windowSpec.hasOrderBy()) {
+      sortKeys = new EvalNode[windowSpec.getSortSpecs().length];
+      for (int i = 0; i < windowSpec.getSortSpecs().length; i++) {
+        key = windowSpec.getSortSpecs()[i].getKey();
+        sortKeys[i] = visit(ctx, stack, key);
+      }
+    }
+
+    String funcName = windowFunc.getSignature();
+    boolean distinct = windowFunc.isDistinct();
+    Expr[] params = windowFunc.getParams();
+    EvalNode[] givenArgs = new EvalNode[params.length];
+    TajoDataTypes.DataType[] paramTypes = new TajoDataTypes.DataType[params.length];
+    FunctionType functionType;
+
+    WindowFrame frame = null;
+
+    if (params.length > 0) {
+      givenArgs[0] = visit(ctx, stack, params[0]);
+      if (windowFunc.getSignature().equalsIgnoreCase("count")) {
+        paramTypes[0] = CatalogUtil.newSimpleDataType(TajoDataTypes.Type.ANY);
+      } else if (windowFunc.getSignature().equalsIgnoreCase("row_number")) {
+        paramTypes[0] = CatalogUtil.newSimpleDataType(Type.INT8);
+      } else {
+        paramTypes[0] = givenArgs[0].getValueType();
+      }
+    } else {
+      if (windowFunc.getSignature().equalsIgnoreCase("rank")) {
+        givenArgs = sortKeys != null ? sortKeys : new EvalNode[0];
+      }
+    }
+
+    if (frame == null) {
+      if (windowSpec.hasOrderBy()) {
+        frame = new WindowFrame(new WindowStartBound(WindowFrameStartBoundType.UNBOUNDED_PRECEDING),
+            new WindowEndBound(WindowFrameEndBoundType.CURRENT_ROW));
+      } else if (windowFunc.getSignature().equalsIgnoreCase("row_number")) {
+        frame = new WindowFrame(new WindowStartBound(WindowFrameStartBoundType.UNBOUNDED_PRECEDING),
+            new WindowEndBound(WindowFrameEndBoundType.UNBOUNDED_FOLLOWING));
+      } else {
+        frame = new WindowFrame();
+      }
+    }
+
+    // TODO - containFunction and getFunction should support the function type mask which provides ORing multiple types.
+    // the below checking against WINDOW_FUNCTIONS is a workaround code for the above problem.
+    if (WINDOW_FUNCTIONS.contains(funcName.toLowerCase())) {
+      if (distinct) {
+        throw new NoSuchFunctionException("row_number() does not support distinct keyword.");
+      }
+      functionType = FunctionType.WINDOW;
+    } else {
+      functionType = distinct ? FunctionType.DISTINCT_AGGREGATION : FunctionType.AGGREGATION;
+    }
+
+    if (!catalog.containFunction(windowFunc.getSignature(), functionType, paramTypes)) {
+      throw new NoSuchFunctionException(funcName, paramTypes);
+    }
+
+    FunctionDesc funcDesc = catalog.getFunction(funcName, functionType, paramTypes);
+
+    try {
+      return new WindowFunctionEval(funcDesc, (AggFunction) funcDesc.newInstance(), givenArgs, frame);
+    } catch (InternalException e) {
+      throw new PlanningException(e);
+    }
+  }
+
+  ///////////////////////////////////////////////////////////////////////////////////////////////////////////
+  // Literal Section
+  ///////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+  @Override
+  public EvalNode visitDataType(Context ctx, Stack<Expr> stack, DataTypeExpr expr) throws PlanningException {
+    return super.visitDataType(ctx, stack, expr);
+  }
+
+  @Override
+  public EvalNode visitCastExpr(Context ctx, Stack<Expr> stack, CastExpr expr) throws PlanningException {
+    EvalNode child = super.visitCastExpr(ctx, stack, expr);
+
+    if (child.getType() == EvalType.CONST) { // if it is a casting operation for a constant value
+      ConstEval constEval = (ConstEval) child; // it will be pre-computed and casted to a constant value
+      return new ConstEval(DatumFactory.cast(constEval.getValue(), LogicalPlanner.convertDataType(expr.getTarget())));
+    } else {
+      return new CastEval(child, LogicalPlanner.convertDataType(expr.getTarget()));
+    }
+  }
+
+  @Override
+  public EvalNode visitLiteral(Context ctx, Stack<Expr> stack, LiteralValue expr) throws PlanningException {
+    switch (expr.getValueType()) {
+    case Boolean:
+      return new ConstEval(DatumFactory.createBool(((BooleanLiteral) expr).isTrue()));
+    case String:
+      return new ConstEval(DatumFactory.createText(expr.getValue()));
+    case Unsigned_Integer:
+      return new ConstEval(DatumFactory.createInt4(expr.getValue()));
+    case Unsigned_Large_Integer:
+      return new ConstEval(DatumFactory.createInt8(expr.getValue()));
+    case Unsigned_Float:
+      return new ConstEval(DatumFactory.createFloat8(expr.getValue()));
+    default:
+      throw new RuntimeException("Unsupported type: " + expr.getValueType());
+    }
+  }
+
+  @Override
+  public EvalNode visitNullLiteral(Context ctx, Stack<Expr> stack, NullLiteral expr) throws PlanningException {
+    return new ConstEval(NullDatum.get());
+  }
+
+  @Override
+  public EvalNode visitDateLiteral(Context context, Stack<Expr> stack, DateLiteral expr) throws PlanningException {
+    DateValue dateValue = expr.getDate();
+    int[] dates = dateToIntArray(dateValue.getYears(), dateValue.getMonths(), dateValue.getDays());
+
+    TimeMeta tm = new TimeMeta();
+    tm.years = dates[0];
+    tm.monthOfYear = dates[1];
+    tm.dayOfMonth = dates[2];
+
+    DateTimeUtil.j2date(DateTimeUtil.date2j(dates[0], dates[1], dates[2]), tm);
+
+    return new ConstEval(new DateDatum(DateTimeUtil.date2j(tm.years, tm.monthOfYear, tm.dayOfMonth)));
+  }
+
+  @Override
+  public EvalNode visitTimestampLiteral(Context ctx, Stack<Expr> stack, TimestampLiteral expr)
+      throws PlanningException {
+    DateValue dateValue = expr.getDate();
+    TimeValue timeValue = expr.getTime();
+
+    int [] dates = dateToIntArray(dateValue.getYears(),
+        dateValue.getMonths(),
+        dateValue.getDays());
+    int [] times = timeToIntArray(timeValue.getHours(),
+        timeValue.getMinutes(),
+        timeValue.getSeconds(),
+        timeValue.getSecondsFraction());
+
+    long timestamp;
+    if (timeValue.hasSecondsFraction()) {
+      timestamp = DateTimeUtil.toJulianTimestamp(dates[0], dates[1], dates[2], times[0], times[1], times[2],
+          times[3] * 1000);
+    } else {
+      timestamp = DateTimeUtil.toJulianTimestamp(dates[0], dates[1], dates[2], times[0], times[1], times[2], 0);
+    }
+
+    TimeMeta tm = new TimeMeta();
+    DateTimeUtil.toJulianTimeMeta(timestamp, tm);
+    DateTimeUtil.toUTCTimezone(tm);
+
+    return new ConstEval(new TimestampDatum(DateTimeUtil.toJulianTimestamp(tm)));
+  }
+
+  @Override
+  public EvalNode visitIntervalLiteral(Context ctx, Stack<Expr> stack, IntervalLiteral expr) throws PlanningException {
+    return new ConstEval(new IntervalDatum(expr.getExprStr()));
+  }
+
+  @Override
+  public EvalNode visitTimeLiteral(Context ctx, Stack<Expr> stack, TimeLiteral expr) throws PlanningException {
+    TimeValue timeValue = expr.getTime();
+    int [] times = timeToIntArray(timeValue.getHours(),
+        timeValue.getMinutes(),
+        timeValue.getSeconds(),
+        timeValue.getSecondsFraction());
+
+    long time;
+    if (timeValue.hasSecondsFraction()) {
+      time = DateTimeUtil.toTime(times[0], times[1], times[2], times[3] * 1000);
+    } else {
+      time = DateTimeUtil.toTime(times[0], times[1], times[2], 0);
+    }
+    TimeDatum timeDatum = new TimeDatum(time);
+    TimeMeta tm = timeDatum.toTimeMeta();
+    DateTimeUtil.toUTCTimezone(tm);
+
+    return new ConstEval(new TimeDatum(DateTimeUtil.toTime(tm)));
+  }
+
+  public static int [] dateToIntArray(String years, String months, String days)
+      throws PlanningException {
+    int year = Integer.valueOf(years);
+    int month = Integer.valueOf(months);
+    int day = Integer.valueOf(days);
+
+    if (!(1 <= year && year <= 9999)) {
+      throw new PlanningException(String.format("Years (%d) must be between 1 and 9999 integer value", year));
+    }
+
+    if (!(1 <= month && month <= 12)) {
+      throw new PlanningException(String.format("Months (%d) must be between 1 and 12 integer value", month));
+    }
+
+    if (!(1<= day && day <= 31)) {
+      throw new PlanningException(String.format("Days (%d) must be between 1 and 31 integer value", day));
+    }
+
+    int [] results = new int[3];
+    results[0] = year;
+    results[1] = month;
+    results[2] = day;
+
+    return results;
+  }
+
+  public static int [] timeToIntArray(String hours, String minutes, String seconds, String fractionOfSecond)
+      throws PlanningException {
+    int hour = Integer.valueOf(hours);
+    int minute = Integer.valueOf(minutes);
+    int second = Integer.valueOf(seconds);
+    int fraction = 0;
+    if (fractionOfSecond != null) {
+      fraction = Integer.valueOf(fractionOfSecond);
+    }
+
+    if (!(0 <= hour && hour <= 23)) {
+      throw new PlanningException(String.format("Hours (%d) must be between 0 and 24 integer value", hour));
+    }
+
+    if (!(0 <= minute && minute <= 59)) {
+      throw new PlanningException(String.format("Minutes (%d) must be between 0 and 59 integer value", minute));
+    }
+
+    if (!(0 <= second && second <= 59)) {
+      throw new PlanningException(String.format("Seconds (%d) must be between 0 and 59 integer value", second));
+    }
+
+    if (fraction != 0) {
+      if (!(0 <= fraction && fraction <= 999)) {
+        throw new PlanningException(String.format("Seconds (%d) must be between 0 and 999 integer value", fraction));
+      }
+    }
+
+    int [] results = new int[4];
+    results[0] = hour;
+    results[1] = minute;
+    results[2] = second;
+    results[3] = fraction;
+
+    return results;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/b143f991/tajo-plan/src/main/java/org/apache/tajo/plan/ExprNormalizer.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/ExprNormalizer.java b/tajo-plan/src/main/java/org/apache/tajo/plan/ExprNormalizer.java
new file mode 100644
index 0000000..2a6cb0b
--- /dev/null
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/ExprNormalizer.java
@@ -0,0 +1,389 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.plan;
+
+import com.google.common.collect.Sets;
+import org.apache.tajo.algebra.*;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.exception.NoSuchColumnException;
+import org.apache.tajo.plan.nameresolver.NameResolver;
+import org.apache.tajo.plan.nameresolver.NameResolvingMode;
+import org.apache.tajo.plan.visitor.SimpleAlgebraVisitor;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.Stack;
+
+/**
+ * ExprNormalizer performs two kinds of works:
+ *
+ * <h3>1. Duplication Removal.</h3>
+ *
+ * For example, assume a simple query as follows:
+ * <pre>
+ *   select price * rate as total_price, ..., order by price * rate
+ * </pre>
+ *
+ * The expression <code>price * rate</code> is duplicated in both select list and order by clause.
+ * Against those cases, ExprNormalizer removes duplicated expressions and replaces one with one reference.
+ * In the case, ExprNormalizer replaces price * rate with total_price reference.
+ *
+ * <h3>2. Dissection of Expression</h3>
+ *
+ * A expression can be a complex expressions, including a mixed of scalar and aggregation expressions.
+ * For example, assume an aggregation query as follows:
+ * <pre>
+ *   select sum(price * rate) * (1 - avg(discount_rate))), ...
+ * </pre>
+ *
+ * In this case, ExprNormalizer dissects the expression 'sum(price * rate) * (1 - avg(discount_rate)))'
+ * into the following expressions:
+ * <ul>
+ *   <li>$1 = price * rage</li>
+ *   <li>$2 = sum($1)</li>
+ *   <li>$3 = avg(discount_rate)</li>
+ *   <li>$4 = $2 * (1 - $3)</li>
+ * </ul>
+ *
+ * It mainly two advantages. Firstly, it makes complex expression evaluations easier across multiple physical executors.
+ * Second, it gives move opportunities to remove duplicated expressions.
+ *
+ * <h3>3. Name Normalization</h3>
+ *
+ * Users can use qualified column names, unqualified column names or aliased column references.
+ *
+ * Consider the following example:
+ *
+ * <pre>
+ *   select rate_a as total_rate, rate_a * 100, table1.rate_a, ... WHERE total_rate * 100
+ * </pre>
+ *
+ * <code>total_rate</code>, <code>rate_a</code>, and <code>table1.rate_a</code> are all the same references. But,
+ * they have different forms. Due to their different forms, duplication removal can be hard.
+ *
+ * In order to solve this problem, ExprNormalizer normalizes all column references as qualified names while it keeps
+ * its points..
+ */
+class ExprNormalizer extends SimpleAlgebraVisitor<ExprNormalizer.ExprNormalizedResult, Object> {
+
+  public static class ExprNormalizedResult {
+    private final LogicalPlan plan;
+    private final LogicalPlan.QueryBlock block;
+    private final boolean tryBinaryCommonTermsElimination;
+
+    Expr baseExpr; // outmost expressions, which can includes one or more references of the results of aggregation
+                   // function.
+    List<NamedExpr> aggExprs = new ArrayList<NamedExpr>(); // aggregation functions
+    List<NamedExpr> scalarExprs = new ArrayList<NamedExpr>(); // scalar expressions which can be referred
+    List<NamedExpr> windowAggExprs = new ArrayList<NamedExpr>(); // window expressions which can be referred
+    Set<WindowSpecReferences> windowSpecs = Sets.newLinkedHashSet();
+
+    public ExprNormalizedResult(LogicalPlanner.PlanContext context, boolean tryBinaryCommonTermsElimination) {
+      this.plan = context.plan;
+      this.block = context.queryBlock;
+      this.tryBinaryCommonTermsElimination = tryBinaryCommonTermsElimination;
+    }
+
+    public boolean isBinaryCommonTermsElimination() {
+      return tryBinaryCommonTermsElimination;
+    }
+
+    @Override
+    public String toString() {
+      return baseExpr.toString() + ", agg=" + aggExprs.size() + ", scalar=" + scalarExprs.size();
+    }
+  }
+
+  public ExprNormalizedResult normalize(LogicalPlanner.PlanContext context, Expr expr) throws PlanningException {
+    return normalize(context, expr, false);
+  }
+  public ExprNormalizedResult normalize(LogicalPlanner.PlanContext context, Expr expr, boolean subexprElimination)
+      throws PlanningException {
+    ExprNormalizedResult exprNormalizedResult = new ExprNormalizedResult(context, subexprElimination);
+    Stack<Expr> stack = new Stack<Expr>();
+    stack.push(expr);
+    visit(exprNormalizedResult, new Stack<Expr>(), expr);
+    exprNormalizedResult.baseExpr = stack.pop();
+    return exprNormalizedResult;
+  }
+
+  @Override
+  public Object visitCaseWhen(ExprNormalizedResult ctx, Stack<Expr> stack, CaseWhenPredicate expr)
+      throws PlanningException {
+    stack.push(expr);
+    for (CaseWhenPredicate.WhenExpr when : expr.getWhens()) {
+      visit(ctx, stack, when.getCondition());
+      visit(ctx, stack, when.getResult());
+
+      if (OpType.isAggregationFunction(when.getCondition().getType())) {
+        String referenceName = ctx.block.namedExprsMgr.addExpr(when.getCondition());
+        ctx.aggExprs.add(new NamedExpr(when.getCondition(), referenceName));
+        when.setCondition(new ColumnReferenceExpr(referenceName));
+      }
+
+      if (OpType.isAggregationFunction(when.getResult().getType())) {
+        String referenceName = ctx.block.namedExprsMgr.addExpr(when.getResult());
+        ctx.aggExprs.add(new NamedExpr(when.getResult(), referenceName));
+        when.setResult(new ColumnReferenceExpr(referenceName));
+      }
+    }
+
+    if (expr.hasElseResult()) {
+      visit(ctx, stack, expr.getElseResult());
+      if (OpType.isAggregationFunction(expr.getElseResult().getType())) {
+        String referenceName = ctx.block.namedExprsMgr.addExpr(expr.getElseResult());
+        ctx.aggExprs.add(new NamedExpr(expr.getElseResult(), referenceName));
+        expr.setElseResult(new ColumnReferenceExpr(referenceName));
+      }
+    }
+    stack.pop();
+    return expr;
+  }
+
+  @Override
+  public Expr visitUnaryOperator(ExprNormalizedResult ctx, Stack<Expr> stack, UnaryOperator expr) throws PlanningException {
+    super.visitUnaryOperator(ctx, stack, expr);
+    if (OpType.isAggregationFunction(expr.getChild().getType())) {
+      // Get an anonymous column name and replace the aggregation function by the column name
+      String refName = ctx.block.namedExprsMgr.addExpr(expr.getChild());
+      ctx.aggExprs.add(new NamedExpr(expr.getChild(), refName));
+      expr.setChild(new ColumnReferenceExpr(refName));
+    }
+
+    return expr;
+  }
+
+  private boolean isBinaryCommonTermsElimination(ExprNormalizedResult ctx, Expr expr) {
+    return ctx.isBinaryCommonTermsElimination() && expr.getType() != OpType.Column
+        && ctx.block.namedExprsMgr.contains(expr);
+  }
+
+  @Override
+  public Expr visitBinaryOperator(ExprNormalizedResult ctx, Stack<Expr> stack, BinaryOperator expr) throws PlanningException {
+    stack.push(expr);
+
+    visit(ctx, new Stack<Expr>(), expr.getLeft());
+    if (isBinaryCommonTermsElimination(ctx, expr.getLeft())) {
+      String refName = ctx.block.namedExprsMgr.addExpr(expr.getLeft());
+      expr.setLeft(new ColumnReferenceExpr(refName));
+    }
+
+    visit(ctx, new Stack<Expr>(), expr.getRight());
+    if (isBinaryCommonTermsElimination(ctx, expr.getRight())) {
+      String refName = ctx.block.namedExprsMgr.addExpr(expr.getRight());
+      expr.setRight(new ColumnReferenceExpr(refName));
+    }
+    stack.pop();
+
+    ////////////////////////
+    // For Left Term
+    ////////////////////////
+
+    if (OpType.isAggregationFunction(expr.getLeft().getType())) {
+      String leftRefName = ctx.block.namedExprsMgr.addExpr(expr.getLeft());
+      ctx.aggExprs.add(new NamedExpr(expr.getLeft(), leftRefName));
+      expr.setLeft(new ColumnReferenceExpr(leftRefName));
+    }
+
+
+    ////////////////////////
+    // For Right Term
+    ////////////////////////
+    if (OpType.isAggregationFunction(expr.getRight().getType())) {
+      String rightRefName = ctx.block.namedExprsMgr.addExpr(expr.getRight());
+      ctx.aggExprs.add(new NamedExpr(expr.getRight(), rightRefName));
+      expr.setRight(new ColumnReferenceExpr(rightRefName));
+    }
+
+    return expr;
+  }
+
+  ///////////////////////////////////////////////////////////////////////////////////////////////////////////
+  // Function Section
+  ///////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+  @Override
+  public Expr visitFunction(ExprNormalizedResult ctx, Stack<Expr> stack, FunctionExpr expr) throws PlanningException {
+    stack.push(expr);
+
+    Expr param;
+    Expr[] paramExprs = expr.getParams();
+    if (paramExprs != null) {
+      for (int i = 0; i < paramExprs.length; i++) {
+        param = paramExprs[i];
+        visit(ctx, stack, param);
+
+        if (OpType.isAggregationFunction(param.getType())) {
+          String referenceName = ctx.plan.generateUniqueColumnName(param);
+          ctx.aggExprs.add(new NamedExpr(param, referenceName));
+          expr.getParams()[i] = new ColumnReferenceExpr(referenceName);
+        }
+      }
+    }
+    stack.pop();
+
+    return expr;
+  }
+
+  @Override
+  public Expr visitGeneralSetFunction(ExprNormalizedResult ctx, Stack<Expr> stack, GeneralSetFunctionExpr expr)
+      throws PlanningException {
+    stack.push(expr);
+
+    Expr param;
+    for (int i = 0; i < expr.getParams().length; i++) {
+      param = expr.getParams()[i];
+      visit(ctx, stack, param);
+
+
+      // If parameters are all constants, we don't need to dissect an aggregation expression into two parts:
+      // function and parameter parts.
+      if (!OpType.isLiteralType(param.getType()) && param.getType() != OpType.Column) {
+        String referenceName = ctx.block.namedExprsMgr.addExpr(param);
+        ctx.scalarExprs.add(new NamedExpr(param, referenceName));
+        expr.getParams()[i] = new ColumnReferenceExpr(referenceName);
+      }
+    }
+    stack.pop();
+    return expr;
+  }
+
+  public Expr visitWindowFunction(ExprNormalizedResult ctx, Stack<Expr> stack, WindowFunctionExpr expr)
+      throws PlanningException {
+    stack.push(expr);
+
+    WindowSpec windowSpec = expr.getWindowSpec();
+    Expr key;
+
+    WindowSpecReferences windowSpecReferences;
+    if (windowSpec.hasWindowName()) {
+      windowSpecReferences = new WindowSpecReferences(windowSpec.getWindowName());
+    } else {
+      String [] partitionKeyReferenceNames = null;
+      if (windowSpec.hasPartitionBy()) {
+        partitionKeyReferenceNames = new String [windowSpec.getPartitionKeys().length];
+        for (int i = 0; i < windowSpec.getPartitionKeys().length; i++) {
+          key = windowSpec.getPartitionKeys()[i];
+          visit(ctx, stack, key);
+          partitionKeyReferenceNames[i] = ctx.block.namedExprsMgr.addExpr(key);
+        }
+      }
+
+      String [] orderKeyReferenceNames = null;
+      if (windowSpec.hasOrderBy()) {
+        orderKeyReferenceNames = new String[windowSpec.getSortSpecs().length];
+        for (int i = 0; i < windowSpec.getSortSpecs().length; i++) {
+          key = windowSpec.getSortSpecs()[i].getKey();
+          visit(ctx, stack, key);
+          String referenceName = ctx.block.namedExprsMgr.addExpr(key);
+          if (OpType.isAggregationFunction(key.getType())) {
+            ctx.aggExprs.add(new NamedExpr(key, referenceName));
+            windowSpec.getSortSpecs()[i].setKey(new ColumnReferenceExpr(referenceName));
+          }
+          orderKeyReferenceNames[i] = referenceName;
+        }
+      }
+      windowSpecReferences =
+          new WindowSpecReferences(partitionKeyReferenceNames,orderKeyReferenceNames);
+    }
+    ctx.windowSpecs.add(windowSpecReferences);
+
+    String funcExprRef = ctx.block.namedExprsMgr.addExpr(expr);
+    ctx.windowAggExprs.add(new NamedExpr(expr, funcExprRef));
+    stack.pop();
+
+    ctx.block.setHasWindowFunction();
+    return expr;
+  }
+
+  ///////////////////////////////////////////////////////////////////////////////////////////////////////////
+  // Literal Section
+  ///////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+  @Override
+  public Expr visitCastExpr(ExprNormalizedResult ctx, Stack<Expr> stack, CastExpr expr) throws PlanningException {
+    super.visitCastExpr(ctx, stack, expr);
+    if (OpType.isAggregationFunction(expr.getType())) {
+      String referenceName = ctx.block.namedExprsMgr.addExpr(expr.getChild());
+      ctx.aggExprs.add(new NamedExpr(expr.getChild(), referenceName));
+      expr.setChild(new ColumnReferenceExpr(referenceName));
+    }
+    return expr;
+  }
+
+  @Override
+  public Expr visitColumnReference(ExprNormalizedResult ctx, Stack<Expr> stack, ColumnReferenceExpr expr)
+      throws PlanningException {
+
+    if (ctx.block.isAliasedName(expr.getCanonicalName())) {
+      String originalName = ctx.block.getOriginalName(expr.getCanonicalName());
+      expr.setName(originalName);
+      return expr;
+    }
+    // if a column reference is not qualified, it finds and sets the qualified column name.
+    if (!(expr.hasQualifier() && CatalogUtil.isFQTableName(expr.getQualifier()))) {
+      if (!ctx.block.namedExprsMgr.contains(expr.getCanonicalName()) && expr.getType() == OpType.Column) {
+        try {
+          String normalized =
+              NameResolver.resolve(ctx.plan, ctx.block, expr, NameResolvingMode.LEGACY).getQualifiedName();
+          expr.setName(normalized);
+        } catch (NoSuchColumnException nsc) {
+        }
+      }
+    }
+    return expr;
+  }
+
+  public static class WindowSpecReferences {
+    String windowName;
+
+    String [] partitionKeys;
+    String [] orderKeys;
+
+    public WindowSpecReferences(String windowName) {
+      this.windowName = windowName;
+    }
+
+    public WindowSpecReferences(String [] partitionKeys, String [] orderKeys) {
+      this.partitionKeys = partitionKeys;
+      this.orderKeys = orderKeys;
+    }
+
+    public String getWindowName() {
+      return windowName;
+    }
+
+    public boolean hasPartitionKeys() {
+      return partitionKeys != null;
+    }
+
+    public String [] getPartitionKeys() {
+      return partitionKeys;
+    }
+
+    public boolean hasOrderBy() {
+      return orderKeys != null;
+    }
+
+    public String [] getOrderKeys() {
+      return orderKeys;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/b143f991/tajo-plan/src/main/java/org/apache/tajo/plan/IllegalQueryStatusException.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/IllegalQueryStatusException.java b/tajo-plan/src/main/java/org/apache/tajo/plan/IllegalQueryStatusException.java
new file mode 100644
index 0000000..31f9186
--- /dev/null
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/IllegalQueryStatusException.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.plan;
+
+public class IllegalQueryStatusException extends Exception {
+
+  public IllegalQueryStatusException() {
+
+  }
+
+  public IllegalQueryStatusException(String msg) {
+    super(msg);
+  }
+
+  public IllegalQueryStatusException(Exception e) {
+    super(e);
+  }
+
+  public IllegalQueryStatusException(String msg, Exception e) {
+    super(msg, e);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/b143f991/tajo-plan/src/main/java/org/apache/tajo/plan/InvalidQueryException.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/InvalidQueryException.java b/tajo-plan/src/main/java/org/apache/tajo/plan/InvalidQueryException.java
new file mode 100644
index 0000000..db986e1
--- /dev/null
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/InvalidQueryException.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.plan;
+
+public class InvalidQueryException extends RuntimeException {
+	private static final long serialVersionUID = -7085849718839416246L;
+
+  public InvalidQueryException() {
+    super();
+  }
+
+	public InvalidQueryException(String message) {
+    super(message);
+  }
+	
+	public InvalidQueryException(Throwable t) {
+		super(t);
+	}
+}


Mime
View raw message