tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jeag...@apache.org
Subject tez git commit: TEZ-3969. TaskAttemptImpl: static fields initialized in instance ctor (Jaume Marhuenda via jegales)
Date Tue, 09 Oct 2018 18:12:33 GMT
Repository: tez
Updated Branches:
  refs/heads/master 7d73bb2dc -> 39d76a656


TEZ-3969. TaskAttemptImpl: static fields initialized in instance ctor (Jaume Marhuenda via
jegales)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/39d76a65
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/39d76a65
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/39d76a65

Branch: refs/heads/master
Commit: 39d76a656216d4843908279ef8eaa29a4cc83104
Parents: 7d73bb2
Author: Jaume Marhuenda <jmarhuenda@hortonworks.com>
Authored: Tue Oct 9 13:12:17 2018 -0500
Committer: Jonathan Eagles <jeagles@yahoo-inc.com>
Committed: Tue Oct 9 13:12:17 2018 -0500

----------------------------------------------------------------------
 tez-dag/findbugs-exclude.xml                    | 11 -----
 .../java/org/apache/tez/dag/app/dag/Vertex.java | 13 ++++++
 .../tez/dag/app/dag/impl/TaskAttemptImpl.java   | 38 +++++++---------
 .../apache/tez/dag/app/dag/impl/VertexImpl.java | 46 ++++++++++++++++++++
 .../tez/dag/app/dag/impl/TestTaskAttempt.java   | 28 +++++++++---
 5 files changed, 97 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/39d76a65/tez-dag/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/tez-dag/findbugs-exclude.xml b/tez-dag/findbugs-exclude.xml
index 1150ccb..a6ce380 100644
--- a/tez-dag/findbugs-exclude.xml
+++ b/tez-dag/findbugs-exclude.xml
@@ -252,15 +252,4 @@
     <Bug pattern="URF_UNREAD_PUBLIC_OR_PROTECTED_FIELD"/>
   </Match>
 
-  <!-- TEZ-2552 -->
-  <Match>
-    <Class name="org.apache.tez.dag.app.dag.impl.TaskAttemptImpl"/>
-    <Or>
-      <Field name="MAX_ALLOWED_OUTPUT_FAILURES_FRACTION"/>
-      <Field name="MAX_ALLOWED_OUTPUT_FAILURES"/>
-      <Field name="MAX_ALLOWED_TIME_FOR_TASK_READ_ERROR_SEC"/>
-    </Or>
-    <Bug pattern="ST_WRITE_TO_STATIC_FROM_INSTANCE_METHOD"/>
-  </Match>
-
 </FindBugsFilter>

http://git-wip-us.apache.org/repos/asf/tez/blob/39d76a65/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
index 0e54e9f..0b2406f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
@@ -213,6 +213,19 @@ public interface Vertex extends Comparable<Vertex> {
     int getMaxFailedTaskAttempts();
     boolean getTaskRescheduleHigherPriority();
     boolean getTaskRescheduleRelaxedLocality();
+
+    /**
+     * @return tez.task.max.allowed.output.failures.
+     */
+    int getMaxAllowedOutputFailures();
+    /**
+     * @return tez.task.max.allowed.output.failures.fraction.
+     */
+    double getMaxAllowedOutputFailuresFraction();
+    /**
+     * @return tez.am.max.allowed.time-sec.for-read-error.
+     */
+    int getMaxAllowedTimeForTaskReadErrorSec();
   }
 
   void incrementRejectedTaskAttemptCount();

http://git-wip-us.apache.org/repos/asf/tez/blob/39d76a65/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index bbec9ea..7399979 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -214,9 +214,6 @@ public class TaskAttemptImpl implements TaskAttempt,
   Set<String> taskRacks = new HashSet<String>();
 
   private Map<TezTaskAttemptID, Long> uniquefailedOutputReports = Maps.newHashMap();
-  private static double MAX_ALLOWED_OUTPUT_FAILURES_FRACTION;
-  private static int MAX_ALLOWED_OUTPUT_FAILURES;
-  private static int MAX_ALLOWED_TIME_FOR_TASK_READ_ERROR_SEC;
 
   protected final boolean isRescheduled;
   private final Resource taskResource;
