Return-Path: X-Original-To: apmail-tez-commits-archive@minotaur.apache.org Delivered-To: apmail-tez-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id D6CF610855 for ; Tue, 18 Mar 2014 17:47:58 +0000 (UTC) Received: (qmail 16497 invoked by uid 500); 18 Mar 2014 17:47:58 -0000 Delivered-To: apmail-tez-commits-archive@tez.apache.org Received: (qmail 16478 invoked by uid 500); 18 Mar 2014 17:47:56 -0000 Mailing-List: contact commits-help@tez.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.incubator.apache.org Delivered-To: mailing list commits@tez.incubator.apache.org Received: (qmail 16471 invoked by uid 99); 18 Mar 2014 17:47:56 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 18 Mar 2014 17:47:56 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED,T_RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Tue, 18 Mar 2014 17:47:54 +0000 Received: (qmail 15732 invoked by uid 99); 18 Mar 2014 17:47:34 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 18 Mar 2014 17:47:34 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id C3AAB947AF7; Tue, 18 Mar 2014 17:47:33 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sseth@apache.org To: commits@tez.incubator.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: git commit: TEZ-940. Fix a memory leak in TaskSchedulerAppCallbackWrapper. Contributed by Gopal V and Siddharth Seth. Date: Tue, 18 Mar 2014 17:47:33 +0000 (UTC) X-Virus-Checked: Checked by ClamAV on apache.org Repository: incubator-tez Updated Branches: refs/heads/master 4a60f05c1 -> 3f3f94827 TEZ-940. Fix a memory leak in TaskSchedulerAppCallbackWrapper. Contributed by Gopal V and Siddharth Seth. Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/3f3f9482 Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/3f3f9482 Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/3f3f9482 Branch: refs/heads/master Commit: 3f3f94827486ceb87d9a76311911fe4ebc8cdc02 Parents: 4a60f05 Author: Siddharth Seth Authored: Tue Mar 18 10:46:56 2014 -0700 Committer: Siddharth Seth Committed: Tue Mar 18 10:46:56 2014 -0700 ---------------------------------------------------------------------- .../apache/tez/dag/app/rm/TaskScheduler.java | 3 +- .../app/rm/TaskSchedulerAppCallbackWrapper.java | 35 ++----- .../dag/app/rm/TestTaskSchedulerHelpers.java | 101 ++++++++++++++++++- 3 files changed, 110 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3f3f9482/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java index bb06904..606a0b3 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java @@ -254,7 +254,8 @@ public class TaskScheduler extends AbstractService this.appContext = appContext; } - private ExecutorService createAppCallbackExecutorService() { + @VisibleForTesting + ExecutorService createAppCallbackExecutorService() { return Executors.newSingleThreadExecutor(new ThreadFactoryBuilder() .setNameFormat("TaskSchedulerAppCaller #%d").setDaemon(true).build()); } http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3f3f9482/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerAppCallbackWrapper.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerAppCallbackWrapper.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerAppCallbackWrapper.java index c690926..53c3e95 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerAppCallbackWrapper.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerAppCallbackWrapper.java @@ -22,8 +22,6 @@ import java.nio.ByteBuffer; import java.util.List; import java.util.Map; import java.util.concurrent.Callable; -import java.util.concurrent.CompletionService; -import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; @@ -36,20 +34,15 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.dag.app.rm.TaskScheduler.TaskSchedulerAppCallback; -import com.google.common.annotations.VisibleForTesting; - /** * Makes use of an ExecutionService to invoke application callbacks. Methods * which return values wait for execution to complete - effectively waiting for * all previous events in the queue to complete. */ -@SuppressWarnings({"rawtypes", "unchecked"}) class TaskSchedulerAppCallbackWrapper implements TaskSchedulerAppCallback { private TaskSchedulerAppCallback real; - - @VisibleForTesting - CompletionService completionService; + ExecutorService executorService; /** @@ -60,58 +53,52 @@ class TaskSchedulerAppCallbackWrapper implements TaskSchedulerAppCallback { ExecutorService executorService) { this.real = real; this.executorService = executorService; - this.completionService = createAppCallbackCompletionService(); - } - - @VisibleForTesting - CompletionService createAppCallbackCompletionService() { - return new ExecutorCompletionService(this.executorService); } @Override public void taskAllocated(Object task, Object appCookie, Container container) { - completionService.submit(new TaskAllocatedCallable(real, task, appCookie, + executorService.submit(new TaskAllocatedCallable(real, task, appCookie, container)); } @Override public void containerCompleted(Object taskLastAllocated, ContainerStatus containerStatus) { - completionService.submit(new ContainerCompletedCallable(real, + executorService.submit(new ContainerCompletedCallable(real, taskLastAllocated, containerStatus)); } @Override public void containerBeingReleased(ContainerId containerId) { - completionService + executorService .submit(new ContainerBeingReleasedCallable(real, containerId)); } @Override public void nodesUpdated(List updatedNodes) { - completionService.submit(new NodesUpdatedCallable(real, updatedNodes)); + executorService.submit(new NodesUpdatedCallable(real, updatedNodes)); } @Override public void appShutdownRequested() { - completionService.submit(new AppShudownRequestedCallable(real)); + executorService.submit(new AppShudownRequestedCallable(real)); } @Override public void setApplicationRegistrationData(Resource maxContainerCapability, Map appAcls, ByteBuffer key) { - completionService.submit(new SetApplicationRegistrationDataCallable(real, + executorService.submit(new SetApplicationRegistrationDataCallable(real, maxContainerCapability, appAcls, key)); } @Override public void onError(Throwable t) { - completionService.submit(new OnErrorCallable(real, t)); + executorService.submit(new OnErrorCallable(real, t)); } @Override public float getProgress() { - Future progressFuture = completionService + Future progressFuture = executorService .submit(new GetProgressCallable(real)); try { return progressFuture.get(); @@ -122,12 +109,12 @@ class TaskSchedulerAppCallbackWrapper implements TaskSchedulerAppCallback { @Override public void preemptContainer(ContainerId containerId) { - completionService.submit(new PreemptContainerCallable(real, containerId)); + executorService.submit(new PreemptContainerCallable(real, containerId)); } @Override public AppFinalStatus getFinalAppStatus() { - Future appFinalStatusFuture = completionService + Future appFinalStatusFuture = executorService .submit(new GetFinalAppStatusCallable(real)); try { return appFinalStatusFuture.get(); http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3f3f9482/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java index 087da83..72cc8e7 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java @@ -25,13 +25,18 @@ import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; import java.nio.ByteBuffer; +import java.util.Collection; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.concurrent.Callable; import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -108,7 +113,7 @@ class TestTaskSchedulerHelpers { protected void serviceStop() { } } - + // Overrides start / stop. Will be controlled without the extra event handling thread. static class TaskSchedulerEventHandlerForTest extends TaskSchedulerEventHandler { @@ -217,6 +222,12 @@ class TestTaskSchedulerHelpers { appCallbackExecutor)); return drainableAppCallback; } + + @Override + ExecutorService createAppCallbackExecutorService() { + ExecutorService real = super.createAppCallbackExecutorService(); + return new CountingExecutorService(real); + } public TaskSchedulerAppCallbackDrainable getDrainableAppCallback() { return drainableAppCallback; @@ -228,11 +239,11 @@ class TestTaskSchedulerHelpers { int completedEvents; int invocations; private TaskSchedulerAppCallback real; - private CompletionService completionService; + private CountingExecutorService countingExecutorService; final AtomicInteger count = new AtomicInteger(0); public TaskSchedulerAppCallbackDrainable(TaskSchedulerAppCallbackWrapper real) { - completionService = real.completionService; + countingExecutorService = (CountingExecutorService) real.executorService; this.real = real; } @@ -301,7 +312,7 @@ class TestTaskSchedulerHelpers { public void drain() throws InterruptedException, ExecutionException { while (completedEvents < invocations) { - Future f = completionService.poll(5000l, TimeUnit.MILLISECONDS); + Future f = countingExecutorService.completionService.poll(5000l, TimeUnit.MILLISECONDS); if (f != null) { completedEvents++; } else { @@ -365,5 +376,87 @@ class TestTaskSchedulerHelpers { } } } + + @SuppressWarnings({"rawtypes", "unchecked"}) + private static class CountingExecutorService implements ExecutorService { + + final ExecutorService real; + final CompletionService completionService; + + CountingExecutorService(ExecutorService real) { + this.real = real; + completionService = new ExecutorCompletionService(real); + } + + @Override + public void execute(Runnable command) { + throw new UnsupportedOperationException("Not expected to be used"); + } + + @Override + public void shutdown() { + real.shutdown(); + } + + @Override + public List shutdownNow() { + return real.shutdownNow(); + } + + @Override + public boolean isShutdown() { + return real.isShutdown(); + } + + @Override + public boolean isTerminated() { + return real.isTerminated(); + } + + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + return real.awaitTermination(timeout, unit); + } + + @Override + public Future submit(Callable task) { + return completionService.submit(task); + } + + @Override + public Future submit(Runnable task, T result) { + return completionService.submit(task, result); + } + + @Override + public Future submit(Runnable task) { + throw new UnsupportedOperationException("Not expected to be used"); + } + + @Override + public List> invokeAll(Collection> tasks) + throws InterruptedException { + throw new UnsupportedOperationException("Not expected to be used"); + } + + @Override + public List> invokeAll(Collection> tasks, long timeout, + TimeUnit unit) throws InterruptedException { + throw new UnsupportedOperationException("Not expected to be used"); + } + + @Override + public T invokeAny(Collection> tasks) throws InterruptedException, + ExecutionException { + throw new UnsupportedOperationException("Not expected to be used"); + } + + @Override + public T invokeAny(Collection> tasks, long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + throw new UnsupportedOperationException("Not expected to be used"); + } + + } }