tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rbalamo...@apache.org
Subject tez git commit: TEZ-3701. UnorderedPartitionedKVWriter - issues with parallel Deflater usage, synchronousqueue in threadpool (rbalamohan)
Date Fri, 02 Jun 2017 01:59:15 GMT
Repository: tez
Updated Branches:
  refs/heads/master a2ba95043 -> d5e65e207


TEZ-3701. UnorderedPartitionedKVWriter - issues with parallel Deflater usage, synchronousqueue
in threadpool (rbalamohan)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/d5e65e20
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/d5e65e20
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/d5e65e20

Branch: refs/heads/master
Commit: d5e65e207492ee1663a737e2411dbb75e07b891f
Parents: a2ba950
Author: Rajesh Balamohan <rbalamohan@apache.org>
Authored: Fri Jun 2 07:28:58 2017 +0530
Committer: Rajesh Balamohan <rbalamohan@apache.org>
Committed: Fri Jun 2 07:28:58 2017 +0530

----------------------------------------------------------------------
 .../writers/UnorderedPartitionedKVWriter.java   | 106 +++++++++++++++----
 .../TestUnorderedPartitionedKVWriter.java       |  23 +++-
 2 files changed, 102 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/d5e65e20/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
index 6c65ca2..bcc9cf9 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
@@ -29,6 +29,7 @@ import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -124,7 +125,23 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
   // uncompressed size for each partition
   private final long[] sizePerPartition;
   private volatile long spilledSize = 0;