@@ -548,18 +545,6 @@ public class TaskAttemptImpl implements TaskAttempt,
       Vertex vertex, TaskLocationHint locationHint, TaskSpec taskSpec,
       TezTaskAttemptID schedulingCausalTA) {
 
-    // TODO: Move these configs over to Vertex.VertexConfig
-    MAX_ALLOWED_OUTPUT_FAILURES = conf.getInt(TezConfiguration
-        .TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES, TezConfiguration
-        .TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES_DEFAULT);
-
-    MAX_ALLOWED_OUTPUT_FAILURES_FRACTION = conf.getDouble(TezConfiguration
-        .TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES_FRACTION, TezConfiguration
-        .TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES_FRACTION_DEFAULT);
-    
-    MAX_ALLOWED_TIME_FOR_TASK_READ_ERROR_SEC = conf.getInt(
-        TezConfiguration.TEZ_AM_MAX_ALLOWED_TIME_FOR_TASK_READ_ERROR_SEC,
-        TezConfiguration.TEZ_AM_MAX_ALLOWED_TIME_FOR_TASK_READ_ERROR_SEC_DEFAULT);
     ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
     this.readLock = rwLock.readLock();
     this.writeLock = rwLock.writeLock();
@@ -1793,17 +1778,24 @@ public class TaskAttemptImpl implements TaskAttempt,
         attempt.uniquefailedOutputReports.put(failedDestTaId, time);
         firstErrReportTime = time;
       }
-      
+
+      int maxAllowedOutputFailures = attempt.getVertex().getVertexConfig()
+          .getMaxAllowedOutputFailures();
+      int maxAllowedTimeForTaskReadErrorSec = attempt.getVertex()
+          .getVertexConfig().getMaxAllowedTimeForTaskReadErrorSec();
+      double maxAllowedOutputFailuresFraction = attempt.getVertex()
+          .getVertexConfig().getMaxAllowedOutputFailuresFraction();
+
       int readErrorTimespanSec = (int)((time - firstErrReportTime)/1000);
-      boolean crossTimeDeadline = readErrorTimespanSec >= MAX_ALLOWED_TIME_FOR_TASK_READ_ERROR_SEC;
+      boolean crossTimeDeadline = readErrorTimespanSec >= maxAllowedTimeForTaskReadErrorSec;
 
       int runningTasks = attempt.appContext.getCurrentDAG().getVertex(
           failedDestTaId.getTaskID().getVertexID()).getRunningTasks();
       float failureFraction = runningTasks > 0 ? ((float) attempt.uniquefailedOutputReports.size())
/ runningTasks : 0;
       boolean withinFailureFractionLimits =
-          (failureFraction <= MAX_ALLOWED_OUTPUT_FAILURES_FRACTION);
+          (failureFraction <= maxAllowedOutputFailuresFraction);
       boolean withinOutputFailureLimits =
-          (attempt.uniquefailedOutputReports.size() < MAX_ALLOWED_OUTPUT_FAILURES);
+          (attempt.uniquefailedOutputReports.size() < maxAllowedOutputFailures);
 
       // If needed we can launch a background task without failing this task
       // to generate a copy of the output just in case.
@@ -1813,10 +1805,12 @@ public class TaskAttemptImpl implements TaskAttempt,
       }
       String message = attempt.getID() + " being failed for too many output errors. "
           + "failureFraction=" + failureFraction
-          + ", MAX_ALLOWED_OUTPUT_FAILURES_FRACTION=" + MAX_ALLOWED_OUTPUT_FAILURES_FRACTION
+          + ", MAX_ALLOWED_OUTPUT_FAILURES_FRACTION="
+          + maxAllowedOutputFailuresFraction
           + ", uniquefailedOutputReports=" + attempt.uniquefailedOutputReports.size()
-          + ", MAX_ALLOWED_OUTPUT_FAILURES=" + MAX_ALLOWED_OUTPUT_FAILURES
-          + ", MAX_ALLOWED_TIME_FOR_TASK_READ_ERROR_SEC=" + MAX_ALLOWED_TIME_FOR_TASK_READ_ERROR_SEC
+          + ", MAX_ALLOWED_OUTPUT_FAILURES=" + maxAllowedOutputFailures
+          + ", MAX_ALLOWED_TIME_FOR_TASK_READ_ERROR_SEC="
+          + maxAllowedTimeForTaskReadErrorSec
           + ", readErrorTimespan=" + readErrorTimespanSec;
       LOG.info(message);
       attempt.addDiagnosticInfo(message);

http://git-wip-us.apache.org/repos/asf/tez/blob/39d76a65/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 0184657..a4d2de1 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -4690,6 +4690,19 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
EventHandl
     private final boolean taskRescheduleHigherPriority;
     private final boolean taskRescheduleRelaxedLocality;
 
