tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acmur...@apache.org
Subject svn commit: r1457129 [10/38] - in /incubator/tez: ./ tez-ampool/ tez-ampool/src/ tez-ampool/src/main/ tez-ampool/src/main/bin/ tez-ampool/src/main/conf/ tez-ampool/src/main/java/ tez-ampool/src/main/java/org/ tez-ampool/src/main/java/org/apache/ tez-am...
Date Fri, 15 Mar 2013 21:26:48 GMT
Added: incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/PipelinedSorter.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/PipelinedSorter.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/PipelinedSorter.java (added)
+++ incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/PipelinedSorter.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,961 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.tez.engine.common.sort.impl;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.IntBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.HashComparator;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.util.IndexedSortable;
+import org.apache.hadoop.util.IndexedSorter;
+import org.apache.hadoop.util.Progress;
+import org.apache.tez.api.Master;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.common.TezTask;
+import org.apache.tez.engine.common.ConfigUtils;
+import org.apache.tez.engine.common.sort.SortingOutput;
+import org.apache.tez.engine.common.sort.impl.IFile.Writer;
+import org.apache.tez.engine.common.sort.impl.TezMerger.Segment;
+import org.apache.tez.records.TezTaskAttemptID;
+import org.apache.tez.records.OutputContext;
+
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+
+@SuppressWarnings({"unchecked", "rawtypes"})
+public class PipelinedSorter extends ExternalSorter implements SortingOutput {
+  
+  private static final Log LOG = LogFactory.getLog(PipelinedSorter.class);
+  
+  /**
+   * The size of each record in the index file for the map-outputs.
+   */
+  public static final int MAP_OUTPUT_INDEX_RECORD_LENGTH = 24;
+
+  private final static int APPROX_HEADER_LENGTH = 150;
+    
+  int partitionBits;
+  
+  private static final int PARTITION = 0;        // partition offset in acct
+  private static final int KEYSTART = 1;         // key offset in acct
+  private static final int VALSTART = 2;         // val offset in acct
+  private static final int VALLEN = 3;           // val len in acct
+  private static final int NMETA = 4;            // num meta ints
+  private static final int METASIZE = NMETA * 4; // size in bytes
+
+  // spill accounting
+  volatile Throwable sortSpillException = null;
+
+  int numSpills = 0;
+  int minSpillsForCombine;
+  private HashComparator hasher;
+  // SortSpans  
+  private SortSpan span;
+  private ByteBuffer largeBuffer;
+  // Merger
+  private SpanMerger merger; 
+  private ExecutorService sortmaster;
+
+  final ArrayList<TezSpillRecord> indexCacheList =
+    new ArrayList<TezSpillRecord>();
+  private int totalIndexCacheMemory;
+  private int indexCacheMemoryLimit;
+
+
+  /**
+   * Creates a sorter bound to the given task.
+   *
+   * The task is forwarded to the parent sorter. initialize() refuses to do
+   * any work while the inherited <code>task</code> reference is null, so
+   * the injected argument must not be dropped here.
+   * NOTE(review): assumes ExternalSorter exposes a (TezTask) constructor
+   * that stores the reference - confirm against ExternalSorter.
+   *
+   * @param task the task whose output this sorter collects
+   * @throws IOException declared in the signature; nothing in this body
+   *                     throws it directly
+   */
+  @Inject
+  public PipelinedSorter(
+      @Assisted TezTask task
+      ) throws IOException {
+    super(task);
+  }
+
+  /**
+   * Reads the sort configuration, allocates the in-memory sort buffer and
+   * prepares the first span, the span merger and the sort thread pool.
+   *
+   * @param conf   handed to super.initialize()
+   * @param master handed to super.initialize()
+   * @throws IOException if the spill percentage is outside (0, 1] or the
+   *                     sort buffer size in MB does not fit in 11 bits
+   * @throws InterruptedException declared in the signature; nothing in this
+   *                     body throws it directly
+   */
+  public void initialize(Configuration conf, Master master) 
+      throws IOException, InterruptedException {
+    
+    // Without a task nothing is set up. The IOException is never thrown; it
+    // is only created so the log entry carries a stack trace.
+    if (task == null) {
+      LOG.info("Bailing!", new IOException());
+      return;
+    }
+    super.initialize(conf, master);
+    
+    // One bit more than bitcount(partitions): collect() shifts the partition
+    // into the top partitionBits bits of the prefix, and the spare bit keeps
+    // that packed value from reaching the sign bit.
+    partitionBits = bitcount(partitions)+1;
+   
+    //sanity checks
+    final float spillper =
+      job.getFloat(
+          TezJobConfig.TEZ_ENGINE_SORT_SPILL_PERCENT, 
+          TezJobConfig.DEFAULT_TEZ_ENGINE_SORT_SPILL_PERCENT);
+    final int sortmb = 
+        job.getInt(
+            TezJobConfig.TEZ_ENGINE_IO_SORT_MB, 
+            TezJobConfig.DEFAULT_TEZ_ENGINE_IO_SORT_MB);
+    indexCacheMemoryLimit = job.getInt(TezJobConfig.TEZ_ENGINE_INDEX_CACHE_MEMORY_LIMIT_BYTES,
+                                       TezJobConfig.DEFAULT_TEZ_ENGINE_INDEX_CACHE_MEMORY_LIMIT_BYTES);
+    // spillper is only validated here; it is not used further in this method
+    if (spillper > (float)1.0 || spillper <= (float)0.0) {
+      throw new IOException("Invalid \"" + TezJobConfig.TEZ_ENGINE_SORT_SPILL_PERCENT +
+          "\": " + spillper);
+    }
+    // only 0..2047 MB is accepted, so that sortmb << 20 below fits in an int
+    if ((sortmb & 0x7FF) != sortmb) {
+      throw new IOException(
+          "Invalid \"" + TezJobConfig.TEZ_ENGINE_IO_SORT_MB + "\": " + sortmb);
+    }
+    
+    // buffers and accounting
+    int maxMemUsage = sortmb << 20;
+    // trim to a whole number of metadata records
+    maxMemUsage -= maxMemUsage % METASIZE;
+    largeBuffer = ByteBuffer.allocate(maxMemUsage);
+    LOG.info(TezJobConfig.TEZ_ENGINE_IO_SORT_MB + " = " + sortmb);
+    // TODO: configurable setting?
+    // first span: room for 1M records at an initial guess of 16 bytes each
+    span = new SortSpan(largeBuffer, 1024*1024, 16);
+    merger = new SpanMerger(comparator);
+    final int sortThreads = 
+            job.getInt(
+                TezJobConfig.TEZ_ENGINE_SORT_THREADS, 
+                TezJobConfig.DEFAULT_TEZ_ENGINE_SORT_THREADS);
+    sortmaster = Executors.newFixedThreadPool(sortThreads);
+
+    // k/v serialization    
+    // a HashComparator additionally supplies the hash that collect() packs
+    // into the low bits of each record's prefix
+    if(comparator instanceof HashComparator) {
+      hasher = (HashComparator)comparator;
+      LOG.info("Using the HashComparator");
+    } else {
+      hasher = null;
+    }    
+    // both serializers write into the data region of the current span
+    valSerializer.open(span.out);
+    keySerializer.open(span.out);
+    minSpillsForCombine = job.getInt(TezJobConfig.TEZ_ENGINE_COMBINE_MIN_SPILLS, 3);
+  }
+
+  /**
+   * Returns how many right shifts it takes to reduce n to zero, i.e. the
+   * number of bits needed to represent a non-negative n (0 for n == 0).
+   */
+  private int bitcount(int n) {
+    int width = 0;
+    for (int rest = n; rest != 0; rest >>= 1) {
+      width++;
+    }
+    return width;
+  }
+  
+  /**
+   * Closes the current span and switches collection to a new one.
+   *
+   * When span.next() can carve another span out of the space left behind
+   * the current one, the finished span is queued for sorting on the thread
+   * pool and collection continues in the new span. Otherwise the span is
+   * sorted in the calling thread, everything held by the merger is spilled,
+   * and a fresh span is created over largeBuffer, sized from the record
+   * size observed in the span just finished.
+   *
+   * @throws IOException if adding to the merger or spilling fails
+   */
+  public void sort() throws IOException {
+    // next() also freezes the current span (it calls end())
+    SortSpan newSpan = span.next();
+
+    if(newSpan == null) {
+      // sort in the same thread, do not wait for the thread pool
+      merger.add(span.sort(sorter, comparator));
+      spill();
+      // defaults match the very first span created in initialize()
+      int items = 1024*1024;
+      int perItem = 16;
+      if(span.length() != 0) {
+        items = span.length();
+        // average data bytes per record in the span just finished
+        perItem = span.kvbuffer.limit()/items;
+        // how many records of that size (plus metadata) fit in the buffer
+        items = (largeBuffer.capacity())/(METASIZE+perItem);
+        if(items > 1024*1024) {
+            // our goal is to have 1M splits and sort early
+            items = 1024*1024;
+        }
+      }      
+      span = new SortSpan(largeBuffer, items, perItem);
+    } else {
+      // queue up the sort
+      SortTask task = new SortTask(span, sorter, comparator);
+      Future<SpanIterator> future = sortmaster.submit(task);
+      merger.add(future);
+      span = newSpan;
+    }
+    // re-point the serializers at the new span's data region
+    valSerializer.open(span.out);
+    keySerializer.open(span.out);
+  }
+
+  /**
+   * Asks the partitioner which partition the record belongs to and passes
+   * the record on to collect().
+   */
+  public void write(Object key, Object value) 
+      throws IOException, InterruptedException {
+    final int target = partitioner.getPartition(key, value, partitions);
+    collect(key, value, target);
+  }
+
+  /**
+   * Serialize the key, value to intermediate storage.
+   * When this method returns, kvindex must refer to sufficient unused
+   * storage to store one METADATA.
+   *
+   * The key and value bytes are appended to the current span's data
+   * region and one NMETA-int metadata record (prefix, key start, value
+   * start, value length) is appended to its metadata region. If either
+   * region is full, sort() is invoked to move on to a new span first.
+   *
+   * @param key       record key; its class must be exactly keyClass
+   * @param value     record value; its class must be exactly valClass
+   * @param partition target partition, 0 <= partition < partitions
+   * @throws IOException on a type mismatch, an illegal partition, or a
+   *                     failure while sorting/spilling/serializing
+   */
+  synchronized void collect(Object key, Object value, final int partition
+                                   ) throws IOException {
+    task.getTaskReporter().progress();
+    if (key.getClass() != keyClass) {
+      throw new IOException("Type mismatch in key from map: expected "
+                            + keyClass.getName() + ", received "
+                            + key.getClass().getName());
+    }
+    if (value.getClass() != valClass) {
+      throw new IOException("Type mismatch in value from map: expected "
+                            + valClass.getName() + ", received "
+                            + value.getClass().getName());
+    }
+    if (partition < 0 || partition >= partitions) {
+      throw new IOException("Illegal partition for " + key + " (" +
+          partition + ")");
+    }
+    // NOTE(review): kvmeta is an IntBuffer, so remaining() counts ints while
+    // METASIZE is in bytes; the check is therefore stricter than the NMETA
+    // ints actually written below - confirm this is intended.
+    if(span.kvmeta.remaining() < METASIZE) {
+      this.sort();
+    }
+    int keystart = span.kvbuffer.position();
+    int valstart = -1;
+    int valend = -1;
+    try { 
+      keySerializer.serialize(key);
+      valstart = span.kvbuffer.position();      
+      valSerializer.serialize(value);
+      valend = span.kvbuffer.position();
+    } catch(BufferOverflowException overflow) {
+      // restore limit
+      // (rewinds the write position so the partially written record is
+      // not counted as part of this span)
+      span.kvbuffer.position(keystart);
+      this.sort();
+      // try again
+      this.collect(key, value, partition);
+      return;
+    }
+
+    int prefix = 0;
+
+    if(hasher != null) {
+      prefix = hasher.getHashCode(key);
+    }
+
+    // partition goes into the top partitionBits bits, the (shifted) hash
+    // into the remaining low bits
+    prefix = (partition << (32 - partitionBits)) | (prefix >>> partitionBits);
+
+    /* maintain order as in PARTITION, KEYSTART, VALSTART, VALLEN */
+    span.kvmeta.put(prefix);
+    span.kvmeta.put(keystart);
+    span.kvmeta.put(valstart);
+    span.kvmeta.put(valend - valstart);
+    // track the longest key/value seen; keymax sizes the scratch arrays
+    // allocated in SortSpan.sort()
+    if((valstart - keystart) > span.keymax) {
+      span.keymax = (valstart - keystart);
+    }
+    if((valend - valstart) > span.valmax) {
+      span.valmax = (valend - valstart);
+    }
+    mapOutputRecordCounter.increment(1);
+    mapOutputByteCounter.increment(valend - keystart);
+    task.getTaskReporter().progress();
+  }
+
+  /**
+   * Writes everything currently held by the merger to a new spill file,
+   * one segment per partition, and then writes the matching index file.
+   * numSpills is incremented only after both files were written.
+   *
+   * A failed or interrupted sort is reported as an IOException instead of
+   * being ignored, because continuing would silently leave a truncated
+   * spill behind.
+   *
+   * @throws IOException if a sort span could not be obtained, if the
+   *                     thread is interrupted (the interrupt flag is
+   *                     restored), or if writing the spill or index fails
+   */
+  public void spill() throws IOException {
+    // create spill file
+    final long size = largeBuffer.capacity() +
+      (partitions * APPROX_HEADER_LENGTH);
+    final TezSpillRecord spillRec = new TezSpillRecord(partitions);
+    final Path filename =
+      mapOutputFile.getSpillFileForWrite(numSpills, size);
+    FSDataOutputStream out = rfs.create(filename, true, 4096);
+
+    try {
+      // wait for all the future results from sort threads; ready() reports
+      // a failed sort by returning false
+      if (!merger.ready()) {
+        throw new IOException(
+            "Sorting failed, cannot spill to " + filename.toString());
+      }
+      LOG.info("Spilling to " + filename.toString());
+      for (int i = 0; i < partitions; ++i) {
+        TezRawKeyValueIterator kvIter = merger.filter(i);
+        //write merged output to disk
+        long segmentStart = out.getPos();
+        Writer writer =
+          new Writer(job, out, keyClass, valClass, codec,
+              spilledRecordsCounter);
+        writer.setRLE(merger.needsRLE());
+        if (combineProcessor == null) {
+          while(kvIter.next()) {
+            writer.append(kvIter.getKey(), kvIter.getValue());
+          }
+        } else {
+          runCombineProcessor(kvIter, writer);
+        }
+        //close
+        writer.close();
+
+        // record offsets
+        final TezIndexRecord rec =
+            new TezIndexRecord(
+                segmentStart,
+                writer.getRawLength(),
+                writer.getCompressedLength());
+        spillRec.putIndex(rec, i);
+      }
+
+      Path indexFilename =
+        mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
+            * MAP_OUTPUT_INDEX_RECORD_LENGTH);
+      // TODO: cache
+      spillRec.writeToFile(indexFilename, job);
+      ++numSpills;
+    } catch(InterruptedException ie) {
+      // keep the interrupt visible to callers and do not pretend the
+      // spill completed
+      Thread.currentThread().interrupt();
+      throw new IOException(
+          "Interrupted while spilling to " + filename.toString(), ie);
+    } finally {
+      out.close();
+    }
+  }
+
+  /**
+   * Sorts and spills whatever is still buffered, then produces the final
+   * output file and its index.
+   *
+   * With exactly one spill, the spill file and its index are renamed to
+   * the final names. Otherwise the spills are merged partition by
+   * partition into a single file (running the combiner when one is
+   * configured and at least minSpillsForCombine spills exist), and the
+   * spill files are deleted afterwards. The final output stream is closed
+   * even when the merge fails.
+   *
+   * @throws IOException if spilling, merging or writing the output fails
+   * @throws InterruptedException declared in the signature; presumably
+   *                     raised by the merge/combiner calls
+   */
+  @Override
+  public void flush() throws IOException, InterruptedException {
+    final TezTaskAttemptID mapId = task.getTaskAttemptId();
+    Path finalOutputFile =
+        mapOutputFile.getOutputFileForWrite(0); //TODO
+    Path finalIndexFile =
+        mapOutputFile.getOutputIndexFileForWrite(0); //TODO
+
+    LOG.info("Starting flush of map output");
+    // freeze the current span, sort it in this thread and spill everything
+    span.end();
+    merger.add(span.sort(sorter, comparator));
+    spill();
+    sortmaster.shutdown();
+
+    // drop the reference so the sort buffer can be reclaimed
+    largeBuffer = null;
+
+    if(numSpills == 1) {
+      // someday be able to pass this directly to shuffle
+      // without writing to disk
+      final Path filename =
+          mapOutputFile.getSpillFile(0);
+      Path indexFilename =
+              mapOutputFile.getSpillIndexFile(0);
+      sameVolRename(filename, finalOutputFile);
+      sameVolRename(indexFilename, finalIndexFile);
+      return;
+    }
+
+    //The output stream for the final single output file
+    FSDataOutputStream finalOut = rfs.create(finalOutputFile, true, 4096);
+    try {
+      sortPhase.addPhases(partitions); // Divide sort phase into sub-phases
+      TezMerger.considerFinalMergeForProgress();
+
+      final TezSpillRecord spillRec = new TezSpillRecord(partitions);
+      // local list, deliberately named differently from the
+      // indexCacheList field so that the field is not shadowed
+      final List<TezSpillRecord> spillIndexes =
+          new ArrayList<TezSpillRecord>(numSpills);
+
+      for(int i = 0; i < numSpills; i++) {
+        // TODO: build this cache before
+        Path indexFilename = mapOutputFile.getSpillIndexFile(i);
+        TezSpillRecord spillIndex = new TezSpillRecord(indexFilename, job);
+        spillIndexes.add(spillIndex);
+      }
+
+      // the merge factor does not change between partitions
+      final int mergeFactor =
+              job.getInt(TezJobConfig.TEZ_ENGINE_IO_SORT_FACTOR,
+                  TezJobConfig.DEFAULT_TEZ_ENGINE_IO_SORT_FACTOR);
+
+      for (int parts = 0; parts < partitions; parts++) {
+        //create the segments to be merged
+        List<Segment> segmentList =
+            new ArrayList<Segment>(numSpills);
+        for(int i = 0; i < numSpills; i++) {
+          Path spillFilename = mapOutputFile.getSpillFile(i);
+          TezIndexRecord indexRecord = spillIndexes.get(i).getIndex(parts);
+
+          Segment s =
+              new Segment(job, rfs, spillFilename, indexRecord.getStartOffset(),
+                               indexRecord.getPartLength(), codec, true);
+          segmentList.add(i, s);
+        }
+
+        // sort the segments only if there are intermediate merges
+        boolean sortSegments = segmentList.size() > mergeFactor;
+        //merge
+        @SuppressWarnings("unchecked")
+        TezRawKeyValueIterator kvIter = TezMerger.merge(job, rfs,
+                       keyClass, valClass, codec,
+                       segmentList, mergeFactor,
+                       new Path(mapId.toString()),
+                       (RawComparator)ConfigUtils.getOutputKeyComparator(job),
+                       task.getTaskReporter(), sortSegments,
+                       null, spilledRecordsCounter, sortPhase.phase());
+
+        //write merged output to disk
+        long segmentStart = finalOut.getPos();
+        Writer writer =
+            new Writer(job, finalOut, keyClass, valClass, codec,
+                             spilledRecordsCounter);
+        writer.setRLE(merger.needsRLE());
+        if (combineProcessor == null || numSpills < minSpillsForCombine) {
+          TezMerger.writeFile(kvIter, writer, task.getTaskReporter(), job);
+        } else {
+          runCombineProcessor(kvIter, writer);
+        }
+
+        //close
+        writer.close();
+
+        sortPhase.startNextPhase();
+
+        // record offsets
+        final TezIndexRecord rec =
+            new TezIndexRecord(
+                segmentStart,
+                writer.getRawLength(),
+                writer.getCompressedLength());
+        spillRec.putIndex(rec, parts);
+      }
+
+      spillRec.writeToFile(finalIndexFile, job);
+    } finally {
+      // release the stream on the failure path as well
+      finalOut.close();
+    }
+    // only reached after a successful merge: the spills are no longer needed
+    for(int i = 0; i < numSpills; i++) {
+      Path indexFilename = mapOutputFile.getSpillIndexFile(i);
+      Path spillFilename = mapOutputFile.getSpillFile(i);
+      rfs.delete(indexFilename,true);
+      rfs.delete(spillFilename,true);
+    }
+  }
+
+  /** Intentionally empty; the sort thread pool is shut down in flush(). */
+  public void close() { }
+
+  /**
+   * A raw key/value iterator that can also report a partition value for
+   * the record it is currently positioned on.
+   */
+  private interface PartitionedRawKeyValueIterator extends TezRawKeyValueIterator {
+    /** @return the partition value associated with the current record */
+    int getPartition();
+  }
+
+  /**
+   * OutputStream adapter that forwards every write to the put() methods of
+   * a ByteBuffer. When the buffer is full, the buffer's own unchecked
+   * BufferOverflowException propagates to the caller.
+   */
+  private class BufferStreamWrapper extends OutputStream 
+  {
+    private final ByteBuffer out;
+
+    public BufferStreamWrapper(ByteBuffer out) {
+      this.out = out;
+    }
+
+    /** Appends len bytes of b, starting at off. */
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException {
+      out.put(b, off, len);
+    }
+
+    /** Appends the whole array. */
+    @Override
+    public void write(byte[] b) throws IOException {
+      out.put(b);
+    }
+
+    /** Appends the low-order byte of b. */
+    @Override
+    public void write(int b) throws IOException {
+      final byte low = (byte)b;
+      out.put(low);
+    }
+  }
+
+  /**
+   * A DataInputBuffer backed by its own growable byte array. Both reset()
+   * variants copy the requested bytes into that array first, so the
+   * contents stay valid after the source they were read from has moved on.
+   */
+  protected class InputByteBuffer extends DataInputBuffer {
+    private byte[] buffer = new byte[256]; 
+    private ByteBuffer wrapped = ByteBuffer.wrap(buffer);
+    /** Grows the private array if needed (never shrinks it) and sets the limit of its wrapper. */
+    private void resize(int length) {
+      if(length > buffer.length) {
+        buffer = new byte[length];
+        wrapped = ByteBuffer.wrap(buffer);
+      }
+      wrapped.limit(length);
+    }
+    /**
+     * Copies length bytes of b, starting at absolute position start, into
+     * the private array and exposes them through this input buffer.
+     * Note that this moves the position of b.
+     */
+    public void reset(ByteBuffer b, int start, int length) {
+      resize(length);
+      b.position(start);
+      b.get(buffer, 0, length);
+      super.reset(buffer, 0, length);
+    }
+    // clone-ish function
+    // Copies clone.getLength() bytes starting at clone.getPosition().
+    // NOTE(review): this treats getLength() as the number of bytes after
+    // the position - confirm against DataInputBuffer's contract.
+    public void reset(DataInputBuffer clone) {
+      byte[] data = clone.getData();
+      int start = clone.getPosition();
+      int length = clone.getLength();
+      resize(length);
+      System.arraycopy(data, start, buffer, 0, length);
+      super.reset(buffer, 0, length);
+    }
+  }
+
+  /**
+   * One sortable region of the large buffer. The region is split into a
+   * metadata part (kvmeta, NMETA ints per record) at the front and a data
+   * part (kvbuffer, serialized key/value bytes) behind it. Sorting only
+   * rearranges the metadata records; the data bytes stay where they were
+   * written.
+   */
+  private class SortSpan  implements IndexedSortable {
+    final IntBuffer kvmeta;
+    final ByteBuffer kvbuffer;
+    // stream view over kvbuffer that the serializers write to
+    final DataOutputStream out;    
+    private RawComparator comparator; 
+    // scratch records used by swap()
+    final int imeta[] = new int[NMETA];
+    final int jmeta[] = new int[NMETA];
+    // longest key / value seen so far; maintained by collect()
+    int keymax = 1;
+    int valmax = 1;
+    private int i,j;
+    // scratch key arrays used by compare(); allocated in sort()
+    private byte[] ki;
+    private byte[] kj;
+    // sequence number of this span, used in log messages only
+    private int index = 0;
+    private InputByteBuffer hay = new InputByteBuffer();
+    // number of compare() calls that found equal keys
+    private long eq = 0;
+
+    /**
+     * Lays a span out over the given buffer.
+     *
+     * @param source   buffer to carve the span from; only a duplicate is
+     *                 repositioned, the source's own position and limit
+     *                 are left untouched
+     * @param maxItems number of records to reserve metadata space for
+     * @param perItem  expected data bytes per record
+     */
+    public SortSpan(ByteBuffer source, int maxItems, int perItem) {
+      int capacity = source.remaining(); 
+      int metasize = METASIZE*maxItems;
+      int dataSize = maxItems * perItem;
+      if(capacity < (metasize+dataSize)) {
+        // try to allocate less meta space, because we have sample data
+        metasize = METASIZE*(capacity/(perItem+METASIZE));
+      }
+      ByteBuffer reserved = source.duplicate();
+      reserved.mark();
+      LOG.info("reserved.remaining() = "+reserved.remaining());
+      LOG.info("reserved.size = "+metasize);
+      // everything from metasize up to the limit becomes the data region
+      // NOTE(review): position(metasize) is absolute, so this presumably
+      // relies on source starting at position 0 - confirm
+      reserved.position(metasize);
+      kvbuffer = reserved.slice();
+      // flip() leaves position 0 and limit metasize, i.e. the meta region
+      reserved.flip();
+      reserved.limit(metasize);
+      kvmeta = reserved
+                .slice()
+                .order(ByteOrder.nativeOrder())
+               .asIntBuffer();
+      out = new DataOutputStream(
+              new BufferStreamWrapper(kvbuffer));
+    }
+
+    /**
+     * Sorts the metadata records of this span and returns an iterator over
+     * them. Spans with fewer than two records are not passed to the sorter.
+     */
+    public SpanIterator sort(IndexedSorter sorter, RawComparator comparator) {
+    	this.comparator = comparator;
+      // scratch arrays large enough for the longest key in this span
+      ki = new byte[keymax];
+      kj = new byte[keymax];
+      LOG.info("begin sorting Span"+index + " ("+length()+")");
+      if(length() > 1) {
+        sorter.sort(this, 0, length(), task.getTaskReporter());
+      }
+      LOG.info("done sorting Span"+index);
+      return new SpanIterator(this);
+    }
+
+    /** Converts a record number into its int offset within kvmeta. */
+    int offsetFor(int i) {
+      return (i * NMETA);
+    }
+
+    /** Exchanges the metadata records mi and mj; the data bytes do not move. */
+    public void swap(final int mi, final int mj) {
+      final int kvi = offsetFor(mi);
+      final int kvj = offsetFor(mj);
+
+      kvmeta.position(kvi); kvmeta.get(imeta);
+      kvmeta.position(kvj); kvmeta.get(jmeta);
+      kvmeta.position(kvj); kvmeta.put(imeta);
+      kvmeta.position(kvi); kvmeta.put(jmeta);
+
+      // NOTE(review): i and j are not assigned anywhere else in this class,
+      // and both lines test the same condition - looks like leftover cache
+      // invalidation; confirm before relying on it
+      if(i == mi || j == mj) i = -1;
+      if(i == mi || j == mj) j = -1;
+    }
+
+    /**
+     * Orders record mi against record mj: first by the packed prefix
+     * written by collect(), then by the raw key bytes using the comparator
+     * given to sort().
+     */
+    public int compare(final int mi, final int mj) {
+      final int kvi = offsetFor(mi);
+      final int kvj = offsetFor(mj);
+      final int kvip = kvmeta.get(kvi + PARTITION);
+      final int kvjp = kvmeta.get(kvj + PARTITION);
+      // sort by partition      
+      if (kvip != kvjp) {
+        return kvip - kvjp;
+      }
+      
+      // key bytes run from KEYSTART up to VALSTART
+      final int istart = kvmeta.get(kvi + KEYSTART);
+      final int jstart = kvmeta.get(kvj + KEYSTART);
+      final int ilen   = kvmeta.get(kvi + VALSTART) - istart;
+      final int jlen   = kvmeta.get(kvj + VALSTART) - jstart;
+
+      kvbuffer.position(istart);
+      kvbuffer.get(ki, 0, ilen);
+      kvbuffer.position(jstart);
+      kvbuffer.get(kj, 0, jlen);
+      // sort by key
+      final int cmp = comparator.compare(ki, 0, ilen, kj, 0, jlen);
+      if(cmp == 0) eq++;
+      return cmp;
+    }
+
+    /**
+     * Freezes this span and creates the next one in the space left behind
+     * it, sized for the same number of records at this span's average
+     * record size.
+     *
+     * @return the new span, or null when end() reports no usable space
+     */
+    public SortSpan next() {
+      ByteBuffer remaining = end();
+      if(remaining != null) {
+        // end() returns non-null only when length() is non-zero
+        int items = length();
+        int perItem = kvbuffer.position()/items;
+        SortSpan newSpan = new SortSpan(remaining, items, perItem);
+        newSpan.index = index+1;
+        return newSpan;
+      }
+      return null;
+    }
+
+    /**
+     * @return kvmeta.limit()/NMETA; once end() has moved the limit to the
+     *         write position this is the number of records in the span
+     */
+    public int length() {
+      return kvmeta.limit()/NMETA;
+    }
+
+    /**
+     * Freezes the span by setting the limits of both regions to their
+     * current write positions.
+     *
+     * @return a slice covering the unused tail of the data region, or null
+     *         when the span holds no records or the tail is smaller than
+     *         NMETA+perItem bytes
+     */
+    public ByteBuffer end() {
+      // take the slice before the limit is pulled in
+      ByteBuffer remaining = kvbuffer.duplicate();
+      remaining.position(kvbuffer.position());
+      remaining = remaining.slice();
+      kvbuffer.limit(kvbuffer.position());
+      kvmeta.limit(kvmeta.position());
+      int items = length();
+      if(items == 0) {
+        return null;
+      }
+      int perItem = kvbuffer.position()/items;
+      LOG.info(String.format("Span%d.length = %d, perItem = %d", index, length(), perItem));
+      if(remaining.remaining() < NMETA+perItem) {
+        return null;
+      }
+      return remaining;
+    }
+
+    /**
+     * Compares the record at the given index of this span with an external
+     * key: by partition value first, then by raw key bytes.
+     *
+     * @return negative, zero or positive as the record sorts before, equal
+     *         to or after the needle
+     */
+    private int compareInternal(DataInputBuffer needle, int needlePart, int index) {
+      int cmp = 0;
+      int keystart;
+      int valstart;
+      int partition;
+      // "span" is the enclosing sorter's current span; offsetFor() depends
+      // only on its argument, so the result is the same for any span
+      partition = kvmeta.get(span.offsetFor(index) + PARTITION);
+      if(partition != needlePart) {
+          cmp = (partition-needlePart);
+      } else {
+        keystart = kvmeta.get(span.offsetFor(index) + KEYSTART);
+        valstart = kvmeta.get(span.offsetFor(index) + VALSTART);
+        // hay is allocated ahead of time
+        hay.reset(kvbuffer, keystart, valstart - keystart);
+        cmp = comparator.compare(hay.getData(), 
+            hay.getPosition(), hay.getLength(),
+            needle.getData(), 
+            needle.getPosition(), needle.getLength());
+      }
+      return cmp;
+    }
+    
+    /** @return how many compare() calls found equal keys */
+    public long getEq() {
+      return eq;
+    }
+    
+    @Override
+    public String toString() {
+        return String.format("Span[%d,%d]", NMETA*kvmeta.capacity(), kvbuffer.limit());
+    }
+  }
+
+  /**
+   * Iterates over the records of one sorted SortSpan in metadata order.
+   * The iterator starts before the first record; next() must return true
+   * before getKey(), getValue() or getPartition() are used. Instances are
+   * Comparable by their current record so that they can be kept in the
+   * merger's heap.
+   */
+  private class SpanIterator implements PartitionedRawKeyValueIterator, Comparable<SpanIterator> {
+    // index of the current record, -1 before the first next()
+    private int kvindex = -1;
+    // index of the last record, -1 for an empty span
+    private int maxindex;
+    private IntBuffer kvmeta;
+    private ByteBuffer kvbuffer;
+    private SortSpan span;
+    private InputByteBuffer key = new InputByteBuffer();
+    private InputByteBuffer value = new InputByteBuffer();
+    private Progress progress = new Progress();
+
+    // bisect() does not bother with runs shorter than this
+    private final int minrun = (1 << 4);
+
+    public SpanIterator(SortSpan span) {
+      this.kvmeta = span.kvmeta;
+      this.kvbuffer = span.kvbuffer;
+      this.span = span;
+      this.maxindex = (kvmeta.limit()/NMETA) - 1;
+    }
+
+    /** @return a copy of the current record's key bytes (buffer is reused) */
+    public DataInputBuffer getKey() throws IOException {
+      final int keystart = kvmeta.get(span.offsetFor(kvindex) + KEYSTART);
+      final int valstart = kvmeta.get(span.offsetFor(kvindex) + VALSTART);
+      key.reset(kvbuffer, keystart, valstart - keystart);
+      return key;
+    }
+
+    /** @return a copy of the current record's value bytes (buffer is reused) */
+    public DataInputBuffer getValue() throws IOException {
+      final int valstart = kvmeta.get(span.offsetFor(kvindex) + VALSTART);
+      final int vallen = kvmeta.get(span.offsetFor(kvindex) + VALLEN);
+      value.reset(kvbuffer, valstart, vallen);
+      return value;
+    }
+
+    /**
+     * Advances to the next record.
+     *
+     * @return false when the iterator already sits on the last record (or
+     *         the span is empty), true otherwise
+     */
+    public boolean next() throws IOException {
+      // caveat: since we use this as a comparable in the merger 
+      if(kvindex == maxindex) return false;
+      if(kvindex % 100 == 0) {
+          // report the consumed fraction; the division must be done in
+          // floating point, integer division only ever yields -1 or 0 here
+          progress.set((float) kvindex / maxindex);
+      }
+      kvindex += 1;
+      return true;
+    }
+
+    public void close() throws IOException {
+    }
+
+    public Progress getProgress() { 
+      return progress;
+    }
+
+    /** @return the packed prefix (partition and hash bits) of the current record */
+    public int getPartition() {
+      final int partition = kvmeta.get(span.offsetFor(kvindex) + PARTITION);
+      return partition;
+    }
+
+    /** @return the number of records after the current one */
+    public int size() {
+      return (maxindex - kvindex);
+    }
+
+    /**
+     * Orders this iterator against another one by comparing the current
+     * records of both.
+     */
+    public int compareTo(SpanIterator other) {
+      try {
+        return span.compareInternal(other.getKey(), other.getPartition(), kvindex);
+      } catch(IOException ie) {
+        // since we're not reading off disk, how could getKey() throw exceptions?
+      }
+      return -1;
+    }
+    
+    @Override
+    public String toString() {
+        return String.format("SpanIterator<%d:%d> (span=%s)", kvindex, maxindex, span.toString());
+    }
+
+    /**
+     * bisect returns the next insertion point for a given raw key, skipping keys
+     * which are <= needle using a binary search instead of a linear comparison.
+     * This is massively efficient when long strings of identical keys occur.
+     * @param needle key to search for
+     * @param needlePart packed partition prefix belonging to the needle
+     * @return the value consumed by SpanMerger.pop() as the length of the
+     *         run that can be taken from this iterator; 0 when no run of
+     *         at least minrun records was found
+     */
+    int bisect(DataInputBuffer needle, int needlePart) {
+      int start = kvindex;
+      int end = maxindex-1;
+      int mid = start;
+      int cmp = 0;
+
+      // too few records left to be worth searching
+      if(end - start < minrun) {
+        return 0;
+      }
+
+      // NOTE(review): returns the absolute index kvindex here, whereas all
+      // other exits return an offset relative to kvindex - confirm intent
+      if(span.compareInternal(needle, needlePart, start) > 0) {
+        return kvindex;
+      }
+      
+      // bail out early if we haven't got a min run 
+      if(span.compareInternal(needle, needlePart, start+minrun) > 0) {
+        return 0;
+      }
+
+      // everything up to end sorts before the needle
+      if(span.compareInternal(needle, needlePart, end) < 0) {
+        return end - kvindex;
+      }
+      
+      boolean found = false;
+      
+      // we sort 100k items, the max it can do is 20 loops, but break early
+      for(int i = 0; start < end && i < 16; i++) {
+        mid = start + (end - start)/2;
+        cmp = span.compareInternal(needle, needlePart, mid);
+        if(cmp == 0) {
+          start = mid;
+          found = true;
+        } else if(cmp < 0) {
+          start = mid; 
+          found = true;
+        }
+        if(cmp > 0) {
+          end = mid;
+        }
+      }
+
+      if(found) {
+        return start - kvindex;
+      }
+      return 0;
+    }
+  }
+
+  /**
+   * Callable that sorts one span with the given sorter and comparator, so
+   * that the sort can be submitted to the thread pool.
+   */
+  private class SortTask implements Callable<SpanIterator> {
+    private final SortSpan sortable;
+    private final IndexedSorter sorter;
+    private final RawComparator comparator;
+
+    public SortTask(SortSpan sortable,
+              IndexedSorter sorter, RawComparator comparator) {
+      this.comparator = comparator;
+      this.sorter = sorter;
+      this.sortable = sortable;
+    }
+
+    /** Sorts the span and returns the iterator over its sorted records. */
+    public SpanIterator call() {
+      final SpanIterator sorted = sortable.sort(sorter, comparator);
+      return sorted;
+    }
+  }
+
+  /**
+   * Restricts a partition-ordered iterator to the records of one partition
+   * at a time. reset(partition) selects the partition; next() returns
+   * false as soon as the underlying iterator reaches a record of another
+   * partition, and remembers that record so it is not lost when the filter
+   * is reset to a different partition.
+   */
+  private class PartitionFilter implements TezRawKeyValueIterator {
+    private final PartitionedRawKeyValueIterator iter;
+    // partition currently let through
+    private int partition;
+    // true when iter is already positioned on a record that has been read
+    // ahead but not yet handed out
+    private boolean dirty = false;
+    public PartitionFilter(PartitionedRawKeyValueIterator iter) {
+      this.iter = iter;
+    }
+    public DataInputBuffer getKey() throws IOException { return iter.getKey(); }
+    public DataInputBuffer getValue() throws IOException { return iter.getValue(); }
+    public void close() throws IOException { }
+    // returns a fresh Progress object on every call
+    public Progress getProgress() {
+      return new Progress();
+    }
+    /**
+     * @return true when the underlying iterator is positioned on a record
+     *         of the selected partition
+     */
+    public boolean next() throws IOException {
+      // when dirty, re-examine the pending record instead of advancing
+      if(dirty || iter.next()) { 
+        int prefix = iter.getPartition();
+
+        // the partition lives in the top partitionBits bits of the prefix
+        if((prefix >>> (32 - partitionBits)) == partition) {
+          dirty = false; // we found what we were looking for, good
+          return true;
+        } else if(!dirty) {
+          dirty = true; // we did a lookahead and failed to find partition
+        }
+      }
+      return false;
+    }
+
+    /** Selects the partition whose records next() lets through. */
+    public void reset(int partition) {
+      this.partition = partition;
+    }
+
+    public int getPartition() {
+      return this.partition;
+    }
+  }
+
+  /**
+   * Priority queue of span iterators, ordered by SpanIterator.compareTo(),
+   * created with an initial capacity of 256.
+   */
+  private class SpanHeap extends java.util.PriorityQueue<SpanIterator> {
+    public SpanHeap() {
+      super(256);
+    }
+    /**
+     * {@link PriorityQueue}.poll() by a different name 
+     * @return the head of the queue, or null when the queue is empty
+     */
+    public SpanIterator pop() {
+      return this.poll();
+    }
+  }
+
+  /**
+   * Merges the sorted spans into one ordered stream using a heap of span
+   * iterators. Spans sorted on the thread pool are registered as futures
+   * and pulled into the heap by ready(). When the same iterator wins the
+   * heap twice in a row, bisect() is used to take a whole run of records
+   * from it without going through the heap ("galloping").
+   */
+  private class SpanMerger implements PartitionedRawKeyValueIterator {
+    private final RawComparator comparator;
+    // private copies of the current record, filled by next()
+    InputByteBuffer key = new InputByteBuffer();
+    InputByteBuffer value = new InputByteBuffer();
+    int partition;
+
+    // sorts still running (or finished) on the thread pool
+    private ArrayList< Future<SpanIterator>> futures = new ArrayList< Future<SpanIterator>>();
+
+    private SpanHeap heap = new SpanHeap();
+    private PartitionFilter partIter;
+
+    // number of records still to be taken directly from horse
+    private int gallop = 0;
+    // iterator returned by the most recent pop()
+    private SpanIterator horse;
+    // statistics accumulated by ready(), used by needsRLE()
+    private long total = 0;
+    private long count = 0;
+    private long eq = 0;
+    
+    public SpanMerger(RawComparator comparator) {
+      this.comparator = comparator;
+      partIter = new PartitionFilter(this);
+    }
+
+    /**
+     * Advances the iterator by one record and, if it has one, inserts it
+     * into the heap. Exhausted iterators are dropped.
+     */
+    public void add(SpanIterator iter) throws IOException{
+      if(iter.next()) {
+        heap.add(iter);
+      }
+    }
+
+    /** Registers a span that is being sorted asynchronously. */
+    public void add(Future<SpanIterator> iter) throws IOException{
+      this.futures.add(iter);
+    }
+
+    /**
+     * Waits for all registered sorts, moves their iterators into the heap
+     * and updates the total/eq statistics.
+     *
+     * @return true on success; false when anything failed - the exception
+     *         is only logged, so callers must check the result
+     */
+    public boolean ready() throws IOException, InterruptedException {
+      try {
+        SpanIterator iter = null;
+        while(this.futures.size() > 0) {
+          Future<SpanIterator> futureIter = this.futures.remove(0);
+          iter = futureIter.get();
+          this.add(iter);
+        }
+        
+        StringBuilder sb = new StringBuilder();
+        for(SpanIterator sp: heap) {
+            sb.append(sp.toString());
+            sb.append(",");
+            total += sp.span.length();
+            eq += sp.span.getEq();
+        }
+        LOG.info("Heap = " + sb.toString());
+        return true;
+      } catch(Exception e) {
+        LOG.info(e.toString());
+        return false;
+      }
+    }
+
+    /**
+     * Picks the iterator that supplies the next record: the galloping one
+     * while a run is in progress, otherwise the head of the heap.
+     */
+    private SpanIterator pop() throws IOException {
+      if(gallop > 0) {
+        gallop--;
+        return horse;
+      }
+      SpanIterator current = heap.pop();
+      SpanIterator next = heap.peek();
+      // the same iterator won twice in a row: find out how far it can run
+      // before the runner-up's record has to be emitted
+      if(next != null && current != null &&
+        ((Object)horse) == ((Object)current)) {
+        // TODO: a better threshold check
+        gallop = current.bisect(next.getKey(), next.getPartition())-1;
+      }
+      horse = current;
+      return current;
+    }
+    
+    /** @return true when more than 10% of the counted comparisons were equal */
+    public boolean needsRLE() {
+      return (eq > 0.1 * total);
+    }
+    
+    // not called anywhere in this class
+    private SpanIterator peek() throws IOException {
+    	if(gallop > 0) {
+            return horse;
+        }
+    	return heap.peek();
+    }
+
+    /**
+     * Moves to the next record in merged order and copies its key, value
+     * and partition into this merger.
+     *
+     * @return false when all iterators are exhausted
+     */
+    public boolean next() throws IOException {
+      SpanIterator current = pop();
+
+      if(current != null) {
+        // keep local copies, since add() will move it all out
+        key.reset(current.getKey());
+        value.reset(current.getValue());
+        partition = current.getPartition();
+        if(gallop <= 0) {
+          // advances the iterator and puts it back into the heap
+          this.add(current);
+        } else {
+          // galloping
+          current.next();
+        }
+        return true;
+      }
+      return false;
+    }
+
+    public DataInputBuffer getKey() throws IOException { return key; }
+    public DataInputBuffer getValue() throws IOException { return value; }
+    public int getPartition() { return partition; }
+
+    public void close() throws IOException {
+    }
+
+    public Progress getProgress() {
+      // TODO
+      return new Progress();
+    }
+
+    /**
+     * @return the shared partition filter, reset to the given partition;
+     *         the same object is returned on every call
+     */
+    public TezRawKeyValueIterator filter(int partition) {
+      partIter.reset(partition);
+      return partIter;
+    }
+
+  }
+
+  /** @return always null; this sorter does not provide an output context */
+  @Override
+  public OutputContext getOutputContext() {
+    return null;
+  }
+}