-  private final Deflater deflater;
+
+  static final ThreadLocal<Deflater> deflater = new ThreadLocal<Deflater>() {
+
+    @Override
+    public Deflater initialValue() {
+      return TezCommonUtils.newBestCompressionDeflater();
+    }
+
+    @Override
+    public Deflater get() {
+      Deflater deflater = super.get();
+      deflater.reset();
+      return deflater;
+    }
+  };
+
+  private final Semaphore availableSlots;
 
   /**
    * Represents final number of records written (spills are not counted)
@@ -174,7 +191,6 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
 
     Preconditions.checkArgument(availableMemoryBytes >= 0, "availableMemory should be >= 0 bytes");
 
-    this.deflater = TezCommonUtils.newBestCompressionDeflater();
     this.destNameTrimmed = TezUtilsInternal.cleanVertexName(outputContext.getDestinationVertexName());
     //Not checking for TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT as it might not add much value in
     // this case.  Add it later if needed.
@@ -215,16 +231,22 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
     valSerializer.open(dos);
     rfs = ((LocalFileSystem) FileSystem.getLocal(this.conf)).getRaw();
 
-    ExecutorService executor = new ThreadPoolExecutor(1,  Math.max(2, numBuffers/2),
+    int maxThreads = Math.max(2, numBuffers/2);
+    //TODO: Make use of TezSharedExecutor later
+    ExecutorService executor = new ThreadPoolExecutor(1, maxThreads,
         60L, TimeUnit.SECONDS,
         new SynchronousQueue<Runnable>(),
         new ThreadFactoryBuilder()
             .setDaemon(true)
             .setNameFormat(
                 "UnorderedOutSpiller {" + TezUtilsInternal.cleanVertexName(
-                    outputContext.getDestinationVertexName()) + "}")
+                    outputContext.getDestinationVertexName()) + "} #%d")
             .build()
     );
+    // to restrict submission of more tasks than threads (e.g numBuffers > numThreads)
+    // This is maxThreads - 1, to avoid race between callback thread releasing semaphore and the
+    // thread calling tryAcquire.
+    availableSlots = new Semaphore(maxThreads - 1, true);
     spillExecutor = MoreExecutors.listeningDecorator(executor);
     numRecordsPerPartition = new int[numPartitions];
     reportPartitionStats = ReportPartitionStats.fromString(
@@ -428,23 +450,56 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
       updateGlobalStats(currentBuffer);
 
       filledBuffers.add(currentBuffer);
-      if (filledBuffers.size() >= spillLimit) {
-        if (LOG.isDebugEnabled() || (filledBufferCount % 10) == 0) {
-          LOG.info(destNameTrimmed + ": triggering spill");
+      mayBeSpill(false);
+
+      currentBuffer = getNextAvailableBuffer();
+
+      // in case spill threads are free, check if spilling is needed
+      mayBeSpill(false);
+    }
+  }
+
+  private void mayBeSpill(boolean shouldBlock) throws IOException {
+    if (filledBuffers.size() >= spillLimit) {
+      // Do not block; possible that there are more buffers
+      scheduleSpill(shouldBlock);
+    }
+  }
+
+  private boolean scheduleSpill(boolean block) throws IOException {
+    if (filledBuffers.isEmpty()) {
+      return false;
+    }
+
+    try {
+      if (block) {
+        availableSlots.acquire();
+      } else {
+        if (!availableSlots.tryAcquire()) {
+          // Data in filledBuffers would be spilled in subsequent iteration.
+          return false;
         }
-        pendingSpillCount.incrementAndGet();
-        int spillNumber = numSpills.getAndIncrement();
-        ListenableFuture<SpillResult> future = spillExecutor.submit(new SpillCallable(
-            new ArrayList<WrappedBuffer>(filledBuffers), codec, spilledRecordsCounter,
-            spillNumber));
-        filledBuffers.clear();
-        Futures.addCallback(future, new SpillCallback(spillNumber));
-        // Update once per buffer (instead of every record)
-        updateTezCountersAndNotify();
       }
-      WrappedBuffer wb = getNextAvailableBuffer();
-      currentBuffer = wb;
+
+      final int filledBufferCount = filledBuffers.size();
+      if (LOG.isDebugEnabled() || (filledBufferCount % 10) == 0) {
+        LOG.info(destNameTrimmed + ": triggering spill. filledBuffers.size=" + filledBufferCount);
+      }
+      pendingSpillCount.incrementAndGet();
+      int spillNumber = numSpills.getAndIncrement();
+
+      ListenableFuture<SpillResult> future = spillExecutor.submit(new SpillCallable(
+          new ArrayList<WrappedBuffer>(filledBuffers), codec, spilledRecordsCounter,
+          spillNumber));
+      filledBuffers.clear();
+      Futures.addCallback(future, new SpillCallback(spillNumber));
+      // Update once per buffer (instead of every record)
+      updateTezCountersAndNotify();
+      return true;
+    } catch(InterruptedException ie) {
+      Thread.currentThread().interrupt(); // reset interrupt status
     }
+    return false;
   }
 
   private boolean reportPartitionStats() {
@@ -470,6 +525,8 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
       } else {
         // All buffers initialized, and none available right now. Wait
         try {
+          // Ensure that spills are triggered so that buffers can be released.
+          mayBeSpill(true);
           return availableBuffers.take();
         } catch (InterruptedException e) {
           Thread.currentThread().interrupt();
@@ -610,6 +667,8 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
 
   @Override
   public List<Event> close() throws IOException, InterruptedException {
+    // In case there are buffers to be spilled, schedule spilling
+    scheduleSpill(true);
     List<Event> eventList = Lists.newLinkedList();
     isShutdown.set(true);
     spillLock.lock();
@@ -708,7 +767,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
 
   private Event generateVMEvent() throws IOException {
     return ShuffleUtils.generateVMEvent(outputContext, this.sizePerPartition,
-        this.reportDetailedPartitionStats(), deflater);
+        this.reportDetailedPartitionStats(), deflater.get());
   }
 
   private Event generateDMEvent() throws IOException {
@@ -728,7 +787,8 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
     if (emptyPartitions.cardinality() != 0) {
       // Empty partitions exist
       ByteString emptyPartitionsByteString =
-          TezCommonUtils.compressByteArrayToByteString(TezUtilsInternal.toByteArray(emptyPartitions), deflater);
+          TezCommonUtils.compressByteArrayToByteString(TezUtilsInternal.toByteArray
+              (emptyPartitions), deflater.get());
       payloadBuilder.setEmptyPartitions(emptyPartitionsByteString);
     }
 
@@ -772,7 +832,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
         List<Event> eventList = Lists.newLinkedList();
         eventList.add(ShuffleUtils.generateVMEvent(outputContext,
             reportPartitionStats() ? new long[numPartitions] : null,
-                reportDetailedPartitionStats(), deflater));
+                reportDetailedPartitionStats(), deflater.get()));
         //Send final event with all empty partitions and null path component.
         BitSet emptyPartitions = new BitSet(numPartitions);
         emptyPartitions.flip(0, numPartitions);
@@ -1110,7 +1170,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
       String pathComponent = (outputContext.getUniqueIdentifier() + "_" + spillNumber);
       if (isFinalUpdate) {
         eventList.add(ShuffleUtils.generateVMEvent(outputContext,
-            sizePerPartition, reportDetailedPartitionStats(), deflater));
+            sizePerPartition, reportDetailedPartitionStats(), deflater.get()));
       }
       Event compEvent = generateDMEvent(true, spillNumber, isFinalUpdate,
           pathComponent, emptyPartitions);
@@ -1191,6 +1251,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
         }
       } finally {
         spillLock.unlock();
+        availableSlots.release();
       }
     }
 
@@ -1206,6 +1267,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
         spillInProgress.signal();
       } finally {
         spillLock.unlock();
+        availableSlots.release();
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/d5e65e20/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
index 27e7992..1a0bbf0 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
@@ -259,6 +259,12 @@ public class TestUnorderedPartitionedKVWriter {
   }
 
   @Test(timeout = 10000)
+  public void testMultipleSpillsWithSmallBuffer() throws IOException, InterruptedException {
+    // numBuffers is much higher than available threads.
+    baseTest(200, 10, null, shouldCompress, 512, 0, 9600);
+  }
+
+  @Test(timeout = 10000)
   public void testMergeBuffersAndSpill() throws IOException, InterruptedException {
     baseTest(200, 10, null, shouldCompress, 2048, 10);
   }
@@ -702,8 +708,8 @@ public class TestUnorderedPartitionedKVWriter {
     } else {
       assertEquals(0, fileOutputBytes);
     }
-    assertEquals(recordsPerBuffer * numExpectedSpills,
-        spilledRecordsCounter.getValue());
+    // due to multiple threads, buffers could be merged in chunks in scheduleSpill.
+    assertTrue(recordsPerBuffer * numExpectedSpills >= spilledRecordsCounter.getValue());
     long additionalSpillBytesWritten =
         additionalSpillBytesWritternCounter.getValue();
     long additionalSpillBytesRead = additionalSpillBytesReadCounter.getValue();
@@ -776,9 +782,16 @@ public class TestUnorderedPartitionedKVWriter {
     }
   }
 
-
   private void baseTest(int numRecords, int numPartitions, Set<Integer> skippedPartitions,
       boolean shouldCompress, int maxSingleBufferSizeBytes, int bufferMergePercent)
+      throws IOException, InterruptedException {
+    baseTest(numRecords, numPartitions, skippedPartitions, shouldCompress,
+        maxSingleBufferSizeBytes, bufferMergePercent, 2048);
+  }
+
+  private void baseTest(int numRecords, int numPartitions, Set<Integer> skippedPartitions,
+      boolean shouldCompress, int maxSingleBufferSizeBytes, int bufferMergePercent, int
+      availableMemory)
           throws IOException, InterruptedException {
     PartitionerForTest partitioner = new PartitionerForTest();
     ApplicationId appId = ApplicationId.newInstance(10000000, 1);
@@ -802,7 +815,6 @@ public class TestUnorderedPartitionedKVWriter {
     }
 
     int numOutputs = numPartitions;
-    long availableMemory = 2048;
     int numRecordsWritten = 0;
 
     Map<Integer, Multimap<Integer, Long>> expectedValues = new HashMap<Integer, Multimap<Integer, Long>>();
@@ -885,7 +897,8 @@ public class TestUnorderedPartitionedKVWriter {
       }
     }
     assertEquals(additionalSpillBytesWritten, additionalSpillBytesRead);
-    assertEquals(numExpectedSpills, numAdditionalSpillsCounter.getValue());
+    // due to multiple threads, buffers could be merged in chunks in scheduleSpill.
+    assertTrue(numExpectedSpills >= numAdditionalSpillsCounter.getValue());
 
     BitSet emptyPartitionBits = null;
     // Verify the events returned


Mime
View raw message