Return-Path: X-Original-To: apmail-tez-commits-archive@minotaur.apache.org Delivered-To: apmail-tez-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id A4644179C8 for ; Tue, 31 Mar 2015 01:20:54 +0000 (UTC) Received: (qmail 78938 invoked by uid 500); 31 Mar 2015 01:20:42 -0000 Delivered-To: apmail-tez-commits-archive@tez.apache.org Received: (qmail 78905 invoked by uid 500); 31 Mar 2015 01:20:41 -0000 Mailing-List: contact commits-help@tez.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.apache.org Delivered-To: mailing list commits@tez.apache.org Received: (qmail 78896 invoked by uid 99); 31 Mar 2015 01:20:41 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 31 Mar 2015 01:20:41 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 80D1FE17C9; Tue, 31 Mar 2015 01:20:41 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: zjffdu@apache.org To: commits@tez.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: tez git commit: TEZ-2224. EventQueue empty doesn't mean events are consumed in RecoveryService (zjffdu) Date: Tue, 31 Mar 2015 01:20:41 +0000 (UTC) Repository: tez Updated Branches: refs/heads/master 3d5e13f13 -> 008f9bc1e TEZ-2224. EventQueue empty doesn't mean events are consumed in RecoveryService (zjffdu) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/008f9bc1 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/008f9bc1 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/008f9bc1 Branch: refs/heads/master Commit: 008f9bc1e5f6a37a1cdb71ba1527c5a477efc148 Parents: 3d5e13f Author: Jeff Zhang Authored: Tue Mar 31 09:20:17 2015 +0800 Committer: Jeff Zhang Committed: Tue Mar 31 09:20:17 2015 +0800 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../dag/history/recovery/RecoveryService.java | 89 +++++++++++++------- .../history/recovery/TestRecoveryService.java | 81 ++++++++++++++++++ .../org/apache/tez/test/TestAMRecovery.java | 2 +- 4 files changed, 141 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/008f9bc1/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index e91e4a2..92abe79 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -255,6 +255,7 @@ TEZ-UI CHANGES (TEZ-8): Release 0.5.4: Unreleased ALL CHANGES: + TEZ-2224. EventQueue empty doesn't mean events are consumed in RecoveryService TEZ-2240. Fix toUpperCase/toLowerCase to use Locale.ENGLISH. TEZ-2238. TestContainerReuse flaky TEZ-2217. The min-held-containers being released prematurely http://git-wip-us.apache.org/repos/asf/tez/blob/008f9bc1/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java b/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java index 23aecaa..4cdc99a 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java @@ -34,6 +34,8 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.service.AbstractService; import org.apache.tez.common.TezCommonUtils; +import org.apache.tez.dag.api.ConfigurationScope; +import org.apache.tez.dag.api.Scope; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezConstants; import org.apache.tez.dag.app.AppContext; @@ -54,21 +56,19 @@ public class RecoveryService extends AbstractService { public static final String RECOVERY_FATAL_OCCURRED_DIR = "RecoveryFatalErrorOccurred"; - /** * whether to handle remaining event in the eventqueue when AM is stopped */ @VisibleForTesting - public static final String TEZ_AM_RECOVERY_HANDLE_REMAINING_EVENT_WHEN_STOPPED = - TezConfiguration.TEZ_AM_PREFIX + "recovery.handle_remaining_event_when_stopped"; + public static final String TEZ_TEST_RECOVERY_DRAIN_EVENTS_WHEN_STOPPED = + TezConfiguration.TEZ_PREFIX + "test.recovery.drain_event"; /** - * by default do not handle remaining event when AM is stopped. - * Most of time, true is for recovery unit test + * by default handle remaining event when AM is stopped. + * This should be helpful for recovery */ @VisibleForTesting - public static final boolean TEZ_AM_RECOVERY_HANDLE_REMAINING_EVENT_WHEN_STOPPED_DEFAULT = false; - + public static final boolean TEZ_TEST_RECOVERY_DRAIN_EVENTS_WHEN_STOPPED_DEFAULT = true; private LinkedBlockingQueue eventQueue = new LinkedBlockingQueue(); @@ -92,7 +92,12 @@ public class RecoveryService extends AbstractService { private int maxUnflushedEvents; private int flushInterval; private AtomicBoolean recoveryFatalErrorOccurred = new AtomicBoolean(false); - private boolean handleRemainingEventWhenStopped; + private boolean drainEventsFlag; + + // Indicates all the remaining events on stop have been drained + // and processed. + private volatile boolean drained = true; + private Object waitForDrained = new Object(); public RecoveryService(AppContext appContext) { super(RecoveryService.class.getName()); @@ -112,9 +117,9 @@ public class RecoveryService extends AbstractService { maxUnflushedEvents = conf.getInt(TezConfiguration.DAG_RECOVERY_MAX_UNFLUSHED_EVENTS, TezConfiguration.DAG_RECOVERY_MAX_UNFLUSHED_EVENTS_DEFAULT); - handleRemainingEventWhenStopped = conf.getBoolean( - TEZ_AM_RECOVERY_HANDLE_REMAINING_EVENT_WHEN_STOPPED, - TEZ_AM_RECOVERY_HANDLE_REMAINING_EVENT_WHEN_STOPPED_DEFAULT); + drainEventsFlag = conf.getBoolean( + TEZ_TEST_RECOVERY_DRAIN_EVENTS_WHEN_STOPPED, + TEZ_TEST_RECOVERY_DRAIN_EVENTS_WHEN_STOPPED_DEFAULT); } @Override @@ -126,6 +131,16 @@ public class RecoveryService extends AbstractService { public void run() { DAGHistoryEvent event; while (!stopped.get() && !Thread.currentThread().isInterrupted()) { + drained = eventQueue.isEmpty(); + // adding this service state check is to avoid the overhead of acquiring the lock + // and calling notify every time in the normal run of the loop. + if (getServiceState() == STATE.STOPPED) { + synchronized (waitForDrained) { + if (drained) { + waitForDrained.notify(); + } + } + } if (recoveryFatalErrorOccurred.get()) { LOG.error("Recovery failure occurred. Stopping recovery thread." @@ -170,27 +185,26 @@ public class RecoveryService extends AbstractService { } @Override - public void serviceStop() { + public void serviceStop() throws Exception { LOG.info("Stopping RecoveryService"); + if (drainEventsFlag) { + LOG.info("Handle the remaining events in queue, queue size=" + eventQueue.size()); + synchronized (waitForDrained) { + while (!drained && eventHandlingThread.isAlive()) { + waitForDrained.wait(1000); + LOG.info("Waiting for RecoveryEventHandlingThread to drain."); + } + } + } + stopped.set(true); if (eventHandlingThread != null) { eventHandlingThread.interrupt(); - } - - if (handleRemainingEventWhenStopped) { - LOG.info("Handle the remaining events in queue, queue size=" + eventQueue.size()); - while(!eventQueue.isEmpty()) { - synchronized (lock) { - try { - DAGHistoryEvent event = eventQueue.take(); - handleRecoveryEvent(event); - } catch (Exception e) { - // For now, ignore any such errors as these are non-critical - // All summary event related errors are handled as critical - LOG.warn("Error handling recovery event", e); - } - } + try { + eventHandlingThread.join(); + } catch (InterruptedException ie) { + LOG.warn("Interrupted Exception while stopping", ie); } } @@ -214,6 +228,13 @@ public class RecoveryService extends AbstractService { } } + // ---------- IMPORTANT ---------------------- + // ALWAYS USE THIS METHOD TO ADD EVENT TO QUEUE + private void addToEventQueue(DAGHistoryEvent event) { + drained = false; + eventQueue.add(event); + } + public void handle(DAGHistoryEvent event) throws IOException { if (stopped.get()) { LOG.warn("Igoring event as service stopped, eventType" @@ -229,7 +250,7 @@ public class RecoveryService extends AbstractService { if (!started.get()) { LOG.warn("Adding event of type " + eventType + " to queue as service not started"); - eventQueue.add(event); + addToEventQueue(event); return; } @@ -272,7 +293,7 @@ public class RecoveryService extends AbstractService { LOG.debug("Queueing Non-immediate Summary/Recovery event of type" + eventType.name()); } - eventQueue.add(event); + addToEventQueue(event); } if (eventType.equals(HistoryEventType.DAG_FINISHED)) { LOG.info("DAG completed" @@ -320,7 +341,7 @@ public class RecoveryService extends AbstractService { if (LOG.isDebugEnabled()) { LOG.debug("Queueing Non-Summary Recovery event of type " + eventType.name()); } - eventQueue.add(event); + addToEventQueue(event); } } @@ -352,7 +373,8 @@ public class RecoveryService extends AbstractService { summaryEvent.toSummaryProtoStream(summaryStream); } - private void handleRecoveryEvent(DAGHistoryEvent event) throws IOException { + @VisibleForTesting + protected void handleRecoveryEvent(DAGHistoryEvent event) throws IOException { HistoryEventType eventType = event.getHistoryEvent().getEventType(); if (LOG.isDebugEnabled()) { LOG.debug("Handling recovery event of type " @@ -451,4 +473,9 @@ public class RecoveryService extends AbstractService { return recoveryFatalErrorOccurred.get(); } + public void await() { + while (!this.drained) { + Thread.yield(); + } + } } http://git-wip-us.apache.org/repos/asf/tez/blob/008f9bc1/tez-dag/src/test/java/org/apache/tez/dag/history/recovery/TestRecoveryService.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/recovery/TestRecoveryService.java b/tez-dag/src/test/java/org/apache/tez/dag/history/recovery/TestRecoveryService.java new file mode 100644 index 0000000..f10adfc --- /dev/null +++ b/tez-dag/src/test/java/org/apache/tez/dag/history/recovery/TestRecoveryService.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tez.dag.history.recovery; + +import java.io.IOException; +import java.util.Random; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.util.SystemClock; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.app.AppContext; +import org.apache.tez.dag.history.DAGHistoryEvent; +import org.apache.tez.dag.history.events.TaskStartedEvent; +import org.apache.tez.dag.records.TezDAGID; +import org.apache.tez.dag.records.TezTaskID; +import org.apache.tez.dag.records.TezVertexID; +import org.junit.Test; + +import static org.mockito.Mockito.*; +import static org.junit.Assert.*; + +public class TestRecoveryService { + + private static String TEST_ROOT_DIR = "target" + Path.SEPARATOR + + TestRecoveryService.class.getName() + "-tmpDir"; + + @Test(timeout = 5000) + public void testDrainEvents() throws IOException { + Configuration conf = new Configuration(); + AppContext appContext = mock(AppContext.class); + when(appContext.getCurrentRecoveryDir()).thenReturn(new Path(TEST_ROOT_DIR)); + when(appContext.getClock()).thenReturn(new SystemClock()); + + MockRecoveryService recoveryService = new MockRecoveryService(appContext); + conf.setBoolean(RecoveryService.TEZ_TEST_RECOVERY_DRAIN_EVENTS_WHEN_STOPPED, true); + recoveryService.init(conf); + recoveryService.start(); + TezDAGID dagId = TezDAGID.getInstance(ApplicationId.newInstance(System.currentTimeMillis(), 1),1); + int randEventCount = new Random().nextInt(100) + 100; + for (int i=0; i< randEventCount; ++i) { + recoveryService.handle(new DAGHistoryEvent(dagId, + new TaskStartedEvent(TezTaskID.getInstance(TezVertexID.getInstance(dagId, 1), 1), "v1", 0L, 0L))); + } + recoveryService.stop(); + assertEquals(randEventCount, recoveryService.processedRecoveryEventCounter.get()); + } + + private static class MockRecoveryService extends RecoveryService { + + public AtomicInteger processedRecoveryEventCounter = new AtomicInteger(0); + + public MockRecoveryService(AppContext appContext) { + super(appContext); + } + + @Override + protected void handleRecoveryEvent(DAGHistoryEvent event) + throws IOException { + super.handleRecoveryEvent(event); + processedRecoveryEventCounter.addAndGet(1); + } + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/008f9bc1/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java ---------------------------------------------------------------------- diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java b/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java index b020055..66d8373 100644 --- a/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java +++ b/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java @@ -171,7 +171,7 @@ public class TestAMRecovery { tezConf.setBoolean( TezConfiguration.TEZ_AM_STAGING_SCRATCH_DATA_AUTO_DELETE, false); tezConf.setBoolean( - RecoveryService.TEZ_AM_RECOVERY_HANDLE_REMAINING_EVENT_WHEN_STOPPED, + RecoveryService.TEZ_TEST_RECOVERY_DRAIN_EVENTS_WHEN_STOPPED, true); tezSession = TezClient.create("TestDAGRecovery", tezConf); tezSession.start();