tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jihoon...@apache.org
Subject [03/17] git commit: TAJO-987: Hash shuffle should be balanced according to intermediate volumes.
Date Mon, 11 Aug 2014 08:11:32 GMT
TAJO-987: Hash shuffle should be balanced according to intermediate volumes.

Closes #101


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/eeaf379a
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/eeaf379a
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/eeaf379a

Branch: refs/heads/index_support
Commit: eeaf379a48030dd819a6daf2040c779379543ac8
Parents: 072b5a3
Author: Hyunsik Choi <hyunsik@apache.org>
Authored: Mon Aug 4 12:08:51 2014 +0900
Committer: Hyunsik Choi <hyunsik@apache.org>
Committed: Mon Aug 4 12:08:51 2014 +0900

----------------------------------------------------------------------
 CHANGES                                         |   5 +-
 .../tajo/master/querymaster/Repartitioner.java  | 111 +++++++++++++++++--
 .../apache/tajo/master/TestRepartitioner.java   |  53 +++++++++
 3 files changed, 160 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/eeaf379a/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 705e91e..d6ac279 100644
--- a/CHANGES
+++ b/CHANGES
@@ -29,7 +29,10 @@ Release 0.9.0 - unreleased
 
   IMPROVEMENT
 
-    TAJO-955: Add database selection submit button in catalogview.jsp for text 
+    TAJO-987: Hash shuffle should be balanced according to intermediate
+    volumes. (hyunsik)
+
+    TAJO-955: Add database selection submit button in catalogview.jsp for text
     based browse. (Hyoungjun Kim via jihoon)
 
     TAJO-956: CONCAT should be support multiple params and null param.

http://git-wip-us.apache.org/repos/asf/tajo/blob/eeaf379a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
index f86106f..1fa3f11 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
@@ -18,6 +18,7 @@
 
 package org.apache.tajo.master.querymaster;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import com.google.common.primitives.Ints;
 import org.apache.commons.logging.Log;
@@ -48,6 +49,7 @@ import org.apache.tajo.storage.AbstractStorageManager;
 import org.apache.tajo.storage.RowStoreUtil;
 import org.apache.tajo.storage.TupleRange;
 import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.util.Pair;
 import org.apache.tajo.util.TUtil;
 import org.apache.tajo.util.TajoIdUtils;
 import org.apache.tajo.worker.FetchImpl;
@@ -59,9 +61,7 @@ import java.net.URI;
 import java.util.*;
 import java.util.Map.Entry;
 
