tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zjf...@apache.org
Subject tez git commit: TEZ-2758. Remove append API in RecoveryService after TEZ-1909 (zjffdu)
Date Wed, 30 Sep 2015 02:32:07 GMT
Repository: tez
Updated Branches:
  refs/heads/branch-0.6 aa4806544 -> 7514b157e


TEZ-2758. Remove append API in RecoveryService after TEZ-1909 (zjffdu)

(cherry picked from commit 35c926f238ec456c7ddf7e8ca47616c89cf68695)

Conflicts:
	tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/7514b157
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/7514b157
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/7514b157

Branch: refs/heads/branch-0.6
Commit: 7514b157ec44b9e356a2579671e56e1bf1997f15
Parents: aa48065
Author: Jeff Zhang <zjffdu@apache.org>
Authored: Wed Sep 30 10:25:21 2015 +0800
Committer: Jeff Zhang <zjffdu@apache.org>
Committed: Wed Sep 30 10:31:44 2015 +0800

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../dag/history/recovery/RecoveryService.java   |  57 +++++-----
 .../history/recovery/TestRecoveryService.java   | 110 ++++++++++++++++++-
 3 files changed, 141 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/7514b157/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e0b76ae..1209e22 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,7 @@ Release 0.6.3: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-2758. Remove append API in RecoveryService after TEZ-1909.
   TEZ-2851. Support a way for upstream applications to pass in a caller context to Tez.
   TEZ-2398. Flaky test: TestFaultTolerance
   TEZ-2808. Race condition between preemption and container assignment

