tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rbalamo...@apache.org
Subject tez git commit: TEZ-2732. DefaultSorter throws ArrayIndex exceptions on 2047 Mb size sort buffers (rbalamohan)
Date Sat, 22 Aug 2015 01:06:03 GMT
Repository: tez
Updated Branches:
  refs/heads/branch-0.6 c27cc583e -> 253e8dee4


TEZ-2732. DefaultSorter throws ArrayIndex exceptions on 2047 Mb size sort buffers (rbalamohan)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/253e8dee
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/253e8dee
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/253e8dee

Branch: refs/heads/branch-0.6
Commit: 253e8dee424d400495804794329f3670d8439fe0
Parents: c27cc58
Author: Rajesh Balamohan <rbalamohan@apache.org>
Authored: Sat Aug 22 06:36:55 2015 +0530
Committer: Rajesh Balamohan <rbalamohan@apache.org>
Committed: Sat Aug 22 06:36:55 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 .../common/sort/impl/dflt/DefaultSorter.java    | 25 ++++--
 .../sort/impl/dflt/TestDefaultSorter.java       | 91 +++++++++++++++++++-
 3 files changed, 111 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/253e8dee/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a0dfcdb..b9b9694 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,7 @@ Release 0.6.3: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-2732. DefaultSorter throws ArrayIndex exceptions on 2047 Mb size sort buffers
   TEZ-2290. Scale memory for Default Sorter down to a max of 2047 MB if configured higher
   TEZ-2734. Add a test to verify the filename generated by OnDiskMerge
   TEZ-2687. ATS History shutdown happens before the min-held containers are released
@@ -227,6 +228,7 @@ INCOMPATIBLE CHANGES
   TEZ-2552. CRC errors can cause job to run for very long time in large jobs.
 
 ALL CHANGES:
+  TEZ-2732. DefaultSorter throws ArrayIndex exceptions on 2047 Mb size sort buffers
   TEZ-2290. Scale memory for Default Sorter down to a max of 2047 MB if configured higher
   TEZ-2734. Add a test to verify the filename generated by OnDiskMerge
   TEZ-2687. ATS History shutdown happens before the min-held containers are released

http://git-wip-us.apache.org/repos/asf/tez/blob/253e8dee/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
index c009923..69a303c 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
@@ -117,6 +117,8 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable
{
   private long totalKeys = 0;
   private long sameKey = 0;
 
+  public static final int MAX_IO_SORT_MB = 1800;
+
   public DefaultSorter(OutputContext outputContext, Configuration conf, int numOutputs,
       long initialMemoryAvailable) throws IOException {
     super(outputContext, conf, numOutputs, initialMemoryAvailable);
@@ -188,13 +190,15 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable
{
           "=" + availableMemoryMB + ". It should be > 0");
     }
 
-    if (availableMemoryMB > 2047) {
+    if (availableMemoryMB > MAX_IO_SORT_MB) {
       LOG.warn("Scaling down " + TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB +
-          "=" + availableMemoryMB + " to 2047 (max sort buffer size supported for DefaultSorter)");
+          "=" + availableMemoryMB + " to " + MAX_IO_SORT_MB
+          + " (max sort buffer size supported forDefaultSorter)");
     }
 
-    //cap sort buffer to 2047 for DefaultSorter.
-    return Math.min(2047, availableMemoryMB);
+    // cap sort buffer to MAX_IO_SORT_MB for DefaultSorter.
+    // Not using 2047 to avoid any ArrayIndexOutofBounds in collect() phase.
+    return Math.min(MAX_IO_SORT_MB, availableMemoryMB);
   }
 
 
