tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jeag...@apache.org
Subject tez git commit: TEZ-3452. Auto-reduce parallelism calculation can overflow with large inputs (jeagles)
Date Thu, 20 Oct 2016 16:13:10 GMT
Repository: tez
Updated Branches:
  refs/heads/branch-0.8 327b29685 -> 7f8687f24


TEZ-3452. Auto-reduce parallelism calculation can overflow with large inputs (jeagles)

(cherry picked from commit ed03611245423c89a9881af8bdc85ab909992a5d)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/7f8687f2
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/7f8687f2
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/7f8687f2

Branch: refs/heads/branch-0.8
Commit: 7f8687f249e63b523d4260006cee1b5223828805
Parents: 327b296
Author: Jonathan Eagles <jeagles@yahoo-inc.com>
Authored: Wed Oct 19 16:56:26 2016 -0500
Committer: Jonathan Eagles <jeagles@yahoo-inc.com>
Committed: Thu Oct 20 11:12:29 2016 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 .../vertexmanager/ShuffleVertexManager.java     | 47 ++++++----
 .../vertexmanager/TestShuffleVertexManager.java | 94 +++++++++++++++++++-
 3 files changed, 124 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/7f8687f2/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index efdea58..fce5e1c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
 
 ALL CHANGES:
 
+  TEZ-3452. Auto-reduce parallelism calculation can overflow with large inputs
   TEZ-3439. Tez joinvalidate fails when first input argument size is bigger than the second.
   TEZ-3464. Fix findbugs warnings in tez-dag mainLoop
   TEZ-3330. Propagate additional config parameters when running MR jobs via Tez.
@@ -516,6 +517,7 @@ INCOMPATIBLE CHANGES
 
 ALL CHANGES:
 
+  TEZ-3452. Auto-reduce parallelism calculation can overflow with large inputs
   TEZ-3439. Tez joinvalidate fails when first input argument size is bigger than the second.
   TEZ-3464. Fix findbugs warnings in tez-dag mainLoop
   TEZ-3335. DAG client thinks app is still running when app status is null

http://git-wip-us.apache.org/repos/asf/tez/blob/7f8687f2/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
index c4058c4..e8abe67 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
@@ -65,6 +65,7 @@ import javax.annotation.Nullable;
 import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.IOException;
+import java.math.BigInteger;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.BitSet;
@@ -696,28 +697,44 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
     // Change this to use per partition stats for more accuracy TEZ-2962.
     // Instead of aggregating overall size and then dividing equally - coalesce partitions until
     // desired per partition size is achieved.
-    long expectedTotalSourceTasksOutputSize = 0;
+    BigInteger expectedTotalSourceTasksOutputSize = BigInteger.ZERO;
     for (Map.Entry<String, SourceVertexInfo> vInfo : getBipartiteInfo()) {
       SourceVertexInfo srcInfo = vInfo.getValue();
       if (srcInfo.numTasks > 0 && srcInfo.numVMEventsReceived > 0) {
         // this assumes that 1 vmEvent is received per completed task - TEZ-2961
-        expectedTotalSourceTasksOutputSize += 
-            (srcInfo.numTasks * srcInfo.outputSize) / srcInfo.numVMEventsReceived;
+        // Estimate total size by projecting based on the current average size per event
+        BigInteger srcOutputSize = BigInteger.valueOf(srcInfo.outputSize);
+        BigInteger srcNumTasks = BigInteger.valueOf(srcInfo.numTasks);
+        BigInteger srcNumVMEventsReceived = BigInteger.valueOf(srcInfo.numVMEventsReceived);
+        BigInteger expectedSrcOutputSize = srcOutputSize.multiply(srcNumTasks).divide(srcNumVMEventsReceived);
+        expectedTotalSourceTasksOutputSize = expectedTotalSourceTasksOutputSize.add(expectedSrcOutputSize);
       }
     }
 
