tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From blrun...@apache.org
Subject git commit: TAJO-673: Assign proper number of tasks when inserting into partitioned table. (jaehwa)
Date Tue, 15 Jul 2014 03:35:19 GMT
Repository: tajo
Updated Branches:
  refs/heads/master 5e2d6c3c7 -> 48dbfd92c


TAJO-673: Assign proper number of tasks when inserting into partitioned table. (jaehwa)


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/48dbfd92
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/48dbfd92
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/48dbfd92

Branch: refs/heads/master
Commit: 48dbfd92c4f424ffef6c9aa03e905882bd42bbec
Parents: 5e2d6c3
Author: blrunner <jhjung@gruter.com>
Authored: Tue Jul 15 12:35:00 2014 +0900
Committer: blrunner <jhjung@gruter.com>
Committed: Tue Jul 15 12:35:00 2014 +0900

----------------------------------------------------------------------
 CHANGES                                         |   2 +
 .../java/org/apache/tajo/conf/TajoConf.java     |   3 +
 .../engine/planner/PhysicalPlannerImpl.java     |   1 +
 .../engine/planner/global/GlobalPlanner.java    |   3 +-
 .../physical/HashShuffleFileWriteExec.java      |   1 +
 .../tajo/master/querymaster/QueryUnit.java      |  13 +++
 .../master/querymaster/QueryUnitAttempt.java    |   2 +-
 .../tajo/master/querymaster/Repartitioner.java  | 113 ++++++++++++++++++-
 .../main/java/org/apache/tajo/worker/Task.java  |  11 ++
 .../apache/tajo/worker/TaskAttemptContext.java  |  18 +++
 .../src/main/proto/TajoWorkerProtocol.proto     |   2 +
 .../tajo/engine/query/TestTablePartitions.java  | 108 ++++++++++++++++--
 .../tajo/pullserver/PullServerAuxService.java   |   4 +-
 .../tajo/pullserver/TajoPullServerService.java  |   4 +-
 14 files changed, 264 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/48dbfd92/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index c0dcbe3..3803ded 100644
--- a/CHANGES
+++ b/CHANGES
@@ -84,6 +84,8 @@ Release 0.9.0 - unreleased
 
   BUG FIXES
 
