tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rbalamo...@apache.org
Subject tez git commit: TEZ-3680. Optimizations to UnorderedPartitionedKVWriter (rbalamohan)
Date Thu, 20 Apr 2017 09:19:28 GMT
Repository: tez
Updated Branches:
  refs/heads/master a5179d649 -> cadf31b5e


TEZ-3680. Optimizations to UnorderedPartitionedKVWriter (rbalamohan)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/cadf31b5
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/cadf31b5
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/cadf31b5

Branch: refs/heads/master
Commit: cadf31b5e21fd824c53ccfa74ac97eb8aff508b9
Parents: a5179d6
Author: Rajesh Balamohan <rbalamohan@apache.org>
Authored: Thu Apr 20 14:49:20 2017 +0530
Committer: Rajesh Balamohan <rbalamohan@apache.org>
Committed: Thu Apr 20 14:49:20 2017 +0530

----------------------------------------------------------------------
 .../writers/UnorderedPartitionedKVWriter.java   | 110 ++++++++++++++-----
 1 file changed, 84 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/cadf31b5/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
index 0f38a29..d8cedac 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
@@ -28,8 +28,10 @@ import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Condition;
@@ -45,7 +47,6 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.tez.common.CallableWithNdc;
 import org.apache.tez.common.TezCommonUtils;
@@ -114,6 +115,11 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
   private final ListeningExecutorService spillExecutor;
 
   private final int[] numRecordsPerPartition;
+  private long localOutputRecordBytesCounter;
+  private long localOutputBytesWithOverheadCounter;
+  private long localOutputRecordsCounter;
+  // notify after x records
+  private static final int NOTIFY_THRESHOLD = 1000;
   // uncompressed size for each partition
   private final long[] sizePerPartition;
   private volatile long spilledSize = 0;
@@ -200,15 +206,16 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
     valSerializer.open(dos);
     rfs = ((LocalFileSystem) FileSystem.getLocal(this.conf)).getRaw();
 
-    ExecutorService executor = Executors.newFixedThreadPool(
-        1,
+    ExecutorService executor = new ThreadPoolExecutor(1,  Math.max(2, numBuffers/2),
+        60L, TimeUnit.SECONDS,
+        new SynchronousQueue<Runnable>(),
         new ThreadFactoryBuilder()
             .setDaemon(true)
             .setNameFormat(
-                "UnorderedOutSpiller {"
-                    + TezUtilsInternal.cleanVertexName(
-                        outputContext.getDestinationVertexName()) + "}")
-            .build());
+                "UnorderedOutSpiller {" + TezUtilsInternal.cleanVertexName(
+                    outputContext.getDestinationVertexName()) + "}")
+            .build()
+    );
     spillExecutor = MoreExecutors.listeningDecorator(executor);
     numRecordsPerPartition = new int[numPartitions];
     reportPartitionStats = ReportPartitionStats.fromString(
@@ -338,10 +345,12 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
     currentBuffer.metaBuffer.put(metaIndex + INDEX_NEXT, indexNext);
     currentBuffer.skipSize += metaSkip; // For size estimation
     // Update stats on number of records
-    outputRecordBytesCounter.increment(currentBuffer.nextPosition - (metaStart + META_SIZE));
-    outputBytesWithOverheadCounter.increment((currentBuffer.nextPosition - metaStart) + metaSkip);
-    outputRecordsCounter.increment(1);
-    outputContext.notifyProgress();
+    localOutputRecordBytesCounter += (currentBuffer.nextPosition - (metaStart + META_SIZE));
+    localOutputBytesWithOverheadCounter += ((currentBuffer.nextPosition - metaStart) + metaSkip);
+    localOutputRecordsCounter++;
+    if (localOutputRecordBytesCounter % NOTIFY_THRESHOLD == 0) {
+      updateTezCountersAndNotify();
+    }
     currentBuffer.partitionPositions[partition] = metaStart;
     currentBuffer.recordsPerPartition[partition]++;
     currentBuffer.sizePerPartition[partition] +=
@@ -350,6 +359,16 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
 
   }
 
