tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject [12/37] tez git commit: TEZ-2303. ConcurrentModificationException while processing recovery (zjffdu)
Date Tue, 28 Apr 2015 20:40:53 GMT
TEZ-2303. ConcurrentModificationException while processing recovery (zjffdu)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/73bdbb21
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/73bdbb21
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/73bdbb21

Branch: refs/heads/TEZ-2003
Commit: 73bdbb219ded569ad88caf39a56abad3b0503d08
Parents: 9e9cf99
Author: Jeff Zhang <zjffdu@apache.org>
Authored: Tue Apr 28 10:40:12 2015 +0800
Committer: Jeff Zhang <zjffdu@apache.org>
Committed: Tue Apr 28 10:40:12 2015 +0800

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/tez/dag/app/dag/impl/DAGImpl.java    |  83 ++++----
 .../tez/dag/app/dag/impl/TaskAttemptImpl.java   |  67 ++++---
 .../apache/tez/dag/app/dag/impl/TaskImpl.java   | 187 ++++++++++---------
 .../apache/tez/dag/app/dag/impl/VertexImpl.java | 162 ++++++++--------
 5 files changed, 260 insertions(+), 240 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/73bdbb21/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b582b85..d6a0adf 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -318,6 +318,7 @@ TEZ-UI CHANGES (TEZ-8):
 Release 0.5.4: Unreleased
 
 ALL CHANGES:
+  TEZ-2303. ConcurrentModificationException while processing recovery.
   TEZ-2334. ContainerManagementProtocolProxy modifies IPC timeout conf without making a copy.
   TEZ-2317. Event processing backlog can result in task failures for short
   tasks