@@ -261,6 +265,15 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable
{
               // leave at least half the split buffer for serialization data
               // ensure that kvindex >= bufindex
               final int distkvi = distanceTo(bufindex, kvbidx);
+              /**
+               * Reason for capping sort buffer to MAX_IO_SORT_MB
+               * E.g
+               * kvbuffer.length = 2146435072 (2047 MB)
+               * Corner case: bufIndex=2026133899, kvbidx=523629312.
+               * distkvi = mod - i + j = 2146435072 - 2026133899 + 523629312 = 643930485
+               * newPos = (2026133899 + (max(.., min(643930485/2, 271128624))) (This would
+               * overflow)
+               */
               final int newPos = (bufindex +
                 Math.max(2 * METASIZE - 1,
                         Math.min(distkvi / 2,
@@ -591,7 +604,9 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable
{
         }
       }
       // here, we know that we have sufficient space to write
-      if (bufindex + len > bufvoid) {
+      // int overflow possible with (bufindex + len)
+      long futureBufIndex = (long) bufindex + len;
+      if (futureBufIndex > bufvoid) {
         final int gaplen = bufvoid - bufindex;
         System.arraycopy(b, off, kvbuffer, bufindex, gaplen);
         len -= gaplen;

http://git-wip-us.apache.org/repos/asf/tez/blob/253e8dee/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java
index 4d4c99a..654cfef 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java
@@ -29,10 +29,14 @@ import static org.mockito.Mockito.mock;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.math3.random.RandomDataGenerator;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.util.StringInterner;
 import org.apache.tez.common.TezRuntimeFrameworkConfigs;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.common.counters.TezCounters;
@@ -42,8 +46,10 @@ import org.apache.tez.runtime.api.OutputContext;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.runtime.library.common.MemoryUpdateCallbackHandler;
 import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
+import org.apache.tez.runtime.library.common.sort.impl.ExternalSorter;
 import org.apache.tez.runtime.library.partitioner.HashPartitioner;
 import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -90,11 +96,92 @@ public class TestDefaultSorter {
     }
   }
 
+
+  @Test
+  @Ignore
+  /**
+   * Disabling this, as this would need 2047 MB sort mb for testing.
+   * Set DefaultSorter.MAX_IO_SORT_MB = 20467 for running this.
+   */
+  public void testSortLimitsWithSmallRecord() throws IOException {
+    conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, Text.class.getName());
+    conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, NullWritable.class.getName());
+    OutputContext context = createTezOutputContext();
+
+    doReturn(2800 * 1024 * 1024l).when(context).getTotalMemoryAvailableToTask();
+
+    //Setting IO_SORT_MB to 2047 MB
+    conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 2047);
+    context.requestInitialMemory(
+        ExternalSorter.getInitialMemoryRequirement(conf,
+            context.getTotalMemoryAvailableToTask()), new MemoryUpdateCallbackHandler());
+
+    DefaultSorter sorter = new DefaultSorter(context, conf, 2, 2047 << 20);
+
+    //Reset key/value in conf back to Text for other test cases
+    conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, Text.class.getName());
+    conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, Text.class.getName());
+
+    int i = 0;
+    /**
+     * If io.sort.mb is not capped to 1800, this would end up throwing
+     * "java.lang.ArrayIndexOutOfBoundsException" after many spills.
+     * Intentionally made it as infinite loop.
+     */
+    while (true) {
+      //test for the avg record size 2 (in lower spectrum)
+      Text key = new Text(i + "");
+      sorter.write(key, NullWritable.get());
+      i = (i + 1) % 10;
+    }
+  }
+
+  @Test
+  @Ignore
+  /**
+   * Disabling this, as this would need 2047 MB io.sort.mb for testing.
+   * Provide > 2GB to JVM when running this test to avoid OOM in string generation.
+   *
+   * Set DefaultSorter.MAX_IO_SORT_MB = 2047 for running this.
+   */
+  public void testSortLimitsWithLargeRecords() throws IOException {
+    OutputContext context = createTezOutputContext();
+
+    doReturn(2800 * 1024 * 1024l).when(context).getTotalMemoryAvailableToTask();
+
+    //Setting IO_SORT_MB to 2047 MB
+    conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 2047);
+    context.requestInitialMemory(
+        ExternalSorter.getInitialMemoryRequirement(conf,
+            context.getTotalMemoryAvailableToTask()), new MemoryUpdateCallbackHandler());
+
+    DefaultSorter sorter = new DefaultSorter(context, conf, 2, 2047 << 20);
+
+    int i = 0;
+    /**
+     * If io.sort.mb is not capped to 1800, this would end up throwing
+     * "java.lang.ArrayIndexOutOfBoundsException" after many spills.
+     * Intentionally made it as infinite loop.
+     */
+    RandomDataGenerator rnd = new RandomDataGenerator();
+    while (true) {
+      Text key = new Text(i + "");
+      //Generate random size between 1 MB to 100 MB.
+      int valSize = rnd.nextInt(1 * 1024 * 1024, 100 * 1024 * 1024);
+      String val = StringInterner.weakIntern(StringUtils.repeat("v", valSize));
+      sorter.write(key, new Text(val));
+      i = (i + 1) % 10;
+    }
+  }
+
+
   @Test(timeout = 5000)
   public void testSortMBLimits() throws Exception {
 
-    assertTrue("Expected 2047", DefaultSorter.computeSortBufferSize(4096) == 2047);
-    assertTrue("Expected 2047", DefaultSorter.computeSortBufferSize(2047) == 2047);
+    assertTrue("Expected " + DefaultSorter.MAX_IO_SORT_MB,
+        DefaultSorter.computeSortBufferSize(4096) == DefaultSorter.MAX_IO_SORT_MB);
+    assertTrue("Expected " + DefaultSorter.MAX_IO_SORT_MB,
+        DefaultSorter.computeSortBufferSize(2047) == DefaultSorter.MAX_IO_SORT_MB);
     assertTrue("Expected 1024", DefaultSorter.computeSortBufferSize(1024) == 1024);
 
     try {


Mime
View raw message