tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject [13/37] tez git commit: TEZ-2314. Tez task attempt failures due to bad event serialization (bikas)
Date Tue, 28 Apr 2015 20:40:54 GMT
TEZ-2314. Tez task attempt failures due to bad event serialization (bikas)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/25224477
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/25224477
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/25224477

Branch: refs/heads/TEZ-2003
Commit: 2522447732f93ec86f76625392ed5f34430d294c
Parents: 73bdbb2
Author: Bikas Saha <bikas@apache.org>
Authored: Mon Apr 27 22:59:09 2015 -0700
Committer: Bikas Saha <bikas@apache.org>
Committed: Mon Apr 27 22:59:09 2015 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../org/apache/tez/runtime/RuntimeTask.java     |  5 ++
 .../tez/runtime/api/impl/IOStatistics.java      |  4 +-
 .../apache/tez/runtime/task/TaskReporter.java   | 19 +++++-
 .../tez/runtime/task/TestTaskReporter.java      | 62 ++++++++++++++++++++
 5 files changed, 86 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/25224477/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d6a0adf..0bd5214 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,6 +9,7 @@ INCOMPATIBLE CHANGES
   TEZ-1993. Implement a pluggable InputSizeEstimator for grouping fairly
 
 ALL CHANGES:
+  TEZ-2314. Tez task attempt failures due to bad event serialization
   TEZ-2368. Make a dag identifier available in Context classes.
   TEZ-2325. Route status update event directly to the attempt.
   TEZ-2358. Pipelined Shuffle: MergeManager assumptions about 1 merge per source-task.

http://git-wip-us.apache.org/repos/asf/tez/blob/25224477/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
index 745b10b..f8b8621 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
@@ -19,6 +19,7 @@
 package org.apache.tez.runtime;
 
 import java.util.Collection;
+import java.util.EnumSet;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -77,6 +78,10 @@ public abstract class RuntimeTask {
     return counter;
   }
   