http://git-wip-us.apache.org/repos/asf/tez/blob/73bdbb21/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index c47a0d7..f562451 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -618,45 +618,50 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
 
   @Override
   public DAGState restoreFromEvent(HistoryEvent historyEvent) {
-    switch (historyEvent.getEventType()) {
-      case DAG_INITIALIZED:
-        recoveredState = initializeDAG((DAGInitializedEvent) historyEvent);
-        recoveryInitEventSeen = true;
-        return recoveredState;
-      case DAG_STARTED:
-        if (!recoveryInitEventSeen) {
-          throw new RuntimeException("Started Event seen but"
-              + " no Init Event was encountered earlier");
-        }
-        recoveryStartEventSeen = true;
-        this.startTime = ((DAGStartedEvent) historyEvent).getStartTime();
-        recoveredState = DAGState.RUNNING;
-        return recoveredState;
-      case DAG_COMMIT_STARTED:
-        recoveryCommitInProgress = true;
-        return recoveredState;
-      case VERTEX_GROUP_COMMIT_STARTED:
-        VertexGroupCommitStartedEvent vertexGroupCommitStartedEvent =
-            (VertexGroupCommitStartedEvent) historyEvent;
-        recoveredGroupCommits.put(
-            vertexGroupCommitStartedEvent.getVertexGroupName(), false);
-        return recoveredState;
-      case VERTEX_GROUP_COMMIT_FINISHED:
-        VertexGroupCommitFinishedEvent vertexGroupCommitFinishedEvent =
-            (VertexGroupCommitFinishedEvent) historyEvent;
-        recoveredGroupCommits.put(
-            vertexGroupCommitFinishedEvent.getVertexGroupName(), true);
-        return recoveredState;
-      case DAG_FINISHED:
-        recoveryCommitInProgress = false;
-        DAGFinishedEvent finishedEvent = (DAGFinishedEvent) historyEvent;
-        setFinishTime(finishedEvent.getFinishTime());
-        recoveredState = finishedEvent.getState();
-        this.fullCounters = finishedEvent.getTezCounters();
-        return recoveredState;
-      default:
-        throw new RuntimeException("Unexpected event received for restoring"
-            + " state, eventType=" + historyEvent.getEventType());
+    writeLock.lock();
+    try {
+      switch (historyEvent.getEventType()) {
+        case DAG_INITIALIZED:
+          recoveredState = initializeDAG((DAGInitializedEvent) historyEvent);
+          recoveryInitEventSeen = true;
+          return recoveredState;
+        case DAG_STARTED:
+          if (!recoveryInitEventSeen) {
+            throw new RuntimeException("Started Event seen but"
+                + " no Init Event was encountered earlier");
+          }
+          recoveryStartEventSeen = true;
+          this.startTime = ((DAGStartedEvent) historyEvent).getStartTime();
+          recoveredState = DAGState.RUNNING;
+          return recoveredState;
+        case DAG_COMMIT_STARTED:
+          recoveryCommitInProgress = true;
+          return recoveredState;
+        case VERTEX_GROUP_COMMIT_STARTED:
+          VertexGroupCommitStartedEvent vertexGroupCommitStartedEvent =
+              (VertexGroupCommitStartedEvent) historyEvent;
+          recoveredGroupCommits.put(
+              vertexGroupCommitStartedEvent.getVertexGroupName(), false);
+          return recoveredState;
+        case VERTEX_GROUP_COMMIT_FINISHED:
+          VertexGroupCommitFinishedEvent vertexGroupCommitFinishedEvent =
+              (VertexGroupCommitFinishedEvent) historyEvent;
+          recoveredGroupCommits.put(
+              vertexGroupCommitFinishedEvent.getVertexGroupName(), true);
+          return recoveredState;
+        case DAG_FINISHED:
+          recoveryCommitInProgress = false;
+          DAGFinishedEvent finishedEvent = (DAGFinishedEvent) historyEvent;
+          setFinishTime(finishedEvent.getFinishTime());
+          recoveredState = finishedEvent.getState();
+          this.fullCounters = finishedEvent.getTezCounters();
+          return recoveredState;
+        default:
+          throw new RuntimeException("Unexpected event received for restoring"
+              + " state, eventType=" + historyEvent.getEventType());
+      }
+    } finally {
+      writeLock.unlock();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/73bdbb21/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index 1af4274..1f3e1cf 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -734,39 +734,44 @@ public class TaskAttemptImpl implements TaskAttempt,
 
   @Override
   public TaskAttemptState restoreFromEvent(HistoryEvent historyEvent) {
-    switch (historyEvent.getEventType()) {
-      case TASK_ATTEMPT_STARTED:
-      {
-        TaskAttemptStartedEvent tEvent = (TaskAttemptStartedEvent) historyEvent;
-        this.launchTime = tEvent.getStartTime();
-        recoveryStartEventSeen = true;
-        recoveredState = TaskAttemptState.RUNNING;
-        this.containerId = tEvent.getContainerId();
-        sendEvent(createDAGCounterUpdateEventTALaunched(this));
-        return recoveredState;
-      }
-      case TASK_ATTEMPT_FINISHED:
-      {
-        if (!recoveryStartEventSeen) {
-          throw new RuntimeException("Finished Event seen but"
-              + " no Started Event was encountered earlier");
+    writeLock.lock();
+    try {
+      switch (historyEvent.getEventType()) {
+        case TASK_ATTEMPT_STARTED:
+        {
+          TaskAttemptStartedEvent tEvent = (TaskAttemptStartedEvent) historyEvent;
+          this.launchTime = tEvent.getStartTime();
+          recoveryStartEventSeen = true;
+          recoveredState = TaskAttemptState.RUNNING;
+          this.containerId = tEvent.getContainerId();
+          sendEvent(createDAGCounterUpdateEventTALaunched(this));
+          return recoveredState;
+        }
+        case TASK_ATTEMPT_FINISHED:
+        {
+          if (!recoveryStartEventSeen) {
+            throw new RuntimeException("Finished Event seen but"
+                + " no Started Event was encountered earlier");
+          }
+          TaskAttemptFinishedEvent tEvent = (TaskAttemptFinishedEvent) historyEvent;
+          this.finishTime = tEvent.getFinishTime();
+          this.reportedStatus.counters = tEvent.getCounters();
+          this.reportedStatus.progress = 1f;
+          this.reportedStatus.state = tEvent.getState();
+          this.terminationCause = tEvent.getTaskAttemptError() != null ? tEvent.getTaskAttemptError()
+              : TaskAttemptTerminationCause.UNKNOWN_ERROR;
+          this.diagnostics.add(tEvent.getDiagnostics());
+          this.recoveredState = tEvent.getState();
+          sendEvent(createDAGCounterUpdateEventTAFinished(this, tEvent.getState()));
+          return recoveredState;
         }
-        TaskAttemptFinishedEvent tEvent = (TaskAttemptFinishedEvent) historyEvent;
-        this.finishTime = tEvent.getFinishTime();
-        this.reportedStatus.counters = tEvent.getCounters();
-        this.reportedStatus.progress = 1f;
-        this.reportedStatus.state = tEvent.getState();
-        this.terminationCause = tEvent.getTaskAttemptError() != null ? tEvent.getTaskAttemptError()
-            : TaskAttemptTerminationCause.UNKNOWN_ERROR;
-        this.diagnostics.add(tEvent.getDiagnostics());
-        this.recoveredState = tEvent.getState();
-        sendEvent(createDAGCounterUpdateEventTAFinished(this, tEvent.getState()));
-        return recoveredState;
+        default:
+          throw new RuntimeException("Unexpected event received for restoring"
+              + " state, eventType=" + historyEvent.getEventType());
+  
       }
-      default:
-        throw new RuntimeException("Unexpected event received for restoring"
-            + " state, eventType=" + historyEvent.getEventType());
-
+    } finally {
+      writeLock.unlock();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/73bdbb21/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index a1eed07..91413a5 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -517,106 +517,111 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
 
   @Override
   public TaskState restoreFromEvent(HistoryEvent historyEvent) {
-    switch (historyEvent.getEventType()) {
-      case TASK_STARTED:
-      {
-        TaskStartedEvent tEvent = (TaskStartedEvent) historyEvent;
-        recoveryStartEventSeen = true;
-        this.scheduledTime = tEvent.getScheduledTime();
-        if (this.attempts == null
-            || this.attempts.isEmpty()) {
-          this.attempts = new LinkedHashMap<TezTaskAttemptID, TaskAttempt>();
-        }
-        recoveredState = TaskState.SCHEDULED;
-        historyTaskStartGenerated = true;
-        taskAttemptStatus.clear();
-        return recoveredState;
-      }
-      case TASK_FINISHED:
-      {
-        TaskFinishedEvent tEvent = (TaskFinishedEvent) historyEvent;
-        if (!recoveryStartEventSeen
-            && !tEvent.getState().equals(TaskState.KILLED)) {
-          throw new TezUncheckedException("Finished Event seen but"
-              + " no Started Event was encountered earlier"
-              + ", taskId=" + taskId
-              + ", finishState=" + tEvent.getState());
-        }
-        recoveredState = tEvent.getState();
-        if (tEvent.getState() == TaskState.SUCCEEDED
-            && tEvent.getSuccessfulAttemptID() != null) {
-          successfulAttempt = tEvent.getSuccessfulAttemptID();
+    writeLock.lock();
+    try {
+      switch (historyEvent.getEventType()) {
+        case TASK_STARTED:
+        {
+          TaskStartedEvent tEvent = (TaskStartedEvent) historyEvent;
+          recoveryStartEventSeen = true;
+          this.scheduledTime = tEvent.getScheduledTime();
+          if (this.attempts == null
+              || this.attempts.isEmpty()) {
+            this.attempts = new LinkedHashMap<TezTaskAttemptID, TaskAttempt>();
+          }
+          recoveredState = TaskState.SCHEDULED;
+          historyTaskStartGenerated = true;
+          taskAttemptStatus.clear();
+          return recoveredState;
         }
-        return recoveredState;
-      }
-      case TASK_ATTEMPT_STARTED:
-      {
-        TaskAttemptStartedEvent taskAttemptStartedEvent =
-            (TaskAttemptStartedEvent) historyEvent;
-        TaskAttempt recoveredAttempt = createRecoveredTaskAttempt(
-            taskAttemptStartedEvent.getTaskAttemptID());
-        recoveredAttempt.restoreFromEvent(taskAttemptStartedEvent);
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Adding restored attempt into known attempts map"
-              + ", taskAttemptId=" + taskAttemptStartedEvent.getTaskAttemptID());
+        case TASK_FINISHED:
+        {
+          TaskFinishedEvent tEvent = (TaskFinishedEvent) historyEvent;
+          if (!recoveryStartEventSeen
+              && !tEvent.getState().equals(TaskState.KILLED)) {
+            throw new TezUncheckedException("Finished Event seen but"
+                + " no Started Event was encountered earlier"
+                + ", taskId=" + taskId
+                + ", finishState=" + tEvent.getState());
+          }
+          recoveredState = tEvent.getState();
+          if (tEvent.getState() == TaskState.SUCCEEDED
+              && tEvent.getSuccessfulAttemptID() != null) {
+            successfulAttempt = tEvent.getSuccessfulAttemptID();
+          }
+          return recoveredState;
         }
-        this.attempts.put(taskAttemptStartedEvent.getTaskAttemptID(),
-            recoveredAttempt);
-        this.taskAttemptStatus.put(taskAttemptStartedEvent.getTaskAttemptID().getId(), false);
-        this.recoveredState = TaskState.RUNNING;
-        return recoveredState;
-      }
-      case TASK_ATTEMPT_FINISHED:
-      {
-        TaskAttemptFinishedEvent taskAttemptFinishedEvent =
-            (TaskAttemptFinishedEvent) historyEvent;
-        TaskAttempt taskAttempt = this.attempts.get(
-            taskAttemptFinishedEvent.getTaskAttemptID());
-        this.taskAttemptStatus.put(taskAttemptFinishedEvent.getTaskAttemptID().getId(), true);
-        if (taskAttempt == null) {
-          LOG.warn("Received an attempt finished event for an attempt that "
-              + " never started or does not exist"
-              + ", taskAttemptId=" + taskAttemptFinishedEvent.getTaskAttemptID()
-              + ", taskAttemptFinishState=" + taskAttemptFinishedEvent.getState());
+        case TASK_ATTEMPT_STARTED:
+        {
+          TaskAttemptStartedEvent taskAttemptStartedEvent =
+              (TaskAttemptStartedEvent) historyEvent;
           TaskAttempt recoveredAttempt = createRecoveredTaskAttempt(
-              taskAttemptFinishedEvent.getTaskAttemptID());
-          this.attempts.put(taskAttemptFinishedEvent.getTaskAttemptID(),
+              taskAttemptStartedEvent.getTaskAttemptID());
+          recoveredAttempt.restoreFromEvent(taskAttemptStartedEvent);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Adding restored attempt into known attempts map"
+                + ", taskAttemptId=" + taskAttemptStartedEvent.getTaskAttemptID());
+          }
+          this.attempts.put(taskAttemptStartedEvent.getTaskAttemptID(),
               recoveredAttempt);
-          if (!taskAttemptFinishedEvent.getState().equals(TaskAttemptState.KILLED)) {
-            throw new TezUncheckedException("Could not find task attempt"
-                + " when trying to recover"
+          this.taskAttemptStatus.put(taskAttemptStartedEvent.getTaskAttemptID().getId(), false);
+          this.recoveredState = TaskState.RUNNING;
+          return recoveredState;
+        }
+        case TASK_ATTEMPT_FINISHED:
+        {
+          TaskAttemptFinishedEvent taskAttemptFinishedEvent =
+              (TaskAttemptFinishedEvent) historyEvent;
+          TaskAttempt taskAttempt = this.attempts.get(
+              taskAttemptFinishedEvent.getTaskAttemptID());
+          this.taskAttemptStatus.put(taskAttemptFinishedEvent.getTaskAttemptID().getId(), true);
+          if (taskAttempt == null) {
+            LOG.warn("Received an attempt finished event for an attempt that "
+                + " never started or does not exist"
                 + ", taskAttemptId=" + taskAttemptFinishedEvent.getTaskAttemptID()
-                + ", taskAttemptFinishState" + taskAttemptFinishedEvent.getState());
+                + ", taskAttemptFinishState=" + taskAttemptFinishedEvent.getState());
+            TaskAttempt recoveredAttempt = createRecoveredTaskAttempt(
+                taskAttemptFinishedEvent.getTaskAttemptID());
+            this.attempts.put(taskAttemptFinishedEvent.getTaskAttemptID(),
+                recoveredAttempt);
+            if (!taskAttemptFinishedEvent.getState().equals(TaskAttemptState.KILLED)) {
+              throw new TezUncheckedException("Could not find task attempt"
+                  + " when trying to recover"
+                  + ", taskAttemptId=" + taskAttemptFinishedEvent.getTaskAttemptID()
+                  + ", taskAttemptFinishState" + taskAttemptFinishedEvent.getState());
+            }
+            return recoveredState;
+          }
+          if (getUncompletedAttemptsCount() < 0) {
+            throw new TezUncheckedException("Invalid recovery event for attempt finished"
+                + ", more completions than starts encountered"
+                + ", taskId=" + taskId
+                + ", finishedAttempts=" + getFinishedAttemptsCount()
+                + ", incompleteAttempts=" + getUncompletedAttemptsCount());
+          }
+          TaskAttemptState taskAttemptState = taskAttempt.restoreFromEvent(
+              taskAttemptFinishedEvent);
+          if (taskAttemptState.equals(TaskAttemptState.SUCCEEDED)) {
+            recoveredState = TaskState.SUCCEEDED;
+            successfulAttempt = taskAttempt.getID();
+          } else if (taskAttemptState.equals(TaskAttemptState.FAILED)){
+            failedAttempts++;
+            getVertex().incrementFailedTaskAttemptCount();
+            successfulAttempt = null;
+            recoveredState = TaskState.RUNNING; // reset to RUNNING, may fail after SUCCEEDED
+          } else if (taskAttemptState.equals(TaskAttemptState.KILLED)) {
+            successfulAttempt = null;
+            getVertex().incrementKilledTaskAttemptCount();
+            recoveredState = TaskState.RUNNING; // reset to RUNNING, may been killed after SUCCEEDED
           }
           return recoveredState;
         }
-        if (getUncompletedAttemptsCount() < 0) {
-          throw new TezUncheckedException("Invalid recovery event for attempt finished"
-              + ", more completions than starts encountered"
-              + ", taskId=" + taskId
-              + ", finishedAttempts=" + getFinishedAttemptsCount()
-              + ", incompleteAttempts=" + getUncompletedAttemptsCount());
-        }
-        TaskAttemptState taskAttemptState = taskAttempt.restoreFromEvent(
-            taskAttemptFinishedEvent);
-        if (taskAttemptState.equals(TaskAttemptState.SUCCEEDED)) {
-          recoveredState = TaskState.SUCCEEDED;
-          successfulAttempt = taskAttempt.getID();
-        } else if (taskAttemptState.equals(TaskAttemptState.FAILED)){
-          failedAttempts++;
-          getVertex().incrementFailedTaskAttemptCount();
-          successfulAttempt = null;
-          recoveredState = TaskState.RUNNING; // reset to RUNNING, may fail after SUCCEEDED
-        } else if (taskAttemptState.equals(TaskAttemptState.KILLED)) {
-          successfulAttempt = null;
-          getVertex().incrementKilledTaskAttemptCount();
-          recoveredState = TaskState.RUNNING; // reset to RUNNING, may been killed after SUCCEEDED
-        }
-        return recoveredState;
+        default:
+          throw new RuntimeException("Unexpected event received for restoring"
+              + " state, eventType=" + historyEvent.getEventType());
       }
-      default:
-        throw new RuntimeException("Unexpected event received for restoring"
-            + " state, eventType=" + historyEvent.getEventType());
+    } finally {
+      writeLock.unlock();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/73bdbb21/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index c4619a0..987e9d3 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -1257,85 +1257,89 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
 
   @Override
   public VertexState restoreFromEvent(HistoryEvent historyEvent) {
-    switch (historyEvent.getEventType()) {
-      case VERTEX_INITIALIZED:
-        recoveryInitEventSeen = true;
-        recoveredState = setupVertex((VertexInitializedEvent) historyEvent);
-        createTasks();
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Recovered state for vertex after Init event"
-              + ", vertex=" + logIdentifier
-              + ", recoveredState=" + recoveredState);
-        }
-        return recoveredState;
-      case VERTEX_STARTED:
-        if (!recoveryInitEventSeen) {
-          throw new RuntimeException("Started Event seen but"
-              + " no Init Event was encountered earlier");
-        }
-        recoveryStartEventSeen = true;
-        VertexStartedEvent startedEvent = (VertexStartedEvent) historyEvent;
-        startTimeRequested = startedEvent.getStartRequestedTime();
-        startedTime = startedEvent.getStartTime();
-        recoveredState = VertexState.RUNNING;
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Recovered state for vertex after Started event"
-              + ", vertex=" + logIdentifier
-              + ", recoveredState=" + recoveredState);
-        }
-        return recoveredState;
-      case VERTEX_PARALLELISM_UPDATED:
-        // TODO TEZ-1019 this should flow through setParallelism method
-        VertexParallelismUpdatedEvent updatedEvent =
-            (VertexParallelismUpdatedEvent) historyEvent;
-        int oldNumTasks = numTasks;
-        int newNumTasks = updatedEvent.getNumTasks();
-        handleParallelismUpdate(newNumTasks, updatedEvent.getSourceEdgeProperties(),
-          updatedEvent.getRootInputSpecUpdates(), oldNumTasks);
-        Preconditions.checkState(this.numTasks == newNumTasks, getLogIdentifier());
-        if (updatedEvent.getVertexLocationHint() != null) {
-          setVertexLocationHint(updatedEvent.getVertexLocationHint());
-        }
-        stateChangeNotifier.stateChanged(vertexId,
-            new VertexStateUpdateParallelismUpdated(vertexName, numTasks, oldNumTasks));
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Recovered state for vertex after parallelism updated event"
-              + ", vertex=" + logIdentifier
-              + ", recoveredState=" + recoveredState);
-        }
-        return recoveredState;
-      case VERTEX_COMMIT_STARTED:
-        recoveryCommitInProgress = true;
-        hasCommitter = true;
-        return recoveredState;
-      case VERTEX_FINISHED:
-        VertexFinishedEvent finishedEvent = (VertexFinishedEvent) historyEvent;
-        if (finishedEvent.isFromSummary()) {
-          summaryCompleteSeen  = true;
-        } else {
-          vertexCompleteSeen = true;
-        }
-        numTasks = finishedEvent.getNumTasks();
-        recoveryCommitInProgress = false;
-        recoveredState = finishedEvent.getState();
-        diagnostics.add(finishedEvent.getDiagnostics());
-        finishTime = finishedEvent.getFinishTime();
-        // TODO counters ??
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Recovered state for vertex after finished event"
-              + ", vertex=" + logIdentifier
-              + ", recoveredState=" + recoveredState);
-        }
-        return recoveredState;
-      case VERTEX_DATA_MOVEMENT_EVENTS_GENERATED:
-        VertexRecoverableEventsGeneratedEvent vEvent =
-            (VertexRecoverableEventsGeneratedEvent) historyEvent;
-        this.recoveredEvents.addAll(vEvent.getTezEvents());
-        return recoveredState;
-      default:
-        throw new RuntimeException("Unexpected event received for restoring"
-            + " state, eventType=" + historyEvent.getEventType());
-
+    writeLock.lock();
+    try {
+      switch (historyEvent.getEventType()) {
+        case VERTEX_INITIALIZED:
+          recoveryInitEventSeen = true;
+          recoveredState = setupVertex((VertexInitializedEvent) historyEvent);
+          createTasks();
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Recovered state for vertex after Init event"
+                + ", vertex=" + logIdentifier
+                + ", recoveredState=" + recoveredState);
+          }
+          return recoveredState;
+        case VERTEX_STARTED:
+          if (!recoveryInitEventSeen) {
+            throw new RuntimeException("Started Event seen but"
+                + " no Init Event was encountered earlier");
+          }
+          recoveryStartEventSeen = true;
+          VertexStartedEvent startedEvent = (VertexStartedEvent) historyEvent;
+          startTimeRequested = startedEvent.getStartRequestedTime();
+          startedTime = startedEvent.getStartTime();
+          recoveredState = VertexState.RUNNING;
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Recovered state for vertex after Started event"
+                + ", vertex=" + logIdentifier
+                + ", recoveredState=" + recoveredState);
+          }
+          return recoveredState;
+        case VERTEX_PARALLELISM_UPDATED:
+          // TODO TEZ-1019 this should flow through setParallelism method
+          VertexParallelismUpdatedEvent updatedEvent =
+              (VertexParallelismUpdatedEvent) historyEvent;
+          int oldNumTasks = numTasks;
+          int newNumTasks = updatedEvent.getNumTasks();
+          handleParallelismUpdate(newNumTasks, updatedEvent.getSourceEdgeProperties(),
+            updatedEvent.getRootInputSpecUpdates(), oldNumTasks);
+          Preconditions.checkState(this.numTasks == newNumTasks, getLogIdentifier());
+          if (updatedEvent.getVertexLocationHint() != null) {
+            setVertexLocationHint(updatedEvent.getVertexLocationHint());
+          }
+          stateChangeNotifier.stateChanged(vertexId,
+              new VertexStateUpdateParallelismUpdated(vertexName, numTasks, oldNumTasks));
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Recovered state for vertex after parallelism updated event"
+                + ", vertex=" + logIdentifier
+                + ", recoveredState=" + recoveredState);
+          }
+          return recoveredState;
+        case VERTEX_COMMIT_STARTED:
+          recoveryCommitInProgress = true;
+          hasCommitter = true;
+          return recoveredState;
+        case VERTEX_FINISHED:
+          VertexFinishedEvent finishedEvent = (VertexFinishedEvent) historyEvent;
+          if (finishedEvent.isFromSummary()) {
+            summaryCompleteSeen  = true;
+          } else {
+            vertexCompleteSeen = true;
+          }
+          numTasks = finishedEvent.getNumTasks();
+          recoveryCommitInProgress = false;
+          recoveredState = finishedEvent.getState();
+          diagnostics.add(finishedEvent.getDiagnostics());
+          finishTime = finishedEvent.getFinishTime();
+          // TODO counters ??
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Recovered state for vertex after finished event"
+                + ", vertex=" + logIdentifier
+                + ", recoveredState=" + recoveredState);
+          }
+          return recoveredState;
+        case VERTEX_DATA_MOVEMENT_EVENTS_GENERATED:
+          VertexRecoverableEventsGeneratedEvent vEvent =
+              (VertexRecoverableEventsGeneratedEvent) historyEvent;
+          this.recoveredEvents.addAll(vEvent.getTezEvents());
+          return recoveredState;
+        default:
+          throw new RuntimeException("Unexpected event received for restoring"
+              + " state, eventType=" + historyEvent.getEventType());
+      }
+    } finally {
+      writeLock.unlock();
     }
   }
 


Mime
View raw message