tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rbalamo...@apache.org
Subject tez git commit: TEZ-2575. Handle KeyValue pairs size which do not fit in a single block in PipelinedSorter (Saikat via rbalamohan)
Date Wed, 02 Sep 2015 13:44:31 GMT
Repository: tez
Updated Branches:
  refs/heads/branch-0.7 aa767b90e -> 0e155e718


TEZ-2575. Handle KeyValue pairs size which do not fit in a single block in PipelinedSorter
(Saikat via rbalamohan)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/0e155e71
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/0e155e71
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/0e155e71

Branch: refs/heads/branch-0.7
Commit: 0e155e7185d1350f64dead488103777295ac76d1
Parents: aa767b9
Author: Rajesh Balamohan <rbalamohan@apache.org>
Authored: Wed Sep 2 19:18:45 2015 +0530
Committer: Rajesh Balamohan <rbalamohan@apache.org>
Committed: Wed Sep 2 19:18:45 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../common/sort/impl/PipelinedSorter.java       | 130 +++++++++++++++----
 .../common/sort/impl/TestPipelinedSorter.java   | 111 ++++++++++++++--
 3 files changed, 206 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/0e155e71/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e098f7f..63706c9 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,7 @@ Release 0.7.1: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-2575. Handle KeyValue pairs size which do not fit in a single block in PipelinedSorter
   TEZ-2198. Fix sorter spill counts
   TEZ-2440. Sorter should check for indexCacheList.size() in flush()
   TEZ-2742. VertexImpl.finished() terminationCause hides member var of the

