tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rbalamo...@apache.org
Subject tez git commit: TEZ-2552. CRC errors can cause job to run for very long time in large jobs (rbalamohan)
Date Thu, 30 Jul 2015 04:45:59 GMT
Repository: tez
Updated Branches:
  refs/heads/branch-0.6 aa26fd797 -> b2b6fa7a1


TEZ-2552. CRC errors can cause job to run for very long time in large jobs (rbalamohan)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/b2b6fa7a
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/b2b6fa7a
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/b2b6fa7a

Branch: refs/heads/branch-0.6
Commit: b2b6fa7a148ef7cc3a2e0451e6d08a1669ae11c3
Parents: aa26fd7
Author: Rajesh Balamohan <rbalamohan@apache.org>
Authored: Thu Jul 30 10:17:14 2015 +0530
Committer: Rajesh Balamohan <rbalamohan@apache.org>
Committed: Thu Jul 30 10:17:14 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/tez/dag/api/TezConfiguration.java    | 22 +++++++++++
 tez-dag/findbugs-exclude.xml                    | 10 +++++
 .../tez/dag/app/dag/impl/TaskAttemptImpl.java   | 27 ++++++++++---
 .../tez/dag/app/dag/impl/TestTaskAttempt.java   | 41 ++++++++++++++++++--
 5 files changed, 92 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/b2b6fa7a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 768dff2..4efe9c0 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -213,6 +213,7 @@ Release 0.5.5: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-2552. CRC errors can cause job to run for very long time in large jobs.
   TEZ-2636. MRInput and MultiMRInput should work for cases when there are 0 physical inputs.
   TEZ-2600. When used with HDFS federation(viewfs) ,tez will throw a error
 

