hive-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ser...@apache.org
Subject svn commit: r1661823 - in /hive/branches/llap/llap-server/src/java/org/apache: hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java tez/dag/app/launcher/DaemonContainerLauncher.java tez/dag/app/rm/DaemonTaskSchedulerService.java
Date Tue, 24 Feb 2015 00:12:44 GMT
Author: sershe
Date: Tue Feb 24 00:12:44 2015
New Revision: 1661823

URL: http://svn.apache.org/r1661823
Log:
HIVE-9759 : LLAP: Update launcher, scheduler to work with Tez changes (Siddharth Seth)

Modified:
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
    hive/branches/llap/llap-server/src/java/org/apache/tez/dag/app/launcher/DaemonContainerLauncher.java
    hive/branches/llap/llap-server/src/java/org/apache/tez/dag/app/rm/DaemonTaskSchedulerService.java

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java?rev=1661823&r1=1661822&r2=1661823&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
(original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
Tue Feb 24 00:12:44 2015
@@ -212,7 +212,7 @@ public class ContainerRunnerImpl extends
               request.getContainerIdString(),
               request.getTokenIdentifier(), request.getAppAttemptNumber(), workingDir, localDirs,
               envMap, objectRegistry, pid,
-              executionContext, credentials, memoryAvailable, request.getUser());
+              executionContext, credentials, memoryAvailable, request.getUser(), null);
       ContainerExecutionResult result = tezChild.run();
       LOG.info("ExecutionTime for Container: " + request.getContainerIdString() + "=" +
           sw.stop().elapsedMillis());

Modified: hive/branches/llap/llap-server/src/java/org/apache/tez/dag/app/launcher/DaemonContainerLauncher.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/tez/dag/app/launcher/DaemonContainerLauncher.java?rev=1661823&r1=1661822&r2=1661823&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/tez/dag/app/launcher/DaemonContainerLauncher.java
(original)
+++ hive/branches/llap/llap-server/src/java/org/apache/tez/dag/app/launcher/DaemonContainerLauncher.java
Tue Feb 24 00:12:44 2015
@@ -14,6 +14,7 @@
 
 package org.apache.tez.dag.app.launcher;
 
+import java.net.InetSocketAddress;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.Callable;
@@ -103,9 +104,10 @@ public class DaemonContainerLauncher ext
     switch (event.getType()) {
       case CONTAINER_LAUNCH_REQUEST:
         NMCommunicatorLaunchRequestEvent launchEvent = (NMCommunicatorLaunchRequestEvent)
event;
+        InetSocketAddress address = tal.getTaskCommunicator(launchEvent.getTaskCommId()).getAddress();
         ListenableFuture<Void> future = executor.submit(
             new SubmitCallable(getProxy(launchEvent.getNodeId().getHost()), launchEvent,
-                tokenIdentifier, tal.getAddress().getHostName(), tal.getAddress().getPort()));
+                tokenIdentifier, address.getHostName(), address.getPort()));
         Futures.addCallback(future, new SubmitCallback(launchEvent.getContainerId(),
             launchEvent.getContainer().getNodeId().getHost()));
         break;

Modified: hive/branches/llap/llap-server/src/java/org/apache/tez/dag/app/rm/DaemonTaskSchedulerService.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/tez/dag/app/rm/DaemonTaskSchedulerService.java?rev=1661823&r1=1661822&r2=1661823&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/tez/dag/app/rm/DaemonTaskSchedulerService.java
(original)
+++ hive/branches/llap/llap-server/src/java/org/apache/tez/dag/app/rm/DaemonTaskSchedulerService.java
Tue Feb 24 00:12:44 2015
@@ -14,11 +14,12 @@
 
 package org.apache.tez.dag.app.rm;
 
-import java.io.IOException;
 import java.util.Arrays;
