tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hyun...@apache.org
Subject git commit: TAJO-881: JOIN with union query occurs NPE. (Hyoungjun Kim via hyunsik)
Date Thu, 26 Jun 2014 03:24:22 GMT
Repository: tajo
Updated Branches:
  refs/heads/master b37583613 -> 6cfd448f7


TAJO-881: JOIN with union query occurs NPE. (Hyoungjun Kim via hyunsik)

Closes #48


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/6cfd448f
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/6cfd448f
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/6cfd448f

Branch: refs/heads/master
Commit: 6cfd448f7fb60a254a237230f071456f188f9179
Parents: b375836
Author: Hyunsik Choi <hyunsik@apache.org>
Authored: Thu Jun 26 12:08:12 2014 +0900
Committer: Hyunsik Choi <hyunsik@apache.org>
Committed: Thu Jun 26 12:08:12 2014 +0900

----------------------------------------------------------------------
 CHANGES                                         |   2 +
 .../tajo/engine/planner/enforce/Enforcer.java   |   2 +
 .../engine/planner/global/ExecutionBlock.java   |  10 +
 .../engine/planner/global/GlobalPlanner.java    | 157 +++++++++++--
 .../tajo/engine/planner/global/MasterPlan.java  |   7 +-
 .../planner/rewrite/ProjectionPushDownRule.java |  11 +-
 .../tajo/master/DefaultTaskScheduler.java       |   7 +-
 .../tajo/master/querymaster/QueryUnit.java      |  12 +-
 .../tajo/master/querymaster/Repartitioner.java  |  91 ++++----
 .../tajo/engine/query/TestUnionQuery.java       | 220 +++++++++++++++++++
 10 files changed, 457 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/6cfd448f/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 77186e6..6389bf9 100644
--- a/CHANGES
+++ b/CHANGES
@@ -74,6 +74,8 @@ Release 0.9.0 - unreleased
 
   BUG FIXES
 
+    TAJO-881: JOIN with union query occurs NPE. (Hyoungjun Kim via hyunsik)
+
     TAJO-884: complex join conditions should be supported in ON clause.
     (hyunsik)
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/6cfd448f/tajo-core/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java
index 36820cc..031569e 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java
@@ -311,6 +311,8 @@ public class Enforcer implements ProtoObject<EnforcerProto> {
       }
       break;
     case SORTED_INPUT:
+      SortedInputEnforce sortedInput = property.getSortedInput();
+      sb.append("sorted input=" + sortedInput.getTableName());
     }
 
     return sb.toString();

http://git-wip-us.apache.org/repos/asf/tajo/blob/6cfd448f/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java
index b731cec..1d14996 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java
@@ -34,6 +34,9 @@ public class ExecutionBlock {
   private List<ScanNode> scanlist = new ArrayList<ScanNode>();
   private Enforcer enforcer = new Enforcer();
 
+  // Actual ScanNode's ExecutionBlockId -> Delegated ScanNode's ExecutionBlockId.
+  private Map<ExecutionBlockId, ExecutionBlockId> unionScanMap = new HashMap<ExecutionBlockId,
ExecutionBlockId>();
+
   private boolean hasJoinPlan;
   private boolean hasUnionPlan;
 
@@ -83,6 +86,13 @@ public class ExecutionBlock {
     }
   }
 