+  public boolean hasInitialized() {
+    return EnumSet.of(State.RUNNING, State.CLOSED).contains(state.get());
+  }
+  
   public String getVertexName() {
     return taskSpec.getVertexName();
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/25224477/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/IOStatistics.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/IOStatistics.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/IOStatistics.java
index 0f8b589..8f28062 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/IOStatistics.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/IOStatistics.java
@@ -24,8 +24,8 @@ import java.io.IOException;
 import org.apache.hadoop.io.Writable;
 
 public class IOStatistics implements Writable {
-  private long dataSize = 0;
-  private long numItems = 0;
+  private volatile long dataSize = 0;
+  private volatile long numItems = 0;
   
   public void setDataSize(long size) {
     this.dataSize = size;

http://git-wip-us.apache.org/repos/asf/tez/blob/25224477/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
index b9e7217..7324abd 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
@@ -34,6 +34,7 @@ import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.tez.common.TezTaskUmbilicalProtocol;
+import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.runtime.RuntimeTask;
@@ -41,6 +42,7 @@ import org.apache.tez.runtime.api.events.TaskAttemptCompletedEvent;
 import org.apache.tez.runtime.api.events.TaskAttemptFailedEvent;
 import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
 import org.apache.tez.runtime.api.impl.EventMetaData;
+import org.apache.tez.runtime.api.impl.TaskStatistics;
 import org.apache.tez.runtime.api.impl.TezEvent;
 import org.apache.tez.runtime.api.impl.TezHeartbeatRequest;
 import org.apache.tez.runtime.api.impl.TezHeartbeatResponse;
@@ -317,9 +319,20 @@ public class TaskReporter {
       return !heartbeat(Lists.newArrayList(statusUpdateEvent, taskCompletedEvent)).shouldDie;
     }
     
-    private TaskStatusUpdateEvent getStatusUpdateEvent(boolean sendCounters) {
-      return new TaskStatusUpdateEvent((sendCounters ? task.getCounters() : null),
-          task.getProgress(), task.getTaskStatistics());
+    @VisibleForTesting
+    TaskStatusUpdateEvent getStatusUpdateEvent(boolean sendCounters) {
+      TezCounters counters = null;
+      TaskStatistics stats = null;
+      float progress = 0;
+      if (task.hasInitialized()) {
+        progress = task.getProgress();
+        if (sendCounters) {
+          // send these potentially large objects at longer intervals to avoid overloading the AM
+          counters = task.getCounters();
+          stats = task.getTaskStatistics();
+        }
+      }
+      return new TaskStatusUpdateEvent(counters, progress, stats);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/tez/blob/25224477/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskReporter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskReporter.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskReporter.java
index 9add252..b44c9f8 100644
--- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskReporter.java
+++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskReporter.java
@@ -32,17 +32,23 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
 import com.google.common.collect.Lists;
+
 import org.apache.tez.common.TezTaskUmbilicalProtocol;
+import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask;
+import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
+import org.apache.tez.runtime.api.impl.TaskStatistics;
 import org.apache.tez.runtime.api.impl.TezEvent;
 import org.apache.tez.runtime.api.impl.TezHeartbeatRequest;
 import org.apache.tez.runtime.api.impl.TezHeartbeatResponse;
+import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
+@SuppressWarnings("rawtypes")
 public class TestTaskReporter {
 
   @Test(timeout = 10000)
@@ -103,6 +109,62 @@ public class TestTaskReporter {
     }
 
   }
+  
+  @Test (timeout=5000)
+  public void testStatusUpdateAfterInitializationAndCounterFlag() {
+    TezTaskAttemptID mockTaskAttemptId = mock(TezTaskAttemptID.class);
+    LogicalIOProcessorRuntimeTask mockTask = mock(LogicalIOProcessorRuntimeTask.class);
+    doReturn("vertexName").when(mockTask).getVertexName();
+    doReturn(mockTaskAttemptId).when(mockTask).getTaskAttemptID();
+    TezTaskUmbilicalProtocol mockUmbilical = mock(TezTaskUmbilicalProtocol.class);
+    
+    float progress = 0.5f;
+    TaskStatistics stats = new TaskStatistics();
+    TezCounters counters = new TezCounters();
+    doReturn(progress).when(mockTask).getProgress();
+    doReturn(stats).when(mockTask).getTaskStatistics();
+    doReturn(counters).when(mockTask).getCounters();
+    
+    // Setup the sleep time to be way higher than the test timeout
+    TaskReporter.HeartbeatCallable heartbeatCallable =
+        new TaskReporter.HeartbeatCallable(mockTask, mockUmbilical, 100000, 100000, 5,
+            new AtomicLong(0),
+            "containerIdStr");
+    
+    // task not initialized - nothing obtained from task
+    doReturn(false).when(mockTask).hasInitialized();
+    TaskStatusUpdateEvent event = heartbeatCallable.getStatusUpdateEvent(true);
+    verify(mockTask, times(1)).hasInitialized();
+    verify(mockTask, times(0)).getProgress();
+    verify(mockTask, times(0)).getTaskStatistics();
+    verify(mockTask, times(0)).getCounters();
+    Assert.assertEquals(0, event.getProgress(), 0);
+    Assert.assertNull(event.getCounters());
+    Assert.assertNull(event.getStatistics());
+
+    // task is initialized - progress obtained but not counters since flag is false
+    doReturn(true).when(mockTask).hasInitialized();
+    event = heartbeatCallable.getStatusUpdateEvent(false);
+    verify(mockTask, times(2)).hasInitialized();
+    verify(mockTask, times(1)).getProgress();
+    verify(mockTask, times(0)).getTaskStatistics();
+    verify(mockTask, times(0)).getCounters();
+    Assert.assertEquals(progress, event.getProgress(), 0);
+    Assert.assertNull(event.getCounters());
+    Assert.assertNull(event.getStatistics());
+
+    // task is initialized - progress obtained and also counters since flag is true
+    doReturn(true).when(mockTask).hasInitialized();
+    event = heartbeatCallable.getStatusUpdateEvent(true);
+    verify(mockTask, times(3)).hasInitialized();
+    verify(mockTask, times(2)).getProgress();
+    verify(mockTask, times(1)).getTaskStatistics();
+    verify(mockTask, times(1)).getCounters();
+    Assert.assertEquals(progress, event.getProgress(), 0);
+    Assert.assertEquals(counters, event.getCounters());
+    Assert.assertEquals(stats, event.getStatistics());
+
+  }
 
   private List<TezEvent> createEvents(int numEvents) {
     List<TezEvent> list = Lists.newArrayListWithCapacity(numEvents);


Mime
View raw message