Return-Path: X-Original-To: apmail-tez-commits-archive@minotaur.apache.org Delivered-To: apmail-tez-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id C9B3810B12 for ; Tue, 25 Mar 2014 05:52:36 +0000 (UTC) Received: (qmail 84958 invoked by uid 500); 25 Mar 2014 05:52:36 -0000 Delivered-To: apmail-tez-commits-archive@tez.apache.org Received: (qmail 84892 invoked by uid 500); 25 Mar 2014 05:52:31 -0000 Mailing-List: contact commits-help@tez.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.incubator.apache.org Delivered-To: mailing list commits@tez.incubator.apache.org Received: (qmail 84839 invoked by uid 99); 25 Mar 2014 05:52:29 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 25 Mar 2014 05:52:29 +0000 X-ASF-Spam-Status: No, hits=-2000.5 required=5.0 tests=ALL_TRUSTED,RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Tue, 25 Mar 2014 05:52:27 +0000 Received: (qmail 83605 invoked by uid 99); 25 Mar 2014 05:52:07 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 25 Mar 2014 05:52:07 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 71AEF8BB893; Tue, 25 Mar 2014 05:52:06 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sseth@apache.org To: commits@tez.incubator.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: git commit: TEZ-972. Shuffle Phase - optimize memory usage of empty partition data in DataMovementEvent. Contributed by Rajesh Balamohan. Date: Tue, 25 Mar 2014 05:52:05 +0000 (UTC) X-Virus-Checked: Checked by ClamAV on apache.org Repository: incubator-tez Updated Branches: refs/heads/master 0bf327c28 -> 762f322de TEZ-972. Shuffle Phase - optimize memory usage of empty partition data in DataMovementEvent. Contributed by Rajesh Balamohan. Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/762f322d Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/762f322d Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/762f322d Branch: refs/heads/master Commit: 762f322deef34e4b02d9df2a380784ceba359a13 Parents: 0bf327c Author: Siddharth Seth Authored: Mon Mar 24 22:51:20 2014 -0700 Committer: Siddharth Seth Committed: Mon Mar 24 22:51:20 2014 -0700 ---------------------------------------------------------------------- .../java/org/apache/tez/common/TezUtils.java | 27 +++++++++ .../org/apache/tez/common/TestTezUtils.java | 60 ++++++++++++++++++++ .../shuffle/impl/ShuffleInputEventHandler.java | 4 +- .../library/output/OnFileSortedOutput.java | 11 ++-- .../library/output/OnFileUnorderedKVOutput.java | 8 ++- .../impl/ShuffleInputEventHandlerImpl.java | 4 +- 6 files changed, 104 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/762f322d/tez-common/src/main/java/org/apache/tez/common/TezUtils.java ---------------------------------------------------------------------- diff --git a/tez-common/src/main/java/org/apache/tez/common/TezUtils.java b/tez-common/src/main/java/org/apache/tez/common/TezUtils.java index 2979d5f..e1fb5df 100644 --- a/tez-common/src/main/java/org/apache/tez/common/TezUtils.java +++ b/tez-common/src/main/java/org/apache/tez/common/TezUtils.java @@ -24,6 +24,7 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.io.OutputStream; import java.io.PrintStream; +import java.util.BitSet; import java.util.Iterator; import java.util.List; import java.util.Map.Entry; @@ -298,4 +299,30 @@ public class TezUtils { return base + "_" + addend; } } + + public static BitSet fromByteArray(byte[] bytes) { + if (bytes == null) { + return new BitSet(); + } + BitSet bits = new BitSet(); + for (int i = 0; i < bytes.length * 8; i++) { + if ((bytes[(bytes.length) - (i / 8) - 1] & (1 << (i % 8))) > 0) { + bits.set(i); + } + } + return bits; + } + + public static byte[] toByteArray(BitSet bits) { + if (bits == null) { + return null; + } + byte[] bytes = new byte[bits.length() / 8 + 1]; + for (int i = 0; i < bits.length(); i++) { + if (bits.get(i)) { + bytes[(bytes.length) - (i / 8) - 1] |= 1 << (i % 8); + } + } + return bytes; + } } http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/762f322d/tez-common/src/test/java/org/apache/tez/common/TestTezUtils.java ---------------------------------------------------------------------- diff --git a/tez-common/src/test/java/org/apache/tez/common/TestTezUtils.java b/tez-common/src/test/java/org/apache/tez/common/TestTezUtils.java index ef4efe1..a994f8a 100644 --- a/tez-common/src/test/java/org/apache/tez/common/TestTezUtils.java +++ b/tez-common/src/test/java/org/apache/tez/common/TestTezUtils.java @@ -18,6 +18,8 @@ package org.apache.tez.common; import java.io.IOException; +import java.util.BitSet; +import java.util.Random; import org.apache.hadoop.conf.Configuration; import org.junit.Assert; @@ -61,6 +63,64 @@ public class TestTezUtils { Assert.assertTrue(cleaned.matches("\\w+")); } + @Test + public void testBitSetToByteArray() { + BitSet bitSet = createBitSet(0); + byte[] bytes = TezUtils.toByteArray(bitSet); + Assert.assertTrue(bytes.length == ((bitSet.length() / 8) + 1)); + + bitSet = createBitSet(1000); + bytes = TezUtils.toByteArray(bitSet); + Assert.assertTrue(bytes.length == ((bitSet.length() / 8) + 1)); + } + + @Test + public void testBitSetFromByteArray() { + BitSet bitSet = createBitSet(0); + byte[] bytes = TezUtils.toByteArray(bitSet); + Assert.assertEquals(TezUtils.fromByteArray(bytes).cardinality(), bitSet.cardinality()); + Assert.assertTrue(TezUtils.fromByteArray(bytes).equals(bitSet)); + + bitSet = createBitSet(1); + bytes = TezUtils.toByteArray(bitSet); + Assert.assertEquals(TezUtils.fromByteArray(bytes).cardinality(), bitSet.cardinality()); + Assert.assertTrue(TezUtils.fromByteArray(bytes).equals(bitSet)); + + bitSet = createBitSet(1000); + bytes = TezUtils.toByteArray(bitSet); + Assert.assertEquals(TezUtils.fromByteArray(bytes).cardinality(), bitSet.cardinality()); + Assert.assertTrue(TezUtils.fromByteArray(bytes).equals(bitSet)); + } + + @Test + public void testBitSetConversion() { + for (int i = 0 ; i < 16 ; i++) { + BitSet bitSet = createBitSetWithSingleEntry(i); + byte[] bytes = TezUtils.toByteArray(bitSet); + + BitSet deseraialized = TezUtils.fromByteArray(bytes); + Assert.assertEquals(bitSet, deseraialized); + Assert.assertEquals(bitSet.cardinality(), deseraialized.cardinality()); + Assert.assertEquals(1, deseraialized.cardinality()); + } + } + + private BitSet createBitSet(int size) { + BitSet bitSet = new BitSet(); + int bitsToEnable = (int) (size * 0.1); + Random rnd = new Random(); + for(int i = 0;i < bitsToEnable;i++) { + bitSet.set(rnd.nextInt(size)); + } + return bitSet; + } + + private BitSet createBitSetWithSingleEntry(int bitToSet) { + BitSet bitSet = new BitSet(); + bitSet.set(bitToSet); + return bitSet; + } + private Configuration getConf() { Configuration conf = new Configuration(false); conf.set("test1", "value1"); http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/762f322d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java index 00d9678..f11575b 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java @@ -20,6 +20,7 @@ package org.apache.tez.runtime.library.common.shuffle.impl; import java.io.IOException; import java.net.URI; +import java.util.BitSet; import java.util.List; import org.apache.commons.logging.Log; @@ -87,7 +88,8 @@ public class ShuffleInputEventHandler { if (shufflePayload.hasEmptyPartitions()) { try { byte[] emptyPartitions = TezUtils.decompressByteStringToByteArray(shufflePayload.getEmptyPartitions()); - if (emptyPartitions[partitionId] == 1) { + BitSet emptyPartitionsBitSet = TezUtils.fromByteArray(emptyPartitions); + if (emptyPartitionsBitSet.get(partitionId)) { LOG.info("Source partition: " + partitionId + " did not generate any data. Not fetching."); scheduler.copySucceeded(srcAttemptIdentifier, null, 0, 0, 0, null); return; http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/762f322d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java index dfd238c..c8f2b22 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java @@ -19,6 +19,7 @@ package org.apache.tez.runtime.library.output; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.BitSet; import java.util.Collections; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; @@ -154,20 +155,20 @@ public class OnFileSortedOutput implements LogicalOutput { if (sendEmptyPartitionDetails) { Path indexFile = sorter.getMapOutput().getOutputIndexFile(); TezSpillRecord spillRecord = new TezSpillRecord(indexFile, conf); - //TODO: replace with BitSet in JDK 1.7 (no support for valueOf, toByteArray in 1.6) - byte[] partitionDetails = new byte[numOutputs]; + BitSet emptyPartitionDetails = new BitSet(); int emptyPartitions = 0; for(int i=0;i 0) { - ByteString emptyPartitionsBytesString = TezUtils.compressByteArrayToByteString(partitionDetails); + ByteString emptyPartitionsBytesString = + TezUtils.compressByteArrayToByteString(TezUtils.toByteArray(emptyPartitionDetails)); payloadBuilder.setEmptyPartitions(emptyPartitionsBytesString); - LOG.info("EmptyPartition bitsetSize=" + partitionDetails.length + ", numOutputs=" + LOG.info("EmptyPartition bitsetSize=" + emptyPartitionDetails.cardinality() + ", numOutputs=" + numOutputs + ", emptyPartitions=" + emptyPartitions + ", compressedSize=" + emptyPartitionsBytesString.size()); } http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/762f322d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java index 867ccab..91ea94a 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java @@ -19,6 +19,7 @@ package org.apache.tez.runtime.library.output; import java.nio.ByteBuffer; +import java.util.BitSet; import java.util.Collections; import java.util.List; @@ -128,9 +129,10 @@ public class OnFileUnorderedKVOutput implements LogicalOutput { // Set the list of empty partitions - single partition on this case. if (!outputGenerated) { LOG.info("No output was generated"); - byte[] emptyPartitions = new byte[1]; - emptyPartitions[0] = 1; - ByteString emptyPartitionsBytesString = TezUtils.compressByteArrayToByteString(emptyPartitions); + BitSet emptyPartitions = new BitSet(); + emptyPartitions.set(0); + ByteString emptyPartitionsBytesString = + TezUtils.compressByteArrayToByteString(TezUtils.toByteArray(emptyPartitions)); payloadBuilder.setEmptyPartitions(emptyPartitionsBytesString); } if (outputGenerated) { http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/762f322d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleInputEventHandlerImpl.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleInputEventHandlerImpl.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleInputEventHandlerImpl.java index 6f54b81..09cd1ea 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleInputEventHandlerImpl.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleInputEventHandlerImpl.java @@ -20,6 +20,7 @@ package org.apache.tez.runtime.library.shuffle.common.impl; import java.io.IOException; +import java.util.BitSet; import java.util.List; import org.apache.commons.logging.Log; @@ -101,7 +102,8 @@ public class ShuffleInputEventHandlerImpl implements ShuffleEventHandler { if (shufflePayload.hasEmptyPartitions()) { byte[] emptyPartitions = TezUtils.decompressByteStringToByteArray(shufflePayload .getEmptyPartitions()); - if (emptyPartitions[srcIndex] == 1) { + BitSet emptyPartionsBitSet = TezUtils.fromByteArray(emptyPartitions); + if (emptyPartionsBitSet.get(srcIndex)) { InputAttemptIdentifier srcAttemptIdentifier = new InputAttemptIdentifier(dme.getTargetIndex(), dme.getVersion()); LOG.info("Source partition: " + srcIndex + " did not generate any data. Not fetching.");