tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acmur...@apache.org
Subject svn commit: r1457129 [11/38] - in /incubator/tez: ./ tez-ampool/ tez-ampool/src/ tez-ampool/src/main/ tez-ampool/src/main/bin/ tez-ampool/src/main/conf/ tez-ampool/src/main/java/ tez-ampool/src/main/java/org/ tez-ampool/src/main/java/org/apache/ tez-am...
Date Fri, 15 Mar 2013 21:26:48 GMT
Added: incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/DefaultSorter.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/DefaultSorter.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/DefaultSorter.java (added)
+++ incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/DefaultSorter.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,1133 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.engine.common.sort.impl.dflt;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.IntBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.util.IndexedSortable;
+import org.apache.hadoop.util.Progress;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.tez.api.Master;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.common.TezTask;
+import org.apache.tez.engine.common.ConfigUtils;
+import org.apache.tez.engine.common.sort.impl.ExternalSorter;
+import org.apache.tez.engine.common.sort.impl.IFile;
+import org.apache.tez.engine.common.sort.impl.TezIndexRecord;
+import org.apache.tez.engine.common.sort.impl.TezMerger;
+import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
+import org.apache.tez.engine.common.sort.impl.TezSpillRecord;
+import org.apache.tez.engine.common.sort.impl.IFile.Writer;
+import org.apache.tez.engine.common.sort.impl.TezMerger.Segment;
+import org.apache.tez.records.TezTaskAttemptID;
+import org.apache.tez.records.OutputContext;
+
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+
+@SuppressWarnings({"unchecked", "rawtypes"})
+public class DefaultSorter extends ExternalSorter implements IndexedSortable {
+  
+  private static final Log LOG = LogFactory.getLog(DefaultSorter.class);
+  
+  /**
+   * The size of each record in the index file for the map-outputs.
+   */
+  public static final int MAP_OUTPUT_INDEX_RECORD_LENGTH = 24;
+
+  private final static int APPROX_HEADER_LENGTH = 150;
+  
+  // k/v accounting
+  IntBuffer kvmeta; // metadata overlay on backing store
+  int kvstart;            // marks origin of spill metadata
+  int kvend;              // marks end of spill metadata
+  int kvindex;            // marks end of fully serialized records
+
+  int equator;            // marks origin of meta/serialization
+  int bufstart;           // marks beginning of spill
+  int bufend;             // marks beginning of collectable
+  int bufmark;            // marks end of record
+  int bufindex;           // marks end of collected
+  int bufvoid;            // marks the point where we should stop
+                          // reading at the end of the buffer
+
+  byte[] kvbuffer;        // main output buffer
+  private final byte[] b0 = new byte[0];
+
+  protected static final int INDEX = 0;            // index offset in acct
+  protected static final int VALSTART = 1;         // val offset in acct
+  protected static final int KEYSTART = 2;         // key offset in acct
+  protected static final int PARTITION = 3;        // partition offset in acct
+  protected static final int NMETA = 4;            // num meta ints
+  protected static final int METASIZE = NMETA * 4; // size in bytes
+
+  // spill accounting
+  int maxRec;
+  int softLimit;
+  boolean spillInProgress;
+  int bufferRemaining;
+  volatile Throwable sortSpillException = null;
+
+  int numSpills = 0;
+  int minSpillsForCombine;
+  final ReentrantLock spillLock = new ReentrantLock();
+  final Condition spillDone = spillLock.newCondition();
+  final Condition spillReady = spillLock.newCondition();
+  final BlockingBuffer bb = new BlockingBuffer();
+  volatile boolean spillThreadRunning = false;
+  final SpillThread spillThread = new SpillThread();
+
+  final ArrayList<TezSpillRecord> indexCacheList =
+    new ArrayList<TezSpillRecord>();
+  private int totalIndexCacheMemory;
+  private int indexCacheMemoryLimit;
+
+  @Inject
+  public DefaultSorter(
+      @Assisted TezTask task
+      ) throws IOException {
+  }
+
+  @Override
+  public void initialize(Configuration conf, Master master) 
+      throws IOException, InterruptedException {
+    if (task == null) {
+      LOG.info("Bailing!", new IOException());
+      return;
+    }
+    
+    super.initialize(conf, master);
+    
+    // sanity checks
+    final float spillper = job.getFloat(
+        TezJobConfig.TEZ_ENGINE_SORT_SPILL_PERCENT,
+        TezJobConfig.DEFAULT_TEZ_ENGINE_SORT_SPILL_PERCENT);
+    final int sortmb = job.getInt(TezJobConfig.TEZ_ENGINE_IO_SORT_MB,
+        TezJobConfig.DEFAULT_TEZ_ENGINE_IO_SORT_MB);
+    if (spillper > (float) 1.0 || spillper <= (float) 0.0) {
+      throw new IOException("Invalid \""
+          + TezJobConfig.TEZ_ENGINE_SORT_SPILL_PERCENT + "\": " + spillper);
+    }
+    if ((sortmb & 0x7FF) != sortmb) {
+      throw new IOException("Invalid \"" + TezJobConfig.TEZ_ENGINE_IO_SORT_MB
+          + "\": " + sortmb);
+    }
+
+    indexCacheMemoryLimit = job.getInt(TezJobConfig.TEZ_ENGINE_INDEX_CACHE_MEMORY_LIMIT_BYTES,
+                                       TezJobConfig.DEFAULT_TEZ_ENGINE_INDEX_CACHE_MEMORY_LIMIT_BYTES);
+    
+    // buffers and accounting
+    int maxMemUsage = sortmb << 20;
+    maxMemUsage -= maxMemUsage % METASIZE;
+    kvbuffer = new byte[maxMemUsage];
+    bufvoid = kvbuffer.length;
+    kvmeta = ByteBuffer.wrap(kvbuffer)
+       .order(ByteOrder.nativeOrder())
+       .asIntBuffer();
+    setEquator(0);
+    bufstart = bufend = bufindex = equator;
+    kvstart = kvend = kvindex;
+
+    maxRec = kvmeta.capacity() / NMETA;
+    softLimit = (int)(kvbuffer.length * spillper);
+    bufferRemaining = softLimit;
+    if (LOG.isInfoEnabled()) {
+      LOG.info(TezJobConfig.TEZ_ENGINE_IO_SORT_MB + ": " + sortmb);
+      LOG.info("soft limit at " + softLimit);
+      LOG.info("bufstart = " + bufstart + "; bufvoid = " + bufvoid);
+      LOG.info("kvstart = " + kvstart + "; length = " + maxRec);
+    }
+
+    // k/v serialization
+    valSerializer.open(bb);
+    keySerializer.open(bb);   
+    
+    spillInProgress = false;
+    minSpillsForCombine = job.getInt(TezJobConfig.TEZ_ENGINE_COMBINE_MIN_SPILLS, 3);
+    spillThread.setDaemon(true);
+    spillThread.setName("SpillThread");
+    spillLock.lock();
+    try {
+      spillThread.start();
+      while (!spillThreadRunning) {
+        spillDone.await();
+      }
+    } catch (InterruptedException e) {
+      throw new IOException("Spill thread failed to initialize", e);
+    } finally {
+      spillLock.unlock();
+    }
+    if (sortSpillException != null) {
+      throw new IOException("Spill thread failed to initialize",
+          sortSpillException);
+    }
+  }
+
+  @Override
+  public void write(Object key, Object value) 
+      throws IOException, InterruptedException {
+    collect(
+        key, value, partitioner.getPartition(key, value, partitions));
+  }
+
+  /**
+   * Serialize the key, value to intermediate storage.
+   * When this method returns, kvindex must refer to sufficient unused
+   * storage to store one METADATA.
+   */
+  synchronized void collect(Object key, Object value, final int partition
+                                   ) throws IOException {
+    task.getTaskReporter().progress();
+    if (key.getClass() != keyClass) {
+      throw new IOException("Type mismatch in key from map: expected "
+                            + keyClass.getName() + ", received "
+                            + key.getClass().getName());
+    }
+    if (value.getClass() != valClass) {
+      throw new IOException("Type mismatch in value from map: expected "
+                            + valClass.getName() + ", received "
+                            + value.getClass().getName());
+    }
+    if (partition < 0 || partition >= partitions) {
+      throw new IOException("Illegal partition for " + key + " (" +
+          partition + ")");
+    }
+    checkSpillException();
+    bufferRemaining -= METASIZE;
+    if (bufferRemaining <= 0) {
+      // start spill if the thread is not running and the soft limit has been
+      // reached
+      spillLock.lock();
+      try {
+        do {
+          if (!spillInProgress) {
+            final int kvbidx = 4 * kvindex;
+            final int kvbend = 4 * kvend;
+            // serialized, unspilled bytes always lie between kvindex and
+            // bufindex, crossing the equator. Note that any void space
+            // created by a reset must be included in "used" bytes
+            final int bUsed = distanceTo(kvbidx, bufindex);
+            final boolean bufsoftlimit = bUsed >= softLimit;
+            if ((kvbend + METASIZE) % kvbuffer.length !=
+                equator - (equator % METASIZE)) {
+              // spill finished, reclaim space
+              resetSpill();
+              bufferRemaining = Math.min(
+                  distanceTo(bufindex, kvbidx) - 2 * METASIZE,
+                  softLimit - bUsed) - METASIZE;
+              continue;
+            } else if (bufsoftlimit && kvindex != kvend) {
+              // spill records, if any collected; check latter, as it may
+              // be possible for metadata alignment to hit spill pcnt
+              startSpill();
+              final int avgRec = (int)
+                (mapOutputByteCounter.getValue() /
+                mapOutputRecordCounter.getValue());
+              // leave at least half the split buffer for serialization data
+              // ensure that kvindex >= bufindex
+              final int distkvi = distanceTo(bufindex, kvbidx);
+              final int newPos = (bufindex +
+                Math.max(2 * METASIZE - 1,
+                        Math.min(distkvi / 2,
+                                 distkvi / (METASIZE + avgRec) * METASIZE)))
+                % kvbuffer.length;
+              setEquator(newPos);
+              bufmark = bufindex = newPos;
+              final int serBound = 4 * kvend;
+              // bytes remaining before the lock must be held and limits
+              // checked is the minimum of three arcs: the metadata space, the
+              // serialization space, and the soft limit
+              bufferRemaining = Math.min(
+                  // metadata max
+                  distanceTo(bufend, newPos),
+                  Math.min(
+                    // serialization max
+                    distanceTo(newPos, serBound),
+                    // soft limit
+                    softLimit)) - 2 * METASIZE;
+            }
+          }
+        } while (false);
+      } finally {
+        spillLock.unlock();
+      }
+    }
+
+    try {
+      // serialize key bytes into buffer
+      int keystart = bufindex;
+      keySerializer.serialize(key);
+      if (bufindex < keystart) {
+        // wrapped the key; must make contiguous
+        bb.shiftBufferedKey();
+        keystart = 0;
+      }
+      // serialize value bytes into buffer
+      final int valstart = bufindex;
+      valSerializer.serialize(value);
+      // It's possible for records to have zero length, i.e. the serializer
+      // will perform no writes. To ensure that the boundary conditions are
+      // checked and that the kvindex invariant is maintained, perform a
+      // zero-length write into the buffer. The logic monitoring this could be
+      // moved into collect, but this is cleaner and inexpensive. For now, it
+      // is acceptable.
+      bb.write(b0, 0, 0);
+
+      // the record must be marked after the preceding write, as the metadata
+      // for this record are not yet written
+      int valend = bb.markRecord();
+
+      mapOutputRecordCounter.increment(1);
+      mapOutputByteCounter.increment(
+          distanceTo(keystart, valend, bufvoid));
+
+      // write accounting info
+      kvmeta.put(kvindex + INDEX, kvindex);
+      kvmeta.put(kvindex + PARTITION, partition);
+      kvmeta.put(kvindex + KEYSTART, keystart);
+      kvmeta.put(kvindex + VALSTART, valstart);
+      // advance kvindex
+      kvindex = (kvindex - NMETA + kvmeta.capacity()) % kvmeta.capacity();
+    } catch (MapBufferTooSmallException e) {
+      LOG.info("Record too large for in-memory buffer: " + e.getMessage());
+      spillSingleRecord(key, value, partition);
+      mapOutputRecordCounter.increment(1);
+      return;
+    }
+  }
+
+  /**
+   * Set the point from which meta and serialization data expand. The meta
+   * indices are aligned with the buffer, so metadata never spans the ends of
+   * the circular buffer.
+   */
+  private void setEquator(int pos) {
+    equator = pos;
+    // set index prior to first entry, aligned at meta boundary
+    final int aligned = pos - (pos % METASIZE);
+    kvindex =
+      ((aligned - METASIZE + kvbuffer.length) % kvbuffer.length) / 4;
+    if (LOG.isInfoEnabled()) {
+      LOG.info("(EQUATOR) " + pos + " kvi " + kvindex +
+          "(" + (kvindex * 4) + ")");
+    }
+  }
+
+  /**
+   * The spill is complete, so set the buffer and meta indices to be equal to
+   * the new equator to free space for continuing collection. Note that when
+   * kvindex == kvend == kvstart, the buffer is empty.
+   */
+  private void resetSpill() {
+    final int e = equator;
+    bufstart = bufend = e;
+    final int aligned = e - (e % METASIZE);
+    // set start/end to point to first meta record
+    kvstart = kvend =
+      ((aligned - METASIZE + kvbuffer.length) % kvbuffer.length) / 4;
+    if (LOG.isInfoEnabled()) {
+      LOG.info("(RESET) equator " + e + " kv " + kvstart + "(" +
+        (kvstart * 4) + ")" + " kvi " + kvindex + "(" + (kvindex * 4) + ")");
+    }
+  }
+
+  /**
+   * Compute the distance in bytes between two indices in the serialization
+   * buffer.
+   * @see #distanceTo(int,int,int)
+   */
+  final int distanceTo(final int i, final int j) {
+    return distanceTo(i, j, kvbuffer.length);
+  }
+
+  /**
+   * Compute the distance between two indices in the circular buffer given the
+   * max distance.
+   */
+  int distanceTo(final int i, final int j, final int mod) {
+    return i <= j
+      ? j - i
+      : mod - i + j;
+  }
+
+  /**
+   * For the given meta position, return the dereferenced position in the
+   * integer array. Each meta block contains several integers describing
+   * record data in its serialized form, but the INDEX is not necessarily
+   * related to the proximate metadata. The index value at the referenced int
+   * position is the start offset of the associated metadata block. So the
+   * metadata INDEX at metapos may point to the metadata described by the
+   * metadata block at metapos + k, which contains information about that
+   * serialized record.
+   */
+  int offsetFor(int metapos) {
+    return kvmeta.get((metapos % maxRec) * NMETA + INDEX);
+  }
+
+  /**
+   * Compare logical range, st i, j MOD offset capacity.
+   * Compare by partition, then by key.
+   * @see IndexedSortable#compare
+   */
+  public int compare(final int mi, final int mj) {
+    final int kvi = offsetFor(mi);
+    final int kvj = offsetFor(mj);
+    final int kvip = kvmeta.get(kvi + PARTITION);
+    final int kvjp = kvmeta.get(kvj + PARTITION);
+    // sort by partition
+    if (kvip != kvjp) {
+      return kvip - kvjp;
+    }
+    // sort by key
+    return comparator.compare(kvbuffer,
+        kvmeta.get(kvi + KEYSTART),
+        kvmeta.get(kvi + VALSTART) - kvmeta.get(kvi + KEYSTART),
+        kvbuffer,
+        kvmeta.get(kvj + KEYSTART),
+        kvmeta.get(kvj + VALSTART) - kvmeta.get(kvj + KEYSTART));
+  }
+
+  /**
+   * Swap logical indices st i, j MOD offset capacity.
+   * @see IndexedSortable#swap
+   */
+  public void swap(final int mi, final int mj) {
+    final int kvi = (mi % maxRec) * NMETA + INDEX;
+    final int kvj = (mj % maxRec) * NMETA + INDEX;
+    int tmp = kvmeta.get(kvi);
+    kvmeta.put(kvi, kvmeta.get(kvj));
+    kvmeta.put(kvj, tmp);
+  }
+
+  /**
+   * Inner class managing the spill of serialized records to disk.
+   */
+  protected class BlockingBuffer extends DataOutputStream {
+
+    public BlockingBuffer() {
+      super(new Buffer());
+    }
+
+    /**
+     * Mark end of record. Note that this is required if the buffer is to
+     * cut the spill in the proper place.
+     */
+    public int markRecord() {
+      bufmark = bufindex;
+      return bufindex;
+    }
+
+    /**
+     * Set position from last mark to end of writable buffer, then rewrite
+     * the data between last mark and kvindex.
+     * This handles a special case where the key wraps around the buffer.
+     * If the key is to be passed to a RawComparator, then it must be
+     * contiguous in the buffer. This recopies the data in the buffer back
+     * into itself, but starting at the beginning of the buffer. Note that
+     * this method should <b>only</b> be called immediately after detecting
+     * this condition. To call it at any other time is undefined and would
+     * likely result in data loss or corruption.
+     * @see #markRecord()
+     */
+    protected void shiftBufferedKey() throws IOException {
+      // spillLock unnecessary; both kvend and kvindex are current
+      int headbytelen = bufvoid - bufmark;
+      bufvoid = bufmark;
+      final int kvbidx = 4 * kvindex;
+      final int kvbend = 4 * kvend;
+      final int avail =
+        Math.min(distanceTo(0, kvbidx), distanceTo(0, kvbend));
+      if (bufindex + headbytelen < avail) {
+        System.arraycopy(kvbuffer, 0, kvbuffer, headbytelen, bufindex);
+        System.arraycopy(kvbuffer, bufvoid, kvbuffer, 0, headbytelen);
+        bufindex += headbytelen;
+        bufferRemaining -= kvbuffer.length - bufvoid;
+      } else {
+        byte[] keytmp = new byte[bufindex];
+        System.arraycopy(kvbuffer, 0, keytmp, 0, bufindex);
+        bufindex = 0;
+        out.write(kvbuffer, bufmark, headbytelen);
+        out.write(keytmp);
+      }
+    }
+  }
+
+  public class Buffer extends OutputStream {
+    private final byte[] scratch = new byte[1];
+
+    @Override
+    public void write(int v)
+        throws IOException {
+      scratch[0] = (byte)v;
+      write(scratch, 0, 1);
+    }
+
+    /**
+     * Attempt to write a sequence of bytes to the collection buffer.
+     * This method will block if the spill thread is running and it
+     * cannot write.
+     * @throws MapBufferTooSmallException if record is too large to
+     *    deserialize into the collection buffer.
+     */
+    @Override
+    public void write(byte b[], int off, int len)
+        throws IOException {
+      // must always verify the invariant that at least METASIZE bytes are
+      // available beyond kvindex, even when len == 0
+      bufferRemaining -= len;
+      if (bufferRemaining <= 0) {
+        // writing these bytes could exhaust available buffer space or fill
+        // the buffer to soft limit. check if spill or blocking are necessary
+        boolean blockwrite = false;
+        spillLock.lock();
+        try {
+          do {
+            checkSpillException();
+
+            final int kvbidx = 4 * kvindex;
+            final int kvbend = 4 * kvend;
+            // ser distance to key index
+            final int distkvi = distanceTo(bufindex, kvbidx);
+            // ser distance to spill end index
+            final int distkve = distanceTo(bufindex, kvbend);
+
+            // if kvindex is closer than kvend, then a spill is neither in
+            // progress nor complete and reset since the lock was held. The
+            // write should block only if there is insufficient space to
+            // complete the current write, write the metadata for this record,
+            // and write the metadata for the next record. If kvend is closer,
+            // then the write should block if there is too little space for
+            // either the metadata or the current write. Note that collect
+            // ensures its metadata requirement with a zero-length write
+            blockwrite = distkvi <= distkve
+              ? distkvi <= len + 2 * METASIZE
+              : distkve <= len || distanceTo(bufend, kvbidx) < 2 * METASIZE;
+
+            if (!spillInProgress) {
+              if (blockwrite) {
+                if ((kvbend + METASIZE) % kvbuffer.length !=
+                    equator - (equator % METASIZE)) {
+                  // spill finished, reclaim space
+                  // need to use meta exclusively; zero-len rec & 100% spill
+                  // pcnt would fail
+                  resetSpill(); // resetSpill doesn't move bufindex, kvindex
+                  bufferRemaining = Math.min(
+                      distkvi - 2 * METASIZE,
+                      softLimit - distanceTo(kvbidx, bufindex)) - len;
+                  continue;
+                }
+                // we have records we can spill; only spill if blocked
+                if (kvindex != kvend) {
+                  startSpill();
+                  // Blocked on this write, waiting for the spill just
+                  // initiated to finish. Instead of repositioning the marker
+                  // and copying the partial record, we set the record start
+                  // to be the new equator
+                  setEquator(bufmark);
+                } else {
+                  // We have no buffered records, and this record is too large
+                  // to write into kvbuffer. We must spill it directly from
+                  // collect
+                  final int size = distanceTo(bufstart, bufindex) + len;
+                  setEquator(0);
+                  bufstart = bufend = bufindex = equator;
+                  kvstart = kvend = kvindex;
+                  bufvoid = kvbuffer.length;
+                  throw new MapBufferTooSmallException(size + " bytes");
+                }
+              }
+            }
+
+            if (blockwrite) {
+              // wait for spill
+              try {
+                while (spillInProgress) {
+                  task.getTaskReporter().progress();
+                  spillDone.await();
+                }
+              } catch (InterruptedException e) {
+                  throw new IOException(
+                      "Buffer interrupted while waiting for the writer", e);
+              }
+            }
+          } while (blockwrite);
+        } finally {
+          spillLock.unlock();
+        }
+      }
+      // here, we know that we have sufficient space to write
+      if (bufindex + len > bufvoid) {
+        final int gaplen = bufvoid - bufindex;
+        System.arraycopy(b, off, kvbuffer, bufindex, gaplen);
+        len -= gaplen;
+        off += gaplen;
+        bufindex = 0;
+      }
+      System.arraycopy(b, off, kvbuffer, bufindex, len);
+      bufindex += len;
+    }
+  }
+
+  @Override
+  public void flush() throws IOException, InterruptedException {
+    LOG.info("Starting flush of map output");
+    spillLock.lock();
+    try {
+      while (spillInProgress) {
+        task.getTaskReporter().progress();
+        spillDone.await();
+      }
+      checkSpillException();
+
+      final int kvbend = 4 * kvend;
+      if ((kvbend + METASIZE) % kvbuffer.length !=
+          equator - (equator % METASIZE)) {
+        // spill finished
+        resetSpill();
+      }
+      if (kvindex != kvend) {
+        kvend = (kvindex + NMETA) % kvmeta.capacity();
+        bufend = bufmark;
+        if (LOG.isInfoEnabled()) {
+          LOG.info("Sorting & Spilling map output");
+          LOG.info("bufstart = " + bufstart + "; bufend = " + bufmark +
+                   "; bufvoid = " + bufvoid);
+          LOG.info("kvstart = " + kvstart + "(" + (kvstart * 4) +
+                   "); kvend = " + kvend + "(" + (kvend * 4) +
+                   "); length = " + (distanceTo(kvend, kvstart,
+                         kvmeta.capacity()) + 1) + "/" + maxRec);
+        }
+        sortAndSpill();
+      }
+    } catch (InterruptedException e) {
+      throw new IOException("Interrupted while waiting for the writer", e);
+    } finally {
+      spillLock.unlock();
+    }
+    assert !spillLock.isHeldByCurrentThread();
+    // shut down spill thread and wait for it to exit. Since the preceding
+    // ensures that it is finished with its work (and sortAndSpill did not
+    // throw), we elect to use an interrupt instead of setting a flag.
+    // Spilling simultaneously from this thread while the spill thread
+    // finishes its work might be both a useful way to extend this and also
+    // sufficient motivation for the latter approach.
+    try {
+      spillThread.interrupt();
+      spillThread.join();
+    } catch (InterruptedException e) {
+      throw new IOException("Spill failed", e);
+    }
+    // release sort buffer before the merge
+    //FIXME
+    //kvbuffer = null;
+    mergeParts();
+    Path outputPath = mapOutputFile.getOutputFile();
+    fileOutputByteCounter.increment(rfs.getFileStatus(outputPath).getLen());
+  }
+
+  @Override
+  public void close() throws IOException, InterruptedException { }
+
+  protected class SpillThread extends Thread {
+
+    @Override
+    public void run() {
+      spillLock.lock();
+      spillThreadRunning = true;
+      try {
+        while (true) {
+          spillDone.signal();
+          while (!spillInProgress) {
+            spillReady.await();
+          }
+          try {
+            spillLock.unlock();
+            sortAndSpill();
+          } catch (Throwable t) {
+            sortSpillException = t;
+          } finally {
+            spillLock.lock();
+            if (bufend < bufstart) {
+              bufvoid = kvbuffer.length;
+            }
+            kvstart = kvend;
+            bufstart = bufend;
+            spillInProgress = false;
+          }
+        }
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      } finally {
+        spillLock.unlock();
+        spillThreadRunning = false;
+      }
+    }
+  }
+
+  private void checkSpillException() throws IOException {
+    final Throwable lspillException = sortSpillException;
+    if (lspillException != null) {
+      if (lspillException instanceof Error) {
+        final String logMsg = "Task " + task.getTaskAttemptId() + " failed : " +
+          StringUtils.stringifyException(lspillException);
+        task.getTaskReporter().reportFatalError(
+            task.getTaskAttemptId(), lspillException, logMsg);
+      }
+      throw new IOException("Spill failed", lspillException);
+    }
+  }
+
+  private void startSpill() {
+    assert !spillInProgress;
+    kvend = (kvindex + NMETA) % kvmeta.capacity();
+    bufend = bufmark;
+    spillInProgress = true;
+    if (LOG.isInfoEnabled()) {
+      LOG.info("Spilling map output");
+      LOG.info("bufstart = " + bufstart + "; bufend = " + bufmark +
+               "; bufvoid = " + bufvoid);
+      LOG.info("kvstart = " + kvstart + "(" + (kvstart * 4) +
+               "); kvend = " + kvend + "(" + (kvend * 4) +
+               "); length = " + (distanceTo(kvend, kvstart,
+                     kvmeta.capacity()) + 1) + "/" + maxRec);
+    }
+    spillReady.signal();
+  }
+
+  int getMetaStart() {
+    return kvend / NMETA;
+  }
+  
+  int getMetaEnd() {
+    return 1 + // kvend is a valid record
+        (kvstart >= kvend
+        ? kvstart
+        : kvmeta.capacity() + kvstart) / NMETA;
+  }
+  
+  protected void sortAndSpill() 
+      throws IOException, InterruptedException {
+    final int mstart = getMetaStart();
+    final int mend = getMetaEnd();
+    sorter.sort(this, mstart, mend, task.getTaskReporter());
+    spill(mstart, mend); 
+  }
+  
+  protected void spill(int mstart, int mend) 
+      throws IOException, InterruptedException {
+
+    //approximate the length of the output file to be the length of the
+    //buffer + header lengths for the partitions
+    final long size = (bufend >= bufstart
+        ? bufend - bufstart
+        : (bufvoid - bufend) + bufstart) +
+                partitions * APPROX_HEADER_LENGTH;
+    FSDataOutputStream out = null;
+    try {
+      // create spill file
+      final TezSpillRecord spillRec = new TezSpillRecord(partitions);
+      final Path filename =
+          mapOutputFile.getSpillFileForWrite(numSpills, size);
+      out = rfs.create(filename);
+
+      int spindex = mstart;
+      final InMemValBytes value = createInMemValBytes();
+      for (int i = 0; i < partitions; ++i) {
+        IFile.Writer writer = null;
+        try {
+          long segmentStart = out.getPos();
+          writer = new Writer(job, out, keyClass, valClass, codec,
+                                    spilledRecordsCounter);
+          if (combineProcessor == null) {
+            // spill directly
+            DataInputBuffer key = new DataInputBuffer();
+            while (spindex < mend &&
+                kvmeta.get(offsetFor(spindex) + PARTITION) == i) {
+              final int kvoff = offsetFor(spindex);
+              key.reset(
+                  kvbuffer, 
+                  kvmeta.get(kvoff + KEYSTART),
+                  (kvmeta.get(kvoff + VALSTART) - kvmeta.get(kvoff + KEYSTART))
+                  );
+              getVBytesForOffset(kvoff, value);
+              writer.append(key, value);
+              ++spindex;
+            }
+          } else {
+            int spstart = spindex;
+            while (spindex < mend &&
+                kvmeta.get(offsetFor(spindex)
+                          + PARTITION) == i) {
+              ++spindex;
+            }
+            // Note: we would like to avoid the combiner if we've fewer
+            // than some threshold of records for a partition
+            if (spstart != spindex) {
+              TezRawKeyValueIterator kvIter =
+                new MRResultIterator(spstart, spindex);
+              runCombineProcessor(kvIter, writer);
+            }
+          }
+
+          // close the writer
+          writer.close();
+
+          // record offsets
+          final TezIndexRecord rec = 
+              new TezIndexRecord(
+                  segmentStart, 
+                  writer.getRawLength(), 
+                  writer.getCompressedLength());
+          spillRec.putIndex(rec, i);
+
+          writer = null;
+        } finally {
+          if (null != writer) writer.close();
+        }
+      }
+
+      if (totalIndexCacheMemory >= indexCacheMemoryLimit) {
+        // create spill index file
+        Path indexFilename =
+            mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
+                * MAP_OUTPUT_INDEX_RECORD_LENGTH);
+        spillRec.writeToFile(indexFilename, job);
+      } else {
+        indexCacheList.add(spillRec);
+        totalIndexCacheMemory +=
+          spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;
+      }
+      LOG.info("Finished spill " + numSpills);
+      ++numSpills;
+    } finally {
+      if (out != null) out.close();
+    }
+  }
+
+  /**
+   * Handles the degenerate case where serialization fails to fit in
+   * the in-memory buffer, so we must spill the record from collect
+   * directly to a spill file. Consider this "losing".
+   */
+  private void spillSingleRecord(final Object key, final Object value,
+                                 int partition) throws IOException {
+    long size = kvbuffer.length + partitions * APPROX_HEADER_LENGTH;
+    FSDataOutputStream out = null;
+    try {
+      // create spill file
+      final TezSpillRecord spillRec = new TezSpillRecord(partitions);
+      final Path filename =
+          mapOutputFile.getSpillFileForWrite(numSpills, size);
+      out = rfs.create(filename);
+
+      // we don't run the combiner for a single record
+      for (int i = 0; i < partitions; ++i) {
+        IFile.Writer writer = null;
+        try {
+          long segmentStart = out.getPos();
+          // Create a new codec, don't care!
+          writer = new IFile.Writer(job, out, keyClass, valClass, codec,
+                                          spilledRecordsCounter);
+
+          if (i == partition) {
+            final long recordStart = out.getPos();
+            writer.append(key, value);
+            // Note that our map byte count will not be accurate with
+            // compression
+            mapOutputByteCounter.increment(out.getPos() - recordStart);
+          }
+          writer.close();
+
+          // record offsets
+          TezIndexRecord rec = 
+              new TezIndexRecord(
+                  segmentStart, 
+                  writer.getRawLength(), 
+                  writer.getCompressedLength());
+          spillRec.putIndex(rec, i);
+
+          writer = null;
+        } catch (IOException e) {
+          if (null != writer) writer.close();
+          throw e;
+        }
+      }
+      if (totalIndexCacheMemory >= indexCacheMemoryLimit) {
+        // create spill index file
+        Path indexFilename =
+            mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
+                * MAP_OUTPUT_INDEX_RECORD_LENGTH);
+        spillRec.writeToFile(indexFilename, job);
+      } else {
+        indexCacheList.add(spillRec);
+        totalIndexCacheMemory +=
+          spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;
+      }
+      ++numSpills;
+    } finally {
+      if (out != null) out.close();
+    }
+  }
+
+  protected int getInMemVBytesLength(int kvoff) {
+    // get the keystart for the next serialized value to be the end
+    // of this value. If this is the last value in the buffer, use bufend
+    final int nextindex = kvoff == kvend
+      ? bufend
+      : kvmeta.get(
+          (kvoff - NMETA + kvmeta.capacity() + KEYSTART) % kvmeta.capacity());
+    // calculate the length of the value
+    int vallen = (nextindex >= kvmeta.get(kvoff + VALSTART))
+      ? nextindex - kvmeta.get(kvoff + VALSTART)
+      : (bufvoid - kvmeta.get(kvoff + VALSTART)) + nextindex;
+      return vallen;
+  }
+  
+  /**
+   * Given an offset, populate vbytes with the associated set of
+   * deserialized value bytes. Should only be called during a spill.
+   */
+  int getVBytesForOffset(int kvoff, InMemValBytes vbytes) {
+    int vallen = getInMemVBytesLength(kvoff);
+    vbytes.reset(kvbuffer, kvmeta.get(kvoff + VALSTART), vallen);
+    return vallen;
+  }
+
+  /**
+   * Inner class wrapping valuebytes, used for appendRaw.
+   */
+  static class InMemValBytes extends DataInputBuffer {
+    private byte[] buffer;
+    private int start;
+    private int length;
+    private final int bufvoid;
+
+    public InMemValBytes(int bufvoid) {
+      this.bufvoid = bufvoid;
+    }
+    
+    public void reset(byte[] buffer, int start, int length) {
+      this.buffer = buffer;
+      this.start = start;
+      this.length = length;
+
+      if (start + length > bufvoid) {
+        this.buffer = new byte[this.length];
+        final int taillen = bufvoid - start;
+        System.arraycopy(buffer, start, this.buffer, 0, taillen);
+        System.arraycopy(buffer, 0, this.buffer, taillen, length-taillen);
+        this.start = 0;
+      }
+
+      super.reset(this.buffer, this.start, this.length);
+    }
+  }
+
+  InMemValBytes createInMemValBytes() {
+    return new InMemValBytes(bufvoid);
+  }
+  
+  protected class MRResultIterator implements TezRawKeyValueIterator {
+    private final DataInputBuffer keybuf = new DataInputBuffer();
+    private final InMemValBytes vbytes = createInMemValBytes();
+    private final int end;
+    private int current;
+    public MRResultIterator(int start, int end) {
+      this.end = end;
+      current = start - 1;
+    }
+    public boolean next() throws IOException {
+      return ++current < end;
+    }
+    public DataInputBuffer getKey() throws IOException {
+      final int kvoff = offsetFor(current);
+      keybuf.reset(kvbuffer, kvmeta.get(kvoff + KEYSTART),
+          kvmeta.get(kvoff + VALSTART) - kvmeta.get(kvoff + KEYSTART));
+      return keybuf;
+    }
+    public DataInputBuffer getValue() throws IOException {
+      getVBytesForOffset(offsetFor(current), vbytes);
+      return vbytes;
+    }
+    public Progress getProgress() {
+      return null;
+    }
+    public void close() { }
+  }
+
+  private void mergeParts() throws IOException, InterruptedException {
+    // get the approximate size of the final output/index files
+    long finalOutFileSize = 0;
+    long finalIndexFileSize = 0;
+    final Path[] filename = new Path[numSpills];
+    final TezTaskAttemptID mapId = task.getTaskAttemptId();
+
+    for(int i = 0; i < numSpills; i++) {
+      filename[i] = mapOutputFile.getSpillFile(i);
+      finalOutFileSize += rfs.getFileStatus(filename[i]).getLen();
+    }
+    if (numSpills == 1) { //the spill is the final output
+      sameVolRename(filename[0],
+          mapOutputFile.getOutputFileForWriteInVolume(filename[0]));
+      if (indexCacheList.size() == 0) {
+        sameVolRename(mapOutputFile.getSpillIndexFile(0),
+          mapOutputFile.getOutputIndexFileForWriteInVolume(filename[0]));
+      } else {
+        indexCacheList.get(0).writeToFile(
+          mapOutputFile.getOutputIndexFileForWriteInVolume(filename[0]), job);
+      }
+      sortPhase.complete();
+      return;
+    }
+
+    // read in paged indices
+    for (int i = indexCacheList.size(); i < numSpills; ++i) {
+      Path indexFileName = mapOutputFile.getSpillIndexFile(i);
+      indexCacheList.add(new TezSpillRecord(indexFileName, job));
+    }
+
+    //make correction in the length to include the sequence file header
+    //lengths for each partition
+    finalOutFileSize += partitions * APPROX_HEADER_LENGTH;
+    finalIndexFileSize = partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH;
+    Path finalOutputFile =
+        mapOutputFile.getOutputFileForWrite(finalOutFileSize);
+    Path finalIndexFile =
+        mapOutputFile.getOutputIndexFileForWrite(finalIndexFileSize);
+
+    //The output stream for the final single output file
+    FSDataOutputStream finalOut = rfs.create(finalOutputFile, true, 4096);
+
+    if (numSpills == 0) {
+      //create dummy files
+      
+      TezSpillRecord sr = new TezSpillRecord(partitions);
+      try {
+        for (int i = 0; i < partitions; i++) {
+          long segmentStart = finalOut.getPos();
+          Writer writer =
+            new Writer(job, finalOut, keyClass, valClass, codec, null);
+          writer.close();
+          
+          TezIndexRecord rec = 
+              new TezIndexRecord(
+                  segmentStart, 
+                  writer.getRawLength(), 
+                  writer.getCompressedLength());
+          sr.putIndex(rec, i);
+        }
+        sr.writeToFile(finalIndexFile, job);
+      } finally {
+        finalOut.close();
+      }
+      sortPhase.complete();
+      return;
+    }
+    {
+      sortPhase.addPhases(partitions); // Divide sort phase into sub-phases
+      TezMerger.considerFinalMergeForProgress();
+      
+      final TezSpillRecord spillRec = new TezSpillRecord(partitions);
+      for (int parts = 0; parts < partitions; parts++) {
+        //create the segments to be merged
+        List<Segment> segmentList =
+          new ArrayList<Segment>(numSpills);
+        for(int i = 0; i < numSpills; i++) {
+          TezIndexRecord indexRecord = indexCacheList.get(i).getIndex(parts);
+
+          Segment s =
+            new Segment(job, rfs, filename[i], indexRecord.getStartOffset(),
+                             indexRecord.getPartLength(), codec, true);
+          segmentList.add(i, s);
+
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("MapId=" + mapId + " Reducer=" + parts +
+                "Spill =" + i + "(" + indexRecord.getStartOffset() + "," +
+                indexRecord.getRawLength() + ", " + 
+                indexRecord.getPartLength() + ")");
+          }
+        }
+
+        int mergeFactor = 
+            job.getInt(TezJobConfig.TEZ_ENGINE_IO_SORT_FACTOR, 
+                TezJobConfig.DEFAULT_TEZ_ENGINE_IO_SORT_FACTOR);
+        // sort the segments only if there are intermediate merges
+        boolean sortSegments = segmentList.size() > mergeFactor;
+        //merge
+        TezRawKeyValueIterator kvIter = TezMerger.merge(job, rfs,
+                       keyClass, valClass, codec,
+                       segmentList, mergeFactor,
+                       new Path(mapId.toString()),
+                       (RawComparator)ConfigUtils.getOutputKeyComparator(job), 
+                       task.getTaskReporter(), sortSegments,
+                       null, spilledRecordsCounter, 
+                       sortPhase.phase());
+
+        //write merged output to disk
+        long segmentStart = finalOut.getPos();
+        Writer writer =
+            new Writer(job, finalOut, keyClass, valClass, codec,
+                             spilledRecordsCounter);
+        if (combineProcessor == null || numSpills < minSpillsForCombine) {
+          TezMerger.writeFile(kvIter, writer, task.getTaskReporter(), job);
+          writer.close();
+        } else {
+          runCombineProcessor(kvIter, writer);
+        }
+
+        sortPhase.startNextPhase();
+        
+        // record offsets
+        final TezIndexRecord rec = 
+            new TezIndexRecord(
+                segmentStart, 
+                writer.getRawLength(), 
+                writer.getCompressedLength());
+        spillRec.putIndex(rec, parts);
+      }
+      spillRec.writeToFile(finalIndexFile, job);
+      finalOut.close();
+      for(int i = 0; i < numSpills; i++) {
+        rfs.delete(filename[i],true);
+      }
+    }
+  }
+  
+  @Override
+  public OutputContext getOutputContext() {
+    return null;
+  }
+
+}

