Return-Path: X-Original-To: apmail-tajo-commits-archive@minotaur.apache.org Delivered-To: apmail-tajo-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id F12AF17FF2 for ; Fri, 8 May 2015 05:53:52 +0000 (UTC) Received: (qmail 63709 invoked by uid 500); 8 May 2015 05:53:52 -0000 Delivered-To: apmail-tajo-commits-archive@tajo.apache.org Received: (qmail 63659 invoked by uid 500); 8 May 2015 05:53:52 -0000 Mailing-List: contact commits-help@tajo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tajo.apache.org Delivered-To: mailing list commits@tajo.apache.org Received: (qmail 63637 invoked by uid 99); 8 May 2015 05:53:52 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 08 May 2015 05:53:52 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 99A01E4422; Fri, 8 May 2015 05:53:52 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jihoonson@apache.org To: commits@tajo.apache.org Date: Fri, 08 May 2015 05:53:52 -0000 Message-Id: <6b563d0e11fa48538b1ffa148441447a@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/4] tajo git commit: TAJO-1408 Make IntermediateEntryProto more compact. (Contributed by navis, Committed by hyunsik) Repository: tajo Updated Branches: refs/heads/index_support 42bcf2de0 -> 410ca3188 TAJO-1408 Make IntermediateEntryProto more compact. (Contributed by navis, Committed by hyunsik) close #428 Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/fab63900 Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/fab63900 Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/fab63900 Branch: refs/heads/index_support Commit: fab63900cf44b61d571fb9c2982285bb8b669702 Parents: 9b3824b Author: Hyunsik Choi Authored: Thu May 7 14:20:24 2015 -0700 Committer: Hyunsik Choi Committed: Thu May 7 14:20:24 2015 -0700 ---------------------------------------------------------------------- CHANGES | 3 + .../java/org/apache/tajo/util/NumberUtil.java | 54 ++++++++++++ .../apache/tajo/querymaster/Repartitioner.java | 18 ++-- .../java/org/apache/tajo/querymaster/Task.java | 56 ++++++------- .../tajo/worker/ExecutionBlockContext.java | 32 ++----- .../src/main/proto/TajoWorkerProtocol.proto | 16 +--- .../apache/tajo/master/TestRepartitioner.java | 77 +++++++---------- .../tajo/querymaster/TestIntermediateEntry.java | 24 +++--- .../tajo/storage/HashShuffleAppender.java | 87 ++++++++++++++------ .../storage/HashShuffleAppenderManager.java | 12 +-- 10 files changed, 205 insertions(+), 174 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/fab63900/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index a8074cd..d448c09 100644 --- a/CHANGES +++ b/CHANGES @@ -24,6 +24,9 @@ Release 0.11.0 - unreleased IMPROVEMENT + TAJO-1408: Make IntermediateEntryProto more compact. + (Contributed by navis, Committed by hyunsik) + TAJO-1584: Remove QueryMaster client sharing in TajoMaster and TajoWorker. (jinho) http://git-wip-us.apache.org/repos/asf/tajo/blob/fab63900/tajo-common/src/main/java/org/apache/tajo/util/NumberUtil.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/util/NumberUtil.java b/tajo-common/src/main/java/org/apache/tajo/util/NumberUtil.java index 0d70cc2..d14e0b4 100644 --- a/tajo-common/src/main/java/org/apache/tajo/util/NumberUtil.java +++ b/tajo-common/src/main/java/org/apache/tajo/util/NumberUtil.java @@ -18,6 +18,7 @@ package org.apache.tajo.util; +import com.google.common.primitives.Longs; import io.netty.buffer.ByteBuf; import io.netty.util.internal.PlatformDependent; @@ -25,6 +26,8 @@ import java.io.IOException; import java.io.OutputStream; import java.lang.reflect.Constructor; import java.nio.charset.Charset; +import java.util.Arrays; +import java.util.Iterator; // this is an implementation copied from LazyPrimitives in hive public class NumberUtil { @@ -1050,4 +1053,55 @@ public class NumberUtil { return returnNumber; } + + public static long mergeToLong(int value1, int value2) { + return (long)value1 << 32 | value2 & 0xffffffffl; + } + + public static int toHighInt(long value) { + return (int)(value >> 32); + } + + public static int toLowInt(long value) { + return (int)value; + } + + public static class PrimitiveLongs implements Iterable { + int index; + long[] longArray; + + public PrimitiveLongs(int initLength) { + longArray = new long[initLength]; + } + public void add(long value) { + reserve(1)[index++] = value; + } + public void add(long[] value) { + System.arraycopy(value, 0, reserve(value.length), index, value.length); + index += value.length; + } + public long[] backingArray() { + return longArray; + } + public long[] toArray() { + return Arrays.copyOfRange(longArray, 0, index); + } + public int size() { + return index; + } + private long[] reserve(int reserve) { + if (index + reserve < longArray.length) { + return longArray; + } + int newLength = Math.max(index + reserve, longArray.length << 1); + long[] newLongArray = new long[newLength]; + System.arraycopy(longArray, 0, newLongArray, 0, index); + return longArray = newLongArray; + } + + @Override + public Iterator iterator() { + return Longs.asList(toArray()).iterator(); + } + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/fab63900/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java index 8e9e343..545d615 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java @@ -1074,15 +1074,17 @@ public class Repartitioner { firstSplitVolume = splitVolume; } - //Each Pair object in the splits variable is assigned to the next ExectionBlock's task. + //Each Pair object in the splits variable is assigned to the next ExecutionBlock's task. //The first long value is a offset of the intermediate file and the second long value is length. - List> splits = currentInterm.split(firstSplitVolume, splitVolume); - if (splits == null || splits.isEmpty()) { + long[] splits = currentInterm.split(firstSplitVolume, splitVolume); + if (splits == null || splits.length == 0) { break; } - for (Pair eachSplit: splits) { - if (fetchListVolume > 0 && fetchListVolume + eachSplit.getSecond() >= splitVolume) { + for (int i = 0; i < splits.length; i += 2) { + long offset = splits[i]; + long length = splits[i + 1]; + if (fetchListVolume > 0 && fetchListVolume + length >= splitVolume) { if (!fetchListForSingleTask.isEmpty()) { fetches.add(fetchListForSingleTask); } @@ -1091,10 +1093,10 @@ public class Repartitioner { } FetchImpl fetch = new FetchImpl(currentInterm.getPullHost(), SCATTERED_HASH_SHUFFLE, ebId, currentInterm.getPartId(), TUtil.newList(currentInterm)); - fetch.setOffset(eachSplit.getFirst()); - fetch.setLength(eachSplit.getSecond()); + fetch.setOffset(offset); + fetch.setLength(length); fetchListForSingleTask.add(fetch); - fetchListVolume += eachSplit.getSecond(); + fetchListVolume += length; } } if (!fetchListForSingleTask.isEmpty()) { http://git-wip-us.apache.org/repos/asf/tajo/blob/fab63900/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java index 1da623e..d2be973 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java @@ -23,6 +23,7 @@ import com.google.common.base.Objects; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; +import com.google.common.primitives.Longs; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -34,7 +35,6 @@ import org.apache.tajo.TajoProtos.TaskAttemptState; import org.apache.tajo.TaskAttemptId; import org.apache.tajo.TaskId; import org.apache.tajo.catalog.statistics.TableStats; -import org.apache.tajo.ipc.TajoWorkerProtocol.FailureIntermediateProto; import org.apache.tajo.ipc.TajoWorkerProtocol.IntermediateEntryProto; import org.apache.tajo.master.TaskState; import org.apache.tajo.master.event.*; @@ -44,6 +44,7 @@ import org.apache.tajo.storage.DataLocation; import org.apache.tajo.storage.fragment.FileFragment; import org.apache.tajo.storage.fragment.Fragment; import org.apache.tajo.storage.fragment.FragmentConvertor; +import org.apache.tajo.util.NumberUtil; import org.apache.tajo.util.Pair; import org.apache.tajo.util.TajoIdUtils; import org.apache.tajo.util.history.TaskHistory; @@ -785,8 +786,8 @@ public class Task implements EventHandler { int partId; PullHost host; long volume; - List> pages; - List>> failureRowNums; + long[] pages; + long[] failureRowNums; public IntermediateEntry(IntermediateEntryProto proto) { this.ebId = new ExecutionBlockId(proto.getEbId()); @@ -794,21 +795,12 @@ public class Task implements EventHandler { this.attemptId = proto.getAttemptId(); this.partId = proto.getPartId(); - String[] pullHost = proto.getHost().split(":"); + String [] pullHost = proto.getAddress().split(":"); this.host = new PullHost(pullHost[0], Integer.parseInt(pullHost[1])); this.volume = proto.getVolume(); - failureRowNums = new ArrayList>>(); - for (FailureIntermediateProto eachFailure: proto.getFailuresList()) { - - failureRowNums.add(new Pair(eachFailure.getPagePos(), - new Pair(eachFailure.getStartRowNum(), eachFailure.getEndRowNum()))); - } - - pages = new ArrayList>(); - for (IntermediateEntryProto.PageProto eachPage: proto.getPagesList()) { - pages.add(new Pair(eachPage.getPos(), eachPage.getLength())); - } + this.failureRowNums = Longs.toArray(proto.getFailuresList()); + this.pages = Longs.toArray(proto.getPagesList()); } public IntermediateEntry(int taskId, int attemptId, int partId, PullHost host) { @@ -858,15 +850,15 @@ public class Task implements EventHandler { return this.volume = volume; } - public List> getPages() { + public long[] getPages() { return pages; } - public void setPages(List> pages) { + public void setPages(long[] pages) { this.pages = pages; } - public List>> getFailureRowNums() { + public long[] getFailureRowNums() { return failureRowNums; } @@ -875,38 +867,38 @@ public class Task implements EventHandler { return Objects.hashCode(ebId, taskId, partId, attemptId, host); } - public List> split(long firstSplitVolume, long splitVolume) { - List> splits = new ArrayList>(); + public long[] split(long firstSplitVolume, long splitVolume) { - if (pages == null || pages.isEmpty()) { - return splits; + if (pages == null || pages.length == 0) { + return null; } - int pageSize = pages.size(); + NumberUtil.PrimitiveLongs splits = new NumberUtil.PrimitiveLongs(100); long currentOffset = -1; long currentBytes = 0; long realSplitVolume = firstSplitVolume > 0 ? firstSplitVolume : splitVolume; - for (int i = 0; i < pageSize; i++) { - Pair eachPage = pages.get(i); + for (int i = 0; i < pages.length; i += 2) { if (currentOffset == -1) { - currentOffset = eachPage.getFirst(); + currentOffset = pages[i]; } - if (currentBytes > 0 && currentBytes + eachPage.getSecond() >= realSplitVolume) { - splits.add(new Pair(currentOffset, currentBytes)); - currentOffset = eachPage.getFirst(); + if (currentBytes > 0 && currentBytes + pages[i + 1] >= realSplitVolume) { + splits.add(currentOffset); + splits.add(currentBytes); + currentOffset = pages[i]; currentBytes = 0; realSplitVolume = splitVolume; } - currentBytes += eachPage.getSecond(); + currentBytes += pages[i + 1]; } //add last if (currentBytes > 0) { - splits.add(new Pair(currentOffset, currentBytes)); + splits.add(currentOffset); + splits.add(currentBytes); } - return splits; + return splits.toArray(); } } } http://git-wip-us.apache.org/repos/asf/tajo/blob/fab63900/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java index cd4b6a6..270000a 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java @@ -41,12 +41,12 @@ import org.apache.tajo.rpc.RpcClientManager; import org.apache.tajo.storage.HashShuffleAppenderManager; import org.apache.tajo.storage.StorageUtil; import org.apache.tajo.util.NetUtils; -import org.apache.tajo.util.Pair; import java.io.IOException; import java.net.ConnectException; import java.net.InetSocketAddress; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentMap; @@ -312,38 +312,18 @@ public class ExecutionBlockContext { } IntermediateEntryProto.Builder intermediateBuilder = IntermediateEntryProto.newBuilder(); - IntermediateEntryProto.PageProto.Builder pageBuilder = IntermediateEntryProto.PageProto.newBuilder(); - FailureIntermediateProto.Builder failureBuilder = FailureIntermediateProto.newBuilder(); + WorkerConnectionInfo connectionInfo = getWorkerContext().getConnectionInfo(); + String address = connectionInfo.getHost() + ":" + connectionInfo.getPullServerPort(); for (HashShuffleAppenderManager.HashShuffleIntermediate eachShuffle: shuffles) { - List pages = Lists.newArrayList(); - List failureIntermediateItems = Lists.newArrayList(); - - for (Pair eachPage: eachShuffle.getPages()) { - pageBuilder.clear(); - pageBuilder.setPos(eachPage.getFirst()); - pageBuilder.setLength(eachPage.getSecond()); - pages.add(pageBuilder.build()); - } - - for(Pair> eachFailure: eachShuffle.getFailureTskTupleIndexes()) { - failureBuilder.clear(); - failureBuilder.setPagePos(eachFailure.getFirst()); - failureBuilder.setStartRowNum(eachFailure.getSecond().getFirst()); - failureBuilder.setEndRowNum(eachFailure.getSecond().getSecond()); - failureIntermediateItems.add(failureBuilder.build()); - } - intermediateBuilder.clear(); - intermediateBuilder.setEbId(ebId.getProto()) - .setHost(getWorkerContext().getConnectionInfo().getHost() + ":" + - getWorkerContext().getConnectionInfo().getPullServerPort()) + .setAddress(address) .setTaskId(-1) .setAttemptId(-1) .setPartId(eachShuffle.getPartId()) .setVolume(eachShuffle.getVolume()) - .addAllPages(pages) - .addAllFailures(failureIntermediateItems); + .addAllPages(eachShuffle.getPages()) + .addAllFailures(eachShuffle.getFailureTskTupleIndexes()); intermediateEntries.add(intermediateBuilder.build()); } http://git-wip-us.apache.org/repos/asf/tajo/blob/fab63900/tajo-core/src/main/proto/TajoWorkerProtocol.proto ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/proto/TajoWorkerProtocol.proto b/tajo-core/src/main/proto/TajoWorkerProtocol.proto index fddef8f..5d8e446 100644 --- a/tajo-core/src/main/proto/TajoWorkerProtocol.proto +++ b/tajo-core/src/main/proto/TajoWorkerProtocol.proto @@ -97,25 +97,15 @@ message FetchProto { optional int64 length = 12; } -message FailureIntermediateProto { - required int64 pagePos = 1; - required int32 startRowNum = 2; - required int32 endRowNum = 3; -} - message IntermediateEntryProto { - message PageProto { - required int64 pos = 1; - required int32 length = 2; - } required ExecutionBlockIdProto ebId = 1; required int32 taskId = 2; required int32 attemptId = 3; required int32 partId = 4; - required string host = 5; + required string address = 5; required int64 volume = 6; - repeated PageProto pages = 7; - repeated FailureIntermediateProto failures = 8; + repeated int64 pages = 7; // pos : length + repeated int64 failures = 8; // pagePos : startRowNum:endRowNum } message ExecutionBlockReport { http://git-wip-us.apache.org/repos/asf/tajo/blob/fab63900/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java b/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java index 9910d79..17706f4 100644 --- a/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java +++ b/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java @@ -28,6 +28,7 @@ import org.apache.tajo.ipc.TajoWorkerProtocol; import org.apache.tajo.querymaster.Task; import org.apache.tajo.querymaster.Task.IntermediateEntry; import org.apache.tajo.querymaster.Repartitioner; +import org.apache.tajo.util.NumberUtil; import org.apache.tajo.util.Pair; import org.apache.tajo.util.TUtil; import org.apache.tajo.worker.FetchImpl; @@ -176,20 +177,7 @@ public class TestRepartitioner { List intermediateEntries = new ArrayList(); int[] pageLengths = {10 * 1024 * 1024, 10 * 1024 * 1024, 10 * 1024 * 1024, 5 * 1024 * 1024}; //35 MB - long expectedTotalLength = 0; - for (int i = 0; i < 20; i++) { - List> pages = new ArrayList>(); - long offset = 0; - for (int j = 0; j < pageLengths.length; j++) { - pages.add(new Pair(offset, pageLengths[j])); - offset += pageLengths[j]; - expectedTotalLength += pageLengths[j]; - } - IntermediateEntry interm = new IntermediateEntry(i, -1, -1, new Task.PullHost("" + i, i)); - interm.setPages(pages); - interm.setVolume(offset); - intermediateEntries.add(interm); - } + long expectedTotalLength = makeIntermediates(pageLengths, true, intermediateEntries); long splitVolume = 128 * 1024 * 1024; List> fetches = Repartitioner.splitOrMergeIntermediates(null, intermediateEntries, @@ -221,6 +209,27 @@ public class TestRepartitioner { assertEquals(expectedTotalLength, totalLength); } + private long makeIntermediates(int[] pageLengths, boolean uniqueHosts, + List intermediateEntries) { + long expectedTotalLength = 0; + for (int i = 0; i < 20; i++) { + NumberUtil.PrimitiveLongs pages = new NumberUtil.PrimitiveLongs(10); + long offset = 0; + for (int pageLength : pageLengths) { + pages.add(offset); + pages.add(pageLength); + offset += pageLength; + expectedTotalLength += pageLength; + } + IntermediateEntry interm = new IntermediateEntry(i, -1, -1, + new Task.PullHost(uniqueHosts ? "" + i : "", uniqueHosts ? i : 0)); + interm.setPages(pages.toArray()); + interm.setVolume(offset); + intermediateEntries.add(interm); + } + return expectedTotalLength; + } + @Test public void testSplitIntermediates() { List intermediateEntries = new ArrayList(); @@ -234,20 +243,7 @@ public class TestRepartitioner { } } - long expectedTotalLength = 0; - for (int i = 0; i < 20; i++) { - List> pages = new ArrayList>(); - long offset = 0; - for (int j = 0; j < pageLengths.length; j++) { - pages.add(new Pair(offset, pageLengths[j])); - offset += pageLengths[j]; - expectedTotalLength += pageLengths[j]; - } - IntermediateEntry interm = new IntermediateEntry(i, -1, 0, new Task.PullHost("" + i, i)); - interm.setPages(pages); - interm.setVolume(offset); - intermediateEntries.add(interm); - } + long expectedTotalLength = makeIntermediates(pageLengths, true, intermediateEntries); long splitVolume = 128 * 1024 * 1024; List> fetches = Repartitioner.splitOrMergeIntermediates(null, intermediateEntries, @@ -368,12 +364,12 @@ public class TestRepartitioner { List entries = new ArrayList(); for (int i = 0; i < 2; i++) { - List> pages = new ArrayList>(); - for (int j = 0; j < pageDatas.length; j++) { - pages.add(new Pair(pageDatas[j][0], (int) (pageDatas[j][1]))); + NumberUtil.PrimitiveLongs pages = new NumberUtil.PrimitiveLongs(10); + for (long[] pageData : pageDatas) { + pages.add(pageData); } IntermediateEntry entry = new IntermediateEntry(-1, -1, 1, new Task.PullHost("host" + i , 9000)); - entry.setPages(pages); + entry.setPages(pages.toArray()); entries.add(entry); } @@ -421,22 +417,7 @@ public class TestRepartitioner { } } - long expectedTotalLength = 0; - Task.PullHost pullHost = new Task.PullHost("host", 0); - - for (int i = 0; i < 20; i++) { - List> pages = new ArrayList>(); - long offset = 0; - for (int j = 0; j < pageLengths.length; j++) { - pages.add(new Pair(offset, pageLengths[j])); - offset += pageLengths[j]; - expectedTotalLength += pageLengths[j]; - } - IntermediateEntry interm = new IntermediateEntry(i, -1, 0, pullHost); - interm.setPages(pages); - interm.setVolume(offset); - intermediateEntries.add(interm); - } + long expectedTotalLength = makeIntermediates(pageLengths, false, intermediateEntries); long splitVolume = 128 * 1024 * 1024; List> fetches = Repartitioner.splitOrMergeIntermediates(null, intermediateEntries, http://git-wip-us.apache.org/repos/asf/tajo/blob/fab63900/tajo-core/src/test/java/org/apache/tajo/querymaster/TestIntermediateEntry.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/querymaster/TestIntermediateEntry.java b/tajo-core/src/test/java/org/apache/tajo/querymaster/TestIntermediateEntry.java index 237fb32..b81085b 100644 --- a/tajo-core/src/test/java/org/apache/tajo/querymaster/TestIntermediateEntry.java +++ b/tajo-core/src/test/java/org/apache/tajo/querymaster/TestIntermediateEntry.java @@ -18,12 +18,9 @@ package org.apache.tajo.querymaster; -import org.apache.tajo.util.Pair; +import org.apache.tajo.util.NumberUtil; import org.junit.Test; -import java.util.ArrayList; -import java.util.List; - import static org.junit.Assert.assertEquals; public class TestIntermediateEntry { @@ -31,23 +28,22 @@ public class TestIntermediateEntry { public void testPage() { Task.IntermediateEntry interm = new Task.IntermediateEntry(-1, -1, 1, null); - List> pages = new ArrayList>(); - pages.add(new Pair(0L, 1441275)); - pages.add(new Pair(1441275L, 1447446)); - pages.add(new Pair(2888721L, 1442507)); + NumberUtil.PrimitiveLongs pages = new NumberUtil.PrimitiveLongs(10); + pages.add(new long[]{0L, 1441275}); + pages.add(new long[]{1441275L, 1447446}); + pages.add(new long[]{2888721L, 1442507}); - interm.setPages(pages); + interm.setPages(pages.toArray()); long splitBytes = 3 * 1024 * 1024; - List> splits = interm.split(splitBytes, splitBytes); - assertEquals(2, splits.size()); + long[] splits = interm.split(splitBytes, splitBytes); + assertEquals(2 << 1, splits.length); long[][] expected = { {0, 1441275 + 1447446}, {1441275 + 1447446, 1442507} }; for (int i = 0; i < 2; i++) { - Pair eachSplit = splits.get(i); - assertEquals(expected[i][0], eachSplit.getFirst().longValue()); - assertEquals(expected[i][1], eachSplit.getSecond().longValue()); + assertEquals(expected[i][0], splits[i << 1]); + assertEquals(expected[i][1], splits[(i << 1) + 1]); } } } http://git-wip-us.apache.org/repos/asf/tajo/blob/fab63900/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppender.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppender.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppender.java index 4c772c9..ccf5dae 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppender.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppender.java @@ -18,16 +18,21 @@ package org.apache.tajo.storage; +import com.google.common.primitives.Longs; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.tajo.ExecutionBlockId; import org.apache.tajo.TaskAttemptId; import org.apache.tajo.catalog.statistics.TableStats; -import org.apache.tajo.util.Pair; +import org.apache.tajo.util.NumberUtil; +import org.apache.tajo.util.NumberUtil.PrimitiveLongs; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; @@ -42,12 +47,12 @@ public class HashShuffleAppender implements Appender { private TableStats tableStats; //>> - private Map>>> taskTupleIndexes; + private Map taskTupleIndexes; //page start offset, length - private List> pages = new ArrayList>(); + private PrimitiveLongs pages = new PrimitiveLongs(100); - private Pair currentPage; + private long[] currentPage; private int pageSize; //MB @@ -68,8 +73,8 @@ public class HashShuffleAppender implements Appender { @Override public void init() throws IOException { - currentPage = new Pair(0L, 0); - taskTupleIndexes = new HashMap>>>(); + currentPage = new long[2]; + taskTupleIndexes = new HashMap(); rowNumInPage = 0; } @@ -96,16 +101,16 @@ public class HashShuffleAppender implements Appender { int writtenBytes = (int)(posAfterWritten - currentPos); int nextRowNum = rowNumInPage + tuples.size(); - List>> taskIndexes = taskTupleIndexes.get(taskId); + PrimitiveLongs taskIndexes = taskTupleIndexes.get(taskId); if (taskIndexes == null) { - taskIndexes = new ArrayList>>(); + taskIndexes = new PrimitiveLongs(100); taskTupleIndexes.put(taskId, taskIndexes); } - taskIndexes.add( - new Pair>(currentPage.getFirst(), new Pair(rowNumInPage, nextRowNum))); + taskIndexes.add(currentPage[0]); + taskIndexes.add(NumberUtil.mergeToLong(rowNumInPage, nextRowNum)); rowNumInPage = nextRowNum; - if (posAfterWritten - currentPage.getFirst() > pageSize) { + if (posAfterWritten - currentPage[0] > pageSize) { nextPage(posAfterWritten); rowNumInPage = 0; } @@ -124,9 +129,9 @@ public class HashShuffleAppender implements Appender { } private void nextPage(long pos) { - currentPage.setSecond((int) (pos - currentPage.getFirst())); + currentPage[1] = pos - currentPage[0]; pages.add(currentPage); - currentPage = new Pair(pos, 0); + currentPage = new long[] {pos, 0}; } @Override @@ -157,16 +162,18 @@ public class HashShuffleAppender implements Appender { } appender.flush(); offset = appender.getOffset(); - if (offset > currentPage.getFirst()) { + if (offset > currentPage[0]) { nextPage(offset); } appender.close(); if (LOG.isDebugEnabled()) { - if (!pages.isEmpty()) { - LOG.info(ebId + ",partId=" + partId + " Appender closed: fileLen=" + offset + ", pages=" + pages.size() - + ", lastPage=" + pages.get(pages.size() - 1)); + int size = pages.size(); + if (size > 0) { + long[] array = pages.backingArray(); + LOG.info(ebId + ",partId=" + partId + " Appender closed: fileLen=" + offset + ", pages=" + size + + ", lastPage=" + array[size - 2] + ", " + array[size - 1]); } else { - LOG.info(ebId + ",partId=" + partId + " Appender closed: fileLen=" + offset + ", pages=" + pages.size()); + LOG.info(ebId + ",partId=" + partId + " Appender closed: fileLen=" + offset + ", pages=" + size); } } closed.set(true); @@ -185,22 +192,48 @@ public class HashShuffleAppender implements Appender { } } - public List> getPages() { + public PrimitiveLongs getPages() { return pages; } - public Map>>> getTaskTupleIndexes() { + public Map getTaskTupleIndexes() { return taskTupleIndexes; } - public List>> getMergedTupleIndexes() { - List>> merged = new ArrayList>>(); - - for (List>> eachFailureIndex: taskTupleIndexes.values()) { - merged.addAll(eachFailureIndex); - } + public Iterable getMergedTupleIndexes() { + return getIterable(taskTupleIndexes.values()); + } - return merged; + public Iterable getIterable(final Collection values) { + return new Iterable() { + @Override + public Iterator iterator() { + final Iterator iterator1 = values.iterator(); + return new Iterator() { + Iterator iterator2 = null; + @Override + public boolean hasNext() { + while (iterator2 == null || !iterator2.hasNext()) { + if (!iterator1.hasNext()) { + return false; + } + iterator2 = iterator1.next().iterator(); + } + return true; + } + + @Override + public Long next() { + return iterator2.next(); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + }; } public void taskFinished(TaskAttemptId taskId) { http://git-wip-us.apache.org/repos/asf/tajo/blob/fab63900/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java index d2e9b4d..4635b76 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java @@ -176,14 +176,14 @@ public class HashShuffleAppenderManager { private long volume; //[>] - private Collection>> failureTskTupleIndexes; + private Iterable failureTskTupleIndexes; //[] - private List> pages = new ArrayList>(); + private Iterable pages; public HashShuffleIntermediate(int partId, long volume, - List> pages, - Collection>> failureTskTupleIndexes) { + Iterable pages, + Iterable failureTskTupleIndexes) { this.partId = partId; this.volume = volume; this.failureTskTupleIndexes = failureTskTupleIndexes; @@ -198,11 +198,11 @@ public class HashShuffleAppenderManager { return volume; } - public Collection>> getFailureTskTupleIndexes() { + public Iterable getFailureTskTupleIndexes() { return failureTskTupleIndexes; } - public List> getPages() { + public Iterable getPages() { return pages; } }