Subject: svn commit: r610047 - in /incubator/pig/trunk/src/org/apache/pig/data: DataBag.java DefaultDataBag.java DistinctDataBag.java SortedDataBag.java
Date: Tue, 08 Jan 2008 16:20:46 -0000
To: pig-commits@incubator.apache.org
From: gates@apache.org

Author: gates
Date: Tue Jan 8 08:20:45 2008
New Revision: 610047

URL: http://svn.apache.org/viewvc?rev=610047&view=rev
Log:
PIG-30 Added hashCode() implementation to DataBag and added additional comments throughout DataBag
and extenders implementation.

Modified:
    incubator/pig/trunk/src/org/apache/pig/data/DataBag.java
    incubator/pig/trunk/src/org/apache/pig/data/DefaultDataBag.java
    incubator/pig/trunk/src/org/apache/pig/data/DistinctDataBag.java
    incubator/pig/trunk/src/org/apache/pig/data/SortedDataBag.java

Modified: incubator/pig/trunk/src/org/apache/pig/data/DataBag.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/data/DataBag.java?rev=610047&r1=610046&r2=610047&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/data/DataBag.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/data/DataBag.java Tue Jan 8 08:20:45 2008
@@ -32,7 +32,38 @@
 import org.apache.pig.impl.mapreduceExec.PigMapReduce;

 /**
- * A collection of Tuples
+ * A collection of Tuples. A DataBag may or may not fit into memory.
+ * DataBag implements Spillable, which means that it registers with a memory
+ * manager. By default, it attempts to keep all of its contents in memory.
+ * If it is asked by the memory manager to spill to disk (by a call to
+ * spill()), it takes whatever it has in memory, opens a spill file, and
+ * writes the contents out. This may happen multiple times. The bag
+ * tracks all of the files it has spilled to.
+ *
+ * DataBag provides an Iterator interface that allows callers to read
+ * through the contents. The iterators are aware of the data spilling.
+ * They have to be able to handle reading from files, as well as the fact
+ * that data they were reading from memory may have been spilled to disk
+ * underneath them.
+ *
+ * The DataBag interface assumes that all data is written before any is
+ * read. That is, a DataBag cannot be used as a queue. If data is written
+ * after data is read, the results are undefined. This condition is not
+ * checked on each add or read, for reasons of speed. Caveat emptor.
+ *
+ * Since spills are asynchronous (the memory manager requesting a spill
+ * runs in a separate thread), all operations dealing with the mContents
+ * Collection (which is the collection of tuples contained in the bag) have
+ * to be synchronized. This means that reading from a DataBag is currently
+ * serialized. This is OK for the moment because pig execution is
+ * currently single-threaded. A ReadWriteLock was experimented with, but
+ * it was found to be about 10x slower than using the synchronized keyword.
+ * If pig changes its execution model to be multithreaded, we may need to
+ * return to this issue, as synchronizing reads will most likely defeat the
+ * purpose of multithreaded execution.
+ *
+ * DataBags come in several types: default, sorted, and distinct. The type
+ * must be chosen up front; there is no way to convert a bag on the fly.
  */
 public abstract class DataBag extends Datum implements Spillable {
     // Container that holds the tuples. Actual object instantiated by
@@ -170,6 +201,10 @@
         }
     }

+    /**
+     * This method is potentially very expensive since it may require a
+     * sort of the bag; don't call it unless you have to.
+     */
     public int compareTo(Object other) {
         // Do we really need to be able to compare to DataAtom and Tuple?
         // When does that happen?
@@ -182,9 +217,31 @@
             else return -1;
         }

-        // Don't sort them, just go tuple by tuple.
-        Iterator thisIt = this.iterator();
-        Iterator otherIt = bOther.iterator();
+        // Ugh, this is bogus. But I have to know if two bags have the
+        // same tuples, regardless of order. Hopefully most of the
+        // time the size check above will prevent this.
+        // If either bag isn't already sorted, create a sorted bag out
+        // of it so I can guarantee order.
+        DataBag thisClone;
+        DataBag otherClone;
+        if (this instanceof SortedDataBag ||
+                this instanceof DistinctDataBag) {
+            thisClone = this;
+        } else {
+            thisClone = new SortedDataBag(null);
+            Iterator<Tuple> i = iterator();
+            while (i.hasNext()) thisClone.add(i.next());
+        }
+        if (other instanceof SortedDataBag ||
+                other instanceof DistinctDataBag) {
+            otherClone = bOther;
+        } else {
+            otherClone = new SortedDataBag(null);
+            Iterator<Tuple> i = bOther.iterator();
+            while (i.hasNext()) otherClone.add(i.next());
+        }
+        Iterator<Tuple> thisIt = thisClone.iterator();
+        Iterator<Tuple> otherIt = otherClone.iterator();
         while (thisIt.hasNext() && otherIt.hasNext()) {
             Tuple thisT = thisIt.next();
             Tuple otherT = otherIt.next();
@@ -203,6 +260,7 @@
         }
     }