Added: incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/InMemoryShuffleSorter.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/InMemoryShuffleSorter.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/InMemoryShuffleSorter.java (added)
+++ incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/InMemoryShuffleSorter.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,144 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.engine.common.sort.impl.dflt;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.IntBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.util.DataChecksum;
+import org.apache.tez.api.Master;
+import org.apache.tez.common.TezTask;
+import org.apache.tez.engine.common.shuffle.impl.ShuffleHeader;
+import org.apache.tez.engine.common.shuffle.server.ShuffleHandler;
+import org.apache.tez.engine.common.sort.impl.IFile;
+import org.apache.tez.records.OutputContext;
+
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+
+public class InMemoryShuffleSorter extends DefaultSorter {
+
+  private static final Log LOG = LogFactory.getLog(InMemoryShuffleSorter.class);
+  
+  static final int IFILE_EOF_LENGTH = 
+      2 * WritableUtils.getVIntSize(IFile.EOF_MARKER);
+  static final int IFILE_CHECKSUM_LENGTH = DataChecksum.Type.CRC32.size;
+  
+  private List<Integer> spillIndices = new ArrayList<Integer>();
+  private List<ShuffleHeader> shuffleHeaders = new ArrayList<ShuffleHeader>();
+
+  ShuffleHandler shuffleHandler = new ShuffleHandler(this);
+  
+  byte[] kvbuffer;
+  IntBuffer kvmeta;
+  
+  @Inject
+  public InMemoryShuffleSorter(
+      @Assisted TezTask task
+      ) throws IOException {
+    super(task);
+  }
+
+  @Override
+  public void initialize(Configuration conf, Master master) throws IOException,
+      InterruptedException {
+    super.initialize(conf, master);
+    shuffleHandler.init(conf, task);
+  }
+
+  @Override
+  protected void spill(int mstart, int mend) 
+      throws IOException, InterruptedException {
+    // Start the shuffleHandler
+    shuffleHandler.start();
+
+    // Don't spill!
+    
+    // Make a copy
+    this.kvbuffer = super.kvbuffer;
+    this.kvmeta = super.kvmeta;
+
+    // Just save spill-indices for serving later
+    int spindex = mstart;
+    for (int i = 0; i < partitions; ++i) {
+      spillIndices.add(spindex);
+      
+      int length = 0;
+      while (spindex < mend &&
+          kvmeta.get(offsetFor(spindex) + PARTITION) == i) {
+
+        final int kvoff = offsetFor(spindex);
+        int keyLen = 
+            kvmeta.get(kvoff + VALSTART) - kvmeta.get(kvoff + KEYSTART);
+        int valLen = getInMemVBytesLength(kvoff);
+        length += 
+            (keyLen + WritableUtils.getVIntSize(keyLen)) + 
+            (valLen + WritableUtils.getVIntSize(valLen));
+
+        ++spindex;
+      }
+      length += IFILE_EOF_LENGTH;
+      
+      shuffleHeaders.add( 
+          new ShuffleHeader(
+              task.getTaskAttemptId().toString(), 
+              length + IFILE_CHECKSUM_LENGTH, length, i)
+          );
+      LOG.info("shuffleHeader[" + i + "]:" +
+      		" rawLen=" + length + " partLen=" + (length + IFILE_CHECKSUM_LENGTH) + 
+          " spillIndex=" + spillIndices.get(i));
+    }
+    
+    LOG.info("Saved " + spillIndices.size() + " spill-indices and " + 
+        shuffleHeaders.size() + " shuffle headers");
+  }
+
+  @Override
+  public InputStream getSortedStream(int partition) {
+    return new SortBufferInputStream(this, partition);
+  }
+
+  @Override
+  public void close() throws IOException, InterruptedException{
+    // FIXME
+    //shuffleHandler.stop();
+  }
+
+  @Override
+  public ShuffleHeader getShuffleHeader(int reduce) {
+    return shuffleHeaders.get(reduce);
+  }
+
+  public int getSpillIndex(int partition) {
+    return spillIndices.get(partition);
+  }
+
+  @Override
+  public OutputContext getOutputContext() {
+    return new OutputContext(shuffleHandler.getPort());
+  }
+  
+}

