tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject tez git commit: TEZ-3206. Have unordered partitioned KV output send partition stats via VertexManagerEvent. Contributed by Ming Ma.
Date Mon, 23 May 2016 21:31:49 GMT
Repository: tez
Updated Branches:
  refs/heads/master cc68f7b3f -> 2ecef2527


TEZ-3206. Have unordered partitioned KV output send partition stats via
VertexManagerEvent. Contributed by Ming Ma.


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/2ecef252
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/2ecef252
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/2ecef252

Branch: refs/heads/master
Commit: 2ecef25276f17c4969ab031929684e81ef3beee3
Parents: cc68f7b
Author: Siddharth Seth <sseth@apache.org>
Authored: Mon May 23 14:31:36 2016 -0700
Committer: Siddharth Seth <sseth@apache.org>
Committed: Mon May 23 14:31:36 2016 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../vertexmanager/ShuffleVertexManager.java     |   2 +-
 .../writers/UnorderedPartitionedKVWriter.java   | 108 +++++++++++--
 .../TestUnorderedPartitionedKVWriter.java       | 150 ++++++++++++++-----
 .../output/TestOnFileUnorderedKVOutput.java     |   6 +-
 5 files changed, 211 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/2ecef252/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ba7a04e..c3499ac 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
 
 ALL CHANGES:
 
+  TEZ-3206. Have unordered partitioned KV output send partition stats via VertexManagerEvent.
   TEZ-3237. Corrupted shuffle transfers to disk are not detected during transfer
   TEZ-3240. Improvements to tez.lib.uris to allow for multiple tarballs and mixing tarballs and jars.
   TEZ-3246. Improve diagnostics when DAG killed by user

