tez-commits mailing list archives

From rbalamo...@apache.org
Subject tez git commit: TEZ-2575. Handle KeyValue pairs whose size does not fit in a single block (Saikat via rbalamohan)
Date Sat, 04 Jul 2015 11:54:12 GMT
Repository: tez
Updated Branches:
  refs/heads/master aca83090e -> 714461f47


TEZ-2575. Handle KeyValue pairs whose size does not fit in a single block (Saikat via rbalamohan)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/714461f4
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/714461f4
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/714461f4

Branch: refs/heads/master
Commit: 714461f47e6408ec331acd0ddd640335e6a7a06c
Parents: aca8309
Author: Rajesh Balamohan <rbalamohan@apache.org>
Authored: Sat Jul 4 17:26:09 2015 +0530
Committer: Rajesh Balamohan <rbalamohan@apache.org>
Committed: Sat Jul 4 17:26:09 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../common/sort/impl/PipelinedSorter.java       | 134 +++++++++++++++----
 .../common/sort/impl/TestPipelinedSorter.java   | 111 +++++++++++++--
 3 files changed, 208 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
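The gist of the change: when a serialized key/value pair is too large to fit even into an empty sort block, PipelinedSorter no longer fails with MapBufferTooSmallException; it spills that single record straight to disk and keeps going. The sketch below is a minimal, self-contained illustration of that control flow, simplified to a single retry; InMemoryBlock and SpillTarget are hypothetical stand-ins for the sorter's span/buffer and spill machinery, not Tez APIs.

import java.io.IOException;

final class OversizedRecordSketch {

  interface InMemoryBlock {
    boolean tryAppend(byte[] key, byte[] value);   // false if the serialized pair does not fit
    boolean isEmpty();
    void sortAndSpill() throws IOException;
  }

  interface SpillTarget {
    void spillSingleRecord(byte[] key, byte[] value, int partition) throws IOException;
  }

  private final InMemoryBlock block;
  private final SpillTarget spill;

  OversizedRecordSketch(InMemoryBlock block, SpillTarget spill) {
    this.block = block;
    this.spill = spill;
  }

  // Before this patch, a pair that could not fit into any block ended in a
  // "Record too large for in-memory buffer" failure; the patched logic instead
  // spills the offending pair on its own and then continues buffering normally.
  void write(byte[] key, byte[] value, int partition) throws IOException {
    if (block.tryAppend(key, value)) {
      return;                      // normal path: pair buffered in the current block
    }
    if (!block.isEmpty()) {
      block.sortAndSpill();        // free the block and retry once with an empty block
      if (block.tryAppend(key, value)) {
        return;
      }
    }
    // the pair is larger than an empty block: write it directly to its own spill file
    spill.spillSingleRecord(key, value, partition);
  }
}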


http://git-wip-us.apache.org/repos/asf/tez/blob/714461f4/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5a62b45..3e095c4 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -11,6 +11,7 @@ INCOMPATIBLE CHANGES
   TEZ-2468. Change the minimum Java version to Java 7.
 
 ALL CHANGES:
+  TEZ-2575. Handle KeyValue pairs whose size does not fit in a single block.
   TEZ-2599. Dont send obsoleted data movement events to tasks
   TEZ-2542. TezDAGID fromString array length check.
   TEZ-2296. Add option to print counters for tez-examples.

http://git-wip-us.apache.org/repos/asf/tez/blob/714461f4/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
index e3824be..720d518 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
@@ -210,21 +210,18 @@ public class PipelinedSorter extends ExternalSorter {
     SortSpan newSpan = span.next();
 
     if(newSpan == null) {
-      Stopwatch stopWatch = new Stopwatch();
-      stopWatch.start();
-      // sort in the same thread, do not wait for the thread pool
-      merger.add(span.sort(sorter));
-      spill();
-      stopWatch.stop();
-      LOG.info("Time taken for spill " + (stopWatch.elapsedMillis()) + " ms");
-      if (pipelinedShuffle) {
-        List<Event> events = Lists.newLinkedList();
-        String pathComponent = (outputContext.getUniqueIdentifier() + "_" + (numSpills-1));
-        ShuffleUtils.generateEventOnSpill(events, isFinalMergeEnabled(), false, outputContext,
-            (numSpills - 1), indexCacheList.get(numSpills - 1), partitions, sendEmptyPartitionDetails,
-            pathComponent);
-        outputContext.sendEvents(events);
-        LOG.info("Adding spill event for spill (final update=false), spillId=" + (numSpills - 1));
+      //avoid sort/spill of empty span
+      if (span.length() > 0) {
+        Stopwatch stopWatch = new Stopwatch();
+        stopWatch.start();
+        // sort in the same thread, do not wait for the thread pool
+        merger.add(span.sort(sorter));
+        spill();
+        stopWatch.stop();
+        LOG.info("Time taken for spill " + (stopWatch.elapsedMillis()) + " ms");
+        if (pipelinedShuffle) {
+          sendPipelinedShuffleEvents();
+        }
       }
       //safe to reset the iterator
       listIterator = bufferList.listIterator();
@@ -253,6 +250,17 @@ public class PipelinedSorter extends ExternalSorter {
     keySerializer.open(span.out);
   }
 
+  // if pipelined shuffle is enabled, this method is called to send events for every spill
+  private void sendPipelinedShuffleEvents() throws IOException{
+    List<Event> events = Lists.newLinkedList();
+    String pathComponent = (outputContext.getUniqueIdentifier() + "_" + (numSpills-1));
+    ShuffleUtils.generateEventOnSpill(events, isFinalMergeEnabled(), false, outputContext,
+        (numSpills - 1), indexCacheList.get(numSpills - 1), partitions, sendEmptyPartitionDetails,
+        pathComponent);
+    outputContext.sendEvents(events);
+    LOG.info("Added spill event for spill (final update=false), spillId=" + (numSpills - 1));
+  }
+
   @Override
   public void write(Object key, Object value)
       throws IOException {
@@ -281,13 +289,18 @@ public class PipelinedSorter extends ExternalSorter {
       throw new IOException("Illegal partition for " + key + " (" +
           partition + ")");
     }
-    if(span.kvmeta.remaining() < METASIZE) {
+    // TBD:FIX in TEZ-2574
+    if (span.kvmeta.remaining() < METASIZE) {
       this.sort();
+      if (span.length() == 0) {
+        spillSingleRecord(key, value, partition);
+        return;
+      }
     }
     int keystart = span.kvbuffer.position();
     int valstart = -1;
     int valend = -1;
-    try { 
+    try {
       keySerializer.serialize(key);
       valstart = span.kvbuffer.position();      
       valSerializer.serialize(value);
@@ -296,13 +309,13 @@ public class PipelinedSorter extends ExternalSorter {
       // restore limit
       span.kvbuffer.position(keystart);
       this.sort();
-
-      bufferOverflowRecursion++;
-      if (bufferOverflowRecursion > bufferList.size()) {
-        throw new MapBufferTooSmallException("Record too large for in-memory buffer. Exceeded "
-            + "buffer overflow limit, bufferOverflowRecursion=" + bufferOverflowRecursion + ", bufferList"
-            + ".size=" + bufferList.size() + ", blockSize=" + blockSize);
+      if (span.length() == 0 || bufferOverflowRecursion > bufferList.size()) {
+        // spill the current key value pair
+        spillSingleRecord(key, value, partition);
+        bufferOverflowRecursion = 0;
+        return;
       }
+      bufferOverflowRecursion++;
       // try again
       this.collect(key, value, partition);
       return;
@@ -344,6 +357,75 @@ public class PipelinedSorter extends ExternalSorter {
     }
   }
 
+  // it is guaranteed that when spillSingleRecord is called, there are
+  // no merger spans queued in the executor.
+  private void spillSingleRecord(final Object key, final Object value,
+          int partition) throws IOException {
+    final TezSpillRecord spillRec = new TezSpillRecord(partitions);
+    // getSpillFileForWrite with size -1 as the serialized size of the KV pair is still unknown
+    final Path filename = mapOutputFile.getSpillFileForWrite(numSpills, -1);
+    spillFilePaths.put(numSpills, filename);
+    FSDataOutputStream out = rfs.create(filename, true, 4096);
+
+    try {
+      LOG.info("Spilling to " + filename.toString());
+      for (int i = 0; i < partitions; ++i) {
+        if (isThreadInterrupted()) {
+          return;
+        }
+        Writer writer = null;
+        try {
+          long segmentStart = out.getPos();
+          writer = new Writer(conf, out, keyClass, valClass, codec,
+              spilledRecordsCounter, null, false);
+          // we need not check for combiner since it's a single record
+          if (i == partition) {
+            final long recordStart = out.getPos();
+            writer.append(key, value);
+            mapOutputRecordCounter.increment(1);
+            mapOutputByteCounter.increment(out.getPos() - recordStart);
+          }
+
+          writer.close();
+          adjustSpillCounters(writer.getRawLength(), writer.getCompressedLength());
+
+          // record offsets
+          final TezIndexRecord rec =
+              new TezIndexRecord(
+                  segmentStart,
+                  writer.getRawLength(),
+                  writer.getCompressedLength());
+          spillRec.putIndex(rec, i);
+          writer = null;
+        } finally {
+          if (null != writer) {
+            writer.close();
+          }
+        }
+      }
+
+      Path indexFilename =
+          mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
+              * MAP_OUTPUT_INDEX_RECORD_LENGTH);
+      LOG.info("Spill Index filename:" + indexFilename);
+      spillFileIndexPaths.put(numSpills, indexFilename);
+      spillRec.writeToFile(indexFilename, conf);
+      //TODO: honor cache limits
+      indexCacheList.add(spillRec);
+      ++numSpills;
+      if (!isFinalMergeEnabled()) {
+          fileOutputByteCounter.increment(rfs.getFileStatus(filename).getLen());
+          //No final merge. Set the number of files offered via shuffle-handler
+          numShuffleChunks.setValue(numSpills);
+      }
+      if (pipelinedShuffle) {
+        sendPipelinedShuffleEvents();
+      }
+    } finally {
+        out.close();
+    }
+  }
+
   public void spill() throws IOException {
     // create spill file
     final long size = capacity +
@@ -383,7 +465,6 @@ public class PipelinedSorter extends ExternalSorter {
         //close
         writer.close();
         adjustSpillCounters(writer.getRawLength(), writer.getCompressedLength());
-
         // record offsets
         final TezIndexRecord rec = 
             new TezIndexRecord(
@@ -463,7 +544,6 @@ public class PipelinedSorter extends ExternalSorter {
 
         for (int i = startIndex; i < endIndex; i++) {
           boolean isLastEvent = (i == numSpills - 1);
-
           String pathComponent = (outputContext.getUniqueIdentifier() + "_" + i);
           ShuffleUtils.generateEventOnSpill(events, isFinalMergeEnabled(), isLastEvent,
               outputContext, i, indexCacheList.get(i), partitions,
@@ -507,7 +587,6 @@ public class PipelinedSorter extends ExternalSorter {
             "numSpills: " + numSpills + ", finalOutputFile:" + finalOutputFile + ", finalIndexFile:"
                 + finalIndexFile);
       }
-
       //The output stream for the final single output file
       FSDataOutputStream finalOut = rfs.create(finalOutputFile, true, 4096);
 
@@ -542,7 +621,6 @@ public class PipelinedSorter extends ExternalSorter {
             nullProgressable, sortSegments, true,
             null, spilledRecordsCounter, additionalSpillBytesRead,
             null); // Not using any Progress in TezMerger. Should just work.
-
         //write merged output to disk
         long segmentStart = finalOut.getPos();
         Writer writer =
@@ -668,7 +746,7 @@ public class PipelinedSorter extends ExternalSorter {
       ByteBuffer reserved = source.duplicate();
       reserved.mark();
       LOG.info("reserved.remaining() = " + reserved.remaining());
-      LOG.info("reserved.size = "+ metasize);
+      LOG.info("reserved.metasize = "+ metasize);
       reserved.position(metasize);
       kvbuffer = reserved.slice();
       reserved.flip();
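
For orientation: spillSingleRecord above writes one spill file containing a segment for every partition, with only the owning partition's segment actually holding the record, and records a (segmentStart, rawLength, compressedLength) index entry per partition. Keeping an (often empty) segment per partition lets the shuffle fetch every partition of every spill uniformly. The sketch below mimics that layout with a plain DataOutputStream; the record format and names are simplified stand-ins, not the real IFile/TezSpillRecord on-disk formats.

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

final class SingleRecordSpillSketch {

  // Writes an (often empty) segment per partition and returns a per-partition
  // index of [segmentStart, segmentLength], analogous to the spill index above.
  static long[][] spillSingleRecord(DataOutputStream out, int partitions,
      int targetPartition, byte[] key, byte[] value) throws IOException {
    long[][] index = new long[partitions][2];
    for (int p = 0; p < partitions; p++) {
      long segmentStart = out.size();
      if (p == targetPartition) {              // only the owning partition gets the record
        out.writeInt(key.length);
        out.write(key);
        out.writeInt(value.length);
        out.write(value);
      }
      index[p][0] = segmentStart;
      index[p][1] = out.size() - segmentStart;
    }
    return index;
  }

  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    long[][] index = spillSingleRecord(new DataOutputStream(bytes), 5, 2,
        new byte[8], new byte[1 << 20]);       // a 1 MB value destined for partition 2
    System.out.println("spill size=" + bytes.size() + ", partition 2 length=" + index[2][1]);
  }
}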

http://git-wip-us.apache.org/repos/asf/tez/blob/714461f4/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
index 601782e..765f1a4 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
@@ -138,15 +138,8 @@ public class TestPipelinedSorter {
 
   @Test
   public void basicTestWithSmallBlockSize() throws IOException {
-    try {
-      //3 MB key & 3 MB value, whereas block size is just 3 MB
-      basicTest(1, 5, (3 << 20), (10 * 1024l * 1024l), 3 << 20);
-      fail();
-    } catch (IOException ioe) {
-      Assert.assertTrue(
-          ioe.getMessage().contains("Record too large for in-memory buffer."
-              + " Exceeded buffer overflow limit"));
-    }
+    //3 MB key & 3 MB value, whereas block size is just 3 MB
+    basicTest(1, 5, (3 << 20), (10 * 1024l * 1024l), 3 << 20);
   }
 
   @Test
@@ -157,6 +150,77 @@ public class TestPipelinedSorter {
   }
 
   @Test
+  public void testKVExceedsBuffer() throws IOException {
+    // a single 1 MB block, 2 KV pairs, each with a 1 MB key and a 1 MB value
+    basicTest(1, 2, (1 << 20), (1 * 1024l * 1024l), 1<<20);
+  }
+
+  @Test
+  public void testKVExceedsBuffer2() throws IOException {
+    // a list of 4 blocks of 256 KB each, 2 KV pairs, each with a 1 MB key and a 1 MB value
+    basicTest(1, 2, (1 << 20), (1 * 1024l * 1024l), 256<<10);
+  }
+
+  @Test
+  public void testExceedsKVWithMultiplePartitions() throws IOException {
+    conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, true);
+    this.numOutputs = 5;
+    this.initialAvailableMem = 1 * 1024 * 1024;
+    PipelinedSorter sorter = new PipelinedSorter(this.outputContext, conf, numOutputs,
+        initialAvailableMem, 0);
+
+    writeData(sorter, 100, 1<<20);
+    verifyCounters(sorter, outputContext);
+  }
+
+  @Test
+  public void testExceedsKVWithPipelinedShuffle() throws IOException {
+    this.numOutputs = 1;
+    this.initialAvailableMem = 1 *1024 * 1024;
+    conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, false);
+    PipelinedSorter sorter = new PipelinedSorter(this.outputContext, conf, numOutputs,
+        initialAvailableMem, 1 << 20);
+
+    writeData(sorter, 5, 1<<20);
+
+    // final merge is disabled. Final output file would not be populated in this case.
+    assertTrue(sorter.finalOutputFile == null);
+    TezCounter numShuffleChunks = outputContext.getCounters().findCounter(TaskCounter.SHUFFLE_CHUNK_COUNT);
+    assertTrue(sorter.getNumSpills() == numShuffleChunks.getValue());
+    conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, true);
+  }
+
+  @Test
+  // first write a KV pair which doesn't fit into the span; this will spill to disk
+  // next write smaller keys, which will update the hint
+  public void testWithVariableKVLength1() throws IOException {
+    int numkeys[] = {2, 2};
+    int keylens[] = {32 << 20, 7 << 20};
+    basicTest2(1, numkeys, keylens, 64 << 20, 32 << 20);
+  }
+
+  @Test
+  // first write a KV pair which fits into the buffer,
+  // next try to write a KV pair which doesn't fit into the remaining buffer
+  public void testWithVariableKVLength() throws IOException {
+    // 2 KV pairs of 2 x 2 MB, 2 KV pairs of 2 x 7 MB
+    int numkeys[] = {2, 2};
+    int keylens[] = {2 << 20, 7<<20};
+    basicTest2(1, numkeys, keylens, 64 << 20, 32 << 20);
+  }
+
+  @Test
+  // first write KV pairs which fit into the span
+  // then write a KV pair which does not fit in the buffer; this will be spilled to disk
+  // all keys should be merged properly
+  public void testWithVariableKVLength2() throws IOException {
+    // 20 KV pairs of 2 x 10 KB, 10 KV pairs of 2 x 200 KB, 20 KV pairs of 2 x 10 KB
+    int numkeys[] = {20, 10, 20};
+    int keylens[] = {10<<10, 200<<10, 10<<10};
+    basicTest2(1, numkeys, keylens, (1 * 1024l * 1024l), 1 << 18);
+  }
+
+  @Test
   public void testWithCustomComparator() throws IOException {
     //Test with custom comparator
     conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_COMPARATOR_CLASS, CustomComparator.class.getName());
@@ -177,6 +241,7 @@ public class TestPipelinedSorter {
 
     //final merge is disabled. Final output file would not be populated in this case.
     assertTrue(sorter.finalOutputFile == null);
+    conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, true);
     verify(outputContext, times(1)).sendEvents(anyListOf(Event.class));
   }
 
@@ -192,6 +257,32 @@ public class TestPipelinedSorter {
     verifyCounters(sorter, outputContext);
   }
 
+  public void basicTest2(int partitions, int[] numkeys, int[] keysize,
+      long initialAvailableMem, int  blockSize) throws IOException {
+    this.numOutputs = partitions; // single output
+    PipelinedSorter sorter = new PipelinedSorter(this.outputContext, conf, numOutputs,
+        initialAvailableMem, blockSize);
+    writeData2(sorter, numkeys, keysize);
+    verifyCounters(sorter, outputContext);
+  }
+
+  private void writeData2(ExternalSorter sorter,
+      int[] numKeys, int[] keyLen) throws IOException {
+    sortedDataMap.clear();
+    int counter = 0;
+    for (int numkey : numKeys) {
+      int curKeyLen = keyLen[counter];
+      for (int i = 0; i < numkey; i++) {
+        Text key = new Text(RandomStringUtils.randomAlphanumeric(curKeyLen));
+        Text value = new Text(RandomStringUtils.randomAlphanumeric(curKeyLen));
+        sorter.write(key, value);
+      }
+      counter++;
+    }
+    sorter.flush();
+    sorter.close();
+  }
+
   public void basicTest(int partitions, int numKeys, int keySize,
       long initialAvailableMem, int blockSize) throws IOException {
     this.numOutputs = partitions; // single output
@@ -200,10 +291,10 @@ public class TestPipelinedSorter {
 
     writeData(sorter, numKeys, keySize);
 
+
     verifyCounters(sorter, outputContext);
     Path outputFile = sorter.finalOutputFile;
     FileSystem fs = outputFile.getFileSystem(conf);
-
     IFile.Reader reader = new IFile.Reader(fs, outputFile, null, null, null, false, -1, 4096);
     //Verify dataset
     verifyData(reader);
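
The new tests above share one pattern: write random alphanumeric key/value pairs whose sizes are chosen relative to the block size, flush, and verify counters and merged output. A generic, self-contained version of that driver is sketched here; KVSink is a hypothetical stand-in for the sorter under test, and the local randomAlphanumeric helper replaces the commons-lang one used in the actual tests.

import java.io.IOException;
import java.util.Random;

final class OversizedKVDriver {

  interface KVSink {
    void write(String key, String value) throws IOException;
    void flush() throws IOException;
  }

  private static final String ALPHANUM =
      "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789";

  private static String randomAlphanumeric(Random rnd, int len) {
    StringBuilder sb = new StringBuilder(len);
    for (int i = 0; i < len; i++) {
      sb.append(ALPHANUM.charAt(rnd.nextInt(ALPHANUM.length())));
    }
    return sb.toString();
  }

  // Mirrors writeData2 above: several groups of pairs, each group with its own key/value length.
  static int writeGroups(KVSink sink, int[] numKeys, int[] keyLen) throws IOException {
    Random rnd = new Random(42);
    int written = 0;
    for (int group = 0; group < numKeys.length; group++) {
      for (int i = 0; i < numKeys[group]; i++) {
        sink.write(randomAlphanumeric(rnd, keyLen[group]),
                   randomAlphanumeric(rnd, keyLen[group]));
        written++;
      }
    }
    sink.flush();
    return written;   // callers would compare this with the sorter's output record counter
  }
}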

