tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject [32/51] [abbrv] tez git commit: TEZ-2508. rebase 06/01. (sseth)
Date Thu, 06 Aug 2015 09:26:24 GMT
TEZ-2508. rebase 06/01. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/c75bb377
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/c75bb377
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/c75bb377

Branch: refs/heads/TEZ-2003
Commit: c75bb377a51d6f65df3d078ad6f810c625c4812a
Parents: c5d0062
Author: Siddharth Seth <sseth@apache.org>
Authored: Mon Jun 1 16:37:26 2015 -0700
Committer: Siddharth Seth <sseth@apache.org>
Committed: Thu Aug 6 01:26:10 2015 -0700

----------------------------------------------------------------------
 TEZ-2003-CHANGES.txt                                         | 1 +
 .../java/org/apache/tez/dag/api/TaskHeartbeatRequest.java    | 7 +++++++
 .../java/org/apache/tez/dag/api/TaskHeartbeatResponse.java   | 8 +++++++-
 .../org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java | 8 ++++----
 .../java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java | 3 ++-
 .../apache/tez/runtime/LogicalIOProcessorRuntimeTask.java    | 3 ---
 6 files changed, 21 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/c75bb377/TEZ-2003-CHANGES.txt
----------------------------------------------------------------------
diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt
index 42c2e1e..55002fe 100644
--- a/TEZ-2003-CHANGES.txt
+++ b/TEZ-2003-CHANGES.txt
@@ -30,5 +30,6 @@ ALL CHANGES:
   TEZ-2471. NPE in LogicalIOProcessorRuntimeTask while printing thread info.
   TEZ-2495. Inform TaskCommunicaor about Task and Container termination reasons.
   TEZ-2502. Fix for TezTaskRunner2 not killing tasks properly in all situations.
+  TEZ-2508. rebase 06/01
 
 INCOMPATIBLE CHANGES:

