tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jihoon...@apache.org
Subject [1/4] tajo git commit: TAJO-1408 Make IntermediateEntryProto more compact. (Contributed by navis, Committed by hyunsik)
Date Fri, 08 May 2015 05:53:52 GMT
Repository: tajo
Updated Branches:
  refs/heads/index_support 42bcf2de0 -> 410ca3188


TAJO-1408 Make IntermediateEntryProto more compact. (Contributed by navis, Committed by hyunsik)

close #428


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/fab63900
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/fab63900
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/fab63900

Branch: refs/heads/index_support
Commit: fab63900cf44b61d571fb9c2982285bb8b669702
Parents: 9b3824b
Author: Hyunsik Choi <hyunsik@apache.org>
Authored: Thu May 7 14:20:24 2015 -0700
Committer: Hyunsik Choi <hyunsik@apache.org>
Committed: Thu May 7 14:20:24 2015 -0700

----------------------------------------------------------------------
 CHANGES                                         |  3 +
 .../java/org/apache/tajo/util/NumberUtil.java   | 54 ++++++++++++
 .../apache/tajo/querymaster/Repartitioner.java  | 18 ++--
 .../java/org/apache/tajo/querymaster/Task.java  | 56 ++++++-------
 .../tajo/worker/ExecutionBlockContext.java      | 32 ++-----
 .../src/main/proto/TajoWorkerProtocol.proto     | 16 +---
 .../apache/tajo/master/TestRepartitioner.java   | 77 +++++++----------
 .../tajo/querymaster/TestIntermediateEntry.java | 24 +++---
 .../tajo/storage/HashShuffleAppender.java       | 87 ++++++++++++++------
 .../storage/HashShuffleAppenderManager.java     | 12 +--
 10 files changed, 205 insertions(+), 174 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/fab63900/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index a8074cd..d448c09 100644
--- a/CHANGES
+++ b/CHANGES
@@ -24,6 +24,9 @@ Release 0.11.0 - unreleased
 
   IMPROVEMENT
 
+    TAJO-1408: Make IntermediateEntryProto more compact. 
+    (Contributed by navis, Committed by hyunsik)
+
     TAJO-1584: Remove QueryMaster client sharing in TajoMaster and TajoWorker.
     (jinho)
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/fab63900/tajo-common/src/main/java/org/apache/tajo/util/NumberUtil.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/util/NumberUtil.java b/tajo-common/src/main/java/org/apache/tajo/util/NumberUtil.java
index 0d70cc2..d14e0b4 100644
--- a/tajo-common/src/main/java/org/apache/tajo/util/NumberUtil.java
+++ b/tajo-common/src/main/java/org/apache/tajo/util/NumberUtil.java
@@ -18,6 +18,7 @@
 
 package org.apache.tajo.util;
 
+import com.google.common.primitives.Longs;
 import io.netty.buffer.ByteBuf;
 import io.netty.util.internal.PlatformDependent;
 
