Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id BD961200B64 for ; Tue, 2 Aug 2016 17:21:09 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id BC1F4160AB0; Tue, 2 Aug 2016 15:21:09 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 0CF48160AA8 for ; Tue, 2 Aug 2016 17:21:08 +0200 (CEST) Received: (qmail 81325 invoked by uid 500); 2 Aug 2016 15:20:59 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 80270 invoked by uid 99); 2 Aug 2016 15:20:58 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 02 Aug 2016 15:20:58 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 75F25EEB01; Tue, 2 Aug 2016 15:20:58 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jianhe@apache.org To: common-commits@hadoop.apache.org Date: Tue, 02 Aug 2016 15:21:16 -0000 Message-Id: <469c51daa230467dba88dc6e67b91871@git.apache.org> In-Reply-To: <0b8347fdc73c480b8d5c2ea4ac794f36@git.apache.org> References: <0b8347fdc73c480b8d5c2ea4ac794f36@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [19/40] hadoop git commit: YARN-5436. Race in AsyncDispatcher can cause random test failures in Tez (probably YARN also). (Zhiyuan Yang via gtcarrera9) archived-at: Tue, 02 Aug 2016 15:21:09 -0000 YARN-5436. Race in AsyncDispatcher can cause random test failures in Tez (probably YARN also). (Zhiyuan Yang via gtcarrera9) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/7086fc72 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/7086fc72 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/7086fc72 Branch: refs/heads/yarn-native-services Commit: 7086fc72eebc41fd174d91839ed703c014aac920 Parents: d9aae22 Author: Li Lu Authored: Thu Jul 28 16:50:57 2016 -0700 Committer: Li Lu Committed: Thu Jul 28 16:50:57 2016 -0700 ---------------------------------------------------------------------- .../hadoop/yarn/event/AsyncDispatcher.java | 8 ++- .../hadoop/yarn/event/DrainDispatcher.java | 53 +++++++++++++++++++- 2 files changed, 55 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/7086fc72/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java index f5361c8..5dea1c8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java @@ -59,6 +59,9 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher { // Indicates all the remaining dispatcher's events on stop have been drained // and processed. + // Race condition happens if dispatcher thread sets drained to true between + // handler setting drained to false and enqueueing event. YARN-3878 decided + // to ignore it because of its tiny impact. Also see YARN-5436. private volatile boolean drained = true; private final Object waitForDrained = new Object(); @@ -300,9 +303,4 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher { protected boolean isEventThreadWaiting() { return eventHandlingThread.getState() == Thread.State.WAITING; } - - @VisibleForTesting - protected boolean isDrained() { - return this.drained; - } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/7086fc72/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java index e4a5a82..cf4b1b5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java @@ -22,6 +22,10 @@ import java.util.concurrent.LinkedBlockingQueue; @SuppressWarnings("rawtypes") public class DrainDispatcher extends AsyncDispatcher { + private volatile boolean drained = false; + private volatile boolean stopped = false; + private final BlockingQueue queue; + private final Object mutex; public DrainDispatcher() { this(new LinkedBlockingQueue()); @@ -29,6 +33,8 @@ public class DrainDispatcher extends AsyncDispatcher { public DrainDispatcher(BlockingQueue eventQueue) { super(eventQueue); + this.queue = eventQueue; + this.mutex = this; } /** @@ -44,8 +50,53 @@ public class DrainDispatcher extends AsyncDispatcher { * Busy loop waiting for all queued events to drain. */ public void await() { - while (!isDrained()) { + while (!drained) { Thread.yield(); } } + + @Override + Runnable createThread() { + return new Runnable() { + @Override + public void run() { + while (!stopped && !Thread.currentThread().isInterrupted()) { + synchronized (mutex) { + // !drained if dispatch queued new events on this dispatcher + drained = queue.isEmpty(); + } + Event event; + try { + event = queue.take(); + } catch (InterruptedException ie) { + return; + } + if (event != null) { + dispatch(event); + } + } + } + }; + } + + @SuppressWarnings("unchecked") + @Override + public EventHandler getEventHandler() { + final EventHandler actual = super.getEventHandler(); + return new EventHandler() { + @Override + public void handle(Event event) { + synchronized (mutex) { + actual.handle(event); + drained = false; + } + } + }; + } + + @Override + protected void serviceStop() throws Exception { + stopped = true; + super.serviceStop(); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org