Added: incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/TezIndexRecord.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/TezIndexRecord.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/TezIndexRecord.java (added)
+++ incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/TezIndexRecord.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,45 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.engine.common.sort.impl;
+
+/**
+ * Immutable holder for three <code>long</code> values: a start offset, a raw
+ * length and a part length. Presumably one record describes where a single
+ * partition sits inside a spill file -- confirm against the index writer.
+ */
+public class TezIndexRecord {
+  // Never reassigned after construction, hence final.
+  private final long startOffset;
+  private final long rawLength;
+  private final long partLength;
+
+  /** Creates a record whose three values are all zero. */
+  public TezIndexRecord() {
+    this(0L, 0L, 0L);
+  }
+
+  /**
+   * Creates a record from explicit values.
+   *
+   * @param startOffset value later returned by {@link #getStartOffset()}
+   * @param rawLength   value later returned by {@link #getRawLength()}
+   * @param partLength  value later returned by {@link #getPartLength()}
+   */
+  public TezIndexRecord(long startOffset, long rawLength, long partLength) {
+    this.startOffset = startOffset;
+    this.rawLength = rawLength;
+    this.partLength = partLength;
+  }
+
+  /** @return the start offset given at construction (0 by default) */
+  public long getStartOffset() {
+    return startOffset;
+  }
+
+  /** @return the raw length given at construction (0 by default) */
+  public long getRawLength() {
+    return rawLength;
+  }
+
+  /** @return the part length given at construction (0 by default) */
+  public long getPartLength() {
+    return partLength;
+  }
+}

