tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jihoon...@apache.org
Subject [1/4] TAJO-266: Extend ExecutionBlock and Task to support multiple outputs. (jihoon)
Date Tue, 26 Nov 2013 09:14:33 GMT
Updated Branches:
  refs/heads/DAG-execplan [created] 7c97735e1


http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7c97735e/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
index b57ef3a..76d54e0 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
@@ -25,7 +25,9 @@ import org.apache.commons.codec.binary.Base64;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.tajo.*;
+import org.apache.tajo.LocalTajoTestingUtility;
+import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.TajoTestingCluster;
 import org.apache.tajo.algebra.Expr;
 import org.apache.tajo.catalog.*;
 import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
@@ -40,11 +42,13 @@ import org.apache.tajo.engine.eval.EvalTreeUtil;
 import org.apache.tajo.engine.parser.SQLAnalyzer;
 import org.apache.tajo.engine.planner.*;
 import org.apache.tajo.engine.planner.enforce.Enforcer;
+import org.apache.tajo.engine.planner.global.DataChannel;
+import org.apache.tajo.engine.planner.global.ExecutionPlan;
 import org.apache.tajo.engine.planner.global.MasterPlan;
 import org.apache.tajo.engine.planner.logical.*;
-import org.apache.tajo.engine.planner.global.DataChannel;
 import org.apache.tajo.master.TajoMaster;
 import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.Scanner;
 import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.storage.index.bst.BSTIndex;
 import org.apache.tajo.util.CommonTestingUtil;
@@ -58,9 +62,7 @@ import org.junit.Test;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
 
 import static org.apache.tajo.ipc.TajoWorkerProtocol.PartitionType;
 import static org.apache.tajo.ipc.TajoWorkerProtocol.SortEnforce.SortAlgorithm;
