tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject [06/35] tez git commit: TEZ-2224. EventQueue empty doesn't mean events are consumed in RecoveryService (zjffdu)
Date Tue, 07 Apr 2015 20:12:24 GMT
TEZ-2224. EventQueue empty doesn't mean events are consumed in RecoveryService (zjffdu)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/008f9bc1
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/008f9bc1
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/008f9bc1

Branch: refs/heads/TEZ-2003
Commit: 008f9bc1e5f6a37a1cdb71ba1527c5a477efc148
Parents: 3d5e13f
Author: Jeff Zhang <zjffdu@apache.org>
Authored: Tue Mar 31 09:20:17 2015 +0800
Committer: Jeff Zhang <zjffdu@apache.org>
Committed: Tue Mar 31 09:20:17 2015 +0800

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../dag/history/recovery/RecoveryService.java   | 89 +++++++++++++-------
 .../history/recovery/TestRecoveryService.java   | 81 ++++++++++++++++++
 .../org/apache/tez/test/TestAMRecovery.java     |  2 +-
 4 files changed, 141 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/008f9bc1/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e91e4a2..92abe79 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -255,6 +255,7 @@ TEZ-UI CHANGES (TEZ-8):
 Release 0.5.4: Unreleased
 
 ALL CHANGES:
+  TEZ-2224. EventQueue empty doesn't mean events are consumed in RecoveryService
   TEZ-2240. Fix toUpperCase/toLowerCase to use Locale.ENGLISH.
   TEZ-2238. TestContainerReuse flaky
   TEZ-2217. The min-held-containers being released prematurely

http://git-wip-us.apache.org/repos/asf/tez/blob/008f9bc1/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java b/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
index 23aecaa..4cdc99a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
@@ -34,6 +34,8 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.tez.common.TezCommonUtils;
+import org.apache.tez.dag.api.ConfigurationScope;
+import org.apache.tez.dag.api.Scope;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezConstants;
 import org.apache.tez.dag.app.AppContext;
