Return-Path: X-Original-To: apmail-tez-commits-archive@minotaur.apache.org Delivered-To: apmail-tez-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 9E59217A63 for ; Fri, 26 Sep 2014 22:32:18 +0000 (UTC) Received: (qmail 83096 invoked by uid 500); 26 Sep 2014 22:32:18 -0000 Delivered-To: apmail-tez-commits-archive@tez.apache.org Received: (qmail 83060 invoked by uid 500); 26 Sep 2014 22:32:18 -0000 Mailing-List: contact commits-help@tez.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.apache.org Delivered-To: mailing list commits@tez.apache.org Received: (qmail 83051 invoked by uid 99); 26 Sep 2014 22:32:18 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 26 Sep 2014 22:32:18 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 145859B6175; Fri, 26 Sep 2014 22:32:17 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: rbalamohan@apache.org To: commits@tez.apache.org Message-Id: <9c0e7c2e7e284735a6eaf751722ddbbb@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: git commit: TEZ-1624. Flaky tests in TestContainerReuse due to race condition in DelayedContainerManager thread (Rajesh Balamohan) Date: Fri, 26 Sep 2014 22:32:17 +0000 (UTC) Repository: tez Updated Branches: refs/heads/master a56f9ef4e -> 160a5d80f TEZ-1624. Flaky tests in TestContainerReuse due to race condition in DelayedContainerManager thread (Rajesh Balamohan) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/160a5d80 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/160a5d80 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/160a5d80 Branch: refs/heads/master Commit: 160a5d80fd108dcf597c734ceb3d0e23ff432f6e Parents: a56f9ef Author: Rajesh Balamohan Authored: Sat Sep 27 04:01:41 2014 +0530 Committer: Rajesh Balamohan Committed: Sat Sep 27 04:01:41 2014 +0530 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../dag/app/rm/YarnTaskSchedulerService.java | 122 +++++++++---------- 2 files changed, 62 insertions(+), 61 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/160a5d80/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 93cd3ae..e96d7c6 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -69,6 +69,7 @@ ALL CHANGES TEZ-978. Enhance auto parallelism tuning for queries having empty outputs or data skewness TEZ-1433. Invalid credentials can be used when a DAG is submitted to a session which has timed out + TEZ-1624. Flaky tests in TestContainerReuse due to race condition in DelayedContainerManager thread Release 0.5.0: 2014-09-03 http://git-wip-us.apache.org/repos/asf/tez/blob/160a5d80/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java index 5bd476c..75d62f1 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java @@ -1714,78 +1714,77 @@ public class YarnTaskSchedulerService extends TaskSchedulerService // Try allocating containers which have timed out. // Required since these containers may get assigned without // locality at this point. - if (delayedContainers.peek() == null) { - try { - // test only signaling to make TestTaskScheduler work - if (drainedDelayedContainersForTest != null) { - drainedDelayedContainersForTest.set(true); - synchronized (drainedDelayedContainersForTest) { - drainedDelayedContainersForTest.notifyAll(); + synchronized(this) { + if (delayedContainers.peek() == null) { + try { + // test only signaling to make TestTaskScheduler work + if (drainedDelayedContainersForTest != null) { + drainedDelayedContainersForTest.set(true); + synchronized (drainedDelayedContainersForTest) { + drainedDelayedContainersForTest.notifyAll(); + } } - } - synchronized(this) { this.wait(); - } - // Re-loop to see if tryAssignAll is set. - continue; - } catch (InterruptedException e) { - LOG.info("AllocatedContainerManager Thread interrupted"); - } - } else { - // test only sleep to prevent tight loop cycling that makes tests stall - if (drainedDelayedContainersForTest != null) { - try { - Thread.sleep(100); + // Re-loop to see if tryAssignAll is set. + continue; } catch (InterruptedException e) { - e.printStackTrace(); + LOG.info("AllocatedContainerManager Thread interrupted"); } } - HeldContainer delayedContainer = delayedContainers.peek(); - if (delayedContainer == null) { - continue; + } + // test only sleep to prevent tight loop cycling that makes tests stall + if (drainedDelayedContainersForTest != null) { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + e.printStackTrace(); } - if (LOG.isDebugEnabled()) { - LOG.debug("Considering HeldContainer: " + } + HeldContainer delayedContainer = delayedContainers.peek(); + if (delayedContainer == null) { + continue; + } + if (LOG.isDebugEnabled()) { + LOG.debug("Considering HeldContainer: " + delayedContainer + " for assignment"); + } + long currentTs = System.currentTimeMillis(); + long nextScheduleTs = delayedContainer.getNextScheduleTime(); + if (currentTs >= nextScheduleTs) { + // Remove the container and try scheduling it. + // TEZ-587 what if container is released by RM after this + // in onContainerCompleted() + delayedContainer = delayedContainers.poll(); + if (delayedContainer == null) { + continue; } - long currentTs = System.currentTimeMillis(); - long nextScheduleTs = delayedContainer.getNextScheduleTime(); - if (currentTs >= nextScheduleTs) { - // Remove the container and try scheduling it. - // TEZ-587 what if container is released by RM after this - // in onContainerCompleted() - delayedContainer = delayedContainers.poll(); - if (delayedContainer == null) { - continue; - } - Map assignedContainers = null; - synchronized(YarnTaskSchedulerService.this) { - if (null != - heldContainers.get(delayedContainer.getContainer().getId())) { - assignedContainers = assignDelayedContainer(delayedContainer); - } else { - LOG.info("Skipping delayed container as container is no longer" + Map assignedContainers = null; + synchronized(YarnTaskSchedulerService.this) { + if (null != + heldContainers.get(delayedContainer.getContainer().getId())) { + assignedContainers = assignDelayedContainer(delayedContainer); + } else { + LOG.info("Skipping delayed container as container is no longer" + " running, containerId=" + delayedContainer.getContainer().getId()); - } } - // Inform App should be done outside of the lock - informAppAboutAssignments(assignedContainers); - } else { - synchronized(this) { - try { - // Wait for the next container to be assignable - delayedContainer = delayedContainers.peek(); - long diff = localitySchedulingDelay; - if (delayedContainer != null) { - diff = delayedContainer.getNextScheduleTime() - currentTs; - } - if (diff > 0) { - this.wait(diff); - } - } catch (InterruptedException e) { - LOG.info("AllocatedContainerManager Thread interrupted"); + } + // Inform App should be done outside of the lock + informAppAboutAssignments(assignedContainers); + } else { + synchronized(this) { + try { + // Wait for the next container to be assignable + delayedContainer = delayedContainers.peek(); + long diff = localitySchedulingDelay; + if (delayedContainer != null) { + diff = delayedContainer.getNextScheduleTime() - currentTs; } + if (diff > 0) { + this.wait(diff); + } + } catch (InterruptedException e) { + LOG.info("AllocatedContainerManager Thread interrupted"); } } } @@ -1874,8 +1873,9 @@ public class YarnTaskSchedulerService extends TaskSchedulerService + ", nextScheduleTime=" + delayedContainer.getNextScheduleTime() + ", containerExpiry=" + delayedContainer.getContainerExpiryTime()); } - boolean added = delayedContainers.offer(delayedContainer); + boolean added = false; synchronized(this) { + added = delayedContainers.offer(delayedContainer); this.notify(); } if (!added) {