Added: incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/SortBufferInputStream.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/SortBufferInputStream.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/SortBufferInputStream.java (added)
+++ incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/SortBufferInputStream.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,271 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.engine.common.sort.impl.dflt;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.IntBuffer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.BoundedByteArrayOutputStream;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.tez.engine.common.shuffle.impl.InMemoryWriter;
+import org.apache.tez.engine.common.sort.impl.dflt.DefaultSorter.InMemValBytes;
+
+class SortBufferInputStream extends InputStream {
+
+  private static final Log LOG = LogFactory.getLog(SortBufferInputStream.class);
+  
+  private final InMemoryShuffleSorter sorter;
+  private InMemoryWriter sortOutput;
+  
+  private int mend;
+  private int recIndex;
+  private final byte[] kvbuffer;       
+  private final IntBuffer kvmeta;
+  private final int partitionBytes;
+  private final int partition;
+  
+  byte[] dualBuf = new byte[8192];
+  DualBufferOutputStream out;
+  private int readBytes = 0;
+  
+  public SortBufferInputStream(
+      InMemoryShuffleSorter sorter, int partition) {
+    this.sorter = sorter;
+    this.partitionBytes = 
+        (int)sorter.getShuffleHeader(partition).getCompressedLength();
+    this.partition = partition;
+    this.mend = sorter.getMetaEnd();
+    this.recIndex = sorter.getSpillIndex(partition);
+    this.kvbuffer = sorter.kvbuffer;
+    this.kvmeta = sorter.kvmeta;
+    out = new DualBufferOutputStream(null, 0, 0, dualBuf);
+    sortOutput = new InMemoryWriter(out);
+  }
+  
+  byte[] one = new byte[1];
+  
+  @Override
+  public int read() throws IOException {
+    int b = read(one, 0, 1);
+    return (b == -1) ? b : one[0]; 
+  }
+
+  @Override
+  public int read(byte[] b) throws IOException {
+    return read(b, 0, b.length);
+  }
+
+  @Override
+  public int read(byte[] b, int off, int len) throws IOException {
+    if (available() == 0) {
+      return -1;
+    }
+    
+    int currentOffset = off;
+    int currentLength = len;
+    int currentReadBytes = 0;
+    
+    // Check if there is residual data in the dualBuf
+    int residualLen = out.getCurrent();
+    if (residualLen > 0) {
+      int readable = Math.min(currentLength, residualLen);
+      System.arraycopy(dualBuf, 0, b, currentOffset, readable);
+      currentOffset += readable;
+      currentReadBytes += readable;
+      out.setCurrentPointer(-readable);
+      
+      // buffer has less capacity
+      currentLength -= readable;
+      
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("XXX read_residual:" +
+            " readable=" + readable +
+            " readBytes=" + readBytes);
+      }
+    }
+    
+    // Now, use the provided buffer
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("XXX read: out.reset" +
+          " b=" + b + 
+          " currentOffset=" + currentOffset + 
+          " currentLength=" + currentLength +
+          " recIndex=" + recIndex);
+    }
+    out.reset(b, currentOffset, currentLength);
+    
+    // Read from sort-buffer into the provided buffer, space permitting
+    DataInputBuffer key = new DataInputBuffer();
+    final InMemValBytes value = sorter.createInMemValBytes();
+    
+    int kvPartition = 0;
+    int numRec = 0;
+    for (;
+         currentLength > 0 && recIndex < mend && 
+             (kvPartition = getKVPartition(recIndex)) == partition;
+        ++recIndex) {
+      
+      final int kvoff = sorter.offsetFor(recIndex);
+      
+      int keyLen = 
+          (kvmeta.get(kvoff + InMemoryShuffleSorter.VALSTART) - 
+              kvmeta.get(kvoff + InMemoryShuffleSorter.KEYSTART));
+      key.reset(
+          kvbuffer, 
+          kvmeta.get(kvoff + InMemoryShuffleSorter.KEYSTART),
+          keyLen
+          );
+      
+      int valLen = sorter.getVBytesForOffset(kvoff, value);
+
+      int recLen = 
+          (keyLen + WritableUtils.getVIntSize(keyLen)) + 
+          (valLen + WritableUtils.getVIntSize(valLen));
+      
+      currentReadBytes += recLen;
+      currentOffset += recLen;
+      currentLength -= recLen;
+
+      // Write out key/value into the in-mem ifile
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("XXX read: sortOutput.append" +
+            " #rec=" + ++numRec +
+            " recIndex=" + recIndex + " kvoff=" + kvoff + 
+            " keyLen=" + keyLen + " valLen=" + valLen + " recLen=" + recLen +
+            " readBytes=" + readBytes +
+            " currentReadBytes="  + currentReadBytes +
+            " currentLength=" + currentLength);
+      }
+      sortOutput.append(key, value);
+    }
+
+    // If we are at the end of the segment, close the ifile
+    if (currentLength > 0 && 
+        (recIndex == mend || kvPartition != partition)) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("XXX About to call close:" +
+            " currentLength=" + currentLength + 
+            " recIndex=" + recIndex + " mend=" + mend + 
+            " kvPartition=" + kvPartition + " partitino=" + partition);
+      }
+      sortOutput.close();
+      currentReadBytes += 
+          (InMemoryShuffleSorter.IFILE_EOF_LENGTH + 
+              InMemoryShuffleSorter.IFILE_CHECKSUM_LENGTH);
+    } else {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("XXX Hmm..." +
+            " currentLength=" + currentLength + 
+            " recIndex=" + recIndex + " mend=" + mend + 
+            " kvPartition=" + kvPartition + " partitino=" + partition);
+      }
+    }
+    
+    int retVal = Math.min(currentReadBytes, len);
+    readBytes += retVal;
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("XXX read: done" +
+          " retVal=" + retVal + 
+          " currentReadBytes=" + currentReadBytes +
+          " len=" + len + 
+          " readBytes=" + readBytes +
+          " partitionBytes=" + partitionBytes +
+          " residualBytes=" + out.getCurrent());
+    }
+    return retVal;
+  }
+
+  private int getKVPartition(int recIndex) {
+    return kvmeta.get(
+        sorter.offsetFor(recIndex) + InMemoryShuffleSorter.PARTITION);
+  }
+  
+  @Override
+  public int available() throws IOException {
+    return (partitionBytes - readBytes);
+  }
+
+  @Override
+  public void close() throws IOException {
+    super.close();
+  }
+
+  @Override
+  public boolean markSupported() {
+    return false;
+  }
+  
+  static class DualBufferOutputStream extends BoundedByteArrayOutputStream {
+
+    byte[] dualBuf;
+    int currentPointer = 0;
+    byte[] one = new byte[1];
+    
+    public DualBufferOutputStream(
+        byte[] buf, int offset, int length, 
+        byte[] altBuf) {
+      super(buf, offset, length);
+      this.dualBuf = altBuf;
+    }
+    
+    public void reset(byte[] b, int off, int len) {
+      super.resetBuffer(b, off, len);
+    }
+
+    @Override
+    public void write(int b) throws IOException {
+      one[0] = (byte)b;
+      write(one, 0, 1);
+    }
+
+    @Override
+    public void write(byte[] b) throws IOException {
+      write(b, 0, b.length);
+    }
+
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException {
+      int available = super.available();
+      if (available >= len) {
+        super.write(b, off, len);
+      } else {
+        super.write(b, off, available);
+        System.arraycopy(b, off+available, dualBuf, currentPointer, len-available);
+        currentPointer += (len - available);
+      }
+    }
+    
+    int getCurrent() {
+      return currentPointer;
+    }
+    
+    void setCurrentPointer(int delta) {
+      if ((currentPointer + delta) > dualBuf.length) {
+        throw new IndexOutOfBoundsException("Trying to set dualBuf 'current'" +
+        		" marker to " + (currentPointer+delta) + " when " +
+        		" dualBuf.length is " + dualBuf.length);
+      }
+      currentPointer = (currentPointer + delta) % dualBuf.length;
+    }
+  }
+}