http://git-wip-us.apache.org/repos/asf/tez/blob/2ecef252/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
index aee8b6f..b83c64e 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
@@ -585,7 +585,7 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
 
   private void handleVertexManagerEvent(VertexManagerEvent vmEvent) {
     // currently events from multiple attempts of the same task can be ignored because
-    // their output will be the same. However, with pipelined events that may not hold.
+    // their output will be the same.
     TaskIdentifier producerTask = vmEvent.getProducerAttemptIdentifier().getTaskIdentifier();
     if (!taskWithVmEvents.add(producerTask)) {
       LOG.info("Ignoring vertex manager event from: " + producerTask);

http://git-wip-us.apache.org/repos/asf/tez/blob/2ecef252/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
index c7e3059..76075bb 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.tez.common.CallableWithNdc;
 import org.apache.tez.common.TezCommonUtils;
@@ -55,6 +56,7 @@ import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.TaskFailureType;
 import org.apache.tez.runtime.api.OutputContext;
 import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
+import org.apache.tez.runtime.api.events.VertexManagerEvent;
 import org.apache.tez.runtime.library.api.IOInterruptedException;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.runtime.library.common.Constants;
@@ -63,7 +65,9 @@ import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
 import org.apache.tez.runtime.library.common.sort.impl.IFile.Writer;
 import org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord;
 import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
+import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
 import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovementEventPayloadProto;
+import org.roaringbitmap.RoaringBitmap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -111,6 +115,8 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
   private final ListeningExecutorService spillExecutor;
 
   private final int[] numRecordsPerPartition;
+  // uncompressed size for each partition
+  private final long[] sizePerPartition;
   private volatile long spilledSize = 0;
 
   /**
@@ -197,10 +203,12 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
             .setDaemon(true)
             .setNameFormat(
                 "UnorderedOutSpiller {"
-                    + TezUtilsInternal.cleanVertexName(outputContext.getDestinationVertexName()) + "}")
+                    + TezUtilsInternal.cleanVertexName(
+                        outputContext.getDestinationVertexName()) + "}")
             .build());
     spillExecutor = MoreExecutors.listeningDecorator(executor);
     numRecordsPerPartition = new int[numPartitions];
+    sizePerPartition = new long[numPartitions];
 
     outputLargeRecordsCounter = outputContext.getCounters().findCounter(
         TaskCounter.OUTPUT_LARGE_RECORDS);
@@ -250,7 +258,11 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
     }
     if (skipBuffers) {
       //special case, where we have only one partition and pipelining is disabled.
-      writer.append(key, value); // ???? Why is outputrecordscounter not updated here?
+      // The reason outputRecordsCounter isn't updated here:
+      // For skipBuffers case, IFile writer has the reference to
+      // outputRecordsCounter and during its close method call,
+      // it will update the outputRecordsCounter.
+      writer.append(key, value);
       outputContext.notifyProgress();
     } else {
       int partition = partitioner.getPartition(key, value, numPartitions);
@@ -272,7 +284,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
     int metaStart = currentBuffer.nextPosition;
     currentBuffer.availableSize -= (META_SIZE + metaSkip);
     currentBuffer.nextPosition += META_SIZE;
-    
+
     keySerializer.serialize(key);
 
     if (currentBuffer.full) {
@@ -293,7 +305,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
 
     int valStart = currentBuffer.nextPosition;
     valSerializer.serialize(value);
-      
+
     if (currentBuffer.full) {
       // Value too large for current buffer, or K-V too large for entire buffer.
       if (metaStart == 0) {
@@ -324,6 +336,8 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
     outputContext.notifyProgress();
     currentBuffer.partitionPositions[partition] = metaStart;
     currentBuffer.recordsPerPartition[partition]++;
+    currentBuffer.sizePerPartition[partition] +=
+        currentBuffer.nextPosition - (metaStart + META_SIZE);
     currentBuffer.numRecords++;
 
   }
@@ -353,6 +367,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
   private void updateGlobalStats(WrappedBuffer buffer) {
     for (int i = 0; i < numPartitions; i++) {
       numRecordsPerPartition[i] += buffer.recordsPerPartition[i];
+      sizePerPartition[i] += buffer.sizePerPartition[i];
     }
   }
 
@@ -472,6 +487,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
 
   @Override
   public List<Event> close() throws IOException, InterruptedException {
+    List<Event> eventList = Lists.newLinkedList();
     isShutdown.set(true);
     spillLock.lock();
     try {
@@ -513,12 +529,15 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
           if (outputRecordsCounter.getValue() == 0) {
             emptyPartitions.set(0);
           }
+          sizePerPartition[0] = rawLen;
           cleanupCurrentBuffer();
 
           outputBytesWithOverheadCounter.increment(rawLen);
           fileOutputBytesCounter.increment(compLen + indexFileSizeEstimate);
-          return Collections.singletonList(generateDMEvent(false, -1, false, outputContext
+          eventList.add(generateVMEvent());
+          eventList.add(generateDMEvent(false, -1, false, outputContext
               .getUniqueIdentifier(), emptyPartitions));
+          return eventList;
         }
 
         //Regular code path.
@@ -528,12 +547,17 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
           finalSpill();
         }
         cleanupCurrentBuffer();
-        return Collections.singletonList(generateDMEvent());
+        eventList.add(generateVMEvent());
+        eventList.add(generateDMEvent());
+        return eventList;
       }
 
       //For pipelined case, send out an event in case finalspill generated a spill file.
       if (finalSpill()) {
-        sendPipelinedEventForSpill(currentBuffer.recordsPerPartition, numSpills.get() - 1, true);
+        // VertexManagerEvent is only sent at the end and thus sizePerPartition is used
+        // for the sum of all spills.
+        sendPipelinedEventForSpill(currentBuffer.recordsPerPartition,
+            sizePerPartition, numSpills.get() - 1, true);
       }
       cleanupCurrentBuffer();
       return events;
@@ -551,6 +575,39 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
     return emptyPartitions;
   }
 
+  private Event generateVMEvent() throws IOException {
+    return generateVMEvent(this.sizePerPartition);
+  }
+
+  private Event generateVMEvent(long[] sizePerPartition) throws IOException {
+    ShuffleUserPayloads.VertexManagerEventPayloadProto.Builder vmBuilder =
+        ShuffleUserPayloads.VertexManagerEventPayloadProto.newBuilder();
+
+    long outputSize = outputContext.getCounters().
+        findCounter(TaskCounter.OUTPUT_BYTES).getValue();
+
+    // Set this information only when required.  In pipelined shuffle,
+    // multiple events would end up adding up to final output size.
+    // This is needed for auto-reduce parallelism to work properly.
+    vmBuilder.setOutputSize(outputSize);
+
+    //set partition stats
+    if (sizePerPartition != null && sizePerPartition.length > 0) {
+      RoaringBitmap stats = ShuffleUtils.getPartitionStatsForPhysicalOutput(
+          sizePerPartition);
+      DataOutputBuffer dout = new DataOutputBuffer();
+      stats.serialize(dout);
+      ByteString partitionStatsBytes =
+          TezCommonUtils.compressByteArrayToByteString(dout.getData());
+      vmBuilder.setPartitionStats(partitionStatsBytes);
+    }
+
+    VertexManagerEvent vmEvent = VertexManagerEvent.create(
+        outputContext.getDestinationVertexName(),
+            vmBuilder.build().toByteString().asReadOnlyByteBuffer());
+    return vmEvent;
+  }
+
   private Event generateDMEvent() throws IOException {
     BitSet emptyPartitions = getEmptyPartitions(numRecordsPerPartition);
     return generateDMEvent(false, -1, false, outputContext.getUniqueIdentifier(), emptyPartitions);
@@ -609,12 +666,14 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
   private boolean finalSpill() throws IOException {
     if (currentBuffer.nextPosition == 0) {
       if (pipelinedShuffle) {
+        List<Event> eventList = Lists.newLinkedList();
+        eventList.add(generateVMEvent(new long[numPartitions]));
         //Send final event with all empty partitions and null path component.
         BitSet emptyPartitions = new BitSet(numPartitions);
         emptyPartitions.flip(0, numPartitions);
-
-        outputContext.sendEvents(
-            Collections.singletonList(generateDMEvent(true, numSpills.get(), true, null, emptyPartitions)));
+        eventList.add(generateDMEvent(true, numSpills.get(), true,
+            null, emptyPartitions));
+        outputContext.sendEvents(eventList);
       }
       return false;
     } else {
@@ -785,6 +844,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
             writer.append(key, value);
             outputLargeRecordsCounter.increment(1);
             numRecordsPerPartition[i]++;
+            sizePerPartition[i] += writer.getRawLength();
             writer.close();
             additionalSpillBytesWritternCounter.increment(writer.getCompressedLength());
             TezIndexRecord indexRecord = new TezIndexRecord(recordStart, writer.getRawLength(),
@@ -805,7 +865,8 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
       }
       handleSpillIndex(spillPathDetails, spillRecord);
 
-      sendPipelinedEventForSpill(emptyPartitions, spillIndex, false);
+      sendPipelinedEventForSpill(emptyPartitions, sizePerPartition,
+          spillIndex, false);
 
       LOG.info(destNameTrimmed + ": " + "Finished writing large record of size " + outSize + " to spill file " + spillIndex);
       if (LOG.isDebugEnabled()) {
@@ -862,6 +923,8 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
 
     private final int[] partitionPositions;
     private final int[] recordsPerPartition;
+    // uncompressed size for each partition
+    private final long[] sizePerPartition;
     private final int numPartitions;
     private final int size;
 
@@ -878,10 +941,12 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
     WrappedBuffer(int numPartitions, int size) {
       this.partitionPositions = new int[numPartitions];
       this.recordsPerPartition = new int[numPartitions];
+      this.sizePerPartition = new long[numPartitions];
       this.numPartitions = numPartitions;
       for (int i = 0; i < numPartitions; i++) {
         this.partitionPositions[i] = PARTITION_ABSENT_POSITION;
         this.recordsPerPartition[i] = 0;
+        this.sizePerPartition[i] = 0;
       }
       size = size - (size % INT_SIZE);
       this.size = size;
@@ -894,6 +959,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
       for (int i = 0; i < numPartitions; i++) {
         this.partitionPositions[i] = PARTITION_ABSENT_POSITION;
         this.recordsPerPartition[i] = 0;
+        this.sizePerPartition[i] = 0;
       }
       numRecords = 0;
       nextPosition = 0;
@@ -908,27 +974,36 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
     }
   }
 
-  private void sendPipelinedEventForSpill(BitSet emptyPartitions, int spillNumber, boolean isFinalUpdate) {
+  private void sendPipelinedEventForSpill(
+      BitSet emptyPartitions, long[] sizePerPartition, int spillNumber,
+      boolean isFinalUpdate) {
+    List<Event> eventList = Lists.newLinkedList();
     if (!pipelinedShuffle) {
       return;
     }
     //Send out an event for consuming.
     try {
       String pathComponent = (outputContext.getUniqueIdentifier() + "_" + spillNumber);
+      if (isFinalUpdate) {
+        eventList.add(generateVMEvent(sizePerPartition));
+      }
       Event compEvent = generateDMEvent(true, spillNumber, isFinalUpdate,
           pathComponent, emptyPartitions);
+      eventList.add(compEvent);
 
       LOG.info(destNameTrimmed + ": " + "Adding spill event for spill (final update=" + isFinalUpdate + "), spillId=" + spillNumber);
-      outputContext.sendEvents(Collections.singletonList(compEvent));
+      outputContext.sendEvents(eventList);
     } catch (IOException e) {
       LOG.error(destNameTrimmed + ": " + "Error in sending pipelined events", e);
       outputContext.reportFailure(TaskFailureType.NON_FATAL, e, "Error in sending pipelined events");
     }
   }
 
-  private void sendPipelinedEventForSpill(int[] recordsPerPartition, int spillNumber, boolean isFinalUpdate) {
+  private void sendPipelinedEventForSpill(int[] recordsPerPartition,
+      long[] sizePerPartition, int spillNumber, boolean isFinalUpdate) {
     BitSet emptyPartitions = getEmptyPartitions(recordsPerPartition);
-    sendPipelinedEventForSpill(emptyPartitions, spillNumber, isFinalUpdate);
+    sendPipelinedEventForSpill(emptyPartitions, sizePerPartition, spillNumber,
+        isFinalUpdate);
   }
 
   private class SpillCallback implements FutureCallback<SpillResult> {
@@ -943,7 +1018,8 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
     public void onSuccess(SpillResult result) {
       spilledSize += result.spillSize;
 
-      sendPipelinedEventForSpill(result.wrappedBuffer.recordsPerPartition, spillNumber, false);
+      sendPipelinedEventForSpill(result.wrappedBuffer.recordsPerPartition,
+          result.wrappedBuffer.sizePerPartition, spillNumber, false);
 
       try {
         result.wrappedBuffer.reset();

http://git-wip-us.apache.org/repos/asf/tez/blob/2ecef252/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
index 8c935eb..3b82690 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
@@ -32,12 +32,15 @@ import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.BitSet;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
@@ -46,6 +49,10 @@ import java.util.UUID;
 
 import com.google.protobuf.ByteString;
 import org.apache.tez.runtime.api.TaskFailureType;
+import org.apache.tez.runtime.api.events.VertexManagerEvent;
+import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
+import org.apache.tez.runtime.library.utils.DATA_RANGE_IN_MB;
+import org.roaringbitmap.RoaringBitmap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configurable;
@@ -197,7 +204,7 @@ public class TestUnorderedPartitionedKVWriter {
   }
 
   @Test(timeout = 10000)
-  public void testNoSpill_SinglPartition() throws IOException, InterruptedException {
+  public void testNoSpill_SinglePartition() throws IOException, InterruptedException {
     baseTest(10, 1, null, shouldCompress);
   }
 
@@ -336,20 +343,24 @@ public class TestUnorderedPartitionedKVWriter {
       return;
     }
 
-    // Validate the event
-    assertEquals(1, events.size());
-    assertTrue(events.get(0) instanceof CompositeDataMovementEvent);
-    CompositeDataMovementEvent cdme = (CompositeDataMovementEvent) events.get(0);
+    // Validate the events
+    assertEquals(2, events.size());
+
+    assertTrue(events.get(0) instanceof VertexManagerEvent);
+    VertexManagerEvent vme = (VertexManagerEvent) events.get(0);
+    verifyPartitionStats(vme, partitionsWithData);
+
+    assertTrue(events.get(1) instanceof CompositeDataMovementEvent);
+    CompositeDataMovementEvent cdme = (CompositeDataMovementEvent) events.get(1);
     assertEquals(0, cdme.getSourceIndexStart());
     assertEquals(numPartitions, cdme.getCount());
     DataMovementEventPayloadProto eventProto = DataMovementEventPayloadProto.parseFrom(
-        ByteString.copyFrom(cdme
-            .getUserPayload()));
+        ByteString.copyFrom(cdme.getUserPayload()));
     BitSet emptyPartitionBits = null;
     if (partitionsWithData.cardinality() != numPartitions) {
       assertTrue(eventProto.hasEmptyPartitions());
-      byte[] emptyPartitions = TezCommonUtils.decompressByteStringToByteArray(eventProto
-          .getEmptyPartitions());
+      byte[] emptyPartitions = TezCommonUtils.decompressByteStringToByteArray(
+          eventProto.getEmptyPartitions());
       emptyPartitionBits = TezUtilsInternal.fromByteArray(emptyPartitions);
       assertEquals(numPartitions - partitionsWithData.cardinality(),
           emptyPartitionBits.cardinality());
@@ -404,6 +415,43 @@ public class TestUnorderedPartitionedKVWriter {
     assertEquals(0, expectedValues.size());
   }
 
+  private long[] getPartitionStats(
+      VertexManagerEvent vme) throws IOException {
+    RoaringBitmap partitionStats = new RoaringBitmap();
+    ShuffleUserPayloads.VertexManagerEventPayloadProto
+        payload = ShuffleUserPayloads.VertexManagerEventPayloadProto
+        .parseFrom(ByteString.copyFrom(vme.getUserPayload()));
+    assertTrue(payload.hasPartitionStats());
+    ByteString compressedPartitionStats = payload.getPartitionStats();
+    byte[] rawData = TezCommonUtils.decompressByteStringToByteArray(
+        compressedPartitionStats);
+    ByteArrayInputStream bin = new ByteArrayInputStream(rawData);
+    partitionStats.deserialize(new DataInputStream(bin));
+    long[] stats = new long[partitionStats.getCardinality()];
+    Iterator<Integer> it = partitionStats.iterator();
+    final DATA_RANGE_IN_MB[] RANGES = DATA_RANGE_IN_MB.values();
+    final int RANGE_LEN = RANGES.length;
+    while (it.hasNext()) {
+      int pos = it.next();
+      int index = ((pos) / RANGE_LEN);
+      int rangeIndex = ((pos) % RANGE_LEN);
+      if (RANGES[rangeIndex].getSizeInMB() > 0) {
+        stats[index] += RANGES[rangeIndex].getSizeInMB();
+      }
+    }
+    return stats;
+  }
+
+  private void verifyPartitionStats(VertexManagerEvent vme,
+      BitSet expectedPartitionsWithData) throws IOException {
+    long[] stats = getPartitionStats(vme);
+    for (int i = 0; i < stats.length; i++) {
+      // The stats should be greater than zero if and only if
+      // the partition has data
+      assertTrue(expectedPartitionsWithData.get(i) == (stats[i] > 0));
+    }
+  }
+
   @Test(timeout = 10000)
   public void testNoSpill_WithPipelinedShuffle() throws IOException, InterruptedException {
     baseTestWithPipelinedTransfer(10, 10, null, shouldCompress);
@@ -469,6 +517,7 @@ public class TestUnorderedPartitionedKVWriter {
     int sizePerRecord = 4 + 8; // IntW + LongW
     int sizePerRecordWithOverhead = sizePerRecord + 12; // Record + META_OVERHEAD
 
+    BitSet partitionsWithData = new BitSet(numPartitions);
     IntWritable intWritable = new IntWritable();
     LongWritable longWritable = new LongWritable();
     for (int i = 0; i < numRecords; i++) {
@@ -478,6 +527,7 @@ public class TestUnorderedPartitionedKVWriter {
       if (skippedPartitions != null && skippedPartitions.contains(partition)) {
         continue;
       }
+      partitionsWithData.set(partition);
       kvWriter.write(intWritable, longWritable);
       numRecordsWritten++;
     }
@@ -486,15 +536,29 @@ public class TestUnorderedPartitionedKVWriter {
     int numExpectedSpills = numRecordsWritten / recordsPerBuffer;
 
     ArgumentCaptor<List> eventCaptor = ArgumentCaptor.forClass(List.class);
-    List<Event> events = kvWriter.close();
-    assertTrue(events.size() == 0); //no events are sent to kvWriter upon close with pipelining
-
+    List<Event> lastEvents = kvWriter.close();
+    //no events are sent to kvWriter upon close with pipelining
+    assertTrue(lastEvents.size() == 0);
     verify(outputContext, atLeast(numExpectedSpills)).sendEvents(eventCaptor.capture());
-    events = eventCaptor.getValue();
-
-    assertTrue(events.size() == 1); //the last event which was sent out
+    int numOfCapturedEvents = eventCaptor.getAllValues().size();
+    lastEvents = eventCaptor.getAllValues().get(numOfCapturedEvents - 1);
+    VertexManagerEvent VMEvent = (VertexManagerEvent)lastEvents.get(0);
+
+    for (int i=0; i<numOfCapturedEvents; i++) {
+      List<Event> events = eventCaptor.getAllValues().get(i);
+      if (i < numOfCapturedEvents - 1) {
+        assertTrue(events.size() == 1);
+        assertTrue(events.get(0) instanceof CompositeDataMovementEvent);
+      } else {
+        assertTrue(events.size() == 2);
+        assertTrue(events.get(0) instanceof VertexManagerEvent);
+        assertTrue(events.get(1) instanceof CompositeDataMovementEvent);
+      }
+    }
+    verifyPartitionStats(VMEvent, partitionsWithData);
 
-    verify(outputContext, never()).reportFailure(any(TaskFailureType.class), any(Throwable.class), any(String.class));
+    verify(outputContext, never()).reportFailure(any(TaskFailureType.class),
+        any(Throwable.class), any(String.class));
 
     // Verify the status of the buffers
     if (numExpectedSpills == 0) {
@@ -506,19 +570,24 @@ public class TestUnorderedPartitionedKVWriter {
     assertEquals(0, kvWriter.availableBuffers.size());
 
     // Verify the counters
-    TezCounter outputRecordBytesCounter = counters.findCounter(TaskCounter.OUTPUT_BYTES);
-    TezCounter outputRecordsCounter = counters.findCounter(TaskCounter.OUTPUT_RECORDS);
-    TezCounter outputBytesWithOverheadCounter = counters
-        .findCounter(TaskCounter.OUTPUT_BYTES_WITH_OVERHEAD);
-    TezCounter fileOutputBytesCounter = counters.findCounter(TaskCounter.OUTPUT_BYTES_PHYSICAL);
-    TezCounter spilledRecordsCounter = counters.findCounter(TaskCounter.SPILLED_RECORDS);
+    TezCounter outputRecordBytesCounter =
+        counters.findCounter(TaskCounter.OUTPUT_BYTES);
+    TezCounter outputRecordsCounter =
+        counters.findCounter(TaskCounter.OUTPUT_RECORDS);
+    TezCounter outputBytesWithOverheadCounter =
+        counters.findCounter(TaskCounter.OUTPUT_BYTES_WITH_OVERHEAD);
+    TezCounter fileOutputBytesCounter =
+        counters.findCounter(TaskCounter.OUTPUT_BYTES_PHYSICAL);
+    TezCounter spilledRecordsCounter =
+        counters.findCounter(TaskCounter.SPILLED_RECORDS);
     TezCounter additionalSpillBytesWritternCounter = counters
         .findCounter(TaskCounter.ADDITIONAL_SPILLS_BYTES_WRITTEN);
     TezCounter additionalSpillBytesReadCounter = counters
         .findCounter(TaskCounter.ADDITIONAL_SPILLS_BYTES_READ);
     TezCounter numAdditionalSpillsCounter = counters
         .findCounter(TaskCounter.ADDITIONAL_SPILL_COUNT);
-    assertEquals(numRecordsWritten * sizePerRecord, outputRecordBytesCounter.getValue());
+    assertEquals(numRecordsWritten * sizePerRecord,
+        outputRecordBytesCounter.getValue());
     assertEquals(numRecordsWritten, outputRecordsCounter.getValue());
     assertEquals(numRecordsWritten * sizePerRecordWithOverhead,
         outputBytesWithOverheadCounter.getValue());
@@ -531,8 +600,10 @@ public class TestUnorderedPartitionedKVWriter {
     } else {
       assertEquals(0, fileOutputBytes);
     }
-    assertEquals(recordsPerBuffer * numExpectedSpills, spilledRecordsCounter.getValue());
-    long additionalSpillBytesWritten = additionalSpillBytesWritternCounter.getValue();
+    assertEquals(recordsPerBuffer * numExpectedSpills,
+        spilledRecordsCounter.getValue());
+    long additionalSpillBytesWritten =
+        additionalSpillBytesWritternCounter.getValue();
     long additionalSpillBytesRead = additionalSpillBytesReadCounter.getValue();
 
     //No additional spill bytes written when final merge is disabled.
@@ -545,20 +616,22 @@ public class TestUnorderedPartitionedKVWriter {
     assertEquals(numAdditionalSpillsCounter.getValue(), 0);
 
     BitSet emptyPartitionBits = null;
-    assertTrue(events.size() > 0);
+    assertTrue(lastEvents.size() > 0);
     //Get the last event
-    int index = events.size() - 1;
-    assertTrue(events.get(index) instanceof CompositeDataMovementEvent);
-    CompositeDataMovementEvent cdme = (CompositeDataMovementEvent) events.get(index);
+    int index = lastEvents.size() - 1;
+    assertTrue(lastEvents.get(index) instanceof CompositeDataMovementEvent);
+    CompositeDataMovementEvent cdme =
+        (CompositeDataMovementEvent)lastEvents.get(index);
     assertEquals(0, cdme.getSourceIndexStart());
     assertEquals(numOutputs, cdme.getCount());
     DataMovementEventPayloadProto eventProto =
-        DataMovementEventPayloadProto.parseFrom(ByteString.copyFrom(cdme.getUserPayload()));
+        DataMovementEventPayloadProto.parseFrom(
+            ByteString.copyFrom(cdme.getUserPayload()));
     //Ensure that this is the last event
     assertTrue(eventProto.getLastEvent());
     if (eventProto.hasEmptyPartitions()) {
-      byte[] emptyPartitions = TezCommonUtils.decompressByteStringToByteArray(eventProto
-          .getEmptyPartitions());
+      byte[] emptyPartitions = TezCommonUtils.decompressByteStringToByteArray(
+          eventProto.getEmptyPartitions());
       emptyPartitionBits = TezUtilsInternal.fromByteArray(emptyPartitions);
       if (numRecordsWritten == 0) {
         assertEquals(numPartitions, emptyPartitionBits.cardinality());
@@ -636,6 +709,7 @@ public class TestUnorderedPartitionedKVWriter {
 
     IntWritable intWritable = new IntWritable();
     LongWritable longWritable = new LongWritable();
+    BitSet partitionsWithData = new BitSet(numPartitions);
     for (int i = 0; i < numRecords; i++) {
       intWritable.set(i);
       longWritable.set(i);
@@ -643,6 +717,7 @@ public class TestUnorderedPartitionedKVWriter {
       if (skippedPartitions != null && skippedPartitions.contains(partition)) {
         continue;
       }
+      partitionsWithData.set(partition);
       expectedValues.get(partition).put(intWritable.get(), longWritable.get());
       kvWriter.write(intWritable, longWritable);
       numRecordsWritten++;
@@ -709,10 +784,13 @@ public class TestUnorderedPartitionedKVWriter {
     assertEquals(numExpectedSpills, numAdditionalSpillsCounter.getValue());
 
     BitSet emptyPartitionBits = null;
-    // Verify the event returned
-    assertEquals(1, events.size());
-    assertTrue(events.get(0) instanceof CompositeDataMovementEvent);
-    CompositeDataMovementEvent cdme = (CompositeDataMovementEvent) events.get(0);
+    // Verify the events returned
+    assertEquals(2, events.size());
+    assertTrue(events.get(0) instanceof VertexManagerEvent);
+    VertexManagerEvent vme = (VertexManagerEvent) events.get(0);
+    verifyPartitionStats(vme, partitionsWithData);
+    assertTrue(events.get(1) instanceof CompositeDataMovementEvent);
+    CompositeDataMovementEvent cdme = (CompositeDataMovementEvent) events.get(1);
     assertEquals(0, cdme.getSourceIndexStart());
     assertEquals(numOutputs, cdme.getCount());
     DataMovementEventPayloadProto eventProto =

http://git-wip-us.apache.org/repos/asf/tez/blob/2ecef252/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
index 884f0e6..38a60a2 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
@@ -143,8 +143,8 @@ public class TestOnFileUnorderedKVOutput {
     events = kvOutput.close();
     assertEquals(45, task.getTaskStatistics().getIOStatistics().values().iterator().next().getDataSize());
     assertEquals(5, task.getTaskStatistics().getIOStatistics().values().iterator().next().getItemsProcessed());
-    assertTrue(events != null && events.size() == 1);
-    CompositeDataMovementEvent dmEvent = (CompositeDataMovementEvent)events.get(0);
+    assertTrue(events != null && events.size() == 2);
+    CompositeDataMovementEvent dmEvent = (CompositeDataMovementEvent)events.get(1);
 
     assertEquals("Invalid source index", 0, dmEvent.getSourceIndexStart());
 
@@ -191,7 +191,7 @@ public class TestOnFileUnorderedKVOutput {
     events = eventsCaptor.getValue();
 
 
-    CompositeDataMovementEvent dmEvent = (CompositeDataMovementEvent)events.get(0);
+    CompositeDataMovementEvent dmEvent = (CompositeDataMovementEvent)events.get(1);
     assertEquals("Invalid source index", 0, dmEvent.getSourceIndexStart());
 
     DataMovementEventPayloadProto shufflePayload = DataMovementEventPayloadProto


Mime
View raw message