Return-Path: Delivered-To: apmail-hadoop-core-commits-archive@www.apache.org Received: (qmail 72265 invoked from network); 23 Apr 2009 22:10:35 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 23 Apr 2009 22:10:35 -0000 Received: (qmail 72336 invoked by uid 500); 23 Apr 2009 22:10:35 -0000 Delivered-To: apmail-hadoop-core-commits-archive@hadoop.apache.org Received: (qmail 72278 invoked by uid 500); 23 Apr 2009 22:10:35 -0000 Mailing-List: contact core-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: core-dev@hadoop.apache.org Delivered-To: mailing list core-commits@hadoop.apache.org Received: (qmail 72269 invoked by uid 99); 23 Apr 2009 22:10:35 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 23 Apr 2009 22:10:35 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 23 Apr 2009 22:10:34 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id D4A14238899B; Thu, 23 Apr 2009 22:10:13 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r768073 - in /hadoop/core/trunk: CHANGES.txt src/mapred/org/apache/hadoop/mapred/join/TupleWritable.java src/test/org/apache/hadoop/mapred/join/TestTupleWritable.java Date: Thu, 23 Apr 2009 22:10:13 -0000 To: core-commits@hadoop.apache.org From: cdouglas@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20090423221013.D4A14238899B@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: cdouglas Date: Thu Apr 23 22:10:13 2009 New Revision: 768073 URL: http://svn.apache.org/viewvc?rev=768073&view=rev Log: HADOOP-5589. Eliminate source limit of 64 for map-side joins imposed by TupleWritable encoding. Contributed by Jingkei Ly Modified: hadoop/core/trunk/CHANGES.txt hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/join/TupleWritable.java hadoop/core/trunk/src/test/org/apache/hadoop/mapred/join/TestTupleWritable.java Modified: hadoop/core/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=768073&r1=768072&r2=768073&view=diff ============================================================================== --- hadoop/core/trunk/CHANGES.txt (original) +++ hadoop/core/trunk/CHANGES.txt Thu Apr 23 22:10:13 2009 @@ -253,6 +253,9 @@ HADOOP-5705. Improve TotalOrderPartitioner efficiency by updating the trie construction. (Dick King via cdouglas) + HADOOP-5589. Eliminate source limit of 64 for map-side joins imposed by + TupleWritable encoding. (Jingkei Ly via cdouglas) + OPTIMIZATIONS HADOOP-5595. NameNode does not need to run a replicator to choose a Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/join/TupleWritable.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/join/TupleWritable.java?rev=768073&r1=768072&r2=768073&view=diff ============================================================================== --- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/join/TupleWritable.java (original) +++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/join/TupleWritable.java Thu Apr 23 22:10:13 2009 @@ -21,6 +21,7 @@ import java.io.DataOutput; import java.io.DataInput; import java.io.IOException; +import java.util.BitSet; import java.util.Iterator; import java.util.NoSuchElementException; @@ -42,20 +43,22 @@ */ public class TupleWritable implements Writable, Iterable { - private long written; + private BitSet written; private Writable[] values; /** * Create an empty tuple with no allocated storage for writables. */ - public TupleWritable() { } + public TupleWritable() { + written = new BitSet(0); + } /** * Initialize tuple with storage; unknown whether any of them contain * "written" values. */ public TupleWritable(Writable[] vals) { - written = 0L; + written = new BitSet(vals.length); values = vals; } @@ -63,7 +66,7 @@ * Return true if tuple has an element at the position provided. */ public boolean has(int i) { - return 0 != ((1L << i) & written); + return written.get(i); } /** @@ -86,7 +89,7 @@ public boolean equals(Object other) { if (other instanceof TupleWritable) { TupleWritable that = (TupleWritable)other; - if (this.size() != that.size() || this.written != that.written) { + if (!this.written.equals(that.written)) { return false; } for (int i = 0; i < values.length; ++i) { @@ -102,7 +105,7 @@ public int hashCode() { assert false : "hashCode not designed"; - return (int)written; + return written.hashCode(); } /** @@ -113,24 +116,22 @@ public Iterator iterator() { final TupleWritable t = this; return new Iterator() { - long i = written; - long last = 0L; + int bitIndex = written.nextSetBit(0); public boolean hasNext() { - return 0L != i; + return bitIndex >= 0; } public Writable next() { - last = Long.lowestOneBit(i); - if (0 == last) + int returnIndex = bitIndex; + if (returnIndex < 0) throw new NoSuchElementException(); - i ^= last; - // numberOfTrailingZeros rtn 64 if lsb set - return t.get(Long.numberOfTrailingZeros(last) % 64); + bitIndex = written.nextSetBit(bitIndex+1); + return t.get(returnIndex); } public void remove() { - t.written ^= last; - if (t.has(Long.numberOfTrailingZeros(last))) { + if (!written.get(bitIndex)) { throw new IllegalStateException("Attempt to remove non-existent val"); } + written.clear(bitIndex); } }; } @@ -162,7 +163,7 @@ */ public void write(DataOutput out) throws IOException { WritableUtils.writeVInt(out, values.length); - WritableUtils.writeVLong(out, written); + writeBitSet(out, values.length, written); for (int i = 0; i < values.length; ++i) { Text.writeString(out, values[i].getClass().getName()); } @@ -180,7 +181,7 @@ public void readFields(DataInput in) throws IOException { int card = WritableUtils.readVInt(in); values = new Writable[card]; - written = WritableUtils.readVLong(in); + readBitSet(in, card, written); Class[] cls = new Class[card]; try { for (int i = 0; i < card; ++i) { @@ -205,7 +206,7 @@ * Record that the tuple contains an element at the position provided. */ void setWritten(int i) { - written |= 1L << i; + written.set(i); } /** @@ -213,7 +214,7 @@ * provided. */ void clearWritten(int i) { - written &= -1 ^ (1L << i); + written.clear(i); } /** @@ -221,7 +222,67 @@ * releasing storage. */ void clearWritten() { - written = 0L; + written.clear(); + } + + /** + * Writes the bit set to the stream. The first 64 bit-positions of the bit set + * are written as a VLong for backwards-compatibility with older versions of + * TupleWritable. All bit-positions >= 64 are encoded as a byte for every 8 + * bit-positions. + */ + private static final void writeBitSet(DataOutput stream, int nbits, BitSet bitSet) + throws IOException { + long bits = 0L; + + int bitSetIndex = bitSet.nextSetBit(0); + for (;bitSetIndex >= 0 && bitSetIndex < Long.SIZE; + bitSetIndex=bitSet.nextSetBit(bitSetIndex+1)) { + bits |= 1L << bitSetIndex; + } + WritableUtils.writeVLong(stream,bits); + + if (nbits > Long.SIZE) { + bits = 0L; + for (int lastWordWritten = 0; bitSetIndex >= 0 && bitSetIndex < nbits; + bitSetIndex = bitSet.nextSetBit(bitSetIndex+1)) { + int bitsIndex = bitSetIndex % Byte.SIZE; + int word = (bitSetIndex-Long.SIZE) / Byte.SIZE; + if (word > lastWordWritten) { + stream.writeByte((byte)bits); + bits = 0L; + for (lastWordWritten++;lastWordWritten