Added: incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/TezMerger.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/TezMerger.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/TezMerger.java (added)
+++ incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/TezMerger.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,797 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.engine.common.sort.impl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ChecksumFileSystem;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.util.PriorityQueue;
+import org.apache.hadoop.util.Progress;
+import org.apache.hadoop.util.Progressable;
+import org.apache.tez.common.Constants;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.engine.common.sort.impl.IFile.Reader;
+import org.apache.tez.engine.common.sort.impl.IFile.Writer;
+
+/**
+ * Merger is a utility class used by the Map and Reduce tasks for merging
+ * both their memory and disk segments.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+@SuppressWarnings({"unchecked", "rawtypes"})
+public class TezMerger {  
+  private static final Log LOG = LogFactory.getLog(TezMerger.class);
+
+  // Local directories
+  private static LocalDirAllocator lDirAlloc = 
+    new LocalDirAllocator(TezJobConfig.LOCAL_DIR);
+
+  /**
+   * Merges the given input files. A {@link MergeQueue} is built over
+   * <code>inputs</code> with no merged-map-outputs counter, and its merge is
+   * run with the supplied factor, temporary directory, counters and phase.
+   *
+   * @param deleteInputs the queue preserves each input segment exactly when
+   *                     this is <code>false</code>
+   * @return the iterator produced by the queue's merge
+   * @throws IOException if building the queue or merging fails
+   */
+  public static
+  TezRawKeyValueIterator merge(Configuration conf, FileSystem fs,
+                            Class keyClass, Class valueClass,
+                            CompressionCodec codec,
+                            Path[] inputs, boolean deleteInputs,
+                            int mergeFactor, Path tmpDir,
+                            RawComparator comparator, Progressable reporter,
+                            TezCounter readsCounter,
+                            TezCounter writesCounter,
+                            Progress mergePhase)
+  throws IOException {
+    final TezCounter noMergedOutputsCounter = null;
+    MergeQueue queue =
+      new MergeQueue(conf, fs, inputs, deleteInputs, codec, comparator,
+                     reporter, noMergedOutputsCounter);
+    return queue.merge(keyClass, valueClass, mergeFactor, tmpDir,
+                       readsCounter, writesCounter, mergePhase);
+  }
+
+  /**
+   * Merges the given input files, passing
+   * <code>mergedMapOutputsCounter</code> on to the {@link MergeQueue} that is
+   * built over <code>inputs</code>.
+   *
+   * @param deleteInputs the queue preserves each input segment exactly when
+   *                     this is <code>false</code>
+   * @return the iterator produced by the queue's merge
+   * @throws IOException if building the queue or merging fails
+   */
+  public static
+  TezRawKeyValueIterator merge(Configuration conf, FileSystem fs,
+                            Class keyClass, Class valueClass,
+                            CompressionCodec codec,
+                            Path[] inputs, boolean deleteInputs,
+                            int mergeFactor, Path tmpDir,
+                            RawComparator comparator,
+                            Progressable reporter,
+                            TezCounter readsCounter,
+                            TezCounter writesCounter,
+                            TezCounter mergedMapOutputsCounter,
+                            Progress mergePhase)
+  throws IOException {
+    MergeQueue queue =
+      new MergeQueue(conf, fs, inputs, deleteInputs, codec, comparator,
+                     reporter, mergedMapOutputsCounter);
+    return queue.merge(keyClass, valueClass, mergeFactor, tmpDir,
+                       readsCounter, writesCounter, mergePhase);
+  }
+  
+  /**
+   * Merges already-built segments without sorting the segment list first;
+   * delegates to the overload that takes a <code>sortSegments</code> flag,
+   * passing <code>false</code>.
+   *
+   * @return the iterator produced by the delegated merge
+   * @throws IOException if the delegated merge fails
+   */
+  public static
+  TezRawKeyValueIterator merge(Configuration conf, FileSystem fs,
+                            Class keyClass, Class valueClass,
+                            List<Segment> segments,
+                            int mergeFactor, Path tmpDir,
+                            RawComparator comparator, Progressable reporter,
+                            TezCounter readsCounter,
+                            TezCounter writesCounter,
+                            Progress mergePhase)
+      throws IOException {
+    final boolean sortSegments = false;
+    return merge(conf, fs, keyClass, valueClass, segments, mergeFactor, tmpDir,
+                 comparator, reporter, sortSegments, readsCounter,
+                 writesCounter, mergePhase);
+  }
+
+  /**
+   * Merges already-built segments through a {@link MergeQueue} created
+   * without a compression codec.
+   *
+   * @param sortSegments when <code>true</code> the queue sorts
+   *                     <code>segments</code> by length on construction
+   * @return the iterator produced by the queue's merge
+   * @throws IOException if merging fails
+   */
+  public static <K extends Object, V extends Object>
+  TezRawKeyValueIterator merge(Configuration conf, FileSystem fs,
+                            Class keyClass, Class valueClass,
+                            List<Segment> segments,
+                            int mergeFactor, Path tmpDir,
+                            RawComparator comparator, Progressable reporter,
+                            boolean sortSegments,
+                            TezCounter readsCounter,
+                            TezCounter writesCounter,
+                            Progress mergePhase)
+      throws IOException {
+    MergeQueue queue =
+      new MergeQueue(conf, fs, segments, comparator, reporter, sortSegments);
+    return queue.merge(keyClass, valueClass, mergeFactor, tmpDir,
+                       readsCounter, writesCounter, mergePhase);
+  }
+
+  /**
+   * Merges already-built segments through a {@link MergeQueue} that is given
+   * <code>codec</code>.
+   *
+   * @param sortSegments when <code>true</code> the queue sorts
+   *                     <code>segments</code> by length on construction
+   * @return the iterator produced by the queue's merge
+   * @throws IOException if merging fails
+   */
+  public static <K extends Object, V extends Object>
+  TezRawKeyValueIterator merge(Configuration conf, FileSystem fs,
+                            Class keyClass, Class valueClass,
+                            CompressionCodec codec,
+                            List<Segment> segments,
+                            int mergeFactor, Path tmpDir,
+                            RawComparator comparator, Progressable reporter,
+                            boolean sortSegments,
+                            TezCounter readsCounter,
+                            TezCounter writesCounter,
+                            Progress mergePhase)
+      throws IOException {
+    MergeQueue queue =
+      new MergeQueue(conf, fs, segments, comparator, reporter, sortSegments,
+                     codec);
+    return queue.merge(keyClass, valueClass, mergeFactor, tmpDir,
+                       readsCounter, writesCounter, mergePhase);
+  }
+
+  /**
+   * Merges already-built segments, telling the {@link MergeQueue} how many
+   * of them are in memory. The queue is created without a compression codec.
+   *
+   * @param inMemSegments forwarded to the queue's merge as its in-memory
+   *                      segment count
+   * @param sortSegments  when <code>true</code> the queue sorts
+   *                      <code>segments</code> by length on construction
+   * @return the iterator produced by the queue's merge
+   * @throws IOException if merging fails
+   */
+  public static <K extends Object, V extends Object>
+    TezRawKeyValueIterator merge(Configuration conf, FileSystem fs,
+                            Class keyClass, Class valueClass,
+                            List<Segment> segments,
+                            int mergeFactor, int inMemSegments, Path tmpDir,
+                            RawComparator comparator, Progressable reporter,
+                            boolean sortSegments,
+                            TezCounter readsCounter,
+                            TezCounter writesCounter,
+                            Progress mergePhase)
+      throws IOException {
+    MergeQueue queue =
+      new MergeQueue(conf, fs, segments, comparator, reporter, sortSegments);
+    return queue.merge(keyClass, valueClass, mergeFactor, inMemSegments,
+                       tmpDir, readsCounter, writesCounter, mergePhase);
+  }
+
+
+  /**
+   * Merges already-built segments, forwarding both <code>codec</code> and
+   * the in-memory segment count to the {@link MergeQueue}.
+   *
+   * @param inMemSegments forwarded to the queue's merge as its in-memory
+   *                      segment count
+   * @param sortSegments  when <code>true</code> the queue sorts
+   *                      <code>segments</code> by length on construction
+   * @return the iterator produced by the queue's merge
+   * @throws IOException if merging fails
+   */
+  static <K extends Object, V extends Object>
+  TezRawKeyValueIterator merge(Configuration conf, FileSystem fs,
+                          Class keyClass, Class valueClass,
+                          CompressionCodec codec,
+                          List<Segment> segments,
+                          int mergeFactor, int inMemSegments, Path tmpDir,
+                          RawComparator comparator, Progressable reporter,
+                          boolean sortSegments,
+                          TezCounter readsCounter,
+                          TezCounter writesCounter,
+                          Progress mergePhase)
+      throws IOException {
+    MergeQueue queue =
+      new MergeQueue(conf, fs, segments, comparator, reporter, sortSegments,
+                     codec);
+    return queue.merge(keyClass, valueClass, mergeFactor, inMemSegments,
+                       tmpDir, readsCounter, writesCounter, mergePhase);
+  }
+
+  /**
+   * Appends every record of <code>records</code> to <code>writer</code>,
+   * calling <code>progressable.progress()</code> on the first record and
+   * then once per configured interval of records.
+   *
+   * The interval is read from {@link TezJobConfig#RECORDS_BEFORE_PROGRESS}.
+   * A configured value of 0 is treated as 1 (progress on every record),
+   * because it would otherwise be used as a zero divisor and raise
+   * <code>ArithmeticException</code>.
+   *
+   * @param records      source iterator; fully consumed by this call
+   * @param writer       destination; not closed by this method
+   * @param progressable notified periodically while copying
+   * @param conf         configuration holding the progress interval
+   * @throws IOException if reading or appending a record fails
+   */
+  public static <K extends Object, V extends Object>
+  void writeFile(TezRawKeyValueIterator records, Writer writer,
+                 Progressable progressable, Configuration conf)
+  throws IOException {
+    long configuredInterval =
+        conf.getLong(TezJobConfig.RECORDS_BEFORE_PROGRESS,
+            TezJobConfig.DEFAULT_RECORDS_BEFORE_PROGRESS);
+    // Guard the modulo below against a zero divisor.
+    long progressBar = (configuredInterval == 0) ? 1 : configuredInterval;
+    long recordCtr = 0;
+    while (records.next()) {
+      writer.append(records.getKey(), records.getValue());
+
+      if (((recordCtr++) % progressBar) == 0) {
+        progressable.progress();
+      }
+    }
+  }
+
+  /**
+   * One sorted input of a merge. A segment either names a region of a file,
+   * which {@link #init} opens on demand, or wraps a reader handed in by the
+   * caller. Segments built from a reader have no file system and therefore
+   * report themselves as in-memory.
+   */
+  @InterfaceAudience.Private
+  @InterfaceStability.Unstable
+  public static class Segment<K extends Object, V extends Object> {
+    Reader reader = null;
+    final DataInputBuffer key = new DataInputBuffer();
+
+    Configuration conf = null;
+    FileSystem fs = null;
+    Path file = null;
+    boolean preserve = false;
+    CompressionCodec codec = null;
+    long segmentOffset = 0;
+    long segmentLength = -1;
+
+    TezCounter mapOutputsCounter = null;
+
+    /**
+     * File-backed segment covering the whole file, without a counter.
+     *
+     * @throws IOException if the file status cannot be read
+     */
+    public Segment(Configuration conf, FileSystem fs, Path file,
+                   CompressionCodec codec, boolean preserve)
+    throws IOException {
+      this(conf, fs, file, codec, preserve, null);
+    }
+
+    /**
+     * File-backed segment covering the whole file; the length is taken from
+     * the file status.
+     *
+     * @throws IOException if the file status cannot be read
+     */
+    public Segment(Configuration conf, FileSystem fs, Path file,
+                   CompressionCodec codec, boolean preserve,
+                   TezCounter mergedMapOutputsCounter)
+    throws IOException {
+      this(conf, fs, file, 0, fs.getFileStatus(file).getLen(), codec, preserve,
+           mergedMapOutputsCounter);
+    }
+
+    /** File-backed segment covering a region of the file, without a counter. */
+    public Segment(Configuration conf, FileSystem fs, Path file,
+                   long segmentOffset, long segmentLength,
+                   CompressionCodec codec,
+                   boolean preserve) throws IOException {
+      this(conf, fs, file, segmentOffset, segmentLength, codec, preserve, null);
+    }
+
+    /**
+     * File-backed segment covering a region of the file. Nothing is opened
+     * here; the reader is created by {@link #init}.
+     *
+     * @param preserve                when <code>false</code>, {@link #close}
+     *                                deletes the file
+     * @param mergedMapOutputsCounter incremented by each {@link #init} call;
+     *                                may be <code>null</code>
+     */
+    public Segment(Configuration conf, FileSystem fs, Path file,
+        long segmentOffset, long segmentLength, CompressionCodec codec,
+        boolean preserve, TezCounter mergedMapOutputsCounter)
+    throws IOException {
+      this.conf = conf;
+      this.fs = fs;
+      this.file = file;
+      this.codec = codec;
+      this.preserve = preserve;
+
+      this.segmentOffset = segmentOffset;
+      this.segmentLength = segmentLength;
+
+      this.mapOutputsCounter = mergedMapOutputsCounter;
+    }
+
+    /** Reader-backed segment without a counter. */
+    public Segment(Reader reader, boolean preserve) {
+      this(reader, preserve, null);
+    }
+
+    /**
+     * Reader-backed segment; its length is taken from the reader.
+     *
+     * @param mapOutputsCounter incremented by each {@link #init} call; may be
+     *                          <code>null</code>
+     */
+    public Segment(Reader reader, boolean preserve,
+                   TezCounter mapOutputsCounter) {
+      this.reader = reader;
+      this.preserve = preserve;
+
+      this.segmentLength = reader.getLength();
+
+      this.mapOutputsCounter = mapOutputsCounter;
+    }
+
+    /**
+     * Creates the reader if there is none yet, then increments the
+     * map-outputs counter when one was supplied.
+     *
+     * @param readsCounter passed to the newly created reader
+     * @throws IOException if opening, seeking or creating the reader fails;
+     *                     the stream opened here is closed before the
+     *                     exception propagates
+     */
+    void init(TezCounter readsCounter) throws IOException {
+      if (reader == null) {
+        FSDataInputStream in = fs.open(file);
+        boolean readerCreated = false;
+        try {
+          in.seek(segmentOffset);
+          reader = new Reader(conf, in, segmentLength, codec, readsCounter);
+          readerCreated = true;
+        } finally {
+          if (!readerCreated) {
+            // No reader took the stream over (presumably the Reader closes
+            // it later -- confirm against IFile.Reader), so release it here.
+            try {
+              in.close();
+            } catch (IOException ignored) {
+              // deliberately ignored: the original failure is propagating
+            }
+          }
+        }
+      }
+
+      if (mapOutputsCounter != null) {
+        mapOutputsCounter.increment(1);
+      }
+    }
+
+    /** A segment counts as in-memory when it has no file system. */
+    boolean inMemory() {
+      return fs == null;
+    }
+
+    /** Returns the key buffer that {@link #nextRawKey()} fills. */
+    DataInputBuffer getKey() { return key; }
+
+    /** Reads the current value into <code>value</code> and returns it. */
+    DataInputBuffer getValue(DataInputBuffer value) throws IOException {
+      nextRawValue(value);
+      return value;
+    }
+
+    /**
+     * Returns the reader's length when a reader exists, otherwise the
+     * length recorded at construction.
+     */
+    public long getLength() {
+      return (reader == null) ?
+        segmentLength : reader.getLength();
+    }
+
+    /** Asks the reader for the next key; requires an existing reader. */
+    boolean nextRawKey() throws IOException {
+      return reader.nextRawKey(key);
+    }
+
+    /** Asks the reader for the current value; requires an existing reader. */
+    void nextRawValue(DataInputBuffer value) throws IOException {
+      reader.nextRawValue(value);
+    }
+
+    /** Closes and forgets the reader, if there is one. */
+    void closeReader() throws IOException {
+      if (reader != null) {
+        reader.close();
+        reader = null;
+      }
+    }
+
+    /**
+     * Closes the reader and, unless the segment is preserved or has no file
+     * system, deletes the backing file.
+     */
+    void close() throws IOException {
+      closeReader();
+      if (!preserve && fs != null) {
+        fs.delete(file, false);
+      }
+    }
+
+    /** Returns the reader's position; requires an existing reader. */
+    public long getPosition() throws IOException {
+      return reader.getPosition();
+    }
+
+    // This method is used by BackupStore to extract the
+    // absolute position after a reset
+    long getActualPosition() throws IOException {
+      return segmentOffset + reader.getPosition();
+    }
+
+    /** Returns the current reader, or <code>null</code> if none exists. */
+    Reader getReader() {
+      return reader;
+    }
+
+    // This method is used by BackupStore to reinitialize the
+    // reader to start reading from a different segment offset.
+    // It does nothing for in-memory segments.
+    void reinitReader(int offset) throws IOException {
+      if (!inMemory()) {
+        closeReader();
+        segmentOffset = offset;
+        segmentLength = fs.getFileStatus(file).getLen() - segmentOffset;
+        init(null);
+      }
+    }
+  }
+  
+  // Boolean variable for including/considering final merge as part of sort
+  // phase or not. This is true in map task, false in reduce task. It is
+  // used in calculating mergeProgress.
+  static boolean includeFinalMerge = false;
+  
+  /**
+   * Switches <code>includeFinalMerge</code> on. The map task calls this
+   * before merge() so that its final merge is counted as part of the sort
+   * phase.
+   */
+  public static void considerFinalMergeForProgress() {
+    TezMerger.includeFinalMerge = true;
+  }
+  
+  private static class MergeQueue<K extends Object, V extends Object> 
+  extends PriorityQueue<Segment> implements TezRawKeyValueIterator {
+    Configuration conf;
+    FileSystem fs;
+    CompressionCodec codec;
+    
+    List<Segment> segments = new ArrayList<Segment>();
+    
+    RawComparator comparator;
+    
+    private long totalBytesProcessed;
+    private float progPerByte;
+    private Progress mergeProgress = new Progress();
+    
+    Progressable reporter;
+    
+    DataInputBuffer key;
+    final DataInputBuffer value = new DataInputBuffer();
+    final DataInputBuffer diskIFileValue = new DataInputBuffer();
+    
+    Segment minSegment;
+    // Orders segments by ascending length; equal lengths compare as equal.
+    Comparator<Segment> segmentComparator =
+      new Comparator<Segment>() {
+      public int compare(Segment o1, Segment o2) {
+        final long firstLength = o1.getLength();
+        final long secondLength = o2.getLength();
+        if (firstLength < secondLength) {
+          return -1;
+        }
+        return (firstLength == secondLength) ? 0 : 1;
+      }
+    };
+
+    /**
+     * Builds a queue over files: one whole-file segment per input path,
+     * sorted by length afterwards.
+     *
+     * @param deleteInputs            segments are preserved exactly when this
+     *                                is <code>false</code>
+     * @param mergedMapOutputsCounter given to every segment except those
+     *                                whose path ends with
+     *                                <code>Constants.MERGED_OUTPUT_PREFIX</code>
+     * @throws IOException if a segment cannot be created
+     */
+    public MergeQueue(Configuration conf, FileSystem fs,
+                      Path[] inputs, boolean deleteInputs,
+                      CompressionCodec codec, RawComparator comparator,
+                      Progressable reporter,
+                      TezCounter mergedMapOutputsCounter)
+    throws IOException {
+      this.conf = conf;
+      this.fs = fs;
+      this.codec = codec;
+      this.comparator = comparator;
+      this.reporter = reporter;
+
+      final boolean preserveInputs = !deleteInputs;
+      for (int i = 0; i < inputs.length; i++) {
+        final Path file = inputs[i];
+        LOG.debug("MergeQ: adding: " + file);
+
+        TezCounter counterForFile = mergedMapOutputsCounter;
+        if (file.toString().endsWith(Constants.MERGED_OUTPUT_PREFIX)) {
+          counterForFile = null;
+        }
+        segments.add(new Segment(conf, fs, file, codec, preserveInputs,
+                                 counterForFile));
+      }
+
+      // Sort segments on file-lengths
+      Collections.sort(segments, segmentComparator);
+    }
+    
+    /**
+     * Builds a queue over segments supplied by the caller. The list is kept
+     * by reference, not copied.
+     *
+     * @param sortSegments when <code>true</code>, <code>segments</code> is
+     *                     sorted in place by length
+     */
+    public MergeQueue(Configuration conf, FileSystem fs,
+        List<Segment> segments, RawComparator comparator,
+        Progressable reporter, boolean sortSegments) {
+      this.conf = conf;
+      this.fs = fs;
+      this.comparator = comparator;
+      this.reporter = reporter;
+      this.segments = segments;
+
+      if (!sortSegments) {
+        return;
+      }
+      Collections.sort(segments, segmentComparator);
+    }
+
+    /**
+     * Same as the six-argument constructor, and additionally records the
+     * compression codec.
+     */
+    public MergeQueue(Configuration conf, FileSystem fs,
+                      List<Segment> segments, RawComparator comparator,
+                      Progressable reporter, boolean sortSegments,
+                      CompressionCodec codec) {
+      this(conf, fs, segments, comparator, reporter, sortSegments);
+      this.codec = codec;
+    }
+
+    /**
+     * Pops every segment left in the priority queue and closes it.
+     *
+     * @throws IOException if closing a segment fails
+     */
+    public void close() throws IOException {
+      for (Segment remaining = pop(); remaining != null; remaining = pop()) {
+        remaining.close();
+      }
+    }
+
+    /** Returns the key buffer selected by the last successful next(). */
+    public DataInputBuffer getKey() throws IOException {
+      return this.key;
+    }
+
+    /** Returns the value buffer filled by the last successful next(). */
+    public DataInputBuffer getValue() throws IOException {
+      return this.value;
+    }
+
+    /**
+     * Advances <code>reader</code> to its next key, accounts for the bytes
+     * consumed, and then either re-sifts the top of the queue or, when the
+     * segment is exhausted, pops and closes it.
+     */
+    private void adjustPriorityQueue(Segment reader) throws IOException{
+      final long before = reader.getPosition();
+      final boolean moreKeys = reader.nextRawKey();
+      final long after = reader.getPosition();
+
+      totalBytesProcessed += after - before;
+      mergeProgress.set(totalBytesProcessed * progPerByte);
+
+      if (!moreKeys) {
+        pop();
+        reader.close();
+        return;
+      }
+      adjustTop();
+    }
+
+    /**
+     * Moves to the next key/value pair in queue order and updates the merge
+     * progress by the number of bytes read.
+     *
+     * @return <code>true</code> if a pair is available through getKey() and
+     *         getValue(), <code>false</code> once the queue is empty
+     * @throws IOException if a segment cannot be read
+     */
+    public boolean next() throws IOException {
+      if (size() == 0) {
+        return false;
+      }
+
+      // minSegment is null only on the first call, when the queue is already
+      // in order. On later calls the segment served last time has to be
+      // stepped forward and the queue adjusted first.
+      if (minSegment != null) {
+        adjustPriorityQueue(minSegment);
+        if (size() == 0) {
+          minSegment = null;
+          return false;
+        }
+      }
+
+      minSegment = top();
+      final boolean onDisk = !minSegment.inMemory();
+      if (onDisk) {
+        // Values of in-memory segments make "value" point at the segment's
+        // own byte[]. Loading a disk value into that same array would
+        // corrupt the in-memory data, so disk values get a dedicated buffer
+        // and "value" is pointed at it whenever a disk segment is served.
+        value.reset(diskIFileValue.getData(), diskIFileValue.getLength());
+      }
+
+      final long before = minSegment.getPosition();
+      key = minSegment.getKey();
+      minSegment.getValue(value);
+      final long after = minSegment.getPosition();
+
+      totalBytesProcessed += after - before;
+      mergeProgress.set(totalBytesProcessed * progPerByte);
+      return true;
+    }
+
+    /**
+     * Queue ordering: compares the current keys of two segments with the
+     * raw comparator, looking only at the bytes from each buffer's position
+     * to its length.
+     *
+     * @return <code>true</code> when the key of <code>a</code> sorts
+     *         strictly before the key of <code>b</code>
+     */
+    protected boolean lessThan(Object a, Object b) {
+      final DataInputBuffer left = ((Segment) a).getKey();
+      final DataInputBuffer right = ((Segment) b).getKey();
+
+      final int leftStart = left.getPosition();
+      final int leftLength = left.getLength() - leftStart;
+      final int rightStart = right.getPosition();
+      final int rightLength = right.getLength() - rightStart;
+
+      final int order = comparator.compare(left.getData(), leftStart, leftLength,
+                                           right.getData(), rightStart, rightLength);
+      return order < 0;
+    }
+    
+    /**
+     * Runs the merge with no in-memory segments; delegates to the overload
+     * that takes an in-memory count, passing 0.
+     *
+     * @throws IOException if the delegated merge fails
+     */
+    public TezRawKeyValueIterator merge(Class keyClass, Class valueClass,
+                                     int factor, Path tmpDir,
+                                     TezCounter readsCounter,
+                                     TezCounter writesCounter,
+                                     Progress mergePhase)
+        throws IOException {
+      final int noInMemorySegments = 0;
+      return merge(keyClass, valueClass, factor, noInMemorySegments, tmpDir,
+                   readsCounter, writesCounter, mergePhase);
+    }
+
+    /**
+     * Merges the queued segments in passes. While more segments remain than
+     * the pass factor allows, each pass merges a batch into an intermediate
+     * file and inserts that file back into the segment list as a new
+     * segment. Once the remaining segments fit into one pass, the priority
+     * queue is loaded with them and this queue is returned as the iterator
+     * over the final merge.
+     *
+     * @param keyClass     key class handed to the intermediate writer
+     * @param valueClass   value class handed to the intermediate writer
+     * @param factor       maximum number of segments merged in one pass
+     * @param inMem        number of in-memory segments, assumed to sit at
+     *                     the head of the segment list; added to the factor
+     *                     of the first pass
+     * @param tmpDir       directory under which intermediate files are named
+     * @param readsCounter counter passed to each segment as it is initialized
+     * @param writesCounter counter passed to the intermediate writer
+     * @param mergePhase   progress object to update; when <code>null</code>
+     *                     the queue keeps using its own
+     * @return this queue, positioned before the first record of the final
+     *         merge
+     * @throws IOException if reading a segment or writing an intermediate
+     *                     file fails
+     */
+    TezRawKeyValueIterator merge(Class keyClass, Class valueClass,
+                                     int factor, int inMem, Path tmpDir,
+                                     TezCounter readsCounter,
+                                     TezCounter writesCounter,
+                                     Progress mergePhase)
+        throws IOException {
+      LOG.info("Merging " + segments.size() + " sorted segments");
+
+      /*
+       * If there are inMemory segments, then they come first in the segments
+       * list and then the sorted disk segments. Otherwise(if there are only
+       * disk segments), then they are sorted segments if there are more than
+       * factor segments in the segments list.
+       */
+      int numSegments = segments.size();
+      int origFactor = factor;
+      int passNo = 1;
+      if (mergePhase != null) {
+        mergeProgress = mergePhase;
+      }
+
+      // Estimate of all bytes read across every pass; used to scale progress.
+      long totalBytes = computeBytesInMerges(factor, inMem);
+      if (totalBytes != 0) {
+        progPerByte = 1.0f / (float)totalBytes;
+      }
+      
+      //create the MergeStreams from the sorted map created in the constructor
+      //and dump the final output to a file
+      do {
+        //get the factor for this pass of merge. We assume in-memory segments
+        //are the first entries in the segment list and that the pass factor
+        //doesn't apply to them
+        factor = getPassFactor(factor, passNo, numSegments - inMem);
+        if (1 == passNo) {
+          factor += inMem;
+        }
+        List<Segment> segmentsToMerge =
+          new ArrayList<Segment>();
+        int segmentsConsidered = 0;
+        int numSegmentsToConsider = factor;
+        long startBytes = 0; // starting bytes of segments of this merge
+        while (true) {
+          //extract the smallest 'factor' number of segments  
+          //Call cleanup on the empty segments (no key/value data)
+          List<Segment> mStream = 
+            getSegmentDescriptors(numSegmentsToConsider);
+          for (Segment segment : mStream) {
+            // Initialize the segment at the last possible moment;
+            // this helps in ensuring we don't use buffers until we need them
+            segment.init(readsCounter);
+            long startPos = segment.getPosition();
+            boolean hasNext = segment.nextRawKey();
+            long endPos = segment.getPosition();
+            
+            if (hasNext) {
+              startBytes += endPos - startPos;
+              segmentsToMerge.add(segment);
+              segmentsConsidered++;
+            }
+            else {
+              segment.close();
+              numSegments--; //we ignore this segment for the merge
+            }
+          }
+          //if we have the desired number of segments
+          //or looked at all available segments, we break
+          if (segmentsConsidered == factor || 
+              segments.size() == 0) {
+            break;
+          }
+            
+          // some segments were empty: fetch replacements for them
+          numSegmentsToConsider = factor - segmentsConsidered;
+        }
+        
+        //feed the streams to the priority queue
+        initialize(segmentsToMerge.size());
+        clear();
+        for (Segment segment : segmentsToMerge) {
+          put(segment);
+        }
+        
+        //if we have lesser number of segments remaining, then just return the
+        //iterator, else do another single level merge
+        if (numSegments <= factor) {
+          if (!includeFinalMerge) { // for reduce task
+
+            // Reset totalBytesProcessed and recalculate totalBytes from the
+            // remaining segments to track the progress of the final merge.
+            // Final merge is considered as the progress of the reducePhase,
+            // the 3rd phase of reduce task.
+            totalBytesProcessed = 0;
+            totalBytes = 0;
+            for (int i = 0; i < segmentsToMerge.size(); i++) {
+              totalBytes += segmentsToMerge.get(i).getLength();
+            }
+          }
+          if (totalBytes != 0) //being paranoid
+            progPerByte = 1.0f / (float)totalBytes;
+          
+          // the first key of every segment has already been read above
+          totalBytesProcessed += startBytes;         
+          if (totalBytes != 0)
+            mergeProgress.set(totalBytesProcessed * progPerByte);
+          else
+            mergeProgress.set(1.0f); // Last pass and no segments left - we're done
+          
+          LOG.info("Down to the last merge-pass, with " + numSegments + 
+                   " segments left of total size: " +
+                   (totalBytes - totalBytesProcessed) + " bytes");
+          return this;
+        } else {
+          LOG.info("Merging " + segmentsToMerge.size() + 
+                   " intermediate segments out of a total of " + 
+                   (segments.size()+segmentsToMerge.size()));
+          
+          long bytesProcessedInPrevMerges = totalBytesProcessed;
+          totalBytesProcessed += startBytes;
+
+          //we want to spread the creation of temp files on multiple disks if 
+          //available under the space constraints
+          long approxOutputSize = 0; 
+          for (Segment s : segmentsToMerge) {
+            approxOutputSize += s.getLength() + 
+                                ChecksumFileSystem.getApproxChkSumLength(
+                                s.getLength());
+          }
+          // intermediate files are named "intermediate.<passNo>" under tmpDir
+          Path tmpFilename = 
+            new Path(tmpDir, "intermediate").suffix("." + passNo);
+
+          Path outputFile =  lDirAlloc.getLocalPathForWrite(
+                                              tmpFilename.toString(),
+                                              approxOutputSize, conf);
+
+          // NOTE(review): the writer is not closed if writeFile throws --
+          // there is no try/finally around these three statements.
+          Writer writer = 
+            new Writer(conf, fs, outputFile, keyClass, valueClass, codec,
+                             writesCounter);
+          writeFile(this, writer, reporter, conf);
+          writer.close();
+          
+          //we finished one single level merge; now clean up the priority 
+          //queue
+          this.close();
+
+          // Add the newly created segment to the list of segments to be merged
+          // (preserve == false, so closing it later deletes the file)
+          Segment tempSegment = 
+            new Segment(conf, fs, outputFile, codec, false);
+
+          // Insert new merged segment into the sorted list
+          int pos = Collections.binarySearch(segments, tempSegment,
+                                             segmentComparator);
+          if (pos < 0) {
+            // binary search failed. So position to be inserted at is -pos-1
+            pos = -pos-1;
+          }
+          segments.add(pos, tempSegment);
+          numSegments = segments.size();
+          
+          // Subtract the difference between expected size of new segment and 
+          // actual size of new segment(Expected size of new segment is
+          // inputBytesOfThisMerge) from totalBytes. Expected size and actual
+          // size will match(almost) if combiner is not called in merge.
+          long inputBytesOfThisMerge = totalBytesProcessed -
+                                       bytesProcessedInPrevMerges;
+          totalBytes -= inputBytesOfThisMerge - tempSegment.getLength();
+          if (totalBytes != 0) {
+            progPerByte = 1.0f / (float)totalBytes;
+          }
+          
+          passNo++;
+        }
+        //we are worried about only the first pass merge factor. So reset the 
+        //factor to what it originally was
+        factor = origFactor;
+      } while(true);
+    }
+    
+    /**
+     * Picks how many segments a given pass should merge. Only the first
+     * pass is special: when there are more segments than the factor, it
+     * merges just enough for (segments - 1) to become divisible by
+     * (factor - 1), because each pass takes X segments and produces 1. That
+     * minimizes the number of merges. Every other case returns the factor
+     * unchanged.
+     */
+    private int getPassFactor(int factor, int passNo, int numSegments) {
+      final boolean firstPass = (passNo <= 1);
+      if (!firstPass || numSegments <= factor || factor == 1) {
+        return factor;
+      }
+
+      final int remainder = (numSegments - 1) % (factor - 1);
+      return (remainder == 0) ? factor : remainder + 1;
+    }
+    
+    /**
+     * Returns, and removes from the head of the segment list, the requested
+     * number of segment descriptors. If fewer are available, all of them
+     * are returned and the list is left empty.
+     *
+     * The head is removed with a single range operation instead of repeated
+     * <code>remove(0)</code> calls, each of which would shift the rest of
+     * the list.
+     *
+     * @param numDescriptors maximum number of descriptors to take
+     * @return a new list holding the removed descriptors, in list order
+     */
+    private List<Segment> getSegmentDescriptors(int numDescriptors) {
+      int count = Math.min(numDescriptors, segments.size());
+      // subList is a view: clearing it removes that range from segments
+      List<Segment> head = segments.subList(0, count);
+      List<Segment> taken = new ArrayList<Segment>(head);
+      head.clear();
+      return taken;
+    }
+    
+    /**
+     * Compute expected size of input bytes to merges, will be used in
+     * calculating mergeProgress. This simulates the above merge() method and
+     * tries to obtain the number of bytes that are going to be merged in all
+     * merges(assuming that there is no combiner called while merging).
+     * The simulation works on a private copy of the segment lengths, so the
+     * segment list itself is not modified.
+     * @param factor mapreduce.task.io.sort.factor
+     * @param inMem  number of segments in memory to be merged
+     * @return the sum of the input sizes of all simulated merge passes
+     */
+    long computeBytesInMerges(int factor, int inMem) {
+      int numSegments = segments.size();
+      List<Long> segmentSizes = new ArrayList<Long>(numSegments);
+      long totalBytes = 0;
+      int n = numSegments - inMem;
+      // factor for 1st pass
+      int f = getPassFactor(factor, 1, n) + inMem;
+      // from here on n counts all segments still to be merged
+      n = numSegments;
+ 
+      for (int i = 0; i < numSegments; i++) {
+        // Not handling empty segments here assuming that it would not affect
+        // much in calculation of mergeProgress.
+        segmentSizes.add(segments.get(i).getLength());
+      }
+      
+      // If includeFinalMerge is true, allow the following while loop iterate
+      // for 1 more iteration. This is to include final merge as part of the
+      // computation of expected input bytes of merges
+      boolean considerFinalMerge = includeFinalMerge;
+      
+      while (n > f || considerFinalMerge) {
+        if (n <=f ) {
+          // this iteration is the final merge; do not loop again after it
+          considerFinalMerge = false;
+        }
+        long mergedSize = 0;
+        f = Math.min(f, segmentSizes.size());
+        // take the first f sizes off the list and add them up
+        for (int j = 0; j < f; j++) {
+          mergedSize += segmentSizes.remove(0);
+        }
+        totalBytes += mergedSize;
+        
+        // insert new size into the sorted list
+        int pos = Collections.binarySearch(segmentSizes, mergedSize);
+        if (pos < 0) {
+          pos = -pos-1;
+        }
+        segmentSizes.add(pos, mergedSize);
+        
+        // f segments were replaced by one merged segment
+        n -= (f-1);
+        // only the first pass uses a special factor
+        f = factor;
+      }
+
+      return totalBytes;
+    }
+
+    /**
+     * Returns the progress object this queue updates while merging: the one
+     * passed to merge(), or the queue's own if none was given.
+     */
+    public Progress getProgress() {
+      return this.mergeProgress;
+    }
+
+  }
+}

