tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hyun...@apache.org
Subject [37/51] [partial] tajo git commit: TAJO-1761: Separate an integration unit test kit into an independent module.
Date Fri, 14 Aug 2015 14:30:16 GMT
http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterMergeJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterMergeJoinExec.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterMergeJoinExec.java
new file mode 100644
index 0000000..9a5a85f
--- /dev/null
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterMergeJoinExec.java
@@ -0,0 +1,531 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.LocalTajoTestingUtility;
+import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.algebra.Expr;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.engine.parser.SQLAnalyzer;
+import org.apache.tajo.engine.planner.PhysicalPlanner;
+import org.apache.tajo.engine.planner.PhysicalPlannerImpl;
+import org.apache.tajo.engine.planner.enforce.Enforcer;
+import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.exception.TajoException;
+import org.apache.tajo.plan.LogicalPlanner;
+import org.apache.tajo.plan.logical.JoinNode;
+import org.apache.tajo.plan.logical.LogicalNode;
+import org.apache.tajo.plan.logical.NodeType;
+import org.apache.tajo.plan.util.PlannerUtil;
+import org.apache.tajo.storage.Appender;
+import org.apache.tajo.storage.FileTablespace;
+import org.apache.tajo.storage.TablespaceManager;
+import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.util.TUtil;
+import org.apache.tajo.worker.TaskAttemptContext;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME;
+import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME;
+import static org.apache.tajo.plan.serder.PlanProto.JoinEnforce.JoinAlgorithm;
+import static org.junit.Assert.*;
+
+public class TestRightOuterMergeJoinExec {
+  private TajoConf conf;
+  private final String TEST_PATH = TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/TestRightOuterMergeJoinExec";
+  private TajoTestingCluster util;
+  private CatalogService catalog;
+  private SQLAnalyzer analyzer;
+  private LogicalPlanner planner;
+  private Path testDir;
+  private QueryContext defaultContext;
+
+
+  private TableDesc dep3;
+  private TableDesc dep4;
+  private TableDesc job3;
+  private TableDesc emp3;
+  private TableDesc phone3;
+
+  private final String DEP3_NAME = CatalogUtil.buildFQName(DEFAULT_DATABASE_NAME, "dep3");
+  private final String DEP4_NAME = CatalogUtil.buildFQName(DEFAULT_DATABASE_NAME, "dep4");
+  private final String JOB3_NAME = CatalogUtil.buildFQName(DEFAULT_DATABASE_NAME, "job3");
+  private final String EMP3_NAME = CatalogUtil.buildFQName(DEFAULT_DATABASE_NAME, "emp3");
+  private final String PHONE3_NAME = CatalogUtil.buildFQName(DEFAULT_DATABASE_NAME, "phone3");
+
+  @Before
+  public void setUp() throws Exception {
+    util = new TajoTestingCluster();
+    util.initTestDir();
+    catalog = util.startCatalogCluster().getCatalog();
+    testDir = CommonTestingUtil.getTestDir(TEST_PATH);
+    catalog.createTablespace(DEFAULT_TABLESPACE_NAME, testDir.toUri().toString());
+    catalog.createDatabase(DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME);
+    conf = util.getConfiguration();
+
+    //----------------- dep3 ------------------------------
+    // dep_id | dep_name  | loc_id
+    //--------------------------------
+    //  0     | dep_0     | 1000
+    //  1     | dep_1     | 1001
+    //  2     | dep_2     | 1002
+    //  3     | dep_3     | 1003
+    //  4     | dep_4     | 1004
+    //  5     | dep_5     | 1005
+    //  6     | dep_6     | 1006
+    //  7     | dep_7     | 1007
+    //  8     | dep_8     | 1008
+    //  9     | dep_9     | 1009
+    Schema dep3Schema = new Schema();
+    dep3Schema.addColumn("dep_id", Type.INT4);
+    dep3Schema.addColumn("dep_name", Type.TEXT);
+    dep3Schema.addColumn("loc_id", Type.INT4);
+
+
+    TableMeta dep3Meta = CatalogUtil.newTableMeta("TEXT");
+    Path dep3Path = new Path(testDir, "dep3.csv");
+    Appender appender1 = ((FileTablespace) TablespaceManager.getLocalFs())
+        .getAppender(dep3Meta, dep3Schema, dep3Path);
+    appender1.init();
+    VTuple tuple = new VTuple(dep3Schema.size());
+    for (int i = 0; i < 10; i++) {
+      tuple.put(new Datum[] { DatumFactory.createInt4(i),
+          DatumFactory.createText("dept_" + i),
+          DatumFactory.createInt4(1000 + i) });
+      appender1.addTuple(tuple);
+    }
+
+    appender1.flush();
+    appender1.close();
+    dep3 = CatalogUtil.newTableDesc(DEP3_NAME, dep3Schema, dep3Meta, dep3Path);
+    catalog.createTable(dep3);
+
+
+    //----------------- dep4 ------------------------------
+    // dep_id | dep_name  | loc_id
+    //--------------------------------
+    //  0     | dep_0     | 1000
+    //  1     | dep_1     | 1001
+    //  2     | dep_2     | 1002
+    //  3     | dep_3     | 1003
+    //  4     | dep_4     | 1004
+    //  5     | dep_5     | 1005
+    //  6     | dep_6     | 1006
+    //  7     | dep_7     | 1007
+    //  8     | dep_8     | 1008
+    //  9     | dep_9     | 1009
+    // 10     | dep_10    | 1010
+    Schema dep4Schema = new Schema();
+    dep4Schema.addColumn("dep_id", Type.INT4);
+    dep4Schema.addColumn("dep_name", Type.TEXT);
+    dep4Schema.addColumn("loc_id", Type.INT4);
+
+
+    TableMeta dep4Meta = CatalogUtil.newTableMeta("TEXT");
+    Path dep4Path = new Path(testDir, "dep4.csv");
+    Appender appender4 = ((FileTablespace) TablespaceManager.getLocalFs())
+        .getAppender(dep4Meta, dep4Schema, dep4Path);
+    appender4.init();
+    VTuple tuple4 = new VTuple(dep4Schema.size());
+    for (int i = 0; i < 11; i++) {
+      tuple4.put(new Datum[] { DatumFactory.createInt4(i),
+          DatumFactory.createText("dept_" + i),
+          DatumFactory.createInt4(1000 + i) });
+      appender4.addTuple(tuple4);
+    }
+
+    appender4.flush();
+    appender4.close();
+    dep4 = CatalogUtil.newTableDesc(DEP4_NAME, dep4Schema, dep4Meta, dep4Path);
+    catalog.createTable(dep4);
+
+
+
+    //----------------- job3 ------------------------------
+    //  job_id  | job_title
+    // ----------------------
+    //   101    |  job_101
+    //   102    |  job_102
+    //   103    |  job_103
+
+    Schema job3Schema = new Schema();
+    job3Schema.addColumn("job_id", Type.INT4);
+    job3Schema.addColumn("job_title", Type.TEXT);
+
+
+    TableMeta job3Meta = CatalogUtil.newTableMeta("TEXT");
+    Path job3Path = new Path(testDir, "job3.csv");
+    Appender appender2 = ((FileTablespace) TablespaceManager.getLocalFs())
+        .getAppender(job3Meta, job3Schema, job3Path);
+    appender2.init();
+    VTuple tuple2 = new VTuple(job3Schema.size());
+    for (int i = 1; i < 4; i++) {
+      int x = 100 + i;
+      tuple2.put(new Datum[] { DatumFactory.createInt4(100 + i),
+          DatumFactory.createText("job_" + x) });
+      appender2.addTuple(tuple2);
+    }
+
+    appender2.flush();
+    appender2.close();
+    job3 = CatalogUtil.newTableDesc(JOB3_NAME, job3Schema, job3Meta, job3Path);
+    catalog.createTable(job3);
+
+
+
+    //---------------------emp3 --------------------
+    // emp_id  | first_name | last_name | dep_id | salary | job_id
+    // ------------------------------------------------------------
+    //  11     |  fn_11     |  ln_11    |  1     | 123    | 101
+    //  13     |  fn_13     |  ln_13    |  3     | 369    | 103
+    //  15     |  fn_15     |  ln_15    |  5     | 615    | null
+    //  17     |  fn_17     |  ln_17    |  7     | 861    | null
+    //  19     |  fn_19     |  ln_19    |  9     | 1107   | null
+    //  21     |  fn_21     |  ln_21    |  1     | 123    | 101
+    //  23     |  fn_23     |  ln_23    |  3     | 369    | 103
+
+    Schema emp3Schema = new Schema();
+    emp3Schema.addColumn("emp_id", Type.INT4);
+    emp3Schema.addColumn("first_name", Type.TEXT);
+    emp3Schema.addColumn("last_name", Type.TEXT);
+    emp3Schema.addColumn("dep_id", Type.INT4);
+    emp3Schema.addColumn("salary", Type.FLOAT4);
+    emp3Schema.addColumn("job_id", Type.INT4);
+
+
+    TableMeta emp3Meta = CatalogUtil.newTableMeta("TEXT");
+    Path emp3Path = new Path(testDir, "emp3.csv");
+    Appender appender3 = ((FileTablespace) TablespaceManager.getLocalFs())
+        .getAppender(emp3Meta, emp3Schema, emp3Path);
+    appender3.init();
+    VTuple tuple3 = new VTuple(emp3Schema.size());
+
+    for (int i = 1; i < 4; i += 2) {
+      int x = 10 + i;
+      tuple3.put(new Datum[] { DatumFactory.createInt4(10 + i),
+          DatumFactory.createText("firstname_" + x),
+          DatumFactory.createText("lastname_" + x),
+          DatumFactory.createInt4(i),
+          DatumFactory.createFloat4(123 * i),
+          DatumFactory.createInt4(100 + i) });
+      appender3.addTuple(tuple3);
+
+      int y = 20 + i;
+      tuple3.put(new Datum[] { DatumFactory.createInt4(20 + i),
+          DatumFactory.createText("firstname_" + y),
+          DatumFactory.createText("lastname_" + y),
+          DatumFactory.createInt4(i),
+          DatumFactory.createFloat4(123 * i),
+          DatumFactory.createInt4(100 + i) });
+      appender3.addTuple(tuple3);
+    }
+
+    for (int i = 5; i < 10; i += 2) {
+      int x = 10 + i;
+      tuple3.put(new Datum[] { DatumFactory.createInt4(10 + i),
+          DatumFactory.createText("firstname_" + x),
+          DatumFactory.createText("lastname_" + x),
+          DatumFactory.createInt4(i),
+          DatumFactory.createFloat4(123 * i),
+          DatumFactory.createNullDatum() });
+      appender3.addTuple(tuple3);
+    }
+
+    appender3.flush();
+    appender3.close();
+    emp3 = CatalogUtil.newTableDesc(EMP3_NAME, emp3Schema, emp3Meta, emp3Path);
+    catalog.createTable(emp3);
+
+    //---------------------phone3 --------------------
+    // emp_id  | phone_number
+    // -----------------------------------------------
+    // this table is empty, no rows
+
+    Schema phone3Schema = new Schema();
+    phone3Schema.addColumn("emp_id", Type.INT4);
+    phone3Schema.addColumn("phone_number", Type.TEXT);
+
+
+    TableMeta phone3Meta = CatalogUtil.newTableMeta("TEXT");
+    Path phone3Path = new Path(testDir, "phone3.csv");
+    Appender appender5 = ((FileTablespace) TablespaceManager.getLocalFs())
+        .getAppender(phone3Meta, phone3Schema, phone3Path);
+    appender5.init();
+
+    appender5.flush();
+    appender5.close();
+    phone3 = CatalogUtil.newTableDesc(PHONE3_NAME, phone3Schema, phone3Meta, phone3Path);
+    catalog.createTable(phone3);
+
+    analyzer = new SQLAnalyzer();
+    planner = new LogicalPlanner(catalog, TablespaceManager.getInstance());
+
+    defaultContext = LocalTajoTestingUtility.createDummyContext(conf);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    util.shutdownCatalogCluster();
+  }
+
+  String[] QUERIES = {
+      // [0] no nulls
+      "select dep3.dep_id, dep_name, emp_id, salary from emp3 right outer join dep3 on dep3.dep_id = emp3.dep_id",
+      // [1] nulls on the left operand
+      "select job3.job_id, job_title, emp_id, salary from emp3 right outer join job3 on job3.job_id=emp3.job_id",
+      // [2] nulls on the right side
+      "select job3.job_id, job_title, emp_id, salary from job3 right outer join emp3 on job3.job_id=emp3.job_id",
+      // [3] no nulls, right continues after left
+      "select dep4.dep_id, dep_name, emp_id, salary from emp3 right outer join dep4 on dep4.dep_id = emp3.dep_id",
+      // [4] one operand is empty
+      "select emp3.emp_id, first_name, phone_number from emp3 right outer join phone3 on emp3.emp_id = phone3.emp_id",
+      // [5] one operand is empty
+      "select phone_number, emp3.emp_id, first_name from phone3 right outer join emp3 on emp3.emp_id = phone3.emp_id"
+  };
+
+  @Test
+  public final void testRightOuterMergeJoin0() throws IOException, TajoException {
+    Expr expr = analyzer.parse(QUERIES[0]);
+    LogicalNode plan = planner.createPlan(defaultContext, expr).getRootBlock().getRoot();
+    JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
+    Enforcer enforcer = new Enforcer();
+    enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);
+
+    FileFragment[] emp3Frags = FileTablespace.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getUri()),
+        Integer.MAX_VALUE);
+    FileFragment[] dep3Frags = FileTablespace.splitNG(conf, DEP3_NAME, dep3.getMeta(), new Path(dep3.getUri()),
+        Integer.MAX_VALUE);
+    FileFragment[] merged = TUtil.concat(emp3Frags, dep3Frags);
+
+    Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testRightOuterMergeJoin0");
+    TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
+        LocalTajoTestingUtility.newTaskAttemptId(), merged, workDir);
+    ctx.setEnforcer(enforcer);
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+    ProjectionExec proj = (ProjectionExec) exec;
+    assertTrue(proj.getChild() instanceof RightOuterMergeJoinExec);
+
+    int count = 0;
+    exec.init();
+    while (exec.next() != null) {
+      //TODO check contents
+      count = count + 1;
+    }
+    assertNull(exec.next());
+    exec.close();
+    assertEquals(12, count);
+  }
+
+
+  @Test
+  public final void testRightOuter_MergeJoin1() throws IOException, TajoException {
+    Expr expr = analyzer.parse(QUERIES[1]);
+    LogicalNode plan = planner.createPlan(defaultContext, expr).getRootBlock().getRoot();
+    JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
+    Enforcer enforcer = new Enforcer();
+    enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);
+
+    FileFragment[] emp3Frags =
+        FileTablespace.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getUri()), Integer.MAX_VALUE);
+    FileFragment[] job3Frags =
+        FileTablespace.splitNG(conf, JOB3_NAME, job3.getMeta(), new Path(job3.getUri()), Integer.MAX_VALUE);
+    FileFragment[] merged = TUtil.concat(job3Frags, emp3Frags);
+
+    Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testRightOuterMergeJoin1");
+    TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
+        LocalTajoTestingUtility.newTaskAttemptId(), merged, workDir);
+    ctx.setEnforcer(enforcer);
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+    ProjectionExec proj = (ProjectionExec) exec;
+    assertTrue(proj.getChild() instanceof RightOuterMergeJoinExec);
+
+    int count = 0;
+    exec.init();
+    while (exec.next() != null) {
+      //TODO check contents
+      count = count + 1;
+    }
+    assertNull(exec.next());
+    exec.close();
+    assertEquals(5, count);
+  }
+
+  @Test
+  public final void testRightOuterMergeJoin2() throws IOException, TajoException {
+    Expr expr = analyzer.parse(QUERIES[2]);
+    LogicalNode plan = planner.createPlan(defaultContext, expr).getRootBlock().getRoot();
+    JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
+    Enforcer enforcer = new Enforcer();
+    enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);
+
+    FileFragment[] emp3Frags =
+        FileTablespace.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getUri()), Integer.MAX_VALUE);
+    FileFragment[] job3Frags =
+        FileTablespace.splitNG(conf, JOB3_NAME, job3.getMeta(), new Path(job3.getUri()), Integer.MAX_VALUE);
+    FileFragment[] merged = TUtil.concat(job3Frags, emp3Frags);
+
+    Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testRightOuterMergeJoin2");
+    TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
+        LocalTajoTestingUtility.newTaskAttemptId(), merged, workDir);
+    ctx.setEnforcer(enforcer);
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+    ProjectionExec proj = (ProjectionExec) exec;
+    assertTrue(proj.getChild() instanceof RightOuterMergeJoinExec);
+
+    int count = 0;
+    exec.init();
+    while (exec.next() != null) {
+      //TODO check contents
+      count = count + 1;
+    }
+    assertNull(exec.next());
+    exec.close();
+    assertEquals(7, count);
+  }
+
+
+  @Test
+  public final void testRightOuter_MergeJoin3() throws IOException, TajoException {
+    Expr expr = analyzer.parse(QUERIES[3]);
+    LogicalNode plan = planner.createPlan(defaultContext, expr).getRootBlock().getRoot();
+    JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
+    Enforcer enforcer = new Enforcer();
+    enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);
+
+    FileFragment[] emp3Frags =
+        FileTablespace.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getUri()), Integer.MAX_VALUE);
+    FileFragment[] dep4Frags =
+        FileTablespace.splitNG(conf, DEP4_NAME, dep4.getMeta(), new Path(dep4.getUri()), Integer.MAX_VALUE);
+    FileFragment[] merged = TUtil.concat(emp3Frags, dep4Frags);
+
+    Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testRightOuter_MergeJoin3");
+    TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
+        LocalTajoTestingUtility.newTaskAttemptId(), merged, workDir);
+    ctx.setEnforcer(enforcer);
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+    ProjectionExec proj = (ProjectionExec) exec;
+    assertTrue(proj.getChild() instanceof RightOuterMergeJoinExec);
+
+    int count = 0;
+    exec.init();
+
+    while (exec.next() != null) {
+      //TODO check contents
+      count = count + 1;
+    }
+    assertNull(exec.next());
+    exec.close();
+    assertEquals(13, count);
+  }
+
+  @Test
+  public final void testRightOuter_MergeJoin4() throws IOException, TajoException {
+    Expr expr = analyzer.parse(QUERIES[4]);
+    LogicalNode plan = planner.createPlan(defaultContext, expr).getRootBlock().getRoot();
+    JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
+    Enforcer enforcer = new Enforcer();
+    enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);
+
+    FileFragment[] emp3Frags =
+        FileTablespace.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getUri()), Integer.MAX_VALUE);
+    FileFragment[] phone3Frags =
+        FileTablespace.splitNG(conf, PHONE3_NAME, phone3.getMeta(), new Path(phone3.getUri()),
+            Integer.MAX_VALUE);
+    FileFragment[] merged = TUtil.concat(emp3Frags, phone3Frags);
+
+    Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testRightOuter_MergeJoin4");
+    TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
+        LocalTajoTestingUtility.newTaskAttemptId(), merged, workDir);
+    ctx.setEnforcer(enforcer);
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+    ProjectionExec proj = (ProjectionExec) exec;
+    assertTrue(proj.getChild() instanceof RightOuterMergeJoinExec);
+
+    int count = 0;
+    exec.init();
+
+    while (exec.next() != null) {
+      //TODO check contents
+      count = count + 1;
+    }
+    assertNull(exec.next());
+    exec.close();
+    assertEquals(0, count);
+  }
+
+  @Test
+  public final void testRightOuterMergeJoin5() throws IOException, TajoException {
+    Expr expr = analyzer.parse(QUERIES[5]);
+    LogicalNode plan = planner.createPlan(defaultContext, expr).getRootBlock().getRoot();
+    JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
+    Enforcer enforcer = new Enforcer();
+    enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);
+
+    FileFragment[] emp3Frags =
+        FileTablespace.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getUri()), Integer.MAX_VALUE);
+    FileFragment[] phone3Frags = FileTablespace.splitNG(conf, PHONE3_NAME, phone3.getMeta(), new Path(phone3.getUri()),
+        Integer.MAX_VALUE);
+    FileFragment[] merged = TUtil.concat(phone3Frags,emp3Frags);
+
+
+    Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testRightOuterMergeJoin5");
+    TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
+        LocalTajoTestingUtility.newTaskAttemptId(), merged, workDir);
+    ctx.setEnforcer(enforcer);
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+    ProjectionExec proj = (ProjectionExec) exec;
+    assertTrue(proj.getChild() instanceof RightOuterMergeJoinExec);
+
+    int count = 0;
+    exec.init();
+
+    while (exec.next() != null) {
+      //TODO check contents
+      count = count + 1;
+    }
+    exec.close();
+    assertEquals(7, count);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
new file mode 100644
index 0000000..81cb945
--- /dev/null
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.LocalTajoTestingUtility;
+import org.apache.tajo.TajoConstants;
+import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.TpchTestBase;
+import org.apache.tajo.algebra.Expr;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.engine.parser.SQLAnalyzer;
+import org.apache.tajo.engine.planner.PhysicalPlanner;
+import org.apache.tajo.engine.planner.PhysicalPlannerImpl;
+import org.apache.tajo.engine.planner.RangePartitionAlgorithm;
+import org.apache.tajo.engine.planner.UniformRangePartition;
+import org.apache.tajo.engine.planner.enforce.Enforcer;
+import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.exception.TajoException;
+import org.apache.tajo.plan.LogicalOptimizer;
+import org.apache.tajo.plan.LogicalPlan;
+import org.apache.tajo.plan.LogicalPlanner;
+import org.apache.tajo.plan.logical.LogicalNode;
+import org.apache.tajo.plan.util.PlannerUtil;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.worker.TaskAttemptContext;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Random;
+
+import static org.junit.Assert.assertTrue;
+
+public class TestSortExec {
+  private static TajoConf conf;
+  private static final String TEST_PATH = TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/TestPhysicalPlanner";
+  private static TajoTestingCluster util;
+  private static CatalogService catalog;
+  private static SQLAnalyzer analyzer;
+  private static LogicalPlanner planner;
+  private static LogicalOptimizer optimizer;
+  private static FileTablespace sm;
+  private static Path workDir;
+  private static Path tablePath;
+  private static TableMeta employeeMeta;
+  private static QueryContext queryContext;
+
+  private static Random rnd = new Random(System.currentTimeMillis());
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    conf = new TajoConf();
+    conf.set(CommonTestingUtil.TAJO_TEST_KEY, CommonTestingUtil.TAJO_TEST_TRUE);
+    util = TpchTestBase.getInstance().getTestingCluster();
+    catalog = util.getMaster().getCatalog();
+    workDir = CommonTestingUtil.getTestDir(TEST_PATH);
+    sm = TablespaceManager.getLocalFs();
+
+    Schema schema = new Schema();
+    schema.addColumn("managerid", Type.INT4);
+    schema.addColumn("empid", Type.INT4);
+    schema.addColumn("deptname", Type.TEXT);
+
+    employeeMeta = CatalogUtil.newTableMeta("TEXT");
+
+    tablePath = StorageUtil.concatPath(workDir, "employee", "table1");
+    sm.getFileSystem().mkdirs(tablePath.getParent());
+
+    Appender appender = ((FileTablespace) TablespaceManager.getLocalFs())
+        .getAppender(employeeMeta, schema, tablePath);
+    appender.init();
+    VTuple tuple = new VTuple(schema.size());
+    for (int i = 0; i < 100; i++) {
+      tuple.put(new Datum[] {
+          DatumFactory.createInt4(rnd.nextInt(5)),
+          DatumFactory.createInt4(rnd.nextInt(10)),
+          DatumFactory.createText("dept_" + rnd.nextInt(10))});
+      appender.addTuple(tuple);
+    }
+    appender.flush();
+    appender.close();
+
+    TableDesc desc = new TableDesc(
+        CatalogUtil.buildFQName(TajoConstants.DEFAULT_DATABASE_NAME, "employee"), schema, employeeMeta,
+        tablePath.toUri());
+    catalog.createTable(desc);
+
+    queryContext = new QueryContext(conf);
+    analyzer = new SQLAnalyzer();
+    planner = new LogicalPlanner(catalog, TablespaceManager.getInstance());
+    optimizer = new LogicalOptimizer(conf, catalog);
+  }
+
+  public static String[] QUERIES = {
+      "select managerId, empId, deptName from employee order by managerId, empId desc" };
+
+  @Test
+  public final void testNext() throws IOException, TajoException {
+    FileFragment[] frags = FileTablespace.splitNG(conf, "default.employee", employeeMeta, tablePath, Integer.MAX_VALUE);
+    Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/TestSortExec");
+    TaskAttemptContext ctx = new TaskAttemptContext(queryContext,
+        LocalTajoTestingUtility
+        .newTaskAttemptId(), new FileFragment[] { frags[0] }, workDir);
+    ctx.setEnforcer(new Enforcer());
+    Expr context = analyzer.parse(QUERIES[0]);
+    LogicalPlan plan = planner.createPlan(LocalTajoTestingUtility.createDummyContext(conf), context);
+    LogicalNode rootNode = optimizer.optimize(plan);
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+
+    Tuple tuple;
+    Datum preVal = null;
+    Datum curVal;
+    exec.init();
+    while ((tuple = exec.next()) != null) {
+      curVal = tuple.asDatum(0);
+      if (preVal != null) {
+        assertTrue(preVal.lessThanEqual(curVal).asBool());
+      }
+
+      preVal = curVal;
+    }
+    exec.close();
+  }
+
+  @Test
+  /**
+   * TODO - Now, in FSM branch, TestUniformRangePartition is ported to Java.
+   * So, this test is located in here.
+   * Later it should be moved TestUniformPartitions.
+   */
+  public void testTAJO_946() {
+    Schema schema = new Schema();
+    schema.addColumn("l_orderkey", Type.INT8);
+    SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema);
+
+    VTuple s = new VTuple(1);
+    s.put(0, DatumFactory.createInt8(0));
+    VTuple e = new VTuple(1);
+    e.put(0, DatumFactory.createInt8(6000000000l));
+    TupleRange expected = new TupleRange(sortSpecs, s, e);
+    RangePartitionAlgorithm partitioner
+        = new UniformRangePartition(expected, sortSpecs, true);
+    TupleRange [] ranges = partitioner.partition(967);
+
+    TupleRange prev = null;
+    for (TupleRange r : ranges) {
+      if (prev == null) {
+        prev = r;
+      } else {
+        assertTrue(prev.compareTo(r) < 0);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestSortIntersectExec.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestSortIntersectExec.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestSortIntersectExec.java
new file mode 100644
index 0000000..7289472
--- /dev/null
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestSortIntersectExec.java
@@ -0,0 +1,293 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.LocalTajoTestingUtility;
+import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.algebra.Expr;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.engine.parser.SQLAnalyzer;
+import org.apache.tajo.engine.planner.PhysicalPlanner;
+import org.apache.tajo.engine.planner.PhysicalPlannerImpl;
+import org.apache.tajo.engine.planner.enforce.Enforcer;
+import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.exception.TajoException;
+import org.apache.tajo.plan.LogicalOptimizer;
+import org.apache.tajo.plan.LogicalPlan;
+import org.apache.tajo.plan.LogicalPlanner;
+import org.apache.tajo.plan.logical.LogicalNode;
+import org.apache.tajo.plan.logical.SortNode;
+import org.apache.tajo.plan.util.PlannerUtil;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.util.TUtil;
+import org.apache.tajo.worker.TaskAttemptContext;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME;
+import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestSortIntersectExec {
+  private TajoConf conf;
+  private final String TEST_PATH = TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/TestSortIntersectExec";
+  private TajoTestingCluster util;
+  private CatalogService catalog;
+  private SQLAnalyzer analyzer;
+  private LogicalPlanner planner;
+  private LogicalOptimizer optimizer;
+  private Path testDir;
+
+  private TableDesc employee1;
+  private TableDesc employee2;
+
+  private int[] leftNum = new int[] {1, 2, 3, 3, 9, 9, 3, 0, 3};
+  private int[] rightNum = new int[] {3, 7, 3, 5};
+  private int[] answerAllNum = new int[] {3, 3}; // this should be set as leftNum intersect all rightNum + order by
+  private int[] answerDistinctNum = new int[] {3}; // this should be set as leftNum intersect rightNum + order by
+
+  @Before
+  public void setUp() throws Exception {
+    util = new TajoTestingCluster();
+    util.initTestDir();
+    catalog = util.startCatalogCluster().getCatalog();
+    testDir = CommonTestingUtil.getTestDir(TEST_PATH);
+    catalog.createTablespace(DEFAULT_TABLESPACE_NAME, testDir.toUri().toString());
+    catalog.createDatabase(DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME);
+    conf = util.getConfiguration();
+
+    Schema employeeSchema1 = new Schema();
+    employeeSchema1.addColumn("managerid", TajoDataTypes.Type.INT4);
+    employeeSchema1.addColumn("empid", TajoDataTypes.Type.INT4);
+    employeeSchema1.addColumn("memid", TajoDataTypes.Type.INT4);
+    employeeSchema1.addColumn("deptname", TajoDataTypes.Type.TEXT);
+
+    TableMeta employeeMeta1 = CatalogUtil.newTableMeta("TEXT");
+    Path employeePath1 = new Path(testDir, "employee1.csv");
+    Appender appender = ((FileTablespace) TablespaceManager.getLocalFs()).
+        getAppender(employeeMeta1, employeeSchema1, employeePath1);
+    appender.init();
+    Tuple tuple = new VTuple(employeeSchema1.size());
+
+    for (int i : leftNum) {
+      tuple.put(new Datum[] {
+          DatumFactory.createInt4(i),
+          DatumFactory.createInt4(i), // empid [0-8]
+          DatumFactory.createInt4(10 + i),
+          DatumFactory.createText("dept_" + i) });
+      appender.addTuple(tuple);
+    }
+
+    appender.flush();
+    appender.close();
+    employee1 = CatalogUtil.newTableDesc("default.employee1", employeeSchema1, employeeMeta1, employeePath1);
+    catalog.createTable(employee1);
+
+    Schema employeeSchema2 = new Schema();
+    employeeSchema2.addColumn("managerid", TajoDataTypes.Type.INT4);
+    employeeSchema2.addColumn("empid", TajoDataTypes.Type.INT4);
+    employeeSchema2.addColumn("memid", TajoDataTypes.Type.INT4);
+    employeeSchema2.addColumn("deptname", TajoDataTypes.Type.TEXT);
+
+    TableMeta employeeMeta2 = CatalogUtil.newTableMeta("TEXT");
+    Path employeePath2 = new Path(testDir, "employee2.csv");
+    Appender appender2 = ((FileTablespace) TablespaceManager.getLocalFs()).
+        getAppender(employeeMeta2, employeeSchema2, employeePath2);
+    appender2.init();
+    Tuple tuple2 = new VTuple(employeeSchema2.size());
+
+    for (int i : rightNum) {
+      tuple2.put(new Datum[]{
+          DatumFactory.createInt4(i),
+          DatumFactory.createInt4(i), // empid [1-9]
+          DatumFactory.createInt4(10 + i),
+          DatumFactory.createText("dept_" + i)});
+      appender2.addTuple(tuple2);
+    }
+
+    appender2.flush();
+    appender2.close();
+    employee2 = CatalogUtil.newTableDesc("default.employee2", employeeSchema2, employeeMeta2, employeePath2);
+    catalog.createTable(employee2);
+
+
+    analyzer = new SQLAnalyzer();
+    planner = new LogicalPlanner(catalog, TablespaceManager.getInstance());
+    optimizer = new LogicalOptimizer(conf, catalog);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    util.shutdownCatalogCluster();
+  }
+
+
+  // relation descriptions
+  // employee1 (managerid, empid, memid, deptname)
+  // employee2 (managerid, empid, memid, deptname)
+
+  String[] QUERIES = {
+      "select * from employee1 as e1, employee2 as e2 where e1.empId = e2.empId"
+  };
+
+  @Test
+  public final void testSortIntersectAll() throws IOException, TajoException {
+    FileFragment[] empFrags1 = ((FileTablespace) TablespaceManager.getLocalFs()).
+        splitNG(conf, "default.e1", employee1.getMeta(),
+            new Path(employee1.getUri()), Integer.MAX_VALUE);
+    FileFragment[] empFrags2 = ((FileTablespace) TablespaceManager.getLocalFs())
+        .splitNG(conf, "default.e2", employee2.getMeta(),
+            new Path(employee2.getUri()), Integer.MAX_VALUE);
+
+    FileFragment[] merged = TUtil.concat(empFrags1, empFrags2);
+
+    Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testSortIntersectAll");
+    TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
+        LocalTajoTestingUtility.newTaskAttemptId(), merged, workDir);
+    ctx.setEnforcer(new Enforcer());
+    Expr expr = analyzer.parse(QUERIES[0]);
+    LogicalPlan plan = planner.createPlan(LocalTajoTestingUtility.createDummyContext(conf), expr);
+    optimizer.optimize(plan);
+    LogicalNode rootNode = plan.getRootBlock().getRoot();
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+
+    // replace an equal join with sort intersect all .
+    if (exec instanceof MergeJoinExec) {
+      MergeJoinExec join = (MergeJoinExec) exec;
+      exec = new SortIntersectExec(ctx, join.getLeftChild(), join.getRightChild(), false);
+    } else if (exec instanceof HashJoinExec) {
+      // we need to sort the results from both left and right children
+      HashJoinExec join = (HashJoinExec) exec;
+      SortSpec[] sortSpecsLeft = PlannerUtil.schemaToSortSpecs(join.getLeftChild().getSchema());
+      SortSpec[] sortSpecsRight = PlannerUtil.schemaToSortSpecs(join.getRightChild().getSchema());
+
+      SortNode leftSortNode = LogicalPlan.createNodeWithoutPID(SortNode.class);
+      leftSortNode.setSortSpecs(sortSpecsLeft);
+      leftSortNode.setInSchema(join.getLeftChild().getSchema());
+      leftSortNode.setOutSchema(join.getLeftChild().getSchema());
+      ExternalSortExec leftSort = new ExternalSortExec(ctx, leftSortNode, join.getLeftChild());
+
+      SortNode rightSortNode = LogicalPlan.createNodeWithoutPID(SortNode.class);
+      rightSortNode.setSortSpecs(sortSpecsRight);
+      rightSortNode.setInSchema(join.getRightChild().getSchema());
+      rightSortNode.setOutSchema(join.getRightChild().getSchema());
+      ExternalSortExec rightSort = new ExternalSortExec(ctx, rightSortNode, join.getRightChild());
+
+      exec = new SortIntersectExec(ctx, leftSort, rightSort, false);
+    }
+
+    Tuple tuple;
+    int count = 0;
+    int i = 0;
+    exec.init();
+
+    while ((tuple = exec.next()) != null) {
+      count++;
+      int answer = answerAllNum[i];
+      assertTrue(answer == tuple.asDatum(0).asInt4());
+      assertTrue(answer == tuple.asDatum(1).asInt4());
+      assertTrue(10 + answer == tuple.asDatum(2).asInt4());
+      assertTrue(("dept_" + answer).equals(tuple.asDatum(3).asChars()));
+
+      i++;
+    }
+    exec.close();
+    assertEquals(answerAllNum.length , count);
+  }
+
+  @Test
+  public final void testSortIntersect() throws IOException, TajoException {
+    FileFragment[] empFrags1 = ((FileTablespace) TablespaceManager.getLocalFs())
+        .splitNG(conf, "default.e1", employee1.getMeta(),
+            new Path(employee1.getUri()), Integer.MAX_VALUE);
+    FileFragment[] empFrags2 = ((FileTablespace) TablespaceManager.getLocalFs())
+        .splitNG(conf, "default.e2", employee2.getMeta(),
+        new Path(employee2.getUri()), Integer.MAX_VALUE);
+
+    FileFragment[] merged = TUtil.concat(empFrags1, empFrags2);
+
+    Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testSortIntersect");
+    TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
+        LocalTajoTestingUtility.newTaskAttemptId(), merged, workDir);
+    ctx.setEnforcer(new Enforcer());
+    Expr expr = analyzer.parse(QUERIES[0]);
+    LogicalPlan plan = planner.createPlan(LocalTajoTestingUtility.createDummyContext(conf), expr);
+    optimizer.optimize(plan);
+    LogicalNode rootNode = plan.getRootBlock().getRoot();
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+
+    // replace an equal join with sort intersect all .
+    if (exec instanceof MergeJoinExec) {
+      MergeJoinExec join = (MergeJoinExec) exec;
+      exec = new SortIntersectExec(ctx, join.getLeftChild(), join.getRightChild(), true);
+    } else if (exec instanceof HashJoinExec) {
+      // we need to sort the results from both left and right children
+      HashJoinExec join = (HashJoinExec) exec;
+      SortSpec[] sortSpecsLeft = PlannerUtil.schemaToSortSpecs(join.getLeftChild().getSchema());
+      SortSpec[] sortSpecsRight = PlannerUtil.schemaToSortSpecs(join.getRightChild().getSchema());
+
+      SortNode leftSortNode = LogicalPlan.createNodeWithoutPID(SortNode.class);
+      leftSortNode.setSortSpecs(sortSpecsLeft);
+      leftSortNode.setInSchema(join.getLeftChild().getSchema());
+      leftSortNode.setOutSchema(join.getLeftChild().getSchema());
+      ExternalSortExec leftSort = new ExternalSortExec(ctx, leftSortNode, join.getLeftChild());
+
+      SortNode rightSortNode = LogicalPlan.createNodeWithoutPID(SortNode.class);
+      rightSortNode.setSortSpecs(sortSpecsRight);
+      rightSortNode.setInSchema(join.getRightChild().getSchema());
+      rightSortNode.setOutSchema(join.getRightChild().getSchema());
+      ExternalSortExec rightSort = new ExternalSortExec(ctx, rightSortNode, join.getRightChild());
+
+      exec = new SortIntersectExec(ctx, leftSort, rightSort, true);
+    }
+
+    Tuple tuple;
+    int count = 0;
+    int i = 0;
+    exec.init();
+
+    while ((tuple = exec.next()) != null) {
+      count++;
+      int answer = answerDistinctNum[i];
+      assertTrue(answer == tuple.asDatum(0).asInt4());
+      assertTrue(answer == tuple.asDatum(1).asInt4());
+      assertTrue(10 + answer == tuple.asDatum(2).asInt4());
+      assertTrue(("dept_" + answer).equals(tuple.asDatum(3).asChars()));
+
+      i++;
+    }
+    exec.close();
+    assertEquals(answerDistinctNum.length , count);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestTupleSorter.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestTupleSorter.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestTupleSorter.java
new file mode 100644
index 0000000..4061b25
--- /dev/null
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestTupleSorter.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import com.google.common.collect.Iterators;
+import com.google.common.primitives.Ints;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.storage.BaseTupleComparator;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+import org.junit.Test;
+
+import java.util.*;
+
+import static org.junit.Assert.assertArrayEquals;
+
+public class TestTupleSorter {
+
+  private static final Log LOG = LogFactory.getLog(TestTupleSorter.class);
+
+  // Fixed seed so every run generates the same rows and draws the same keys.
+  private static final Random rnd = new Random(-1);
+
+  /**
+   * Benchmarks VectorizedSorter against Collections.sort with a
+   * BaseTupleComparator on LENGTH random tuples, asserting that both produce
+   * the same ordering. Ordering is compared via a fingerprint of SAMPLING
+   * evenly spaced rows rather than the full result. Sort keys (up to
+   * MAX_SORT_KEY columns, with random asc/desc and null-ordering flags) are
+   * re-drawn for each of the ITERATION rounds.
+   */
+  @Test
+  public final void testSortBench() {
+    final int MAX_SORT_KEY = 3;
+    final int ITERATION = 10;
+    final int LENGTH = 1000000;
+    final int SAMPLING = 100;
+
+    // Build LENGTH random rows covering several datum types (int2 is stored
+    // as an INT4 datum bounded by Short.MAX_VALUE).
+    Tuple[] tuples = new Tuple[LENGTH];
+    for (int i = 0; i < LENGTH; i++) {
+      Datum[] datums = new Datum[]{
+          DatumFactory.createInt4(rnd.nextInt(Short.MAX_VALUE)),
+          DatumFactory.createInt4(rnd.nextInt()),
+          DatumFactory.createText("dept_" + rnd.nextInt()),
+          DatumFactory.createBool(rnd.nextBoolean()),
+          DatumFactory.createInt8(rnd.nextLong()),
+          DatumFactory.createInterval(rnd.nextInt(), rnd.nextLong())};
+      tuples[i] = new VTuple(datums);
+    }
+
+    Column col0 = new Column("col0", Type.INT2);
+    Column col1 = new Column("col1", Type.INT4);
+    Column col2 = new Column("col2", Type.TEXT);
+    Column col3 = new Column("col3", Type.BOOLEAN);
+    Column col4 = new Column("col4", Type.INT8);
+    Column col5 = new Column("col5", Type.INTERVAL);
+
+    Schema schema = new Schema(new Column[] {col0, col1, col2, col3, col4, col5});
+
+    long[] time1 = new long[ITERATION];
+    long[] time2 = new long[ITERATION];
+    for(int iteration = 0; iteration < ITERATION; iteration++) {
+      // Fresh working copy of the rows for this round.
+      TupleList target = new TupleList(tuples.length);
+      target.addAll(Arrays.asList(Arrays.copyOf(tuples, tuples.length)));
+      // Draw up to MAX_SORT_KEY column indices; the TreeSet both dedupes
+      // (so there may be fewer keys) and keeps them in ascending order.
+      Set<Integer> keys = new TreeSet<Integer>();
+      for (int i = 0; i < MAX_SORT_KEY; i++) {
+        keys.add(rnd.nextInt(schema.size()));
+      }
+      int[] keyIndices = Ints.toArray(keys);
+      SortSpec[] sortKeys = new SortSpec[keyIndices.length];
+      for (int i = 0; i < keyIndices.length; i++) {
+        sortKeys[i] = new SortSpec(schema.getColumn(keyIndices[i]), rnd.nextBoolean(), rnd.nextBoolean());
+      }
+
+      // Time the vectorized sort, then fingerprint every
+      // (LENGTH/SAMPLING)-th row's key columns. Iterators.get advances the
+      // iterator, so successive calls pick evenly spaced rows.
+      // NOTE(review): 'target' is handed to VectorizedSorter and later
+      // sorted in place by Collections.sort below; this assumes sort()
+      // does not reorder the backing list itself -- confirm.
+      long start = System.currentTimeMillis();
+      VectorizedSorter sorter = new VectorizedSorter(target, sortKeys, keyIndices);
+      Iterator<Tuple> iterator = sorter.sort().iterator();
+
+      String[] result1 = new String[SAMPLING];
+      for (int i = 0; i < result1.length; i++) {
+        Tuple tuple = Iterators.get(iterator, LENGTH / result1.length - 1);
+        StringBuilder builder = new StringBuilder();
+        for (int keyIndex : keyIndices) {
+          builder.append(tuple.getText(keyIndex));
+        }
+        result1[i] = builder.toString();
+      }
+      time1[iteration] = System.currentTimeMillis() - start;
+
+      BaseTupleComparator comparator = new BaseTupleComparator(schema, sortKeys);
+
+      // Time the baseline comparator-based sort and sample it the same way.
+      start = System.currentTimeMillis();
+      Collections.sort(target, comparator);
+      iterator = target.iterator();
+
+      String[] result2 = new String[SAMPLING];
+      for (int i = 0; i < result2.length; i++) {
+        Tuple tuple = Iterators.get(iterator, LENGTH / result2.length - 1);
+        StringBuilder builder = new StringBuilder();
+        for (int keyIndex : keyIndices) {
+          builder.append(tuple.getText(keyIndex));
+        }
+        result2[i] = builder.toString();
+      }
+      time2[iteration] = System.currentTimeMillis() - start;
+
+      LOG.info("Sort on keys " + Arrays.toString(keyIndices) +
+          ": Vectorized " + time1[iteration]+ " msec, Original " + time2[iteration] + " msec");
+
+      // Both sorters must agree on the sampled ordering.
+      assertArrayEquals(result1, result2);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestAlterTable.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestAlterTable.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestAlterTable.java
new file mode 100644
index 0000000..8339ea7
--- /dev/null
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestAlterTable.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.query;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.IntegrationTest;
+import org.apache.tajo.QueryTestCaseBase;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.sql.ResultSet;
+import java.util.List;
+
+import static org.junit.Assert.*;
+
+@Category(IntegrationTest.class)
+/**
+ * Integration tests for ALTER TABLE: rename table/column, add column,
+ * set table properties, and add/drop column partitions.
+ */
+public class TestAlterTable extends QueryTestCaseBase {
+  @Test
+  public final void testAlterTableName() throws Exception {
+    List<String> createdNames = executeDDL("table1_ddl.sql", "table1.tbl", "ABC");
+    assertTableExists(createdNames.get(0));
+    executeDDL("alter_table_rename_table_ddl.sql", null);
+    assertTableExists("DEF");
+  }
+
+  @Test
+  public final void testAlterTableColumnName() throws Exception {
+    List<String> createdNames = executeDDL("table1_ddl.sql", "table1.tbl", "XYZ");
+    executeDDL("alter_table_rename_column_ddl.sql", null);
+    assertColumnExists(createdNames.get(0), "renum");
+  }
+
+  @Test
+  public final void testAlterTableAddNewColumn() throws Exception {
+    List<String> createdNames = executeDDL("table1_ddl.sql", "table1.tbl", "EFG");
+    executeDDL("alter_table_add_new_column_ddl.sql", null);
+    assertColumnExists(createdNames.get(0), "cool");
+  }
+
+  @Test
+  public final void testAlterTableSetProperty() throws Exception {
+    executeDDL("table2_ddl.sql", "table2.tbl", "ALTX");
+    // Query results before and after changing the delimiter property must
+    // match their respective expected result files.
+    ResultSet before_res = executeQuery();
+    assertResultSet(before_res, "before_set_property_delimiter.result");
+    cleanupQuery(before_res);
+
+    executeDDL("alter_table_set_property_delimiter.sql", null);
+
+    ResultSet after_res = executeQuery();
+    assertResultSet(after_res, "after_set_property_delimiter.result");
+    cleanupQuery(after_res);
+  }
+
+  @Test
+  public final void testAlterTableAddPartition() throws Exception {
+    executeDDL("create_partitioned_table.sql", null);
+
+    String tableName = CatalogUtil.buildFQName("TestAlterTable", "partitioned_table");
+    assertTrue(catalog.existsTable(tableName));
+
+    // JUnit's assertEquals takes (expected, actual); the previous code had
+    // them reversed, which produced misleading failure messages.
+    TableDesc retrieved = catalog.getTableDesc(tableName);
+    assertEquals(tableName, retrieved.getName());
+    assertEquals(CatalogProtos.PartitionType.COLUMN, retrieved.getPartitionMethod().getPartitionType());
+    assertEquals(2, retrieved.getPartitionMethod().getExpressionSchema().getAllColumns().size());
+    assertEquals("col3", retrieved.getPartitionMethod().getExpressionSchema().getColumn(0).getSimpleName());
+    assertEquals("col4", retrieved.getPartitionMethod().getExpressionSchema().getColumn(1).getSimpleName());
+
+    executeDDL("alter_table_add_partition1.sql", null);
+    executeDDL("alter_table_add_partition2.sql", null);
+
+    List<CatalogProtos.PartitionDescProto> partitions = catalog.getPartitions("TestAlterTable", "partitioned_table");
+    assertNotNull(partitions);
+    assertEquals(1, partitions.size());
+    assertEquals("col3=1/col4=2", partitions.get(0).getPartitionName());
+    assertEquals("col3", partitions.get(0).getPartitionKeysList().get(0).getColumnName());
+    assertEquals("1", partitions.get(0).getPartitionKeysList().get(0).getPartitionValue());
+    assertEquals("col4", partitions.get(0).getPartitionKeysList().get(1).getColumnName());
+    assertEquals("2", partitions.get(0).getPartitionKeysList().get(1).getPartitionValue());
+
+    // The partition directory must exist on the file system and encode the
+    // partition keys in its path.
+    assertNotNull(partitions.get(0).getPath());
+    Path partitionPath = new Path(partitions.get(0).getPath());
+    FileSystem fs = partitionPath.getFileSystem(conf);
+    assertTrue(fs.exists(partitionPath));
+    assertTrue(partitionPath.toString().contains("col3=1/col4=2"));
+
+    executeDDL("alter_table_drop_partition1.sql", null);
+    executeDDL("alter_table_drop_partition2.sql", null);
+
+    // Dropping the partitions must remove both the catalog entries and the
+    // partition directory.
+    partitions = catalog.getPartitions("TestAlterTable", "partitioned_table");
+    assertNotNull(partitions);
+    assertEquals(0, partitions.size());
+    assertFalse(fs.exists(partitionPath));
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestAlterTablespace.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestAlterTablespace.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestAlterTablespace.java
new file mode 100644
index 0000000..8509b07
--- /dev/null
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestAlterTablespace.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.query;
+
+import org.apache.tajo.IntegrationTest;
+import org.apache.tajo.QueryTestCaseBase;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.junit.Assert.*;
+
+@Category(IntegrationTest.class)
+public class TestAlterTablespace extends QueryTestCaseBase {
+
+  @Test
+  public final void testAlterLocation() throws Exception {
+    // ALTER TABLESPACE is only exercised against the default catalog store;
+    // skip when the Hive catalog store is in use.
+    if (testingCluster.isHiveCatalogStoreRunning()) {
+      return;
+    }
+
+    // Register a fresh tablespace.
+    assertFalse(catalog.existTablespace("space1"));
+    catalog.createTablespace("space1", "hdfs://xxx.com/warehouse");
+    assertTrue(catalog.existTablespace("space1"));
+
+    // Sanity-check its initial name and URI.
+    CatalogProtos.TablespaceProto space = catalog.getTablespace("space1");
+    assertEquals("space1", space.getSpaceName());
+    assertEquals("hdfs://xxx.com/warehouse", space.getUri());
+
+    // Relocate the tablespace and verify the new URI took effect.
+    executeString("ALTER TABLESPACE space1 LOCATION 'hdfs://yyy.com/warehouse';");
+    space = catalog.getTablespace("space1");
+    assertEquals("space1", space.getSpaceName());
+    assertEquals("hdfs://yyy.com/warehouse", space.getUri());
+
+    // Clean up so the catalog is left as we found it.
+    catalog.dropTablespace("space1");
+    assertFalse(catalog.existTablespace("space1"));
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestCTASQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestCTASQuery.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestCTASQuery.java
new file mode 100644
index 0000000..f327c85
--- /dev/null
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestCTASQuery.java
@@ -0,0 +1,281 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.query;
+
+import com.google.common.collect.Maps;
+import org.apache.commons.lang.StringEscapeUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.IntegrationTest;
+import org.apache.tajo.QueryTestCaseBase;
+import org.apache.tajo.TajoConstants;
+import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.catalog.CatalogService;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.catalog.partition.PartitionMethodDesc;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.storage.StorageConstants;
+import org.apache.tajo.util.KeyValueSet;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.sql.ResultSet;
+import java.util.Map;
+
+import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME;
+import static org.apache.tajo.catalog.CatalogUtil.buildFQName;
+import static org.junit.Assert.*;
+
+
+/**
+ * Test CREATE TABLE AS SELECT statements
+ */
+@Category(IntegrationTest.class)
+public class TestCTASQuery extends QueryTestCaseBase {
+
+  public TestCTASQuery() {
+    super(TajoConstants.DEFAULT_DATABASE_NAME);
+  }
+
+  /**
+   * CTAS without an explicit table definition: schema and column partition
+   * are derived from the SELECT, and partition directories must materialize
+   * on the file system.
+   */
+  @Test
+  public final void testCtasWithoutTableDefinition() throws Exception {
+    ResultSet res = executeQuery();
+    res.close();
+
+    String tableName = CatalogUtil.normalizeIdentifier("testCtasWithoutTableDefinition");
+    CatalogService catalog = testBase.getTestingCluster().getMaster().getCatalog();
+    String qualifiedTableName = buildFQName(DEFAULT_DATABASE_NAME, tableName);
+    TableDesc desc = catalog.getTableDesc(qualifiedTableName);
+    assertTrue(catalog.existsTable(qualifiedTableName));
+
+    assertTrue(desc.getSchema().contains("default.testctaswithouttabledefinition.col1"));
+    PartitionMethodDesc partitionDesc = desc.getPartitionMethod();
+    assertEquals(partitionDesc.getPartitionType(), CatalogProtos.PartitionType.COLUMN);
+    assertEquals("key", partitionDesc.getExpressionSchema().getRootColumns().get(0).getSimpleName());
+
+    // each distinct partition key value must have its own directory
+    FileSystem fs = FileSystem.get(testBase.getTestingCluster().getConfiguration());
+    Path path = new Path(desc.getUri());
+    assertTrue(fs.isDirectory(path));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/key=17.0")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/key=36.0")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/key=38.0")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/key=45.0")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/key=49.0")));
+    if (!testingCluster.isHiveCatalogStoreRunning()) {
+      assertEquals(5, desc.getStats().getNumRows().intValue());
+    }
+
+    ResultSet res2 = executeFile("check1.sql");
+
+    // expected (count, count distinct) per key value
+    Map<Double, int []> resultRows1 = Maps.newHashMap();
+    resultRows1.put(45.0d, new int[]{3, 2});
+    resultRows1.put(38.0d, new int[]{2, 2});
+
+    int i = 0;
+    while(res2.next()) {
+      assertEquals(resultRows1.get(res2.getDouble(3))[0], res2.getInt(1));
+      assertEquals(resultRows1.get(res2.getDouble(3))[1], res2.getInt(2));
+      i++;
+    }
+    res2.close();
+    assertEquals(2, i);
+  }
+
+  /**
+   * CTAS with an explicit PARTITION BY COLUMN clause; mirrors
+   * {@link #testCtasWithoutTableDefinition()} but with a declared definition.
+   */
+  @Test
+  public final void testCtasWithColumnedPartition() throws Exception {
+    ResultSet res = executeQuery();
+    res.close();
+
+    String tableName = CatalogUtil.normalizeIdentifier("testCtasWithColumnedPartition");
+
+    TajoTestingCluster cluster = testBase.getTestingCluster();
+    CatalogService catalog = cluster.getMaster().getCatalog();
+    TableDesc desc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName);
+    assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName));
+    PartitionMethodDesc partitionDesc = desc.getPartitionMethod();
+    assertEquals(partitionDesc.getPartitionType(), CatalogProtos.PartitionType.COLUMN);
+    assertEquals("key", partitionDesc.getExpressionSchema().getRootColumns().get(0).getSimpleName());
+
+    // each distinct partition key value must have its own directory
+    FileSystem fs = FileSystem.get(cluster.getConfiguration());
+    Path path = new Path(desc.getUri());
+    assertTrue(fs.isDirectory(path));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/key=17.0")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/key=36.0")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/key=38.0")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/key=45.0")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/key=49.0")));
+    if (!cluster.isHiveCatalogStoreRunning()) {
+      assertEquals(5, desc.getStats().getNumRows().intValue());
+    }
+
+    ResultSet res2 = executeFile("check2.sql");
+
+    // expected (count, count distinct) per key value
+    Map<Double, int []> resultRows1 = Maps.newHashMap();
+    resultRows1.put(45.0d, new int[]{3, 2});
+    resultRows1.put(38.0d, new int[]{2, 2});
+
+    int i = 0;
+    while(res2.next()) {
+      assertEquals(resultRows1.get(res2.getDouble(3))[0], res2.getInt(1));
+      assertEquals(resultRows1.get(res2.getDouble(3))[1], res2.getInt(2));
+      i++;
+    }
+    res2.close();
+    assertEquals(2, i);
+  }
+
+  @Test
+  public final void testCtasWithGroupby() throws Exception {
+    ResultSet res = executeFile("CtasWithGroupby.sql");
+    res.close();
+
+    ResultSet res2 = executeQuery();
+    assertResultSet(res2);
+    res2.close();
+  }
+
+  @Test
+  public final void testCtasWithOrderby() throws Exception {
+    ResultSet res = executeFile("CtasWithOrderby.sql");
+    res.close();
+
+    ResultSet res2 = executeQuery();
+    assertResultSet(res2);
+    res2.close();
+  }
+
+  @Test
+  public final void testCtasWithLimit() throws Exception {
+    ResultSet res = executeFile("CtasWithLimit.sql");
+    res.close();
+
+    ResultSet res2 = executeQuery();
+    assertResultSet(res2);
+    res2.close();
+  }
+
+  @Test
+  public final void testCtasWithUnion() throws Exception {
+    ResultSet res = executeFile("CtasWithUnion.sql");
+    res.close();
+
+    ResultSet res2 = executeQuery();
+    // drain the result set; only successful materialization is asserted here
+    resultSetToString(res2);
+    res2.close();
+  }
+
+  @Test
+  public final void testCtasWithMultipleUnions() throws Exception {
+    ResultSet res = executeFile("CtasWithMultipleUnions.sql");
+    res.close();
+
+    ResultSet res2 = executeQuery();
+    String actual = resultSetToString(res2);
+    // read the table name BEFORE closing: getMetaData() on a closed
+    // ResultSet throws SQLException
+    String createdTableName = CatalogUtil.normalizeIdentifier(res2.getMetaData().getTableName(1));
+    res2.close();
+
+    String expected = "c_custkey,c_nationkey\n" +
+        "-------------------------------\n" +
+        "1,15\n" +
+        "2,13\n" +
+        "3,1\n" +
+        "4,4\n" +
+        "5,3\n" +
+        "1,15\n" +
+        "2,13\n" +
+        "3,1\n" +
+        "4,4\n" +
+        "5,3\n";
+
+    assertEquals(expected, actual);
+
+    TableDesc desc = client.getTableDesc(createdTableName);
+    assertNotNull(desc);
+  }
+
+  @Test
+  public final void testCtasWithStoreType() throws Exception {
+    ResultSet res = executeFile("CtasWithStoreType.sql");
+    res.close();
+
+    ResultSet res2 = executeQuery();
+    resultSetToString(res2);
+    // read the table name BEFORE closing: getMetaData() on a closed
+    // ResultSet throws SQLException
+    String createdTableName = CatalogUtil.normalizeIdentifier(res2.getMetaData().getTableName(1));
+    res2.close();
+
+    TableDesc desc =  client.getTableDesc(createdTableName);
+    assertNotNull(desc);
+    assertTrue("RCFILE".equalsIgnoreCase(desc.getMeta().getStoreType()));
+  }
+
+  @Test
+  public final void testCtasWithTextFile() throws Exception {
+    ResultSet res = executeFile("CtasWithTextFile.sql");
+    res.close();
+
+    ResultSet res2 = executeQuery();
+    resultSetToString(res2);
+    // read the table name BEFORE closing: getMetaData() on a closed
+    // ResultSet throws SQLException
+    String createdTableName = CatalogUtil.normalizeIdentifier(res2.getMetaData().getTableName(1));
+    res2.close();
+
+    TableDesc desc =  client.getTableDesc(createdTableName);
+    assertNotNull(desc);
+    assertTrue("TEXT".equalsIgnoreCase(desc.getMeta().getStoreType()));
+  }
+
+  @Test
+  public final void testCtasWithOptions() throws Exception {
+    ResultSet res = executeFile("CtasWithOptions.sql");
+    res.close();
+
+    ResultSet res2 = executeQuery();
+    resultSetToString(res2);
+    // read the table name BEFORE closing: getMetaData() on a closed
+    // ResultSet throws SQLException
+    String createdTableName = CatalogUtil.normalizeIdentifier(res2.getMetaData().getTableName(1));
+    res2.close();
+
+    TableDesc desc =  client.getTableDesc(createdTableName);
+    assertNotNull(desc);
+    assertTrue("TEXT".equalsIgnoreCase(desc.getMeta().getStoreType()));
+
+    // the WITH (...) options must survive into the stored table meta
+    KeyValueSet options = desc.getMeta().getOptions();
+    assertNotNull(options);
+    assertEquals(StringEscapeUtils.escapeJava("\u0001"), options.get(StorageConstants.TEXT_DELIMITER));
+  }
+
+  /**
+   * Managed-table naming differs by catalog store: HiveCatalogStore
+   * lower-cases identifiers, the default store preserves case.
+   */
+  @Test
+  public final void testCtasWithManagedTable() throws Exception {
+    ResultSet res = executeFile("CtasWithManagedTable.sql");
+    res.close();
+
+    if (testingCluster.isHiveCatalogStoreRunning()) {
+      assertTrue(client.existTable("managed_table1"));
+
+      TableDesc desc =  client.getTableDesc("managed_table1");
+
+      assertNotNull(desc);
+      assertEquals("managed_table1", new Path(desc.getUri()).getName());
+    } else {
+      assertFalse(client.existTable("managed_Table1"));
+      assertTrue(client.existTable("MANAGED_TABLE1"));
+
+      TableDesc desc =  client.getTableDesc("MANAGED_TABLE1");
+
+      assertNotNull(desc);
+      assertEquals("MANAGED_TABLE1", new Path(desc.getUri()).getName());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestCaseByCases.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestCaseByCases.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestCaseByCases.java
new file mode 100644
index 0000000..bcf00f8
--- /dev/null
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestCaseByCases.java
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.query;
+
+import org.apache.tajo.QueryTestCaseBase;
+import org.apache.tajo.TajoConstants;
+import org.junit.Test;
+
+import java.sql.ResultSet;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Regression tests named after the TAJO JIRA issues they reproduce
+ * (e.g. TAJO-415). Most cases run the query file matched by the test
+ * method name and compare against a stored result set.
+ */
+public class TestCaseByCases extends QueryTestCaseBase {
+
+  public TestCaseByCases() {
+    super(TajoConstants.DEFAULT_DATABASE_NAME);
+  }
+
+  @Test
+  public final void testTAJO415Case() throws Exception {
+    ResultSet res = executeQuery();
+    assertResultSet(res);
+    cleanupQuery(res);
+  }
+
+  @Test
+  public final void testTAJO418Case() throws Exception {
+    ResultSet res = executeQuery();
+    assertResultSet(res);
+    cleanupQuery(res);
+  }
+
+  /**
+   * It's an unit test to reproduce TAJO-619 (https://issues.apache.org/jira/browse/TAJO-619).
+   */
+  @Test
+  public final void testTAJO619Case() throws Exception {
+    ResultSet res = executeQuery();
+    assertResultSet(res);
+    cleanupQuery(res);
+  }
+
+  @Test
+  public final void testTAJO718Case() throws Exception {
+    ResultSet res = executeQuery();
+    assertResultSet(res);
+    cleanupQuery(res);
+  }
+
+  @Test
+  public final void testTAJO739Case() throws Exception {
+    ResultSet res = executeQuery();
+    assertResultSet(res);
+    cleanupQuery(res);
+  }
+
+  @Test
+  public final void testTAJO880Case1() throws Exception {
+    //TAJO-880: NULL in CASE clause occurs Exception.
+    // ELSE branch yields NULL; rows with l_returnflag == 'R' must print "null"
+    ResultSet res = executeString(
+        "select case when l_returnflag != 'R' then l_orderkey else null end from lineitem"
+    );
+
+    String expected =
+        "?casewhen\n" +
+        "-------------------------------\n" +
+        "1\n" +
+        "1\n" +
+        "2\n" +
+        "null\n" +
+        "null\n";
+
+    assertEquals(expected, resultSetToString(res));
+    cleanupQuery(res);
+  }
+
+  @Test
+  public final void testTAJO880Case2() throws Exception {
+    //TAJO-880: NULL in CASE clause occurs Exception.
+    // THEN branch yields NULL; mirror image of testTAJO880Case1
+    ResultSet res = executeString(
+        "select case when l_returnflag != 'R' then null else l_orderkey end from lineitem"
+    );
+
+    String expected =
+        "?casewhen\n" +
+        "-------------------------------\n" +
+        "null\n" +
+        "null\n" +
+        "null\n" +
+        "3\n" +
+        "3\n";
+
+    assertEquals(expected, resultSetToString(res));
+    cleanupQuery(res);
+  }
+
+  @Test
+  public final void testTAJO880Case3() throws Exception {
+    //TAJO-880: NULL in CASE clause occurs Exception.
+    // NULL in both a WHEN branch and the ELSE branch
+    ResultSet res = executeString(
+        "select case " +
+            "when l_orderkey = 1 then null " +
+            "when l_orderkey = 2 then l_orderkey " +
+            "else null end " +
+        "from lineitem"
+    );
+
+    String expected =
+        "?casewhen\n" +
+            "-------------------------------\n" +
+            "null\n" +
+            "null\n" +
+            "2\n" +
+            "null\n" +
+            "null\n";
+
+    assertEquals(expected, resultSetToString(res));
+    cleanupQuery(res);
+  }
+
+  @Test
+  public final void testTAJO914Case1() throws Exception {
+    ResultSet res = executeQuery();
+    assertResultSet(res);
+    cleanupQuery(res);
+  }
+
+  @Test
+  public final void testTAJO914Case2() throws Exception {
+    ResultSet res = executeQuery();
+    assertResultSet(res);
+    cleanupQuery(res);
+  }
+
+  @Test
+  public final void testTAJO914Case3() throws Exception {
+    // create the target table first, then run the query file against it
+    executeString("CREATE TABLE T3 (l_orderkey bigint, col1 text);").close();
+    ResultSet res = executeQuery();
+    res.close();
+
+    res = executeString("select * from T3;");
+    assertResultSet(res);
+    cleanupQuery(res);
+  }
+
+  @Test
+  public final void testTAJO914Case4() throws Exception {
+    ResultSet res = executeQuery();
+    assertResultSet(res);
+    cleanupQuery(res);
+  }
+
+  @Test
+  public final void testTAJO917Case1() throws Exception {
+    ResultSet res = executeQuery();
+    assertResultSet(res);
+    cleanupQuery(res);
+  }
+
+  @Test
+  public final void testTAJO1224Case1() throws Exception {
+    // CTAS into a JSON-backed table, then verify via the query file
+    executeString("CREATE TABLE TAJO1224 USING JSON AS SELECT * FROM LINEITEM").close();
+    ResultSet res = executeQuery();
+    assertResultSet(res);
+    cleanupQuery(res);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestCreateDatabase.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestCreateDatabase.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestCreateDatabase.java
new file mode 100644
index 0000000..453c174
--- /dev/null
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestCreateDatabase.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.query;
+
+import org.apache.tajo.IntegrationTest;
+import org.apache.tajo.QueryTestCaseBase;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.sql.ResultSet;
+
+@Category(IntegrationTest.class)
+public class TestCreateDatabase extends QueryTestCaseBase {
+
+  /** Round-trips a plain CREATE DATABASE / DROP DATABASE pair. */
+  @Test
+  public final void testCreateAndDropDatabase() throws Exception {
+    String databaseName = CatalogUtil.normalizeIdentifier("testCreateAndDropDatabase");
+
+    ResultSet res = null;
+    try {
+      res = executeString("CREATE DATABASE testCreateAndDropDatabase;");
+      assertDatabaseExists(databaseName);
+      executeString("DROP DATABASE testCreateAndDropDatabase;");
+      assertDatabaseNotExists(databaseName);
+    } finally {
+      // cleanupQuery tolerates a null ResultSet if CREATE failed
+      cleanupQuery(res);
+    }
+  }
+
+  /** CREATE DATABASE IF NOT EXISTS must be a no-op when the database exists. */
+  @Test
+  public final void testCreateIfNotExists() throws Exception {
+    String databaseName = CatalogUtil.normalizeIdentifier("testCreateIfNotExists");
+
+    assertDatabaseNotExists(databaseName);
+    executeString("CREATE DATABASE " + databaseName + ";").close();
+    assertDatabaseExists(databaseName);
+
+    // second creation with IF NOT EXISTS must not fail
+    executeString("CREATE DATABASE IF NOT EXISTS " + databaseName + ";").close();
+    assertDatabaseExists(databaseName);
+
+    executeString("DROP DATABASE " + databaseName + ";").close();
+    assertDatabaseNotExists(databaseName);
+  }
+
+  /** DROP DATABASE IF EXISTS must be a no-op when the database is absent. */
+  @Test
+  public final void testDropIfExists() throws Exception {
+    String databaseName = CatalogUtil.normalizeIdentifier("testDropIfExists");
+    assertDatabaseNotExists(databaseName);
+    executeString("CREATE DATABASE " + databaseName + ";").close();
+    assertDatabaseExists(databaseName);
+
+    executeString("DROP DATABASE " + databaseName + ";").close();
+    assertDatabaseNotExists(databaseName);
+
+    // dropping an already-dropped database with IF EXISTS must not fail
+    executeString("DROP DATABASE IF EXISTS " + databaseName + ";");
+    assertDatabaseNotExists(databaseName);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestCreateIndex.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestCreateIndex.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestCreateIndex.java
new file mode 100644
index 0000000..81df04a
--- /dev/null
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestCreateIndex.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.query;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.IntegrationTest;
+import org.apache.tajo.QueryTestCaseBase;
+import org.apache.tajo.TajoConstants;
+import org.apache.tajo.conf.TajoConf.ConfVars;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.io.IOException;
+
+import static org.junit.Assert.*;
+
+@Category(IntegrationTest.class)
+public class TestCreateIndex extends QueryTestCaseBase {
+
+  public TestCreateIndex() {
+    super(TajoConstants.DEFAULT_DATABASE_NAME);
+  }
+
+  /**
+   * Asserts that the index directory has been removed from the warehouse.
+   * If it still exists, the test fails and the leftover directory is
+   * scheduled for deletion at JVM exit so later runs start clean.
+   */
+  private static void assertIndexNotExist(String databaseName, String indexName) throws IOException {
+    Path indexPath = new Path(conf.getVar(ConfVars.WAREHOUSE_DIR), databaseName + "/" + indexName);
+    FileSystem fs = indexPath.getFileSystem(conf);
+    if (fs.exists(indexPath)) {
+      fs.deleteOnExit(indexPath);
+      // fail() states the intent directly instead of assertFalse(msg, true)
+      fail("Index is not deleted from the file system.");
+    }
+  }
+
+  @Test
+  public final void testCreateIndex() throws Exception {
+    executeQuery();
+    assertTrue(catalog.existIndexByName(getCurrentDatabase(), "l_orderkey_idx"));
+    assertTrue(catalog.existIndexByColumnNames(getCurrentDatabase(), "lineitem", new String[]{"l_orderkey"}));
+    executeString("drop index l_orderkey_idx");
+    assertFalse(catalog.existIndexByName(getCurrentDatabase(), "l_orderkey_idx"));
+    assertIndexNotExist(getCurrentDatabase(), "l_orderkey_idx");
+  }
+
+  @Test
+  public final void testCreateIndexOnMultiAttrs() throws Exception {
+    executeQuery();
+    assertTrue(catalog.existIndexByName(getCurrentDatabase(), "l_orderkey_partkey_idx"));
+    assertTrue(catalog.existIndexByColumnNames(getCurrentDatabase(), "lineitem", new String[]{"l_orderkey", "l_partkey"}));
+    executeString("drop index l_orderkey_partkey_idx");
+    assertFalse(catalog.existIndexByName(getCurrentDatabase(), "l_orderkey_partkey_idx"));
+    assertIndexNotExist(getCurrentDatabase(), "l_orderkey_partkey_idx");
+  }
+
+  @Test
+  public final void testCreateIndexWithCondition() throws Exception {
+    executeQuery();
+    assertTrue(catalog.existIndexByName(getCurrentDatabase(), "l_orderkey_partkey_lt10_idx"));
+    assertTrue(catalog.existIndexByColumnNames(getCurrentDatabase(), "lineitem", new String[]{"l_orderkey", "l_partkey"}));
+    executeString("drop index l_orderkey_partkey_lt10_idx");
+    assertFalse(catalog.existIndexByName(getCurrentDatabase(), "l_orderkey_partkey_lt10_idx"));
+    assertIndexNotExist(getCurrentDatabase(), "l_orderkey_partkey_lt10_idx");
+  }
+
+  @Test
+  public final void testCreateIndexOnExpression() throws Exception {
+    executeQuery();
+    assertTrue(catalog.existIndexByName(getCurrentDatabase(), "l_orderkey_100_lt10_idx"));
+    executeString("drop index l_orderkey_100_lt10_idx");
+    assertFalse(catalog.existIndexByName(getCurrentDatabase(), "l_orderkey_100_lt10_idx"));
+    assertIndexNotExist(getCurrentDatabase(), "l_orderkey_100_lt10_idx");
+  }
+
+  @Test
+  public final void testCreateIndexOnMultiExprs() throws Exception {
+    executeQuery();
+    assertTrue(catalog.existIndexByName(getCurrentDatabase(), "l_orderkey_100_l_linenumber_10_lt10_idx"));
+    executeString("drop index l_orderkey_100_l_linenumber_10_lt10_idx");
+    assertFalse(catalog.existIndexByName(getCurrentDatabase(), "l_orderkey_100_l_linenumber_10_lt10_idx"));
+    assertIndexNotExist(getCurrentDatabase(), "l_orderkey_100_l_linenumber_10_lt10_idx");
+  }
+
+  /**
+   * Creates the index twice: once via the query file, then again with an
+   * explicit LOCATION clause after dropping it from the catalog.
+   */
+  @Test
+  public final void testCreateIndexOnLocation() throws Exception {
+    executeQuery();
+    assertTrue(catalog.existIndexByName(getCurrentDatabase(), "l_orderkey_idx"));
+    assertTrue(catalog.existIndexByColumnNames(getCurrentDatabase(), "lineitem", new String[]{"l_orderkey"}));
+    catalog.dropIndex(getCurrentDatabase(), "l_orderkey_idx");
+    assertFalse(catalog.existIndexByName(getCurrentDatabase(), "l_orderkey_idx"));
+    executeString("create index l_orderkey_idx on lineitem (l_orderkey asc null first) location '/tajo/warehouse/default/l_orderkey_idx';");
+    assertTrue(catalog.existIndexByName(getCurrentDatabase(), "l_orderkey_idx"));
+    assertTrue(catalog.existIndexByColumnNames(getCurrentDatabase(), "lineitem", new String[]{"l_orderkey"}));
+    executeString("drop index l_orderkey_idx");
+    assertFalse(catalog.existIndexByName(getCurrentDatabase(), "l_orderkey_idx"));
+    assertIndexNotExist(getCurrentDatabase(), "l_orderkey_idx");
+  }
+}


Mime
View raw message