+  private void updateTezCountersAndNotify() {
+    outputRecordBytesCounter.increment(localOutputRecordBytesCounter);
+    outputBytesWithOverheadCounter.increment(localOutputBytesWithOverheadCounter);
+    outputRecordsCounter.increment(localOutputRecordsCounter);
+    outputContext.notifyProgress();
+    localOutputRecordBytesCounter = 0;
+    localOutputBytesWithOverheadCounter = 0;
+    localOutputRecordsCounter = 0;
+  }
+
   private void setupNextBuffer() throws IOException {
 
     if (currentBuffer.numRecords == 0) {
@@ -361,11 +380,11 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
 
       pendingSpillCount.incrementAndGet();
 
-      SpillPathDetails spillPathDetails = getSpillPathDetails(false, -1);
-
       ListenableFuture<SpillResult> future = spillExecutor.submit(
-          new SpillCallable(currentBuffer, codec, spilledRecordsCounter, spillPathDetails));
-      Futures.addCallback(future, new SpillCallback(spillPathDetails.spillIndex));
+          new SpillCallable(currentBuffer, codec, spilledRecordsCounter, numSpills.getAndIncrement()));
+      Futures.addCallback(future, new SpillCallback(numSpills.get()));
+      // Update once per buffer (instead of every record)
+      updateTezCountersAndNotify();
 
       WrappedBuffer wb = getNextAvailableBuffer();
       currentBuffer = wb;
@@ -411,8 +430,9 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
     private final WrappedBuffer wrappedBuffer;
     private final CompressionCodec codec;
     private final TezCounter numRecordsCounter;
-    private final int spillIndex;
-    private final SpillPathDetails spillPathDetails;
+    private int spillIndex;
+    private SpillPathDetails spillPathDetails;
+    private int spillNumber;
 
     public SpillCallable(WrappedBuffer wrappedBuffer, CompressionCodec codec,
         TezCounter numRecordsCounter, SpillPathDetails spillPathDetails) {
@@ -425,6 +445,14 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
       this.spillPathDetails = spillPathDetails;
     }
 
+    public SpillCallable(WrappedBuffer wrappedBuffer, CompressionCodec codec,
+        TezCounter numRecordsCounter, int spillNumber) throws IOException {
+      this.wrappedBuffer = wrappedBuffer;
+      this.codec = codec;
+      this.numRecordsCounter = numRecordsCounter;
+      this.spillNumber = spillNumber;
+    }
+
     @Override
     protected SpillResult callInternal() throws IOException {
       // This should not be called with an empty buffer. Check before invoking.
@@ -432,6 +460,10 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
       // Number of parallel spills determined by number of threads.
       // Last spill synchronization handled separately.
       SpillResult spillResult = null;
+      if (spillPathDetails == null) {
+        this.spillPathDetails = getSpillPathDetails(false, -1, spillNumber);
+        this.spillIndex = spillPathDetails.spillIndex;
+      }
       FSDataOutputStream out = rfs.create(spillPathDetails.outputFilePath);
       TezSpillRecord spillRecord = new TezSpillRecord(numPartitions);
       DataInputBuffer key = new DataInputBuffer();
@@ -439,15 +471,22 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
       long compressedLength = 0;
       for (int i = 0; i < numPartitions; i++) {
         IFile.Writer writer = null;
-        outputContext.notifyProgress();
         try {
           long segmentStart = out.getPos();
           if (wrappedBuffer.partitionPositions[i] == WrappedBuffer.PARTITION_ABSENT_POSITION) {
             // Skip empty partition.
             continue;
           }
-          writer = new Writer(conf, out, keyClass, valClass, codec, numRecordsCounter, null);
-          writePartition(wrappedBuffer.partitionPositions[i], wrappedBuffer, writer, key, val);
+          writer = new Writer(conf, out, keyClass, valClass, codec, null, null);
+          long numRecords = writePartition(wrappedBuffer.partitionPositions[i], wrappedBuffer,
+              writer, key, val);
+          if (numRecordsCounter != null) {
+            // TezCounter is not threadsafe; Since numRecordsCounter would be updated from
+            // multiple threads, it is good to synchronize it when incrementing it for correctness.
+            synchronized (numRecordsCounter) {
+              numRecordsCounter.increment(numRecords);
+            }
+          }
           writer.close();
           compressedLength += writer.getCompressedLength();
           TezIndexRecord indexRecord = new TezIndexRecord(segmentStart, writer.getRawLength(),
@@ -473,8 +512,9 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
     }
   }
 
-  private void writePartition(int pos, WrappedBuffer wrappedBuffer, Writer writer,
+  private long writePartition(int pos, WrappedBuffer wrappedBuffer, Writer writer,
       DataInputBuffer keyBuffer, DataInputBuffer valBuffer) throws IOException {
+    long numRecords = 0;
     while (pos != WrappedBuffer.PARTITION_ABSENT_POSITION) {
       int metaIndex = pos / INT_SIZE;
       int keyLength = wrappedBuffer.metaBuffer.get(metaIndex + INDEX_KEYLEN);
@@ -483,8 +523,10 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
       valBuffer.reset(wrappedBuffer.buffer, pos + META_SIZE + keyLength, valLength);
 
       writer.append(keyBuffer, valBuffer);
+      numRecords++;
       pos = wrappedBuffer.metaBuffer.get(metaIndex + INDEX_NEXT);
     }
+    return numRecords;
   }
 
   public static long getInitialMemoryRequirement(Configuration conf, long maxAvailableTaskMemory) {
@@ -562,6 +604,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
         } else {
           finalSpill();
         }
+        updateTezCountersAndNotify();
         cleanupCurrentBuffer();
         eventList.add(generateVMEvent());
         eventList.add(generateDMEvent());
@@ -575,6 +618,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
         sendPipelinedEventForSpill(currentBuffer.recordsPerPartition,
             sizePerPartition, numSpills.get() - 1, true);
       }
+      updateTezCountersAndNotify();
       cleanupCurrentBuffer();
       return events;
     }
@@ -697,9 +741,23 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
    * @return SpillPathDetails
    * @throws IOException
    */
-  private SpillPathDetails getSpillPathDetails(boolean isFinalSpill, long expectedSpillSize) throws
-      IOException {
+  private SpillPathDetails getSpillPathDetails(boolean isFinalSpill, long expectedSpillSize)
+      throws IOException {
     int spillNumber = numSpills.getAndIncrement();
+    return getSpillPathDetails(isFinalSpill, expectedSpillSize, spillNumber);
+  }
+
+  /**
+   * Set up spill output file, index file details.
+   *
+   * @param isFinalSpill
+   * @param expectedSpillSize
+   * @param spillNumber
+   * @return SpillPathDetails
+   * @throws IOException
+   */
+  private SpillPathDetails getSpillPathDetails(boolean isFinalSpill, long expectedSpillSize,
+      int spillNumber) throws IOException {
     long spillSize = (expectedSpillSize < 0) ?
         (currentBuffer.nextPosition + numPartitions * APPROX_HEADER_LENGTH) : expectedSpillSize;
 
@@ -768,7 +826,6 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
           }
           synchronized (spillInfoList) {
             for (SpillInfo spillInfo : spillInfoList) {
-              outputContext.notifyProgress();
               TezIndexRecord indexRecord = spillInfo.spillRecord.getIndex(i);
               if (indexRecord.getPartLength() == 0) {
                 // Skip empty partitions within a spill
@@ -795,6 +852,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
               writer.getCompressedLength());
           writer = null;
           finalSpillRecord.putIndex(indexRecord, i);
+          outputContext.notifyProgress();
         } finally {
           if (writer != null) {
             writer.close();
@@ -973,10 +1031,10 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
   private void sendPipelinedEventForSpill(
       BitSet emptyPartitions, long[] sizePerPartition, int spillNumber,
       boolean isFinalUpdate) {
-    List<Event> eventList = Lists.newLinkedList();
     if (!pipelinedShuffle) {
       return;
     }
+    List<Event> eventList = Lists.newLinkedList();
     //Send out an event for consuming.
     try {
       String pathComponent = (outputContext.getUniqueIdentifier() + "_" + spillNumber);


Mime
View raw message