@@ -54,21 +56,19 @@ public class RecoveryService extends AbstractService {
 
   public static final String RECOVERY_FATAL_OCCURRED_DIR =
       "RecoveryFatalErrorOccurred";
-
   /**
    * whether to handle remaining event in the eventqueue when AM is stopped
    */
   @VisibleForTesting
-  public static final String TEZ_AM_RECOVERY_HANDLE_REMAINING_EVENT_WHEN_STOPPED =
-      TezConfiguration.TEZ_AM_PREFIX + "recovery.handle_remaining_event_when_stopped";
+  public static final String TEZ_TEST_RECOVERY_DRAIN_EVENTS_WHEN_STOPPED =
+      TezConfiguration.TEZ_PREFIX + "test.recovery.drain_event";
 
   /**
-   * by default do not handle remaining event when AM is stopped.
-   * Most of time, true is for recovery unit test
+   * by default handle remaining event when AM is stopped.
+   * This should be helpful for recovery
    */
   @VisibleForTesting
-  public static final boolean TEZ_AM_RECOVERY_HANDLE_REMAINING_EVENT_WHEN_STOPPED_DEFAULT = false;
-
+  public static final boolean TEZ_TEST_RECOVERY_DRAIN_EVENTS_WHEN_STOPPED_DEFAULT = true;
 
   private LinkedBlockingQueue<DAGHistoryEvent> eventQueue =
       new LinkedBlockingQueue<DAGHistoryEvent>();
@@ -92,7 +92,12 @@ public class RecoveryService extends AbstractService {
   private int maxUnflushedEvents;
   private int flushInterval;
   private AtomicBoolean recoveryFatalErrorOccurred = new AtomicBoolean(false);
-  private boolean handleRemainingEventWhenStopped;
+  private boolean drainEventsFlag;
+
+  // Indicates all the remaining events on stop have been drained
+  // and processed.
+  private volatile boolean drained = true;
+  private Object waitForDrained = new Object();
 
   public RecoveryService(AppContext appContext) {
     super(RecoveryService.class.getName());
@@ -112,9 +117,9 @@ public class RecoveryService extends AbstractService {
     maxUnflushedEvents = conf.getInt(TezConfiguration.DAG_RECOVERY_MAX_UNFLUSHED_EVENTS,
         TezConfiguration.DAG_RECOVERY_MAX_UNFLUSHED_EVENTS_DEFAULT);
 
-    handleRemainingEventWhenStopped = conf.getBoolean(
-        TEZ_AM_RECOVERY_HANDLE_REMAINING_EVENT_WHEN_STOPPED,
-        TEZ_AM_RECOVERY_HANDLE_REMAINING_EVENT_WHEN_STOPPED_DEFAULT);
+    drainEventsFlag = conf.getBoolean(
+        TEZ_TEST_RECOVERY_DRAIN_EVENTS_WHEN_STOPPED,
+        TEZ_TEST_RECOVERY_DRAIN_EVENTS_WHEN_STOPPED_DEFAULT);
   }
 
   @Override
@@ -126,6 +131,16 @@ public class RecoveryService extends AbstractService {
       public void run() {
         DAGHistoryEvent event;
         while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
+          drained = eventQueue.isEmpty();
+          // adding this service state check is to avoid the overhead of acquiring the lock
+          // and calling notify every time in the normal run of the loop.
+          if (getServiceState() == STATE.STOPPED) {
+            synchronized (waitForDrained) {
+              if (drained) {
+                waitForDrained.notify();
+              }
+            }
+          }
 
           if (recoveryFatalErrorOccurred.get()) {
             LOG.error("Recovery failure occurred. Stopping recovery thread."
@@ -170,27 +185,26 @@ public class RecoveryService extends AbstractService {
   }
 
   @Override
-  public void serviceStop() {
+  public void serviceStop() throws Exception {
     LOG.info("Stopping RecoveryService");
 
+    if (drainEventsFlag) {
+      LOG.info("Handle the remaining events in queue, queue size=" + eventQueue.size());
+      synchronized (waitForDrained) {
+        while (!drained && eventHandlingThread.isAlive()) {
+          waitForDrained.wait(1000);
+          LOG.info("Waiting for RecoveryEventHandlingThread to drain.");
+        }
+      }
+    }
+
     stopped.set(true);
     if (eventHandlingThread != null) {
       eventHandlingThread.interrupt();
-    }
-
-    if (handleRemainingEventWhenStopped) {
-      LOG.info("Handle the remaining events in queue, queue size=" + eventQueue.size());
-      while(!eventQueue.isEmpty()) {
-        synchronized (lock) {
-          try {
-            DAGHistoryEvent event = eventQueue.take();
-            handleRecoveryEvent(event);
-          } catch (Exception e) {
-            // For now, ignore any such errors as these are non-critical
-            // All summary event related errors are handled as critical
-            LOG.warn("Error handling recovery event", e);
-          }
-        }
+      try {
+        eventHandlingThread.join();
+      } catch (InterruptedException ie) {
+        LOG.warn("Interrupted Exception while stopping", ie);
       }
     }
 
@@ -214,6 +228,13 @@ public class RecoveryService extends AbstractService {
     }
   }
 
+  // ---------- IMPORTANT ----------------------
+  // ALWAYS USE THIS METHOD TO ADD EVENT TO QUEUE
+  private void addToEventQueue(DAGHistoryEvent event) {
+    drained = false;
+    eventQueue.add(event);
+  }
+
   public void handle(DAGHistoryEvent event) throws IOException {
     if (stopped.get()) {
       LOG.warn("Igoring event as service stopped, eventType"
@@ -229,7 +250,7 @@ public class RecoveryService extends AbstractService {
     if (!started.get()) {
       LOG.warn("Adding event of type " + eventType
           + " to queue as service not started");
-      eventQueue.add(event);
+      addToEventQueue(event);
       return;
     }
 
@@ -272,7 +293,7 @@ public class RecoveryService extends AbstractService {
               LOG.debug("Queueing Non-immediate Summary/Recovery event of type"
                   + eventType.name());
             }
-            eventQueue.add(event);
+            addToEventQueue(event);
           }
           if (eventType.equals(HistoryEventType.DAG_FINISHED)) {
             LOG.info("DAG completed"
@@ -320,7 +341,7 @@ public class RecoveryService extends AbstractService {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Queueing Non-Summary Recovery event of type " + eventType.name());
       }
-      eventQueue.add(event);
+      addToEventQueue(event);
     }
   }
 
@@ -352,7 +373,8 @@ public class RecoveryService extends AbstractService {
     summaryEvent.toSummaryProtoStream(summaryStream);
   }
 
-  private void handleRecoveryEvent(DAGHistoryEvent event) throws IOException {
+  @VisibleForTesting
+  protected void handleRecoveryEvent(DAGHistoryEvent event) throws IOException {
     HistoryEventType eventType = event.getHistoryEvent().getEventType();
     if (LOG.isDebugEnabled()) {
       LOG.debug("Handling recovery event of type "
@@ -451,4 +473,9 @@ public class RecoveryService extends AbstractService {
     return recoveryFatalErrorOccurred.get();
   }
 
+  public void await() {
+    while (!this.drained) {
+      Thread.yield();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/008f9bc1/tez-dag/src/test/java/org/apache/tez/dag/history/recovery/TestRecoveryService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/recovery/TestRecoveryService.java b/tez-dag/src/test/java/org/apache/tez/dag/history/recovery/TestRecoveryService.java
new file mode 100644
index 0000000..f10adfc
--- /dev/null
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/recovery/TestRecoveryService.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.dag.history.recovery;
+
+import java.io.IOException;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.util.SystemClock;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.history.DAGHistoryEvent;
+import org.apache.tez.dag.history.events.TaskStartedEvent;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.dag.records.TezVertexID;
+import org.junit.Test;
+
+import static org.mockito.Mockito.*;
+import static org.junit.Assert.*;
+
+public class TestRecoveryService {
+
+  private static String TEST_ROOT_DIR = "target" + Path.SEPARATOR
+      + TestRecoveryService.class.getName() + "-tmpDir";
+
+  @Test(timeout = 5000)
+  public void testDrainEvents() throws IOException {
+    Configuration conf = new Configuration();
+    AppContext appContext = mock(AppContext.class);
+    when(appContext.getCurrentRecoveryDir()).thenReturn(new Path(TEST_ROOT_DIR));
+    when(appContext.getClock()).thenReturn(new SystemClock());
+
+    MockRecoveryService recoveryService = new MockRecoveryService(appContext);
+    conf.setBoolean(RecoveryService.TEZ_TEST_RECOVERY_DRAIN_EVENTS_WHEN_STOPPED, true);
+    recoveryService.init(conf);
+    recoveryService.start();
+    TezDAGID dagId = TezDAGID.getInstance(ApplicationId.newInstance(System.currentTimeMillis(), 1),1);
+    int randEventCount = new Random().nextInt(100) + 100;
+    for (int i=0; i< randEventCount; ++i) {
+      recoveryService.handle(new DAGHistoryEvent(dagId,
+          new TaskStartedEvent(TezTaskID.getInstance(TezVertexID.getInstance(dagId, 1), 1), "v1", 0L, 0L)));
+    }
+    recoveryService.stop();
+    assertEquals(randEventCount, recoveryService.processedRecoveryEventCounter.get());
+  }
+
+  private static class MockRecoveryService extends RecoveryService {
+
+    public AtomicInteger processedRecoveryEventCounter = new AtomicInteger(0);
+
+    public MockRecoveryService(AppContext appContext) {
+      super(appContext);
+    }
+
+    @Override
+    protected void handleRecoveryEvent(DAGHistoryEvent event)
+        throws IOException {
+      super.handleRecoveryEvent(event);
+      processedRecoveryEventCounter.addAndGet(1);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/008f9bc1/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java b/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java
index b020055..66d8373 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java
@@ -171,7 +171,7 @@ public class TestAMRecovery {
     tezConf.setBoolean(
         TezConfiguration.TEZ_AM_STAGING_SCRATCH_DATA_AUTO_DELETE, false);
     tezConf.setBoolean(
-        RecoveryService.TEZ_AM_RECOVERY_HANDLE_REMAINING_EVENT_WHEN_STOPPED,
+        RecoveryService.TEZ_TEST_RECOVERY_DRAIN_EVENTS_WHEN_STOPPED,
         true);
     tezSession = TezClient.create("TestDAGRecovery", tezConf);
     tezSession.start();


Mime
View raw message