+  public void addUnionScan(ExecutionBlockId realScanEbId, ExecutionBlockId delegatedScanEbId)
{
+    unionScanMap.put(realScanEbId, delegatedScanEbId);
+  }
+
+  public Map<ExecutionBlockId, ExecutionBlockId> getUnionScanMap() {
+    return unionScanMap;
+  }
 
   public LogicalNode getPlan() {
     return plan;

http://git-wip-us.apache.org/repos/asf/tajo/blob/6cfd448f/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
index 536dbd8..f12b7e2 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
@@ -25,6 +25,7 @@ import com.google.common.collect.Sets;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
+import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.algebra.JoinType;
 import org.apache.tajo.catalog.*;
 import org.apache.tajo.catalog.partition.PartitionMethodDesc;
@@ -230,7 +231,7 @@ public class GlobalPlanner {
                                         ExecutionBlock leftBlock, ExecutionBlock rightBlock)
       throws PlanningException {
     MasterPlan masterPlan = context.plan;
-    ExecutionBlock currentBlock = null;
+    ExecutionBlock currentBlock;
 
     boolean autoBroadcast = conf.getBoolVar(TajoConf.ConfVars.DIST_QUERY_BROADCAST_JOIN_AUTO);
 
@@ -310,22 +311,143 @@ public class GlobalPlanner {
     }
 
     // symmetric repartition join
-    currentBlock = masterPlan.newExecutionBlock();
+    boolean leftUnion = leftNode.getType() == NodeType.TABLE_SUBQUERY &&
+        ((TableSubQueryNode)leftNode).getSubQuery().getType() == NodeType.UNION;
+    boolean rightUnion = rightNode.getType() == NodeType.TABLE_SUBQUERY &&
+        ((TableSubQueryNode)rightNode).getSubQuery().getType() == NodeType.UNION;
+
+    if (leftUnion || rightUnion) { // if one of child execution block is union
+      /*
+       Join with tableC and result of union tableA, tableB is expected the following physical
plan.
+       But Union execution block is not necessary.
+       |-eb_0001_000006 (Terminal)
+          |-eb_0001_000005 (Join eb_0001_000003, eb_0001_000004)
+             |-eb_0001_000004 (Scan TableC)
+             |-eb_0001_000003 (Union TableA, TableB)
+               |-eb_0001_000002 (Scan TableB)
+               |-eb_0001_000001 (Scan TableA)
+
+       The above plan can be changed to the following plan.
+       |-eb_0001_000005 (Terminal)
+          |-eb_0001_000003    (Join [eb_0001_000001, eb_0001_000002], eb_0001_000004)
+             |-eb_0001_000004 (Scan TableC)
+             |-eb_0001_000002 (Scan TableB)
+             |-eb_0001_000001 (Scan TableA)
+
+       eb_0001_000003's left child should be eb_0001_000001 + eb_0001_000002 and right child
should be eb_0001_000004.
+       For this eb_0001_000001 is representative of eb_0001_000001, eb_0001_000002.
+       So eb_0001_000003's left child is eb_0001_000001
+       */
+      Column[][] joinColumns = null;
+      if (joinNode.getJoinType() != JoinType.CROSS) {
+        // ShuffleKeys need to not have theta-join condition because Tajo supports only equi-join.
+        joinColumns = PlannerUtil.joinJoinKeyForEachTable(joinNode.getJoinQual(),
+            leftNode.getOutSchema(), rightNode.getOutSchema(), false);
+      }
 
-    DataChannel leftChannel = createDataChannelFromJoin(leftBlock, rightBlock, currentBlock,
joinNode, true);
-    DataChannel rightChannel = createDataChannelFromJoin(leftBlock, rightBlock, currentBlock,
joinNode, false);
+      if (leftUnion && !rightUnion) { // if only left is union
+        currentBlock = leftBlock;
+        context.execBlockMap.remove(leftNode.getPID());
+        Column[] shuffleKeys = (joinColumns != null) ? joinColumns[0] : null;
+        Column[] otherSideShuffleKeys = (joinColumns != null) ? joinColumns[1] : null;
+        buildJoinPlanWithUnionChannel(context, joinNode, currentBlock, leftBlock, rightBlock,
leftNode,
+            shuffleKeys, otherSideShuffleKeys, true);
+        currentBlock.setPlan(joinNode);
+      } else if (!leftUnion && rightUnion) { // if only right is union
+        currentBlock = rightBlock;
+        context.execBlockMap.remove(rightNode.getPID());
+        Column[] shuffleKeys = (joinColumns != null) ? joinColumns[1] : null;
+        Column[] otherSideShuffleKeys = (joinColumns != null) ? joinColumns[0] : null;
+        buildJoinPlanWithUnionChannel(context, joinNode, currentBlock, rightBlock, leftBlock,
rightNode,
+            shuffleKeys, otherSideShuffleKeys, false);
+        currentBlock.setPlan(joinNode);
+      } else { // if both are unions
+        currentBlock = leftBlock;
+        context.execBlockMap.remove(leftNode.getPID());
+        context.execBlockMap.remove(rightNode.getPID());
+        buildJoinPlanWithUnionChannel(context, joinNode, currentBlock, leftBlock, null, leftNode,
+            (joinColumns != null ? joinColumns[0] : null), null, true);
+        buildJoinPlanWithUnionChannel(context, joinNode, currentBlock, rightBlock, null,
rightNode,
+            (joinColumns != null ? joinColumns[1] : null), null, false);
+        currentBlock.setPlan(joinNode);
+      }
 
-    ScanNode leftScan = buildInputExecutor(masterPlan.getLogicalPlan(), leftChannel);
-    ScanNode rightScan = buildInputExecutor(masterPlan.getLogicalPlan(), rightChannel);
+      return currentBlock;
+    } else {
+      // !leftUnion && !rightUnion
+      currentBlock = masterPlan.newExecutionBlock();
+      DataChannel leftChannel = createDataChannelFromJoin(leftBlock, rightBlock, currentBlock,
joinNode, true);
+      DataChannel rightChannel = createDataChannelFromJoin(leftBlock, rightBlock, currentBlock,
joinNode, false);
 
-    joinNode.setLeftChild(leftScan);
-    joinNode.setRightChild(rightScan);
-    currentBlock.setPlan(joinNode);
+      ScanNode leftScan = buildInputExecutor(masterPlan.getLogicalPlan(), leftChannel);
+      ScanNode rightScan = buildInputExecutor(masterPlan.getLogicalPlan(), rightChannel);
 
-    masterPlan.addConnect(leftChannel);
-    masterPlan.addConnect(rightChannel);
+      joinNode.setLeftChild(leftScan);
+      joinNode.setRightChild(rightScan);
+      currentBlock.setPlan(joinNode);
 
-    return currentBlock;
+      masterPlan.addConnect(leftChannel);
+      masterPlan.addConnect(rightChannel);
+
+      return currentBlock;
+    }
+  }
+
+  private void buildJoinPlanWithUnionChannel(GlobalPlanContext context, JoinNode joinNode,
+                                             ExecutionBlock targetBlock,
+                                             ExecutionBlock sourceBlock,
+                                             ExecutionBlock otherSideBlock,
+                                             LogicalNode childNode,
+                                             Column[] shuffleKeys,
+                                             Column[] otherSideShuffleKeys,
+                                             boolean left) {
+    MasterPlan masterPlan = context.getPlan();
+    String subQueryRelationName = ((TableSubQueryNode)childNode).getCanonicalName();
+    ExecutionBlockId dedicatedScanNodeBlock = null;
+    for (DataChannel channel : masterPlan.getIncomingChannels(sourceBlock.getId())) {
+      // If all union and right, add channel to left
+      if (otherSideBlock == null && !left) {
+        DataChannel oldChannel = channel;
+        masterPlan.disconnect(oldChannel.getSrcId(), oldChannel.getTargetId());
+        channel = new DataChannel(oldChannel.getSrcId(), targetBlock.getId());
+      }
+      channel.setSchema(childNode.getOutSchema());
+      channel.setShuffleType(HASH_SHUFFLE);
+      channel.setShuffleOutputNum(32);
+      if (shuffleKeys != null) {
+        channel.setShuffleKeys(shuffleKeys);
+      }
+
+      ScanNode scanNode = buildInputExecutor(masterPlan.getLogicalPlan(), channel);
+      scanNode.getOutSchema().setQualifier(subQueryRelationName);
+      if (dedicatedScanNodeBlock == null) {
+        dedicatedScanNodeBlock = channel.getSrcId();
+        if (left) {
+          joinNode.setLeftChild(scanNode);
+        } else {
+          joinNode.setRightChild(scanNode);
+        }
+      }
+      masterPlan.addConnect(channel);
+      targetBlock.addUnionScan(channel.getSrcId(), dedicatedScanNodeBlock);
+    }
+
+    // create other side channel
+    if (otherSideBlock != null) {
+      DataChannel otherSideChannel = new DataChannel(otherSideBlock, targetBlock, HASH_SHUFFLE,
32);
+      otherSideChannel.setStoreType(storeType);
+      if (otherSideShuffleKeys != null) {
+        otherSideChannel.setShuffleKeys(otherSideShuffleKeys);
+      }
+      masterPlan.addConnect(otherSideChannel);
+
+      ScanNode scan = buildInputExecutor(masterPlan.getLogicalPlan(), otherSideChannel);
+      if (left) {
+        joinNode.setRightChild(scan);
+      } else {
+        joinNode.setLeftChild(scan);
+      }
+    }
   }
 
   private AggregationFunctionCallEval createSumFunction(EvalNode [] args) throws InternalException
{
@@ -1211,7 +1333,6 @@ public class GlobalPlanner {
             }
           }
         }