http://git-wip-us.apache.org/repos/asf/tez/blob/7514b157/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java b/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
index 8a07211..6dcced3 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
@@ -304,11 +304,13 @@ public class RecoveryService extends AbstractService {
         try {
           SummaryEvent summaryEvent = (SummaryEvent) event.getHistoryEvent();
           handleSummaryEvent(dagId, eventType, summaryEvent);
-          summaryStream.hflush();
           if (summaryEvent.writeToRecoveryImmediately()) {
             handleRecoveryEvent(event);
-            doFlush(outputStreamMap.get(event.getDagID()),
-                appContext.getClock().getTime());
+            // outputStream may already be closed and removed
+            if (outputStreamMap.containsKey(event.getDagID())) {
+              doFlush(outputStreamMap.get(event.getDagID()),
+                  appContext.getClock().getTime());
+            }
           } else {
             if (LOG.isDebugEnabled()) {
               LOG.debug("Queueing Non-immediate Summary/Recovery event of type"
@@ -334,23 +336,7 @@ public class RecoveryService extends AbstractService {
         } catch (IOException ioe) {
           LOG.error("Error handling summary event"
               + ", eventType=" + event.getHistoryEvent().getEventType(), ioe);
-          Path fatalErrorDir = new Path(recoveryPath, RECOVERY_FATAL_OCCURRED_DIR);
-          try {
-            LOG.error("Adding a flag to ensure next AM attempt does not start up"
-                + ", flagFile=" + fatalErrorDir.toString());
-            recoveryFatalErrorOccurred.set(true);
-            recoveryDirFS.mkdirs(fatalErrorDir);
-            if (recoveryDirFS.exists(fatalErrorDir)) {
-              LOG.error("Recovery failure occurred. Skipping all events");
-            } else {
-              // throw error if fatal error flag could not be set
-              throw ioe;
-            }
-          } catch (IOException e) {
-            LOG.fatal("Failed to create fatal error flag dir "
-                + fatalErrorDir.toString(), e);
-            throw ioe;
-          }
+          createFatalErrorFlagDir();
           if (eventType.equals(HistoryEventType.DAG_SUBMITTED)) {
             // Throw error to tell client that dag submission failed
             throw ioe;
@@ -366,6 +352,34 @@ public class RecoveryService extends AbstractService {
     }
   }
 
+  /**
+   * Marks recovery as failed and creates the fatal error flag directory under
+   * the recovery path so that the next AM attempt does not start up.
+   *
+   * @throws IOException if the flag directory could not be created
+   */
+  private void createFatalErrorFlagDir() throws IOException {
+    Path fatalErrorDir = new Path(recoveryPath, RECOVERY_FATAL_OCCURRED_DIR);
+    try {
+      LOG.error("Adding a flag to ensure next AM attempt does not start up"
+          + ", flagFile=" + fatalErrorDir.toString());
+      recoveryFatalErrorOccurred.set(true);
+      recoveryDirFS.mkdirs(fatalErrorDir);
+      if (recoveryDirFS.exists(fatalErrorDir)) {
+        LOG.error("Recovery failure occurred. Skipping all events");
+      } else {
+        // throw error if fatal error flag could not be set
+        throw new IOException("Failed to create fatal error flag dir "
+            + fatalErrorDir.toString());
+      }
+    } catch (IOException e) {
+      LOG.error("Failed to create fatal error flag dir "
+          + fatalErrorDir.toString(), e);
+      // Rethrow so callers know the flag was not written.
+      throw e;
+    }
+  }
+
   private void handleSummaryEvent(TezDAGID dagID,
       HistoryEventType eventType,
       SummaryEvent summaryEvent) throws IOException {
@@ -383,7 +397,9 @@ public class RecoveryService extends AbstractService {
         summaryStream = recoveryDirFS.create(summaryPath, false,
             bufferSize);
       } else {
-        summaryStream = recoveryDirFS.append(summaryPath, bufferSize);
+        // Appending is no longer supported; an existing file is a fatal error.
+        createFatalErrorFlagDir();
+        return;
       }
     }
     if (LOG.isDebugEnabled()) {
@@ -392,6 +408,7 @@ public class RecoveryService extends AbstractService {
           + ", eventType=" + eventType);
     }
     summaryEvent.toSummaryProtoStream(summaryStream);
+    summaryStream.hflush();
   }
 
   @VisibleForTesting
@@ -419,11 +436,9 @@ public class RecoveryService extends AbstractService {
       Path dagFilePath = TezCommonUtils.getDAGRecoveryPath(recoveryPath, dagID.toString());
       FSDataOutputStream outputStream;
       if (recoveryDirFS.exists(dagFilePath)) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Opening DAG recovery file in append mode"
-              + ", filePath=" + dagFilePath);
-        }
-        outputStream = recoveryDirFS.append(dagFilePath, bufferSize);
+        // Appending is no longer supported; an existing file is a fatal error.
+        createFatalErrorFlagDir();
+        return;
       } else {
         if (LOG.isDebugEnabled()) {
           LOG.debug("Opening DAG recovery file in create mode"

http://git-wip-us.apache.org/repos/asf/tez/blob/7514b157/tez-dag/src/test/java/org/apache/tez/dag/history/recovery/TestRecoveryService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/recovery/TestRecoveryService.java b/tez-dag/src/test/java/org/apache/tez/dag/history/recovery/TestRecoveryService.java
index f10adfc..040b407 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/recovery/TestRecoveryService.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/recovery/TestRecoveryService.java
@@ -22,16 +22,21 @@ import java.util.Random;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.util.SystemClock;
-import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.common.TezCommonUtils;
 import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.dag.DAGState;
 import org.apache.tez.dag.history.DAGHistoryEvent;
+import org.apache.tez.dag.history.events.DAGFinishedEvent;
 import org.apache.tez.dag.history.events.TaskStartedEvent;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
+import org.junit.Before;
 import org.junit.Test;
 
 import static org.mockito.Mockito.*;
@@ -42,6 +47,16 @@ public class TestRecoveryService {
   private static String TEST_ROOT_DIR = "target" + Path.SEPARATOR
       + TestRecoveryService.class.getName() + "-tmpDir";
 
+  private Configuration conf;
+
+  // Start every test from an empty TEST_ROOT_DIR so files left by earlier tests do not interfere.
+  @Before
+  public void setUp() throws IOException {
+    this.conf = new Configuration();
+    FileSystem localFS = FileSystem.getLocal(conf);
+    localFS.delete(new Path(TEST_ROOT_DIR), true);
+  }
+
   @Test(timeout = 5000)
   public void testDrainEvents() throws IOException {
     Configuration conf = new Configuration();
@@ -63,6 +78,106 @@ public class TestRecoveryService {
     assertEquals(randEventCount, recoveryService.processedRecoveryEventCounter.get());
   }
 
+  // A second DAGFinishedEvent must not reopen the DAG stream that the first one closed.
+  @Test(timeout = 5000)
+  public void testMultipleDAGFinishedEvent() throws IOException {
+    Configuration conf = new Configuration();
+    ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
+    ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1);
+    AppContext appContext = mock(AppContext.class);
+    when(appContext.getCurrentRecoveryDir()).thenReturn(new Path(TEST_ROOT_DIR));
+    when(appContext.getClock()).thenReturn(new SystemClock());
+
+    MockRecoveryService recoveryService = new MockRecoveryService(appContext);
+    conf.setBoolean(RecoveryService.TEZ_TEST_RECOVERY_DRAIN_EVENTS_WHEN_STOPPED, true);
+    recoveryService.init(conf);
+    recoveryService.start();
+    TezDAGID dagId = TezDAGID.getInstance(appId, 1);
+    int randEventCount = new Random().nextInt(100) + 100;
+    for (int i=0; i< randEventCount; ++i) {
+      recoveryService.handle(new DAGHistoryEvent(dagId,
+          new TaskStartedEvent(TezTaskID.getInstance(TezVertexID.getInstance(dagId, 1), 1), "v1", 0L, 0L)));
+    }
+    recoveryService.await();
+    assertTrue(recoveryService.outputStreamMap.containsKey(dagId));
+    // 2 DAGFinishedEvent
+    recoveryService.handle(new DAGHistoryEvent(dagId,
+        new DAGFinishedEvent(dagId, 1L, 2L, DAGState.FAILED, "diag", null, "user", "dag1", null,
+            appAttemptId)));
+    // outputStream removed
+    assertFalse(recoveryService.outputStreamMap.containsKey(dagId));
+    recoveryService.handle(new DAGHistoryEvent(dagId,
+        new DAGFinishedEvent(dagId, 1L, 2L, DAGState.ERROR, "diag", null, "user", "dag1", null,
+            appAttemptId)));
+    // no new outputStream opened
+    assertEquals(0, recoveryService.outputStreamMap.size());
+    assertFalse(recoveryService.outputStreamMap.containsKey(dagId));
+    recoveryService.stop();
+  }
+
+  // A pre-existing summary file must mark recovery as failed instead of being appended to.
+  @Test(timeout = 5000)
+  public void testSummaryPathExisted() throws IOException {
+    Configuration conf = new Configuration();
+    ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
+    ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1);
+    AppContext appContext = mock(AppContext.class);
+    when(appContext.getCurrentRecoveryDir()).thenReturn(new Path(TEST_ROOT_DIR));
+    when(appContext.getClock()).thenReturn(new SystemClock());
+
+    MockRecoveryService recoveryService = new MockRecoveryService(appContext);
+    conf.setBoolean(RecoveryService.TEZ_TEST_RECOVERY_DRAIN_EVENTS_WHEN_STOPPED, true);
+    recoveryService.init(conf);
+    recoveryService.start();
+    TezDAGID dagId = TezDAGID.getInstance(appId, 1);
+    Path summaryPath = TezCommonUtils.getSummaryRecoveryPath(recoveryService.recoveryPath);
+    touchFile(summaryPath);
+    assertFalse(recoveryService.hasRecoveryFailed());
+    recoveryService.handle(new DAGHistoryEvent(dagId,
+        new DAGFinishedEvent(dagId, 1L, 2L, DAGState.ERROR, "diag", null, "user", "dag1", null,
+            appAttemptId)));
+    assertTrue(recoveryService.hasRecoveryFailed());
+    // be able to handle event after fatal error
+    recoveryService.handle(new DAGHistoryEvent(dagId,
+        new DAGFinishedEvent(dagId, 1L, 2L, DAGState.ERROR, "diag", null, "user", "dag1", null,
+            appAttemptId)));
+    recoveryService.stop();
+  }
+
+  // A pre-existing DAG recovery file must mark recovery as failed instead of being appended to.
+  @Test(timeout = 5000)
+  public void testRecoveryPathExisted() throws IOException {
+    Configuration conf = new Configuration();
+    ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
+    AppContext appContext = mock(AppContext.class);
+    when(appContext.getCurrentRecoveryDir()).thenReturn(new Path(TEST_ROOT_DIR));
+    when(appContext.getClock()).thenReturn(new SystemClock());
+
+    MockRecoveryService recoveryService = new MockRecoveryService(appContext);
+    conf.setBoolean(RecoveryService.TEZ_TEST_RECOVERY_DRAIN_EVENTS_WHEN_STOPPED, true);
+    recoveryService.init(conf);
+    recoveryService.start();
+    TezDAGID dagId = TezDAGID.getInstance(appId, 1);
+    Path dagRecoveryPath = TezCommonUtils.getDAGRecoveryPath(recoveryService.recoveryPath, dagId.toString());
+    touchFile(dagRecoveryPath);
+    assertFalse(recoveryService.hasRecoveryFailed());
+    recoveryService.handle(new DAGHistoryEvent(dagId,
+        new TaskStartedEvent(TezTaskID.getInstance(TezVertexID.getInstance(dagId, 1), 1), "v1", 0L, 0L)));
+    // wait for recovery event to be handled
+    recoveryService.await();
+    assertTrue(recoveryService.hasRecoveryFailed());
+    // be able to handle recovery event after fatal error
+    recoveryService.handle(new DAGHistoryEvent(dagId,
+        new TaskStartedEvent(TezTaskID.getInstance(TezVertexID.getInstance(dagId, 1), 1), "v1", 0L, 0L)));
+    recoveryService.stop();
+  }
+
+  // Creates an empty file at the given path on the local file system.
+  private void touchFile(Path path) throws IOException {
+    FileSystem fs = FileSystem.getLocal(new Configuration());
+    fs.create(path).close();
+  }
+
   private static class MockRecoveryService extends RecoveryService {
 
     public AtomicInteger processedRecoveryEventCounter = new AtomicInteger(0);


Mime
View raw message