Return-Path: X-Original-To: apmail-tez-commits-archive@minotaur.apache.org Delivered-To: apmail-tez-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 1D8EA1895A for ; Fri, 21 Aug 2015 01:36:10 +0000 (UTC) Received: (qmail 98731 invoked by uid 500); 21 Aug 2015 01:36:10 -0000 Delivered-To: apmail-tez-commits-archive@tez.apache.org Received: (qmail 98645 invoked by uid 500); 21 Aug 2015 01:36:10 -0000 Mailing-List: contact commits-help@tez.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.apache.org Delivered-To: mailing list commits@tez.apache.org Received: (qmail 97484 invoked by uid 99); 21 Aug 2015 01:36:09 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 21 Aug 2015 01:36:09 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 3B145E7E01; Fri, 21 Aug 2015 01:36:09 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sseth@apache.org To: commits@tez.apache.org Date: Fri, 21 Aug 2015 01:36:24 -0000 Message-Id: In-Reply-To: <0a6214bdcf644e979ab2906bb3bbf947@git.apache.org> References: <0a6214bdcf644e979ab2906bb3bbf947@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [16/50] [abbrv] tez git commit: TEZ-2005. Define basic interface for pluggable TaskScheduler. (sseth) http://git-wip-us.apache.org/repos/asf/tez/blob/b6582f06/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java ---------------------------------------------------------------------- diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java index 506e991..7d209bc 100644 --- a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java +++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java @@ -20,14 +20,10 @@ import java.util.List; import java.util.Random; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import com.google.common.base.Preconditions; import com.google.common.primitives.Ints; -import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -37,30 +33,24 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.tez.serviceplugins.api.TaskAttemptEndReason; -import org.apache.tez.dag.app.AppContext; import org.apache.tez.service.TezTestServiceConfConstants; +import org.apache.tez.serviceplugins.api.TaskScheduler; +import org.apache.tez.serviceplugins.api.TaskSchedulerContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class TezTestServiceTaskSchedulerService extends TaskSchedulerService { +public class TezTestServiceTaskSchedulerService extends TaskScheduler { private static final Logger LOG = LoggerFactory.getLogger(TezTestServiceTaskSchedulerService.class); - private final ExecutorService appCallbackExecutor; - private final TaskSchedulerAppCallback appClientDelegate; - private final AppContext appContext; private final List serviceHosts; private final ContainerFactory containerFactory; private final Random random = new Random(); // Currently all services must be running on the same port. private final int containerPort; - private final String clientHostname; - private final int clientPort; - private final String trackingUrl; - private final AtomicBoolean isStopped = new AtomicBoolean(false); private final ConcurrentMap runningTasks = new ConcurrentHashMap(); @@ -77,20 +67,14 @@ public class TezTestServiceTaskSchedulerService extends TaskSchedulerService { // Not registering with the RM. Assuming the main TezScheduler will always run (except local mode), // and take care of YARN registration. - public TezTestServiceTaskSchedulerService(TaskSchedulerAppCallback appClient, - AppContext appContext, - String clientHostname, int clientPort, - String trackingUrl, - long customAppIdIdentifier, - Configuration conf) { + public TezTestServiceTaskSchedulerService(TaskSchedulerContext taskSchedulerContext) { // Accepting configuration here to allow setting up fields as final - super(TezTestServiceTaskSchedulerService.class.getName()); - this.appCallbackExecutor = createAppCallbackExecutorService(); - this.appClientDelegate = createAppCallbackDelegate(appClient); - this.appContext = appContext; + super(taskSchedulerContext); this.serviceHosts = new LinkedList(); - this.containerFactory = new ContainerFactory(appContext, customAppIdIdentifier); + this.containerFactory = new ContainerFactory(taskSchedulerContext.getApplicationAttemptId(), + taskSchedulerContext.getCustomClusterIdentifier()); + Configuration conf = taskSchedulerContext.getInitialConfiguration(); this.memoryPerInstance = conf .getInt(TezTestServiceConfConstants.TEZ_TEST_SERVICE_MEMORY_PER_INSTANCE_MB, -1); Preconditions.checkArgument(memoryPerInstance > 0, @@ -112,10 +96,6 @@ public class TezTestServiceTaskSchedulerService extends TaskSchedulerService { Preconditions.checkArgument(executorsPerInstance > 0, TezTestServiceConfConstants.TEZ_TEST_SERVICE_RPC_PORT + " must be configured"); - this.clientHostname = clientHostname; - this.clientPort = clientPort; - this.trackingUrl = trackingUrl; - int memoryPerContainer = (int) (memoryPerInstance / (float) executorsPerInstance); int coresPerContainer = (int) (coresPerInstance / (float) executorsPerInstance); this.resourcePerContainer = Resource.newInstance(memoryPerContainer, coresPerContainer); @@ -138,13 +118,6 @@ public class TezTestServiceTaskSchedulerService extends TaskSchedulerService { } @Override - public void serviceStop() { - if (!this.isStopped.getAndSet(true)) { - appCallbackExecutor.shutdownNow(); - } - } - - @Override public Resource getAvailableResources() { // TODO This needs information about all running executors, and the amount of memory etc available across the cluster. return Resource @@ -185,7 +158,7 @@ public class TezTestServiceTaskSchedulerService extends TaskSchedulerService { Container container = containerFactory.createContainer(resourcePerContainer, priority, host, containerPort); runningTasks.put(task, container.getId()); - appClientDelegate.taskAllocated(task, clientCookie, container); + getContext().taskAllocated(task, clientCookie, container); } @@ -196,7 +169,7 @@ public class TezTestServiceTaskSchedulerService extends TaskSchedulerService { Container container = containerFactory.createContainer(resourcePerContainer, priority, host, containerPort); runningTasks.put(task, container.getId()); - appClientDelegate.taskAllocated(task, clientCookie, container); + getContext().taskAllocated(task, clientCookie, container); } @Override @@ -208,7 +181,7 @@ public class TezTestServiceTaskSchedulerService extends TaskSchedulerService { " The query may hang since this \"unknown\" container is now taking up a slot permanently"); return false; } - appClientDelegate.containerBeingReleased(containerId); + getContext().containerBeingReleased(containerId); return true; } @@ -229,17 +202,6 @@ public class TezTestServiceTaskSchedulerService extends TaskSchedulerService { return true; } - private ExecutorService createAppCallbackExecutorService() { - return Executors.newSingleThreadExecutor(new ThreadFactoryBuilder() - .setNameFormat("TezTestTaskSchedulerAppCaller").setDaemon(true).build()); - } - - private TaskSchedulerAppCallback createAppCallbackDelegate( - TaskSchedulerAppCallback realAppClient) { - return new TaskSchedulerAppCallbackWrapper(realAppClient, - appCallbackExecutor); - } - private String selectHost(String[] requestedHosts) { String host; if (requestedHosts != null && requestedHosts.length > 0) { @@ -257,12 +219,12 @@ public class TezTestServiceTaskSchedulerService extends TaskSchedulerService { AtomicInteger nextId; final ApplicationAttemptId customAppAttemptId; - public ContainerFactory(AppContext appContext, long appIdLong) { + public ContainerFactory(ApplicationAttemptId appAttemptId, long appIdLong) { this.nextId = new AtomicInteger(1); ApplicationId appId = ApplicationId - .newInstance(appIdLong, appContext.getApplicationAttemptId().getApplicationId().getId()); + .newInstance(appIdLong, appAttemptId.getApplicationId().getId()); this.customAppAttemptId = ApplicationAttemptId - .newInstance(appId, appContext.getApplicationAttemptId().getAttemptId()); + .newInstance(appId, appAttemptId.getAttemptId()); } public Container createContainer(Resource capability, Priority priority, String hostname, int port) {