tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hyun...@apache.org
Subject [2/2] git commit: TAJO-197: Implement Enforcer that forces physical planner to choose specified algorithms. (hyunsik)
Date Tue, 24 Sep 2013 05:58:13 GMT
TAJO-197: Implement Enforcer that forces physical planner to choose specified algorithms. (hyunsik)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/17287ef5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/17287ef5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/17287ef5

Branch: refs/heads/master
Commit: 17287ef58457da7766a78d2655cd332adbc56150
Parents: 3e82159
Author: Hyunsik Choi <hyunsik@apache.org>
Authored: Tue Sep 24 14:57:54 2013 +0900
Committer: Hyunsik Choi <hyunsik@apache.org>
Committed: Tue Sep 24 14:57:54 2013 +0900

----------------------------------------------------------------------
 CHANGES.txt                                     |   3 +
 .../java/org/apache/tajo/catalog/SortSpec.java  |  25 +-
 .../src/main/proto/CatalogProtos.proto          |   8 +-
 .../org/apache/tajo/TaskAttemptContext.java     |  10 +
 .../engine/planner/PhysicalPlannerImpl.java     | 312 +++++++--
 .../planner/PhysicalPlanningException.java      |  31 +
 .../apache/tajo/engine/planner/PlannerUtil.java |   4 +-
 .../tajo/engine/planner/enforce/Enforcer.java   | 166 +++++
 .../engine/planner/physical/BNLJoinExec.java    |  10 +-
 .../planner/physical/ExternalSortExec.java      |   3 +-
 .../engine/planner/physical/MemSortExec.java    |   3 +-
 .../planner/physical/UnaryPhysicalExec.java     |   4 +-
 .../tajo/engine/query/QueryUnitRequestImpl.java |  31 +-
 .../ipc/protocolrecords/QueryUnitRequest.java   |   2 +
 .../org/apache/tajo/master/ExecutionBlock.java  |  10 +-
 .../apache/tajo/master/TaskSchedulerImpl.java   |   5 +-
 .../main/java/org/apache/tajo/worker/Task.java  |   1 +
 .../src/main/proto/TajoWorkerProtocol.proto     |  73 +-
 .../planner/physical/TestBNLJoinExec.java       |  67 +-
 .../planner/physical/TestExternalSortExec.java  |   2 +
 .../planner/physical/TestHashAntiJoinExec.java  |   2 +
 .../planner/physical/TestHashJoinExec.java      |  38 +-
 .../planner/physical/TestHashSemiJoinExec.java  |   2 +
 .../planner/physical/TestMergeJoinExec.java     |  71 +-
 .../engine/planner/physical/TestNLJoinExec.java |   3 +
 .../planner/physical/TestPhysicalPlanner.java   | 131 +++-
 .../engine/planner/physical/TestSortExec.java   |   5 +-
 .../apache/tajo/engine/query/TestSortQuery.java |   1 +
 .../tajo/worker/TestRangeRetrieverHandler.java  |   9 +-
 .../apache/tajo/storage/TupleComparator.java    |  27 +-
 .../apache/tajo/storage/index/IndexProtos.java  | 665 -------------------
 .../apache/tajo/storage/index/bst/BSTIndex.java |   3 +-
 .../src/main/proto/IndexProtos.proto            |   2 +-
 33 files changed, 812 insertions(+), 917 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/17287ef5/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 53a6c07..9400154 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -29,6 +29,9 @@ Release 0.2.0 - unreleased
 
   IMPROVEMENTS
 
+    TAJO-197: Implement Enforcer that forces physical planner to choose
+    specified algorithms. (hyunsik)
+
     TAJO-194: LogicalNode should have an identifier to distinguish each
     logical node instance. (hyunsik)
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/17287ef5/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/SortSpec.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/SortSpec.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/SortSpec.java
index be73eb3..5cc0de1 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/SortSpec.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/SortSpec.java
@@ -19,11 +19,14 @@
 package org.apache.tajo.catalog;
 
 import com.google.gson.annotations.Expose;
-import org.apache.tajo.json.GsonObject;
 import org.apache.tajo.catalog.json.CatalogGsonHelper;
+import org.apache.tajo.common.ProtoObject;
+import org.apache.tajo.json.GsonObject;
 
+import static org.apache.tajo.catalog.proto.CatalogProtos.SortSpecProto;
 
