tajo-commits mailing list archives

From: jihoon...@apache.org
Subject: [38/50] [abbrv] git commit: TAJO-432: Add shuffle phase for column-partitioned table store. (Min Zhou via jihoon)
Date: Sat, 28 Dec 2013 06:36:35 GMT
TAJO-432: Add shuffle phase for column-partitioned table store. (Min Zhou via jihoon)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/eac2507a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/eac2507a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/eac2507a

Branch: refs/heads/DAG-execplan
Commit: eac2507afaecea32ce406a32d7632016ebbc593d
Parents: 3a5a617
Author: Jihoon Son <jihoonson@apache.org>
Authored: Thu Dec 26 14:46:24 2013 +0900
Committer: Jihoon Son <jihoonson@apache.org>
Committed: Thu Dec 26 14:46:24 2013 +0900

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 .../engine/planner/global/GlobalPlanner.java    | 68 ++++++++++++++++++--
 2 files changed, 65 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/eac2507a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 385ef9f..d084bf8 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -105,6 +105,8 @@ Release 0.8.0 - unreleased
 
   IMPROVEMENTS
 
+    TAJO-432: Add shuffle phase for column-partitioned table store. (Min Zhou via jihoon)
+
     TAJO-135: Bump up hadoop to 2.2.0. (jihoon)
 
     TAJO-435: Improve intermediate file. (jinho)

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/eac2507a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
index ea3c366..877a179 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
@@ -27,6 +27,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
 import org.apache.tajo.algebra.JoinType;
 import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.partition.PartitionDesc;
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.eval.AggregationFunctionCallEval;
@@ -40,6 +41,9 @@ import java.util.*;
 
 import static org.apache.tajo.ipc.TajoWorkerProtocol.PartitionType.*;
 
+/**
+ * Build DAG
+ */
 public class GlobalPlanner {
   private static Log LOG = LogFactory.getLog(GlobalPlanner.class);
 
@@ -316,6 +320,62 @@ public class GlobalPlanner {
     return currentBlock;
   }
 
+
+  private ExecutionBlock buildStorePlan(GlobalPlanContext context,
+                                        ExecutionBlock childBlock,
+                                        StoreTableNode currentNode) {
+    PartitionDesc partitionDesc = currentNode.getPartitions();
+
+    // if result table is not a partitioned table, directly store it
+    if(partitionDesc == null) {
+      currentNode.setChild(childBlock.getPlan());
+      currentNode.setInSchema(childBlock.getPlan().getOutSchema());
+      childBlock.setPlan(currentNode);
+      return childBlock;
+    }
+
+    // if result table is a partitioned table
+    // 1. replace StoreTableNode with its child node,
+    //    old execution block ends at the child node
+    LogicalNode childNode = currentNode.getChild();
+    childBlock.setPlan(childNode);
+
+    // 2. create a new execution block, pipeline 2 exec blocks through a DataChannel
+    MasterPlan masterPlan = context.plan;
+    ExecutionBlock currentBlock = masterPlan.newExecutionBlock();
+    DataChannel channel = null;
+    CatalogProtos.PartitionsType partitionsType = partitionDesc.getPartitionsType();
+    if(partitionsType == CatalogProtos.PartitionsType.COLUMN) {
+      channel = new DataChannel(childBlock, currentBlock, HASH_PARTITION, 32);
+      Column[] columns = new Column[partitionDesc.getColumns().size()];
+      channel.setPartitionKey(partitionDesc.getColumns().toArray(columns));
+      channel.setSchema(childNode.getOutSchema());
+      channel.setStoreType(storeType);
+    } else if (partitionsType == CatalogProtos.PartitionsType.HASH) {
+      channel = new DataChannel(childBlock, currentBlock, HASH_PARTITION,
+          partitionDesc.getNumPartitions());
+      Column[] columns = new Column[partitionDesc.getColumns().size()];
+      channel.setPartitionKey(partitionDesc.getColumns().toArray(columns));
+      channel.setSchema(childNode.getOutSchema());
+      channel.setStoreType(storeType);
+    } else if(partitionsType == CatalogProtos.PartitionsType.RANGE) {
+      // TODO
+    } else if(partitionsType == CatalogProtos.PartitionsType.LIST) {
+      // TODO
+    }
+
+    // 3. create a ScanNode for scanning shuffle data
+    //    StoreTableNode as the root node of the new execution block
+    ScanNode scanNode = buildInputExecutor(masterPlan.getLogicalPlan(), channel);
+    currentNode.setChild(scanNode);
+    currentNode.setInSchema(scanNode.getOutSchema());
+    currentBlock.setPlan(currentNode);
+
+    masterPlan.addConnect(channel);
+
+    return currentBlock;
+  }
+
   public class DistributedPlannerVisitor extends BasicLogicalPlanVisitor<GlobalPlanContext, LogicalNode> {
 
     @Override
@@ -525,11 +585,9 @@ public class GlobalPlanner {
                                        Stack<LogicalNode> stack) throws PlanningException {
       LogicalNode child = super.visitStoreTable(context, plan, node, stack);
 
-      ExecutionBlock execBlock = context.execBlockMap.remove(child.getPID());
-      node.setChild(execBlock.getPlan());
-      node.setInSchema(execBlock.getPlan().getOutSchema());
-      execBlock.setPlan(node);
-      context.execBlockMap.put(node.getPID(), execBlock);
+      ExecutionBlock childBlock = context.execBlockMap.remove(child.getPID());
+      ExecutionBlock newExecBlock = buildStorePlan(context, childBlock, node);
+      context.execBlockMap.put(node.getPID(), newExecBlock);
 
       return node;
     }
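
For readers following the diff above: the HASH_PARTITION channel that buildStorePlan creates for a
COLUMN-partitioned store hashes each row on the table's partition columns and fans it out to a fixed
32 downstream partitions, so rows with identical partition-column values always reach the same task
before the StoreTableNode writes them. The snippet below is a minimal, self-contained sketch of that
routing step written outside of Tajo; every name in it (ColumnPartitionShuffleSketch, Row, hashShuffle,
the sample values) is a hypothetical stand-in for illustration and not part of the Tajo API.

import java.util.*;

public class ColumnPartitionShuffleSketch {

  // Stand-in for a row: the values of the partition columns plus the rest of the tuple.
  record Row(List<Object> partitionKeyValues, List<Object> otherValues) {}

  // Route each row to one of numPartitions buckets by hashing only its partition-column
  // values, mirroring what the HASH_PARTITION DataChannel does with the keys taken from
  // partitionDesc.getColumns() in buildStorePlan.
  static Map<Integer, List<Row>> hashShuffle(List<Row> rows, int numPartitions) {
    Map<Integer, List<Row>> buckets = new HashMap<>();
    for (Row row : rows) {
      int bucket = Math.floorMod(row.partitionKeyValues().hashCode(), numPartitions);
      buckets.computeIfAbsent(bucket, b -> new ArrayList<>()).add(row);
    }
    return buckets;
  }

  public static void main(String[] args) {
    List<Row> rows = List.of(
        new Row(List.of("2013-12-26", "KR"), List.of(1)),
        new Row(List.of("2013-12-26", "KR"), List.of(2)),
        new Row(List.of("2013-12-27", "US"), List.of(3)));
    // 32 mirrors the fixed fan-out used for CatalogProtos.PartitionsType.COLUMN in the diff;
    // for PartitionsType.HASH the diff uses partitionDesc.getNumPartitions() instead.
    Map<Integer, List<Row>> buckets = hashShuffle(rows, 32);
    // Rows with equal partition-column values land in the same bucket, so each downstream
    // task can write its column partitions without overlapping another task's output.
    buckets.forEach((b, rs) -> System.out.println("bucket " + b + ": " + rs.size() + " row(s)"));
  }
}

The RANGE and LIST branches are left as TODO markers in the diff, so the sketch covers only the
COLUMN and HASH cases.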