http://git-wip-us.apache.org/repos/asf/tez/blob/0e155e71/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
index ba909ad..d43c3a3 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
@@ -209,21 +209,18 @@ public class PipelinedSorter extends ExternalSorter {
     SortSpan newSpan = span.next();
 
     if(newSpan == null) {
-      Stopwatch stopWatch = new Stopwatch();
-      stopWatch.start();
-      // sort in the same thread, do not wait for the thread pool
-      merger.add(span.sort(sorter));
-      spill();
-      stopWatch.stop();
-      LOG.info("Time taken for spill " + (stopWatch.elapsedMillis()) + " ms");
-      if (pipelinedShuffle) {
-        List<Event> events = Lists.newLinkedList();
-        String pathComponent = (outputContext.getUniqueIdentifier() + "_" + (numSpills-1));
-        ShuffleUtils.generateEventOnSpill(events, isFinalMergeEnabled(), false, outputContext,
-            (numSpills - 1), indexCacheList.get(numSpills - 1), partitions, sendEmptyPartitionDetails,
-            pathComponent);
-        outputContext.sendEvents(events);
-        LOG.info("Adding spill event for spill (final update=false), spillId=" + (numSpills - 1));
+      //avoid sort/spill of empty span
+      if (span.length() > 0) {
+        Stopwatch stopWatch = new Stopwatch();
+        stopWatch.start();
+        // sort in the same thread, do not wait for the thread pool
+        merger.add(span.sort(sorter));
+        spill();
+        stopWatch.stop();
+        LOG.info("Time taken for spill " + (stopWatch.elapsedMillis()) + " ms");
+        if (pipelinedShuffle) {
+          sendPipelinedShuffleEvents();
+        }
       }
       //safe to reset the iterator
       listIterator = bufferList.listIterator();
@@ -252,6 +249,17 @@ public class PipelinedSorter extends ExternalSorter {
     keySerializer.open(span.out);
   }
 
+  // if pipelined shuffle is enabled, this method is called to send events for every spill
+  private void sendPipelinedShuffleEvents() throws IOException{
+    List<Event> events = Lists.newLinkedList();
+    String pathComponent = (outputContext.getUniqueIdentifier() + "_" + (numSpills-1));
+    ShuffleUtils.generateEventOnSpill(events, isFinalMergeEnabled(), false, outputContext,
+        (numSpills - 1), indexCacheList.get(numSpills - 1), partitions, sendEmptyPartitionDetails,
+        pathComponent);
+    outputContext.sendEvents(events);
+    LOG.info("Added spill event for spill (final update=false), spillId=" + (numSpills - 1));
+  }
+
   @Override
   public void write(Object key, Object value)
       throws IOException {
@@ -280,13 +288,18 @@ public class PipelinedSorter extends ExternalSorter {
       throw new IOException("Illegal partition for " + key + " (" +
           partition + ")");
     }
-    if(span.kvmeta.remaining() < METASIZE) {
+    // TBD:FIX in TEZ-2574
+    if (span.kvmeta.remaining() < METASIZE) {
       this.sort();
+      if (span.length() == 0) {
+        spillSingleRecord(key, value, partition);
+        return;
+      }
     }
     int keystart = span.kvbuffer.position();
     int valstart = -1;
     int valend = -1;
-    try { 
+    try {
       keySerializer.serialize(key);
       valstart = span.kvbuffer.position();      
       valSerializer.serialize(value);
@@ -295,13 +308,13 @@ public class PipelinedSorter extends ExternalSorter {
       // restore limit
       span.kvbuffer.position(keystart);
       this.sort();
-
-      bufferOverflowRecursion++;
-      if (bufferOverflowRecursion > bufferList.size()) {
-        throw new MapBufferTooSmallException("Record too large for in-memory buffer. Exceeded "
-            + "buffer overflow limit, bufferOverflowRecursion=" + bufferOverflowRecursion + ", bufferList"
-            + ".size=" + bufferList.size() + ", blockSize=" + blockSize);
+      if (span.length() == 0 || bufferOverflowRecursion > bufferList.size()) {
+        // spill the current key value pair
+        spillSingleRecord(key, value, partition);
+        bufferOverflowRecursion = 0;
+        return;
       }
+      bufferOverflowRecursion++;
       // try again
       this.collect(key, value, partition);
       return;
@@ -343,6 +356,72 @@ public class PipelinedSorter extends ExternalSorter {
     }
   }
 
+  // it is guaranteed that when spillSingleRecord is called, there is
+  // no merger spans queued in executor.
+  private void spillSingleRecord(final Object key, final Object value,
+      int partition) throws IOException {
+    final TezSpillRecord spillRec = new TezSpillRecord(partitions);
+    // getSpillFileForWrite with size -1 as the serialized size of KV pair is still unknown
+    final Path filename = mapOutputFile.getSpillFileForWrite(numSpills, -1);
+    spillFilePaths.put(numSpills, filename);
+    FSDataOutputStream out = rfs.create(filename, true, 4096);
+
+    try {
+      LOG.info("Spilling to " + filename.toString());
+      for (int i = 0; i < partitions; ++i) {
+        Writer writer = null;
+        try {
+          long segmentStart = out.getPos();
+          writer = new Writer(conf, out, keyClass, valClass, codec,
+              spilledRecordsCounter, null, false);
+          // we need not check for combiner since its a single record
+          if (i == partition) {
+            final long recordStart = out.getPos();
+            writer.append(key, value);
+            mapOutputRecordCounter.increment(1);
+            mapOutputByteCounter.increment(out.getPos() - recordStart);
+          }
+
+          writer.close();
+          adjustSpillCounters(writer.getRawLength(), writer.getCompressedLength());
+
+          // record offsets
+          final TezIndexRecord rec =
+              new TezIndexRecord(
+                  segmentStart,
+                  writer.getRawLength(),
+                  writer.getCompressedLength());
+          spillRec.putIndex(rec, i);
+          writer = null;
+        } finally {
+          if (null != writer) {
+            writer.close();
+          }
+        }
+      }
+
+      Path indexFilename =
+          mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
+              * MAP_OUTPUT_INDEX_RECORD_LENGTH);
+      LOG.info("Spill Index filename:" + indexFilename);
+      spillFileIndexPaths.put(numSpills, indexFilename);
+      spillRec.writeToFile(indexFilename, conf);
+      //TODO: honor cache limits
+      indexCacheList.add(spillRec);
+      ++numSpills;
+      if (!isFinalMergeEnabled()) {
+        fileOutputByteCounter.increment(rfs.getFileStatus(filename).getLen());
+        //No final merge. Set the number of files offered via shuffle-handler
+        numShuffleChunks.setValue(numSpills);
+      }
+      if (pipelinedShuffle) {
+        sendPipelinedShuffleEvents();
+      }
+    } finally {
+      out.close();
+    }
+  }
+
   public void spill() throws IOException { 
     // create spill file
     final long size = capacity +
@@ -373,7 +452,6 @@ public class PipelinedSorter extends ExternalSorter {
         //close
         writer.close();
         adjustSpillCounters(writer.getRawLength(), writer.getCompressedLength());
-
         // record offsets
         final TezIndexRecord rec = 
             new TezIndexRecord(
@@ -390,12 +468,12 @@ public class PipelinedSorter extends ExternalSorter {
       spillRec.writeToFile(indexFilename, conf);
       //TODO: honor cache limits
       indexCacheList.add(spillRec);
+      ++numSpills;
       if (!isFinalMergeEnabled()) {
         fileOutputByteCounter.increment(rfs.getFileStatus(filename).getLen());
         //No final merge. Set the number of files offered via shuffle-handler
         numShuffleChunks.setValue(numSpills);
       }
-      ++numSpills;
     } catch(InterruptedException ie) {
       // TODO:the combiner has been interrupted
     } finally {
@@ -629,7 +707,7 @@ public class PipelinedSorter extends ExternalSorter {
       ByteBuffer reserved = source.duplicate();
       reserved.mark();
       LOG.info("reserved.remaining() = " + reserved.remaining());
-      LOG.info("reserved.size = "+ metasize);
+      LOG.info("reserved.metasize = "+ metasize);
       reserved.position(metasize);
       kvbuffer = reserved.slice();
       reserved.flip();

http://git-wip-us.apache.org/repos/asf/tez/blob/0e155e71/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
index 8bf91ce..135dc78 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
@@ -138,15 +138,8 @@ public class TestPipelinedSorter {
 
   @Test
   public void basicTestWithSmallBlockSize() throws IOException {
-    try {
-      //3 MB key & 3 MB value, whereas block size is just 3 MB
-      basicTest(1, 5, (3 << 20), (10 * 1024l * 1024l), 3 << 20);
-      fail();
-    } catch (IOException ioe) {
-      Assert.assertTrue(
-          ioe.getMessage().contains("Record too large for in-memory buffer."
-              + " Exceeded buffer overflow limit"));
-    }
+    //3 MB key & 3 MB value, whereas block size is just 3 MB
+    basicTest(1, 5, (3 << 20), (10 * 1024l * 1024l), 3 << 20);
   }
 
   @Test
@@ -157,6 +150,77 @@ public class TestPipelinedSorter {
   }
 
   @Test
+  public void testKVExceedsBuffer() throws IOException {
+    // a single block of 1mb, 2KV pair, key 1mb, value 1mb
+    basicTest(1, 2, (1 << 20), (1 * 1024l * 1024l), 1<<20);
+  }
+
+  @Test
+  public void testKVExceedsBuffer2() throws IOException {
+    // a list of 4 blocks each 256kb, 2KV pair, key 1mb, value 1mb
+    basicTest(1, 2, (1 << 20), (1 * 1024l * 1024l), 256<<10);
+  }
+
+  @Test
+  public void testExceedsKVWithMultiplePartitions() throws IOException {
+    conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, true);
+    this.numOutputs = 5;
+    this.initialAvailableMem = 1 * 1024 * 1024;
+    PipelinedSorter sorter = new PipelinedSorter(this.outputContext, conf, numOutputs,
+        initialAvailableMem, 0);
+
+    writeData(sorter, 100, 1<<20);
+    verifyCounters(sorter, outputContext);
+  }
+
+  @Test
+  public void testExceedsKVWithPipelinedShuffle() throws IOException {
+    this.numOutputs = 1;
+    this.initialAvailableMem = 1 *1024 * 1024;
+    conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, false);
+    PipelinedSorter sorter = new PipelinedSorter(this.outputContext, conf, numOutputs,
+        initialAvailableMem, 1 << 20);
+
+    writeData(sorter, 5, 1<<20);
+
+    // final merge is disabled. Final output file would not be populated in this case.
+    assertTrue(sorter.finalOutputFile == null);
+    TezCounter numShuffleChunks = outputContext.getCounters().findCounter(TaskCounter.SHUFFLE_CHUNK_COUNT);
+    assertTrue(sorter.getNumSpills() == numShuffleChunks.getValue());
+    conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, true);
+  }
+
+  @Test
+  // first write a KV which doesn't fit into span, this will spill to disk
+  // next write smaller keys, which will update the hint
+  public void testWithVariableKVLength1() throws IOException {
+    int numkeys[] = {2, 2};
+    int keylens[] = {32 << 20, 7 << 20};
+    basicTest2(1, numkeys, keylens, 64 << 20, 32 << 20);
+  }
+
+  @Test
+  // first write a kv pair which fits into buffer,
+  // next try to write a kv pair which doesn't fit into remaining buffer
+  public void testWithVariableKVLength() throws IOException {
+    //2 KVpairs of 2X2mb, 2 KV of 2X7mb
+    int numkeys[] = {2, 2};
+    int keylens[] = {2 << 20, 7<<20};
+    basicTest2(1, numkeys, keylens, 64 << 20, 32 << 20);
+  }
+
+  @Test
+  // first write KV which fits into span
+  // then write KV which does not fit in buffer. this will be spilled to disk
+  // all keys should be merged properly
+  public void testWithVariableKVLength2() throws IOException {
+    // 20 KVpairs of 2X10kb, 10 KV of 2X200kb, 20KV of 2X10kb
+    int numkeys[] = {20, 10, 20};
+    int keylens[] = {10<<10, 200<<10, 10<<10};
+    basicTest2(1, numkeys, keylens, (1 * 1024l * 1024l), 1 << 18);
+  }
+
+  @Test
   public void testWithCustomComparator() throws IOException {
     //Test with custom comparator
     conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_COMPARATOR_CLASS, CustomComparator.class.getName());
@@ -177,9 +241,36 @@ public class TestPipelinedSorter {
 
     //final merge is disabled. Final output file would not be populated in this case.
     assertTrue(sorter.finalOutputFile == null);
+    conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, true);
     verify(outputContext, times(1)).sendEvents(anyListOf(Event.class));
   }
 
+  public void basicTest2(int partitions, int[] numkeys, int[] keysize,
+      long initialAvailableMem, int  blockSize) throws IOException {
+    this.numOutputs = partitions; // single output
+    PipelinedSorter sorter = new PipelinedSorter(this.outputContext, conf, numOutputs,
+        initialAvailableMem, blockSize);
+    writeData2(sorter, numkeys, keysize);
+    verifyCounters(sorter, outputContext);
+  }
+
+  private void writeData2(ExternalSorter sorter,
+      int[] numKeys, int[] keyLen) throws IOException {
+    sortedDataMap.clear();
+    int counter = 0;
+    for (int numkey : numKeys) {
+      int curKeyLen = keyLen[counter];
+      for (int i = 0; i < numkey; i++) {
+        Text key = new Text(RandomStringUtils.randomAlphanumeric(curKeyLen));
+        Text value = new Text(RandomStringUtils.randomAlphanumeric(curKeyLen));
+        sorter.write(key, value);
+      }
+      counter++;
+    }
+    sorter.flush();
+    sorter.close();
+  }
+
   public void basicTest(int partitions, int numKeys, int keySize,
       long initialAvailableMem, int blockSize) throws IOException {
     this.numOutputs = partitions; // single output
@@ -188,10 +279,10 @@ public class TestPipelinedSorter {
 
     writeData(sorter, numKeys, keySize);
 
+
     verifyCounters(sorter, outputContext);
     Path outputFile = sorter.finalOutputFile;
     FileSystem fs = outputFile.getFileSystem(conf);
-
     IFile.Reader reader = new IFile.Reader(fs, outputFile, null, null, null, false, -1, 4096);
     //Verify dataset
     verifyData(reader);


Mime
View raw message