Added: incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/task/impl/CombineValuesIterator.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/task/impl/CombineValuesIterator.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/task/impl/CombineValuesIterator.java (added)
+++ incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/task/impl/CombineValuesIterator.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,51 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.engine.common.task.impl;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.tez.common.TezTaskReporter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
+
+/** Iterator to return Combined values */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class CombineValuesIterator<KEY,VALUE>
+extends ValuesIterator<KEY,VALUE> {
+
+  private final TezCounter combineInputCounter;
+
+  public CombineValuesIterator(TezRawKeyValueIterator in,
+      RawComparator<KEY> comparator, Class<KEY> keyClass,
+      Class<VALUE> valClass, Configuration conf, TezTaskReporter reporter,
+      TezCounter combineInputCounter) throws IOException {
+    super(in, comparator, keyClass, valClass, conf, reporter);
+    this.combineInputCounter = combineInputCounter;
+  }
+
+  public VALUE next() {
+    combineInputCounter.increment(1);
+    return super.next();
+  }
+}

Added: incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/task/impl/ValuesIterator.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/task/impl/ValuesIterator.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/task/impl/ValuesIterator.java (added)
+++ incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/task/impl/ValuesIterator.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,142 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.engine.common.task.impl;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.serializer.Deserializer;
+import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.hadoop.util.Progressable;
+import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
+
+/** Iterates values while keys match in sorted input. */
+public class ValuesIterator<KEY,VALUE> implements Iterator<VALUE> {
+  protected TezRawKeyValueIterator in; //input iterator
+  private KEY key;               // current key
+  private KEY nextKey;
+  private VALUE value;             // current value
+  private boolean hasNext;                      // more w/ this key
+  private boolean more;                         // more in file
+  private RawComparator<KEY> comparator;
+  protected Progressable reporter;
+  private Deserializer<KEY> keyDeserializer;
+  private Deserializer<VALUE> valDeserializer;
+  private DataInputBuffer keyIn = new DataInputBuffer();
+  private DataInputBuffer valueIn = new DataInputBuffer();
+  
+  public ValuesIterator (TezRawKeyValueIterator in, 
+                         RawComparator<KEY> comparator, 
+                         Class<KEY> keyClass,
+                         Class<VALUE> valClass, Configuration conf, 
+                         Progressable reporter)
+    throws IOException {
+    this.in = in;
+    this.comparator = comparator;
+    this.reporter = reporter;
+    SerializationFactory serializationFactory = new SerializationFactory(conf);
+    this.keyDeserializer = serializationFactory.getDeserializer(keyClass);
+    this.keyDeserializer.open(keyIn);
+    this.valDeserializer = serializationFactory.getDeserializer(valClass);
+    this.valDeserializer.open(this.valueIn);
+    readNextKey();
+    key = nextKey;
+    nextKey = null; // force new instance creation
+    hasNext = more;
+  }
+
+  TezRawKeyValueIterator getRawIterator() { return in; }
+  
+  /// Iterator methods
+
+  public boolean hasNext() { return hasNext; }
+
+  private int ctr = 0;
+  public VALUE next() {
+    if (!hasNext) {
+      throw new NoSuchElementException("iterate past last value");
+    }
+    try {
+      readNextValue();
+      readNextKey();
+    } catch (IOException ie) {
+      throw new RuntimeException("problem advancing post rec#"+ctr, ie);
+    }
+    reporter.progress();
+    return value;
+  }
+
+  public void remove() { throw new RuntimeException("not implemented"); }
+
+  /// Auxiliary methods
+
+  /** Start processing next unique key. */
+  public void nextKey() throws IOException {
+    // read until we find a new key
+    while (hasNext) { 
+      readNextKey();
+    }
+    ++ctr;
+    
+    // move the next key to the current one
+    KEY tmpKey = key;
+    key = nextKey;
+    nextKey = tmpKey;
+    hasNext = more;
+  }
+
+  /** True iff more keys remain. */
+  public boolean more() { 
+    return more; 
+  }
+
+  /** The current key. */
+  public KEY getKey() { 
+    return key; 
+  }
+
+  /** 
+   * read the next key 
+   */
+  private void readNextKey() throws IOException {
+    more = in.next();
+    if (more) {
+      DataInputBuffer nextKeyBytes = in.getKey();
+      keyIn.reset(nextKeyBytes.getData(), nextKeyBytes.getPosition(), nextKeyBytes.getLength());
+      nextKey = keyDeserializer.deserialize(nextKey);
+      hasNext = key != null && (comparator.compare(key, nextKey) == 0);
+    } else {
+      hasNext = false;
+    }
+  }
+
+  /**
+   * Read the next value
+   * @throws IOException
+   */
+  private void readNextValue() throws IOException {
+    DataInputBuffer nextValueBytes = in.getValue();
+    valueIn.reset(nextValueBytes.getData(), nextValueBytes.getPosition(), nextValueBytes.getLength());
+    value = valDeserializer.deserialize(value);
+  }
+}