+    @Override
     public boolean equals(Object other) {
         return compareTo(other) == 0;
     }
@@ -270,6 +328,17 @@
         }
         sb.append('}');
         return sb.toString();
+    }
+
+    @Override
+    public int hashCode() {
+        int hash = 1;
+        Iterator<Tuple> i = iterator();
+        while (i.hasNext()) {
+            // Use 37 because we want a prime, and tuple uses 31.
+            hash = 37 * hash + i.next().hashCode();
+        }
+        return hash;
     }

     /**

Modified: incubator/pig/trunk/src/org/apache/pig/data/DefaultDataBag.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/data/DefaultDataBag.java?rev=610047&r1=610046&r2=610047&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/data/DefaultDataBag.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/data/DefaultDataBag.java Tue Jan 8 08:20:45 2008
@@ -31,7 +31,9 @@


 /**
- * An unordered collection of Tuples (possibly) with multiples.
+ * An unordered collection of Tuples (possibly) with multiples. The tuples
+ * are stored in an ArrayList, since there is no concern for order or
+ * distinctness.
  */
 public class DefaultDataBag extends DataBag {

@@ -69,7 +71,7 @@
             while (i.hasNext()) {
                 i.next().write(out);
                 spilled++;
-                // This will spill every 16383 records.
+                // This will report progress every 16384 records.
                 if ((spilled & 0x3fff) == 0) reportProgress();
             }
             out.flush();

Modified: incubator/pig/trunk/src/org/apache/pig/data/DistinctDataBag.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/data/DistinctDataBag.java?rev=610047&r1=610046&r2=610047&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/data/DistinctDataBag.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/data/DistinctDataBag.java Tue Jan 8 08:20:45 2008
@@ -222,6 +222,10 @@
                 // from memory that were already in the queue will be fine,
                 // as they're guaranteed to be ahead of the point we fast
                 // foward to.
+                // We're guaranteed that the file we want to read from for
+                // the fast forward is the last element in mSpillFiles,
+                // because we don't support calls to add() after calls to
+                // iterator(), and spill() won't create empty files.
                 try {
                     in = new DataInputStream(new BufferedInputStream(
                         new FileInputStream(mSpillFiles.get(

Modified: incubator/pig/trunk/src/org/apache/pig/data/SortedDataBag.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/data/SortedDataBag.java?rev=610047&r1=610046&r2=610047&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/data/SortedDataBag.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/data/SortedDataBag.java Tue Jan 8 08:20:45 2008
@@ -40,9 +40,13 @@

 /**
  * An ordered collection of Tuples (possibly) with multiples. Data is
- * stored unsorted as it comes in, and only sorted when it is time to dump
+ * stored unsorted in an ArrayList as it comes in, and only sorted when it
+ * is time to dump
  * it to a file or when the first iterator is requested. Experementation
  * found this to be the faster than storing it sorted to begin with.
+ *
+ * We allow a user-defined comparator, but provide a default comparator in
+ * cases where the user doesn't specify one.
  */
 public class SortedDataBag extends DataBag {
     private Comparator mComp;
@@ -59,6 +63,11 @@

     }

+    /**
+     * @param spec EvalSpec to use to do the sorting. spec.getComparator()
+     * will be called to populate our mComp field. If null,
+     * DefaultComparator will be used.
+     */
     public SortedDataBag(EvalSpec spec) {
         if (spec == null) {
             mComp = new DefaultComparator();
@@ -97,8 +106,14 @@
             DataOutputStream out = getSpillFile();
             // Have to sort the data before we can dump it. It's bogus
             // that we have to do this under the lock, but there's no way
-            // around it.
-            Collections.sort((ArrayList)mContents, mComp);
+            // around it. If the reads already started, then we've
+            // already sorted it. No reason to do it again. Don't
+            // set mReadStarted, because we could still be in the add
+            // phase, in which case more (unsorted) tuples may be added
+            // later.
+            if (!mReadStarted) {
+                Collections.sort((ArrayList)mContents, mComp);
+            }
             Iterator<Tuple> i = mContents.iterator();
             while (i.hasNext()) {
                 i.next().write(out);
@@ -130,6 +145,12 @@
      */
     private class SortedDataBagIterator implements Iterator {

+        /**
+         * A container to hold tuples in a priority queue. Stores the
+         * file number the tuple came from, so that when the tuple is read
+         * out of the queue, we know which file to read its replacement
+         * tuple from.
+         */
         private class PQContainer implements Comparable {
             public Tuple tuple;
             public int fileNum;
@@ -199,6 +220,10 @@
                 // from memory that were already in the queue will be fine,
                 // as they're guaranteed to be ahead of the point we fast
                 // foward to.
+                // We're guaranteed that the file we want to read from for
+                // the fast forward is the last element in mSpillFiles,
+                // because we don't support calls to add() after calls to
+                // iterator(), and spill() won't create empty files.
                 try {
                     in = new DataInputStream(new BufferedInputStream(
                         new FileInputStream(mSpillFiles.get(