tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rbalamo...@apache.org
Subject git commit: TEZ-1624. Flaky tests in TestContainerReuse due to race condition in DelayedContainerManager thread (Rajesh Balamohan)
Date Fri, 26 Sep 2014 22:46:50 GMT
Repository: tez
Updated Branches:
  refs/heads/branch-0.5 df9b37c1c -> 52648442a


TEZ-1624. Flaky tests in TestContainerReuse due to race condition in DelayedContainerManager thread (Rajesh Balamohan)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/52648442
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/52648442
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/52648442

Branch: refs/heads/branch-0.5
Commit: 52648442aea4a2d51044131a70b7e86b5fd805fb
Parents: df9b37c
Author: Rajesh Balamohan <rbalamohan@apache.org>
Authored: Sat Sep 27 04:16:26 2014 +0530
Committer: Rajesh Balamohan <rbalamohan@apache.org>
Committed: Sat Sep 27 04:16:26 2014 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../dag/app/rm/YarnTaskSchedulerService.java    | 122 +++++++++----------
 2 files changed, 62 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/52648442/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b37873e..6f5d06b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -34,6 +34,7 @@ ALL CHANGES
   Windows
   TEZ-1554. Failing tests in TestMRHelpers related to environment on Windows
   TEZ-978. Enhance auto parallelism tuning for queries having empty outputs or data skewness
+  TEZ-1624. Flaky tests in TestContainerReuse due to race condition in DelayedContainerManager thread
 
 Release 0.5.0: 2014-09-03
 

http://git-wip-us.apache.org/repos/asf/tez/blob/52648442/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
index 5bd476c..75d62f1 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
@@ -1714,78 +1714,77 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
         // Try allocating containers which have timed out.
         // Required since these containers may get assigned without
         // locality at this point.
-        if (delayedContainers.peek() == null) {
-          try {
-            // test only signaling to make TestTaskScheduler work
-            if (drainedDelayedContainersForTest != null) {
-              drainedDelayedContainersForTest.set(true);
-              synchronized (drainedDelayedContainersForTest) {
-                drainedDelayedContainersForTest.notifyAll();
+        synchronized(this) {
+          if (delayedContainers.peek() == null) {
+            try {
+              // test only signaling to make TestTaskScheduler work
+              if (drainedDelayedContainersForTest != null) {
+                drainedDelayedContainersForTest.set(true);
+                synchronized (drainedDelayedContainersForTest) {
+                  drainedDelayedContainersForTest.notifyAll();
+                }
               }
-            }
-            synchronized(this) {
               this.wait();
-            }
-            // Re-loop to see if tryAssignAll is set.
-            continue;
-          } catch (InterruptedException e) {
-            LOG.info("AllocatedContainerManager Thread interrupted");
-          }
-        } else {
-          // test only sleep to prevent tight loop cycling that makes tests stall
-          if (drainedDelayedContainersForTest != null) {
-            try {
-              Thread.sleep(100);
+              // Re-loop to see if tryAssignAll is set.
+              continue;
             } catch (InterruptedException e) {
-              e.printStackTrace();
+              LOG.info("AllocatedContainerManager Thread interrupted");
             }
           }
-          HeldContainer delayedContainer = delayedContainers.peek();
-          if (delayedContainer == null) {
-            continue;
+        }
+        // test only sleep to prevent tight loop cycling that makes tests stall
+        if (drainedDelayedContainersForTest != null) {
+          try {
+            Thread.sleep(100);
+          } catch (InterruptedException e) {
+            e.printStackTrace();
           }
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Considering HeldContainer: "
+        }
+        HeldContainer delayedContainer = delayedContainers.peek();
+        if (delayedContainer == null) {
+          continue;
+        }
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Considering HeldContainer: "
               + delayedContainer + " for assignment");
+        }
+        long currentTs = System.currentTimeMillis();
+        long nextScheduleTs = delayedContainer.getNextScheduleTime();
+        if (currentTs >= nextScheduleTs) {
+          // Remove the container and try scheduling it.
+          // TEZ-587 what if container is released by RM after this
+          // in onContainerCompleted()
+          delayedContainer = delayedContainers.poll();
+          if (delayedContainer == null) {
+            continue;
           }
-          long currentTs = System.currentTimeMillis();
-          long nextScheduleTs = delayedContainer.getNextScheduleTime();
-          if (currentTs >= nextScheduleTs) {
-            // Remove the container and try scheduling it.
-            // TEZ-587 what if container is released by RM after this
-            // in onContainerCompleted()
-            delayedContainer = delayedContainers.poll();
-            if (delayedContainer == null) {
-              continue;
-            }
-            Map<CookieContainerRequest, Container> assignedContainers = null;
-            synchronized(YarnTaskSchedulerService.this) {
-              if (null !=
-                  heldContainers.get(delayedContainer.getContainer().getId())) {
-                assignedContainers = assignDelayedContainer(delayedContainer);
-              } else {
-                LOG.info("Skipping delayed container as container is no longer"
+          Map<CookieContainerRequest, Container> assignedContainers = null;
+          synchronized(YarnTaskSchedulerService.this) {
+            if (null !=
+                heldContainers.get(delayedContainer.getContainer().getId())) {
+              assignedContainers = assignDelayedContainer(delayedContainer);
+            } else {
+              LOG.info("Skipping delayed container as container is no longer"
                   + " running, containerId="
                   + delayedContainer.getContainer().getId());
-              }
             }
-            // Inform App should be done outside of the lock
-            informAppAboutAssignments(assignedContainers);
-          } else {
-            synchronized(this) {
-              try {
-                // Wait for the next container to be assignable
-                delayedContainer = delayedContainers.peek();
-                long diff = localitySchedulingDelay;
-                if (delayedContainer != null) {
-                  diff = delayedContainer.getNextScheduleTime() - currentTs;
-                }
-                if (diff > 0) {
-                  this.wait(diff);
-                }
-              } catch (InterruptedException e) {
-                LOG.info("AllocatedContainerManager Thread interrupted");
+          }
+          // Inform App should be done outside of the lock
+          informAppAboutAssignments(assignedContainers);
+        } else {
+          synchronized(this) {
+            try {
+              // Wait for the next container to be assignable
+              delayedContainer = delayedContainers.peek();
+              long diff = localitySchedulingDelay;
+              if (delayedContainer != null) {
+                diff = delayedContainer.getNextScheduleTime() - currentTs;
               }
+              if (diff > 0) {
+                this.wait(diff);
+              }
+            } catch (InterruptedException e) {
+              LOG.info("AllocatedContainerManager Thread interrupted");
             }
           }
         }
@@ -1874,8 +1873,9 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
           + ", nextScheduleTime=" + delayedContainer.getNextScheduleTime()
           + ", containerExpiry=" + delayedContainer.getContainerExpiryTime());
       }
-      boolean added = delayedContainers.offer(delayedContainer);
+      boolean added =  false;
       synchronized(this) {
+        added = delayedContainers.offer(delayedContainer);
         this.notify();
       }
       if (!added) {


Mime
View raw message