Added: incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezLocalTaskOutputFiles.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezLocalTaskOutputFiles.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezLocalTaskOutputFiles.java (added)
+++ incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezLocalTaskOutputFiles.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,237 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.common.task.local.output;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+import org.apache.tez.common.Constants;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.records.TezTaskID;
+
+/**
+ * Manipulate the working area for the transient store for maps and reduces.
+ *
+ * This class is used by map and reduce tasks to identify the directories that
+ * they need to write to/read from for intermediate files. The callers of
+ * these methods are from the Child running the Task.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class TezLocalTaskOutputFiles extends TezTaskOutput {
+
+  private LocalDirAllocator lDirAlloc =
+    new LocalDirAllocator(TezJobConfig.LOCAL_DIR);
+
+  public TezLocalTaskOutputFiles() {
+  }
+
+  /**
+   * Return the path to local map output file created earlier
+   *
+   * @return path
+   * @throws IOException
+   */
+  @Override
+  public Path getOutputFile()
+      throws IOException {
+    return lDirAlloc.getLocalPathToRead(Constants.TASK_OUTPUT_DIR + Path.SEPARATOR
+        + Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING, getConf());
+  }
+
+  /**
+   * Create a local map output file name.
+   *
+   * @param size the size of the file
+   * @return path
+   * @throws IOException
+   */
+  @Override
+  public Path getOutputFileForWrite(long size)
+      throws IOException {
+    return lDirAlloc.getLocalPathForWrite(Constants.TASK_OUTPUT_DIR + Path.SEPARATOR
+        + Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING, size, getConf());
+  }
+
+  /**
+   * Create a local map output file name on the same volume.
+   */
+  @Override
+  public Path getOutputFileForWriteInVolume(Path existing) {
+    return new Path(existing.getParent(), Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING);
+  }
+
+  /**
+   * Return the path to a local map output index file created earlier
+   *
+   * @return path
+   * @throws IOException
+   */
+  @Override
+  public Path getOutputIndexFile()
+      throws IOException {
+    return lDirAlloc.getLocalPathToRead(Constants.TASK_OUTPUT_DIR + Path.SEPARATOR
+        + Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING + Constants.TEZ_ENGINE_TASK_OUTPUT_INDEX_SUFFIX_STRING,
+        getConf());
+  }
+
+  /**
+   * Create a local map output index file name.
+   *
+   * @param size the size of the file
+   * @return path
+   * @throws IOException
+   */
+  @Override
+  public Path getOutputIndexFileForWrite(long size)
+      throws IOException {
+    return lDirAlloc.getLocalPathForWrite(Constants.TASK_OUTPUT_DIR + Path.SEPARATOR
+        + Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING + Constants.TEZ_ENGINE_TASK_OUTPUT_INDEX_SUFFIX_STRING,
+        size, getConf());
+  }
+
+  /**
+   * Create a local map output index file name on the same volume.
+   */
+  @Override
+  public Path getOutputIndexFileForWriteInVolume(Path existing) {
+    return new Path(existing.getParent(),
+        Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING + Constants.TEZ_ENGINE_TASK_OUTPUT_INDEX_SUFFIX_STRING);
+  }
+
+  /**
+   * Return a local map spill file created earlier.
+   *
+   * @param spillNumber the number
+   * @return path
+   * @throws IOException
+   */
+  @Override
+  public Path getSpillFile(int spillNumber)
+      throws IOException {
+    return lDirAlloc.getLocalPathToRead(Constants.TASK_OUTPUT_DIR + "/spill"
+        + spillNumber + ".out", getConf());
+  }
+
+  /**
+   * Create a local map spill file name.
+   *
+   * @param spillNumber the number
+   * @param size the size of the file
+   * @return path
+   * @throws IOException
+   */
+  @Override
+  public Path getSpillFileForWrite(int spillNumber, long size)
+      throws IOException {
+    return lDirAlloc.getLocalPathForWrite(Constants.TASK_OUTPUT_DIR + "/spill"
+        + spillNumber + ".out", size, getConf());
+  }
+
+  /**
+   * Return a local map spill index file created earlier
+   *
+   * @param spillNumber the number
+   * @return path
+   * @throws IOException
+   */
+  @Override
+  public Path getSpillIndexFile(int spillNumber)
+      throws IOException {
+    return lDirAlloc.getLocalPathToRead(Constants.TASK_OUTPUT_DIR + "/spill"
+        + spillNumber + ".out.index", getConf());
+  }
+
+  /**
+   * Create a local map spill index file name.
+   *
+   * @param spillNumber the number
+   * @param size the size of the file
+   * @return path
+   * @throws IOException
+   */
+  @Override
+  public Path getSpillIndexFileForWrite(int spillNumber, long size)
+      throws IOException {
+    return lDirAlloc.getLocalPathForWrite(Constants.TASK_OUTPUT_DIR + "/spill"
+        + spillNumber + ".out.index", size, getConf());
+  }
+
+  /**
+   * Return a local reduce input file created earlier
+   *
+   * @param mapId a map task id
+   * @return path
+   * @throws IOException
+   */
+  @Override
+  public Path getInputFile(int mapId)
+      throws IOException {
+    return lDirAlloc.getLocalPathToRead(String.format(
+        Constants.TEZ_ENGINE_TASK_INPUT_FILE_FORMAT_STRING, 
+        Constants.TASK_OUTPUT_DIR, Integer.valueOf(mapId)), getConf());
+  }
+
+  /**
+   * Create a local reduce input file name.
+   *
+   * @param mapId a map task id
+   * @param size the size of the file
+   * @return path
+   * @throws IOException
+   */
+  @Override
+  public Path getInputFileForWrite(TezTaskID mapId,
+                                   long size)
+      throws IOException {
+    return lDirAlloc.getLocalPathForWrite(String.format(
+        Constants.TEZ_ENGINE_TASK_INPUT_FILE_FORMAT_STRING, Constants.TASK_OUTPUT_DIR, mapId.getId()),
+        size, getConf());
+  }
+
+  /** Removes all of the files related to a task. */
+  @Override
+  public void removeAll()
+      throws IOException {
+    deleteLocalFiles(Constants.TASK_OUTPUT_DIR);
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    super.setConf(conf);
+  }
+
+  private String[] getLocalDirs() throws IOException {
+    return getConf().getStrings(TezJobConfig.LOCAL_DIR);
+  }
+
+  @SuppressWarnings("deprecation")
+  private void deleteLocalFiles(String subdir) throws IOException {
+    String[] localDirs = getLocalDirs();
+    for (int i = 0; i < localDirs.length; i++) {
+      FileSystem.getLocal(getConf()).delete(new Path(localDirs[i], subdir));
+    }
+  }
+
+}