Added: incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/TezRawKeyValueIterator.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/TezRawKeyValueIterator.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/TezRawKeyValueIterator.java (added)
+++ incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/TezRawKeyValueIterator.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.engine.common.sort.impl;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.util.Progress;
+
+/**
+ * <code>TezRawKeyValueIterator</code> is an iterator used to iterate over
+ * the raw keys and values during sort/merge of intermediate data.
+ *
+ * <p>Callers advance with {@link #next()} and then read the current pair
+ * through {@link #getKey()} and {@link #getValue()}; {@link #close()}
+ * lets the underlying streams be closed once iteration is finished.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public interface TezRawKeyValueIterator {
+  /**
+   * Gets the current raw key.
+   *
+   * @return the current raw key as a DataInputBuffer
+   * @throws IOException if the implementation fails to provide the key
+   */
+  DataInputBuffer getKey() throws IOException;
+  
+  /**
+   * Gets the current raw value.
+   *
+   * @return the current raw value as a DataInputBuffer
+   * @throws IOException if the implementation fails to provide the value
+   */
+  DataInputBuffer getValue() throws IOException;
+  
+  /**
+   * Sets up the current key and value (for getKey and getValue).
+   *
+   * @return <code>true</code> if there exists a key/value,
+   *         <code>false</code> otherwise.
+   * @throws IOException if the implementation fails while advancing
+   */
+  boolean next() throws IOException;
+  
+  /**
+   * Closes the iterator so that the underlying streams can be closed.
+   *
+   * @throws IOException if the implementation fails while closing
+   */
+  void close() throws IOException;
+  
+  /**
+   * Gets the Progress object for this iterator.
+   *
+   * @return the Progress object; this has a float (0.0 - 1.0)
+   *         indicating the bytes processed by the iterator so far
+   */
+  Progress getProgress();
+}