@@ -192,9 +194,11 @@ public class TestPhysicalPlanner {
     LogicalNode rootNode =plan.getRootBlock().getRoot();
     optimizer.optimize(plan);
 
-
-    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
-    PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+    ExecutionPlan execPlan = new ExecutionPlan();
+    execPlan.addPlan(rootNode);
+    PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
+    exec = ((PhysicalRootExec)exec).getChild(0);
 
     Tuple tuple;
     int i = 0;
@@ -221,9 +225,11 @@ public class TestPhysicalPlanner {
     LogicalNode rootNode =plan.getRootBlock().getRoot();
     optimizer.optimize(plan);
 
-
-    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
-    PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+    ExecutionPlan execPlan = new ExecutionPlan();
+    execPlan.addPlan(rootNode);
+    PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
+    PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
+    exec = ((PhysicalRootExec)exec).getChild(0);
 
     Tuple tuple;
     int i = 0;
@@ -249,8 +255,11 @@ public class TestPhysicalPlanner {
     optimizer.optimize(plan);
     LogicalNode rootNode = plan.getRootBlock().getRoot();
 
-    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
-    PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+    ExecutionPlan execPlan = new ExecutionPlan();
+    execPlan.addPlan(rootNode);
+    PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
+    PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
+    exec = ((PhysicalRootExec)exec).getChild(0);
 
     int i = 0;
     Tuple tuple;
@@ -279,8 +288,11 @@ public class TestPhysicalPlanner {
     LogicalPlan plan = planner.createPlan(expr);
     LogicalNode rootNode = optimizer.optimize(plan);
 
-    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
-    PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+    ExecutionPlan execPlan = new ExecutionPlan();
+    execPlan.addPlan(rootNode);
+    PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
+    PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
+    exec = ((PhysicalRootExec)exec).getChild(0);
 
     int i = 0;
     Tuple tuple;
@@ -308,8 +320,11 @@ public class TestPhysicalPlanner {
     LogicalPlan plan = planner.createPlan(context);
     optimizer.optimize(plan);
 
-    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
-    PhysicalExec exec = phyPlanner.createPlan(ctx, plan.getRootBlock().getRoot());
+    ExecutionPlan execPlan = new ExecutionPlan();
+    execPlan.addPlan(plan.getRootBlock().getRoot());
+    PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
+    PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
+    exec = ((PhysicalRootExec)exec).getChild(0);
 
     /*HashAggregateExec hashAgg = (HashAggregateExec) exec;
 
@@ -371,10 +386,13 @@ public class TestPhysicalPlanner {
 
     TableMeta outputMeta = CatalogUtil.newTableMeta(StoreType.CSV);
 
-    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
-    PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+    ExecutionPlan execPlan = new ExecutionPlan();
+    execPlan.addPlan(rootNode);
+    PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
+    PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
+    exec = ((PhysicalRootExec)exec).getChild(0);
     exec.init();
-    exec.next();
+    while (exec.next() != null);
     exec.close();
 
     Scanner scanner = StorageManagerFactory.getStorageManager(conf).getFileScanner(outputMeta, rootNode.getOutSchema(),
@@ -411,10 +429,13 @@ public class TestPhysicalPlanner {
 
     TableMeta outputMeta = CatalogUtil.newTableMeta(StoreType.RCFILE);
 
-    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
-    PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+    ExecutionPlan execPlan = new ExecutionPlan();
+    execPlan.addPlan(rootNode);
+    PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
+    PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
+    exec = ((PhysicalRootExec)exec).getChild(0);
     exec.init();
-    exec.next();
+    while (exec.next() != null);
     exec.close();
 
     Scanner scanner = StorageManagerFactory.getStorageManager(conf).getFileScanner(outputMeta, rootNode.getOutSchema(),
@@ -445,24 +466,29 @@ public class TestPhysicalPlanner {
     ctx.setEnforcer(new Enforcer());
     Expr context = analyzer.parse(QUERIES[7]);
     LogicalPlan plan = planner.createPlan(context);
+    LogicalNode rootNode = optimizer.optimize(plan);
 
     int numPartitions = 3;
     Column key1 = new Column("score.deptName", Type.TEXT);
     Column key2 = new Column("score.class", Type.TEXT);
     DataChannel dataChannel = new DataChannel(masterPlan.newExecutionBlockId(), masterPlan.newExecutionBlockId(),
-        PartitionType.HASH_PARTITION, numPartitions);
+        ((LogicalRootNode)rootNode).getChild().getPID(), null, PartitionType.HASH_PARTITION, numPartitions);
+    dataChannel.setSchema(rootNode.getOutSchema());
     dataChannel.setPartitionKey(new Column[]{key1, key2});
-    ctx.setDataChannel(dataChannel);
-    LogicalNode rootNode = optimizer.optimize(plan);
+    List<DataChannel> channels = new ArrayList<DataChannel>();
+    channels.add(dataChannel);
+    ctx.setOutgoingChannels(channels);
 
     TableMeta outputMeta = CatalogUtil.newTableMeta(StoreType.CSV);
 
     FileSystem fs = sm.getFileSystem();
 
-    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
-    PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+    ExecutionPlan execPlan = new ExecutionPlan();
+    execPlan.addPlan(rootNode);
+    PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, execPlan);
     exec.init();
-    exec.next();
+    while (exec.next() != null);
     exec.close();
 
     Path path = new Path(workDir, "output");
@@ -505,20 +531,24 @@ public class TestPhysicalPlanner {
     ctx.setEnforcer(new Enforcer());
     Expr expr = analyzer.parse(QUERIES[14]);
     LogicalPlan plan = planner.createPlan(expr);
-    LogicalNode rootNode = plan.getRootBlock().getRoot();
+    LogicalNode rootNode = optimizer.optimize(plan);
     int numPartitions = 1;
     DataChannel dataChannel = new DataChannel(masterPlan.newExecutionBlockId(), masterPlan.newExecutionBlockId(),
-        PartitionType.HASH_PARTITION, numPartitions);
+        ((LogicalRootNode)rootNode).getChild().getPID(), null, PartitionType.HASH_PARTITION, numPartitions);
+    dataChannel.setSchema(rootNode.getOutSchema());
     dataChannel.setPartitionKey(new Column[]{});
-    ctx.setDataChannel(dataChannel);
-    optimizer.optimize(plan);
+    List<DataChannel> channels = new ArrayList<DataChannel>();
+    channels.add(dataChannel);
+    ctx.setOutgoingChannels(channels);
 
     TableMeta outputMeta = CatalogUtil.newTableMeta(StoreType.CSV);
 
-    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
-    PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+    ExecutionPlan execPlan = new ExecutionPlan();
+    execPlan.addPlan(rootNode);
+    PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, execPlan);
     exec.init();
-    exec.next();
+    while (exec.next() != null);
     exec.close();
 
     Path path = new Path(workDir, "output");
@@ -571,8 +601,11 @@ public class TestPhysicalPlanner {
       }
     }
 
-    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
-    PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+    ExecutionPlan execPlan = new ExecutionPlan();
+    execPlan.addPlan(rootNode);
+    PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
+    PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
+    exec = ((PhysicalRootExec)exec).getChild(0);
 
     exec.init();
     Tuple tuple = exec.next();
@@ -606,8 +639,11 @@ public class TestPhysicalPlanner {
       }
     }
 
-    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
-    PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+    ExecutionPlan execPlan = new ExecutionPlan();
+    execPlan.addPlan(rootNode);
+    PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
+    PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
+    exec = ((PhysicalRootExec)exec).getChild(0);
     exec.init();
     Tuple tuple = exec.next();
     assertEquals(30, tuple.get(0).asInt8());
@@ -627,8 +663,11 @@ public class TestPhysicalPlanner {
     LogicalPlan plan = planner.createPlan(context);
     LogicalNode rootNode = optimizer.optimize(plan);
 
-    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
-    PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+    ExecutionPlan execPlan = new ExecutionPlan();
+    execPlan.addPlan(rootNode);
+    PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
+    PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
+    exec = ((PhysicalRootExec)exec).getChild(0);
 
     int count = 0;
     exec.init();
@@ -650,11 +689,14 @@ public class TestPhysicalPlanner {
     LogicalPlan plan = planner.createPlan(context);
     LogicalNode rootNode = optimizer.optimize(plan);
     LogicalRootNode root = (LogicalRootNode) rootNode;
-    UnionNode union = new UnionNode(plan.newPID(), root.getChild(), root.getChild());
+    UnionNode union = new UnionNode(plan.newPID(), root.getChild(), clonePlan(plan, root.getChild()));
     root.setChild(union);
 
-    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
-    PhysicalExec exec = phyPlanner.createPlan(ctx, root);
+    ExecutionPlan execPlan = new ExecutionPlan();
+    execPlan.addPlan(root);
+    PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
+    PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
+    exec = ((PhysicalRootExec)exec).getChild(0);
 
     int count = 0;
     exec.init();
@@ -665,6 +707,31 @@ public class TestPhysicalPlanner {
     assertEquals(200, count);
   }
 
+  private LogicalNode clonePlan(LogicalPlan plan, LogicalNode node) {
+    try {
+      LogicalNode clone = (LogicalNode) node.clone();
+      Stack<LogicalNode> stack = new Stack<LogicalNode>();
+      stack.push(clone);
+      LogicalNode current;
+
+      while (!stack.isEmpty()) {
+        current = stack.pop();
+        current.setPid(plan.newPID());
+        if (current instanceof UnaryNode) {
+          stack.push(((UnaryNode) current).getChild());
+        } else if (current instanceof BinaryNode) {
+          stack.push(((BinaryNode) current).getLeftChild());
+          stack.push(((BinaryNode) current).getRightChild());
+        }
+      }
+
+      return clone;
+    } catch (CloneNotSupportedException e) {
+
+    }
+    return null;
+  }
+
   @Test
   public final void testEvalExpr() throws IOException, PlanningException {
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/testEvalExpr");
@@ -674,8 +741,11 @@ public class TestPhysicalPlanner {
     LogicalPlan plan = planner.createPlan(expr);
     LogicalNode rootNode = optimizer.optimize(plan);
 
-    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
-    PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+    ExecutionPlan execPlan = new ExecutionPlan();
+    execPlan.addPlan(rootNode);
+    PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
+    exec = ((PhysicalRootExec)exec).getChild(0);
     Tuple tuple;
     exec.init();
     tuple = exec.next();
@@ -687,8 +757,11 @@ public class TestPhysicalPlanner {
     plan = planner.createPlan(expr);
     rootNode = optimizer.optimize(plan);
 
+    execPlan = new ExecutionPlan();
+    execPlan.addPlan(rootNode);
     phyPlanner = new PhysicalPlannerImpl(conf, sm);
-    exec = phyPlanner.createPlan(ctx, rootNode);
+    exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
+    exec = ((PhysicalRootExec)exec).getChild(0);
     exec.init();
     tuple = exec.next();
     exec.close();
@@ -710,8 +783,11 @@ public class TestPhysicalPlanner {
     LogicalPlan plan = planner.createPlan(context);
     LogicalNode rootNode = optimizer.optimize(plan);
 
-    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
-    PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+    ExecutionPlan execPlan = new ExecutionPlan();
+    execPlan.addPlan(rootNode);
+    PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
+    exec = ((PhysicalRootExec)exec).getChild(0);
     exec.init();
     while (exec.next() != null) {
     }
@@ -738,8 +814,11 @@ public class TestPhysicalPlanner {
     LogicalPlan plan = planner.createPlan(expr);
     LogicalNode rootNode = optimizer.optimize(plan);
 
-    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
-    PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+    ExecutionPlan execPlan = new ExecutionPlan();
+    execPlan.addPlan(rootNode);
+    PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
+    PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
+    exec = ((PhysicalRootExec)exec).getChild(0);
     Tuple tuple;
 
     int cnt = 0;
@@ -773,16 +852,21 @@ public class TestPhysicalPlanner {
 
     SortNode sortNode = PlannerUtil.findTopNode(rootNode, NodeType.SORT);
     DataChannel channel = new DataChannel(masterPlan.newExecutionBlockId(), masterPlan.newExecutionBlockId(),
-        PartitionType.RANGE_PARTITION);
+        ((LogicalRootNode)rootNode).getChild().getPID(), null, PartitionType.RANGE_PARTITION);
+    channel.setSchema(rootNode.getOutSchema());
     channel.setPartitionKey(PlannerUtil.sortSpecsToSchema(sortNode.getSortKeys()).toArray());
-    ctx.setDataChannel(channel);
+    List<DataChannel> channels = new ArrayList<DataChannel>();
+    channels.add(channel);
+    ctx.setOutgoingChannels(channels);
 
-    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
-    PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+    ExecutionPlan execPlan = new ExecutionPlan();
+    execPlan.addPlan(rootNode);
+    PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, execPlan);
 
     Tuple tuple;
     exec.init();
-    exec.next();
+    while (exec.next() != null);
     exec.close();
 
     Schema keySchema = new Schema();
@@ -866,8 +950,11 @@ public class TestPhysicalPlanner {
         new FileFragment[] {frags[0]}, workDir);
     ctx.setEnforcer(enforcer);
 
-    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
-    PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+    ExecutionPlan execPlan = new ExecutionPlan();
+    execPlan.addPlan(rootNode);
+    PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
+    PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
+    exec = ((PhysicalRootExec)exec).getChild(0);
     exec.init();
     exec.next();
     exec.close();
@@ -887,8 +974,11 @@ public class TestPhysicalPlanner {
         new FileFragment[] {frags[0]}, workDir);
     ctx.setEnforcer(enforcer);
 
+    execPlan = new ExecutionPlan();
+    execPlan.addPlan(rootNode);
     phyPlanner = new PhysicalPlannerImpl(conf,sm);
-    exec = phyPlanner.createPlan(ctx, rootNode);
+    exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
+    exec = ((PhysicalRootExec)exec).getChild(0);
     exec.init();
     exec.next();
     exec.close();
@@ -914,8 +1004,11 @@ public class TestPhysicalPlanner {
         new FileFragment[] {frags[0]}, workDir);
     ctx.setEnforcer(enforcer);
 
-    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
-    PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+    ExecutionPlan execPlan = new ExecutionPlan();
+    execPlan.addPlan(rootNode);
+    PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
+    PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
+    exec = ((PhysicalRootExec)exec).getChild(0);
     exec.init();
     exec.next();
     exec.close();
@@ -935,8 +1028,11 @@ public class TestPhysicalPlanner {
         new FileFragment[] {frags[0]}, workDir);
     ctx.setEnforcer(enforcer);
 
+    execPlan = new ExecutionPlan();
+    execPlan.addPlan(rootNode);
     phyPlanner = new PhysicalPlannerImpl(conf,sm);
-    exec = phyPlanner.createPlan(ctx, rootNode);
+    exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
+    exec = ((PhysicalRootExec)exec).getChild(0);
     exec.init();
     exec.next();
     exec.close();

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7c97735e/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterHashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterHashJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterHashJoinExec.java
index d582e2b..42a0cb7 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterHashJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterHashJoinExec.java
@@ -19,9 +19,8 @@
 package org.apache.tajo.engine.planner.physical;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.tajo.LocalTajoTestingUtility;
 import org.apache.tajo.TajoTestingCluster;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.worker.TaskAttemptContext;
 import org.apache.tajo.algebra.Expr;
 import org.apache.tajo.catalog.*;
 import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
@@ -31,22 +30,22 @@ import org.apache.tajo.datum.Datum;
 import org.apache.tajo.datum.DatumFactory;
 import org.apache.tajo.engine.parser.SQLAnalyzer;
 import org.apache.tajo.engine.planner.LogicalPlanner;
-import org.apache.tajo.engine.planner.PhysicalPlanner;
 import org.apache.tajo.engine.planner.PhysicalPlannerImpl;
 import org.apache.tajo.engine.planner.PlanningException;
+import org.apache.tajo.engine.planner.global.ExecutionPlan;
 import org.apache.tajo.engine.planner.logical.LogicalNode;
 import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.util.CommonTestingUtil;
 import org.apache.tajo.util.TUtil;
+import org.apache.tajo.worker.TaskAttemptContext;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
-import org.apache.tajo.LocalTajoTestingUtility;
 
 import java.io.IOException;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
 
 // this is not a physical operator in itself, but it uses the HashLeftOuterJoinExec with switched inputs order
 public class TestRightOuterHashJoinExec {
@@ -232,10 +231,12 @@ public class TestRightOuterHashJoinExec {
     Expr expr = analyzer.parse(QUERIES[0]);
     LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
 
-    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
-    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+    ExecutionPlan execPlan = new ExecutionPlan();
+    execPlan.addPlan(plan);
+    PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
 
-    ProjectionExec proj = (ProjectionExec) exec;
+    ProjectionExec proj = (ProjectionExec) ((PhysicalRootExec)exec).getChild(0);
     if (proj.getChild() instanceof RightOuterMergeJoinExec) {
        //for this small data set this is not likely to happen
       
@@ -245,13 +246,13 @@ public class TestRightOuterHashJoinExec {
        Tuple tuple;
        int count = 0;
        int i = 1;
-       exec.init();
+       proj.init();
   
-       while ((tuple = exec.next()) != null) {
+       while ((tuple = proj.next()) != null) {
          //TODO check contents
          count = count + 1;
        }
-       exec.close();
+       proj.close();
        assertEquals(12, count);
     }
   }
@@ -272,10 +273,12 @@ public class TestRightOuterHashJoinExec {
     Expr expr = analyzer.parse(QUERIES[1]);
     LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
 
-    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
-    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+    ExecutionPlan execPlan = new ExecutionPlan();
+    execPlan.addPlan(plan);
+    PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
 
-    ProjectionExec proj = (ProjectionExec) exec;
+    ProjectionExec proj = (ProjectionExec) ((PhysicalRootExec)exec).getChild(0);
     if (proj.getChild() instanceof RightOuterMergeJoinExec) {
        //for this small data set this is not likely to happen
       
@@ -285,13 +288,13 @@ public class TestRightOuterHashJoinExec {
        Tuple tuple;
        int count = 0;
        int i = 1;
-       exec.init();
+       proj.init();
   
-       while ((tuple = exec.next()) != null) {
+       while ((tuple = proj.next()) != null) {
          //TODO check contents
          count = count + 1;
        }
-       exec.close();
+       proj.close();
        assertEquals(5, count);
     }
   }
@@ -312,10 +315,12 @@ public class TestRightOuterHashJoinExec {
     Expr expr = analyzer.parse(QUERIES[2]);
     LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
 
-    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
-    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+    ExecutionPlan execPlan = new ExecutionPlan();
+    execPlan.addPlan(plan);
+    PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
 
-    ProjectionExec proj = (ProjectionExec) exec;
+    ProjectionExec proj = (ProjectionExec) ((PhysicalRootExec)exec).getChild(0);
     if (proj.getChild() instanceof RightOuterMergeJoinExec) {
       //for this small data set this is not likely to happen
       
@@ -325,13 +330,13 @@ public class TestRightOuterHashJoinExec {
        Tuple tuple;
        int count = 0;
        int i = 1;
-       exec.init();
+       proj.init();
   
-       while ((tuple = exec.next()) != null) {
+       while ((tuple = proj.next()) != null) {
          //TODO check contents
          count = count + 1;
        }
-       exec.close();
+       proj.close();
        assertEquals(7, count);
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7c97735e/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterMergeJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterMergeJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterMergeJoinExec.java
index 5bbb4aa..948680d 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterMergeJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterMergeJoinExec.java
@@ -21,8 +21,6 @@ package org.apache.tajo.engine.planner.physical;
 import org.apache.hadoop.fs.Path;
 import org.apache.tajo.LocalTajoTestingUtility;
 import org.apache.tajo.TajoTestingCluster;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.worker.TaskAttemptContext;
 import org.apache.tajo.algebra.Expr;
 import org.apache.tajo.catalog.*;
 import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
@@ -33,12 +31,15 @@ import org.apache.tajo.datum.DatumFactory;
 import org.apache.tajo.engine.parser.SQLAnalyzer;
 import org.apache.tajo.engine.planner.*;
 import org.apache.tajo.engine.planner.enforce.Enforcer;
+import org.apache.tajo.engine.planner.global.ExecutionPlan;
 import org.apache.tajo.engine.planner.logical.JoinNode;
 import org.apache.tajo.engine.planner.logical.LogicalNode;
 import org.apache.tajo.engine.planner.logical.NodeType;
 import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.util.CommonTestingUtil;
 import org.apache.tajo.util.TUtil;
+import org.apache.tajo.worker.TaskAttemptContext;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -46,9 +47,7 @@ import org.junit.Test;
 import java.io.IOException;
 
 import static org.apache.tajo.ipc.TajoWorkerProtocol.JoinEnforce.JoinAlgorithm;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
 
 public class TestRightOuterMergeJoinExec {
   private TajoConf conf;
@@ -309,20 +308,22 @@ public class TestRightOuterMergeJoinExec {
         LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
     ctx.setEnforcer(enforcer);
 
-    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
-    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+    ExecutionPlan execPlan = new ExecutionPlan();
+    execPlan.addPlan(plan);
+    PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
+    PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
 
-    ProjectionExec proj = (ProjectionExec) exec;
+    ProjectionExec proj = (ProjectionExec) ((PhysicalRootExec)exec).getChild(0);
     assertTrue(proj.getChild() instanceof RightOuterMergeJoinExec);
 
     int count = 0;
-    exec.init();
-    while (exec.next() != null) {
+    proj.init();
+    while (proj.next() != null) {
       //TODO check contents
       count = count + 1;
     }
-    assertNull(exec.next());
-    exec.close();
+    assertNull(proj.next());
+    proj.close();
     assertEquals(12, count);
   }
 
@@ -344,20 +345,22 @@ public class TestRightOuterMergeJoinExec {
         LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
     ctx.setEnforcer(enforcer);
 
-    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
-    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+    ExecutionPlan execPlan = new ExecutionPlan();
+    execPlan.addPlan(plan);
+    PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
+    PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
 
-    ProjectionExec proj = (ProjectionExec) exec;
+    ProjectionExec proj = (ProjectionExec) ((PhysicalRootExec)exec).getChild(0);
     assertTrue(proj.getChild() instanceof RightOuterMergeJoinExec);
 
     int count = 0;
-    exec.init();
-    while (exec.next() != null) {
+    proj.init();
+    while (proj.next() != null) {
       //TODO check contents
       count = count + 1;
     }
-    assertNull(exec.next());
-    exec.close();
+    assertNull(proj.next());
+    proj.close();
     assertEquals(5, count);
   }
 
@@ -378,19 +381,21 @@ public class TestRightOuterMergeJoinExec {
         LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
     ctx.setEnforcer(enforcer);
 
-    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
-    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
-    ProjectionExec proj = (ProjectionExec) exec;
+    ExecutionPlan execPlan = new ExecutionPlan();
+    execPlan.addPlan(plan);
+    PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
+    PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
+    ProjectionExec proj = (ProjectionExec) ((PhysicalRootExec)exec).getChild(0);
     assertTrue(proj.getChild() instanceof RightOuterMergeJoinExec);
 
     int count = 0;
-    exec.init();
-    while (exec.next() != null) {
+    proj.init();
+    while (proj.next() != null) {
       //TODO check contents
       count = count + 1;
     }
-    assertNull(exec.next());
-    exec.close();
+    assertNull(proj.next());
+    proj.close();
     assertEquals(7, count);
   }
 
@@ -412,21 +417,23 @@ public class TestRightOuterMergeJoinExec {
         LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
     ctx.setEnforcer(enforcer);
 
-    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
-    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+    ExecutionPlan execPlan = new ExecutionPlan();
+    execPlan.addPlan(plan);
+    PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
+    PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
 
-    ProjectionExec proj = (ProjectionExec) exec;
+    ProjectionExec proj = (ProjectionExec) ((PhysicalRootExec)exec).getChild(0);
     assertTrue(proj.getChild() instanceof RightOuterMergeJoinExec);
 
     int count = 0;
-    exec.init();
+    proj.init();
 
-    while (exec.next() != null) {
+    while (proj.next() != null) {
       //TODO check contents
       count = count + 1;
     }
-    assertNull(exec.next());
-    exec.close();
+    assertNull(proj.next());
+    proj.close();
     assertEquals(13, count);
   }
 
@@ -448,20 +455,22 @@ public class TestRightOuterMergeJoinExec {
         LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
     ctx.setEnforcer(enforcer);
 
-    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
-    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
-    ProjectionExec proj = (ProjectionExec) exec;
+    ExecutionPlan execPlan = new ExecutionPlan();
+    execPlan.addPlan(plan);
+    PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
+    PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
+    ProjectionExec proj = (ProjectionExec) ((PhysicalRootExec)exec).getChild(0);
     assertTrue(proj.getChild() instanceof RightOuterMergeJoinExec);
 
     int count = 0;
-    exec.init();
+    proj.init();
 
-    while (exec.next() != null) {
+    while (proj.next() != null) {
       //TODO check contents
       count = count + 1;
     }
-    assertNull(exec.next());
-    exec.close();
+    assertNull(proj.next());
+    proj.close();
     assertEquals(0, count);
   }
 
@@ -484,19 +493,21 @@ public class TestRightOuterMergeJoinExec {
         LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
     ctx.setEnforcer(enforcer);
 
-    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
-    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
-    ProjectionExec proj = (ProjectionExec) exec;
+    ExecutionPlan execPlan = new ExecutionPlan();
+    execPlan.addPlan(plan);
+    PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
+    PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
+    ProjectionExec proj = (ProjectionExec) ((PhysicalRootExec)exec).getChild(0);
     assertTrue(proj.getChild() instanceof RightOuterMergeJoinExec);
 
     int count = 0;
-    exec.init();
+    proj.init();
 
-    while (exec.next() != null) {
+    while (proj.next() != null) {
       //TODO check contents
       count = count + 1;
     }
-    exec.close();
+    proj.close();
     assertEquals(7, count);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7c97735e/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
index 45badd5..77b93de 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
@@ -21,8 +21,6 @@ package org.apache.tajo.engine.planner.physical;
 import org.apache.hadoop.fs.Path;
 import org.apache.tajo.LocalTajoTestingUtility;
 import org.apache.tajo.TajoTestingCluster;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.worker.TaskAttemptContext;
 import org.apache.tajo.algebra.Expr;
 import org.apache.tajo.catalog.*;
 import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
@@ -33,9 +31,12 @@ import org.apache.tajo.datum.DatumFactory;
 import org.apache.tajo.engine.parser.SQLAnalyzer;
 import org.apache.tajo.engine.planner.*;
 import org.apache.tajo.engine.planner.enforce.Enforcer;
+import org.apache.tajo.engine.planner.global.ExecutionPlan;
 import org.apache.tajo.engine.planner.logical.LogicalNode;
 import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.worker.TaskAttemptContext;
 import org.junit.After;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -118,8 +119,11 @@ public class TestSortExec {
     LogicalPlan plan = planner.createPlan(context);
     LogicalNode rootNode = optimizer.optimize(plan);
 
-    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
-    PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+    ExecutionPlan execPlan = new ExecutionPlan();
+    execPlan.addPlan(rootNode);
+    PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
+    exec = ((PhysicalRootExec)exec).getChild(0);
 
     Tuple tuple;
     Datum preVal = null;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7c97735e/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
index 6934872..39408a0 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
@@ -34,6 +34,7 @@ import org.apache.tajo.datum.Datum;
 import org.apache.tajo.datum.DatumFactory;
 import org.apache.tajo.engine.parser.SQLAnalyzer;
 import org.apache.tajo.engine.planner.*;
+import org.apache.tajo.engine.planner.global.ExecutionPlan;
 import org.apache.tajo.engine.planner.logical.LogicalNode;
 import org.apache.tajo.engine.planner.physical.*;
 import org.apache.tajo.storage.*;
@@ -105,7 +106,8 @@ public class TestRangeRetrieverHandler {
 
     Path tableDir = StorageUtil.concatPath(testDir, "testGet", "table.csv");
     fs.mkdirs(tableDir.getParent());
-    Appender appender = sm.getAppender(employeeMeta, schema, tableDir);
+    Appender appender =
+        sm.getAppender(employeeMeta, schema, tableDir);
     appender.init();
 
     Tuple tuple = new VTuple(schema.getColumnNum());
@@ -137,10 +139,12 @@ public class TestRangeRetrieverHandler {
     LogicalPlan plan = planner.createPlan(expr);
     LogicalNode rootNode = optimizer.optimize(plan);
 
-    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
-    PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+    ExecutionPlan execPlan = new ExecutionPlan();
+    execPlan.addPlan(rootNode);
+    PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
+    PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
 
-    ProjectionExec proj = (ProjectionExec) exec;
+    ProjectionExec proj = (ProjectionExec) ((PhysicalRootExec)exec).getChild(0);
     MemSortExec sort = (MemSortExec) proj.getChild();
 
     SortSpec[] sortSpecs = sort.getPlan().getSortKeys();
@@ -149,7 +153,7 @@ public class TestRangeRetrieverHandler {
 
     exec = idxStoreExec;
     exec.init();
-    exec.next();
+    while (exec.next() != null);
     exec.close();
 
     Schema keySchema = PlannerUtil.sortSpecsToSchema(sortSpecs);
@@ -250,10 +254,12 @@ public class TestRangeRetrieverHandler {
     LogicalPlan plan = planner.createPlan(expr);
     LogicalNode rootNode = optimizer.optimize(plan);
 
-    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
-    PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+    ExecutionPlan execPlan = new ExecutionPlan();
+    execPlan.addPlan(rootNode);
+    PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
+    PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
 
-    ProjectionExec proj = (ProjectionExec) exec;
+    ProjectionExec proj = (ProjectionExec) ((PhysicalRootExec)exec).getChild(0);
     MemSortExec sort = (MemSortExec) proj.getChild();
     SortSpec[] sortSpecs = sort.getPlan().getSortKeys();
     IndexedStoreExec idxStoreExec = new IndexedStoreExec(ctx, sm, sort,
@@ -261,7 +267,7 @@ public class TestRangeRetrieverHandler {
 
     exec = idxStoreExec;
     exec.init();
-    exec.next();
+    while (exec.next() != null);
     exec.close();
 
     Schema keySchema = PlannerUtil.sortSpecsToSchema(sortSpecs);


Mime
View raw message