Added: incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezTaskOutput.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezTaskOutput.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezTaskOutput.java (added)
+++ incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezTaskOutput.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.common.task.local.output;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.fs.Path;
+import org.apache.tez.records.TezTaskID;
+
+/**
+ * Manipulate the working area for the transient store for maps and reduces.
+ *
+ * This class is used by map and reduce tasks to identify the directories that
+ * they need to write to/read from for intermediate files. The callers of
+ * these methods are from child space and see mapreduce.cluster.local.dir as
+ * taskTracker/jobCache/jobId/attemptId
+ * This class should not be used from TaskTracker space.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public abstract class TezTaskOutput implements Configurable {
+
+  private Configuration conf;
+
+  public TezTaskOutput() {
+  }
+
+  /**
+   * Return the path to local map output file created earlier
+   *
+   * @return path
+   * @throws IOException
+   */
+  public abstract Path getOutputFile() throws IOException;
+
+  /**
+   * Create a local map output file name.
+   *
+   * @param size the size of the file
+   * @return path
+   * @throws IOException
+   */
+  public abstract Path getOutputFileForWrite(long size) throws IOException;
+
+  /**
+   * Create a local map output file name on the same volume.
+   */
+  public abstract Path getOutputFileForWriteInVolume(Path existing);
+
+  /**
+   * Return the path to a local map output index file created earlier
+   *
+   * @return path
+   * @throws IOException
+   */
+  public abstract Path getOutputIndexFile() throws IOException;
+
+  /**
+   * Create a local map output index file name.
+   *
+   * @param size the size of the file
+   * @return path
+   * @throws IOException
+   */
+  public abstract Path getOutputIndexFileForWrite(long size) throws IOException;
+
+  /**
+   * Create a local map output index file name on the same volume.
+   */
+  public abstract Path getOutputIndexFileForWriteInVolume(Path existing);
+
+  /**
+   * Return a local map spill file created earlier.
+   *
+   * @param spillNumber the number
+   * @return path
+   * @throws IOException
+   */
+  public abstract Path getSpillFile(int spillNumber) throws IOException;
+
+  /**
+   * Create a local map spill file name.
+   *
+   * @param spillNumber the number
+   * @param size the size of the file
+   * @return path
+   * @throws IOException
+   */
+  public abstract Path getSpillFileForWrite(int spillNumber, long size)
+      throws IOException;
+
+  /**
+   * Return a local map spill index file created earlier
+   *
+   * @param spillNumber the number
+   * @return path
+   * @throws IOException
+   */
+  public abstract Path getSpillIndexFile(int spillNumber) throws IOException;
+
+  /**
+   * Create a local map spill index file name.
+   *
+   * @param spillNumber the number
+   * @param size the size of the file
+   * @return path
+   * @throws IOException
+   */
+  public abstract Path getSpillIndexFileForWrite(int spillNumber, long size)
+      throws IOException;
+
+  /**
+   * Return a local reduce input file created earlier
+   *
+   * @param mapId a map task id
+   * @return path
+   * @throws IOException
+   */
+  public abstract Path getInputFile(int mapId) throws IOException;
+
+  /**
+   * Create a local reduce input file name.
+   *
+   * @param mapId a map task id
+   * @param size the size of the file
+   * @return path
+   * @throws IOException
+   */
+  public abstract Path getInputFileForWrite(
+      TezTaskID mapId, long size) throws IOException;
+
+  /** Removes all of the files related to a task. */
+  public abstract void removeAll() throws IOException;
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+}