+    TAJO-673: Assign proper number of tasks when inserting into partitioned table. (jaehwa)
+
     TAJO-916: SubQuery::computeStatFromTasks occasionally fail.
     (Hyoungjun Kim via hyunsik)
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/48dbfd92/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index dd5327d..83ff9ed 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -249,6 +249,9 @@ public class TajoConf extends Configuration {
     DIST_QUERY_SORT_PARTITION_VOLUME("tajo.dist-query.sort.partition-volume-mb", 256),
     DIST_QUERY_GROUPBY_PARTITION_VOLUME("tajo.dist-query.groupby.partition-volume-mb", 256),
 
+    DIST_QUERY_TABLE_PARTITION_VOLUME("tajo.dist-query.table-partition.task-volume-mb",
+        256 * 1024 * 1024),
+
     //////////////////////////////////
     // Physical Executors
     //////////////////////////////////

http://git-wip-us.apache.org/repos/asf/tajo/blob/48dbfd92/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
index cf02ecd..6678e46 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
@@ -732,6 +732,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
                                                  ShuffleFileWriteNode plan, PhysicalExec
subOp) throws IOException {
     switch (plan.getShuffleType()) {
     case HASH_SHUFFLE:
+    case SCATTERED_HASH_SHUFFLE:
       return new HashShuffleFileWriteExec(ctx, sm, plan, subOp);
 
     case RANGE_SHUFFLE:

http://git-wip-us.apache.org/repos/asf/tajo/blob/48dbfd92/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
index 4e27574..69ecd02 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
@@ -1149,10 +1149,11 @@ public class GlobalPlanner {
         shuffleKeys[i++] = insertNode.getProjectedSchema().getColumn(id);
       }
       channel.setShuffleKeys(shuffleKeys);
+      channel.setShuffleType(SCATTERED_HASH_SHUFFLE);
     } else {
       channel.setShuffleKeys(partitionMethod.getExpressionSchema().toArray());
+      channel.setShuffleType(HASH_SHUFFLE);
     }
-    channel.setShuffleType(HASH_SHUFFLE);
     channel.setShuffleOutputNum(32);
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/48dbfd92/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java
index 678b745..44e8646 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java
@@ -129,6 +129,7 @@ public final class HashShuffleFileWriteExec extends UnaryPhysicalExec
{
       statSet.add(app.getStats());
       if (app.getStats().getNumRows() > 0) {
         context.addShuffleFileOutput(partNum, getDataFile(partNum).getName());
+        context.addPartitionOutputVolume(partNum, app.getStats().getNumBytes());
       }
     }
     

http://git-wip-us.apache.org/repos/asf/tajo/blob/48dbfd92/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
index 6cada07..806c0f1 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
@@ -667,6 +667,7 @@ public class QueryUnit implements EventHandler<TaskEvent> {
     int attemptId;
     int partId;
     PullHost host;
+    long volume;
 
     public IntermediateEntry(int taskId, int attemptId, int partId, PullHost host) {
       this.taskId = taskId;
@@ -675,6 +676,14 @@ public class QueryUnit implements EventHandler<TaskEvent> {
       this.host = host;
     }
 
+    public IntermediateEntry(int taskId, int attemptId, int partId, PullHost host, long volume)
{
+      this.taskId = taskId;
+      this.attemptId = attemptId;
+      this.partId = partId;
+      this.host = host;
+      this.volume = volume;
+    }
+
     public ExecutionBlockId getEbId() {
       return ebId;
     }
@@ -699,6 +708,10 @@ public class QueryUnit implements EventHandler<TaskEvent> {
       return this.host;
     }
 
+    public long getVolume() {
+      return this.volume;
+    }
+
     @Override
     public int hashCode() {
       return Objects.hashCode(ebId, taskId, partId, attemptId, host);

http://git-wip-us.apache.org/repos/asf/tajo/blob/48dbfd92/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
index 361f88f..a4fa12f 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
@@ -280,7 +280,7 @@ public class QueryUnitAttempt implements EventHandler<TaskAttemptEvent>
{
       PullHost host = new PullHost(getHost(), getPullServerPort());
       for (ShuffleFileOutput p : report.getShuffleFileOutputsList()) {
         IntermediateEntry entry = new IntermediateEntry(getId().getQueryUnitId().getId(),
-            getId().getId(), p.getPartId(), host);
+            getId().getId(), p.getPartId(), host, p.getVolume());
         partitions.add(entry);
       }
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/48dbfd92/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
index ce2194e..1cc5b78 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
@@ -19,6 +19,7 @@
 package org.apache.tajo.master.querymaster;
 
 import com.google.common.collect.Lists;
+import com.google.common.primitives.Ints;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
@@ -60,6 +61,7 @@ import java.util.Map.Entry;
 
 import static org.apache.tajo.ipc.TajoWorkerProtocol.ShuffleType.HASH_SHUFFLE;
 import static org.apache.tajo.ipc.TajoWorkerProtocol.ShuffleType.RANGE_SHUFFLE;
+import static org.apache.tajo.ipc.TajoWorkerProtocol.ShuffleType.SCATTERED_HASH_SHUFFLE;
 
 /**
  * Repartitioner creates non-leaf tasks and shuffles intermediate data.
@@ -529,7 +531,8 @@ public class Repartitioner {
                                                       MasterPlan masterPlan, SubQuery subQuery,
int maxNum)
       throws IOException {
     DataChannel channel = masterPlan.getIncomingChannels(subQuery.getBlock().getId()).get(0);
-    if (channel.getShuffleType() == HASH_SHUFFLE) {
+    if (channel.getShuffleType() == HASH_SHUFFLE
+        || channel.getShuffleType() == SCATTERED_HASH_SHUFFLE) {
       scheduleHashShuffledFetches(schedulerContext, masterPlan, subQuery, channel, maxNum);
     } else if (channel.getShuffleType() == RANGE_SHUFFLE) {
       scheduleRangeShuffledFetches(schedulerContext, masterPlan, subQuery, channel, maxNum);
@@ -668,12 +671,23 @@ public class Repartitioner {
 
     Map<QueryUnit.PullHost, List<IntermediateEntry>> hashedByHost;
     Map<Integer, Collection<FetchImpl>> finalFetches = new HashMap<Integer,
Collection<FetchImpl>>();
+    Map<ExecutionBlockId, List<IntermediateEntry>> intermediates = new HashMap<ExecutionBlockId,
+        List<IntermediateEntry>>();
 
     for (ExecutionBlock block : masterPlan.getChilds(execBlock)) {
       List<IntermediateEntry> partitions = new ArrayList<IntermediateEntry>();
       for (QueryUnit tasks : subQuery.getContext().getSubQuery(block.getId()).getQueryUnits())
{
         if (tasks.getIntermediateData() != null) {
           partitions.addAll(tasks.getIntermediateData());
+
+          // In scattered hash shuffle, collect each IntermediateEntry
+          if (channel.getShuffleType() == SCATTERED_HASH_SHUFFLE) {
+            if (intermediates.containsKey(block.getId())) {
+              intermediates.get(block.getId()).addAll(tasks.getIntermediateData());
+            } else {
+              intermediates.put(block.getId(), tasks.getIntermediateData());
+            }
+          }
         }
       }
       Map<Integer, List<IntermediateEntry>> hashed = hashByKey(partitions);
@@ -709,12 +723,99 @@ public class Repartitioner {
     }
 
     // set the proper number of tasks to the estimated task num
-    schedulerContext.setEstimatedTaskNum(determinedTaskNum);
-    // divide fetch uris into the the proper number of tasks in a round robin manner.
-    scheduleFetchesByRoundRobin(subQuery, finalFetches, scan.getTableName(), determinedTaskNum);
-    LOG.info(subQuery.getId() + ", DeterminedTaskNum : " + determinedTaskNum);
+    if (channel.getShuffleType() == SCATTERED_HASH_SHUFFLE) {
+      scheduleScatteredHashShuffleFetches(schedulerContext, subQuery, intermediates,
+          scan.getTableName());
+    } else {
+      schedulerContext.setEstimatedTaskNum(determinedTaskNum);
+      // divide fetch uris into the proper number of tasks in a round robin manner.
+      scheduleFetchesByRoundRobin(subQuery, finalFetches, scan.getTableName(), determinedTaskNum);
+      LOG.info(subQuery.getId() + ", DeterminedTaskNum : " + determinedTaskNum);
+    }
   }
 
+  // Scattered hash shuffle hashes the key columns and groups the intermediate data
+  // associated with the same hash key. Then, if the volume of a group is larger
+  // than DIST_QUERY_TABLE_PARTITION_VOLUME, it divides the group into two or more
+  // subgroups according to DIST_QUERY_TABLE_PARTITION_VOLUME (default size = 256MB).
+  // As a result, each group size always becomes less than or equal
+  // to DIST_QUERY_TABLE_PARTITION_VOLUME. Finally, each subgroup is assigned to a query unit.
+  // It is usually used for writing partitioned tables.
+  public static void scheduleScatteredHashShuffleFetches(TaskSchedulerContext schedulerContext,
+       SubQuery subQuery, Map<ExecutionBlockId, List<IntermediateEntry>> intermediates,
+       String tableName) {
+    int i = 0;
+    int splitVolume =   subQuery.getContext().getConf().
+        getIntVar(ConfVars.DIST_QUERY_TABLE_PARTITION_VOLUME);
+
+    long sumNumBytes = 0L;
+    Map<Integer, List<FetchImpl>> fetches = new HashMap<Integer, List<FetchImpl>>();
+
+    // Step 1 : divide fetch uris into the proper number of tasks by
+    // DIST_QUERY_TABLE_PARTITION_VOLUME
+    for (Entry<ExecutionBlockId, List<IntermediateEntry>> listEntry : intermediates.entrySet())
{
+
+      // Step 2: Sort IntermediateEntry by partition id. After first sort,
+      // we need to sort again by PullHost address because of data locality.
+      Collections.sort(listEntry.getValue(), new IntermediateEntryComparator());
+      for (IntermediateEntry interm : listEntry.getValue()) {
+        FetchImpl fetch = new FetchImpl(interm.getPullHost(), SCATTERED_HASH_SHUFFLE,
+            listEntry.getKey(), interm.getPartId(), TUtil.newList(interm));
+        if (fetches.size() == 0) {
+          fetches.put(i, TUtil.newList(fetch));
+        } else {
+
+          // Step 3: Compare the current partition id with the previous partition id
+          // because one task can include only one partition id.
+          if (fetches.get(i).get(0).getPartitionId() != interm.getPartId()) {
+            i++;
+            fetches.put(i, TUtil.newList(fetch));
+            sumNumBytes = 0L;
+          } else {
+            if ((sumNumBytes + interm.getVolume()) < splitVolume) {
+              fetches.get(i).add(fetch);
+            } else {
+              i++;
+              fetches.put(i, TUtil.newList(fetch));
+              sumNumBytes = 0L;
+            }
+          }
+        }
+        sumNumBytes += interm.getVolume();
+      }
+    }
+
+    // Step 4 : Set the proper number of tasks to the estimated task num
+    schedulerContext.setEstimatedTaskNum(fetches.size());
+
+    // Step 5 : Apply divided fetches
+    i = 0;
+    Map<String, List<FetchImpl>>[] fetchesArray = new Map[fetches.size()];
+    for(Entry<Integer, List<FetchImpl>> entry : fetches.entrySet()) {
+      fetchesArray[i] = new HashMap<String, List<FetchImpl>>();
+      fetchesArray[i].put(tableName, entry.getValue());
+
+      SubQuery.scheduleFetches(subQuery, fetchesArray[i]);
+      i++;
+    }
+
+    LOG.info(subQuery.getId()
+        + ", ShuffleType:" + SCATTERED_HASH_SHUFFLE.name()
+        + ", DeterminedTaskNum : " + fetches.size());
+  }
+
+  static class IntermediateEntryComparator implements Comparator<IntermediateEntry>
{
+
+    @Override
+    public int compare(IntermediateEntry o1, IntermediateEntry o2) {
+      int cmp = Ints.compare(o1.getPartId(), o2.getPartId());
+      if (cmp != 0) {
+        return cmp;
+      }
+
+      return o1.getPullHost().getHost().compareTo(o2.getPullHost().getHost());
+    }
+  }
 
   public static List<URI> createFetchURL(FetchImpl fetch, boolean includeParts) {
     String scheme = "http://";
@@ -729,6 +830,8 @@ public class Repartitioner {
       urlPrefix.append("h");
     } else if (fetch.getType() == RANGE_SHUFFLE) {
       urlPrefix.append("r").append("&").append(fetch.getRangeParams());
+    } else if (fetch.getType() == SCATTERED_HASH_SHUFFLE) {
+      urlPrefix.append("s");
     }
 
     List<URI> fetchURLs = new ArrayList<URI>();

http://git-wip-us.apache.org/repos/asf/tajo/blob/48dbfd92/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
index ee3c40d..991dc4b 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
@@ -348,6 +348,17 @@ public class Task {
         Entry<Integer,String> entry = it.next();
         ShuffleFileOutput.Builder part = ShuffleFileOutput.newBuilder();
         part.setPartId(entry.getKey());
+
+        // Set output volume
+        if (context.getPartitionOutputVolume() != null) {
+          for (Entry<Integer, Long> e : context.getPartitionOutputVolume().entrySet())
{
+            if (entry.getKey().equals(e.getKey())) {
+              part.setVolume(e.getValue().longValue());
+              break;
+            }
+          }
+        }
+
         builder.addShuffleFileOutputs(part.build());
       } while (it.hasNext());
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/48dbfd92/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
index e073652..1f0c410 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
@@ -71,6 +71,9 @@ public class TaskAttemptContext {
   private Enforcer enforcer;
   private QueryContext queryContext;
 
+  /** An output volume for each partition. */
+  private Map<Integer, Long> partitionOutputVolume;
+
   public TaskAttemptContext(TajoConf conf, QueryContext queryContext, final QueryUnitAttemptId
queryId,
                             final FragmentProto[] fragments,
                             final Path workDir) {
@@ -94,6 +97,8 @@ public class TaskAttemptContext {
     this.shuffleFileOutputs = Maps.newHashMap();
 
     state = TaskAttemptState.TA_PENDING;
+
+    this.partitionOutputVolume = Maps.newHashMap();
   }
 
   @VisibleForTesting
@@ -193,6 +198,19 @@ public class TaskAttemptContext {
     return shuffleFileOutputs.entrySet().iterator();
   }
   
+  public void addPartitionOutputVolume(int partId, long volume) {
+    if (partitionOutputVolume.containsKey(partId)) {
+      long sum = partitionOutputVolume.get(partId);
+      partitionOutputVolume.put(partId, sum + volume);
+    } else {
+      partitionOutputVolume.put(partId, volume);
+    }
+  }
+
+  public Map<Integer, Long> getPartitionOutputVolume() {
+    return partitionOutputVolume;
+  }
+
   public void updateAssignedFragments(String tableId, Fragment[] fragments) {
     fragmentMap.remove(tableId);
     for(Fragment t : fragments) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/48dbfd92/tajo-core/src/main/proto/TajoWorkerProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/proto/TajoWorkerProtocol.proto b/tajo-core/src/main/proto/TajoWorkerProtocol.proto
index 3bf6e13..ce8ce86 100644
--- a/tajo-core/src/main/proto/TajoWorkerProtocol.proto
+++ b/tajo-core/src/main/proto/TajoWorkerProtocol.proto
@@ -126,6 +126,7 @@ enum CommandType {
 message ShuffleFileOutput {
     required int32 partId = 1;
     optional string fileName = 2;
+    optional int64 volume = 3;
 }
 
 message QueryExecutionRequestProto {
@@ -145,6 +146,7 @@ enum ShuffleType {
   NONE_SHUFFLE = 0;
   HASH_SHUFFLE = 1;
   RANGE_SHUFFLE = 2;
+  SCATTERED_HASH_SHUFFLE = 3;
 }
 
 enum TransmitType {

http://git-wip-us.apache.org/repos/asf/tajo/blob/48dbfd92/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java
b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java
index c34c3f4..9db8e41 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java
@@ -25,21 +25,33 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
 import org.apache.hadoop.io.compress.DeflateCodec;
+import org.apache.tajo.QueryId;
 import org.apache.tajo.QueryTestCaseBase;
 import org.apache.tajo.TajoConstants;
 import org.apache.tajo.TajoTestingCluster;
 import org.apache.tajo.catalog.CatalogService;
 import org.apache.tajo.catalog.CatalogUtil;
 import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.engine.planner.global.DataChannel;
+import org.apache.tajo.engine.planner.global.ExecutionBlock;
+import org.apache.tajo.engine.planner.global.MasterPlan;
+import org.apache.tajo.engine.planner.logical.NodeType;
 import org.apache.tajo.ipc.ClientProtos;
+import org.apache.tajo.jdbc.TajoResultSet;
+import org.apache.tajo.master.querymaster.QueryMasterTask;
+import org.apache.tajo.worker.TajoWorker;
 import org.junit.Test;
 
 import java.io.IOException;
 import java.sql.ResultSet;
+import java.util.List;
 import java.util.Map;
 
+import static junit.framework.TestCase.*;
 import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME;
-import static org.junit.Assert.*;
+import static org.apache.tajo.ipc.TajoWorkerProtocol.ShuffleType.SCATTERED_HASH_SHUFFLE;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 
 public class TestTablePartitions extends QueryTestCaseBase {
 
@@ -62,6 +74,77 @@ public class TestTablePartitions extends QueryTestCaseBase {
     res = testBase.execute(
         "insert overwrite into " + tableName + " select l_orderkey, l_partkey, " +
             "l_quantity from lineitem");
+
+    MasterPlan plan = getQueryPlan(res);
+    ExecutionBlock rootEB = plan.getRoot();
+
+    /*
+    -------------------------------------------------------------------------------
+    |-eb_1405354886454_0001_000003
+       |-eb_1405354886454_0001_000002
+          |-eb_1405354886454_0001_000001
+     */
+    assertEquals(1, plan.getChildCount(rootEB.getId()));
+
+    ExecutionBlock insertEB = plan.getChild(rootEB.getId(), 0);
+    assertNotNull(insertEB);
+    assertEquals(NodeType.INSERT, insertEB.getPlan().getType());
+    assertEquals(1, plan.getChildCount(insertEB.getId()));
+
+    ExecutionBlock scanEB = plan.getChild(insertEB.getId(), 0);
+
+    List<DataChannel> list = plan.getOutgoingChannels(scanEB.getId());
+    assertEquals(1, list.size());
+    DataChannel channel = list.get(0);
+    assertNotNull(channel);
+    assertEquals(SCATTERED_HASH_SHUFFLE, channel.getShuffleType());
+    assertEquals(1, channel.getShuffleKeys().length);
+
+    res.close();
+  }
+
+  @Test
+  public final void testCreateColumnPartitionedTableWithJoin() throws Exception {
+    String tableName = CatalogUtil.normalizeIdentifier("testCreateColumnPartitionedTableWithJoin");
+    ResultSet res = executeString(
+        "create table " + tableName + " (col1 int4, col2 int4) partition by column(key float8)
");
+    res.close();
+
+    assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName));
+    assertEquals(2, catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName).getSchema().size());
+    assertEquals(3, catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName).getLogicalSchema().size());
+
+    res = testBase.execute(
+        "insert overwrite into " + tableName + " select l_orderkey, l_partkey, " +
+            "l_quantity from lineitem join orders on l_orderkey = o_orderkey");
+
+    MasterPlan plan = getQueryPlan(res);
+    ExecutionBlock rootEB = plan.getRoot();
+
+    /*
+    -------------------------------------------------------------------------------
+    |-eb_1405356074433_0001_000005
+       |-eb_1405356074433_0001_000004
+          |-eb_1405356074433_0001_000003
+             |-eb_1405356074433_0001_000002
+             |-eb_1405356074433_0001_000001
+     */
+    assertEquals(1, plan.getChildCount(rootEB.getId()));
+
+    ExecutionBlock insertEB = plan.getChild(rootEB.getId(), 0);
+    assertNotNull(insertEB);
+    assertEquals(NodeType.INSERT, insertEB.getPlan().getType());
+    assertEquals(1, plan.getChildCount(insertEB.getId()));
+
+    ExecutionBlock scanEB = plan.getChild(insertEB.getId(), 0);
+
+    List<DataChannel> list = plan.getOutgoingChannels(scanEB.getId());
+    assertEquals(1, list.size());
+    DataChannel channel = list.get(0);
+    assertNotNull(channel);
+    assertEquals(SCATTERED_HASH_SHUFFLE, channel.getShuffleType());
+    assertEquals(1, channel.getShuffleKeys().length);
+
     res.close();
   }
 
@@ -240,21 +323,13 @@ public class TestTablePartitions extends QueryTestCaseBase {
       assertEquals(5, desc.getStats().getNumRows().intValue());
     }
 
-    String expected = "N\n" +
-        "N\n" +
-        "N\n" +
-        "R\n" +
-        "R\n";
-
-    String tableData = getTableFileContents(desc.getPath());
-    assertEquals(expected, tableData);
-
     res = executeString("select * from " + tableName + " where col2 = 2");
 
     Map<Double, int []> resultRows1 = Maps.newHashMap();
     resultRows1.put(45.0d, new int[]{3, 2});
     resultRows1.put(38.0d, new int[]{2, 2});
 
+
     for (int i = 0; i < 2; i++) {
       assertTrue(res.next());
       assertEquals(resultRows1.get(res.getDouble(4))[0], res.getInt(2));
@@ -575,4 +650,17 @@ public class TestTablePartitions extends QueryTestCaseBase {
       assertEquals(5, desc.getStats().getNumRows().intValue());
     }
   }
+
+  private MasterPlan getQueryPlan(ResultSet res) {
+    QueryId queryId = ((TajoResultSet)res).getQueryId();
+    for (TajoWorker eachWorker: testingCluster.getTajoWorkers()) {
+      QueryMasterTask queryMasterTask = eachWorker.getWorkerContext().getQueryMaster().getQueryMasterTask(queryId,
true);
+      if (queryMasterTask != null) {
+        return queryMasterTask.getQuery().getPlan();
+      }
+    }
+
+    fail("Can't find query from workers" + queryId);
+    return null;
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/48dbfd92/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java
----------------------------------------------------------------------
diff --git a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java
b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java
index b8fda29..dd3bee3 100644
--- a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java
+++ b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java
@@ -424,8 +424,8 @@ public class PullServerAuxService extends AuxiliaryService {
           chunks.add(chunk);
         }
 
-        // if a subquery requires a hash repartition
-      } else if (repartitionType.equals("h")) {
+        // if a subquery requires a hash repartition or a scattered hash repartition
+      } else if (repartitionType.equals("h") || repartitionType.equals("s")) {
         for (String ta : taskIds) {
           Path path = localFS.makeQualified(
               lDirAlloc.getLocalPathToRead(queryBaseDir + "/" + sid + "/" +

http://git-wip-us.apache.org/repos/asf/tajo/blob/48dbfd92/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
----------------------------------------------------------------------
diff --git a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
index 373642b..5b76da5 100644
--- a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
+++ b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
@@ -430,8 +430,8 @@ public class TajoPullServerService extends AbstractService {
           chunks.add(chunk);
         }
 
-        // if a subquery requires a hash shuffle
-      } else if (shuffleType.equals("h")) {
+        // if a subquery requires a hash shuffle or a scattered hash shuffle
+      } else if (shuffleType.equals("h") || shuffleType.equals("s")) {
         for (String ta : taskIds) {
           if (!lDirAlloc.ifExists(queryBaseDir + "/" + sid + "/" + ta + "/output/" + partId,
conf)) {
             LOG.warn(e);


Mime
View raw message