+    /**
+     * See tez.task.max.allowed.output.failures.fraction.
+     */
+    private final double maxAllowedOutputFailuresFraction;
+    /**
+     * See tez.task.max.allowed.output.failures.
+     */
+    private final int maxAllowedOutputFailures;
+    /**
+     * See tez.am.max.allowed.time-sec.for-read-error.
+     */
+    private final int maxAllowedTimeForTaskReadErrorSec;
+
     public VertexConfigImpl(Configuration conf) {
       this.maxFailedTaskAttempts = conf.getInt(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS,
           TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS_DEFAULT);
@@ -4699,6 +4712,18 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
EventHandl
       this.taskRescheduleRelaxedLocality =
           conf.getBoolean(TezConfiguration.TEZ_AM_TASK_RESCHEDULE_RELAXED_LOCALITY,
               TezConfiguration.TEZ_AM_TASK_RESCHEDULE_RELAXED_LOCALITY_DEFAULT);
+
+      this.maxAllowedOutputFailures = conf.getInt(TezConfiguration
+          .TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES, TezConfiguration
+          .TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES_DEFAULT);
+
+      this.maxAllowedOutputFailuresFraction = conf.getDouble(TezConfiguration
+          .TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES_FRACTION, TezConfiguration
+          .TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES_FRACTION_DEFAULT);
+
+      this.maxAllowedTimeForTaskReadErrorSec = conf.getInt(
+          TezConfiguration.TEZ_AM_MAX_ALLOWED_TIME_FOR_TASK_READ_ERROR_SEC,
+          TezConfiguration.TEZ_AM_MAX_ALLOWED_TIME_FOR_TASK_READ_ERROR_SEC_DEFAULT);
     }
 
     @Override
@@ -4715,5 +4740,26 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
EventHandl
     public boolean getTaskRescheduleRelaxedLocality() {
       return taskRescheduleRelaxedLocality;
     }
+
+    /**
+     * @return maxAllowedOutputFailures.
+     */
+    @Override public int getMaxAllowedOutputFailures() {
+      return maxAllowedOutputFailures;
+    }
+
+    /**
+     * @return maxAllowedOutputFailuresFraction.
+     */
+    @Override public double getMaxAllowedOutputFailuresFraction() {
+      return maxAllowedOutputFailuresFraction;
+    }
+
+    /**
+     * @return maxAllowedTimeForTaskReadErrorSec.
+     */
+    @Override public int getMaxAllowedTimeForTaskReadErrorSec() {
+      return maxAllowedTimeForTaskReadErrorSec;
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/39d76a65/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
index 5ab68f7..5038810 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
@@ -160,15 +160,20 @@ public class TestTaskAttempt {
     when(appCtx.getContainerLauncherName(anyInt())).thenReturn(
         TezConstants.getTezYarnServicePluginName());
 
-    mockVertex = mock(Vertex.class);
-    when(mockVertex.getServicePluginInfo()).thenReturn(servicePluginInfo);
-    when(mockVertex.getVertexConfig()).thenReturn(new VertexImpl.VertexConfigImpl(vertexConf));
+    createMockVertex(vertexConf);
 
     HistoryEventHandler mockHistHandler = mock(HistoryEventHandler.class);
     doReturn(mockHistHandler).when(appCtx).getHistoryHandler();
     LogManager.getRootLogger().setLevel(Level.DEBUG);
   }
 
+  private void createMockVertex(Configuration conf) {
+    mockVertex = mock(Vertex.class);
+    when(mockVertex.getServicePluginInfo()).thenReturn(servicePluginInfo);
+    when(mockVertex.getVertexConfig()).thenReturn(
+        new VertexImpl.VertexConfigImpl(conf));
+  }
+
   @Test(timeout = 5000)
   public void testLocalityRequest() {
     TaskAttemptImpl.ScheduleTaskattemptTransition sta =
@@ -1919,7 +1924,11 @@ public class TestTaskAttempt {
     verify(eventHandler, times(expectedEventsAfterFetchFailure)).handle(
         arg.capture());
 
-    taskConf.setInt(TezConfiguration.TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES, 1);
+    Configuration newVertexConf = new Configuration(vertexConf);
+    newVertexConf.setInt(TezConfiguration.TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES,
+        1);
+    createMockVertex(newVertexConf);
+
     TezTaskID taskID2 = TezTaskID.getInstance(vertexID, 2);
     MockTaskAttemptImpl taImpl2 = new MockTaskAttemptImpl(taskID2, 1, eventHandler,
         taListener, taskConf, new SystemClock(),
@@ -1953,8 +1962,15 @@ public class TestTaskAttempt {
 
     Clock mockClock = mock(Clock.class); 
     int readErrorTimespanSec = 1;
-    taskConf.setInt(TezConfiguration.TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES, 10);
-    taskConf.setInt(TezConfiguration.TEZ_AM_MAX_ALLOWED_TIME_FOR_TASK_READ_ERROR_SEC, readErrorTimespanSec);
+
+    newVertexConf = new Configuration(vertexConf);
+    newVertexConf.setInt(TezConfiguration.TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES,
+        10);
+    newVertexConf.setInt(
+        TezConfiguration.TEZ_AM_MAX_ALLOWED_TIME_FOR_TASK_READ_ERROR_SEC,
+        readErrorTimespanSec);
+    createMockVertex(newVertexConf);
+
     TezTaskID taskID3 = TezTaskID.getInstance(vertexID, 3);
     MockTaskAttemptImpl taImpl3 = new MockTaskAttemptImpl(taskID3, 1, eventHandler,
         taListener, taskConf, mockClock,


Mime
View raw message