http://git-wip-us.apache.org/repos/asf/tez/blob/c75bb377/tez-dag/src/main/java/org/apache/tez/dag/api/TaskHeartbeatRequest.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskHeartbeatRequest.java b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskHeartbeatRequest.java
index f6bc8f0..b5ff991 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskHeartbeatRequest.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskHeartbeatRequest.java
@@ -29,15 +29,18 @@ public class TaskHeartbeatRequest {
   private final TezTaskAttemptID taskAttemptId;
   private final List<TezEvent> events;
   private final int startIndex;
+  private final int preRoutedStartIndex;
   private final int maxEvents;
 
 
   public TaskHeartbeatRequest(String containerIdentifier, TezTaskAttemptID taskAttemptId,
List<TezEvent> events, int startIndex,
+                              int preRoutedStartIndex,
                               int maxEvents) {
     this.containerIdentifier = containerIdentifier;
     this.taskAttemptId = taskAttemptId;
     this.events = events;
     this.startIndex = startIndex;
+    this.preRoutedStartIndex = preRoutedStartIndex;
     this.maxEvents = maxEvents;
   }
 
@@ -57,6 +60,10 @@ public class TaskHeartbeatRequest {
     return startIndex;
   }
 
+  public int getPreRoutedStartIndex() {
+    return preRoutedStartIndex;
+  }
+
   public int getMaxEvents() {
     return maxEvents;
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/c75bb377/tez-dag/src/main/java/org/apache/tez/dag/api/TaskHeartbeatResponse.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskHeartbeatResponse.java b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskHeartbeatResponse.java
index b826e76..7f063c4 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskHeartbeatResponse.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskHeartbeatResponse.java
@@ -23,12 +23,14 @@ public class TaskHeartbeatResponse {
 
   private final boolean shouldDie;
   private final int nextFromEventId;
+  private final int nextPreRoutedEventId;
   private final List<TezEvent> events;
 
-  public TaskHeartbeatResponse(boolean shouldDie, List<TezEvent> events, int nextFromEventId)
{
+  public TaskHeartbeatResponse(boolean shouldDie, List<TezEvent> events, int nextFromEventId,
int nextPreRoutedEventId) {
     this.shouldDie = shouldDie;
     this.events = events;
     this.nextFromEventId = nextFromEventId;
+    this.nextPreRoutedEventId = nextPreRoutedEventId;
   }
 
   public boolean isShouldDie() {
@@ -42,4 +44,8 @@ public class TaskHeartbeatResponse {
   public int getNextFromEventId() {
     return nextFromEventId;
   }
+
+  public int getNextPreRoutedEventId() {
+    return nextPreRoutedEventId;
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/c75bb377/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
index 1c61a0d..e2d44e2 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
@@ -79,7 +79,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
   protected final TaskHeartbeatHandler taskHeartbeatHandler;
   protected final ContainerHeartbeatHandler containerHeartbeatHandler;
 
-  private final TaskHeartbeatResponse RESPONSE_SHOULD_DIE = new TaskHeartbeatResponse(true,
null, 0);
+  private final TaskHeartbeatResponse RESPONSE_SHOULD_DIE = new TaskHeartbeatResponse(true,
null, 0, 0);
 
   private final ConcurrentMap<TezTaskAttemptID, ContainerId> registeredAttempts =
       new ConcurrentHashMap<TezTaskAttemptID, ContainerId>();
@@ -195,7 +195,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
     // So - avoiding synchronization.
 
     pingContainerHeartbeatHandler(containerId);
-    TaskAttemptEventInfo eventInfo = new TaskAttemptEventInfo(0, null);
+    TaskAttemptEventInfo eventInfo = new TaskAttemptEventInfo(0, null, 0);
     TezTaskAttemptID taskAttemptID = request.getTaskAttemptId();
     if (taskAttemptID != null) {
       ContainerId containerIdFromMap = registeredAttempts.get(taskAttemptID);
@@ -241,10 +241,10 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
       eventInfo = context
           .getCurrentDAG()
           .getVertex(taskAttemptID.getTaskID().getVertexID())
-          .getTaskAttemptTezEvents(taskAttemptID, request.getStartIndex(),
+          .getTaskAttemptTezEvents(taskAttemptID, request.getStartIndex(), request.getPreRoutedStartIndex(),
               request.getMaxEvents());
     }
-    return new TaskHeartbeatResponse(false, eventInfo.getEvents(), eventInfo.getNextFromEventId());
+    return new TaskHeartbeatResponse(false, eventInfo.getEvents(), eventInfo.getNextFromEventId(),
eventInfo.getNextPreRoutedFromEventId());
   }
   public void taskAlive(TezTaskAttemptID taskAttemptId) {
     taskHeartbeatHandler.pinged(taskAttemptId);

http://git-wip-us.apache.org/repos/asf/tez/blob/c75bb377/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
index 3774eb4..83322f2 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
@@ -363,13 +363,14 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator {
         }
         TaskHeartbeatRequest tRequest = new TaskHeartbeatRequest(request.getContainerIdentifier(),
             request.getCurrentTaskAttemptID(), request.getEvents(), request.getStartIndex(),
-            request.getMaxEvents());
+            request.getPreRoutedStartIndex(), request.getMaxEvents());
         tResponse = taskCommunicatorContext.heartbeat(tRequest);
       }
       TezHeartbeatResponse response = new TezHeartbeatResponse();
       response.setLastRequestId(requestId);
       response.setEvents(tResponse.getEvents());
       response.setNextFromEventId(tResponse.getNextFromEventId());
+      response.setNextPreRoutedEventId(tResponse.getNextPreRoutedEventId());
       containerInfo.lastRequestId = requestId;
       containerInfo.lastResponse = response;
       return response;

http://git-wip-us.apache.org/repos/asf/tez/blob/c75bb377/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
index 449fa0f..c79da5d 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
@@ -172,9 +172,6 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
     this.outputsMap = new ConcurrentHashMap<String, LogicalOutput>(numOutputs);
     this.outputContextMap = new ConcurrentHashMap<String, OutputContext>(numOutputs);
 
-    this.initializedInputs = new ConcurrentHashMap<String, LogicalInput>();
-    this.initializedOutputs = new ConcurrentHashMap<String, LogicalOutput>();
-
     this.runInputMap = new LinkedHashMap<String, LogicalInput>();
     this.runOutputMap = new LinkedHashMap<String, LogicalOutput>();
 


Mime
View raw message