Added: incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezTaskOutputFiles.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezTaskOutputFiles.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezTaskOutputFiles.java (added)
+++ incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezTaskOutputFiles.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,234 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.common.task.local.output;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+import org.apache.tez.common.Constants;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.records.TezTaskID;
+
+/**
+ * Manipulate the working area for the transient store for maps and reduces.
+ *
+ * This class is used by map and reduce tasks to identify the directories that
+ * they need to write to/read from for intermediate files. The callers of
+ * these methods are from child space and see mapreduce.cluster.local.dir as
+ * taskTracker/jobCache/jobId/attemptId
+ * This class should not be used from TaskTracker space.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class TezTaskOutputFiles extends TezTaskOutput {
+
+  private Configuration conf;
+
+  private static final String SPILL_FILE_PATTERN = "%s_spill_%d.out";
+  private static final String SPILL_INDEX_FILE_PATTERN = SPILL_FILE_PATTERN
+      + ".index";
+
+  public TezTaskOutputFiles() {
+  }
+
+  // assume configured to $localdir/usercache/$user/appcache/$appId
+  private LocalDirAllocator lDirAlloc = 
+    new LocalDirAllocator(TezJobConfig.LOCAL_DIR);
+
+  private Path getAttemptOutputDir() {
+    System.err.println("getAttemptOutputDir: " + 
+        Constants.TASK_OUTPUT_DIR + "/" + conf.get(Constants.TEZ_ENGINE_TASK_ATTEMPT_ID));
+    return new Path(Constants.TASK_OUTPUT_DIR, conf.get(Constants.TEZ_ENGINE_TASK_ATTEMPT_ID));
+  }
+  
+  /**
+   * Return the path to local map output file created earlier
+   * 
+   * @return path
+   * @throws IOException
+   */
+  public Path getOutputFile() throws IOException {
+    Path attemptOutput =
+      new Path(getAttemptOutputDir(), Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING);
+    return lDirAlloc.getLocalPathToRead(attemptOutput.toString(), conf);
+  }
+
+  /**
+   * Create a local map output file name.
+   * 
+   * @param size the size of the file
+   * @return path
+   * @throws IOException
+   */
+  public Path getOutputFileForWrite(long size) throws IOException {
+    Path attemptOutput = 
+      new Path(getAttemptOutputDir(), Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING);
+    return lDirAlloc.getLocalPathForWrite(attemptOutput.toString(), size, conf);
+  }
+
+  /**
+   * Create a local map output file name on the same volume.
+   */
+  public Path getOutputFileForWriteInVolume(Path existing) {
+    Path outputDir = new Path(existing.getParent(), Constants.TASK_OUTPUT_DIR);
+    Path attemptOutputDir = new Path(outputDir,
+        conf.get(Constants.TEZ_ENGINE_TASK_ATTEMPT_ID));
+    return new Path(attemptOutputDir, Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING);
+  }
+
+  /**
+   * Return the path to a local map output index file created earlier
+   * 
+   * @return path
+   * @throws IOException
+   */
+  public Path getOutputIndexFile() throws IOException {
+    Path attemptIndexOutput =
+      new Path(getAttemptOutputDir(), Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING +
+                                      Constants.TEZ_ENGINE_TASK_OUTPUT_INDEX_SUFFIX_STRING);
+    return lDirAlloc.getLocalPathToRead(attemptIndexOutput.toString(), conf);
+  }
+
+  /**
+   * Create a local map output index file name.
+   * 
+   * @param size the size of the file
+   * @return path
+   * @throws IOException
+   */
+  public Path getOutputIndexFileForWrite(long size) throws IOException {
+    Path attemptIndexOutput =
+      new Path(getAttemptOutputDir(), Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING +
+                                      Constants.TEZ_ENGINE_TASK_OUTPUT_INDEX_SUFFIX_STRING);
+    return lDirAlloc.getLocalPathForWrite(attemptIndexOutput.toString(),
+        size, conf);
+  }
+
+  /**
+   * Create a local map output index file name on the same volume.
+   */
+  public Path getOutputIndexFileForWriteInVolume(Path existing) {
+    Path outputDir = new Path(existing.getParent(), Constants.TASK_OUTPUT_DIR);
+    Path attemptOutputDir = new Path(outputDir,
+        conf.get(Constants.TEZ_ENGINE_TASK_ATTEMPT_ID));
+    return new Path(attemptOutputDir, Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING +
+                                      Constants.TEZ_ENGINE_TASK_OUTPUT_INDEX_SUFFIX_STRING);
+  }
+
+  /**
+   * Return a local map spill file created earlier.
+   * 
+   * @param spillNumber the number
+   * @return path
+   * @throws IOException
+   */
+  public Path getSpillFile(int spillNumber) throws IOException {
+    return lDirAlloc.getLocalPathToRead(
+        String.format(SPILL_FILE_PATTERN,
+            conf.get(Constants.TEZ_ENGINE_TASK_ATTEMPT_ID), spillNumber), conf);
+  }
+
+  /**
+   * Create a local map spill file name.
+   * 
+   * @param spillNumber the number
+   * @param size the size of the file
+   * @return path
+   * @throws IOException
+   */
+  public Path getSpillFileForWrite(int spillNumber, long size)
+      throws IOException {
+    return lDirAlloc.getLocalPathForWrite(
+        String.format(String.format(SPILL_FILE_PATTERN,
+            conf.get(Constants.TEZ_ENGINE_TASK_ATTEMPT_ID), spillNumber)), size, conf);
+  }
+
+  /**
+   * Return a local map spill index file created earlier
+   * 
+   * @param spillNumber the number
+   * @return path
+   * @throws IOException
+   */
+  public Path getSpillIndexFile(int spillNumber) throws IOException {
+    return lDirAlloc.getLocalPathToRead(
+        String.format(SPILL_INDEX_FILE_PATTERN,
+            conf.get(Constants.TEZ_ENGINE_TASK_ATTEMPT_ID), spillNumber), conf);
+  }
+
+  /**
+   * Create a local map spill index file name.
+   * 
+   * @param spillNumber the number
+   * @param size the size of the file
+   * @return path
+   * @throws IOException
+   */
+  public Path getSpillIndexFileForWrite(int spillNumber, long size)
+      throws IOException {
+    return lDirAlloc.getLocalPathForWrite(
+        String.format(SPILL_INDEX_FILE_PATTERN,
+            conf.get(Constants.TEZ_ENGINE_TASK_ATTEMPT_ID), spillNumber), size, conf);
+  }
+
+  /**
+   * Return a local reduce input file created earlier
+   * 
+   * @param mapId a map task id
+   * @return path
+   * @throws IOException 
+   */
+  public Path getInputFile(int mapId) throws IOException {
+    throw new UnsupportedOperationException("Incompatible with LocalRunner");
+  }
+
+  /**
+   * Create a local reduce input file name.
+   * 
+   * @param mapId a map task id
+   * @param size the size of the file
+   * @return path
+   * @throws IOException
+   */
+  public Path getInputFileForWrite(TezTaskID mapId,
+      long size) throws IOException {
+    return lDirAlloc.getLocalPathForWrite(String.format(
+        Constants.TEZ_ENGINE_TASK_INPUT_FILE_FORMAT_STRING,
+        getAttemptOutputDir().toString(), mapId.getId()),
+        size, conf);
+  }
+
+  /** Removes all of the files related to a task. */
+  public void removeAll() throws IOException {
+    throw new UnsupportedOperationException("Incompatible with LocalRunner");
+  }
+
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  public Configuration getConf() {
+    return conf;
+  }
+  
+}



Mime
View raw message