http://git-wip-us.apache.org/repos/asf/tez/blob/b2b6fa7a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index 0856228..c70ba25 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -156,6 +156,28 @@ public class TezConfiguration extends Configuration {
   public static final String TEZ_TASK_LOG_LEVEL_DEFAULT = "INFO";
 
   /**
+   * double value. Represents ratio of unique failed outputs / number of consumer
+   * tasks. When this condition or value mentioned in {@link
+   * #TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES} is met, task would be declared as failed by AM.
+   *
+   * Expert level setting.
+   */
+  public static final String TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES_FRACTION =
+      TEZ_TASK_PREFIX + "max.allowed.output.failures.fraction";
+  public static final double TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES_FRACTION_DEFAULT = 0.1;
+
+  /**
+   * Int value. Represents maximum allowed unique failures after which a task would be
+   * declared as failed by AM. When this condition or the threshold mentioned in {@link
+   * #TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES_FRACTION} is met, task would be relaunched by AM.
+   *
+   * Expert level setting.
+   */
+  public static final String TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES =
+      TEZ_TASK_PREFIX + "max.allowed.output.failures";
+  public static final int TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES_DEFAULT = 10;
+
+  /**
    * Boolean value. Determines when the final outputs to data sinks are committed. Commit is an
    * output specific operation and typically involves making the output visible for consumption.
    * If the config is true, then the outputs are committed at the end of DAG completion after all 

http://git-wip-us.apache.org/repos/asf/tez/blob/b2b6fa7a/tez-dag/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/tez-dag/findbugs-exclude.xml b/tez-dag/findbugs-exclude.xml
index d3d365d..0ae5ee6 100644
--- a/tez-dag/findbugs-exclude.xml
+++ b/tez-dag/findbugs-exclude.xml
@@ -22,4 +22,14 @@
     <Bug pattern="IS2_INCONSISTENT_SYNC" />
   </Match>
 
+  <!-- TEZ-2552 -->
+  <Match>
+    <Class name="org.apache.tez.dag.app.dag.impl.TaskAttemptImpl"/>
+    <Or>
+      <Field name="MAX_ALLOWED_OUTPUT_FAILURES_FRACTION"/>
+      <Field name="MAX_ALLOWED_OUTPUT_FAILURES"/>
+    </Or>
+    <Bug pattern="ST_WRITE_TO_STATIC_FROM_INSTANCE_METHOD"/>
+  </Match>
+
 </FindBugsFilter>

http://git-wip-us.apache.org/repos/asf/tez/blob/b2b6fa7a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index 73e598c..fe74586 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -151,10 +151,11 @@ public class TaskAttemptImpl implements TaskAttempt,
   // Used to store locality information when
   Set<String> taskHosts = new HashSet<String>();
   Set<String> taskRacks = new HashSet<String>();
-  
+
   private Set<TezTaskAttemptID> uniquefailedOutputReports = 
       new HashSet<TezTaskAttemptID>();
-  private static final double MAX_ALLOWED_OUTPUT_FAILURES_FRACTION = 0.25;
+  private static double MAX_ALLOWED_OUTPUT_FAILURES_FRACTION;
+  private static int MAX_ALLOWED_OUTPUT_FAILURES;
 
   protected final boolean isRescheduled;
   private final Resource taskResource;
@@ -404,6 +405,12 @@ public class TaskAttemptImpl implements TaskAttempt,
       TaskHeartbeatHandler taskHeartbeatHandler, AppContext appContext,
       boolean isRescheduled,
       Resource resource, ContainerContext containerContext, boolean leafVertex) {
+    this.MAX_ALLOWED_OUTPUT_FAILURES = conf.getInt(TezConfiguration
+        .TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES, TezConfiguration
+        .TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES_DEFAULT);
+    this.MAX_ALLOWED_OUTPUT_FAILURES_FRACTION = conf.getDouble(TezConfiguration
+        .TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES_FRACTION, TezConfiguration
+        .TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES_FRACTION_DEFAULT);
     ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
     this.readLock = rwLock.readLock();
     this.writeLock = rwLock.writeLock();
@@ -1447,15 +1454,23 @@ public class TaskAttemptImpl implements TaskAttempt,
       attempt.uniquefailedOutputReports.add(failedDestTaId);
       float failureFraction = ((float) attempt.uniquefailedOutputReports.size())
           / outputFailedEvent.getConsumerTaskNumber();
-      
-      // If needed we can also use the absolute number of reported output errors
+
+      boolean withinFailureFractionLimits =
+          (failureFraction <= MAX_ALLOWED_OUTPUT_FAILURES_FRACTION);
+      boolean withinOutputFailureLimits =
+          (attempt.uniquefailedOutputReports.size() < MAX_ALLOWED_OUTPUT_FAILURES);
+
       // If needed we can launch a background task without failing this task
       // to generate a copy of the output just in case.
       // If needed we can consider only running consumer tasks
-      if (failureFraction <= MAX_ALLOWED_OUTPUT_FAILURES_FRACTION) {
+      if (withinFailureFractionLimits && withinOutputFailureLimits) {
         return attempt.getInternalState();
       }
-      String message = attempt.getID() + " being failed for too many output errors";
+      String message = attempt.getID() + " being failed for too many output errors. "
+          + "failureFraction=" + failureFraction + ", "
+          + "MAX_ALLOWED_OUTPUT_FAILURES_FRACTION=" + MAX_ALLOWED_OUTPUT_FAILURES_FRACTION + ", "
+          + "uniquefailedOutputReports=" + attempt.uniquefailedOutputReports.size() + ", "
+          + "MAX_ALLOWED_OUTPUT_FAILURES=" + MAX_ALLOWED_OUTPUT_FAILURES;
       LOG.info(message);
       attempt.addDiagnosticInfo(message);
       // send input failed event

http://git-wip-us.apache.org/repos/asf/tez/blob/b2b6fa7a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
index 6949779..126e002 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
@@ -1127,14 +1127,14 @@ public class TestTaskAttempt {
     TezTaskAttemptID mockDestId1 = mock(TezTaskAttemptID.class);
     when(mockMeta.getTaskAttemptID()).thenReturn(mockDestId1);
     TezEvent tzEvent = new TezEvent(mockReEvent, mockMeta);
-    taImpl.handle(new TaskAttemptEventOutputFailed(taskAttemptID, tzEvent, 4));
+    taImpl.handle(new TaskAttemptEventOutputFailed(taskAttemptID, tzEvent, 11));
     
     // failure threshold not met. state is SUCCEEDED
     assertEquals("Task attempt is not in succeeded state", taImpl.getState(),
         TaskAttemptState.SUCCEEDED);
 
     // sending same error again doesnt change anything
-    taImpl.handle(new TaskAttemptEventOutputFailed(taskAttemptID, tzEvent, 4));
+    taImpl.handle(new TaskAttemptEventOutputFailed(taskAttemptID, tzEvent, 11));
     assertEquals("Task attempt is not in succeeded state", taImpl.getState(),
         TaskAttemptState.SUCCEEDED);
     // default value of error cause
@@ -1143,7 +1143,7 @@ public class TestTaskAttempt {
     // different destination attempt reports error. now threshold crossed
     TezTaskAttemptID mockDestId2 = mock(TezTaskAttemptID.class);
     when(mockMeta.getTaskAttemptID()).thenReturn(mockDestId2);    
-    taImpl.handle(new TaskAttemptEventOutputFailed(taskAttemptID, tzEvent, 4));
+    taImpl.handle(new TaskAttemptEventOutputFailed(taskAttemptID, tzEvent, 11));
     
     assertEquals("Task attempt is not in FAILED state", taImpl.getState(),
         TaskAttemptState.FAILED);
@@ -1168,6 +1168,41 @@ public class TestTaskAttempt {
     // No new events.
     verify(eventHandler, times(expectedEventsAfterFetchFailure)).handle(
         arg.capture());
+
+    taskConf.setInt(TezConfiguration.TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES, 1);
+    TezTaskID taskID2 = TezTaskID.getInstance(vertexID, 2);
+    MockTaskAttemptImpl taImpl2 = new MockTaskAttemptImpl(taskID2, 1, eventHandler,
+        taListener, taskConf, new SystemClock(),
+        mockHeartbeatHandler, appCtx, locationHint, false,
+        resource, createFakeContainerContext(), false);
+    TezTaskAttemptID taskAttemptID2 = taImpl2.getID();
+
+    taImpl2.handle(new TaskAttemptEventSchedule(taskAttemptID2, 0, 0));
+    // At state STARTING.
+    taImpl2.handle(new TaskAttemptEventStartedRemotely(taskAttemptID2, contId, null));
+    verify(mockHeartbeatHandler).register(taskAttemptID2);
+    taImpl2.handle(new TaskAttemptEvent(taskAttemptID2, TaskAttemptEventType.TA_DONE));
+    assertEquals("Task attempt is not in succeeded state", taImpl2.getState(),
+        TaskAttemptState.SUCCEEDED);
+    verify(mockHeartbeatHandler).unregister(taskAttemptID2);
+
+    mockReEvent = InputReadErrorEvent.create("", 1, 1);
+    mockMeta = mock(EventMetaData.class);
+    mockDestId1 = mock(TezTaskAttemptID.class);
+    when(mockMeta.getTaskAttemptID()).thenReturn(mockDestId1);
+    tzEvent = new TezEvent(mockReEvent, mockMeta);
+    //This should fail even when MAX_ALLOWED_OUTPUT_FAILURES_FRACTION is within limits, as
+    // MAX_ALLOWED_OUTPUT_FAILURES has crossed the limit.
+    taImpl2.handle(new TaskAttemptEventOutputFailed(taskAttemptID2, tzEvent, 8));
+    assertEquals("Task attempt is not in succeeded state", taImpl2.getState(),
+        TaskAttemptState.FAILED);
+
+    assertEquals("Task attempt is not in FAILED state", taImpl2.getState(),
+        TaskAttemptState.FAILED);
+    assertEquals(TaskAttemptTerminationCause.OUTPUT_LOST, taImpl2.getTerminationCause());
+    // verify unregister is not invoked again
+    verify(mockHeartbeatHandler, times(1)).unregister(taskAttemptID2);
+
   }
 
   @SuppressWarnings("deprecation")


Mime
View raw message