-
         if (leftMostSubQueryNode != null) {
           // replace target column name
           Target[] targets = leftMostSubQueryNode.getTargets();
@@ -1221,6 +1342,16 @@ public class GlobalPlanner {
               throw new PlanningException("Target of a UnionNode's subquery should be FieldEval.");
             }
             int index = leftMostSubQueryNode.getInSchema().getColumnId(targets[i].getNamedColumn().getQualifiedName());
+            if (index < 0) {
+              // If a target has alias, getNamedColumn() only returns alias
+              Set<Column> columns = EvalTreeUtil.findUniqueColumns(targets[i].getEvalTree());
+              Column column = columns.iterator().next();
+              index = leftMostSubQueryNode.getInSchema().getColumnId(column.getQualifiedName());
+            }
+            if (index < 0) {
+              throw new PlanningException("Can't find matched Target in UnionNode's input
schema: " + targets[i]
+                  + "->" + leftMostSubQueryNode.getInSchema());
+            }
             targetMappings[i] = index;
           }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/6cfd448f/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java
index 37b0db1..a8593e5 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java
@@ -238,7 +238,11 @@ public class MasterPlan {
       if (!isLeaf(block)) {
         sb.append("\n[Incoming]\n");
         for (DataChannel channel : getIncomingChannels(block.getId())) {
-          sb.append(channel).append("\n");
+          sb.append(channel);
+          if (block.getUnionScanMap().containsKey(channel.getSrcId())) {
+            sb.append(", union delegated scan: ").append(block.getUnionScanMap().get(channel.getSrcId()));
+          }
+          sb.append("\n");
         }
       }
 
@@ -250,6 +254,7 @@ public class MasterPlan {
         }
       }
 
