Return-Path: X-Original-To: apmail-tez-commits-archive@minotaur.apache.org Delivered-To: apmail-tez-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 8B16010F32 for ; Tue, 15 Oct 2013 06:55:31 +0000 (UTC) Received: (qmail 40600 invoked by uid 500); 15 Oct 2013 06:55:29 -0000 Delivered-To: apmail-tez-commits-archive@tez.apache.org Received: (qmail 40553 invoked by uid 500); 15 Oct 2013 06:55:23 -0000 Mailing-List: contact commits-help@tez.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.incubator.apache.org Delivered-To: mailing list commits@tez.incubator.apache.org Received: (qmail 40539 invoked by uid 99); 15 Oct 2013 06:55:20 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 15 Oct 2013 06:55:20 +0000 X-ASF-Spam-Status: No, hits=-2000.5 required=5.0 tests=ALL_TRUSTED,RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Tue, 15 Oct 2013 06:55:18 +0000 Received: (qmail 40465 invoked by uid 99); 15 Oct 2013 06:54:58 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 15 Oct 2013 06:54:58 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 958D88B432B; Tue, 15 Oct 2013 06:54:57 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sseth@apache.org To: commits@tez.incubator.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: git commit: TEZ-561. Change MRHelper byte helpers to use compression. (sseth) Date: Tue, 15 Oct 2013 06:54:57 +0000 (UTC) X-Virus-Checked: Checked by ClamAV on apache.org Updated Branches: refs/heads/master b719d2d44 -> 59539dde2 TEZ-561. Change MRHelper byte helpers to use compression. (sseth) Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/59539dde Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/59539dde Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/59539dde Branch: refs/heads/master Commit: 59539dde2e9dcbf280d7865fca6533fd55a584cd Parents: b719d2d Author: Siddharth Seth Authored: Mon Oct 14 23:54:42 2013 -0700 Committer: Siddharth Seth Committed: Mon Oct 14 23:54:42 2013 -0700 ---------------------------------------------------------------------- pom.xml | 5 + tez-common/pom.xml | 6 ++ .../java/org/apache/tez/common/TezUtils.java | 100 +++++++++++++++++-- .../apache/tez/mapreduce/hadoop/MRHelpers.java | 1 - 4 files changed, 103 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/59539dde/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index ad22ead..3addf16 100644 --- a/pom.xml +++ b/pom.xml @@ -246,6 +246,11 @@ guice 3.0 + + org.xerial.snappy + snappy-java + 1.0.4.1 + http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/59539dde/tez-common/pom.xml ---------------------------------------------------------------------- diff --git a/tez-common/pom.xml b/tez-common/pom.xml index 457d6e1..91c9d91 100644 --- a/tez-common/pom.xml +++ b/tez-common/pom.xml @@ -41,6 +41,12 @@ org.apache.tez tez-api + + org.xerial.snappy + snappy-java + jar + compile + http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/59539dde/tez-common/src/main/java/org/apache/tez/common/TezUtils.java ---------------------------------------------------------------------- diff --git a/tez-common/src/main/java/org/apache/tez/common/TezUtils.java b/tez-common/src/main/java/org/apache/tez/common/TezUtils.java index 65ac1a4..4e433aa 100644 --- a/tez-common/src/main/java/org/apache/tez/common/TezUtils.java +++ b/tez-common/src/main/java/org/apache/tez/common/TezUtils.java @@ -17,24 +17,35 @@ package org.apache.tez.common; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.FileInputStream; import java.io.IOException; import java.util.List; +import java.util.zip.DataFormatException; +import java.util.zip.Deflater; +import java.util.zip.DeflaterOutputStream; +import java.util.zip.Inflater; +import java.util.zip.InflaterInputStream; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.DataInputBuffer; -import org.apache.hadoop.io.DataInputByteBuffer; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto; import org.apache.tez.dag.api.records.DAGProtos.PlanKeyValuePair; import com.google.common.base.Preconditions; +import com.google.common.base.Stopwatch; import com.google.protobuf.ByteString; public class TezUtils { + private static final Log LOG = LogFactory.getLog(TezUtils.class); + public static void addUserSpecifiedTezConfiguration(Configuration conf) throws IOException { FileInputStream confPBBinaryStream = null; @@ -63,26 +74,30 @@ public class TezUtils { throws IOException { Preconditions.checkNotNull(conf, "Configuration must be specified"); ByteString.Output os = ByteString.newOutput(); - DataOutputStream dos = new DataOutputStream(os); + //SnappyOutputStream compressOs = new SnappyOutputStream(os); + DeflaterOutputStream compressOs = new DeflaterOutputStream(os, new Deflater(Deflater.BEST_SPEED)); + DataOutputStream dos = new DataOutputStream(compressOs); conf.write(dos); + dos.close(); return os.toByteString(); } - + public static byte[] createUserPayloadFromConf(Configuration conf) throws IOException { Preconditions.checkNotNull(conf, "Configuration must be specified"); DataOutputBuffer dob = new DataOutputBuffer(); conf.write(dob); - return dob.getData(); + return compressBytes(dob.getData()); } public static Configuration createConfFromByteString(ByteString byteString) throws IOException { Preconditions.checkNotNull(byteString, "ByteString must be specified"); - DataInputByteBuffer dibb = new DataInputByteBuffer(); - dibb.reset(byteString.asReadOnlyByteBuffer()); +// SnappyInputStream uncompressIs = new SnappyInputStream(byteString.newInput()); + InflaterInputStream uncompressIs = new InflaterInputStream(byteString.newInput()); + DataInputStream dataInputStream = new DataInputStream(uncompressIs); Configuration conf = new Configuration(false); - conf.readFields(dibb); + conf.readFields(dataInputStream); return conf; } @@ -90,11 +105,80 @@ public class TezUtils { throws IOException { // TODO Avoid copy ? Preconditions.checkNotNull(bb, "Bytes must be specified"); + byte[] uncompressed = uncompressBytes(bb); DataInputBuffer dib = new DataInputBuffer(); - dib.reset(bb, 0, bb.length); + dib.reset(uncompressed, 0, uncompressed.length); Configuration conf = new Configuration(false); conf.readFields(dib); return conf; } + public static byte[] compressBytes(byte[] inBytes) throws IOException { + Stopwatch sw = null; + if (LOG.isDebugEnabled()) { + sw = new Stopwatch().start(); + } + byte[] compressed = compressBytesInflateDeflate(inBytes); + if (LOG.isDebugEnabled()) { + sw.stop(); + LOG.debug("UncompressedSize: " + inBytes.length + ", CompressedSize: " + + compressed.length + ", CompressTime: " + sw.elapsedMillis()); + } + return compressed; + } + + public static byte[] uncompressBytes(byte[] inBytes) throws IOException { + Stopwatch sw = null; + if (LOG.isDebugEnabled()) { + sw = new Stopwatch().start(); + } + byte[] uncompressed = uncompressBytesInflateDeflate(inBytes); + if (LOG.isDebugEnabled()) { + sw.stop(); + LOG.debug("CompressedSize: " + inBytes.length + ", UncompressedSize: " + + uncompressed.length + ", UncompressTimeTaken: " + + sw.elapsedMillis()); + } + return uncompressed; + } + +// private static byte[] compressBytesSnappy(byte[] inBytes) throws IOException { +// return Snappy.compress(inBytes); +// } +// +// private static byte[] uncompressBytesSnappy(byte[] inBytes) throws IOException { +// return Snappy.uncompress(inBytes); +// } + + private static byte[] compressBytesInflateDeflate(byte[] inBytes) { + Deflater deflater = new Deflater(Deflater.BEST_SPEED); + deflater.setInput(inBytes); + ByteArrayOutputStream bos = new ByteArrayOutputStream(inBytes.length); + deflater.finish(); + byte[] buffer = new byte[1024 * 8]; + while (!deflater.finished()) { + int count = deflater.deflate(buffer); + bos.write(buffer, 0, count); + } + byte[] output = bos.toByteArray(); + return output; + } + + private static byte[] uncompressBytesInflateDeflate(byte[] inBytes) throws IOException { + Inflater inflater = new Inflater(); + inflater.setInput(inBytes); + ByteArrayOutputStream bos = new ByteArrayOutputStream(inBytes.length); + byte[] buffer = new byte[1024 * 8]; + while (!inflater.finished()) { + int count; + try { + count = inflater.inflate(buffer); + } catch (DataFormatException e) { + throw new IOException(e); + } + bos.write(buffer, 0, count); + } + byte[] output = bos.toByteArray(); + return output; + } } http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/59539dde/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java index 6736467..98645f2 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java @@ -19,7 +19,6 @@ package org.apache.tez.mapreduce.hadoop; import java.io.DataOutputStream; -import java.io.File; import java.io.IOException; import java.net.InetAddress; import java.util.ArrayList;