-    LOG.info("Expected output: " + expectedTotalSourceTasksOutputSize + " based on actual
output: "
-        + completedSourceTasksOutputSize + " from " + numVertexManagerEventsReceived + "
vertex manager events. "
-        + " desiredTaskInputSize: " + desiredTaskInputDataSize + " max slow start tasks:"
-        + (totalNumBipartiteSourceTasks * slowStartMaxSrcCompletionFraction) + " num sources
completed:"
-        + numBipartiteSourceTasksCompleted);
-
-    int desiredTaskParallelism = 
-        (int)(
-            (expectedTotalSourceTasksOutputSize+desiredTaskInputDataSize-1)/
-            desiredTaskInputDataSize);
-    if(desiredTaskParallelism < minTaskParallelism) {
-      desiredTaskParallelism = minTaskParallelism;
+    LOG.info("Expected output: {} based on actual output: {} from {} vertex " +
+        "manager events. desiredTaskInputSize: {} max slow start tasks: {} " +
+        " num sources completed: {}", expectedTotalSourceTasksOutputSize,
+        completedSourceTasksOutputSize, numVertexManagerEventsReceived,
+        this.desiredTaskInputDataSize,
+        (totalNumBipartiteSourceTasks * this.slowStartMaxSrcCompletionFraction),
+        numBipartiteSourceTasksCompleted);
+
+    // Calculate number of desired tasks by dividing with rounding up
+    BigInteger desiredTaskInputDataSize = BigInteger.valueOf(this.desiredTaskInputDataSize);
+    BigInteger desiredTaskInputDataSizeMinusOne = BigInteger.valueOf(this.desiredTaskInputDataSize
- 1);
+    BigInteger bigDesiredTaskParallelism =
+        expectedTotalSourceTasksOutputSize.add(desiredTaskInputDataSizeMinusOne).divide(desiredTaskInputDataSize);
+
+    if(bigDesiredTaskParallelism.compareTo(BigInteger.valueOf(Integer.MAX_VALUE)) > 0)
{
+      LOG.info("Not reducing auto parallelism for vertex: {}"
+              + " since the desired parallelism of {} is greater than or equal"
+              + " to the max parallelism of {}", getContext().getVertexName(),
+          bigDesiredTaskParallelism, Integer.MAX_VALUE);
+      return true;
+    }
+    int desiredTaskParallelism = bigDesiredTaskParallelism.intValue();
+    if(desiredTaskParallelism < this.minTaskParallelism) {
+      desiredTaskParallelism = this.minTaskParallelism;
     }
 
     if(desiredTaskParallelism >= currentParallelism) {

http://git-wip-us.apache.org/repos/asf/tez/blob/7f8687f2/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
index 2566c94..cd3a083 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
@@ -460,6 +460,70 @@ public class TestShuffleVertexManager {
     // parallelism changed due to small data size
     scheduledTasks.clear();
 
+    // Ensure long overflow doesn't reduce mistakenly
+    // Overflow could previously occur when output size * num tasks for a single vertex exceeded max long
+    //
+    manager = createManager(conf, mockContext, 1.0f, 1.0f, (long)(Long.MAX_VALUE / 1.5));
+    manager.onVertexStarted(emptyCompletions);
+    manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
+    manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
+    manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
+    Assert.assertEquals(4, manager.pendingTasks.size()); // no tasks scheduled
+    Assert.assertEquals(4, manager.totalNumBipartiteSourceTasks);
+    // task completion from non-bipartite stage does nothing
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId3, 0));
+    Assert.assertEquals(4, manager.pendingTasks.size()); // no tasks scheduled
+    Assert.assertEquals(4, manager.totalNumBipartiteSourceTasks);
+    Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted);
+    // First source 1 task completes
+    vmEvent = getVertexManagerEvent(null, 0L, mockSrcVertexId1);
+    manager.onVertexManagerEventReceived(vmEvent);
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0));
+    Assert.assertEquals(4, manager.pendingTasks.size());
+    Assert.assertEquals(0, scheduledTasks.size()); // no tasks scheduled
+    Assert.assertEquals(1, manager.numBipartiteSourceTasksCompleted);
+    Assert.assertEquals(1, manager.numVertexManagerEventsReceived);
+    Assert.assertEquals(0L, manager.completedSourceTasksOutputSize);
+    // Second source 1 task completes
+    vmEvent = getVertexManagerEvent(null, 0L, mockSrcVertexId1);
+    manager.onVertexManagerEventReceived(vmEvent);
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 1));
+    Assert.assertEquals(4, manager.pendingTasks.size());
+    Assert.assertEquals(0, scheduledTasks.size()); // no tasks scheduled
+    Assert.assertEquals(2, manager.numBipartiteSourceTasksCompleted);
+    Assert.assertEquals(0L, manager.completedSourceTasksOutputSize);
+    // First source 2 task completes
+    vmEvent = getVertexManagerEvent(null, Long.MAX_VALUE >> 1 , mockSrcVertexId2);
+    manager.onVertexManagerEventReceived(vmEvent);
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0));
+    Assert.assertEquals(4, manager.pendingTasks.size());
+    Assert.assertEquals(0, scheduledTasks.size()); // no tasks scheduled
+    Assert.assertEquals(3, manager.numBipartiteSourceTasksCompleted);
+    Assert.assertEquals(Long.MAX_VALUE >> 1, manager.completedSourceTasksOutputSize);
+    // Second source 2 task completes
+    vmEvent = getVertexManagerEvent(null, Long.MAX_VALUE >> 1 , mockSrcVertexId2);
+    manager.onVertexManagerEventReceived(vmEvent);
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 1));
+    // Auto-reduce is triggered
+    verify(mockContext, times(5)).reconfigureVertex(anyInt(), any(VertexLocationHint.class),
anyMap());
+    verify(mockContext, times(3)).reconfigureVertex(eq(2), any(VertexLocationHint.class),
anyMap());
+    Assert.assertEquals(2, newEdgeManagers.size());
+    Assert.assertEquals(0, manager.pendingTasks.size()); // all tasks scheduled
+    Assert.assertEquals(2, scheduledTasks.size());
+    Assert.assertTrue(scheduledTasks.contains(new Integer(0)));
+    Assert.assertTrue(scheduledTasks.contains(new Integer(1)));
+    Assert.assertEquals(4, manager.numBipartiteSourceTasksCompleted);
+    Assert.assertEquals(4, manager.numVertexManagerEventsReceived);
+    Assert.assertEquals(Long.MAX_VALUE >> 1 << 1, manager.completedSourceTasksOutputSize);
+
+    //reset context for next test
+    when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(2);
+    when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(2);
+    when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(4);
+
+    // parallelism changed due to small data size
+    scheduledTasks.clear();
+
     manager = createManager(conf, mockContext, 0.5f, 0.5f);
     manager.onVertexStarted(emptyCompletions);
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
@@ -493,8 +557,8 @@ public class TestShuffleVertexManager {
 
     manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 1));
     // managedVertex tasks reduced
