tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From haris...@apache.org
Subject tez git commit: TEZ-3673. Allocate smaller buffers in UnorderedPartitionedKVWriter. (harishjp)
Date Fri, 05 May 2017 10:13:44 GMT
Repository: tez
Updated Branches:
  refs/heads/master 68fe02338 -> 4ed4a5693


TEZ-3673. Allocate smaller buffers in UnorderedPartitionedKVWriter. (harishjp)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/4ed4a569
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/4ed4a569
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/4ed4a569

Branch: refs/heads/master
Commit: 4ed4a56939d8aa330485e4c400af0adfc309242d
Parents: 68fe023
Author: Harish JP <harishjp@gmail.com>
Authored: Fri May 5 15:42:42 2017 +0530
Committer: Harish JP <harishjp@gmail.com>
Committed: Fri May 5 15:42:42 2017 +0530

----------------------------------------------------------------------
 .../library/api/TezRuntimeConfiguration.java    |  11 ++
 .../writers/UnorderedPartitionedKVWriter.java   | 191 ++++++++++++++-----
 .../TestUnorderedPartitionedKVWriter.java       |  94 +++++++--
 3 files changed, 223 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/4ed4a569/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
index 4d24bfb..2eec276 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
@@ -173,6 +173,16 @@ public class TezRuntimeConfiguration {
   public static final int TEZ_RUNTIME_PIPELINED_SORTER_SORT_THREADS_DEFAULT = 2;
 
   /**
+   * Integer value. Percentage of buffer to be filled before we spill to disk. Default value is 0,
+   * which will spill for every buffer.
+   */
+  @ConfigurationProperty(type="int")
+  public static final String TEZ_RUNTIME_UNORDERED_PARTITIONED_KVWRITER_BUFFER_MERGE_PERCENT =
+      TEZ_RUNTIME_PREFIX + "unordered-partitioned-kvwriter.buffer-merge-percent";
+  public static final int TEZ_RUNTIME_UNORDERED_PARTITIONED_KVWRITER_BUFFER_MERGE_PERCENT_DEFAULT =
+      0;
+
+  /**
    * Report partition statistics (e.g better scheduling in ShuffleVertexManager). TEZ-2496
    * This can be enabled/disabled at vertex level.
    * {@link org.apache.tez.runtime.library.api.TezRuntimeConfiguration.ReportPartitionStats}
@@ -590,6 +600,7 @@ public class TezRuntimeConfiguration {
     tezRuntimeKeys.add(TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT);
     tezRuntimeKeys.add(TEZ_RUNTIME_SORTER_CLASS);
     tezRuntimeKeys.add(TEZ_RUNTIME_CLEANUP_FILES_ON_INTERRUPT);
+    tezRuntimeKeys.add(TEZ_RUNTIME_UNORDERED_PARTITIONED_KVWRITER_BUFFER_MERGE_PERCENT);
 
     defaultConf.addResource("core-default.xml");
     defaultConf.addResource("core-site.xml");

http://git-wip-us.apache.org/repos/asf/tez/blob/4ed4a569/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
index ea49118..b01d00f 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
@@ -135,7 +135,11 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
   @VisibleForTesting
   int sizePerBuffer;
   @VisibleForTesting
+  int lastBufferSize;
+  @VisibleForTesting
   int numInitializedBuffers;
+  @VisibleForTesting
+  int spillLimit;
 
   private Throwable spillException;
   private AtomicBoolean isShutdown = new AtomicBoolean(false);
@@ -161,9 +165,12 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
 
   private final long indexFileSizeEstimate;
 
+  private List<WrappedBuffer> filledBuffers = new ArrayList<>();
+
   public UnorderedPartitionedKVWriter(OutputContext outputContext, Configuration conf,
       int numOutputs, long availableMemoryBytes) throws IOException {
     super(outputContext, conf, numOutputs);
+
     Preconditions.checkArgument(availableMemoryBytes >= 0, "availableMemory should be >= 0 bytes");
 
     this.deflater = TezCommonUtils.newBestCompressionDeflater();
@@ -187,7 +194,8 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
 
     // Allow unit tests to control the buffer sizes.
     int maxSingleBufferSizeBytes = conf.getInt(
-        TezRuntimeConfiguration.TEZ_RUNTIME_UNORDERED_OUTPUT_MAX_PER_BUFFER_SIZE_BYTES, Integer.MAX_VALUE);
+        TezRuntimeConfiguration.TEZ_RUNTIME_UNORDERED_OUTPUT_MAX_PER_BUFFER_SIZE_BYTES,
+        Integer.MAX_VALUE);
     computeNumBuffersAndSize(maxSingleBufferSizeBytes);
 
     availableBuffers = new LinkedBlockingQueue<WrappedBuffer>();
@@ -251,11 +259,48 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
         + ", reportPartitionStats=" + reportPartitionStats);
   }
 
+  private static final int ALLOC_OVERHEAD = 64;
   private void computeNumBuffersAndSize(int bufferLimit) {
-    numBuffers = Math.max(2, (int) (availableMemory / bufferLimit)
-        + ((availableMemory % bufferLimit) == 0 ? 0 : 1));
-    sizePerBuffer = (int) (availableMemory / numBuffers);
+    numBuffers = (int)(availableMemory / bufferLimit);
+
+    if (numBuffers >= 2) {
+      sizePerBuffer = bufferLimit - ALLOC_OVERHEAD;
+      lastBufferSize = (int)(availableMemory % bufferLimit);
+      // Use leftover memory last buffer only if the leftover memory > 50% of bufferLimit
+      if (lastBufferSize > bufferLimit / 2) {
+        numBuffers += 1;
+      } else {
+        if (lastBufferSize > 0) {
+          LOG.warn("Underallocating memory. Unused memory size: {}.",  lastBufferSize);
+        }
+        lastBufferSize = sizePerBuffer;
+      }
+    } else {
+      // We should have minimum of 2 buffers.
+      numBuffers = 2;
+      if (availableMemory / numBuffers > Integer.MAX_VALUE) {
+        sizePerBuffer = Integer.MAX_VALUE;
+      } else {
+        sizePerBuffer = (int)(availableMemory / numBuffers);
+      }
+      // 2 equal sized buffers.
+      lastBufferSize = sizePerBuffer;
+    }
+    // Ensure allocation size is multiple of INT_SIZE, truncate down.
     sizePerBuffer = sizePerBuffer - (sizePerBuffer % INT_SIZE);
+    lastBufferSize = lastBufferSize - (lastBufferSize % INT_SIZE);
+
+    int mergePercent = conf.getInt(
+        TezRuntimeConfiguration.TEZ_RUNTIME_UNORDERED_PARTITIONED_KVWRITER_BUFFER_MERGE_PERCENT,
+        TezRuntimeConfiguration.TEZ_RUNTIME_UNORDERED_PARTITIONED_KVWRITER_BUFFER_MERGE_PERCENT_DEFAULT);
+    spillLimit = numBuffers * mergePercent / 100;
+    // Keep within limits.
+    if (spillLimit < 1) {
+      spillLimit = 1;
+    }
+    if (spillLimit > numBuffers) {
+      spillLimit = numBuffers;
+    }
   }
 
   @Override
@@ -375,18 +420,22 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
       currentBuffer.reset();
     } else {
       // Update overall stats
-      LOG.info(destNameTrimmed + ": " + "Moving to next buffer and triggering spill");
+      LOG.info(destNameTrimmed + ": " + "Moving to next buffer");
       updateGlobalStats(currentBuffer);
 
-      pendingSpillCount.incrementAndGet();
-
-      int spillNumber = numSpills.getAndIncrement();
-      ListenableFuture<SpillResult> future = spillExecutor.submit(
-          new SpillCallable(currentBuffer, codec, spilledRecordsCounter, spillNumber));
-      Futures.addCallback(future, new SpillCallback(spillNumber));
-      // Update once per buffer (instead of every record)
-      updateTezCountersAndNotify();
-
+      filledBuffers.add(currentBuffer);
+      if (filledBuffers.size() >= spillLimit) {
+        LOG.info(destNameTrimmed + ": triggering spill");
+        pendingSpillCount.incrementAndGet();
+        int spillNumber = numSpills.getAndIncrement();
+        ListenableFuture<SpillResult> future = spillExecutor.submit(new SpillCallable(
+            new ArrayList<WrappedBuffer>(filledBuffers), codec, spilledRecordsCounter,
+            spillNumber));
+        filledBuffers.clear();
+        Futures.addCallback(future, new SpillCallback(spillNumber));
+        // Update once per buffer (instead of every record)
+        updateTezCountersAndNotify();
+      }
       WrappedBuffer wb = getNextAvailableBuffer();
       currentBuffer = wb;
     }
@@ -408,7 +457,8 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
   private WrappedBuffer getNextAvailableBuffer() throws IOException {
     if (availableBuffers.peek() == null) {
       if (numInitializedBuffers < numBuffers) {
-        buffers[numInitializedBuffers] = new WrappedBuffer(numPartitions, sizePerBuffer);
+        buffers[numInitializedBuffers] = new WrappedBuffer(numPartitions,
+            numInitializedBuffers == numBuffers - 1 ? lastBufferSize : sizePerBuffer);
         numInitializedBuffers++;
         return buffers[numInitializedBuffers - 1];
       } else {
@@ -428,16 +478,16 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
   // All spills using compression for now.
   private class SpillCallable extends CallableWithNdc<SpillResult> {
 
-    private final WrappedBuffer wrappedBuffer;
+    private final List<WrappedBuffer> filledBuffers;
     private final CompressionCodec codec;
     private final TezCounter numRecordsCounter;
     private int spillIndex;
     private SpillPathDetails spillPathDetails;
     private int spillNumber;
 
-    public SpillCallable(WrappedBuffer wrappedBuffer, CompressionCodec codec,
+    public SpillCallable(List<WrappedBuffer> filledBuffers, CompressionCodec codec,
         TezCounter numRecordsCounter, SpillPathDetails spillPathDetails) {
-      this.wrappedBuffer = wrappedBuffer;
+      this.filledBuffers = filledBuffers;
       this.codec = codec;
       this.numRecordsCounter = numRecordsCounter;
       this.spillIndex = spillPathDetails.spillIndex;
@@ -446,9 +496,9 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
       this.spillPathDetails = spillPathDetails;
     }
 
-    public SpillCallable(WrappedBuffer wrappedBuffer, CompressionCodec codec,
+    public SpillCallable(List<WrappedBuffer> filledBuffers, CompressionCodec codec,
         TezCounter numRecordsCounter, int spillNumber) throws IOException {
-      this.wrappedBuffer = wrappedBuffer;
+      this.filledBuffers = filledBuffers;
       this.codec = codec;
       this.numRecordsCounter = numRecordsCounter;
       this.spillNumber = spillNumber;
@@ -474,33 +524,43 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
         IFile.Writer writer = null;
         try {
           long segmentStart = out.getPos();
-          if (wrappedBuffer.partitionPositions[i] == WrappedBuffer.PARTITION_ABSENT_POSITION) {
-            // Skip empty partition.
-            continue;
+          long numRecords = 0;
+          for (WrappedBuffer buffer : filledBuffers) {
+            outputContext.notifyProgress();
+            if (buffer.partitionPositions[i] == WrappedBuffer.PARTITION_ABSENT_POSITION) {
+              // Skip empty partition.
+              continue;
+            }
+            if (writer == null) {
+              writer = new Writer(conf, out, keyClass, valClass, codec, null, null);
+            }
+            numRecords += writePartition(buffer.partitionPositions[i], buffer, writer, key, val);
           }
-          writer = new Writer(conf, out, keyClass, valClass, codec, null, null);
-          long numRecords = writePartition(wrappedBuffer.partitionPositions[i], wrappedBuffer,
-              writer, key, val);
-          if (numRecordsCounter != null) {
-            // TezCounter is not threadsafe; Since numRecordsCounter would be updated from
-            // multiple threads, it is good to synchronize it when incrementing it for correctness.
-            synchronized (numRecordsCounter) {
-              numRecordsCounter.increment(numRecords);
+          if (writer != null) {
+            if (numRecordsCounter != null) {
+              // TezCounter is not threadsafe; Since numRecordsCounter would be updated from
+              // multiple threads, it is good to synchronize it when incrementing it for correctness.
+              synchronized (numRecordsCounter) {
+                numRecordsCounter.increment(numRecords);
+              }
             }
+            writer.close();
+            compressedLength += writer.getCompressedLength();
+            TezIndexRecord indexRecord = new TezIndexRecord(segmentStart, writer.getRawLength(),
+                writer.getCompressedLength());
+            spillRecord.putIndex(indexRecord, i);
+            writer = null;
           }
-          writer.close();
-          compressedLength += writer.getCompressedLength();
-          TezIndexRecord indexRecord = new TezIndexRecord(segmentStart, writer.getRawLength(),
-              writer.getCompressedLength());
-          spillRecord.putIndex(indexRecord, i);
-          writer = null;
         } finally {
           if (writer != null) {
             writer.close();
           }
         }
       }
-      spillResult = new SpillResult(compressedLength, this.wrappedBuffer);
+      key.close();
+      val.close();
+
+      spillResult = new SpillResult(compressedLength, this.filledBuffers);
 
       handleSpillIndex(spillPathDetails, spillRecord);
       LOG.info(destNameTrimmed + ": " + "Finished spill " + spillIndex);
@@ -717,10 +777,11 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
       return false;
     } else {
       updateGlobalStats(currentBuffer);
+      filledBuffers.add(currentBuffer);
 
       //setup output file and index file
       SpillPathDetails spillPathDetails = getSpillPathDetails(true, -1);
-      SpillCallable spillCallable = new SpillCallable(currentBuffer, codec, null, spillPathDetails);
+      SpillCallable spillCallable = new SpillCallable(filledBuffers, codec, null, spillPathDetails);
       try {
         SpillResult spillResult = spillCallable.call();
 
@@ -901,7 +962,9 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
               sizePerPartition[i] += writer.getRawLength();
             }
             writer.close();
-            additionalSpillBytesWritternCounter.increment(writer.getCompressedLength());
+            synchronized (additionalSpillBytesWritternCounter) {
+              additionalSpillBytesWritternCounter.increment(writer.getCompressedLength());
+            }
             TezIndexRecord indexRecord = new TezIndexRecord(recordStart, writer.getRawLength(),
                 writer.getCompressedLength());
             spillRecord.putIndex(indexRecord, i);
@@ -1072,25 +1135,47 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
 
     @Override
     public void onSuccess(SpillResult result) {
-      spilledSize += result.spillSize;
+      synchronized (UnorderedPartitionedKVWriter.this) {
+        spilledSize += result.spillSize;
+      }
 
-      sendPipelinedEventForSpill(result.wrappedBuffer.recordsPerPartition,
-          result.wrappedBuffer.sizePerPartition, spillNumber, false);
+      int recordsPerPartition[] = null;
+      long sizePerPartition[] = null;
+      if (result.filledBuffers.size() == 1) {
+        recordsPerPartition = result.filledBuffers.get(0).recordsPerPartition;
+        sizePerPartition = result.filledBuffers.get(0).sizePerPartition;
+      } else {
+        recordsPerPartition = new int[numPartitions];
+        sizePerPartition = new long[numPartitions];
+        for (WrappedBuffer buffer : result.filledBuffers) {
+          for (int i = 0; i < numPartitions; ++i) {
+            recordsPerPartition[i] += buffer.recordsPerPartition[i];
+            sizePerPartition[i] += buffer.sizePerPartition[i];
+          }
+        }
+      }
 
-      try {
-        result.wrappedBuffer.reset();
-        availableBuffers.add(result.wrappedBuffer);
+      sendPipelinedEventForSpill(recordsPerPartition, sizePerPartition, spillNumber, false);
 
+      try {
+        for (WrappedBuffer buffer : result.filledBuffers) {
+          buffer.reset();
+          availableBuffers.add(buffer);
+        }
       } catch (Throwable e) {
-        LOG.error(destNameTrimmed + ": " + "Failure while attempting to reset buffer after spill", e);
+        LOG.error(destNameTrimmed + ": Failure while attempting to reset buffer after spill", e);
         outputContext.reportFailure(TaskFailureType.NON_FATAL, e, "Failure while attempting to reset buffer after spill");
       }
 
       if (!pipelinedShuffle) {
-        additionalSpillBytesWritternCounter.increment(result.spillSize);
+        synchronized(additionalSpillBytesWritternCounter) {
+          additionalSpillBytesWritternCounter.increment(result.spillSize);
+        }
       } else {
-        fileOutputBytesCounter.increment(indexFileSizeEstimate);
-        fileOutputBytesCounter.increment(result.spillSize);
+        synchronized(fileOutputBytesCounter) {
+          fileOutputBytesCounter.increment(indexFileSizeEstimate);
+          fileOutputBytesCounter.increment(result.spillSize);
+        }
       }
 
       spillLock.lock();
@@ -1121,11 +1206,11 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
 
   private static class SpillResult {
     final long spillSize;
-    final WrappedBuffer wrappedBuffer;
+    final List<WrappedBuffer> filledBuffers;
 
-    SpillResult(long size, WrappedBuffer wrappedBuffer) {
+    SpillResult(long size, List<WrappedBuffer> filledBuffers) {
       this.spillSize = size;
-      this.wrappedBuffer = wrappedBuffer;
+      this.filledBuffers = filledBuffers;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/4ed4a569/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
index 41b2b97..d970b95 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
@@ -93,7 +93,6 @@ import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
 
 import com.google.common.collect.LinkedListMultimap;
 import com.google.common.collect.Multimap;
@@ -125,7 +124,7 @@ public class TestUnorderedPartitionedKVWriter {
   }
 
   @SuppressWarnings("deprecation")
-  @Parameterized.Parameters(name = "test[{0}, {1}, {2}]")
+  @Parameterized.Parameters(name = "test[{0}, {1}]")
   public static Collection<Object[]> data() {
     Object[][] data = new Object[][] {
         { false, ReportPartitionStats.DISABLED },
@@ -162,7 +161,8 @@ public class TestUnorderedPartitionedKVWriter {
     String uniqueId = UUID.randomUUID().toString();
     OutputContext outputContext = createMockOutputContext(counters, appId, uniqueId);
 
-    int maxSingleBufferSizeBytes = 2047;
+    final int maxSingleBufferSizeBytes = 2047;
+    final long sizePerBuffer = maxSingleBufferSizeBytes - 64 - maxSingleBufferSizeBytes % 4;
     Configuration conf = createConfiguration(outputContext, IntWritable.class, LongWritable.class,
         false, maxSingleBufferSizeBytes);
 
@@ -170,57 +170,106 @@ public class TestUnorderedPartitionedKVWriter {
 
     UnorderedPartitionedKVWriter kvWriter = null;
 
+    // Not enough memory so divide into 2 buffers.
     kvWriter = new UnorderedPartitionedKVWriterForTest(outputContext, conf, numOutputs, 2048);
     assertEquals(2, kvWriter.numBuffers);
     assertEquals(1024, kvWriter.sizePerBuffer);
+    assertEquals(1024, kvWriter.lastBufferSize);
     assertEquals(1, kvWriter.numInitializedBuffers);
+    assertEquals(1, kvWriter.spillLimit);
 
+    // allocate exact
     kvWriter = new UnorderedPartitionedKVWriterForTest(outputContext, conf, numOutputs,
         maxSingleBufferSizeBytes * 3);
     assertEquals(3, kvWriter.numBuffers);
-    assertEquals(maxSingleBufferSizeBytes - maxSingleBufferSizeBytes % 4, kvWriter.sizePerBuffer);
+    assertEquals(sizePerBuffer, kvWriter.sizePerBuffer);
+    assertEquals(sizePerBuffer, kvWriter.lastBufferSize);
     assertEquals(1, kvWriter.numInitializedBuffers);
+    assertEquals(1, kvWriter.spillLimit);
 
+    // under allocate
     kvWriter = new UnorderedPartitionedKVWriterForTest(outputContext, conf, numOutputs,
-        maxSingleBufferSizeBytes * 2 + 1);
+        maxSingleBufferSizeBytes * 2 + maxSingleBufferSizeBytes / 2);
+    assertEquals(2, kvWriter.numBuffers);
+    assertEquals(sizePerBuffer, kvWriter.sizePerBuffer);
+    assertEquals(sizePerBuffer, kvWriter.lastBufferSize);
+    assertEquals(1, kvWriter.numInitializedBuffers);
+    assertEquals(1, kvWriter.spillLimit);
+
+    // over allocate
+    kvWriter = new UnorderedPartitionedKVWriterForTest(outputContext, conf, numOutputs,
+        maxSingleBufferSizeBytes * 2 + maxSingleBufferSizeBytes / 2 + 1);
     assertEquals(3, kvWriter.numBuffers);
-    assertEquals(1364, kvWriter.sizePerBuffer);
+    assertEquals(sizePerBuffer, kvWriter.sizePerBuffer);
+    assertEquals(maxSingleBufferSizeBytes / 2 + 1, kvWriter.lastBufferSize);
+    assertEquals(1, kvWriter.numInitializedBuffers);
+    assertEquals(1, kvWriter.spillLimit);
+
+    // spill limit 1.
+    kvWriter = new UnorderedPartitionedKVWriterForTest(outputContext, conf, numOutputs,
+        4 * maxSingleBufferSizeBytes + 1);
+    assertEquals(4, kvWriter.numBuffers);
+    assertEquals(sizePerBuffer, kvWriter.sizePerBuffer);
+    assertEquals(sizePerBuffer, kvWriter.lastBufferSize);
+    assertEquals(1, kvWriter.numInitializedBuffers);
+    assertEquals(1, kvWriter.spillLimit);
+
+    // spill limit 2.
+    conf.setInt(
+        TezRuntimeConfiguration.TEZ_RUNTIME_UNORDERED_PARTITIONED_KVWRITER_BUFFER_MERGE_PERCENT,
+        50);
+    kvWriter = new UnorderedPartitionedKVWriterForTest(outputContext, conf, numOutputs,
+        4 * maxSingleBufferSizeBytes + 1);
+    assertEquals(4, kvWriter.numBuffers);
+    assertEquals(sizePerBuffer, kvWriter.sizePerBuffer);
+    assertEquals(sizePerBuffer, kvWriter.lastBufferSize);
     assertEquals(1, kvWriter.numInitializedBuffers);
+    assertEquals(2, kvWriter.spillLimit);
 
-    kvWriter = new UnorderedPartitionedKVWriterForTest(outputContext, conf, numOutputs, 10240);
-    assertEquals(6, kvWriter.numBuffers);
-    assertEquals(1704, kvWriter.sizePerBuffer);
+    // Available memory is less than buffer size.
+    conf.unset(TezRuntimeConfiguration.TEZ_RUNTIME_UNORDERED_OUTPUT_MAX_PER_BUFFER_SIZE_BYTES);
+    kvWriter = new UnorderedPartitionedKVWriterForTest(outputContext, conf, numOutputs,
+        2048);
+    assertEquals(2, kvWriter.numBuffers);
+    assertEquals(1024, kvWriter.sizePerBuffer);
+    assertEquals(1024, kvWriter.lastBufferSize);
     assertEquals(1, kvWriter.numInitializedBuffers);
+    assertEquals(1, kvWriter.spillLimit);
   }
 
   @Test(timeout = 10000)
   public void testNoSpill() throws IOException, InterruptedException {
-    baseTest(10, 10, null, shouldCompress);
+    baseTest(10, 10, null, shouldCompress, -1, 0);
   }
 
   @Test(timeout = 10000)
   public void testSingleSpill() throws IOException, InterruptedException {
-    baseTest(50, 10, null, shouldCompress);
+    baseTest(50, 10, null, shouldCompress, -1, 0);
   }
 
   @Test(timeout = 10000)
   public void testMultipleSpills() throws IOException, InterruptedException {
-    baseTest(200, 10, null, shouldCompress);
+    baseTest(200, 10, null, shouldCompress, -1, 0);
+  }
+
+  @Test(timeout = 10000)
+  public void testMergeBuffersAndSpill() throws IOException, InterruptedException {
+    baseTest(200, 10, null, shouldCompress, 2048, 10);
   }
 
   @Test(timeout = 10000)
   public void testNoRecords() throws IOException, InterruptedException {
-    baseTest(0, 10, null, shouldCompress);
+    baseTest(0, 10, null, shouldCompress, -1, 0);
   }
 
   @Test(timeout = 10000)
   public void testSkippedPartitions() throws IOException, InterruptedException {
-    baseTest(200, 10, Sets.newHashSet(2, 5), shouldCompress);
+    baseTest(200, 10, Sets.newHashSet(2, 5), shouldCompress, -1, 0);
   }
 
   @Test(timeout = 10000)
   public void testNoSpill_SinglePartition() throws IOException, InterruptedException {
-    baseTest(10, 1, null, shouldCompress);
+    baseTest(10, 1, null, shouldCompress, -1, 0);
   }
 
 
@@ -703,7 +752,8 @@ public class TestUnorderedPartitionedKVWriter {
 
 
   private void baseTest(int numRecords, int numPartitions, Set<Integer> skippedPartitions,
-      boolean shouldCompress) throws IOException, InterruptedException {
+      boolean shouldCompress, int maxSingleBufferSizeBytes, int bufferMergePercent)
+          throws IOException, InterruptedException {
     PartitionerForTest partitioner = new PartitionerForTest();
     ApplicationId appId = ApplicationId.newInstance(10000000, 1);
     TezCounters counters = new TezCounters();
@@ -711,7 +761,11 @@ public class TestUnorderedPartitionedKVWriter {
     OutputContext outputContext = createMockOutputContext(counters, appId, uniqueId);
 
     Configuration conf = createConfiguration(outputContext, IntWritable.class, LongWritable.class,
-        shouldCompress, -1);
+        shouldCompress, maxSingleBufferSizeBytes);
+    conf.setInt(
+        TezRuntimeConfiguration.TEZ_RUNTIME_UNORDERED_PARTITIONED_KVWRITER_BUFFER_MERGE_PERCENT,
+        bufferMergePercent);
+
     CompressionCodec codec = null;
     if (shouldCompress) {
       codec = new DefaultCodec();
@@ -752,7 +806,7 @@ public class TestUnorderedPartitionedKVWriter {
     List<Event> events = kvWriter.close();
 
     int recordsPerBuffer = sizePerBuffer / sizePerRecordWithOverhead;
-    int numExpectedSpills = numRecordsWritten / recordsPerBuffer;
+    int numExpectedSpills = numRecordsWritten / recordsPerBuffer / kvWriter.spillLimit;
 
     verify(outputContext, never()).reportFailure(any(TaskFailureType.class), any(Throwable.class), any(String.class));
 
@@ -801,7 +855,7 @@ public class TestUnorderedPartitionedKVWriter {
         assertTrue(additionalSpillBytesRead > (recordsPerBuffer * numExpectedSpills * sizePerRecord));
       }
     }
-    assertTrue(additionalSpillBytesWritten == additionalSpillBytesRead);
+    assertEquals(additionalSpillBytesWritten, additionalSpillBytesRead);
     assertEquals(numExpectedSpills, numAdditionalSpillsCounter.getValue());
 
     BitSet emptyPartitionBits = null;
@@ -889,7 +943,7 @@ public class TestUnorderedPartitionedKVWriter {
   }
 
   private static String createRandomString(int size) {
-    StringBuilder sb = new StringBuilder();
+    StringBuilder sb = new StringBuilder(size);
     Random random = new Random();
     for (int i = 0; i < size; i++) {
       int r = Math.abs(random.nextInt()) % 26;


Mime
View raw message