@@ -25,6 +26,8 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.lang.reflect.Constructor;
 import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.Iterator;
 
 // this is an implementation copied from LazyPrimitives in hive
 public class NumberUtil {
@@ -1050,4 +1053,55 @@ public class NumberUtil {
 
     return returnNumber;
   }
+
+  public static long mergeToLong(int value1, int value2) {
+    return (long)value1 << 32 | value2 & 0xffffffffl;
+  }
+
+  public static int toHighInt(long value) {
+    return (int)(value >> 32);
+  }
+
+  public static int toLowInt(long value) {
+    return (int)value;
+  }
+
+  public static class PrimitiveLongs implements Iterable<Long> {
+    int index;
+    long[] longArray;
+
+    public PrimitiveLongs(int initLength) {
+      longArray = new long[initLength];
+    }
+    public void add(long value) {
+      reserve(1)[index++] = value;
+    }
+    public void add(long[] value) {
+      System.arraycopy(value, 0, reserve(value.length), index, value.length);
+      index += value.length;
+    }
+    public long[] backingArray() {
+      return longArray;
+    }
+    public long[] toArray() {
+      return Arrays.copyOfRange(longArray, 0, index);
+    }
+    public int size() {
+      return index;
+    }
+    private long[] reserve(int reserve) {
+      if (index + reserve < longArray.length) {
+        return longArray;
+      }
+      int newLength = Math.max(index + reserve, longArray.length << 1);
+      long[] newLongArray = new long[newLength];
+      System.arraycopy(longArray, 0, newLongArray, 0, index);
+      return longArray = newLongArray;
+    }
+
+    @Override
+    public Iterator<Long> iterator() {
+      return Longs.asList(toArray()).iterator();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/fab63900/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java
index 8e9e343..545d615 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java
@@ -1074,15 +1074,17 @@ public class Repartitioner {
         firstSplitVolume = splitVolume;
       }
 
-      //Each Pair object in the splits variable is assigned to the next ExectionBlock's task.
+      //Each Pair object in the splits variable is assigned to the next ExecutionBlock's task.
       //The first long value is a offset of the intermediate file and the second long value is length.
-      List<Pair<Long, Long>> splits = currentInterm.split(firstSplitVolume, splitVolume);
-      if (splits == null || splits.isEmpty()) {
+      long[] splits = currentInterm.split(firstSplitVolume, splitVolume);
+      if (splits == null || splits.length == 0) {
         break;
       }
 
-      for (Pair<Long, Long> eachSplit: splits) {
-        if (fetchListVolume > 0 && fetchListVolume + eachSplit.getSecond() >=
splitVolume) {
+      for (int i = 0; i < splits.length; i += 2) {
+        long offset = splits[i];
+        long length = splits[i + 1];
+        if (fetchListVolume > 0 && fetchListVolume + length >= splitVolume)
{
           if (!fetchListForSingleTask.isEmpty()) {
             fetches.add(fetchListForSingleTask);
           }
@@ -1091,10 +1093,10 @@ public class Repartitioner {
         }
         FetchImpl fetch = new FetchImpl(currentInterm.getPullHost(), SCATTERED_HASH_SHUFFLE,
             ebId, currentInterm.getPartId(), TUtil.newList(currentInterm));
-        fetch.setOffset(eachSplit.getFirst());
-        fetch.setLength(eachSplit.getSecond());
+        fetch.setOffset(offset);
+        fetch.setLength(length);
         fetchListForSingleTask.add(fetch);
-        fetchListVolume += eachSplit.getSecond();
+        fetchListVolume += length;
       }
     }
     if (!fetchListForSingleTask.isEmpty()) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/fab63900/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java
index 1da623e..d2be973 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java
@@ -23,6 +23,7 @@ import com.google.common.base.Objects;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
+import com.google.common.primitives.Longs;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -34,7 +35,6 @@ import org.apache.tajo.TajoProtos.TaskAttemptState;
 import org.apache.tajo.TaskAttemptId;
 import org.apache.tajo.TaskId;
 import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.ipc.TajoWorkerProtocol.FailureIntermediateProto;
 import org.apache.tajo.ipc.TajoWorkerProtocol.IntermediateEntryProto;
 import org.apache.tajo.master.TaskState;
 import org.apache.tajo.master.event.*;
@@ -44,6 +44,7 @@ import org.apache.tajo.storage.DataLocation;
 import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.storage.fragment.Fragment;
 import org.apache.tajo.storage.fragment.FragmentConvertor;
+import org.apache.tajo.util.NumberUtil;
 import org.apache.tajo.util.Pair;
 import org.apache.tajo.util.TajoIdUtils;
 import org.apache.tajo.util.history.TaskHistory;
@@ -785,8 +786,8 @@ public class Task implements EventHandler<TaskEvent> {
     int partId;
     PullHost host;
     long volume;
-    List<Pair<Long, Integer>> pages;
-    List<Pair<Long, Pair<Integer, Integer>>> failureRowNums;
+    long[] pages;
+    long[] failureRowNums;
 
     public IntermediateEntry(IntermediateEntryProto proto) {
       this.ebId = new ExecutionBlockId(proto.getEbId());
@@ -794,21 +795,12 @@ public class Task implements EventHandler<TaskEvent> {
       this.attemptId = proto.getAttemptId();
       this.partId = proto.getPartId();
 
-      String[] pullHost = proto.getHost().split(":");
+      String [] pullHost = proto.getAddress().split(":");
       this.host = new PullHost(pullHost[0], Integer.parseInt(pullHost[1]));
       this.volume = proto.getVolume();
 
-      failureRowNums = new ArrayList<Pair<Long, Pair<Integer, Integer>>>();
-      for (FailureIntermediateProto eachFailure: proto.getFailuresList()) {
-
-        failureRowNums.add(new Pair(eachFailure.getPagePos(),
-            new Pair(eachFailure.getStartRowNum(), eachFailure.getEndRowNum())));
-      }
-
-      pages = new ArrayList<Pair<Long, Integer>>();
-      for (IntermediateEntryProto.PageProto eachPage: proto.getPagesList()) {
-        pages.add(new Pair(eachPage.getPos(), eachPage.getLength()));
-      }
+      this.failureRowNums = Longs.toArray(proto.getFailuresList());
+      this.pages = Longs.toArray(proto.getPagesList());
     }
 
     public IntermediateEntry(int taskId, int attemptId, int partId, PullHost host) {
@@ -858,15 +850,15 @@ public class Task implements EventHandler<TaskEvent> {
       return this.volume = volume;
     }
 
-    public List<Pair<Long, Integer>> getPages() {
+    public long[] getPages() {
       return pages;
     }
 
-    public void setPages(List<Pair<Long, Integer>> pages) {
+    public void setPages(long[] pages) {
       this.pages = pages;
     }
 
-    public List<Pair<Long, Pair<Integer, Integer>>> getFailureRowNums()
{
+    public long[] getFailureRowNums() {
       return failureRowNums;
     }
 
@@ -875,38 +867,38 @@ public class Task implements EventHandler<TaskEvent> {
       return Objects.hashCode(ebId, taskId, partId, attemptId, host);
     }
 
-    public List<Pair<Long, Long>> split(long firstSplitVolume, long splitVolume)
{
-      List<Pair<Long, Long>> splits = new ArrayList<Pair<Long, Long>>();
+    public long[] split(long firstSplitVolume, long splitVolume) {
 
-      if (pages == null || pages.isEmpty()) {
-        return splits;
+      if (pages == null || pages.length == 0) {
+        return null;
       }
-      int pageSize = pages.size();
 
+      NumberUtil.PrimitiveLongs splits = new NumberUtil.PrimitiveLongs(100);
       long currentOffset = -1;
       long currentBytes = 0;
 
       long realSplitVolume = firstSplitVolume > 0 ? firstSplitVolume : splitVolume;
-      for (int i = 0; i < pageSize; i++) {
-        Pair<Long, Integer> eachPage = pages.get(i);
+      for (int i = 0; i < pages.length; i += 2) {
         if (currentOffset == -1) {
-          currentOffset = eachPage.getFirst();
+          currentOffset = pages[i];
         }
-        if (currentBytes > 0 && currentBytes + eachPage.getSecond() >= realSplitVolume)
{
-          splits.add(new Pair(currentOffset, currentBytes));
-          currentOffset = eachPage.getFirst();
+        if (currentBytes > 0 && currentBytes + pages[i + 1] >= realSplitVolume)
{
+          splits.add(currentOffset);
+          splits.add(currentBytes);
+          currentOffset = pages[i];
           currentBytes = 0;
           realSplitVolume = splitVolume;
         }
 
-        currentBytes += eachPage.getSecond();
+        currentBytes += pages[i + 1];
       }
 
       //add last
       if (currentBytes > 0) {
-        splits.add(new Pair(currentOffset, currentBytes));
+        splits.add(currentOffset);
+        splits.add(currentBytes);
       }
-      return splits;
+      return splits.toArray();
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/fab63900/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
index cd4b6a6..270000a 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
@@ -41,12 +41,12 @@ import org.apache.tajo.rpc.RpcClientManager;
 import org.apache.tajo.storage.HashShuffleAppenderManager;
 import org.apache.tajo.storage.StorageUtil;
 import org.apache.tajo.util.NetUtils;
-import org.apache.tajo.util.Pair;
 
 import java.io.IOException;
 import java.net.ConnectException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentMap;
@@ -312,38 +312,18 @@ public class ExecutionBlockContext {
       }
 
       IntermediateEntryProto.Builder intermediateBuilder = IntermediateEntryProto.newBuilder();
-      IntermediateEntryProto.PageProto.Builder pageBuilder = IntermediateEntryProto.PageProto.newBuilder();
-      FailureIntermediateProto.Builder failureBuilder = FailureIntermediateProto.newBuilder();
 
+      WorkerConnectionInfo connectionInfo = getWorkerContext().getConnectionInfo();
+      String address = connectionInfo.getHost() + ":" + connectionInfo.getPullServerPort();
       for (HashShuffleAppenderManager.HashShuffleIntermediate eachShuffle: shuffles) {
-        List<IntermediateEntryProto.PageProto> pages = Lists.newArrayList();
-        List<FailureIntermediateProto> failureIntermediateItems = Lists.newArrayList();
-
-        for (Pair<Long, Integer> eachPage: eachShuffle.getPages()) {
-          pageBuilder.clear();
-          pageBuilder.setPos(eachPage.getFirst());
-          pageBuilder.setLength(eachPage.getSecond());
-          pages.add(pageBuilder.build());
-        }
-
-        for(Pair<Long, Pair<Integer, Integer>> eachFailure: eachShuffle.getFailureTskTupleIndexes())
{
-          failureBuilder.clear();
-          failureBuilder.setPagePos(eachFailure.getFirst());
-          failureBuilder.setStartRowNum(eachFailure.getSecond().getFirst());
-          failureBuilder.setEndRowNum(eachFailure.getSecond().getSecond());
-          failureIntermediateItems.add(failureBuilder.build());
-        }
-        intermediateBuilder.clear();
-
         intermediateBuilder.setEbId(ebId.getProto())
-            .setHost(getWorkerContext().getConnectionInfo().getHost() + ":" +
-                getWorkerContext().getConnectionInfo().getPullServerPort())
+            .setAddress(address)
             .setTaskId(-1)
             .setAttemptId(-1)
             .setPartId(eachShuffle.getPartId())
             .setVolume(eachShuffle.getVolume())
-            .addAllPages(pages)
-            .addAllFailures(failureIntermediateItems);
+            .addAllPages(eachShuffle.getPages())
+            .addAllFailures(eachShuffle.getFailureTskTupleIndexes());
         intermediateEntries.add(intermediateBuilder.build());
       }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/fab63900/tajo-core/src/main/proto/TajoWorkerProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/proto/TajoWorkerProtocol.proto b/tajo-core/src/main/proto/TajoWorkerProtocol.proto
index fddef8f..5d8e446 100644
--- a/tajo-core/src/main/proto/TajoWorkerProtocol.proto
+++ b/tajo-core/src/main/proto/TajoWorkerProtocol.proto
@@ -97,25 +97,15 @@ message FetchProto {
     optional int64 length = 12;
 }
 
-message FailureIntermediateProto {
-    required int64 pagePos = 1;
-    required int32 startRowNum = 2;
-    required int32 endRowNum = 3;
-}
-
 message IntermediateEntryProto {
-    message PageProto {
-        required int64 pos = 1;
-        required int32 length = 2;
-    }
     required ExecutionBlockIdProto ebId = 1;
     required int32 taskId = 2;
     required int32 attemptId = 3;
     required int32 partId = 4;
-    required string host = 5;
+    required string address = 5;
     required int64 volume = 6;
-    repeated PageProto pages = 7;
-    repeated FailureIntermediateProto failures = 8;
+    repeated int64 pages = 7;       // pos : length
+    repeated int64 failures = 8;    // pagePos : startRowNum:endRowNum
 }
 
 message ExecutionBlockReport {

http://git-wip-us.apache.org/repos/asf/tajo/blob/fab63900/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java b/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java
index 9910d79..17706f4 100644
--- a/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java
+++ b/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java
@@ -28,6 +28,7 @@ import org.apache.tajo.ipc.TajoWorkerProtocol;
 import org.apache.tajo.querymaster.Task;
 import org.apache.tajo.querymaster.Task.IntermediateEntry;
 import org.apache.tajo.querymaster.Repartitioner;
+import org.apache.tajo.util.NumberUtil;
 import org.apache.tajo.util.Pair;
 import org.apache.tajo.util.TUtil;
 import org.apache.tajo.worker.FetchImpl;
@@ -176,20 +177,7 @@ public class TestRepartitioner {
     List<IntermediateEntry> intermediateEntries = new ArrayList<IntermediateEntry>();
 
     int[] pageLengths = {10 * 1024 * 1024, 10 * 1024 * 1024, 10 * 1024 * 1024, 5 * 1024 *
1024};   //35 MB
-    long expectedTotalLength = 0;
-    for (int i = 0; i < 20; i++) {
-      List<Pair<Long, Integer>> pages = new ArrayList<Pair<Long, Integer>>();
-      long offset = 0;
-      for (int j = 0; j < pageLengths.length; j++) {
-        pages.add(new Pair(offset, pageLengths[j]));
-        offset += pageLengths[j];
-        expectedTotalLength += pageLengths[j];
-      }
-      IntermediateEntry interm = new IntermediateEntry(i, -1, -1, new Task.PullHost("" +
i, i));
-      interm.setPages(pages);
-      interm.setVolume(offset);
-      intermediateEntries.add(interm);
-    }
+    long expectedTotalLength = makeIntermediates(pageLengths, true, intermediateEntries);
 
     long splitVolume = 128 * 1024 * 1024;
     List<List<FetchImpl>> fetches = Repartitioner.splitOrMergeIntermediates(null,
intermediateEntries,
@@ -221,6 +209,27 @@ public class TestRepartitioner {
     assertEquals(expectedTotalLength, totalLength);
   }
 
+  private long makeIntermediates(int[] pageLengths, boolean uniqueHosts,
+                                 List<IntermediateEntry> intermediateEntries) {
+    long expectedTotalLength = 0;
+    for (int i = 0; i < 20; i++) {
+      NumberUtil.PrimitiveLongs pages = new NumberUtil.PrimitiveLongs(10);
+      long offset = 0;
+      for (int pageLength : pageLengths) {
+        pages.add(offset);
+        pages.add(pageLength);
+        offset += pageLength;
+        expectedTotalLength += pageLength;
+      }
+      IntermediateEntry interm = new IntermediateEntry(i, -1, -1,
+          new Task.PullHost(uniqueHosts ? "" + i : "", uniqueHosts ? i : 0));
+      interm.setPages(pages.toArray());
+      interm.setVolume(offset);
+      intermediateEntries.add(interm);
+    }
+    return expectedTotalLength;
+  }
+
   @Test
   public void testSplitIntermediates() {
     List<IntermediateEntry> intermediateEntries = new ArrayList<IntermediateEntry>();
@@ -234,20 +243,7 @@ public class TestRepartitioner {
       }
     }
 
-    long expectedTotalLength = 0;
-    for (int i = 0; i < 20; i++) {
-      List<Pair<Long, Integer>> pages = new ArrayList<Pair<Long, Integer>>();
-      long offset = 0;
-      for (int j = 0; j < pageLengths.length; j++) {
-        pages.add(new Pair(offset, pageLengths[j]));
-        offset += pageLengths[j];
-        expectedTotalLength += pageLengths[j];
-      }
-      IntermediateEntry interm = new IntermediateEntry(i, -1, 0, new Task.PullHost("" + i,
i));
-      interm.setPages(pages);
-      interm.setVolume(offset);
-      intermediateEntries.add(interm);
-    }
+    long expectedTotalLength = makeIntermediates(pageLengths, true, intermediateEntries);
 
     long splitVolume = 128 * 1024 * 1024;
     List<List<FetchImpl>> fetches = Repartitioner.splitOrMergeIntermediates(null,
intermediateEntries,
@@ -368,12 +364,12 @@ public class TestRepartitioner {
 
     List<IntermediateEntry> entries = new ArrayList<IntermediateEntry>();
     for (int i = 0; i < 2; i++) {
-      List<Pair<Long, Integer>> pages = new ArrayList<Pair<Long, Integer>>();
-      for (int j = 0; j < pageDatas.length; j++) {
-        pages.add(new Pair(pageDatas[j][0], (int) (pageDatas[j][1])));
+      NumberUtil.PrimitiveLongs pages = new NumberUtil.PrimitiveLongs(10);
+      for (long[] pageData : pageDatas) {
+        pages.add(pageData);
       }
       IntermediateEntry entry = new IntermediateEntry(-1, -1, 1, new Task.PullHost("host"
+ i , 9000));
-      entry.setPages(pages);
+      entry.setPages(pages.toArray());
 
       entries.add(entry);
     }
@@ -421,22 +417,7 @@ public class TestRepartitioner {
       }
     }
 
-    long expectedTotalLength = 0;
-    Task.PullHost pullHost = new Task.PullHost("host", 0);
-
-    for (int i = 0; i < 20; i++) {
-      List<Pair<Long, Integer>> pages = new ArrayList<Pair<Long, Integer>>();
-      long offset = 0;
-      for (int j = 0; j < pageLengths.length; j++) {
-        pages.add(new Pair(offset, pageLengths[j]));
-        offset += pageLengths[j];
-        expectedTotalLength += pageLengths[j];
-      }
-      IntermediateEntry interm = new IntermediateEntry(i, -1, 0, pullHost);
-      interm.setPages(pages);
-      interm.setVolume(offset);
-      intermediateEntries.add(interm);
-    }
+    long expectedTotalLength = makeIntermediates(pageLengths, false, intermediateEntries);
 
     long splitVolume = 128 * 1024 * 1024;
     List<List<FetchImpl>> fetches = Repartitioner.splitOrMergeIntermediates(null,
intermediateEntries,

http://git-wip-us.apache.org/repos/asf/tajo/blob/fab63900/tajo-core/src/test/java/org/apache/tajo/querymaster/TestIntermediateEntry.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/querymaster/TestIntermediateEntry.java b/tajo-core/src/test/java/org/apache/tajo/querymaster/TestIntermediateEntry.java
index 237fb32..b81085b 100644
--- a/tajo-core/src/test/java/org/apache/tajo/querymaster/TestIntermediateEntry.java
+++ b/tajo-core/src/test/java/org/apache/tajo/querymaster/TestIntermediateEntry.java
@@ -18,12 +18,9 @@
 
 package org.apache.tajo.querymaster;
 
-import org.apache.tajo.util.Pair;
+import org.apache.tajo.util.NumberUtil;
 import org.junit.Test;
 
-import java.util.ArrayList;
-import java.util.List;
-
 import static org.junit.Assert.assertEquals;
 
 public class TestIntermediateEntry {
@@ -31,23 +28,22 @@ public class TestIntermediateEntry {
   public void testPage() {
     Task.IntermediateEntry interm = new Task.IntermediateEntry(-1, -1, 1, null);
 
-    List<Pair<Long, Integer>> pages = new ArrayList<Pair<Long, Integer>>();
-    pages.add(new Pair(0L, 1441275));
-    pages.add(new Pair(1441275L, 1447446));
-    pages.add(new Pair(2888721L, 1442507));
+    NumberUtil.PrimitiveLongs pages = new NumberUtil.PrimitiveLongs(10);
+    pages.add(new long[]{0L, 1441275});
+    pages.add(new long[]{1441275L, 1447446});
+    pages.add(new long[]{2888721L, 1442507});
 
-    interm.setPages(pages);
+    interm.setPages(pages.toArray());
 
     long splitBytes = 3 * 1024 * 1024;
 
-    List<Pair<Long, Long>> splits = interm.split(splitBytes, splitBytes);
-    assertEquals(2, splits.size());
+    long[] splits = interm.split(splitBytes, splitBytes);
+    assertEquals(2 << 1, splits.length);
 
     long[][] expected = { {0, 1441275 + 1447446}, {1441275 + 1447446, 1442507} };
     for (int i = 0; i < 2; i++) {
-      Pair<Long, Long> eachSplit = splits.get(i);
-      assertEquals(expected[i][0], eachSplit.getFirst().longValue());
-      assertEquals(expected[i][1], eachSplit.getSecond().longValue());
+      assertEquals(expected[i][0], splits[i << 1]);
+      assertEquals(expected[i][1], splits[(i << 1) + 1]);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/fab63900/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppender.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppender.java
index 4c772c9..ccf5dae 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppender.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppender.java
@@ -18,16 +18,21 @@
 
 package org.apache.tajo.storage;
 
+import com.google.common.primitives.Longs;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.TaskAttemptId;
 import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.util.Pair;
+import org.apache.tajo.util.NumberUtil;
+import org.apache.tajo.util.NumberUtil.PrimitiveLongs;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -42,12 +47,12 @@ public class HashShuffleAppender implements Appender {
   private TableStats tableStats;
 
   //<taskId,<page start offset,<task start, task end>>>
-  private Map<TaskAttemptId, List<Pair<Long, Pair<Integer, Integer>>>>
taskTupleIndexes;
+  private Map<TaskAttemptId, PrimitiveLongs> taskTupleIndexes;
 
   //page start offset, length
-  private List<Pair<Long, Integer>> pages = new ArrayList<Pair<Long, Integer>>();
+  private PrimitiveLongs pages = new PrimitiveLongs(100);
 
-  private Pair<Long, Integer> currentPage;
+  private long[] currentPage;
 
   private int pageSize; //MB
 
@@ -68,8 +73,8 @@ public class HashShuffleAppender implements Appender {
 
   @Override
   public void init() throws IOException {
-    currentPage = new Pair(0L, 0);
-    taskTupleIndexes = new HashMap<TaskAttemptId, List<Pair<Long, Pair<Integer,
Integer>>>>();
+    currentPage = new long[2];
+    taskTupleIndexes = new HashMap<TaskAttemptId, PrimitiveLongs>();
     rowNumInPage = 0;
   }
 
@@ -96,16 +101,16 @@ public class HashShuffleAppender implements Appender {
       int writtenBytes = (int)(posAfterWritten - currentPos);
 
       int nextRowNum = rowNumInPage + tuples.size();
-      List<Pair<Long, Pair<Integer, Integer>>> taskIndexes = taskTupleIndexes.get(taskId);
+      PrimitiveLongs taskIndexes = taskTupleIndexes.get(taskId);
       if (taskIndexes == null) {
-        taskIndexes = new ArrayList<Pair<Long, Pair<Integer, Integer>>>();
+        taskIndexes = new PrimitiveLongs(100);
         taskTupleIndexes.put(taskId, taskIndexes);
       }
-      taskIndexes.add(
-          new Pair<Long, Pair<Integer, Integer>>(currentPage.getFirst(), new
Pair(rowNumInPage, nextRowNum)));
+      taskIndexes.add(currentPage[0]);
+      taskIndexes.add(NumberUtil.mergeToLong(rowNumInPage, nextRowNum));
       rowNumInPage = nextRowNum;
 
-      if (posAfterWritten - currentPage.getFirst() > pageSize) {
+      if (posAfterWritten - currentPage[0] > pageSize) {
         nextPage(posAfterWritten);
         rowNumInPage = 0;
       }
@@ -124,9 +129,9 @@ public class HashShuffleAppender implements Appender {
   }
 
   private void nextPage(long pos) {
-    currentPage.setSecond((int) (pos - currentPage.getFirst()));
+    currentPage[1] = pos - currentPage[0];
     pages.add(currentPage);
-    currentPage = new Pair(pos, 0);
+    currentPage = new long[] {pos, 0};
   }
 
   @Override
@@ -157,16 +162,18 @@ public class HashShuffleAppender implements Appender {
       }
       appender.flush();
       offset = appender.getOffset();
-      if (offset > currentPage.getFirst()) {
+      if (offset > currentPage[0]) {
         nextPage(offset);
       }
       appender.close();
       if (LOG.isDebugEnabled()) {
-        if (!pages.isEmpty()) {
-          LOG.info(ebId + ",partId=" + partId + " Appender closed: fileLen=" + offset + ",
pages=" + pages.size()
-              + ", lastPage=" + pages.get(pages.size() - 1));
+        int size = pages.size();
+        if (size > 0) {
+          long[] array = pages.backingArray();
+          LOG.info(ebId + ",partId=" + partId + " Appender closed: fileLen=" + offset + ",
pages=" + size
+              + ", lastPage=" + array[size - 2] + ", " + array[size - 1]);
         } else {
-          LOG.info(ebId + ",partId=" + partId + " Appender closed: fileLen=" + offset + ",
pages=" + pages.size());
+          LOG.info(ebId + ",partId=" + partId + " Appender closed: fileLen=" + offset + ",
pages=" + size);
         }
       }
       closed.set(true);
@@ -185,22 +192,48 @@ public class HashShuffleAppender implements Appender {
     }
   }
 
-  public List<Pair<Long, Integer>> getPages() {
+  public PrimitiveLongs getPages() {
     return pages;
   }
 
-  public Map<TaskAttemptId, List<Pair<Long, Pair<Integer, Integer>>>>
getTaskTupleIndexes() {
+  public Map<TaskAttemptId, PrimitiveLongs> getTaskTupleIndexes() {
     return taskTupleIndexes;
   }
 
-  public List<Pair<Long, Pair<Integer, Integer>>> getMergedTupleIndexes()
{
-    List<Pair<Long, Pair<Integer, Integer>>> merged = new ArrayList<Pair<Long,
Pair<Integer, Integer>>>();
-
-    for (List<Pair<Long, Pair<Integer, Integer>>> eachFailureIndex: taskTupleIndexes.values())
{
-      merged.addAll(eachFailureIndex);
-    }
+  public Iterable<Long> getMergedTupleIndexes() {
+    return getIterable(taskTupleIndexes.values());
+  }
 
-    return merged;
+  public Iterable<Long> getIterable(final Collection<PrimitiveLongs> values)
{
+    return new Iterable<Long>() {
+      @Override
+      public Iterator<Long> iterator() {
+        final Iterator<PrimitiveLongs> iterator1 = values.iterator();
+        return new Iterator<Long>() {
+          Iterator<Long> iterator2 = null;
+          @Override
+          public boolean hasNext() {
+            while (iterator2 == null || !iterator2.hasNext()) {
+              if (!iterator1.hasNext()) {
+                return false;
+              }
+              iterator2 = iterator1.next().iterator();
+            }
+            return true;
+          }
+
+          @Override
+          public Long next() {
+            return iterator2.next();
+          }
+
+          @Override
+          public void remove() {
+            throw new UnsupportedOperationException();
+          }
+        };
+      }
+    };
   }
 
   public void taskFinished(TaskAttemptId taskId) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/fab63900/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java
index d2e9b4d..4635b76 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java
@@ -176,14 +176,14 @@ public class HashShuffleAppenderManager {
     private long volume;
 
     //[<page start offset,<task start, task end>>]
-    private Collection<Pair<Long, Pair<Integer, Integer>>> failureTskTupleIndexes;
+    private Iterable<Long> failureTskTupleIndexes;
 
     //[<page start offset, length>]
-    private List<Pair<Long, Integer>> pages = new ArrayList<Pair<Long,
Integer>>();
+    private Iterable<Long> pages;
 
     public HashShuffleIntermediate(int partId, long volume,
-                                   List<Pair<Long, Integer>> pages,
-                                   Collection<Pair<Long, Pair<Integer, Integer>>>
failureTskTupleIndexes) {
+                                   Iterable<Long> pages,
+                                   Iterable<Long> failureTskTupleIndexes) {
       this.partId = partId;
       this.volume = volume;
       this.failureTskTupleIndexes = failureTskTupleIndexes;
@@ -198,11 +198,11 @@ public class HashShuffleAppenderManager {
       return volume;
     }
 
-    public Collection<Pair<Long, Pair<Integer, Integer>>> getFailureTskTupleIndexes()
{
+    public Iterable<Long> getFailureTskTupleIndexes() {
       return failureTskTupleIndexes;
     }
 
-    public List<Pair<Long, Integer>> getPages() {
+    public Iterable<Long> getPages() {
       return pages;
     }
   }


Mime
View raw message