-public class SortSpec implements Cloneable, GsonObject {
+
+public class SortSpec implements Cloneable, GsonObject, ProtoObject<SortSpecProto> {
   @Expose private Column sortKey;
   @Expose private boolean ascending = true;
   @Expose private boolean nullFirst = false;
@@ -45,6 +48,12 @@ public class SortSpec implements Cloneable, GsonObject {
     this.nullFirst = nullFirst;
   }
 
+  public SortSpec(SortSpecProto sortSpec) {
+    this.sortKey = new Column(sortSpec.getColumn());
+    this.ascending = sortSpec.getAscending();
+    this.nullFirst = sortSpec.getNullFirst();
+  }
+
   public final boolean isAscending() {
     return this.ascending;
   }
@@ -93,7 +102,15 @@ public class SortSpec implements Cloneable, GsonObject {
   }
 
   public String toString() {
-    return "Sortkey (key="+sortKey
-        + " "+(ascending ? "asc" : "desc")+")";
+    return "Sortkey (key="+sortKey + " "+(ascending ? "asc" : "desc")+")";
+  }
+
+  @Override
+  public SortSpecProto getProto() {
+    SortSpecProto.Builder builder = SortSpecProto.newBuilder();
+    builder.setColumn(sortKey.getProto());
+    builder.setAscending(ascending);
+    builder.setNullFirst(nullFirst);
+    return builder.build();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/17287ef5/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto b/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
index 477362c..6ef7613 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
+++ b/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
@@ -207,8 +207,14 @@ message ServerNameProto {
   required int32 port = 3;
 }
 
+message TupleComparatorSpecProto {
+  required int32 columnId = 1;
+  optional bool ascending = 2 [default = true];
+  optional bool nullFirst = 3 [default = false];
+}
+
 message SortSpecProto {
-  required int32 sortColumnId = 1;
+  required ColumnProto column = 1;
   optional bool ascending = 2 [default = true];
   optional bool nullFirst = 3 [default = false];
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/17287ef5/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/TaskAttemptContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/TaskAttemptContext.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/TaskAttemptContext.java
index 7eac43d..32c06cc 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/TaskAttemptContext.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/TaskAttemptContext.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.tajo.TajoProtos.TaskAttemptState;
 import org.apache.tajo.catalog.statistics.TableStat;
 import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.engine.planner.enforce.Enforcer;
 import org.apache.tajo.storage.Fragment;
 
 import java.io.File;
@@ -55,6 +56,7 @@ public class TaskAttemptContext {
   private boolean interQuery = false;
   private Path outputPath;
   private DataChannel dataChannel;
+  private Enforcer enforcer;
 
   public TaskAttemptContext(TajoConf conf, final QueryUnitAttemptId queryId,
                             final Fragment[] fragments,
@@ -99,6 +101,14 @@ public class TaskAttemptContext {
     return dataChannel;
   }
 
+  public void setEnforcer(Enforcer enforcer) {
+    this.enforcer = enforcer;
+  }
+
+  public Enforcer getEnforcer() {
+    return this.enforcer;
+  }
+
   public boolean hasResultStats() {
     return resultStats != null;
   }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/17287ef5/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
index 4813fd7..ddedd80 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
@@ -31,6 +31,7 @@ import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.SortSpec;
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.engine.planner.enforce.Enforcer;
 import org.apache.tajo.engine.planner.logical.*;
 import org.apache.tajo.engine.planner.physical.*;
 import org.apache.tajo.exception.InternalException;
@@ -40,8 +41,12 @@ import org.apache.tajo.storage.TupleComparator;
 import org.apache.tajo.util.IndexUtil;
 
 import java.io.IOException;
+import java.util.List;
 
-import static org.apache.tajo.ipc.TajoWorkerProtocol.PartitionType;
+import static org.apache.tajo.ipc.TajoWorkerProtocol.*;
+import static org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty.EnforceType;
+import static org.apache.tajo.ipc.TajoWorkerProtocol.GroupbyEnforce.GroupbyAlgorithm;
+import static org.apache.tajo.ipc.TajoWorkerProtocol.JoinEnforce.JoinAlgorithm;
 
 public class PhysicalPlannerImpl implements PhysicalPlanner {
   private static final Log LOG = LogFactory.getLog(PhysicalPlannerImpl.class);
@@ -63,7 +68,8 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
 
     try {
       execPlan = createPlanRecursive(context, logicalPlan);
-      if (execPlan instanceof StoreTableExec || execPlan instanceof IndexedStoreExec
+      if (execPlan instanceof StoreTableExec
+          || execPlan instanceof IndexedStoreExec
           || execPlan instanceof PartitionedStoreExec) {
         return execPlan;
       } else if (context.getDataChannel() != null) {
@@ -181,60 +187,142 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
     return size;
   }
 
-  public PhysicalExec createJoinPlan(TaskAttemptContext ctx, JoinNode joinNode,
-                                     PhysicalExec outer, PhysicalExec inner)
-      throws IOException {
+  public PhysicalExec createJoinPlan(TaskAttemptContext context, JoinNode joinNode, PhysicalExec leftExec,
+                                     PhysicalExec rightExec) throws IOException {
+
     switch (joinNode.getJoinType()) {
       case CROSS:
         LOG.info("The planner chooses [Nested Loop Join]");
-        return new NLJoinExec(ctx, joinNode, outer, inner);
+        return createCrossJoinPlan(context, joinNode, leftExec, rightExec);
 
       case INNER:
-        String [] outerLineage = PlannerUtil.getLineage(joinNode.getLeftChild());
-        String [] innerLineage = PlannerUtil.getLineage(joinNode.getRightChild());
-        long outerSize = estimateSizeRecursive(ctx, outerLineage);
-        long innerSize = estimateSizeRecursive(ctx, innerLineage);
+        return createInnerJoinPlan(context, joinNode, leftExec, rightExec);
 
-        final long threshold = 1048576 * 128; // 64MB
+      case FULL_OUTER:
+      case LEFT_OUTER:
+      case RIGHT_OUTER:
 
-        boolean hashJoin = false;
-        if (outerSize < threshold || innerSize < threshold) {
-          hashJoin = true;
-        }
+      case LEFT_SEMI:
+      case RIGHT_SEMI:
 
-        if (hashJoin) {
-          PhysicalExec selectedOuter;
-          PhysicalExec selectedInner;
+      case LEFT_ANTI:
+      case RIGHT_ANTI:
 
-          // HashJoinExec loads the inner relation to memory.
-          if (outerSize <= innerSize) {
-            selectedInner = outer;
-            selectedOuter = inner;
-          } else {
-            selectedInner = inner;
-            selectedOuter = outer;
-          }
+      default:
+        throw new PhysicalPlanningException("Cannot support join type: " + joinNode.getJoinType().name());
+    }
+  }
 
-          LOG.info("The planner chooses [InMemory Hash Join]");
-          return new HashJoinExec(ctx, joinNode, selectedOuter, selectedInner);
-        }
+  private PhysicalExec createCrossJoinPlan(TaskAttemptContext context, JoinNode plan,
+                                           PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
+    Enforcer enforcer = context.getEnforcer();
+    EnforceProperty property = getAlgorithmEnforceProperty(enforcer, plan);
+
+    if (property != null) {
+      JoinAlgorithm algorithm = property.getJoin().getAlgorithm();
+
+      switch (algorithm) {
+        case NESTED_LOOP_JOIN:
+          LOG.info("Join (" + plan.getPID() +") chooses [Nested Loop Join]");
+          return new NLJoinExec(context, plan, leftExec, rightExec);
+        case BLOCK_NESTED_LOOP_JOIN:
+          LOG.info("Join (" + plan.getPID() +") chooses [Block Nested Loop Join]");
+          return new BNLJoinExec(context, plan, leftExec, rightExec);
+        default:
+          // fallback algorithm
+          LOG.error("Invalid Cross Join Algorithm Enforcer: " + algorithm.name());
+          return new BNLJoinExec(context, plan, leftExec, rightExec);
+      }
 
-      default:
-        SortSpec[][] sortSpecs = PlannerUtil.getSortKeysFromJoinQual(
-            joinNode.getJoinQual(), outer.getSchema(), inner.getSchema());
-        ExternalSortExec outerSort = new ExternalSortExec(ctx, sm,
-            new SortNode(UNGENERATED_PID, sortSpecs[0], outer.getSchema(), outer.getSchema()),
-            outer);
-        ExternalSortExec innerSort = new ExternalSortExec(ctx, sm,
-            new SortNode(UNGENERATED_PID, sortSpecs[1], inner.getSchema(), inner.getSchema()),
-            inner);
-
-        LOG.info("The planner chooses [Merge Join]");
-        return new MergeJoinExec(ctx, joinNode, outerSort, innerSort,
-            sortSpecs[0], sortSpecs[1]);
+    } else {
+      return new BNLJoinExec(context, plan, leftExec, rightExec);
     }
   }
 
+  private PhysicalExec createInnerJoinPlan(TaskAttemptContext context, JoinNode plan,
+                                           PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
+    Enforcer enforcer = context.getEnforcer();
+    EnforceProperty property = getAlgorithmEnforceProperty(enforcer, plan);
+
+    if (property != null) {
+      JoinAlgorithm algorithm = property.getJoin().getAlgorithm();
+
+      switch (algorithm) {
+        case NESTED_LOOP_JOIN:
+          LOG.info("Join (" + plan.getPID() +") chooses [Nested Loop Join]");
+          return new NLJoinExec(context, plan, leftExec, rightExec);
+        case BLOCK_NESTED_LOOP_JOIN:
+          LOG.info("Join (" + plan.getPID() +") chooses [Block Nested Loop Join]");
+          return new BNLJoinExec(context, plan, leftExec, rightExec);
+        case IN_MEMORY_HASH_JOIN:
+          LOG.info("Join (" + plan.getPID() +") chooses [In-memory Hash Join]");
+          return new HashJoinExec(context, plan, leftExec, rightExec);
+        case MERGE_JOIN:
+          LOG.info("Join (" + plan.getPID() +") chooses [Sort Merge Join]");
+          return createMergeJoin(context, plan, leftExec, rightExec);
+        case HYBRID_HASH_JOIN:
+
+        default:
+          LOG.error("Invalid Inner Join Algorithm Enforcer: " + algorithm.name());
+          LOG.error("Choose a fallback inner join algorithm: " + JoinAlgorithm.MERGE_JOIN.name());
+          return createMergeJoin(context, plan, leftExec, rightExec);
+      }
+
+
+    } else {
+      return createBestInnerJoinPlan(context, plan, leftExec, rightExec);
+    }
+  }
+
+  private PhysicalExec createBestInnerJoinPlan(TaskAttemptContext context, JoinNode plan,
+                                               PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
+    String [] leftLineage = PlannerUtil.getLineage(plan.getLeftChild());
+    String [] rightLineage = PlannerUtil.getLineage(plan.getRightChild());
+    long leftSize = estimateSizeRecursive(context, leftLineage);
+    long rightSize = estimateSizeRecursive(context, rightLineage);
+
+    final long threshold = 1048576 * 128; // 64MB
+
+    boolean hashJoin = false;
+    if (leftSize < threshold || rightSize < threshold) {
+      hashJoin = true;
+    }
+
+    if (hashJoin) {
+      PhysicalExec selectedOuter;
+      PhysicalExec selectedInner;
+
+      // HashJoinExec loads the inner relation to memory.
+      if (leftSize <= rightSize) {
+        selectedInner = leftExec;
+        selectedOuter = rightExec;
+      } else {
+        selectedInner = rightExec;
+        selectedOuter = leftExec;
+      }
+
+      LOG.info("Join (" + plan.getPID() +") chooses [InMemory Hash Join]");
+      return new HashJoinExec(context, plan, selectedOuter, selectedInner);
+    } else {
+      return createMergeJoin(context, plan, leftExec, rightExec);
+    }
+  }
+
+  private MergeJoinExec createMergeJoin(TaskAttemptContext context, JoinNode plan,
+                                        PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
+    SortSpec[][] sortSpecs = PlannerUtil.getSortKeysFromJoinQual(
+        plan.getJoinQual(), leftExec.getSchema(), rightExec.getSchema());
+    ExternalSortExec outerSort = new ExternalSortExec(context, sm,
+        new SortNode(UNGENERATED_PID, sortSpecs[0], leftExec.getSchema(), leftExec.getSchema()),
+        leftExec);
+    ExternalSortExec innerSort = new ExternalSortExec(context, sm,
+        new SortNode(UNGENERATED_PID, sortSpecs[1], rightExec.getSchema(), rightExec.getSchema()),
+        rightExec);
+
+    LOG.info("Join (" + plan.getPID() +") chooses [Merge Join]");
+    return new MergeJoinExec(context, plan, outerSort, innerSort, sortSpecs[0], sortSpecs[1]);
+  }
+
   public PhysicalExec createStorePlan(TaskAttemptContext ctx,
                                       StoreTableNode plan, PhysicalExec subOp) throws IOException {
     if (plan.getPartitionType() == PartitionType.HASH_PARTITION
@@ -277,42 +365,94 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
     return new SeqScanExec(ctx, sm, scanNode, fragments);
   }
 
-  public PhysicalExec createGroupByPlan(TaskAttemptContext ctx,
-                                        GroupbyNode groupbyNode, PhysicalExec subOp) throws IOException {
+  public PhysicalExec createGroupByPlan(TaskAttemptContext context,GroupbyNode groupbyNode, PhysicalExec subOp)
+      throws IOException {
+
+    Enforcer enforcer = context.getEnforcer();
+    EnforceProperty property = getAlgorithmEnforceProperty(enforcer, groupbyNode);
+    if (property != null) {
+      GroupbyAlgorithm algorithm = property.getGroupby().getAlgorithm();
+      if (algorithm == GroupbyAlgorithm.HASH_AGGREGATION) {
+        return createInMemoryHashAggregation(context, groupbyNode, subOp);
+      } else {
+        return createSortAggregation(context, groupbyNode, subOp);
+      }
+    }
+    return createBestAggregationPlan(context, groupbyNode, subOp);
+  }
+
+  private PhysicalExec createInMemoryHashAggregation(TaskAttemptContext ctx,GroupbyNode groupbyNode, PhysicalExec subOp)
+      throws IOException {
+    LOG.info("The planner chooses [Hash Aggregation]");
+    return new HashAggregateExec(ctx, groupbyNode, subOp);
+  }
+
+  private PhysicalExec createSortAggregation(TaskAttemptContext ctx,GroupbyNode groupbyNode, PhysicalExec subOp)
+      throws IOException {
+    Column[] grpColumns = groupbyNode.getGroupingColumns();
+    SortSpec[] specs = new SortSpec[grpColumns.length];
+    for (int i = 0; i < grpColumns.length; i++) {
+      specs[i] = new SortSpec(grpColumns[i], true, false);
+    }
+    SortNode sortNode = new SortNode(-1, specs);
+    sortNode.setInSchema(subOp.getSchema());
+    sortNode.setOutSchema(subOp.getSchema());
+    // SortExec sortExec = new SortExec(sortNode, child);
+    ExternalSortExec sortExec = new ExternalSortExec(ctx, sm, sortNode, subOp);
+    LOG.info("The planner chooses [Sort Aggregation]");
+    return new SortAggregateExec(ctx, groupbyNode, sortExec);
+  }
+
+  private PhysicalExec createBestAggregationPlan(TaskAttemptContext context, GroupbyNode groupbyNode,
+                                                 PhysicalExec subOp) throws IOException {
     Column[] grpColumns = groupbyNode.getGroupingColumns();
     if (grpColumns.length == 0) {
+      return createInMemoryHashAggregation(context, groupbyNode, subOp);
+    }
+
+    String [] outerLineage = PlannerUtil.getLineage(groupbyNode.getChild());
+    long estimatedSize = estimateSizeRecursive(context, outerLineage);
+    final long threshold = conf.getLongVar(TajoConf.ConfVars.HASH_AGGREGATION_THRESHOLD);
+
+    // if the relation size is less than the threshold,
+    // the hash aggregation will be used.
+    if (estimatedSize <= threshold) {
       LOG.info("The planner chooses [Hash Aggregation]");
-      return new HashAggregateExec(ctx, groupbyNode, subOp);
+      return createInMemoryHashAggregation(context, groupbyNode, subOp);
     } else {
-      String [] outerLineage = PlannerUtil.getLineage(groupbyNode.getChild());
-      long estimatedSize = estimateSizeRecursive(ctx, outerLineage);
-      final long threshold = conf.getLongVar(TajoConf.ConfVars.HASH_AGGREGATION_THRESHOLD);
-
-      // if the relation size is less than the threshold,
-      // the hash aggregation will be used.
-      if (estimatedSize <= threshold) {
-        LOG.info("The planner chooses [Hash Aggregation]");
-        return new HashAggregateExec(ctx, groupbyNode, subOp);
+      return createSortAggregation(context, groupbyNode, subOp);
+    }
+  }
+
+  public PhysicalExec createSortPlan(TaskAttemptContext context, SortNode sortNode,
+                                     PhysicalExec child) throws IOException {
+    Enforcer enforcer = context.getEnforcer();
+    EnforceProperty property = getAlgorithmEnforceProperty(enforcer, sortNode);
+    if (property != null) {
+      SortEnforce.SortAlgorithm algorithm = property.getSort().getAlgorithm();
+      if (algorithm == SortEnforce.SortAlgorithm.IN_MEMORY_SORT) {
+        return new MemSortExec(context, sortNode, child);
       } else {
-        SortSpec[] specs = new SortSpec[grpColumns.length];
-        for (int i = 0; i < grpColumns.length; i++) {
-          specs[i] = new SortSpec(grpColumns[i], true, false);
-        }
-        SortNode sortNode = new SortNode(UNGENERATED_PID, specs);
-        sortNode.setInSchema(subOp.getSchema());
-        sortNode.setOutSchema(subOp.getSchema());
-        // SortExec sortExec = new SortExec(sortNode, child);
-        ExternalSortExec sortExec = new ExternalSortExec(ctx, sm, sortNode,
-            subOp);
-        LOG.info("The planner chooses [Sort Aggregation]");
-        return new SortAggregateExec(ctx, groupbyNode, sortExec);
+        return new ExternalSortExec(context, sm, sortNode, child);
       }
     }
+
+    return createBestSortPlan(context, sortNode, child);
   }
 
-  public PhysicalExec createSortPlan(TaskAttemptContext ctx, SortNode sortNode,
-                                     PhysicalExec subOp) throws IOException {
-    return new ExternalSortExec(ctx, sm, sortNode, subOp);
+  public SortExec createBestSortPlan(TaskAttemptContext context, SortNode sortNode,
+                                     PhysicalExec child) throws IOException {
+    String [] outerLineage = PlannerUtil.getLineage(sortNode.getChild());
+    long estimatedSize = estimateSizeRecursive(context, outerLineage);
+    final long threshold = 1048576 * 2000;
+
+    // if the relation size is less than the reshold,
+    // the in-memory sort will be used.
+    if (estimatedSize <= threshold) {
+      return new MemSortExec(context, sortNode, child);
+    } else {
+      return new ExternalSortExec(context, sm, sortNode, child);
+    }
   }
 
   public PhysicalExec createIndexScanExec(TaskAttemptContext ctx,
@@ -335,4 +475,38 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
         annotation.getDatum());
 
   }
+
+  private EnforceProperty getAlgorithmEnforceProperty(Enforcer enforcer, LogicalNode node) {
+    if (enforcer == null) {
+      return null;
+    }
+
+    EnforceType type;
+    if (node.getType() == NodeType.JOIN) {
+      type = EnforceType.JOIN;
+    } else if (node.getType() == NodeType.GROUP_BY) {
+      type = EnforceType.GROUP_BY;
+    } else if (node.getType() == NodeType.SORT) {
+      type = EnforceType.SORT;
+    } else {
+      return null;
+    }
+
+    if (enforcer.hasEnforceProperty(type)) {
+      List<EnforceProperty> properties = enforcer.getEnforceProperties(type);
+      EnforceProperty found = null;
+      for (EnforceProperty property : properties) {
+        if (type == EnforceType.JOIN && property.getJoin().getPid() == node.getPID()) {
+          found = property;
+        } else if (type == EnforceType.GROUP_BY && property.getGroupby().getPid() == node.getPID()) {
+          found = property;
+        } else if (type == EnforceType.SORT && property.getSort().getPid() == node.getPID()) {
+          found = property;
+        }
+      }
+      return found;
+    } else {
+      return null;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/17287ef5/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlanningException.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlanningException.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlanningException.java
new file mode 100644
index 0000000..1b0a7c3
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlanningException.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner;
+
+import java.io.IOException;
+
+public class PhysicalPlanningException extends IOException {
+  public PhysicalPlanningException(String message) {
+    super(message);
+  }
+
+  public PhysicalPlanningException(Throwable t) {
+    super(t);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/17287ef5/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
index 7b39d26..4907c40 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
@@ -224,7 +224,7 @@ public class PlannerUtil {
    * @param type to find
    * @return the parent node of a found logical node
    */
-  public static LogicalNode findTopParentNode(LogicalNode node, NodeType type) {
+  public static <T extends LogicalNode> T findTopParentNode(LogicalNode node, NodeType type) {
     Preconditions.checkNotNull(node);
     Preconditions.checkNotNull(type);
     
@@ -234,7 +234,7 @@ public class PlannerUtil {
     if (finder.getFoundNodes().size() == 0) {
       return null;
     }
-    return finder.getFoundNodes().get(0);
+    return (T) finder.getFoundNodes().get(0);
   }
 
   public static boolean canBeEvaluated(EvalNode eval, LogicalNode node) {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/17287ef5/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java
new file mode 100644
index 0000000..a268a39
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.enforce;
+
+
+import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.common.ProtoObject;
+import org.apache.tajo.util.TUtil;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.tajo.ipc.TajoWorkerProtocol.*;
+import static org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty.EnforceType;
+
+public class Enforcer implements ProtoObject<EnforcerProto> {
+  Map<EnforceType, List<EnforceProperty>> properties;
+  private EnforcerProto proto;
+
+  @SuppressWarnings("unused")
+  public Enforcer() {
+    properties = TUtil.newHashMap();
+  }
+
+  public Enforcer(EnforcerProto proto) {
+    this.proto = proto;
+  }
+
+  private EnforceProperty.Builder newProperty() {
+    return EnforceProperty.newBuilder();
+  }
+
+  private void initProperties() {
+    if (properties == null) {
+      properties = TUtil.newHashMap();
+      for (EnforceProperty property : proto.getPropertiesList()) {
+        TUtil.putToNestedList(properties, property.getType(), property);
+      }
+    }
+  }
+
+  public boolean hasEnforceProperty(EnforceType type) {
+    initProperties();
+    return properties.containsKey(type);
+  }
+
+  public List<EnforceProperty> getEnforceProperties(EnforceType type) {
+    initProperties();
+    return properties.get(type);
+  }
+
+  public void addSortedInput(String tableName, SortSpec[] sortSpecs) {
+    EnforceProperty.Builder builder = newProperty();
+    SortedInputEnforce.Builder enforce = SortedInputEnforce.newBuilder();
+    enforce.setTableName(tableName);
+    for (SortSpec sortSpec : sortSpecs) {
+      enforce.addSortSpecs(sortSpec.getProto());
+    }
+
+    builder.setType(EnforceType.SORTED_INPUT);
+    builder.setSortedInput(enforce.build());
+    TUtil.putToNestedList(properties, builder.getType(), builder.build());
+  }
+
+  public void addOutputDistinct() {
+    EnforceProperty.Builder builder = newProperty();
+    OutputDistinctEnforce.Builder enforce = OutputDistinctEnforce.newBuilder();
+
+    builder.setType(EnforceType.OUTPUT_DISTINCT);
+    builder.setOutputDistinct(enforce.build());
+    TUtil.putToNestedList(properties, builder.getType(), builder.build());
+  }
+
+  public void addJoin(int pid, JoinEnforce.JoinAlgorithm algorithm) {
+    EnforceProperty.Builder builder = newProperty();
+    JoinEnforce.Builder enforce = JoinEnforce.newBuilder();
+    enforce.setPid(pid);
+    enforce.setAlgorithm(algorithm);
+
+    builder.setType(EnforceType.JOIN);
+    builder.setJoin(enforce.build());
+    TUtil.putToNestedList(properties, builder.getType(), builder.build());
+  }
+
+  public void addGroupby(int pid, GroupbyEnforce.GroupbyAlgorithm algorithm) {
+    EnforceProperty.Builder builder = newProperty();
+    GroupbyEnforce.Builder enforce = GroupbyEnforce.newBuilder();
+    enforce.setPid(pid);
+    enforce.setAlgorithm(algorithm);
+
+    builder.setType(EnforceType.GROUP_BY);
+    builder.setGroupby(enforce.build());
+    TUtil.putToNestedList(properties, builder.getType(), builder.build());
+  }
+
+  public void addSort(int pid, SortEnforce.SortAlgorithm algorithm) {
+    EnforceProperty.Builder builder = newProperty();
+    SortEnforce.Builder enforce = SortEnforce.newBuilder();
+    enforce.setPid(pid);
+    enforce.setAlgorithm(algorithm);
+
+    builder.setType(EnforceType.SORT);
+    builder.setSort(enforce.build());
+    TUtil.putToNestedList(properties, builder.getType(), builder.build());
+  }
+
+  public void addBroadcast(String tableName) {
+    EnforceProperty.Builder builder = newProperty();
+    BroadcastEnforce.Builder enforce = BroadcastEnforce.newBuilder();
+    enforce.setTableName(tableName);
+
+    builder.setType(EnforceType.BROADCAST);
+    builder.setBroadcast(enforce);
+    TUtil.putToNestedList(properties, builder.getType(), builder.build());
+  }
+
+  public Collection<EnforceProperty> getProperties() {
+    if (proto != null) {
+      return proto.getPropertiesList();
+    } else {
+      List<EnforceProperty> list = TUtil.newList();
+      for (List<EnforceProperty> propertyList : properties.values()) {
+        list.addAll(propertyList);
+      }
+      return list;
+    }
+  }
+
+  public String toString() {
+    StringBuilder sb = new StringBuilder("Enforce ").append(properties.size()).append(" properties: ");
+    boolean first = true;
+    for (EnforceType enforceType : properties.keySet()) {
+      if (first) {
+        first = false;
+      } else {
+        sb.append(", ");
+      }
+      sb.append(enforceType);
+    }
+    return sb.toString();
+  }
+
+  @Override
+  public EnforcerProto getProto() {
+    EnforcerProto.Builder builder = EnforcerProto.newBuilder();
+    builder.addAllProperties(getProperties());
+    return builder.build();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/17287ef5/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java
index 694602b..ba01b52 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java
@@ -35,6 +35,7 @@ import java.util.List;
 
 public class BNLJoinExec extends BinaryPhysicalExec {
   // from logical plan
+  private JoinNode plan;
   private EvalNode joinQual;
   private EvalContext qualCtx;
 
@@ -57,11 +58,12 @@ public class BNLJoinExec extends BinaryPhysicalExec {
   // projection
   private final int[] targetIds;
 
-  public BNLJoinExec(final TaskAttemptContext context, final JoinNode join,
+  public BNLJoinExec(final TaskAttemptContext context, final JoinNode plan,
                      final PhysicalExec outer, PhysicalExec inner) {
     super(context, SchemaUtil.merge(outer.getSchema(), inner.getSchema()),
         SchemaUtil.merge(outer.getSchema(), inner.getSchema()), outer, inner);
-    this.joinQual = join.getJoinQual();
+    this.plan = plan;
+    this.joinQual = plan.getJoinQual();
     if (joinQual != null) { // if join type is not 'cross join'
       this.qualCtx = this.joinQual.newContext();
     }
@@ -80,6 +82,10 @@ public class BNLJoinExec extends BinaryPhysicalExec {
     outputTuple = new VTuple(outSchema.getColumnNum());
   }
 
+  public JoinNode getPlan() {
+    return plan;
+  }
+
   public Tuple next() throws IOException {
 
     if (outerTupleSlots.isEmpty()) {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/17287ef5/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
index 6d49880..8fcd527 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
@@ -47,8 +47,7 @@ public class ExternalSortExec extends SortExec {
   public ExternalSortExec(final TaskAttemptContext context,
       final AbstractStorageManager sm, final SortNode plan, final PhysicalExec child)
       throws IOException {
-    super(context, plan.getInSchema(), plan.getOutSchema(), child,
-        plan.getSortKeys());
+    super(context, plan.getInSchema(), plan.getOutSchema(), child, plan.getSortKeys());
     this.plan = plan;
 
     this.SORT_BUFFER_SIZE = context.getConf().getIntVar(ConfVars.EXT_SORT_BUFFER);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/17287ef5/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MemSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MemSortExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MemSortExec.java
index 1532307..a95e70e 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MemSortExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MemSortExec.java
@@ -37,8 +37,7 @@ public class MemSortExec extends SortExec {
   
   public MemSortExec(final TaskAttemptContext context,
                      SortNode plan, PhysicalExec child) {
-    super(context, plan.getInSchema(), plan.getOutSchema(), child,
-        plan.getSortKeys());
+    super(context, plan.getInSchema(), plan.getOutSchema(), child, plan.getSortKeys());
     this.plan = plan;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/17287ef5/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/UnaryPhysicalExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/UnaryPhysicalExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/UnaryPhysicalExec.java
index 4f61a50..9d54cca 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/UnaryPhysicalExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/UnaryPhysicalExec.java
@@ -34,8 +34,8 @@ public abstract class UnaryPhysicalExec extends PhysicalExec {
     this.child = child;
   }
 
-  public PhysicalExec getChild() {
-    return this.child;
+  public <T extends PhysicalExec> T getChild() {
+    return (T) this.child;
   }
 
   @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/17287ef5/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/QueryUnitRequestImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/QueryUnitRequestImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/QueryUnitRequestImpl.java
index 0676277..0b08ccf 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/QueryUnitRequestImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/QueryUnitRequestImpl.java
@@ -20,6 +20,7 @@ package org.apache.tajo.engine.query;
 
 import org.apache.tajo.DataChannel;
 import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.engine.planner.enforce.Enforcer;
 import org.apache.tajo.ipc.TajoWorkerProtocol.Fetch;
 import org.apache.tajo.ipc.TajoWorkerProtocol.QueryUnitRequestProto;
 import org.apache.tajo.ipc.TajoWorkerProtocol.QueryUnitRequestProtoOrBuilder;
@@ -44,6 +45,7 @@ public class QueryUnitRequestImpl implements QueryUnitRequest {
   private Boolean shouldDie;
   private QueryContext queryContext;
   private DataChannel dataChannel;
+  private Enforcer enforcer;
 	
 	private QueryUnitRequestProto proto = QueryUnitRequestProto.getDefaultInstance();
 	private QueryUnitRequestProto.Builder builder = null;
@@ -57,9 +59,9 @@ public class QueryUnitRequestImpl implements QueryUnitRequest {
 	
 	public QueryUnitRequestImpl(QueryUnitAttemptId id, List<Fragment> fragments,
 			String outputTable, boolean clusteredOutput,
-			String serializedData, QueryContext queryContext, DataChannel channel) {
+			String serializedData, QueryContext queryContext, DataChannel channel, Enforcer enforcer) {
 		this();
-		this.set(id, fragments, outputTable, clusteredOutput, serializedData, queryContext, channel);
+		this.set(id, fragments, outputTable, clusteredOutput, serializedData, queryContext, channel, enforcer);
 	}
 	
 	public QueryUnitRequestImpl(QueryUnitRequestProto proto) {
@@ -71,7 +73,7 @@ public class QueryUnitRequestImpl implements QueryUnitRequest {
 	
 	public void set(QueryUnitAttemptId id, List<Fragment> fragments,
 			String outputTable, boolean clusteredOutput,
-			String serializedData, QueryContext queryContext, DataChannel dataChannel) {
+			String serializedData, QueryContext queryContext, DataChannel dataChannel, Enforcer enforcer) {
 		this.id = id;
 		this.fragments = fragments;
 		this.outputTable = outputTable;
@@ -81,6 +83,7 @@ public class QueryUnitRequestImpl implements QueryUnitRequest {
     this.queryContext = queryContext;
     this.queryContext = queryContext;
     this.dataChannel = dataChannel;
+    this.enforcer = enforcer;
 	}
 
 	@Override
@@ -214,14 +217,27 @@ public class QueryUnitRequestImpl implements QueryUnitRequest {
     if (dataChannel != null) {
       return dataChannel;
     }
-    if (!p.hasQueryContext()) {
+    if (!p.hasDataChannel()) {
       return null;
     }
     this.dataChannel = new DataChannel(p.getDataChannel());
     return this.dataChannel;
   }
-	
-	public List<Fetch> getFetches() {
+
+  @Override
+  public Enforcer getEnforcer() {
+    QueryUnitRequestProtoOrBuilder p = viaProto ? proto : builder;
+    if (enforcer != null) {
+      return enforcer;
+    }
+    if (!p.hasEnforcer()) {
+      return null;
+    }
+    this.enforcer = new Enforcer(p.getEnforcer());
+    return this.enforcer;
+  }
+
+  public List<Fetch> getFetches() {
 	  initFetches();    
 
     return this.fetches;
@@ -297,6 +313,9 @@ public class QueryUnitRequestImpl implements QueryUnitRequest {
     if (this.dataChannel != null) {
       builder.setDataChannel(dataChannel.getProto());
     }
+    if (this.enforcer != null) {
+      builder.setEnforcer(enforcer.getProto());
+    }
 	}
 
 	private void mergeLocalToProto() {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/17287ef5/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/ipc/protocolrecords/QueryUnitRequest.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/ipc/protocolrecords/QueryUnitRequest.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/ipc/protocolrecords/QueryUnitRequest.java
index a9f3706..971f13a 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/ipc/protocolrecords/QueryUnitRequest.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/ipc/protocolrecords/QueryUnitRequest.java
@@ -24,6 +24,7 @@ package org.apache.tajo.ipc.protocolrecords;
 import org.apache.tajo.DataChannel;
 import org.apache.tajo.QueryUnitAttemptId;
 import org.apache.tajo.common.ProtoObject;
+import org.apache.tajo.engine.planner.enforce.Enforcer;
 import org.apache.tajo.ipc.TajoWorkerProtocol;
 import org.apache.tajo.master.QueryContext;
 import org.apache.tajo.storage.Fragment;
@@ -46,4 +47,5 @@ public interface QueryUnitRequest extends ProtoObject<TajoWorkerProtocol.QueryUn
   public void setShouldDie();
   public QueryContext getQueryContext();
   public DataChannel getDataChannel();
+  public Enforcer getEnforcer();
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/17287ef5/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/ExecutionBlock.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/ExecutionBlock.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/ExecutionBlock.java
index 5637d58..3c03e50 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/ExecutionBlock.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/ExecutionBlock.java
@@ -16,6 +16,7 @@ package org.apache.tajo.master;
 
 import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.engine.planner.enforce.Enforcer;
 import org.apache.tajo.engine.planner.logical.*;
 
 import java.util.*;
@@ -37,6 +38,7 @@ public class ExecutionBlock {
   private ExecutionBlock parent;
   private Map<ScanNode, ExecutionBlock> childSubQueries = new HashMap<ScanNode, ExecutionBlock>();
   private PartitionType outputType;
+  private Enforcer enforcer = new Enforcer();
 
   private boolean hasJoinPlan;
   private boolean hasUnionPlan;
@@ -51,10 +53,6 @@ public class ExecutionBlock {
     return executionBlockId;
   }
 
-  public void setPartitionType(PartitionType partitionType) {
-    this.outputType = partitionType;
-  }
-
   public PartitionType getPartitionType() {
     return outputType;
   }
@@ -96,6 +94,10 @@ public class ExecutionBlock {
     return plan;
   }
 
+  public Enforcer getEnforcer() {
+    return enforcer;
+  }
+
   public boolean isRoot() {
     return !hasParentBlock() || !(getParentBlock().hasParentBlock()) && getParentBlock().hasUnion();
   }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/17287ef5/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java
index 24eea42..cdbd803 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java
@@ -466,7 +466,7 @@ public class TaskSchedulerImpl extends AbstractService
               false,
               task.getLogicalPlan().toJson(),
               context.getQueryContext(),
-              subQuery.getDataChannel());
+              subQuery.getDataChannel(), subQuery.getBlock().getEnforcer());
           if (!subQuery.getBlock().isRoot()) {
             taskAssign.setInterQuery();
           }
@@ -512,7 +512,8 @@ public class TaskSchedulerImpl extends AbstractService
               false,
               task.getLogicalPlan().toJson(),
               context.getQueryContext(),
-              subQuery.getDataChannel());
+              subQuery.getDataChannel(),
+              subQuery.getBlock().getEnforcer());
           if (!subQuery.getBlock().isRoot()) {
             taskAssign.setInterQuery();
           }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/17287ef5/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
index 63d8f04..0a45cfb 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
@@ -151,6 +151,7 @@ public class Task {
         request.getFragments().toArray(new Fragment[request.getFragments().size()]),
         taskDir);
     this.context.setDataChannel(request.getDataChannel());
+    this.context.setEnforcer(request.getEnforcer());
 
     plan = CoreGsonHelper.fromJson(request.getSerializedData(), LogicalNode.class);
     interQuery = request.getProto().getInterQuery();

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/17287ef5/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto b/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto
index 1726c7b..e70f780 100644
--- a/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto
+++ b/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto
@@ -61,6 +61,7 @@ message QueryUnitRequestProto {
     optional bool shouldDie = 8;
     optional KeyValueSetProto queryContext = 9;
     optional DataChannelProto dataChannel = 10;
+    optional EnforcerProto enforcer = 11;
 }
 
 message Fetch {
@@ -139,7 +140,6 @@ message DataChannelProto {
 
   repeated ColumnProto partitionKey = 7;
   optional int32 partitionNum = 8 [default = 1];
-  repeated SortSpecProto sortSpecs = 9;
 
   optional StoreType storeType = 10 [default = CSV];
 }
@@ -166,4 +166,75 @@ service TajoWorkerProtocolService {
 
   //from QueryMaster(Worker)
   rpc executeExecutionBlock(RunExecutionBlockRequestProto) returns (BoolProto);
+}
+
+message EnforceProperty {
+  enum EnforceType {
+    SORTED_INPUT = 0;
+    OUTPUT_DISTINCT = 1;
+    GROUP_BY = 2;
+    JOIN = 3;
+    SORT = 4;
+    BROADCAST = 5;
+  }
+
+  // Identifies which field is filled in.
+  required EnforceType type = 1;
+
+  // One of the following will be filled in.
+  optional SortedInputEnforce sortedInput = 2;
+  optional OutputDistinctEnforce outputDistinct = 3;
+  optional GroupbyEnforce groupby = 4;
+  optional JoinEnforce join = 5;
+  optional SortEnforce sort = 6;
+  optional BroadcastEnforce broadcast = 7;
+}
+
+message SortedInputEnforce {
+  required string tableName = 1;
+  repeated SortSpecProto sortSpecs = 2;
+}
+
+message OutputDistinctEnforce {
+}
+
+message JoinEnforce {
+  enum JoinAlgorithm {
+    NESTED_LOOP_JOIN = 0;
+    BLOCK_NESTED_LOOP_JOIN = 1;
+    IN_MEMORY_HASH_JOIN = 2;
+    HYBRID_HASH_JOIN = 3;
+    MERGE_JOIN = 4;
+  }
+
+  required int32 pid = 1;
+  required JoinAlgorithm algorithm = 2;
+}
+
+message GroupbyEnforce {
+  enum GroupbyAlgorithm {
+    HASH_AGGREGATION = 0;
+    SORT_AGGREGATION = 1;
+  }
+
+  required int32 pid = 1;
+  required GroupbyAlgorithm algorithm = 2;
+}
+
+message SortEnforce {
+  enum SortAlgorithm {
+    IN_MEMORY_SORT = 0;
+    MERGE_SORT = 1;
+  }
+
+  required int32 pid = 1;
+  required SortAlgorithm algorithm = 2;
+}
+
+message BroadcastEnforce {
+  required string tableName = 1;
+}
+
+message EnforcerProto {
+  repeated EnforceProperty properties = 1;
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/17287ef5/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
index 118f352..425418f 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
@@ -30,12 +30,12 @@ import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.datum.Datum;
 import org.apache.tajo.datum.DatumFactory;
 import org.apache.tajo.engine.parser.SQLAnalyzer;
-import org.apache.tajo.engine.planner.LogicalPlanner;
-import org.apache.tajo.engine.planner.PhysicalPlanner;
-import org.apache.tajo.engine.planner.PhysicalPlannerImpl;
-import org.apache.tajo.engine.planner.PlanningException;
+import org.apache.tajo.engine.planner.*;
+import org.apache.tajo.engine.planner.enforce.Enforcer;
 import org.apache.tajo.engine.planner.logical.JoinNode;
 import org.apache.tajo.engine.planner.logical.LogicalNode;
+import org.apache.tajo.engine.planner.logical.NodeType;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
 import org.apache.tajo.storage.*;
 import org.apache.tajo.util.CommonTestingUtil;
 import org.apache.tajo.util.TUtil;
@@ -45,6 +45,7 @@ import org.junit.Test;
 
 import java.io.IOException;
 
+import static org.apache.tajo.ipc.TajoWorkerProtocol.JoinEnforce.JoinAlgorithm;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -134,29 +135,27 @@ public class TestBNLJoinExec {
 
   @Test
   public final void testBNLCrossJoin() throws IOException, PlanningException {
+    Expr expr = analyzer.parse(QUERIES[0]);
+    LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
+    JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
+    Enforcer enforcer = new Enforcer();
+    enforcer.addJoin(joinNode.getPID(), JoinAlgorithm.BLOCK_NESTED_LOOP_JOIN);
+
     Fragment[] empFrags = StorageManager.splitNG(conf, "e", employee.getMeta(), employee.getPath(),
         Integer.MAX_VALUE);
     Fragment[] peopleFrags = StorageManager.splitNG(conf, "p", people.getMeta(), people.getPath(),
         Integer.MAX_VALUE);
-
     Fragment[] merged = TUtil.concat(empFrags, peopleFrags);
-
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/testBNLCrossJoin");
     TaskAttemptContext ctx = new TaskAttemptContext(conf,
         LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
-    Expr expr = analyzer.parse(QUERIES[0]);
-    LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
+    ctx.setEnforcer(enforcer);
 
     PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
     PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
 
     ProjectionExec proj = (ProjectionExec) exec;
-    NLJoinExec nlJoin = (NLJoinExec) proj.getChild();
-    SeqScanExec scanOuter = (SeqScanExec) nlJoin.getLeftChild();
-    SeqScanExec scanInner = (SeqScanExec) nlJoin.getRightChild();
-
-    BNLJoinExec bnl = new BNLJoinExec(ctx, nlJoin.getPlan(), scanOuter, scanInner);
-    proj.setChild(bnl);
+    assertTrue(proj.getChild() instanceof BNLJoinExec);
 
     int i = 0;
     exec.init();
@@ -169,45 +168,31 @@ public class TestBNLJoinExec {
 
   @Test
   public final void testBNLInnerJoin() throws IOException, PlanningException {
+    Expr context = analyzer.parse(QUERIES[1]);
+    LogicalNode plan = planner.createPlan(context).getRootBlock().getRoot();
+
     Fragment[] empFrags = StorageManager.splitNG(conf, "e", employee.getMeta(), employee.getPath(),
         Integer.MAX_VALUE);
     Fragment[] peopleFrags = StorageManager.splitNG(conf, "p", people.getMeta(), people.getPath(),
         Integer.MAX_VALUE);
-
     Fragment[] merged = TUtil.concat(empFrags, peopleFrags);
 
+
+    JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
+    Enforcer enforcer = new Enforcer();
+    enforcer.addJoin(joinNode.getPID(), JoinAlgorithm.BLOCK_NESTED_LOOP_JOIN);
+
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/testBNLInnerJoin");
-    TaskAttemptContext ctx =
-        new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(),
-            merged, workDir);
-    Expr context = analyzer.parse(QUERIES[1]);
-    LogicalNode plan = planner.createPlan(context).getRootBlock().getRoot();
+    TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(),
+        merged, workDir);
+    ctx.setEnforcer(enforcer);
+
 
     PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
     PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
 
-    SeqScanExec scanOuter = null;
-    SeqScanExec scanInner = null;
-
     ProjectionExec proj = (ProjectionExec) exec;
-    JoinNode joinNode = null;
-    if (proj.getChild() instanceof MergeJoinExec) {
-      MergeJoinExec join = (MergeJoinExec) proj.getChild();
-      ExternalSortExec sortOut = (ExternalSortExec) join.getLeftChild();
-      ExternalSortExec sortIn = (ExternalSortExec) join.getRightChild();
-      scanOuter = (SeqScanExec) sortOut.getChild();
-      scanInner = (SeqScanExec) sortIn.getChild();
-      joinNode = join.getPlan();
-    } else if (proj.getChild() instanceof HashJoinExec) {
-      HashJoinExec join = (HashJoinExec) proj.getChild();
-      scanOuter = (SeqScanExec) join.getLeftChild();
-      scanInner = (SeqScanExec) join.getRightChild();
-      joinNode = join.getPlan();
-    }
-
-    BNLJoinExec bnl = new BNLJoinExec(ctx, joinNode, scanOuter,
-        scanInner);
-    proj.setChild(bnl);
+    assertTrue(proj.getChild() instanceof BNLJoinExec);
 
     Tuple tuple;
     int i = 1;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/17287ef5/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
index 864c776..3ba7fec 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
@@ -31,6 +31,7 @@ import org.apache.tajo.datum.Datum;
 import org.apache.tajo.datum.DatumFactory;
 import org.apache.tajo.engine.parser.SQLAnalyzer;
 import org.apache.tajo.engine.planner.*;
+import org.apache.tajo.engine.planner.enforce.Enforcer;
 import org.apache.tajo.engine.planner.logical.LogicalNode;
 import org.apache.tajo.storage.*;
 import org.apache.tajo.util.CommonTestingUtil;
@@ -113,6 +114,7 @@ public class TestExternalSortExec {
     Path workDir = new Path(testDir, TestExternalSortExec.class.getName());
     TaskAttemptContext ctx = new TaskAttemptContext(conf,
         LocalTajoTestingUtility.newQueryUnitAttemptId(), new Fragment[] { frags[0] }, workDir);
+    ctx.setEnforcer(new Enforcer());
     Expr expr = analyzer.parse(QUERIES[0]);
     LogicalPlan plan = planner.createPlan(expr);
     LogicalNode rootNode = plan.getRootBlock().getRoot();

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/17287ef5/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
index 4b5a422..47d6a94 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
@@ -31,6 +31,7 @@ import org.apache.tajo.datum.Datum;
 import org.apache.tajo.datum.DatumFactory;
 import org.apache.tajo.engine.parser.SQLAnalyzer;
 import org.apache.tajo.engine.planner.*;
+import org.apache.tajo.engine.planner.enforce.Enforcer;
 import org.apache.tajo.engine.planner.logical.LogicalNode;
 import org.apache.tajo.storage.*;
 import org.apache.tajo.util.CommonTestingUtil;
@@ -149,6 +150,7 @@ public class TestHashAntiJoinExec {
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/testHashAntiJoin");
     TaskAttemptContext ctx = new TaskAttemptContext(conf,
         LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
+    ctx.setEnforcer(new Enforcer());
     Expr expr = analyzer.parse(QUERIES[0]);
     LogicalPlan plan = planner.createPlan(expr);
     optimizer.optimize(plan);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/17287ef5/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
index 913c60e..3466d17 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
@@ -30,11 +30,12 @@ import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.datum.Datum;
 import org.apache.tajo.datum.DatumFactory;
 import org.apache.tajo.engine.parser.SQLAnalyzer;
-import org.apache.tajo.engine.planner.LogicalPlanner;
-import org.apache.tajo.engine.planner.PhysicalPlanner;
-import org.apache.tajo.engine.planner.PhysicalPlannerImpl;
-import org.apache.tajo.engine.planner.PlanningException;
+import org.apache.tajo.engine.planner.*;
+import org.apache.tajo.engine.planner.enforce.Enforcer;
+import org.apache.tajo.engine.planner.logical.JoinNode;
 import org.apache.tajo.engine.planner.logical.LogicalNode;
+import org.apache.tajo.engine.planner.logical.NodeType;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
 import org.apache.tajo.storage.*;
 import org.apache.tajo.util.CommonTestingUtil;
 import org.apache.tajo.util.TUtil;
@@ -44,6 +45,7 @@ import org.junit.Test;
 
 import java.io.IOException;
 
+import static org.apache.tajo.ipc.TajoWorkerProtocol.JoinEnforce.JoinAlgorithm;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -132,35 +134,27 @@ public class TestHashJoinExec {
 
   @Test
   public final void testHashInnerJoin() throws IOException, PlanningException {
-    Fragment[] empFrags = StorageManager.splitNG(conf, "e", employee.getMeta(), employee.getPath(),
-        Integer.MAX_VALUE);
-    Fragment[] peopleFrags = StorageManager.splitNG(conf, "p", people.getMeta(), people.getPath(),
-        Integer.MAX_VALUE);
+    Expr expr = analyzer.parse(QUERIES[0]);
+    LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
 
+    JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
+    Enforcer enforcer = new Enforcer();
+    enforcer.addJoin(joinNode.getPID(), JoinAlgorithm.IN_MEMORY_HASH_JOIN);
+
+    Fragment[] empFrags = StorageManager.splitNG(conf, "e", employee.getMeta(), employee.getPath(), Integer.MAX_VALUE);
+    Fragment[] peopleFrags = StorageManager.splitNG(conf, "p", people.getMeta(), people.getPath(), Integer.MAX_VALUE);
     Fragment[] merged = TUtil.concat(empFrags, peopleFrags);
 
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/testHashInnerJoin");
     TaskAttemptContext ctx = new TaskAttemptContext(conf,
         LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
-    Expr expr = analyzer.parse(QUERIES[0]);
-    LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
+    ctx.setEnforcer(enforcer);
 
     PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
     PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
 
     ProjectionExec proj = (ProjectionExec) exec;
-    if (proj.getChild() instanceof MergeJoinExec) {
-      MergeJoinExec join = (MergeJoinExec) proj.getChild();
-      ExternalSortExec sortout = (ExternalSortExec) join.getLeftChild();
-      ExternalSortExec sortin = (ExternalSortExec) join.getRightChild();
-      SeqScanExec scanout = (SeqScanExec) sortout.getChild();
-      SeqScanExec scanin = (SeqScanExec) sortin.getChild();
-
-      HashJoinExec hashjoin = new HashJoinExec(ctx, join.getPlan(), scanout, scanin);
-      proj.setChild(hashjoin);
-
-      exec = proj;
-    }
+    assertTrue(proj.getChild() instanceof HashJoinExec);
 
     Tuple tuple;
     int count = 0;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/17287ef5/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
index 377be20..1db8300 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
@@ -31,6 +31,7 @@ import org.apache.tajo.datum.Datum;
 import org.apache.tajo.datum.DatumFactory;
 import org.apache.tajo.engine.parser.SQLAnalyzer;
 import org.apache.tajo.engine.planner.*;
+import org.apache.tajo.engine.planner.enforce.Enforcer;
 import org.apache.tajo.engine.planner.logical.LogicalNode;
 import org.apache.tajo.storage.*;
 import org.apache.tajo.util.CommonTestingUtil;
@@ -153,6 +154,7 @@ public class TestHashSemiJoinExec {
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/testHashSemiJoin");
     TaskAttemptContext ctx = new TaskAttemptContext(conf,
         LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
+    ctx.setEnforcer(new Enforcer());
     Expr expr = analyzer.parse(QUERIES[0]);
     LogicalPlan plan = planner.createPlan(expr);
     optimizer.optimize(plan);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/17287ef5/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
index 141fbb7..cae4853 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
@@ -32,8 +32,12 @@ import org.apache.tajo.datum.Datum;
 import org.apache.tajo.datum.DatumFactory;
 import org.apache.tajo.engine.parser.SQLAnalyzer;
 import org.apache.tajo.engine.planner.*;
+import org.apache.tajo.engine.planner.enforce.Enforcer;
+import org.apache.tajo.engine.planner.logical.JoinNode;
 import org.apache.tajo.engine.planner.logical.LogicalNode;
+import org.apache.tajo.engine.planner.logical.NodeType;
 import org.apache.tajo.engine.planner.logical.SortNode;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
 import org.apache.tajo.storage.*;
 import org.apache.tajo.util.CommonTestingUtil;
 import org.apache.tajo.util.TUtil;
@@ -43,6 +47,7 @@ import org.junit.Test;
 
 import java.io.IOException;
 
+import static org.apache.tajo.ipc.TajoWorkerProtocol.JoinEnforce.JoinAlgorithm;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -145,71 +150,27 @@ public class TestMergeJoinExec {
 
   @Test
   public final void testMergeInnerJoin() throws IOException, PlanningException {
-    Fragment[] empFrags = sm.splitNG(conf, "e", employee.getMeta(), employee.getPath(),
-        Integer.MAX_VALUE);
-    Fragment[] peopleFrags = sm.splitNG(conf, "p", people.getMeta(), people.getPath(),
-        Integer.MAX_VALUE);
+    Expr expr = analyzer.parse(QUERIES[0]);
+    LogicalPlan plan = planner.createPlan(expr);
+    LogicalNode root = plan.getRootBlock().getRoot();
 
+    JoinNode joinNode = PlannerUtil.findTopNode(root, NodeType.JOIN);
+    Enforcer enforcer = new Enforcer();
+    enforcer.addJoin(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);;
+
+    Fragment[] empFrags = sm.splitNG(conf, "e", employee.getMeta(), employee.getPath(), Integer.MAX_VALUE);
+    Fragment[] peopleFrags = sm.splitNG(conf, "p", people.getMeta(), people.getPath(), Integer.MAX_VALUE);
     Fragment[] merged = TUtil.concat(empFrags, peopleFrags);
 
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/testMergeInnerJoin");
     TaskAttemptContext ctx = new TaskAttemptContext(conf,
         LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
-    Expr expr = analyzer.parse(QUERIES[0]);
-    LogicalPlan plan = planner.createPlan(expr);
-    LogicalNode root = plan.getRootBlock().getRoot();
+    ctx.setEnforcer(enforcer);
 
     PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
     PhysicalExec exec = phyPlanner.createPlan(ctx, root);
-
     ProjectionExec proj = (ProjectionExec) exec;
-
-    // TODO - should be planed with user's optimization hint
-    if (!(proj.getChild() instanceof MergeJoinExec)) {
-      BinaryPhysicalExec nestedLoopJoin = (BinaryPhysicalExec) proj.getChild();
-      SeqScanExec outerScan = (SeqScanExec) nestedLoopJoin.getLeftChild();
-      SeqScanExec innerScan = (SeqScanExec) nestedLoopJoin.getRightChild();
-
-      SeqScanExec tmp;
-      if (!outerScan.getTableName().equals("employee")) {
-        tmp = outerScan;
-        outerScan = innerScan;
-        innerScan = tmp;
-      }
-
-      SortSpec[] outerSortKeys = new SortSpec[2];
-      SortSpec[] innerSortKeys = new SortSpec[2];
-
-      Schema employeeSchema = catalog.getTableDesc("employee").getMeta()
-          .getSchema();
-      employeeSchema.setQualifier("e", true);
-      outerSortKeys[0] = new SortSpec(
-          employeeSchema.getColumnByName("empId"));
-      outerSortKeys[1] = new SortSpec(
-          employeeSchema.getColumnByName("memId"));
-      SortNode outerSort = new SortNode(plan.newPID(), outerSortKeys);
-      outerSort.setInSchema(outerScan.getSchema());
-      outerSort.setOutSchema(outerScan.getSchema());
-
-      Schema peopleSchema = catalog.getTableDesc("people").getMeta().getSchema();
-      peopleSchema.setQualifier("p", true);
-      innerSortKeys[0] = new SortSpec(
-          peopleSchema.getColumnByName("empId"));
-      innerSortKeys[1] = new SortSpec(
-          peopleSchema.getColumnByName("fk_memid"));
-      SortNode innerSort = new SortNode(plan.newPID(), innerSortKeys);
-      innerSort.setInSchema(innerScan.getSchema());
-      innerSort.setOutSchema(innerScan.getSchema());
-
-      MemSortExec outerSortExec = new MemSortExec(ctx, outerSort, outerScan);
-      MemSortExec innerSortExec = new MemSortExec(ctx, innerSort, innerScan);
-
-      MergeJoinExec mergeJoin = new MergeJoinExec(ctx,
-          ((HashJoinExec)nestedLoopJoin).getPlan(), outerSortExec, innerSortExec,
-          outerSortKeys, innerSortKeys);
-      proj.setChild(mergeJoin);
-      exec = proj;
-    }
+    assertTrue(proj.getChild() instanceof MergeJoinExec);
 
     Tuple tuple;
     int count = 0;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/17287ef5/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java
index e79e2f6..5483235 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java
@@ -34,6 +34,7 @@ import org.apache.tajo.engine.planner.LogicalPlanner;
 import org.apache.tajo.engine.planner.PhysicalPlanner;
 import org.apache.tajo.engine.planner.PhysicalPlannerImpl;
 import org.apache.tajo.engine.planner.PlanningException;
+import org.apache.tajo.engine.planner.enforce.Enforcer;
 import org.apache.tajo.engine.planner.global.MasterPlan;
 import org.apache.tajo.engine.planner.logical.LogicalNode;
 import org.apache.tajo.storage.*;
@@ -149,6 +150,7 @@ public class TestNLJoinExec {
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/testNLCrossJoin");
     TaskAttemptContext ctx = new TaskAttemptContext(conf,
         LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
+    ctx.setEnforcer(new Enforcer());
     Expr context = analyzer.parse(QUERIES[0]);
     LogicalNode plan = planner.createPlan(context).getRootBlock().getRoot();
 
@@ -176,6 +178,7 @@ public class TestNLJoinExec {
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/testNLInnerJoin");
     TaskAttemptContext ctx = new TaskAttemptContext(conf,
         LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan), merged, workDir);
+    ctx.setEnforcer(new Enforcer());
     Expr context =  analyzer.parse(QUERIES[1]);
     LogicalNode plan = planner.createPlan(context).getRootBlock().getRoot();
     //LogicalOptimizer.optimize(ctx, plan);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/17287ef5/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
index f3aa20a..c5366a1 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
@@ -39,8 +39,10 @@ import org.apache.tajo.engine.eval.EvalNode;
 import org.apache.tajo.engine.eval.EvalTreeUtil;
 import org.apache.tajo.engine.parser.SQLAnalyzer;
 import org.apache.tajo.engine.planner.*;
+import org.apache.tajo.engine.planner.enforce.Enforcer;
 import org.apache.tajo.engine.planner.global.MasterPlan;
 import org.apache.tajo.engine.planner.logical.*;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
 import org.apache.tajo.master.TajoMaster;
 import org.apache.tajo.storage.*;
 import org.apache.tajo.storage.index.bst.BSTIndex;
@@ -58,7 +60,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import static org.apache.tajo.ipc.TajoWorkerProtocol.GroupbyEnforce;
 import static org.apache.tajo.ipc.TajoWorkerProtocol.PartitionType;
+import static org.apache.tajo.ipc.TajoWorkerProtocol.SortEnforce.SortAlgorithm;
 import static org.junit.Assert.*;
 
 public class TestPhysicalPlanner {
@@ -241,6 +245,7 @@ public class TestPhysicalPlanner {
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/testGroupByPlan");
     TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan),
         new Fragment[] { frags[0] }, workDir);
+    ctx.setEnforcer(new Enforcer());
     Expr context = analyzer.parse(QUERIES[7]);
     LogicalPlan plan = planner.createPlan(context);
     optimizer.optimize(plan);
@@ -271,6 +276,7 @@ public class TestPhysicalPlanner {
         "target/test-data/testHashGroupByPlanWithALLField");
     TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan),
         new Fragment[] { frags[0] }, workDir);
+    ctx.setEnforcer(new Enforcer());
     Expr expr = analyzer.parse(QUERIES[15]);
     LogicalPlan plan = planner.createPlan(expr);
     LogicalNode rootNode = optimizer.optimize(plan);
@@ -299,6 +305,7 @@ public class TestPhysicalPlanner {
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/testSortGroupByPlan");
     TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan),
         new Fragment[]{frags[0]}, workDir);
+    ctx.setEnforcer(new Enforcer());
     Expr context = analyzer.parse(QUERIES[7]);
     LogicalPlan plan = planner.createPlan(context);
     optimizer.optimize(plan);
@@ -355,8 +362,8 @@ public class TestPhysicalPlanner {
         Integer.MAX_VALUE);
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/testStorePlan");
     TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan),
-        new Fragment[] { frags[0] },
-        workDir);
+        new Fragment[] { frags[0] }, workDir);
+    ctx.setEnforcer(new Enforcer());
     ctx.setOutputPath(new Path(workDir, "grouped1"));
 
     Expr context = analyzer.parse(CreateTableAsStmts[0]);
@@ -396,8 +403,8 @@ public class TestPhysicalPlanner {
         Integer.MAX_VALUE);
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/testStorePlanWithRCFile");
     TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan),
-        new Fragment[] { frags[0] },
-        workDir);
+        new Fragment[] { frags[0] }, workDir);
+    ctx.setEnforcer(new Enforcer());
     ctx.setOutputPath(new Path(workDir, "grouped2"));
 
     Expr context = analyzer.parse(CreateTableAsStmts[1]);
@@ -437,9 +444,9 @@ public class TestPhysicalPlanner {
     QueryUnitAttemptId id = LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan);
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/testPartitionedStorePlan");
     TaskAttemptContext ctx = new TaskAttemptContext(conf, id, new Fragment[] { frags[0] }, workDir);
+    ctx.setEnforcer(new Enforcer());
     Expr context = analyzer.parse(QUERIES[7]);
     LogicalPlan plan = planner.createPlan(context);
-    LogicalNode rootNode = plan.getRootBlock().getRoot();
 
     int numPartitions = 3;
     Column key1 = new Column("score.deptName", Type.TEXT);
@@ -448,7 +455,7 @@ public class TestPhysicalPlanner {
         PartitionType.HASH_PARTITION, numPartitions);
     dataChannel.setPartitionKey(new Column[]{key1, key2});
     ctx.setDataChannel(dataChannel);
-    rootNode = optimizer.optimize(plan);
+    LogicalNode rootNode = optimizer.optimize(plan);
 
     TableMeta outputMeta = CatalogUtil.newTableMeta(rootNode.getOutSchema(), StoreType.CSV);
 
@@ -496,8 +503,8 @@ public class TestPhysicalPlanner {
 
     Path workDir = CommonTestingUtil.getTestDir(
         "target/test-data/testPartitionedStorePlanWithEmptyGroupingSet");
-    TaskAttemptContext ctx = new TaskAttemptContext(conf, id, new Fragment[] { frags[0] },
-        workDir);
+    TaskAttemptContext ctx = new TaskAttemptContext(conf, id, new Fragment[] { frags[0] }, workDir);
+    ctx.setEnforcer(new Enforcer());
     Expr expr = analyzer.parse(QUERIES[14]);
     LogicalPlan plan = planner.createPlan(expr);
     LogicalNode rootNode = plan.getRootBlock().getRoot();
@@ -552,6 +559,7 @@ public class TestPhysicalPlanner {
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/testAggregationFunction");
     TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan),
         new Fragment[] { frags[0] }, workDir);
+    ctx.setEnforcer(new Enforcer());
     Expr context = analyzer.parse(QUERIES[8]);
     LogicalPlan plan = planner.createPlan(context);
     LogicalNode rootNode = optimizer.optimize(plan);
@@ -585,6 +593,7 @@ public class TestPhysicalPlanner {
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/testCountFunction");
     TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan),
         new Fragment[] { frags[0] }, workDir);
+    ctx.setEnforcer(new Enforcer());
     Expr context = analyzer.parse(QUERIES[9]);
     LogicalPlan plan = planner.createPlan(context);
     LogicalNode rootNode = optimizer.optimize(plan);
@@ -616,6 +625,7 @@ public class TestPhysicalPlanner {
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/testGroupByWithNullValue");
     TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan),
         new Fragment[] { frags[0] }, workDir);
+    ctx.setEnforcer(new Enforcer());
     Expr context = analyzer.parse(QUERIES[11]);
     LogicalPlan plan = planner.createPlan(context);
     LogicalNode rootNode = optimizer.optimize(plan);
@@ -726,6 +736,7 @@ public class TestPhysicalPlanner {
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/testDuplicateEliminate");
     TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan),
         new Fragment[] {frags[0]}, workDir);
+    ctx.setEnforcer(new Enforcer());
     Expr expr = analyzer.parse(duplicateElimination[0]);
     LogicalPlan plan = planner.createPlan(expr);
     LogicalNode rootNode = optimizer.optimize(plan);
@@ -758,6 +769,7 @@ public class TestPhysicalPlanner {
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/testIndexedStoreExec");
     TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan),
         new Fragment[] {frags[0]}, workDir);
+    ctx.setEnforcer(new Enforcer());
     Expr context = analyzer.parse(SORT_QUERY[0]);
     LogicalPlan plan = planner.createPlan(context);
     LogicalNode rootNode = optimizer.optimize(plan);
@@ -771,12 +783,6 @@ public class TestPhysicalPlanner {
     PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
     PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
 
-//    ProjectionExec proj = (ProjectionExec) exec;
-//    ExternalSortExec sort = (ExternalSortExec) proj.getChild();
-//
-//    SortSpec[] sortSpecs = sort.getPlan().getSortKeys();
-    //IndexedStoreExec idxStoreExec = new IndexedStoreExec(ctx, sm, sort, sort.getSchema(), sort.getSchema(), sortSpecs);
-
     Tuple tuple;
     exec.init();
     exec.next();
@@ -843,4 +849,101 @@ public class TestPhysicalPlanner {
 
     scanner.close();
   }
+
+  @Test
+  public final void testSortEnforcer() throws IOException, PlanningException {
+    Fragment[] frags = StorageManager.splitNG(conf, "employee", employee.getMeta(),
+        employee.getPath(), Integer.MAX_VALUE);
+
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/testSortEnforcer");
+    Expr context = analyzer.parse(SORT_QUERY[0]);
+    LogicalPlan plan = planner.createPlan(context);
+    optimizer.optimize(plan);
+    LogicalNode rootNode = plan.getRootBlock().getRoot();
+
+    SortNode sortNode = PlannerUtil.findTopNode(rootNode, NodeType.SORT);
+
+    Enforcer enforcer = new Enforcer();
+    enforcer.addSort(sortNode.getPID(), SortAlgorithm.IN_MEMORY_SORT);
+    TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan),
+        new Fragment[] {frags[0]}, workDir);
+    ctx.setEnforcer(enforcer);
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+    exec.init();
+    exec.next();
+    exec.close();
+
+    assertTrue(((ProjectionExec)exec).getChild() instanceof MemSortExec);
+
+    context = analyzer.parse(SORT_QUERY[0]);
+    plan = planner.createPlan(context);
+    optimizer.optimize(plan);
+    rootNode = plan.getRootBlock().getRoot();
+
+    sortNode = PlannerUtil.findTopNode(rootNode, NodeType.SORT);
+
+    enforcer = new Enforcer();
+    enforcer.addSort(sortNode.getPID(), SortAlgorithm.MERGE_SORT);
+    ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan),
+        new Fragment[] {frags[0]}, workDir);
+    ctx.setEnforcer(enforcer);
+
+    phyPlanner = new PhysicalPlannerImpl(conf,sm);
+    exec = phyPlanner.createPlan(ctx, rootNode);
+    exec.init();
+    exec.next();
+    exec.close();
+
+    assertTrue(((ProjectionExec)exec).getChild() instanceof ExternalSortExec);
+  }
+
+  @Test
+  public final void testGroupByEnforcer() throws IOException, PlanningException {
+    Fragment[] frags = StorageManager.splitNG(conf, "score", score.getMeta(), score.getPath(), Integer.MAX_VALUE);
+
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/testGroupByEnforcer");
+    Expr context = analyzer.parse(QUERIES[7]);
+    LogicalPlan plan = planner.createPlan(context);
+    optimizer.optimize(plan);
+    LogicalNode rootNode = plan.getRootBlock().getRoot();
+
+    GroupbyNode groupByNode = PlannerUtil.findTopNode(rootNode, NodeType.GROUP_BY);
+
+    Enforcer enforcer = new Enforcer();
+    enforcer.addGroupby(groupByNode.getPID(), GroupbyEnforce.GroupbyAlgorithm.HASH_AGGREGATION);
+    TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan),
+        new Fragment[] {frags[0]}, workDir);
+    ctx.setEnforcer(enforcer);
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+    exec.init();
+    exec.next();
+    exec.close();
+
+    assertTrue(exec instanceof HashAggregateExec);
+
+    context = analyzer.parse(QUERIES[7]);
+    plan = planner.createPlan(context);
+    optimizer.optimize(plan);
+    rootNode = plan.getRootBlock().getRoot();
+
+    groupByNode = PlannerUtil.findTopNode(rootNode, NodeType.GROUP_BY);
+
+    enforcer = new Enforcer();
+    enforcer.addGroupby(groupByNode.getPID(), GroupbyEnforce.GroupbyAlgorithm.SORT_AGGREGATION);
+    ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan),
+        new Fragment[] {frags[0]}, workDir);
+    ctx.setEnforcer(enforcer);
+
+    phyPlanner = new PhysicalPlannerImpl(conf,sm);
+    exec = phyPlanner.createPlan(ctx, rootNode);
+    exec.init();
+    exec.next();
+    exec.close();
+
+    assertTrue(exec instanceof SortAggregateExec);
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/17287ef5/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
index c655e05..b75c5e7 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
@@ -31,6 +31,7 @@ import org.apache.tajo.datum.Datum;
 import org.apache.tajo.datum.DatumFactory;
 import org.apache.tajo.engine.parser.SQLAnalyzer;
 import org.apache.tajo.engine.planner.*;
+import org.apache.tajo.engine.planner.enforce.Enforcer;
 import org.apache.tajo.engine.planner.logical.LogicalNode;
 import org.apache.tajo.storage.*;
 import org.apache.tajo.util.CommonTestingUtil;
@@ -110,8 +111,8 @@ public class TestSortExec {
     Fragment [] frags = sm.splitNG(conf, "employee", employeeMeta, tablePath, Integer.MAX_VALUE);
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestSortExec");
     TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility
-        .newQueryUnitAttemptId(),
-        new Fragment[] { frags[0] }, workDir);
+        .newQueryUnitAttemptId(), new Fragment[] { frags[0] }, workDir);
+    ctx.setEnforcer(new Enforcer());
     Expr context = analyzer.parse(QUERIES[0]);
     LogicalPlan plan = planner.createPlan(context);
     LogicalNode rootNode = optimizer.optimize(plan);


Mime
View raw message