-import static org.apache.tajo.ipc.TajoWorkerProtocol.ShuffleType.HASH_SHUFFLE;
-import static org.apache.tajo.ipc.TajoWorkerProtocol.ShuffleType.RANGE_SHUFFLE;
-import static org.apache.tajo.ipc.TajoWorkerProtocol.ShuffleType.SCATTERED_HASH_SHUFFLE;
+import static org.apache.tajo.ipc.TajoWorkerProtocol.ShuffleType.*;
 
 /**
  * Repartitioner creates non-leaf tasks and shuffles intermediate data.
@@ -675,6 +675,31 @@ public class Repartitioner {
     }
   }
 
+  @VisibleForTesting
+  public static class FetchGroupMeta {
+    long totalVolume;
+    List<FetchImpl> fetchUrls;
+
+    public FetchGroupMeta(long volume, FetchImpl fetchUrls) {
+      this.totalVolume = volume;
+      this.fetchUrls = Lists.newArrayList(fetchUrls);
+    }
+
+    public FetchGroupMeta addFetche(FetchImpl fetches) {
+      this.fetchUrls.add(fetches);
+      return this;
+    }
+
+    public void increaseVolume(long volume) {
+      this.totalVolume += volume;
+    }
+
+    public long getVolume() {
+      return totalVolume;
+    }
+
+  }
+
   public static void scheduleHashShuffledFetches(TaskSchedulerContext schedulerContext, MasterPlan
masterPlan,
                                                  SubQuery subQuery, DataChannel channel,
                                                  int maxNum) {
@@ -689,7 +714,7 @@ public class Repartitioner {
     SubQuery.scheduleFragments(subQuery, fragments);
 
     Map<QueryUnit.PullHost, List<IntermediateEntry>> hashedByHost;
-    Map<Integer, Collection<FetchImpl>> finalFetches = new HashMap<Integer,
Collection<FetchImpl>>();
+    Map<Integer, FetchGroupMeta> finalFetches = new HashMap<Integer, FetchGroupMeta>();
     Map<ExecutionBlockId, List<IntermediateEntry>> intermediates = new HashMap<ExecutionBlockId,
         List<IntermediateEntry>>();
 
@@ -717,10 +742,15 @@ public class Repartitioner {
           FetchImpl fetch = new FetchImpl(e.getKey(), channel.getShuffleType(),
               block.getId(), interm.getKey(), e.getValue());
 
+          long volumeSum = 0;
+          for (IntermediateEntry ie : e.getValue()) {
+            volumeSum += ie.getVolume();
+          }
+
           if (finalFetches.containsKey(interm.getKey())) {
-            finalFetches.get(interm.getKey()).add(fetch);
+            finalFetches.get(interm.getKey()).addFetche(fetch).increaseVolume(volumeSum);
           } else {
-            finalFetches.put(interm.getKey(), TUtil.newList(fetch));
+            finalFetches.put(interm.getKey(), new FetchGroupMeta(volumeSum, fetch));
           }
         }
       }
@@ -756,12 +786,77 @@ public class Repartitioner {
           scan.getTableName());
     } else {
       schedulerContext.setEstimatedTaskNum(determinedTaskNum);
-      // divide fetch uris into the the proper number of tasks in a round robin manner.
-      scheduleFetchesByRoundRobin(subQuery, finalFetches, scan.getTableName(), determinedTaskNum);
+      // divide fetch URIs into the proper number of tasks according to volumes
+      scheduleFetchesByEvenDistributedVolumes(subQuery, finalFetches, scan.getTableName(),
determinedTaskNum);
       LOG.info(subQuery.getId() + ", DeterminedTaskNum : " + determinedTaskNum);
     }
   }
 
+  public static Pair<Long [], Map<String, List<FetchImpl>>[]> makeEvenDistributedFetchImpl(
+      Map<Integer, FetchGroupMeta> partitions, String tableName, int num) {
+
+    // Sort fetchGroupMeta in a descending order of data volumes.
+    List<FetchGroupMeta> fetchGroupMetaList = Lists.newArrayList(partitions.values());
+    Collections.sort(fetchGroupMetaList, new Comparator<FetchGroupMeta>() {
+      @Override
+      public int compare(FetchGroupMeta o1, FetchGroupMeta o2) {
+        return o1.getVolume() < o2.getVolume() ? 1 : (o1.getVolume() > o2.getVolume()
? -1 : 0);
+      }
+    });
+
+    // Initialize containers
+    Map<String, List<FetchImpl>>[] fetchesArray = new Map[num];
+    Long [] assignedVolumes = new Long[num];
+    // initialization
+    for (int i = 0; i < num; i++) {
+      fetchesArray[i] = new HashMap<String, List<FetchImpl>>();
+      assignedVolumes[i] = 0l;
+    }
+
+    // This algorithm assigns bigger groups first by using a sorted iterator. It is a kind
+    // of greedy approach. Its complexity is O(n). Since the number of fetch groups can be
+    // more than tens of thousands, we should consider its complexity. In this respect, it
+    // shows reasonable performance and results, even though it is not an optimal algorithm.
+    Iterator<FetchGroupMeta> iterator = fetchGroupMetaList.iterator();
+
+    int p = 0;
+    while(iterator.hasNext()) {
+      while (p < num && iterator.hasNext()) {
+        FetchGroupMeta fetchGroupMeta = iterator.next();
+        assignedVolumes[p] += fetchGroupMeta.getVolume();
+
+        TUtil.putCollectionToNestedList(fetchesArray[p], tableName, fetchGroupMeta.fetchUrls);
+        p++;
+      }
+
+      p = num - 1;
+      while (p > 0 && iterator.hasNext()) {
+        FetchGroupMeta fetchGroupMeta = iterator.next();
+        assignedVolumes[p] += fetchGroupMeta.getVolume();
+
+        // While the current one is smaller than the next one, add additional fetches to
+        // the current one.
+        while(iterator.hasNext() && assignedVolumes[p - 1] > assignedVolumes[p])
{
+          FetchGroupMeta additionalFetchGroup = iterator.next();
+          assignedVolumes[p] += additionalFetchGroup.getVolume();
+          TUtil.putCollectionToNestedList(fetchesArray[p], tableName, additionalFetchGroup.fetchUrls);
+        }
+
+        p--;
+      }
+    }
+
+    return new Pair<Long[], Map<String, List<FetchImpl>>[]>(assignedVolumes,
fetchesArray);
+  }
+
+  public static void scheduleFetchesByEvenDistributedVolumes(SubQuery subQuery, Map<Integer,
FetchGroupMeta> partitions,
+                                                             String tableName, int num) {
+    Map<String, List<FetchImpl>>[] fetchsArray = makeEvenDistributedFetchImpl(partitions,
tableName, num).getSecond();
+    // Schedule FetchImpls
+    for (Map<String, List<FetchImpl>> eachFetches : fetchsArray) {
+      SubQuery.scheduleFetches(subQuery, eachFetches);
+    }
+  }
+
   // Scattered hash shuffle hashes the key columns and groups the hash keys associated with
   // the same hash key. Then, if the volume of a group is larger
   // than DIST_QUERY_TABLE_PARTITION_VOLUME, it divides the group into more than two sub
groups

http://git-wip-us.apache.org/repos/asf/tajo/blob/eeaf379a/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java b/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java
index 0ccaebe..009c02e 100644
--- a/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java
+++ b/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java
@@ -18,11 +18,14 @@
 
 package org.apache.tajo.master;
 
+import com.google.common.collect.Maps;
 import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.QueryId;
 import org.apache.tajo.TestTajoIds;
 import org.apache.tajo.ipc.TajoWorkerProtocol;
 import org.apache.tajo.master.querymaster.QueryUnit;
+import org.apache.tajo.master.querymaster.Repartitioner;
+import org.apache.tajo.util.Pair;
 import org.apache.tajo.util.TUtil;
 import org.apache.tajo.worker.FetchImpl;
 import org.jboss.netty.handler.codec.http.QueryStringDecoder;
@@ -35,6 +38,8 @@ import java.util.List;
 import java.util.Map;
 
 import static junit.framework.Assert.assertEquals;
+import static org.apache.tajo.master.querymaster.Repartitioner.FetchGroupMeta;
+import static org.junit.Assert.assertTrue;
 
 public class TestRepartitioner {
   @Test
@@ -83,4 +88,52 @@ public class TestRepartitioner {
     }
     return ret;
   }
+
+  @Test
+  public void testScheduleFetchesByEvenDistributedVolumes() {
+    Map<Integer, FetchGroupMeta> fetchGroups = Maps.newHashMap();
+    String tableName = "test1";
+
+
+    fetchGroups.put(0, new FetchGroupMeta(100, new FetchImpl()));
+    fetchGroups.put(1, new FetchGroupMeta(80, new FetchImpl()));
+    fetchGroups.put(2, new FetchGroupMeta(70, new FetchImpl()));
+    fetchGroups.put(3, new FetchGroupMeta(30, new FetchImpl()));
+    fetchGroups.put(4, new FetchGroupMeta(10, new FetchImpl()));
+    fetchGroups.put(5, new FetchGroupMeta(5, new FetchImpl()));
+
+    Pair<Long [], Map<String, List<FetchImpl>>[]> results;
+
+    results = Repartitioner.makeEvenDistributedFetchImpl(fetchGroups, tableName, 1);
+    long expected [] = {100 + 80 + 70 + 30 + 10 + 5};
+    assertFetchVolumes(expected, results.getFirst());
+
+    results = Repartitioner.makeEvenDistributedFetchImpl(fetchGroups, tableName, 2);
+    long expected0 [] = {130, 165};
+    assertFetchVolumes(expected0, results.getFirst());
+
+    results = Repartitioner.makeEvenDistributedFetchImpl(fetchGroups, tableName, 3);
+    long expected1 [] = {100, 95, 100};
+    assertFetchVolumes(expected1, results.getFirst());
+
+    results = Repartitioner.makeEvenDistributedFetchImpl(fetchGroups, tableName, 4);
+    long expected2 [] = {100, 80, 70, 45};
+    assertFetchVolumes(expected2, results.getFirst());
+
+    results = Repartitioner.makeEvenDistributedFetchImpl(fetchGroups, tableName, 5);
+    long expected3 [] = {100, 80, 70, 30, 15};
+    assertFetchVolumes(expected3, results.getFirst());
+
+    results = Repartitioner.makeEvenDistributedFetchImpl(fetchGroups, tableName, 6);
+    long expected4 [] = {100, 80, 70, 30, 10, 5};
+    assertFetchVolumes(expected4, results.getFirst());
+  }
+
+  private static void assertFetchVolumes(long [] expected, Long [] results) {
+    assertEquals("the lengths of volumes are mismatch", expected.length, results.length);
+
+    for (int i = 0; i < expected.length; i++) {
+      assertTrue(expected[i] + " is expected, but " + results[i], expected[i] == results[i]);
+    }
+  }
 }


Mime
View raw message