Return-Path: X-Original-To: apmail-tez-commits-archive@minotaur.apache.org Delivered-To: apmail-tez-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 58F9A10027 for ; Wed, 26 Feb 2014 03:18:17 +0000 (UTC) Received: (qmail 73520 invoked by uid 500); 26 Feb 2014 03:18:16 -0000 Delivered-To: apmail-tez-commits-archive@tez.apache.org Received: (qmail 73474 invoked by uid 500); 26 Feb 2014 03:18:15 -0000 Mailing-List: contact commits-help@tez.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.incubator.apache.org Delivered-To: mailing list commits@tez.incubator.apache.org Received: (qmail 73457 invoked by uid 99); 26 Feb 2014 03:18:12 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 26 Feb 2014 03:18:12 +0000 X-ASF-Spam-Status: No, hits=-2000.5 required=5.0 tests=ALL_TRUSTED,RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Wed, 26 Feb 2014 03:18:10 +0000 Received: (qmail 73388 invoked by uid 99); 26 Feb 2014 03:17:49 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 26 Feb 2014 03:17:49 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 9050392D69D; Wed, 26 Feb 2014 03:17:49 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: bikas@apache.org To: commits@tez.incubator.apache.org Message-Id: <04cef1af9b9046c1bf30811e3d09b02f@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: git commit: TEZ-891. TestTaskScheduler does not handle mockApp being called on different thread (bikas) Date: Wed, 26 Feb 2014 03:17:49 +0000 (UTC) X-Virus-Checked: Checked by ClamAV on apache.org Repository: incubator-tez Updated Branches: refs/heads/master a5713a41f -> f2f31e0ef TEZ-891. TestTaskScheduler does not handle mockApp being called on different thread (bikas) Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/f2f31e0e Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/f2f31e0e Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/f2f31e0e Branch: refs/heads/master Commit: f2f31e0eff3f654d51ca7001a3bf1784cf8d5e0b Parents: a5713a4 Author: Bikas Saha Authored: Tue Feb 25 19:17:36 2014 -0800 Committer: Bikas Saha Committed: Tue Feb 25 19:17:36 2014 -0800 ---------------------------------------------------------------------- .../apache/tez/dag/app/rm/TaskScheduler.java | 3 +- .../tez/dag/app/rm/TestTaskScheduler.java | 33 +++++++++++--------- .../dag/app/rm/TestTaskSchedulerHelpers.java | 5 ++- 3 files changed, 25 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f2f31e0e/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java index 775d342..c50e3a4 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java @@ -68,7 +68,6 @@ import org.apache.tez.dag.app.rm.container.ContainerSignatureMatcher; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; -import com.google.common.collect.Sets; import com.google.common.util.concurrent.ThreadFactoryBuilder; /* TODO not yet updating cluster nodes on every allocate response @@ -787,12 +786,14 @@ public class TaskScheduler extends AbstractService } public synchronized void blacklistNode(NodeId nodeId) { + LOG.info("Blacklisting node: " + nodeId); amRmClient.addNodeToBlacklist(nodeId); blacklistedNodes.add(nodeId); } public synchronized void unblacklistNode(NodeId nodeId) { if (blacklistedNodes.remove(nodeId)) { + LOG.info("UnBlacklisting node: " + nodeId); amRmClient.removeNodeFromBlacklist(nodeId); } } http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f2f31e0e/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java index 150ef7a..63cc476 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java @@ -491,7 +491,7 @@ public class TestTaskScheduler { new TaskSchedulerWithDrainableAppCallback( mockApp, new AlwaysMatchesContainerMatcher(), appHost, appPort, appUrl, mockRMClient, mockAppContext); - TaskSchedulerAppCallbackDrainable drainableAppCallback = scheduler + final TaskSchedulerAppCallbackDrainable drainableAppCallback = scheduler .getDrainableAppCallback(); Configuration conf = new Configuration(); @@ -520,6 +520,7 @@ public class TestTaskScheduler { drainableAppCallback.drain(); Object mockTask1 = mock(Object.class); + when(mockTask1.toString()).thenReturn("task1"); Object mockCookie1 = mock(Object.class); Resource mockCapability = mock(Resource.class); String[] hosts = {"host1", "host5"}; @@ -529,8 +530,10 @@ public class TestTaskScheduler { final Priority mockPriority3 = Priority.newInstance(3); final Priority mockPriority4 = Priority.newInstance(4); Object mockTask2 = mock(Object.class); + when(mockTask2.toString()).thenReturn("task2"); Object mockCookie2 = mock(Object.class); Object mockTask3 = mock(Object.class); + when(mockTask3.toString()).thenReturn("task3"); Object mockCookie3 = mock(Object.class); ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(CookieContainerRequest.class); @@ -558,38 +561,43 @@ public class TestTaskScheduler { // sending lower priority container first to make sure its not matched Container mockContainer4 = mock(Container.class, RETURNS_DEEP_STUBS); when(mockContainer4.getNodeId().getHost()).thenReturn("host4"); + when(mockContainer4.toString()).thenReturn("container4"); when(mockContainer4.getPriority()).thenReturn(mockPriority4); ContainerId mockCId4 = mock(ContainerId.class); when(mockContainer4.getId()).thenReturn(mockCId4); + when(mockCId4.toString()).thenReturn("container4"); containers.add(mockContainer4); Container mockContainer1 = mock(Container.class, RETURNS_DEEP_STUBS); when(mockContainer1.getNodeId().getHost()).thenReturn("host1"); when(mockContainer1.getPriority()).thenReturn(mockPriority1); + when(mockContainer1.toString()).thenReturn("container1"); ContainerId mockCId1 = mock(ContainerId.class); when(mockContainer1.getId()).thenReturn(mockCId1); + when(mockCId1.toString()).thenReturn("container1"); containers.add(mockContainer1); Container mockContainer2 = mock(Container.class, RETURNS_DEEP_STUBS); when(mockContainer2.getNodeId().getHost()).thenReturn("host2"); when(mockContainer2.getPriority()).thenReturn(mockPriority2); + when(mockContainer2.toString()).thenReturn("container2"); ContainerId mockCId2 = mock(ContainerId.class); when(mockContainer2.getId()).thenReturn(mockCId2); + when(mockCId2.toString()).thenReturn("container2"); containers.add(mockContainer2); Container mockContainer3 = mock(Container.class, RETURNS_DEEP_STUBS); when(mockContainer3.getNodeId().getHost()).thenReturn("host3"); when(mockContainer3.getPriority()).thenReturn(mockPriority3); + when(mockContainer3.toString()).thenReturn("container3"); ContainerId mockCId3 = mock(ContainerId.class); when(mockContainer3.getId()).thenReturn(mockCId3); + when(mockCId3.toString()).thenReturn("container3"); containers.add(mockContainer3); ArrayList hostContainers = new ArrayList(); hostContainers.add(request1); - hostContainers.add(request2); - hostContainers.add(request3); ArrayList rackContainers = new ArrayList(); rackContainers.add(request2); - rackContainers.add(request3); ArrayList anyContainers = new ArrayList(); anyContainers.add(request3); @@ -605,7 +613,7 @@ public class TestTaskScheduler { anyList.add(anyContainers); final List> emptyList = new LinkedList>(); - // return all requests for host1 + // return pri1 requests for host1 when( mockRMClient.getMatchingRequestsForTopPriority(eq("host1"), (Resource) any())).thenAnswer( @@ -617,7 +625,6 @@ public class TestTaskScheduler { } }); - // first request matched by host // second request matched to rack. RackResolver by default puts hosts in // /default-rack. We need to workaround by returning rack matches only once when( @@ -660,19 +667,12 @@ public class TestTaskScheduler { }); - final AtomicInteger count = new AtomicInteger(0); - Mockito.doAnswer(new Answer() { - public Object answer(InvocationOnMock invocation) { - count.incrementAndGet(); - return null; - }}) - .when(mockApp).taskAllocated(any(), any(), (Container)any()); when(mockRMClient.getTopPriority()).then( new Answer() { @Override public Priority answer( InvocationOnMock invocation) throws Throwable { - int allocations = count.get(); + int allocations = drainableAppCallback.count.get(); if (allocations == 0) { return mockPriority1; } @@ -751,6 +751,7 @@ public class TestTaskScheduler { scheduler.blacklistNode(badNodeId); verify(mockRMClient, times(1)).addNodeToBlacklist(badNodeId); Object mockTask4 = mock(Object.class); + when(mockTask4.toString()).thenReturn("task4"); Object mockCookie4 = mock(Object.class); scheduler.allocateTask(mockTask4, mockCapability, null, null, mockPriority4, null, mockCookie4); @@ -763,6 +764,8 @@ public class TestTaskScheduler { when(mockContainer5.getNodeId().getHost()).thenReturn(badHost); when(mockContainer5.getNodeId()).thenReturn(badNodeId); ContainerId mockCId5 = mock(ContainerId.class); + when(mockContainer5.toString()).thenReturn("container5"); + when(mockCId5.toString()).thenReturn("container5"); when(mockContainer5.getId()).thenReturn(mockCId5); when(mockContainer5.getPriority()).thenReturn(mockPriority4); containers.clear(); @@ -804,6 +807,8 @@ public class TestTaskScheduler { when(mockContainer6.getNodeId().getHost()).thenReturn("host7"); ContainerId mockCId6 = mock(ContainerId.class); when(mockContainer6.getId()).thenReturn(mockCId6); + when(mockContainer6.toString()).thenReturn("container6"); + when(mockCId6.toString()).thenReturn("container6"); containers.clear(); containers.add(mockContainer6); when( http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f2f31e0e/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java index 6058f88..f7601a6 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java @@ -33,6 +33,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; @@ -226,7 +227,8 @@ class TestTaskSchedulerHelpers { int invocations; private TaskSchedulerAppCallback real; private CompletionService completionService; - + final AtomicInteger count = new AtomicInteger(0); + public TaskSchedulerAppCallbackDrainable(TaskSchedulerAppCallbackWrapper real) { completionService = real.completionService; this.real = real; @@ -234,6 +236,7 @@ class TestTaskSchedulerHelpers { @Override public void taskAllocated(Object task, Object appCookie, Container container) { + count.incrementAndGet(); invocations++; real.taskAllocated(task, appCookie, container); }