+import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Random;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
@@ -31,25 +32,17 @@ import com.google.common.util.concurrent
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.client.api.AMRMClient;
-import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
-import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.hive.llap.daemon.LlapDaemonConfiguration;
-import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.app.AppContext;
 
 
-// TODO Registration with RM - so that the AM is considered dead and restarted in the expiry
interval - 10 minutes.
-
 public class DaemonTaskSchedulerService extends TaskSchedulerService {
 
   private static final Log LOG = LogFactory.getLog(DaemonTaskSchedulerService.class);
@@ -58,6 +51,7 @@ public class DaemonTaskSchedulerService
   private final TaskSchedulerAppCallback appClientDelegate;
   private final AppContext appContext;
   private final List<String> serviceHosts;
+  private final Set<String> serviceHostSet;
   private final ContainerFactory containerFactory;
   private final Random random = new Random();
 
@@ -68,8 +62,6 @@ public class DaemonTaskSchedulerService
   private final ConcurrentMap<Object, ContainerId> runningTasks =
       new ConcurrentHashMap<Object, ContainerId>();
 
-  private final AMRMClientAsync<AMRMClient.ContainerRequest> amRmClient;
-
   // Per daemon
   private final int memoryPerInstance;
   private final int coresPerInstance;
@@ -81,6 +73,7 @@ public class DaemonTaskSchedulerService
 
   public DaemonTaskSchedulerService(TaskSchedulerAppCallback appClient, AppContext appContext,
                                     String clientHostname, int clientPort, String trackingUrl,
+                                    long customAppIdIdentifier,
                                     Configuration conf) {
     // Accepting configuration here to allow setting up fields as final
     super(DaemonTaskSchedulerService.class.getName());
@@ -88,7 +81,8 @@ public class DaemonTaskSchedulerService
     this.appClientDelegate = createAppCallbackDelegate(appClient);
     this.appContext = appContext;
     this.serviceHosts = new LinkedList<String>();
-    this.containerFactory = new ContainerFactory(appContext);
+    this.serviceHostSet = new HashSet<>();
+    this.containerFactory = new ContainerFactory(appContext, customAppIdIdentifier);
     this.memoryPerInstance = conf
         .getInt(LlapDaemonConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB,
             LlapDaemonConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB_DEFAULT);
@@ -104,7 +98,6 @@ public class DaemonTaskSchedulerService
     int memoryPerExecutor = (int) (memoryPerInstance / (float) executorsPerInstance);
     int coresPerExecutor = (int) (coresPerInstance / (float) executorsPerInstance);
     this.resourcePerExecutor = Resource.newInstance(memoryPerExecutor, coresPerExecutor);
-    this.amRmClient = TezAMRMClientAsync.createAMRMClientAsync(5000, new FakeAmRmCallbackHandler());
 
     String[] hosts = conf.getTrimmedStrings(LlapDaemonConfiguration.LLAP_DAEMON_AM_SERVICE_HOSTS);
     if (hosts == null || hosts.length == 0) {
@@ -112,6 +105,7 @@ public class DaemonTaskSchedulerService
     }
     for (String host : hosts) {
       serviceHosts.add(host);
+      serviceHostSet.add(host);
     }
 
     LOG.info("Running with configuration: " +
@@ -125,35 +119,15 @@ public class DaemonTaskSchedulerService
 
   @Override
   public void serviceInit(Configuration conf) {
-    amRmClient.init(conf);
   }
 
   @Override
   public void serviceStart() {
-    amRmClient.start();
-    RegisterApplicationMasterResponse response;
-    try {
-      amRmClient.registerApplicationMaster(clientHostname, clientPort, trackingUrl);
-    } catch (YarnException e) {
-      throw new TezUncheckedException(e);
-    } catch (IOException e) {
-      throw new TezUncheckedException(e);
-    }
   }
 
   @Override
   public void serviceStop() {
     if (!this.isStopped.getAndSet(true)) {
-
-      try {
-        TaskSchedulerAppCallback.AppFinalStatus status = appClientDelegate.getFinalAppStatus();
-        amRmClient.unregisterApplicationMaster(status.exitStatus, status.exitMessage,
-            status.postCompletionTrackingUrl);
-      } catch (YarnException e) {
-        throw new TezUncheckedException(e);
-      } catch (IOException e) {
-        throw new TezUncheckedException(e);
-      }
       appCallbackExecutor.shutdownNow();
     }
   }
@@ -257,8 +231,23 @@ public class DaemonTaskSchedulerService
     if (requestedHosts != null && requestedHosts.length > 0) {
       Arrays.sort(requestedHosts);
       host = requestedHosts[0];
-      LOG.info("Selected host: " + host + " from requested hosts: " + Arrays.toString(requestedHosts));
-    } else {
+      if (serviceHostSet.contains(host)) {
+        LOG.info("Selected host: " + host + " from requested hosts: " + Arrays.toString(requestedHosts));
+      } else {
+        LOG.info("Preferred host: " + host + " not present. Attempting to select another
one");
+        host = null;
+        for (String h : requestedHosts) {
+          if (serviceHostSet.contains(h)) {
+            host = h;
+            break;
+          }
+        }
+        if (host == null) {
+          LOG.info("Requested hosts: " + Arrays.toString(requestedHosts) + " not present.
Randomizing the host");
+        }
+      }
+    }
+    if (host == null) {
       host = serviceHosts.get(random.nextInt(serviceHosts.size()));
       LOG.info("Selected random host: " + host + " since the request contained no host information");
     }
@@ -266,17 +255,19 @@ public class DaemonTaskSchedulerService
   }
 
   static class ContainerFactory {
-    final AppContext appContext;
+    final ApplicationAttemptId customAppAttemptId;
     AtomicInteger nextId;
 
-    public ContainerFactory(AppContext appContext) {
-      this.appContext = appContext;
+    public ContainerFactory(AppContext appContext, long appIdLong) {
       this.nextId = new AtomicInteger(1);
+      ApplicationId appId = ApplicationId
+          .newInstance(appIdLong, appContext.getApplicationAttemptId().getApplicationId().getId());
+      this.customAppAttemptId = ApplicationAttemptId
+          .newInstance(appId, appContext.getApplicationAttemptId().getAttemptId());
     }
 
     public Container createContainer(Resource capability, Priority priority, String hostname)
{
-      ApplicationAttemptId appAttemptId = appContext.getApplicationAttemptId();
-      ContainerId containerId = ContainerId.newInstance(appAttemptId, nextId.getAndIncrement());
+      ContainerId containerId = ContainerId.newInstance(customAppAttemptId, nextId.getAndIncrement());
       NodeId nodeId = NodeId.newInstance(hostname, 0);
       String nodeHttpAddress = "hostname:0";
 
@@ -290,37 +281,4 @@ public class DaemonTaskSchedulerService
       return container;
     }
   }
-
-  private static class FakeAmRmCallbackHandler implements AMRMClientAsync.CallbackHandler
{
-
-    @Override
-    public void onContainersCompleted(List<ContainerStatus> statuses) {
-
-    }
-
-    @Override
-    public void onContainersAllocated(List<Container> containers) {
-
-    }
-
-    @Override
-    public void onShutdownRequest() {
-
-    }
-
-    @Override
-    public void onNodesUpdated(List<NodeReport> updatedNodes) {
-
-    }
-
-    @Override
-    public float getProgress() {
-      return 0;
-    }
-
-    @Override
-    public void onError(Throwable e) {
-
-    }
-  }
 }



Mime
View raw message