+
       if (block.getEnforcer().getProperties().size() > 0) {
         sb.append("\n[Enforcers]\n");
         int i = 0;

http://git-wip-us.apache.org/repos/asf/tajo/blob/6cfd448f/tajo-core/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java
index 827daee..8d80d39 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java
@@ -27,6 +27,7 @@ import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.SortSpec;
 import org.apache.tajo.engine.eval.*;
 import org.apache.tajo.engine.planner.*;
+import org.apache.tajo.engine.planner.LogicalPlan.QueryBlock;
 import org.apache.tajo.engine.planner.logical.*;
 import org.apache.tajo.engine.utils.SchemaUtil;
 import org.apache.tajo.util.TUtil;
@@ -54,11 +55,15 @@ public class ProjectionPushDownRule extends
   public boolean isEligible(LogicalPlan plan) {
     LogicalNode toBeOptimized = plan.getRootBlock().getRoot();
 
-    if (PlannerUtil.checkIfDDLPlan(toBeOptimized) || !plan.getRootBlock().hasTableExpression())
{
+    if (PlannerUtil.checkIfDDLPlan(toBeOptimized)) {
       return false;
     }
-
-    return true;
+    for (QueryBlock eachBlock: plan.getQueryBlocks()) {
+      if (eachBlock.hasTableExpression()) {
+        return true;
+      }
+    }
+    return false;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tajo/blob/6cfd448f/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java b/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
index 94d0381..21df4e9 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
@@ -30,7 +30,6 @@ import org.apache.tajo.QueryIdFactory;
 import org.apache.tajo.QueryUnitAttemptId;
 import org.apache.tajo.engine.planner.global.ExecutionBlock;
 import org.apache.tajo.engine.planner.global.MasterPlan;
-import org.apache.tajo.engine.planner.logical.ScanNode;
 import org.apache.tajo.engine.query.QueryUnitRequest;
 import org.apache.tajo.engine.query.QueryUnitRequestImpl;
 import org.apache.tajo.ipc.TajoWorkerProtocol;
@@ -872,11 +871,11 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
           if (checkIfInterQuery(subQuery.getMasterPlan(), subQuery.getBlock())) {
             taskAssign.setInterQuery();
           }
-          for (ScanNode scan : task.getScanNodes()) {
-            Collection<FetchImpl> fetches = task.getFetch(scan);
+          for(Map.Entry<String, Set<FetchImpl>> entry: task.getFetchMap().entrySet())
{
+            Collection<FetchImpl> fetches = entry.getValue();
             if (fetches != null) {
               for (FetchImpl fetch : fetches) {
-                taskAssign.addFetch(scan.getTableName(), fetch);
+                taskAssign.addFetch(entry.getKey(), fetch);
               }
             }
           }

http://git-wip-us.apache.org/repos/asf/tajo/blob/6cfd448f/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
index 33cf19b..6cada07 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
@@ -28,6 +28,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.state.*;
+import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.QueryIdFactory;
 import org.apache.tajo.QueryUnitAttemptId;
 import org.apache.tajo.QueryUnitId;
@@ -661,6 +662,7 @@ public class QueryUnit implements EventHandler<TaskEvent> {
   }
 
   public static class IntermediateEntry {
+    ExecutionBlockId ebId;
     int taskId;
     int attemptId;
     int partId;
@@ -673,6 +675,14 @@ public class QueryUnit implements EventHandler<TaskEvent> {
       this.host = host;
     }
 
+    public ExecutionBlockId getEbId() {
+      return ebId;
+    }
+
+    public void setEbId(ExecutionBlockId ebId) {
+      this.ebId = ebId;
+    }
+
     public int getTaskId() {
       return this.taskId;
     }
@@ -691,7 +701,7 @@ public class QueryUnit implements EventHandler<TaskEvent> {
 
     @Override
     public int hashCode() {
-      return Objects.hashCode(taskId, partId, attemptId, host);
+      return Objects.hashCode(ebId, taskId, partId, attemptId, host);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/6cfd448f/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
index 6c000a1..6373bb4 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
@@ -87,13 +87,9 @@ public class Repartitioner {
     for (int i = 0; i < scans.length; i++) {
       TableDesc tableDesc = masterContext.getTableDescMap().get(scans[i].getCanonicalName());
       if (tableDesc == null) { // if it is a real table stored on storage
-        // TODO - to be fixed (wrong directory)
-        ExecutionBlock [] childBlocks = new ExecutionBlock[2];
-        childBlocks[0] = masterPlan.getChild(execBlock.getId(), 0);
-        childBlocks[1] = masterPlan.getChild(execBlock.getId(), 1);
-
         tablePath = storageManager.getTablePath(scans[i].getTableName());
-        stats[i] = masterContext.getSubQuery(childBlocks[i].getId()).getResultStats().getNumBytes();
+        ExecutionBlockId scanEBId = TajoIdUtils.createExecutionBlockId(scans[i].getTableName());
+        stats[i] = masterContext.getSubQuery(scanEBId).getResultStats().getNumBytes();
         fragments[i] = new FileFragment(scans[i].getCanonicalName(), tablePath, 0, 0, new
String[]{UNKNOWN_HOST});
       } else {
         tablePath = tableDesc.getPath();
@@ -161,28 +157,38 @@ public class Repartitioner {
     } else {
       LOG.info("[Distributed Join Strategy] : Symmetric Repartition Join");
       // The hash map is modeling as follows:
-      // <Part Id, <Table Name, Intermediate Data>>
-      Map<Integer, Map<String, List<IntermediateEntry>>> hashEntries =
new HashMap<Integer, Map<String, List<IntermediateEntry>>>();
+      // <Part Id, <EbId, Intermediate Data>>
+      Map<Integer, Map<ExecutionBlockId, List<IntermediateEntry>>> hashEntries
=
+          new HashMap<Integer, Map<ExecutionBlockId, List<IntermediateEntry>>>();
 
       // Grouping IntermediateData by a partition key and a table name
-      for (ScanNode scan : scans) {
-        SubQuery childSubQuery = masterContext.getSubQuery(TajoIdUtils.createExecutionBlockId(scan.getCanonicalName()));
-        for (QueryUnit task : childSubQuery.getQueryUnits()) {
+      List<ExecutionBlock> childBlocks = masterPlan.getChilds(subQuery.getId());
+
+      // In the case of join with union, there is one ScanNode for union.
+      Map<ExecutionBlockId, ExecutionBlockId> unionScanMap = execBlock.getUnionScanMap();
+      for (ExecutionBlock childBlock : childBlocks) {
+        ExecutionBlockId scanEbId = unionScanMap.get(childBlock.getId());
+        if (scanEbId == null) {
+          scanEbId = childBlock.getId();
+        }
+        SubQuery childExecSM = subQuery.getContext().getSubQuery(childBlock.getId());
+        for (QueryUnit task : childExecSM.getQueryUnits()) {
           if (task.getIntermediateData() != null && !task.getIntermediateData().isEmpty())
{
             for (IntermediateEntry intermEntry : task.getIntermediateData()) {
+              intermEntry.setEbId(childBlock.getId());
               if (hashEntries.containsKey(intermEntry.getPartId())) {
-                Map<String, List<IntermediateEntry>> tbNameToInterm =
+                Map<ExecutionBlockId, List<IntermediateEntry>> tbNameToInterm
=
                     hashEntries.get(intermEntry.getPartId());
 
-                if (tbNameToInterm.containsKey(scan.getCanonicalName())) {
-                  tbNameToInterm.get(scan.getCanonicalName()).add(intermEntry);
+                if (tbNameToInterm.containsKey(scanEbId)) {
+                  tbNameToInterm.get(scanEbId).add(intermEntry);
                 } else {
-                  tbNameToInterm.put(scan.getCanonicalName(), TUtil.newList(intermEntry));
+                  tbNameToInterm.put(scanEbId, TUtil.newList(intermEntry));
                 }
               } else {
-                Map<String, List<IntermediateEntry>> tbNameToInterm =
-                    new HashMap<String, List<IntermediateEntry>>();
-                tbNameToInterm.put(scan.getCanonicalName(), TUtil.newList(intermEntry));
+                Map<ExecutionBlockId, List<IntermediateEntry>> tbNameToInterm
=
+                    new HashMap<ExecutionBlockId, List<IntermediateEntry>>();
+                tbNameToInterm.put(scanEbId, TUtil.newList(intermEntry));
                 hashEntries.put(intermEntry.getPartId(), tbNameToInterm);
               }
             }
@@ -190,15 +196,15 @@ public class Repartitioner {
             //if no intermidatedata(empty table), make empty entry
             int emptyPartitionId = 0;
             if (hashEntries.containsKey(emptyPartitionId)) {
-              Map<String, List<IntermediateEntry>> tbNameToInterm = hashEntries.get(emptyPartitionId);
-              if (tbNameToInterm.containsKey(scan.getCanonicalName()))
-                tbNameToInterm.get(scan.getCanonicalName())
-                    .addAll(new ArrayList<IntermediateEntry>());
+              Map<ExecutionBlockId, List<IntermediateEntry>> tbNameToInterm =
hashEntries.get(emptyPartitionId);
+              if (tbNameToInterm.containsKey(scanEbId))
+                tbNameToInterm.get(scanEbId).addAll(new ArrayList<IntermediateEntry>());
               else
-                tbNameToInterm.put(scan.getCanonicalName(), new ArrayList<IntermediateEntry>());
+                tbNameToInterm.put(scanEbId, new ArrayList<IntermediateEntry>());
             } else {
-              Map<String, List<IntermediateEntry>> tbNameToInterm = new HashMap<String,
List<IntermediateEntry>>();
-              tbNameToInterm.put(scan.getCanonicalName(), new ArrayList<IntermediateEntry>());
+              Map<ExecutionBlockId, List<IntermediateEntry>> tbNameToInterm =
+                  new HashMap<ExecutionBlockId, List<IntermediateEntry>>();
+              tbNameToInterm.put(scanEbId, new ArrayList<IntermediateEntry>());
               hashEntries.put(emptyPartitionId, tbNameToInterm);
             }
           }
@@ -234,7 +240,7 @@ public class Repartitioner {
       SubQuery.scheduleFragment(subQuery, fragments[0], Arrays.asList(new FileFragment[]{fragments[1]}));
 
       // Assign partitions to tasks in a round robin manner.
-      for (Entry<Integer, Map<String, List<IntermediateEntry>>> entry
+      for (Entry<Integer, Map<ExecutionBlockId, List<IntermediateEntry>>>
entry
           : hashEntries.entrySet()) {
         addJoinShuffle(subQuery, entry.getKey(), entry.getValue());
       }
@@ -321,17 +327,19 @@ public class Repartitioner {
   }
 
   private static void addJoinShuffle(SubQuery subQuery, int partitionId,
-                                     Map<String, List<IntermediateEntry>> grouppedPartitions)
{
+                                     Map<ExecutionBlockId, List<IntermediateEntry>>
grouppedPartitions) {
     Map<String, List<FetchImpl>> fetches = new HashMap<String, List<FetchImpl>>();
     for (ExecutionBlock execBlock : subQuery.getMasterPlan().getChilds(subQuery.getId()))
{
-      Collection<FetchImpl> requests;
-      if (grouppedPartitions.containsKey(execBlock.getId().toString())) {
-          requests = mergeShuffleRequest(execBlock.getId(), partitionId, HASH_SHUFFLE,
-              grouppedPartitions.get(execBlock.getId().toString()));
-      } else {
-        return;
+      if (grouppedPartitions.containsKey(execBlock.getId())) {
+        Collection<FetchImpl> requests = mergeShuffleRequest(partitionId, HASH_SHUFFLE,
+            grouppedPartitions.get(execBlock.getId()));
+        fetches.put(execBlock.getId().toString(), Lists.newArrayList(requests));
       }
-      fetches.put(execBlock.getId().toString(), Lists.newArrayList(requests));
+    }
+
+    if (fetches.isEmpty()) {
+      LOG.info(subQuery.getId() + "'s " + partitionId + " partition has empty result.");
+      return;
     }
     SubQuery.scheduleFetches(subQuery, fetches);
   }
@@ -342,20 +350,23 @@ public class Repartitioner {
    *
    * @return key: pullserver's address, value: a list of requests
    */
-  private static Collection<FetchImpl> mergeShuffleRequest(ExecutionBlockId ebid, int
partitionId,
+  private static Collection<FetchImpl> mergeShuffleRequest(int partitionId,
                                                           TajoWorkerProtocol.ShuffleType
type,
                                                           List<IntermediateEntry> partitions)
{
-    Map<QueryUnit.PullHost, FetchImpl> mergedPartitions = new HashMap<QueryUnit.PullHost,
FetchImpl>();
+    // ebId + pullhost -> FetchImpl
+    Map<String, FetchImpl> mergedPartitions = new HashMap<String, FetchImpl>();
 
     for (IntermediateEntry partition : partitions) {
-      QueryUnit.PullHost host = partition.getPullHost();
-      if (mergedPartitions.containsKey(host)) {
+      String mergedKey = partition.getEbId().toString() + "," + partition.getPullHost();
+
+      if (mergedPartitions.containsKey(mergedKey)) {
         FetchImpl fetch = mergedPartitions.get(partition.getPullHost());
         fetch.addPart(partition.getTaskId(), partition.getAttemptId());
       } else {
-        FetchImpl fetch = new FetchImpl(host, type, ebid, partitionId);
+        // In some cases like union each IntermediateEntry has different EBID.
+        FetchImpl fetch = new FetchImpl(partition.getPullHost(), type, partition.getEbId(),
partitionId);
         fetch.addPart(partition.getTaskId(), partition.getAttemptId());
-        mergedPartitions.put(partition.getPullHost(), fetch);
+        mergedPartitions.put(mergedKey, fetch);
       }
     }
     return mergedPartitions.values();

http://git-wip-us.apache.org/repos/asf/tajo/blob/6cfd448f/tajo-core/src/test/java/org/apache/tajo/engine/query/TestUnionQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestUnionQuery.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestUnionQuery.java
index 857cb63..1ec2c33 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestUnionQuery.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestUnionQuery.java
@@ -26,6 +26,8 @@ import org.junit.experimental.categories.Category;
 
 import java.sql.ResultSet;
 
+import static org.junit.Assert.assertEquals;
+
 /*
  * Notations
  * - S - select
@@ -205,4 +207,222 @@ public class TestUnionQuery extends QueryTestCaseBase {
     assertResultSet(res);
     cleanupQuery(res);
   }
+
+  @Test
+  public final void testLeftUnionWithJoin() throws Exception {
+    // https://issues.apache.org/jira/browse/TAJO-881
+    ResultSet res = executeString(
+        "select * from ( " +
+        "  select a.id, b.c_name, a.code from ( " +
+        "    select l_orderkey as id, 'lineitem' as code from lineitem " +
+        "    union all " +
+        "    select o_orderkey as id, 'order' as code from orders " +
+         "  ) a " +
+         "  join customer b on a.id = b.c_custkey" +
+        ") c order by id, code"
+    );
+
+    String expected =
+        "id,c_name,code\n" +
+            "-------------------------------\n" +
+            "1,Customer#000000001,lineitem\n" +
+            "1,Customer#000000001,lineitem\n" +
+            "1,Customer#000000001,order\n" +
+            "2,Customer#000000002,lineitem\n" +
+            "2,Customer#000000002,order\n" +
+            "3,Customer#000000003,lineitem\n" +
+            "3,Customer#000000003,lineitem\n" +
+            "3,Customer#000000003,order\n";
+
+    assertEquals(expected, resultSetToString(res));
+
+    cleanupQuery(res);
+  }
+
+  @Test
+  public final void testRightUnionWithJoin() throws Exception {
+    // https://issues.apache.org/jira/browse/TAJO-881
+    ResultSet res = executeString(
+            "select * from ( " +
+            "  select a.id, b.c_name, a.code from customer b " +
+            "  join ( " +
+            "    select l_orderkey as id, 'lineitem' as code from lineitem " +
+            "    union all " +
+            "    select o_orderkey as id, 'order' as code from orders " +
+            "  ) a on a.id = b.c_custkey" +
+            ") c order by id, code"
+    );
+
+    String expected =
+        "id,c_name,code\n" +
+            "-------------------------------\n" +
+            "1,Customer#000000001,lineitem\n" +
+            "1,Customer#000000001,lineitem\n" +
+            "1,Customer#000000001,order\n" +
+            "2,Customer#000000002,lineitem\n" +
+            "2,Customer#000000002,order\n" +
+            "3,Customer#000000003,lineitem\n" +
+            "3,Customer#000000003,lineitem\n" +
+            "3,Customer#000000003,order\n";
+
+    assertEquals(expected, resultSetToString(res));
+
+    cleanupQuery(res);
+  }
+
+  @Test
+  public final void testAllUnionWithJoin() throws Exception {
+    // https://issues.apache.org/jira/browse/TAJO-881
+    ResultSet res = executeString(
+        "select * from ( " +
+        "  select a.id, a.code as code, b.name, b.code as code2 from ( " +
+        "    select l_orderkey as id, 'lineitem' as code from lineitem " +
+        "    union all " +
+        "    select o_orderkey as id, 'order' as code from orders " +
+        "  ) a " +
+        "  join ( " +
+        "    select c_custkey as id, c_name as name, 'customer' as code from customer " +
+        "    union all " +
+        "    select p_partkey as id, p_name as name, 'part' as code from part " +
+        "  ) b on a.id = b.id" +
+        ") c order by id, code, code2"
+    );
+
+    String expected =
+        "id,code,name,code2\n" +
+            "-------------------------------\n" +
+            "1,lineitem,Customer#000000001,customer\n" +
+            "1,lineitem,Customer#000000001,customer\n" +
+            "1,lineitem,goldenrod lavender spring chocolate lace,part\n" +
+            "1,lineitem,goldenrod lavender spring chocolate lace,part\n" +
+            "1,order,Customer#000000001,customer\n" +
+            "1,order,goldenrod lavender spring chocolate lace,part\n" +
+            "2,lineitem,Customer#000000002,customer\n" +
+            "2,lineitem,blush thistle blue yellow saddle,part\n" +
+            "2,order,Customer#000000002,customer\n" +
+            "2,order,blush thistle blue yellow saddle,part\n" +
+            "3,lineitem,Customer#000000003,customer\n" +
+            "3,lineitem,Customer#000000003,customer\n" +
+            "3,lineitem,spring green yellow purple cornsilk,part\n" +
+            "3,lineitem,spring green yellow purple cornsilk,part\n" +
+            "3,order,Customer#000000003,customer\n" +
+            "3,order,spring green yellow purple cornsilk,part\n";
+
+    assertEquals(expected, resultSetToString(res));
+
+    cleanupQuery(res);
+  }
+
+  @Test
+  public final void testUnionWithCrossJoin() throws Exception {
+    // https://issues.apache.org/jira/browse/TAJO-881
+    ResultSet res = executeString(
+        "select * from ( " +
+        "  select a.id, b.c_name, a.code from ( " +
+            "    select l_orderkey as id, 'lineitem' as code from lineitem " +
+            "    union all " +
+            "    select o_orderkey as id, 'order' as code from orders " +
+            "  ) a, " +
+            "  customer b " +
+        ") c order by id, code, c_name"
+    );
+
+    String expected =
+        "id,c_name,code\n" +
+            "-------------------------------\n" +
+            "1,Customer#000000001,lineitem\n" +
+            "1,Customer#000000001,lineitem\n" +
+            "1,Customer#000000002,lineitem\n" +
+            "1,Customer#000000002,lineitem\n" +
+            "1,Customer#000000003,lineitem\n" +
+            "1,Customer#000000003,lineitem\n" +
+            "1,Customer#000000004,lineitem\n" +
+            "1,Customer#000000004,lineitem\n" +
+            "1,Customer#000000005,lineitem\n" +
+            "1,Customer#000000005,lineitem\n" +
+            "1,Customer#000000001,order\n" +
+            "1,Customer#000000002,order\n" +
+            "1,Customer#000000003,order\n" +
+            "1,Customer#000000004,order\n" +
+            "1,Customer#000000005,order\n" +
+            "2,Customer#000000001,lineitem\n" +
+            "2,Customer#000000002,lineitem\n" +
+            "2,Customer#000000003,lineitem\n" +
+            "2,Customer#000000004,lineitem\n" +
+            "2,Customer#000000005,lineitem\n" +
+            "2,Customer#000000001,order\n" +
+            "2,Customer#000000002,order\n" +
+            "2,Customer#000000003,order\n" +
+            "2,Customer#000000004,order\n" +
+            "2,Customer#000000005,order\n" +
+            "3,Customer#000000001,lineitem\n" +
+            "3,Customer#000000001,lineitem\n" +
+            "3,Customer#000000002,lineitem\n" +
+            "3,Customer#000000002,lineitem\n" +
+            "3,Customer#000000003,lineitem\n" +
+            "3,Customer#000000003,lineitem\n" +
+            "3,Customer#000000004,lineitem\n" +
+            "3,Customer#000000004,lineitem\n" +
+            "3,Customer#000000005,lineitem\n" +
+            "3,Customer#000000005,lineitem\n" +
+            "3,Customer#000000001,order\n" +
+            "3,Customer#000000002,order\n" +
+            "3,Customer#000000003,order\n" +
+            "3,Customer#000000004,order\n" +
+            "3,Customer#000000005,order\n";
+
+    assertEquals(expected, resultSetToString(res));
+
+    cleanupQuery(res);
+  }
+
+  @Test
+  public final void testThreeJoinInUnion() throws Exception {
+    // https://issues.apache.org/jira/browse/TAJO-881
+    ResultSet res = executeString(
+      "select orders.o_orderkey \n" +
+          "from orders\n" +
+          "join lineitem on orders.o_orderkey = lineitem.l_orderkey\n" +
+          "join customer on orders.o_custkey =  customer.c_custkey\n" +
+          "union all \n" +
+          "select nation.n_nationkey from nation"
+    );
+    String expected =
+        "o_orderkey\n" +
+            "-------------------------------\n" +
+            "1\n" +
+            "1\n" +
+            "2\n" +
+            "3\n" +
+            "3\n" +
+            "0\n" +
+            "1\n" +
+            "2\n" +
+            "3\n" +
+            "4\n" +
+            "5\n" +
+            "6\n" +
+            "7\n" +
+            "8\n" +
+            "9\n" +
+            "10\n" +
+            "11\n" +
+            "12\n" +
+            "13\n" +
+            "14\n" +
+            "15\n" +
+            "16\n" +
+            "17\n" +
+            "18\n" +
+            "19\n" +
+            "20\n" +
+            "21\n" +
+            "22\n" +
+            "23\n" +
+            "24\n";
+
+    assertEquals(expected, resultSetToString(res));
+
+    cleanupQuery(res);
+  }
 }
\ No newline at end of file


Mime
View raw message