-    verify(mockContext, times(5)).reconfigureVertex(anyInt(), any(VertexLocationHint.class),
anyMap());
-    verify(mockContext, times(3)).reconfigureVertex(eq(2), any(VertexLocationHint.class),
anyMap());
+    verify(mockContext, times(6)).reconfigureVertex(anyInt(), any(VertexLocationHint.class),
anyMap());
+    verify(mockContext, times(4)).reconfigureVertex(eq(2), any(VertexLocationHint.class),
anyMap());
     Assert.assertEquals(2, newEdgeManagers.size());
     // TODO improve tests for parallelism
     Assert.assertEquals(0, manager.pendingTasks.size()); // all tasks scheduled
@@ -507,7 +571,7 @@ public class TestShuffleVertexManager {
     
     // more completions dont cause recalculation of parallelism
     manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0));
-    verify(mockContext, times(5)).reconfigureVertex(anyInt(), any(VertexLocationHint.class),
anyMap());
+    verify(mockContext, times(6)).reconfigureVertex(anyInt(), any(VertexLocationHint.class),
anyMap());
     Assert.assertEquals(2, newEdgeManagers.size());
     
     EdgeManagerPlugin edgeManager = newEdgeManagers.values().iterator().next();
@@ -1379,7 +1443,17 @@ public class TestShuffleVertexManager {
   }
 
   private ShuffleVertexManager createManager(Configuration conf,
-      VertexManagerPluginContext context, Float min, Float max) {
+                                             VertexManagerPluginContext context, Float min,
Float max, Long size) {
+    return createShuffleVertexManager(conf, context, min, max, size);
+  }
+
+  private ShuffleVertexManager createManager(Configuration conf,
+                                             VertexManagerPluginContext context, Float min,
Float max) {
+    return createShuffleVertexManager(conf, context, min, max, null);
+  }
+
+  private ShuffleVertexManager createShuffleVertexManager(Configuration conf,
+      VertexManagerPluginContext context, Float min, Float max, Long size) {
     if (min != null) {
       conf.setFloat(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION, min);
     } else {
@@ -1390,6 +1464,18 @@ public class TestShuffleVertexManager {
     } else {
       conf.unset(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION);
     }
+    conf.setBoolean(
+        ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL,
+            true);
+    if (size != null) {
+      conf.setLong(
+          ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE,
+          size);
+    } else {
+      conf.setLong(
+          ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE,
+          1000L);
+    }
     UserPayload payload;
     try {
       payload = TezUtils.createUserPayloadFromConf(conf);


Mime
View raw message