Added: incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/TezSpillRecord.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/TezSpillRecord.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/TezSpillRecord.java (added)
+++ incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/TezSpillRecord.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.engine.common.sort.impl;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.LongBuffer;
+import java.util.zip.CheckedInputStream;
+import java.util.zip.CheckedOutputStream;
+import java.util.zip.Checksum;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ChecksumException;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.util.PureJavaCrc32;
+import org.apache.tez.common.Constants;
+
+/**
+ * In-memory image of a spill index: one fixed-width record per partition,
+ * each made of three longs (start offset, raw length, partition length) as
+ * read by {@link #getIndex(int)} and written by
+ * {@link #putIndex(TezIndexRecord, int)}.
+ *
+ * <p>On disk the records are stored back to back; when a checksum is in use
+ * they are followed by one long holding the checksum of the record bytes
+ * (see {@link #writeToFile(Path, Configuration, Checksum)}).
+ */
+public class TezSpillRecord {
+
+  /** Backing store */
+  private final ByteBuffer buf;
+  /** View of backing storage as longs */
+  private final LongBuffer entries;
+
+  /**
+   * Creates a zero-filled record table sized for the given number of
+   * partitions.
+   *
+   * @param numPartitions number of index records to reserve space for
+   */
+  public TezSpillRecord(int numPartitions) {
+    buf = ByteBuffer.allocate(
+        numPartitions * Constants.MAP_OUTPUT_INDEX_RECORD_LENGTH);
+    entries = buf.asLongBuffer();
+  }
+
+  /**
+   * Loads a spill index from the local file system, verifying it with a
+   * {@link PureJavaCrc32} checksum and passing no expected owner.
+   *
+   * @param indexFileName path of the index file to read
+   * @param job configuration used to obtain the local file system
+   * @throws IOException if the file cannot be read or fails verification
+   */
+  public TezSpillRecord(Path indexFileName, Configuration job) throws IOException {
+    this(indexFileName, job, null);
+  }
+
+  /**
+   * Loads a spill index from the local file system, verifying it with a
+   * {@link PureJavaCrc32} checksum.
+   *
+   * @param indexFileName path of the index file to read
+   * @param job configuration used to obtain the local file system
+   * @param expectedIndexOwner forwarded to the four-argument constructor
+   * @throws IOException if the file cannot be read or fails verification
+   */
+  public TezSpillRecord(Path indexFileName, Configuration job, String expectedIndexOwner)
+    throws IOException {
+    this(indexFileName, job, new PureJavaCrc32(), expectedIndexOwner);
+  }
+
+  /**
+   * Loads a spill index from the raw local file system.
+   *
+   * <p>Only whole records are loaded: the file length is rounded down to a
+   * multiple of the record length. When {@code crc} is non-null, the long
+   * that follows the records in the file must equal the checksum computed
+   * over the record bytes.
+   *
+   * @param indexFileName path of the index file to read
+   * @param job configuration used to obtain the local file system
+   * @param crc checksum used to verify the records, or {@code null} to skip
+   *        verification
+   * @param expectedIndexOwner not consulted by this implementation
+   * @throws ChecksumException if the stored checksum does not match
+   * @throws IOException if the file cannot be read, or if its record area is
+   *         too large to hold in a single buffer
+   */
+  public TezSpillRecord(Path indexFileName, Configuration job, Checksum crc,
+                     String expectedIndexOwner)
+      throws IOException {
+
+    final FileSystem rfs = FileSystem.getLocal(job).getRaw();
+    final FSDataInputStream in = rfs.open(indexFileName);
+    try {
+      final long length = rfs.getFileStatus(indexFileName).getLen();
+      // Divide in long arithmetic. Casting the length to int before the
+      // division would silently wrap for files of 2 GiB or more and yield a
+      // bogus (possibly negative) partition count.
+      final long partitions =
+          length / Constants.MAP_OUTPUT_INDEX_RECORD_LENGTH;
+      final long recordBytes =
+          partitions * Constants.MAP_OUTPUT_INDEX_RECORD_LENGTH;
+      if (recordBytes > Integer.MAX_VALUE) {
+        throw new IOException("Spill index too large to load: " +
+                              indexFileName + " (" + length + " bytes)");
+      }
+      final int size = (int) recordBytes;
+
+      buf = ByteBuffer.allocate(size);
+      if (crc != null) {
+        crc.reset();
+        CheckedInputStream chk = new CheckedInputStream(in, crc);
+        IOUtils.readFully(chk, buf.array(), 0, size);
+        // The stored checksum is read from the raw stream so that it does
+        // not feed into the checksum being verified.
+        if (chk.getChecksum().getValue() != in.readLong()) {
+          throw new ChecksumException("Checksum error reading spill index: " +
+                                indexFileName, -1);
+        }
+      } else {
+        IOUtils.readFully(in, buf.array(), 0, size);
+      }
+      entries = buf.asLongBuffer();
+    } finally {
+      in.close();
+    }
+  }
+
+  /**
+   * Return number of IndexRecord entries in this spill.
+   */
+  public int size() {
+    // Record length is in bytes; dividing by 8 gives longs per record.
+    return entries.capacity() / (Constants.MAP_OUTPUT_INDEX_RECORD_LENGTH / 8);
+  }
+
+  /**
+   * Get spill offsets for given partition.
+   *
+   * @param partition zero-based partition number
+   * @return a new record built from the three longs stored for that partition
+   */
+  public TezIndexRecord getIndex(int partition) {
+    final int pos = partition * Constants.MAP_OUTPUT_INDEX_RECORD_LENGTH / 8;
+    return new TezIndexRecord(entries.get(pos), entries.get(pos + 1),
+                           entries.get(pos + 2));
+  }
+
+  /**
+   * Set spill offsets for given partition.
+   *
+   * @param rec record whose start offset, raw length and partition length
+   *        are stored, in that order
+   * @param partition zero-based partition number
+   */
+  public void putIndex(TezIndexRecord rec, int partition) {
+    final int pos = partition * Constants.MAP_OUTPUT_INDEX_RECORD_LENGTH / 8;
+    entries.put(pos, rec.getStartOffset());
+    entries.put(pos + 1, rec.getRawLength());
+    entries.put(pos + 2, rec.getPartLength());
+  }
+
+  /**
+   * Write this spill record to the location provided, followed by a
+   * {@link PureJavaCrc32} checksum of its contents.
+   *
+   * @param loc destination path on the local file system
+   * @param job configuration used to obtain the local file system
+   * @throws IOException if the file cannot be created or written
+   */
+  public void writeToFile(Path loc, Configuration job)
+      throws IOException {
+    writeToFile(loc, job, new PureJavaCrc32());
+  }
+
+  /**
+   * Write this spill record to the location provided.
+   *
+   * <p>The whole backing array is written. When {@code crc} is non-null the
+   * checksum of those bytes is appended as a single long.
+   *
+   * @param loc destination path on the raw local file system
+   * @param job configuration used to obtain the local file system
+   * @param crc checksum to compute and append, or {@code null} to write the
+   *        records only
+   * @throws IOException if the file cannot be created or written
+   */
+  public void writeToFile(Path loc, Configuration job, Checksum crc)
+      throws IOException {
+    final FileSystem rfs = FileSystem.getLocal(job).getRaw();
+    CheckedOutputStream chk = null;
+    final FSDataOutputStream out = rfs.create(loc);
+    try {
+      if (crc != null) {
+        crc.reset();
+        chk = new CheckedOutputStream(out, crc);
+        chk.write(buf.array());
+        // Written to the raw stream so the checksum does not cover itself.
+        out.writeLong(chk.getChecksum().getValue());
+      } else {
+        out.write(buf.array());
+      }
+    } finally {
+      // Closing the checked wrapper also closes the stream it wraps.
+      if (chk != null) {
+        chk.close();
+      } else {
+        out.close();
+      }
+    }
+  }
+
+}



Mime
View raw message