hive-commits mailing list archives

From: gop...@apache.org
Subject: svn commit: r1673556 - in /hive/branches/llap: llap-client/src/java/org/apache/hadoop/hive/llap/configuration/ llap-server/src/java/org/apache/hadoop/hive/llap/cli/ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/ llap-server/src/java/...
Date: Tue, 14 Apr 2015 21:06:58 GMT
Author: gopalv
Date: Tue Apr 14 21:06:58 2015
New Revision: 1673556

URL: http://svn.apache.org/r1673556
Log:
HIVE-10012: LLAP: Dynamic refresh and blacklisting for daemons (gopalv)

Added:
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/ServiceInstance.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/ServiceInstanceSet.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/ServiceRegistry.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/impl/LlapFixedRegistryImpl.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/impl/LlapYarnRegistryImpl.java
Modified:
    hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/configuration/LlapConfiguration.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapOptionsProcessor.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/impl/LlapRegistryService.java
    hive/branches/llap/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java
    hive/branches/llap/llap-server/src/main/resources/package.py
    hive/branches/llap/llap-server/src/main/resources/templates.py
    hive/branches/llap/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java

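This change introduces a pluggable ServiceRegistry layer: LlapRegistryService now selects the dynamic YARN/ZooKeeper-backed LlapYarnRegistryImpl when LLAP_DAEMON_SERVICE_HOSTS starts with "@", and the new LlapFixedRegistryImpl for a plain comma-separated host list. Below is a minimal usage sketch (illustrative only, not part of this commit; the class name and the "@llap0" instance name are hypothetical) of how a client obtains worker instances through the new layer:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.llap.configuration.LlapConfiguration;
    import org.apache.hadoop.hive.llap.daemon.registry.ServiceInstance;
    import org.apache.hadoop.hive.llap.daemon.registry.ServiceInstanceSet;
    import org.apache.hadoop.hive.llap.daemon.registry.impl.LlapRegistryService;

    public class LlapRegistryUsageSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // "@llap0" selects the dynamic registry; a comma-separated host list selects the fixed one.
        conf.set(LlapConfiguration.LLAP_DAEMON_SERVICE_HOSTS, "@llap0");

        LlapRegistryService registry = new LlapRegistryService();
        registry.init(conf);   // picks the implementation based on the "@" prefix
        registry.start();

        ServiceInstanceSet instances = registry.getInstances();
        for (ServiceInstance inst : instances.getAll().values()) {
          if (inst.isAlive()) {
            System.out.println(inst.getHost() + ":" + inst.getRpcPort() + " -> " + inst.getResource());
          }
        }
        registry.stop();
      }
    }
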
Modified: hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/configuration/LlapConfiguration.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/configuration/LlapConfiguration.java?rev=1673556&r1=1673555&r2=1673556&view=diff
==============================================================================
--- hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/configuration/LlapConfiguration.java (original)
+++ hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/configuration/LlapConfiguration.java Tue Apr 14 21:06:58 2015
@@ -67,6 +67,9 @@ public class LlapConfiguration extends C
   // Section for configs used in the AM //
   public static final String LLAP_DAEMON_SERVICE_HOSTS = LLAP_DAEMON_PREFIX + "service.hosts";
 
+  public static final String LLAP_DAEMON_SERVICE_REFRESH_INTERVAL = LLAP_DAEMON_PREFIX + "service.refresh.interval";
+  public static final int LLAP_DAEMON_SERVICE_REFRESH_INTERVAL_DEFAULT = 60; // seconds
+
   public static final String LLAP_DAEMON_COMMUNICATOR_NUM_THREADS = LLAP_DAEMON_PREFIX + "communicator.num.threads";
   public static final int LLAP_DAEMON_COMMUNICATOR_NUM_THREADS_DEFAULT = 5;
 

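The new configuration key controls how often the dynamic registry re-lists workers; the value is read as a number of seconds and defaults to 60 (see LlapYarnRegistryImpl further down). A small illustrative read of the key, with a hypothetical class name:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.llap.configuration.LlapConfiguration;

    public class RefreshIntervalSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Same lookup the dynamic registry performs; must be a positive number of seconds.
        int refreshDelaySec =
            conf.getInt(LlapConfiguration.LLAP_DAEMON_SERVICE_REFRESH_INTERVAL,
                LlapConfiguration.LLAP_DAEMON_SERVICE_REFRESH_INTERVAL_DEFAULT);
        System.out.println("LLAP registry refresh interval: " + refreshDelaySec + "s");
      }
    }
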
Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapOptionsProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapOptionsProcessor.java?rev=1673556&r1=1673555&r2=1673556&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapOptionsProcessor.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapOptionsProcessor.java Tue Apr 14 21:06:58 2015
@@ -85,6 +85,9 @@ public class LlapOptionsProcessor {
     options.addOption(OptionBuilder.hasArg().withArgName("loglevel").withLongOpt("loglevel")
         .withDescription("log levels for the llap instance").create('l'));
 
+    options.addOption(OptionBuilder.hasArg().withArgName("chaosmonkey").withLongOpt("chaosmonkey")
+        .withDescription("chaosmonkey interval").create('m'));
+
     // [-H|--help]
     options.addOption(new Option("H", "help", false, "Print help information"));
   }
@@ -101,7 +104,7 @@ public class LlapOptionsProcessor {
     String directory = commandLine.getOptionValue("directory");
 
     String name = commandLine.getOptionValue("name", null);
-    // loglevel & args are parsed by the python processor
+    // loglevel, chaosmonkey & args are parsed by the python processor
 
     return new LlapOptions(name, instances, directory);
   }

Added: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/ServiceInstance.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/ServiceInstance.java?rev=1673556&view=auto
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/ServiceInstance.java (added)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/ServiceInstance.java Tue Apr 14 21:06:58 2015
@@ -0,0 +1,73 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.daemon.registry;
+
+import java.util.Map;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+
+public interface ServiceInstance {
+
+  /**
+   * Worker identity is a UUID (unique across restarts), to identify a node which died & was brought
+   * back on the same host/port
+   */
+  public String getWorkerIdentity();
+
+  /**
+   * Hostname of the service instance
+   * 
+   * @return
+   */
+  public String getHost();
+
+  /**
+   * RPC Endpoint for service instance
+   * 
+   * @return
+   */
+  public int getRpcPort();
+
+  /**
+   * Shuffle Endpoint for service instance
+   * 
+   * @return
+   */
+  public int getShufflePort();
+
+  /**
+   * Return the last known state (without refreshing)
+   * 
+   * @return
+   */
+
+  public boolean isAlive();
+
+  /**
+   * Config properties of the Service Instance (llap.daemon.*)
+   * 
+   * @return
+   */
+
+  public Map<String, String> getProperties();
+
+  /**
+   * Memory and Executors available for the LLAP tasks
+   * 
+   * This does not include the size of the cache or the actual vCores allocated via Slider.
+   * 
+   * @return
+   */
+  public Resource getResource();
+}

Added: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/ServiceInstanceSet.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/ServiceInstanceSet.java?rev=1673556&view=auto
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/ServiceInstanceSet.java (added)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/ServiceInstanceSet.java Tue Apr 14 21:06:58 2015
@@ -0,0 +1,57 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.daemon.registry;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
+
+public interface ServiceInstanceSet {
+
+  /**
+   * Get an instance mapping which maps worker identity to each instance.
+   * 
+   * The worker identity does not collide between restarts, so each restart will have a unique id,
+   * while having the same host/ip pair.
+   * 
+   * @return
+   */
+  public Map<String, ServiceInstance> getAll();
+
+  /**
+   * Get an instance by worker identity.
+   * 
+   * @param name
+   * @return
+   */
+  public ServiceInstance getInstance(String name);
+
+  /**
+   * Get a list of service instances for a given host.
+   * 
+   * The list could include dead and alive instances.
+   * 
+   * @param host
+   * @return
+   */
+  public Set<ServiceInstance> getByHost(String host);
+
+  /**
+   * Refresh the instance set from registry backing store.
+   * 
+   * @throws IOException
+   */
+  public void refresh() throws IOException;
+
+}
\ No newline at end of file

Added: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/ServiceRegistry.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/ServiceRegistry.java?rev=1673556&view=auto
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/ServiceRegistry.java (added)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/ServiceRegistry.java Tue Apr 14 21:06:58 2015
@@ -0,0 +1,59 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.daemon.registry;
+
+import java.io.IOException;
+
+/**
+ * ServiceRegistry interface for switching between fixed host and dynamic registry implementations.
+ */
+public interface ServiceRegistry {
+
+  /**
+   * Start the service registry
+   * 
+   * @throws InterruptedException
+   */
+  public void start() throws InterruptedException;
+
+  /**
+   * Stop the service registry
+   * 
+   * @throws InterruptedException
+   */
+  public void stop() throws InterruptedException;
+
+  /**
+   * Register the current instance - the implementation takes care of the endpoints to register.
+   * 
+   * @throws IOException
+   */
+  public void register() throws IOException;
+
+  /**
+   * Remove the current registration cleanly (implementation defined cleanup)
+   * 
+   * @throws IOException
+   */
+  public void unregister() throws IOException;
+
+  /**
+   * Client API to get the list of instances registered via the current registry key.
+   * 
+   * @param component
+   * @return
+   * @throws IOException
+   */
+  public ServiceInstanceSet getInstances(String component) throws IOException;
+}

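ServiceRegistry is the seam between the two implementations added in this commit. A hypothetical sketch (class and method names are illustrative, not from the commit) of the daemon-side lifecycle and the AM-side lookup:

    import java.io.IOException;

    import org.apache.hadoop.hive.llap.daemon.registry.ServiceInstance;
    import org.apache.hadoop.hive.llap.daemon.registry.ServiceInstanceSet;
    import org.apache.hadoop.hive.llap.daemon.registry.ServiceRegistry;

    public class ServiceRegistrySketch {

      // Daemon side: announce this instance, withdraw it on shutdown.
      static void daemonLifecycle(ServiceRegistry registry) throws Exception {
        registry.start();
        registry.register();
        try {
          // ... serve requests ...
        } finally {
          registry.unregister();
          registry.stop();
        }
      }

      // AM/client side: enumerate workers registered under the "LLAP" component.
      static void listWorkers(ServiceRegistry registry) throws IOException {
        ServiceInstanceSet set = registry.getInstances("LLAP");
        set.refresh(); // no-op for the fixed registry, a ZK re-listing for the dynamic one
        for (ServiceInstance inst : set.getAll().values()) {
          System.out.println(inst.getWorkerIdentity() + " alive=" + inst.isAlive());
        }
      }
    }
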
Added: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/impl/LlapFixedRegistryImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/impl/LlapFixedRegistryImpl.java?rev=1673556&view=auto
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/impl/LlapFixedRegistryImpl.java (added)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/impl/LlapFixedRegistryImpl.java Tue Apr 14 21:06:58 2015
@@ -0,0 +1,204 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.daemon.registry.impl;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.llap.configuration.LlapConfiguration;
+import org.apache.hadoop.hive.llap.daemon.registry.ServiceInstance;
+import org.apache.hadoop.hive.llap.daemon.registry.ServiceInstanceSet;
+import org.apache.hadoop.hive.llap.daemon.registry.ServiceRegistry;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.log4j.Logger;
+
+public class LlapFixedRegistryImpl implements ServiceRegistry {
+
+  private static final Logger LOG = Logger.getLogger(LlapFixedRegistryImpl.class);
+
+  private final int port;
+  private final int shuffle;
+  private final String[] hosts;
+  private final int memory;
+  private final int vcores;
+
+  private final Map<String, String> srv = new HashMap<String, String>();
+
+  public LlapFixedRegistryImpl(String hosts, Configuration conf) {
+    this.hosts = hosts.split(",");
+    this.port =
+        conf.getInt(LlapConfiguration.LLAP_DAEMON_RPC_PORT,
+            LlapConfiguration.LLAP_DAEMON_RPC_PORT_DEFAULT);
+    this.shuffle =
+        conf.getInt(LlapConfiguration.LLAP_DAEMON_YARN_SHUFFLE_PORT,
+            LlapConfiguration.LLAP_DAEMON_YARN_SHUFFLE_PORT_DEFAULT);
+
+    for (Map.Entry<String, String> kv : conf) {
+      if (kv.getKey().startsWith(LlapConfiguration.LLAP_DAEMON_PREFIX)
+          || kv.getKey().startsWith("hive.llap.")) {
+        // TODO: read this somewhere useful, like the task scheduler
+        srv.put(kv.getKey(), kv.getValue());
+      }
+    }
+
+    this.memory =
+        conf.getInt(LlapConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB,
+            LlapConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB_DEFAULT);
+    this.vcores =
+        conf.getInt(LlapConfiguration.LLAP_DAEMON_NUM_EXECUTORS,
+            LlapConfiguration.LLAP_DAEMON_NUM_EXECUTORS_DEFAULT);
+  }
+
+  @Override
+  public void start() throws InterruptedException {
+    // nothing to start
+  }
+
+  @Override
+  public void stop() throws InterruptedException {
+    // nothing to stop
+  }
+
+  @Override
+  public void register() throws IOException {
+    // nothing to register
+  }
+
+  @Override
+  public void unregister() throws IOException {
+    // nothing to unregister
+  }
+
+  public static String getWorkerIdentity(String host) {
+    // trigger clean errors for anyone who mixes up identity with hosts
+    return "host-" + host;
+  }
+
+  private final class FixedServiceInstance implements ServiceInstance {
+
+    private final String host;
+
+    public FixedServiceInstance(String host) {
+      try {
+        InetAddress inetAddress = InetAddress.getByName(host);
+        if (NetUtils.isLocalAddress(inetAddress)) {
+          InetSocketAddress socketAddress = new InetSocketAddress(0);
+          socketAddress = NetUtils.getConnectAddress(socketAddress);
+          LOG.info("Adding host identified as local: " + host + " as "
+              + socketAddress.getHostName());
+          host = socketAddress.getHostName();
+        }
+      } catch (UnknownHostException e) {
+        LOG.warn("Ignoring resolution issues for host: " + host, e);
+      }
+      this.host = host;
+    }
+
+    @Override
+    public String getWorkerIdentity() {
+      return LlapFixedRegistryImpl.getWorkerIdentity(host);
+    }
+
+    @Override
+    public String getHost() {
+      return host;
+    }
+
+    @Override
+    public int getRpcPort() {
+      // TODO: allow >1 port per host?
+      return LlapFixedRegistryImpl.this.port;
+    }
+
+    @Override
+    public int getShufflePort() {
+      return LlapFixedRegistryImpl.this.shuffle;
+    }
+
+    @Override
+    public boolean isAlive() {
+      return true;
+    }
+
+    @Override
+    public Map<String, String> getProperties() {
+      Map<String, String> properties = new HashMap<>(srv);
+      // no worker identity
+      return properties;
+    }
+
+    @Override
+    public Resource getResource() {
+      return Resource.newInstance(memory, vcores);
+    }
+
+  }
+
+  private final class FixedServiceInstanceSet implements ServiceInstanceSet {
+
+    private final Map<String, ServiceInstance> instances = new HashMap<String, ServiceInstance>();
+
+    public FixedServiceInstanceSet() {
+      for (String host : hosts) {
+        // trigger bugs in anyone who uses this as a hostname
+        instances.put(getWorkerIdentity(host), new FixedServiceInstance(host));
+      }
+    }
+
+    @Override
+    public Map<String, ServiceInstance> getAll() {
+      return instances;
+    }
+
+    @Override
+    public ServiceInstance getInstance(String name) {
+      return instances.get(name);
+    }
+
+    @Override
+    public Set<ServiceInstance> getByHost(String host) {
+      Set<ServiceInstance> byHost = new HashSet<ServiceInstance>();
+      ServiceInstance inst = getInstance(getWorkerIdentity(host));
+      if (inst != null) {
+        byHost.add(inst);
+      }
+      return byHost;
+    }
+
+    @Override
+    public void refresh() throws IOException {
+      // I will do no such thing
+    }
+
+  }
+
+  @Override
+  public ServiceInstanceSet getInstances(String component) throws IOException {
+    return new FixedServiceInstanceSet();
+  }
+
+  @Override
+  public String toString() {
+    return String.format("FixedRegistry hosts=%s", StringUtils.join(",", this.hosts));
+  }
+}
\ No newline at end of file

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/impl/LlapRegistryService.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/impl/LlapRegistryService.java?rev=1673556&r1=1673555&r2=1673556&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/impl/LlapRegistryService.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/impl/LlapRegistryService.java Tue Apr 14 21:06:58 2015
@@ -1,16 +1,29 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
 package org.apache.hadoop.hive.llap.daemon.registry.impl;
 
 import java.io.IOException;
-import java.net.Inet4Address;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
 import java.util.Map;
+import java.util.UUID;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.llap.configuration.LlapConfiguration;
-import org.apache.hadoop.hive.llap.daemon.impl.LlapDaemon;
-import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.hive.llap.daemon.registry.ServiceInstanceSet;
+import org.apache.hadoop.hive.llap.daemon.registry.ServiceRegistry;
 import org.apache.hadoop.registry.client.api.RegistryOperationsFactory;
 import org.apache.hadoop.registry.client.binding.RegistryPathUtils;
 import org.apache.hadoop.registry.client.binding.RegistryTypeUtils;
@@ -30,25 +43,8 @@ import com.google.common.base.Preconditi
 public class LlapRegistryService extends AbstractService {
 
   private static final Logger LOG = Logger.getLogger(LlapRegistryService.class);
-
-  public final static String SERVICE_CLASS = "org-apache-hive";
-
-  private RegistryOperationsService client;
-  private String instanceName;
-  private Configuration conf;
-  private ServiceRecordMarshal encoder;
-
-  private static final String hostname;
   
-  static {
-    String localhost = "localhost";
-    try {
-      localhost = InetAddress.getLocalHost().getCanonicalHostName();
-    } catch (UnknownHostException uhe) {
-      // ignore
-    }
-    hostname = localhost;
-  }
+  private ServiceRegistry registry = null;
 
   public LlapRegistryService() {
     super("LlapRegistryService");
@@ -56,92 +52,48 @@ public class LlapRegistryService extends
 
   @Override
   public void serviceInit(Configuration conf) {
-    String registryId = conf.getTrimmed(LlapConfiguration.LLAP_DAEMON_SERVICE_HOSTS);
-    if (registryId.startsWith("@")) {
-      LOG.info("Llap Registry is enabled with registryid: " + registryId);
-      this.conf = new Configuration(conf);
-      conf.addResource(YarnConfiguration.YARN_SITE_CONFIGURATION_FILE);
-      // registry reference
-      instanceName = registryId.substring(1);
-      client = (RegistryOperationsService) RegistryOperationsFactory.createInstance(conf);
-      encoder = new RegistryUtils.ServiceRecordMarshal();
-
+    String hosts = conf.getTrimmed(LlapConfiguration.LLAP_DAEMON_SERVICE_HOSTS);
+    if (hosts.startsWith("@")) {
+      registry = initRegistry(hosts.substring(1), conf);
     } else {
-      LOG.info("Llap Registry is disabled");
+      registry = new LlapFixedRegistryImpl(hosts, conf);
     }
+    LOG.info("Using LLAP registry type " + registry);
+  }
+
+  private ServiceRegistry initRegistry(String instanceName, Configuration conf) {
+    return new LlapYarnRegistryImpl(instanceName, conf);
   }
 
   @Override
   public void serviceStart() throws Exception {
-    if (client != null) {
-      client.start();
+    if (this.registry != null) {
+      this.registry.start();
     }
   }
 
+  @Override
   public void serviceStop() throws Exception {
-    if (client != null) {
-      client.stop();
+    if (this.registry != null) {
+      this.registry.stop();
+    } else {
+      LOG.warn("Stopping non-existent registry service");
     }
   }
 
-  public Endpoint getRpcEndpoint() {
-    final int rpcPort =
-        conf.getInt(LlapConfiguration.LLAP_DAEMON_RPC_PORT,
-            LlapConfiguration.LLAP_DAEMON_RPC_PORT_DEFAULT);
-
-    return RegistryTypeUtils.ipcEndpoint("llap", new InetSocketAddress(hostname, rpcPort));
-  }
-
-  public Endpoint getShuffleEndpoint() {
-    final int shufflePort =
-        conf.getInt(LlapConfiguration.LLAP_DAEMON_YARN_SHUFFLE_PORT,
-            LlapConfiguration.LLAP_DAEMON_YARN_SHUFFLE_PORT_DEFAULT);
-    // HTTP today, but might not be
-    return RegistryTypeUtils.inetAddrEndpoint("shuffle", ProtocolTypes.PROTOCOL_TCP, hostname,
-        shufflePort);
-  }
-
-  private final String getPath() {
-    return RegistryPathUtils.join(RegistryUtils.componentPath(RegistryUtils.currentUser(),
-        SERVICE_CLASS, instanceName, "workers"), "worker-");
-  }
-
   public void registerWorker() throws IOException {
-    if (this.client != null) {
-      String path = getPath();
-      ServiceRecord srv = new ServiceRecord();
-      srv.addInternalEndpoint(getRpcEndpoint());
-      srv.addInternalEndpoint(getShuffleEndpoint());
-
-      for (Map.Entry<String, String> kv : this.conf) {
-        if (kv.getKey().startsWith(LlapConfiguration.LLAP_DAEMON_PREFIX)) {
-          // TODO: read this somewhere useful, like the allocator
-          srv.set(kv.getKey(), kv.getValue());
-        }
-      }
-
-      client.mknode(RegistryPathUtils.parentOf(path), true);
-
-      // FIXME: YARN registry needs to expose Ephemeral_Seq nodes & return the paths
-      client.zkCreate(path, CreateMode.EPHEMERAL_SEQUENTIAL, encoder.toBytes(srv),
-          client.getClientAcls());
+    if (this.registry != null) {
+      this.registry.register();
     }
   }
 
-  public void unregisterWorker() {
-    if (this.client != null) {
-      // with ephemeral nodes, there's nothing to do here
-      // because the create didn't return paths
+  public void unregisterWorker() throws IOException {
+    if (this.registry != null) {
+      this.registry.unregister();
     }
   }
 
-  public Map<String, ServiceRecord> getWorkers() throws IOException {
-    if (this.client != null) {
-      String path = getPath();
-      return RegistryUtils.listServiceRecords(client, RegistryPathUtils.parentOf(path));
-    } else {
-      Preconditions.checkNotNull(this.client, "Yarn registry client is not intialized");
-      return null;
-    }
+  public ServiceInstanceSet getInstances() throws IOException {
+    return this.registry.getInstances("LLAP");
   }
 }

Added: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/impl/LlapYarnRegistryImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/impl/LlapYarnRegistryImpl.java?rev=1673556&view=auto
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/impl/LlapYarnRegistryImpl.java (added)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/impl/LlapYarnRegistryImpl.java Tue Apr 14 21:06:58 2015
@@ -0,0 +1,339 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.daemon.registry.impl;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.PathNotFoundException;
+import org.apache.hadoop.hive.llap.configuration.LlapConfiguration;
+import org.apache.hadoop.hive.llap.daemon.registry.ServiceInstance;
+import org.apache.hadoop.hive.llap.daemon.registry.ServiceInstanceSet;
+import org.apache.hadoop.hive.llap.daemon.registry.ServiceRegistry;
+import org.apache.hadoop.registry.client.api.RegistryOperationsFactory;
+import org.apache.hadoop.registry.client.binding.RegistryPathUtils;
+import org.apache.hadoop.registry.client.binding.RegistryTypeUtils;
+import org.apache.hadoop.registry.client.binding.RegistryUtils;
+import org.apache.hadoop.registry.client.binding.RegistryUtils.ServiceRecordMarshal;
+import org.apache.hadoop.registry.client.exceptions.InvalidRecordException;
+import org.apache.hadoop.registry.client.impl.zk.RegistryOperationsService;
+import org.apache.hadoop.registry.client.types.AddressTypes;
+import org.apache.hadoop.registry.client.types.Endpoint;
+import org.apache.hadoop.registry.client.types.ProtocolTypes;
+import org.apache.hadoop.registry.client.types.ServiceRecord;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.CreateMode;
+
+import com.google.common.base.Preconditions;
+
+public class LlapYarnRegistryImpl implements ServiceRegistry {
+
+  private static final Logger LOG = Logger.getLogger(LlapYarnRegistryImpl.class);
+
+  private RegistryOperationsService client;
+  private String instanceName;
+  private Configuration conf;
+  private ServiceRecordMarshal encoder;
+
+  private final DynamicServiceInstanceSet instances = new DynamicServiceInstanceSet();
+
+  private static final UUID uniq = UUID.randomUUID();
+  private static final String hostname;
+
+  private static final String UNIQUE_IDENTIFIER = "llap.unique.id";
+
+  private final static String SERVICE_CLASS = "org-apache-hive";
+
+  final ScheduledExecutorService refresher = Executors.newScheduledThreadPool(1);
+  final long refreshDelay;
+
+  static {
+    String localhost = "localhost";
+    try {
+      localhost = InetAddress.getLocalHost().getCanonicalHostName();
+    } catch (UnknownHostException uhe) {
+      // ignore
+    }
+    hostname = localhost;
+  }
+
+  public LlapYarnRegistryImpl(String instanceName, Configuration conf) {
+
+    LOG.info("Llap Registry is enabled with registryid: " + instanceName);
+    this.conf = new Configuration(conf);
+    this.instanceName = instanceName;
+    conf.addResource(YarnConfiguration.YARN_SITE_CONFIGURATION_FILE);
+    // registry reference
+    client = (RegistryOperationsService) RegistryOperationsFactory.createInstance(conf);
+    encoder = new RegistryUtils.ServiceRecordMarshal();
+    refreshDelay =
+        conf.getInt(LlapConfiguration.LLAP_DAEMON_SERVICE_REFRESH_INTERVAL,
+            LlapConfiguration.LLAP_DAEMON_SERVICE_REFRESH_INTERVAL_DEFAULT);
+    Preconditions.checkArgument(refreshDelay > 0,
+        "Refresh delay for registry has to be positive = %d", refreshDelay);
+  }
+
+  public Endpoint getRpcEndpoint() {
+    final int rpcPort =
+        conf.getInt(LlapConfiguration.LLAP_DAEMON_RPC_PORT,
+            LlapConfiguration.LLAP_DAEMON_RPC_PORT_DEFAULT);
+    return RegistryTypeUtils.ipcEndpoint("llap", new InetSocketAddress(hostname, rpcPort));
+  }
+
+  public Endpoint getShuffleEndpoint() {
+    final int shufflePort =
+        conf.getInt(LlapConfiguration.LLAP_DAEMON_YARN_SHUFFLE_PORT,
+            LlapConfiguration.LLAP_DAEMON_YARN_SHUFFLE_PORT_DEFAULT);
+    // HTTP today, but might not be
+    return RegistryTypeUtils.inetAddrEndpoint("shuffle", ProtocolTypes.PROTOCOL_TCP, hostname,
+        shufflePort);
+  }
+
+  private final String getPath() {
+    return RegistryPathUtils.join(RegistryUtils.componentPath(RegistryUtils.currentUser(),
+        SERVICE_CLASS, instanceName, "workers"), "worker-");
+  }
+
+  @Override
+  public void register() throws IOException {
+    String path = getPath();
+    ServiceRecord srv = new ServiceRecord();
+    srv.addInternalEndpoint(getRpcEndpoint());
+    srv.addInternalEndpoint(getShuffleEndpoint());
+
+    for (Map.Entry<String, String> kv : this.conf) {
+      if (kv.getKey().startsWith(LlapConfiguration.LLAP_DAEMON_PREFIX)
+          || kv.getKey().startsWith("hive.llap.")) {
+        // TODO: read this somewhere useful, like the task scheduler
+        srv.set(kv.getKey(), kv.getValue());
+      }
+    }
+
+    // restart sensitive instance id
+    srv.set(UNIQUE_IDENTIFIER, uniq.toString());
+
+    client.mknode(RegistryPathUtils.parentOf(path), true);
+
+    // FIXME: YARN registry needs to expose Ephemeral_Seq nodes & return the paths
+    client.zkCreate(path, CreateMode.EPHEMERAL_SEQUENTIAL, encoder.toBytes(srv),
+        client.getClientAcls());
+  }
+
+  @Override
+  public void unregister() throws IOException {
+   // Nothing for the zkCreate models
+  }
+
+  private class DynamicServiceInstance implements ServiceInstance {
+
+    private final ServiceRecord srv;
+    private boolean alive = true;
+    private final String host;
+    private final int rpcPort;
+    private final int shufflePort;
+
+    public DynamicServiceInstance(ServiceRecord srv) throws IOException {
+      this.srv = srv;
+
+      final Endpoint shuffle = srv.getInternalEndpoint("shuffle");
+      final Endpoint rpc = srv.getInternalEndpoint("llap");
+
+      this.host =
+          RegistryTypeUtils.getAddressField(rpc.addresses.get(0),
+              AddressTypes.ADDRESS_HOSTNAME_FIELD);
+      this.rpcPort =
+          Integer.valueOf(RegistryTypeUtils.getAddressField(rpc.addresses.get(0),
+              AddressTypes.ADDRESS_PORT_FIELD));
+      this.shufflePort =
+          Integer.valueOf(RegistryTypeUtils.getAddressField(shuffle.addresses.get(0),
+              AddressTypes.ADDRESS_PORT_FIELD));
+    }
+
+    @Override
+    public String getWorkerIdentity() {
+      return srv.get(UNIQUE_IDENTIFIER);
+    }
+
+    @Override
+    public String getHost() {
+      return host;
+    }
+
+    @Override
+    public int getRpcPort() {
+      return rpcPort;
+    }
+
+    @Override
+    public int getShufflePort() {
+      return shufflePort;
+    }
+
+    @Override
+    public boolean isAlive() {
+      return alive ;
+    }
+
+    public void kill() {
+      LOG.info("Killing " + this);
+      this.alive = false;
+    }
+
+    @Override
+    public Map<String, String> getProperties() {
+      return srv.attributes();
+    }
+
+    @Override
+    public Resource getResource() {
+      int memory = Integer.valueOf(srv.get(LlapConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB));
+      int vCores = Integer.valueOf(srv.get(LlapConfiguration.LLAP_DAEMON_NUM_EXECUTORS));
+      return Resource.newInstance(memory, vCores);
+    }
+
+    @Override
+    public String toString() {
+      return "DynamicServiceInstance [alive=" + alive + ", host=" + host + ":" + rpcPort + "]";
+    }
+  }
+
+  private class DynamicServiceInstanceSet implements ServiceInstanceSet {
+
+    Map<String, ServiceInstance> instances;
+
+    @Override
+    public Map<String, ServiceInstance> getAll() {
+      return instances;
+    }
+
+    @Override
+    public ServiceInstance getInstance(String name) {
+      return instances.get(name);
+    }
+
+    @Override
+    public synchronized void refresh() throws IOException {
+      /* call this from wherever */
+      Map<String, ServiceInstance> freshInstances = new HashMap<String, ServiceInstance>();
+
+      String path = getPath();
+      Map<String, ServiceRecord> records =
+          RegistryUtils.listServiceRecords(client, RegistryPathUtils.parentOf(path));
+      Set<String> latestKeys = new HashSet<String>();
+      LOG.info("Starting to refresh ServiceInstanceSet " + System.identityHashCode(this));
+      for (ServiceRecord rec : records.values()) {
+        ServiceInstance instance = new DynamicServiceInstance(rec);
+        if (instance != null) {
+          if (instances != null && instances.containsKey(instance.getWorkerIdentity()) == false) {
+            // add a new object
+            freshInstances.put(instance.getWorkerIdentity(), instance);
+            if (LOG.isInfoEnabled()) {
+              LOG.info("Adding new worker " + instance.getWorkerIdentity() + " which mapped to "
+                  + instance);
+            }
+          } else if (LOG.isDebugEnabled()) {
+            LOG.debug("Retaining running worker " + instance.getWorkerIdentity() + " which mapped to " + instance);
+          }
+        }
+        latestKeys.add(instance.getWorkerIdentity());
+      }
+
+      if (instances != null) {
+        // deep-copy before modifying
+        Set<String> oldKeys = new HashSet(instances.keySet());
+        if (oldKeys.removeAll(latestKeys)) {
+          for (String k : oldKeys) {
+            // this is so that people can hold onto ServiceInstance references as placeholders for tasks
+            final DynamicServiceInstance dead = (DynamicServiceInstance) instances.get(k);
+            dead.kill();
+            if (LOG.isInfoEnabled()) {
+              LOG.info("Deleting dead worker " + k + " which mapped to " + dead);
+            }
+          }
+        }
+        this.instances.keySet().removeAll(oldKeys);
+        this.instances.putAll(freshInstances);
+      } else {
+        this.instances = freshInstances;
+      }
+    }
+
+    @Override
+    public Set<ServiceInstance> getByHost(String host) {
+      Set<ServiceInstance> byHost = new HashSet<ServiceInstance>();
+      for (ServiceInstance i : instances.values()) {
+        if (host.equals(i.getHost())) {
+          // all hosts in instances should be alive in this impl
+          byHost.add(i);
+        }
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Locality comparing " + host + " to " + i.getHost());
+        }
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Returning " + byHost.size() + " hosts for locality allocation on " + host);
+      }
+      return byHost;
+    }
+  }
+
+  @Override
+  public ServiceInstanceSet getInstances(String component) throws IOException {
+    Preconditions.checkArgument("LLAP".equals(component)); // right now there is only 1 component 
+    if (this.client != null) {
+      instances.refresh();
+      return instances;
+    } else {
+      Preconditions.checkNotNull(this.client, "Yarn registry client is not initialized");
+      return null;
+    }
+  }
+
+  @Override
+  public void start() {
+    if (client != null) {
+      client.start();
+      refresher.scheduleWithFixedDelay(new Runnable() {
+        @Override
+        public void run() {
+          try {
+            instances.refresh();
+          } catch (IOException ioe) {
+            LOG.warn("Could not refresh hosts during scheduled refresh", ioe);
+          }
+        }
+      }, 0, refreshDelay, TimeUnit.SECONDS);
+    }
+  }
+
+  @Override
+  public void stop() {
+    if (client != null) {
+      client.stop();
+    }
+  }
+}

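When a worker disappears from the ZooKeeper listing, refresh() flips its isAlive() flag via kill() instead of discarding the object, so callers that cached a ServiceInstance see it as effectively blacklisted. A hypothetical helper (not in the commit) showing how a consumer such as the scheduler below can filter on that flag:

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.hive.llap.daemon.registry.ServiceInstance;
    import org.apache.hadoop.hive.llap.daemon.registry.ServiceInstanceSet;

    public class AliveInstanceFilter {
      // Returns only the instances whose registry entry is still present.
      public static List<ServiceInstance> aliveInstances(ServiceInstanceSet set) {
        List<ServiceInstance> alive = new ArrayList<ServiceInstance>();
        for (ServiceInstance inst : set.getAll().values()) {
          if (inst.isAlive()) {
            alive.add(inst);
          }
        }
        return alive;
      }
    }
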
Modified: hive/branches/llap/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java?rev=1673556&r1=1673555&r2=1673556&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java Tue Apr 14 21:06:58 2015
@@ -15,16 +15,12 @@
 package org.apache.tez.dag.app.rm;
 
 import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.UnknownHostException;
 import java.util.Arrays;
 import java.util.Comparator;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
-import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -42,24 +38,16 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.primitives.Ints;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.registry.client.binding.RegistryTypeUtils;
-import org.apache.hadoop.registry.client.types.AddressTypes;
-import org.apache.hadoop.registry.client.types.Endpoint;
-import org.apache.hadoop.registry.client.types.ServiceRecord;
+import org.apache.hadoop.hive.llap.configuration.LlapConfiguration;
+import org.apache.hadoop.hive.llap.daemon.registry.ServiceInstance;
+import org.apache.hadoop.hive.llap.daemon.registry.ServiceInstanceSet;
+import org.apache.hadoop.hive.llap.daemon.registry.impl.LlapRegistryService;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
@@ -67,12 +55,16 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.hive.llap.configuration.LlapConfiguration;
-import org.apache.hadoop.hive.llap.daemon.registry.impl.LlapRegistryService;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.tez.dag.api.TaskAttemptEndReason;
 import org.apache.tez.dag.app.AppContext;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 public class LlapTaskSchedulerService extends TaskSchedulerService {
 
@@ -83,59 +75,55 @@ public class LlapTaskSchedulerService ex
   private final ExecutorService appCallbackExecutor;
   private final TaskSchedulerAppCallback appClientDelegate;
 
-  // Set of active hosts
+  // interface into the registry service
+  private ServiceInstanceSet activeInstances;
+
   @VisibleForTesting
-  final LinkedHashMap<String, NodeInfo> activeHosts = new LinkedHashMap<>();
-  // Populated each time activeHosts is modified
+  final Map<ServiceInstance, NodeInfo> instanceToNodeMap = new HashMap<>();
+  
   @VisibleForTesting
-  String []activeHostList;
+  final Set<ServiceInstance> instanceBlackList = new HashSet<ServiceInstance>();
 
-  // Set of all hosts in the system.
   @VisibleForTesting
-  final ConcurrentMap<String, NodeInfo> allHosts = new ConcurrentHashMap<>();
-
   // Tracks currently allocated containers.
-  private final Map<ContainerId, String> containerToHostMap = new HashMap<>();
+  final Map<ContainerId, String> containerToInstanceMap = new HashMap<>();
 
   // Tracks tasks which could not be allocated immediately.
   @VisibleForTesting
-  final TreeMap<Priority, List<TaskInfo>> pendingTasks =
-      new TreeMap<>(new Comparator<Priority>() {
-        @Override
-        public int compare(Priority o1, Priority o2) {
-          return o1.getPriority() - o2.getPriority();
-        }
-      });
+  final TreeMap<Priority, List<TaskInfo>> pendingTasks = new TreeMap<>(new Comparator<Priority>() {
+    @Override
+    public int compare(Priority o1, Priority o2) {
+      return o1.getPriority() - o2.getPriority();
+    }
+  });
 
   // Tracks running and queued tasks. Cleared after a task completes.
-  private final ConcurrentMap<Object, TaskInfo> knownTasks =
-      new ConcurrentHashMap<>();
+  private final ConcurrentMap<Object, TaskInfo> knownTasks = new ConcurrentHashMap<>();
 
   @VisibleForTesting
   final DelayQueue<NodeInfo> disabledNodes = new DelayQueue<>();
 
   private final ContainerFactory containerFactory;
   private final Random random = new Random();
-  private final int containerPort;
   private final Clock clock;
   private final ListeningExecutorService executor;
   private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
   private final ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
   private final ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
 
-
-  // TODO Track resources used by this application on specific hosts, and make scheduling decisions accordingly.
+  // TODO Track resources used by this application on specific hosts, and make scheduling decisions
+  // accordingly.
   // Ideally implement in a way where updates from ZK, if they do come, can just be plugged in.
   // A heap based on available capacity - which is updated each time stats are updated,
   // or anytime assignment numbers are changed. Especially for random allocations (no host request).
-  // For non-random allocations - Walk through all pending tasks to get local assignments, then start assigning them to non local hosts.
+  // For non-random allocations - Walk through all pending tasks to get local assignments, then
+  // start assigning them to non local hosts.
   // Also setup a max over-subscribe limit as part of this.
 
   private final AtomicBoolean isStopped = new AtomicBoolean(false);
 
   private final long nodeReEnableTimeout;
 
-
   // Per daemon
   private final int memoryPerInstance;
   private final int coresPerInstance;
@@ -144,9 +132,9 @@ public class LlapTaskSchedulerService ex
   // Per Executor Thread
   private final Resource resourcePerExecutor;
 
-  private final boolean initFromRegistry;
   private final LlapRegistryService registry = new LlapRegistryService();
-  private final PendingTaskSchedulerCallable pendingTaskSchedulerCallable = new PendingTaskSchedulerCallable();
+  private final PendingTaskSchedulerCallable pendingTaskSchedulerCallable =
+      new PendingTaskSchedulerCallable();
   private ListenableFuture<Void> pendingTaskSchedulerFuture;
 
   @VisibleForTesting
@@ -156,14 +144,9 @@ public class LlapTaskSchedulerService ex
   @VisibleForTesting
   StatsPerDag dagStats = new StatsPerDag();
 
-
-
-
-
   public LlapTaskSchedulerService(TaskSchedulerAppCallback appClient, AppContext appContext,
-                                    String clientHostname, int clientPort, String trackingUrl,
-                                    long customAppIdIdentifier,
-                                    Configuration conf) {
+      String clientHostname, int clientPort, String trackingUrl, long customAppIdIdentifier,
+      Configuration conf) {
     // Accepting configuration here to allow setting up fields as final
 
     super(LlapTaskSchedulerService.class.getName());
@@ -171,17 +154,18 @@ public class LlapTaskSchedulerService ex
     this.appClientDelegate = createAppCallbackDelegate(appClient);
     this.clock = appContext.getClock();
     this.containerFactory = new ContainerFactory(appContext, customAppIdIdentifier);
-    this.memoryPerInstance = conf
-        .getInt(LlapConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB,
+    this.memoryPerInstance =
+        conf.getInt(LlapConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB,
             LlapConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB_DEFAULT);
-    this.coresPerInstance = conf
-        .getInt(LlapConfiguration.LLAP_DAEMON_VCPUS_PER_INSTANCE,
+    this.coresPerInstance =
+        conf.getInt(LlapConfiguration.LLAP_DAEMON_VCPUS_PER_INSTANCE,
             LlapConfiguration.LLAP_DAEMON_VCPUS_PER_INSTANCE_DEFAULT);
-    this.executorsPerInstance = conf.getInt(LlapConfiguration.LLAP_DAEMON_NUM_EXECUTORS,
-        LlapConfiguration.LLAP_DAEMON_NUM_EXECUTORS_DEFAULT);
-    this.nodeReEnableTimeout = conf.getLong(
-        LlapConfiguration.LLAP_DAEMON_TASK_SCHEDULER_NODE_REENABLE_TIMEOUT_MILLIS,
-        LlapConfiguration.LLAP_DAEMON_TASK_SCHEDULER_NODE_REENABLE_TIMEOUT_MILLIS_DEFAULT);
+    this.executorsPerInstance =
+        conf.getInt(LlapConfiguration.LLAP_DAEMON_NUM_EXECUTORS,
+            LlapConfiguration.LLAP_DAEMON_NUM_EXECUTORS_DEFAULT);
+    this.nodeReEnableTimeout =
+        conf.getLong(LlapConfiguration.LLAP_DAEMON_TASK_SCHEDULER_NODE_REENABLE_TIMEOUT_MILLIS,
+            LlapConfiguration.LLAP_DAEMON_TASK_SCHEDULER_NODE_REENABLE_TIMEOUT_MILLIS_DEFAULT);
 
     int memoryPerExecutor = (int) (memoryPerInstance / (float) executorsPerInstance);
     int coresPerExecutor = (int) (coresPerInstance / (float) executorsPerInstance);
@@ -189,121 +173,39 @@ public class LlapTaskSchedulerService ex
 
     String instanceId = conf.getTrimmed(LlapConfiguration.LLAP_DAEMON_SERVICE_HOSTS);
 
-    Preconditions.checkNotNull(instanceId,
-        LlapConfiguration.LLAP_DAEMON_SERVICE_HOSTS + " must be defined");
+    Preconditions.checkNotNull(instanceId, LlapConfiguration.LLAP_DAEMON_SERVICE_HOSTS
+        + " must be defined");
 
-    if (!instanceId.startsWith("@")) { // Manual setup. Not via the service registry
-      initFromRegistry = false;
-      String[] hosts = conf.getTrimmedStrings(LlapConfiguration.LLAP_DAEMON_SERVICE_HOSTS);
-      Preconditions.checkState(hosts != null && hosts.length != 0,
-          LlapConfiguration.LLAP_DAEMON_SERVICE_HOSTS + "must be defined");
-      for (String host : hosts) {
-        // If reading addresses from conf, try resolving local addresses so that
-        // this matches with the address reported by daemons.
-        InetAddress inetAddress = null;
-        try {
-          inetAddress = InetAddress.getByName(host);
-          if (NetUtils.isLocalAddress(inetAddress)) {
-            InetSocketAddress socketAddress = new InetSocketAddress(0);
-            socketAddress = NetUtils.getConnectAddress(socketAddress);
-            LOG.info("Adding host identified as local: " + host + " as " + socketAddress.getHostName());
-            host = socketAddress.getHostName();
-          }
-        } catch (UnknownHostException e) {
-          LOG.warn("Ignoring resolution issues for host: " + host, e);
-        }
-        NodeInfo nodeInfo = new NodeInfo(host, BACKOFF_FACTOR, clock);
-        activeHosts.put(host, nodeInfo);
-        allHosts.put(host, nodeInfo);
-      }
-      activeHostList = activeHosts.keySet().toArray(new String[activeHosts.size()]);
-    } else {
-      initFromRegistry = true;
-    }
-
-    this.containerPort = conf.getInt(LlapConfiguration.LLAP_DAEMON_RPC_PORT,
-        LlapConfiguration.LLAP_DAEMON_RPC_PORT_DEFAULT);
-    ExecutorService executorService = Executors.newFixedThreadPool(1,
-        new ThreadFactoryBuilder().setDaemon(true).setNameFormat("LlapScheduler").build());
+    ExecutorService executorService =
+        Executors.newFixedThreadPool(1,
+            new ThreadFactoryBuilder().setDaemon(true).setNameFormat("LlapScheduler").build());
     executor = MoreExecutors.listeningDecorator(executorService);
 
-    if (activeHosts.size() > 0) {
-      LOG.info("Running with configuration: " +
-          "memoryPerInstance=" + memoryPerInstance +
-          ", vCoresPerInstance=" + coresPerInstance +
-          ", executorsPerInstance=" + executorsPerInstance +
-          ", resourcePerInstanceInferred=" + resourcePerExecutor +
-          ", hosts=" + allHosts.keySet() +
-          ", rpcPort=" + containerPort +
-          ", nodeReEnableTimeout=" + nodeReEnableTimeout +
-          ", nodeReEnableBackOffFactor=" + BACKOFF_FACTOR);
-    } else {
-      LOG.info("Running with configuration: " +
-          "memoryPerInstance=" + memoryPerInstance +
-          ", vCoresPerInstance=" + coresPerInstance +
-          ", executorsPerInstance=" + executorsPerInstance +
-          ", resourcePerInstanceInferred=" + resourcePerExecutor +
-          ", hosts=<pending>" +
-          ", rpcPort=<pending>" +
-          ", nodeReEnableTimeout=" + nodeReEnableTimeout +
-          ", nodeReEnableBackOffFactor=" + BACKOFF_FACTOR);
-    }
-
+    LOG.info("Running with configuration: " + "memoryPerInstance=" + memoryPerInstance
+        + ", vCoresPerInstance=" + coresPerInstance + ", executorsPerInstance="
+        + executorsPerInstance + ", resourcePerInstanceInferred=" + resourcePerExecutor
+        + ", nodeReEnableTimeout=" + nodeReEnableTimeout + ", nodeReEnableBackOffFactor="
+        + BACKOFF_FACTOR);
   }
 
   @Override
   public void serviceInit(Configuration conf) {
-    if (initFromRegistry) {
-      registry.init(conf);
-    }
+    registry.init(conf);
   }
 
-
   @Override
   public void serviceStart() throws IOException {
-
     writeLock.lock();
     try {
       pendingTaskSchedulerFuture = executor.submit(pendingTaskSchedulerCallable);
-      if (initFromRegistry) {
-        registry.start();
-        if (activeHosts.size() > 0) {
-          return;
-        }
-        LOG.info("Reading YARN registry for service records");
-
-        Map<String, ServiceRecord> workers = registry.getWorkers();
-        for (ServiceRecord srv : workers.values()) {
-          Endpoint rpc = srv.getInternalEndpoint("llap");
-          if (rpc != null) {
-            LOG.info("Examining endpoint: " + rpc);
-            final String host =
-                RegistryTypeUtils.getAddressField(rpc.addresses.get(0),
-                    AddressTypes.ADDRESS_HOSTNAME_FIELD);
-            NodeInfo nodeInfo = new NodeInfo(host, BACKOFF_FACTOR, clock);
-            activeHosts.put(host, nodeInfo);
-            allHosts.put(host, nodeInfo);
-          } else {
-
-            LOG.info("The SRV record was " + srv);
-          }
-        }
-        activeHostList = activeHosts.keySet().toArray(new String[activeHosts.size()]);
-
-
-
-        LOG.info("Re-inited with configuration: " +
-            "memoryPerInstance=" + memoryPerInstance +
-            ", vCoresPerInstance=" + coresPerInstance +
-            ", executorsPerInstance=" + executorsPerInstance +
-            ", resourcePerInstanceInferred=" + resourcePerExecutor +
-            ", hosts=" + allHosts.keySet());
-
+      registry.start();
+      activeInstances = registry.getInstances();
+      for (ServiceInstance inst : activeInstances.getAll().values()) {
+        addNode(inst, new NodeInfo(inst, BACKOFF_FACTOR, clock));
       }
     } finally {
       writeLock.unlock();
     }
-
   }
 
   @Override
@@ -316,7 +218,7 @@ public class LlapTaskSchedulerService ex
           pendingTaskSchedulerFuture.cancel(true);
         }
         executor.shutdownNow();
-        if (initFromRegistry) {
+        if (registry != null) {
           registry.stop();
         }
         appCallbackExecutor.shutdownNow();
@@ -327,18 +229,68 @@ public class LlapTaskSchedulerService ex
   }
 
   @Override
+  public Resource getTotalResources() {
+    int memory = 0;
+    int vcores = 0;
+    readLock.lock();
+    try {
+      for (ServiceInstance inst : activeInstances.getAll().values()) {
+        if (inst.isAlive()) {
+          Resource r = inst.getResource();
+          LOG.info("Found instance " + inst + " with " + r);
+          memory += r.getMemory();
+          vcores += r.getVirtualCores();
+        } else {
+          LOG.info("Ignoring dead instance " + inst);
+        }
+      }
+    } finally {
+      readLock.unlock();
+    }
+
+    return Resource.newInstance(memory, vcores);
+  }
+
+  /**
+   * The difference between this and getTotalResources() is that this only gives currently free
+   * resource instances, while the other lists all the instances that may become available in a
+   * while.
+   */
+  @Override
   public Resource getAvailableResources() {
-    // TODO This needs information about all running executors, and the amount of memory etc available across the cluster.
-    // No lock required until this moves to using something other than allHosts
-    return Resource
-        .newInstance(Ints.checkedCast(allHosts.size() * memoryPerInstance),
-            allHosts.size() * coresPerInstance);
+    // need a state store eventually for current state & measure backoffs
+    int memory = 0;
+    int vcores = 0;
+    readLock.lock();
+    try {
+      for (ServiceInstance inst : instanceToNodeMap.keySet()) {
+        if (inst.isAlive()) {
+          Resource r = inst.getResource();
+          memory += r.getMemory();
+          vcores += r.getVirtualCores();
+        }
+      }
+    } finally {
+      readLock.unlock();
+    }
+
+    return Resource.newInstance(memory, vcores);
   }
 
   @Override
   public int getClusterNodeCount() {
-    // No lock required until this moves to using something other than allHosts
-    return allHosts.size();
+    readLock.lock();
+    try {
+      int n = 0;
+      for (ServiceInstance inst : activeInstances.getAll().values()) {
+        if (inst.isAlive()) {
+          n++;
+        }
+      }
+      return n;
+    } finally {
+      readLock.unlock();
+    }
   }
 
   @Override
@@ -350,14 +302,6 @@ public class LlapTaskSchedulerService ex
   }
 
   @Override
-  public Resource getTotalResources() {
-    // No lock required until this moves to using something other than allHosts
-    return Resource
-        .newInstance(Ints.checkedCast(allHosts.size() * memoryPerInstance),
-            allHosts.size() * coresPerInstance);
-  }
-
-  @Override
   public void blacklistNode(NodeId nodeId) {
     LOG.info("DEBUG: BlacklistNode not supported");
   }
@@ -369,8 +313,9 @@ public class LlapTaskSchedulerService ex
 
   @Override
   public void allocateTask(Object task, Resource capability, String[] hosts, String[] racks,
-                           Priority priority, Object containerSignature, Object clientCookie) {
-    TaskInfo taskInfo = new TaskInfo(task, clientCookie, priority, capability, hosts, racks, clock.getTime());
+      Priority priority, Object containerSignature, Object clientCookie) {
+    TaskInfo taskInfo =
+        new TaskInfo(task, clientCookie, priority, capability, hosts, racks, clock.getTime());
     writeLock.lock();
     try {
       dagStats.registerTaskRequest(hosts, racks);
@@ -383,13 +328,13 @@ public class LlapTaskSchedulerService ex
     }
   }
 
-
   @Override
   public void allocateTask(Object task, Resource capability, ContainerId containerId,
-                           Priority priority, Object containerSignature, Object clientCookie) {
+      Priority priority, Object containerSignature, Object clientCookie) {
     // Container affinity can be implemented as Host affinity for LLAP. Not required until
     // 1:1 edges are used in Hive.
-    TaskInfo taskInfo = new TaskInfo(task, clientCookie, priority, capability, null, null, clock.getTime());
+    TaskInfo taskInfo =
+        new TaskInfo(task, clientCookie, priority, capability, null, null, clock.getTime());
     writeLock.lock();
     try {
       dagStats.registerTaskRequest(null, null);
@@ -411,43 +356,49 @@ public class LlapTaskSchedulerService ex
     try {
       taskInfo = knownTasks.remove(task);
       if (taskInfo == null) {
-        LOG.error("Could not determine ContainerId for task: " + task +
-            " . Could have hit a race condition. Ignoring." +
-            " The query may hang since this \"unknown\" container is now taking up a slot permanently");
+        LOG.error("Could not determine ContainerId for task: "
+            + task
+            + " . Could have hit a race condition. Ignoring."
+            + " The query may hang since this \"unknown\" container is now taking up a slot permanently");
         return false;
       }
       if (taskInfo.containerId == null) {
         if (taskInfo.assigned) {
-          LOG.error(
-              "Task: " + task + " assigned, but could not find the corresponding containerId." +
-                  " The query may hang since this \"unknown\" container is now taking up a slot permanently");
+          LOG.error("Task: "
+              + task
+              + " assigned, but could not find the corresponding containerId."
+              + " The query may hang since this \"unknown\" container is now taking up a slot permanently");
         } else {
-          LOG.info("Ignoring deallocate request for task " + task +
-              " which hasn't been assigned to a container");
+          LOG.info("Ignoring deallocate request for task " + task
+              + " which hasn't been assigned to a container");
           removePendingTask(taskInfo);
         }
         return false;
       }
-      String hostForContainer = containerToHostMap.remove(taskInfo.containerId);
+      String hostForContainer = containerToInstanceMap.remove(taskInfo.containerId);
       assert hostForContainer != null;
-      String assignedHost = taskInfo.assignedHost;
-      assert assignedHost != null;
+      ServiceInstance assignedInstance = taskInfo.assignedInstance;
+      assert assignedInstance != null;
 
       if (taskSucceeded) {
-        // The node may have been blacklisted at this point - which means it may not be in the activeNodeList.
-        NodeInfo nodeInfo = allHosts.get(assignedHost);
+        // The node may have been blacklisted at this point - which means it may not be in the
+        // activeNodeList.
+        NodeInfo nodeInfo = instanceToNodeMap.get(assignedInstance);
         assert nodeInfo != null;
         nodeInfo.registerTaskSuccess();
-        // TODO Consider un-blacklisting the node since at least 1 slot should have become available on the node.
-      } else if (!taskSucceeded && endReason != null && EnumSet
-          .of(TaskAttemptEndReason.SERVICE_BUSY, TaskAttemptEndReason.COMMUNICATION_ERROR)
-          .contains(endReason)) {
+        // TODO Consider un-blacklisting the node since at least 1 slot should have become available
+        // on the node.
+      } else if (!taskSucceeded
+          && endReason != null
+          && EnumSet
+              .of(TaskAttemptEndReason.SERVICE_BUSY, TaskAttemptEndReason.COMMUNICATION_ERROR)
+              .contains(endReason)) {
         if (endReason == TaskAttemptEndReason.COMMUNICATION_ERROR) {
-          dagStats.registerCommFailure(taskInfo.assignedHost);
+          dagStats.registerCommFailure(taskInfo.assignedInstance.getHost());
         } else if (endReason == TaskAttemptEndReason.SERVICE_BUSY) {
-          dagStats.registerTaskRejected(taskInfo.assignedHost);
+          dagStats.registerTaskRejected(taskInfo.assignedInstance.getHost());
         }
-        disableNode(assignedHost);
+        disableInstance(assignedInstance, endReason == TaskAttemptEndReason.SERVICE_BUSY);
       }
     } finally {
       writeLock.unlock();
@@ -479,87 +430,137 @@ public class LlapTaskSchedulerService ex
   }
 
   @VisibleForTesting
-  TaskSchedulerAppCallback createAppCallbackDelegate(
-      TaskSchedulerAppCallback realAppClient) {
-    return new TaskSchedulerAppCallbackWrapper(realAppClient,
-        appCallbackExecutor);
+  TaskSchedulerAppCallback createAppCallbackDelegate(TaskSchedulerAppCallback realAppClient) {
+    return new TaskSchedulerAppCallbackWrapper(realAppClient, appCallbackExecutor);
   }
 
   /**
    * @param requestedHosts the list of preferred hosts. null implies any host
    * @return
    */
-  private String selectHost(String[] requestedHosts) {
-    // TODO Change this to work off of what we think is remaining capacity for a host
-
+  private ServiceInstance selectHost(TaskInfo request) {
+    String[] requestedHosts = request.requestedHosts;
     readLock.lock(); // Read-lock. Not updating any stats at the moment.
     try {
-      // Check if any hosts are active. If there's any active host, an allocation will happen.
-      if (activeHosts.size() == 0) {
+      // Check if any hosts are active.
+      if (getAvailableResources().getMemory() <= 0) {
+        refreshInstances();
+      }
+
+      // If there's no memory available, fail
+      if (getTotalResources().getMemory() <= 0) {
         return null;
       }
 
-      String host = null;
-      if (requestedHosts != null && requestedHosts.length > 0) {
-        // Pick the first host always. Weak attempt at cache affinity.
-        host = requestedHosts[0];
-        if (activeHosts.get(host) != null) {
-          LOG.info("Selected host: " + host + " from requested hosts: " +
-              Arrays.toString(requestedHosts));
-        } else {
-          LOG.info("Preferred host: " + host + " not present. Attempting to select another one");
-          host = null;
-          for (String h : requestedHosts) {
-            if (activeHosts.get(h) != null) {
-              host = h;
-              break;
+      if (requestedHosts != null) {
+        for (String host : requestedHosts) {
+          // Pick the first host always. Weak attempt at cache affinity.
+          Set<ServiceInstance> instances = activeInstances.getByHost(host);
+          if (!instances.isEmpty()) {
+            for (ServiceInstance inst : instances) {
+              if (inst.isAlive() && instanceToNodeMap.containsKey(inst)) {
+                // only allocate from the "available" list
+                // TODO Change this to work off of what we think is remaining capacity for an
+                // instance
+                LOG.info("Assigning " + inst + " when looking for " + host);
+                return inst;
+              }
             }
           }
-          if (host == null) {
-            host = activeHostList[random.nextInt(activeHostList.length)];
-            LOG.info("Requested hosts: " + Arrays.toString(requestedHosts) +
-                " not present. Randomizing the host");
+        }
+      }
+      /* fall through - miss in locality (random scheduling) */
+      ServiceInstance[] all = instanceToNodeMap.keySet().toArray(new ServiceInstance[0]);
+      // Check again
+      if (all.length > 0) {
+        int n = random.nextInt(all.length);
+        // start at random offset and iterate whole list
+        for (int i = 0; i < all.length; i++) {
+          ServiceInstance inst = all[(i + n) % all.length];
+          if (inst.isAlive()) {
+            LOG.info("Assigning " + inst + " when looking for any host");
+            return inst;
           }
         }
-      } else {
-        host = activeHostList[random.nextInt(activeHostList.length)];
-        LOG.info("Selected random host: " + host + " since the request contained no host information");
       }
-      return host;
     } finally {
       readLock.unlock();
     }
+
+    /* check again whether nodes are disabled or just missing */
+    writeLock.lock();
+    try {
+      for (ServiceInstance inst : activeInstances.getAll().values()) {
+        if (inst.isAlive() && instanceBlackList.contains(inst) == false
+            && instanceToNodeMap.containsKey(inst) == false) {
+          /* that's a good node, not added to the allocations yet */
+          addNode(inst, new NodeInfo(inst, BACKOFF_FACTOR, clock));
+          // mark it as disabled to let the pending tasks go there
+          disableInstance(inst, true);
+        }
+      }
+      /* do not allocate nodes from this process, as then the pending tasks will get starved */
+    } finally {
+      writeLock.unlock();
+    }
+    return null;
+  }
+
+  private void refreshInstances() {
+    try {
+      activeInstances.refresh(); // handles its own sync
+    } catch (IOException ioe) {
+      LOG.warn("Could not refresh list of active instances", ioe);
+    }
   }
 
+  private void addNode(ServiceInstance inst, NodeInfo node) {
+    instanceToNodeMap.put(inst, node);
+  }
 
   private void reenableDisabledNode(NodeInfo nodeInfo) {
     writeLock.lock();
     try {
-      nodeInfo.enableNode();
-      activeHosts.put(nodeInfo.hostname, nodeInfo);
-      activeHostList = activeHosts.keySet().toArray(new String[activeHosts.size()]);
+      if (!nodeInfo.isBusy()) {
+        refreshInstances();
+      }
+      if (nodeInfo.host.isAlive()) {
+        nodeInfo.enableNode();
+        instanceBlackList.remove(nodeInfo.host);
+        instanceToNodeMap.put(nodeInfo.host, nodeInfo);
+      } else {
+        if (LOG.isInfoEnabled()) {
+          LOG.info("Removing dead node " + nodeInfo);
+        }
+      }
     } finally {
       writeLock.unlock();
     }
   }
 
-  private void disableNode(String hostname) {
+  private void disableInstance(ServiceInstance instance, boolean busy) {
     writeLock.lock();
     try {
-      NodeInfo nodeInfo = activeHosts.remove(hostname);
+      NodeInfo nodeInfo = instanceToNodeMap.remove(instance);
       if (nodeInfo == null) {
-        LOG.debug("Node: " + hostname + " already disabled, or invalid. Not doing anything.");
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Node: " + instance + " already disabled, or invalid. Not doing anything.");
+        }
       } else {
+        instanceBlackList.add(instance);
         nodeInfo.disableNode(nodeReEnableTimeout);
+        nodeInfo.setBusy(busy); // daemon failure vs daemon busy
+        // TODO: handle task to container map events in case of hard failures
         disabledNodes.add(nodeInfo);
+        if (LOG.isInfoEnabled()) {
+          LOG.info("Disabling instance " + instance + " for " + nodeReEnableTimeout + " seconds");
+        }
       }
-      activeHostList = activeHosts.keySet().toArray(new String[activeHosts.size()]);
     } finally {
       writeLock.unlock();
     }
   }
 
-
   private void addPendingTask(TaskInfo taskInfo) {
     writeLock.lock();
     try {
@@ -582,8 +583,8 @@ public class LlapTaskSchedulerService ex
       Priority priority = taskInfo.priority;
       List<TaskInfo> taskInfoList = pendingTasks.get(priority);
       if (taskInfoList == null || taskInfoList.isEmpty() || !taskInfoList.remove(taskInfo)) {
-        LOG.warn(
-            "Could not find task: " + taskInfo.task + " in pending list, at priority: " + priority);
+        LOG.warn("Could not find task: " + taskInfo.task + " in pending list, at priority: "
+            + priority);
       }
     } finally {
       writeLock.unlock();
@@ -593,7 +594,8 @@ public class LlapTaskSchedulerService ex
   private void schedulePendingTasks() {
     writeLock.lock();
     try {
-      Iterator<Entry<Priority, List<TaskInfo>>> pendingIterator =  pendingTasks.entrySet().iterator();
+      Iterator<Entry<Priority, List<TaskInfo>>> pendingIterator =
+          pendingTasks.entrySet().iterator();
       while (pendingIterator.hasNext()) {
         Entry<Priority, List<TaskInfo>> entry = pendingIterator.next();
         List<TaskInfo> taskListAtPriority = entry.getValue();
@@ -625,18 +627,20 @@ public class LlapTaskSchedulerService ex
   }
 
   private boolean scheduleTask(TaskInfo taskInfo) {
-    String host = selectHost(taskInfo.requestedHosts);
+    ServiceInstance host = selectHost(taskInfo);
     if (host == null) {
       return false;
     } else {
       Container container =
-          containerFactory.createContainer(resourcePerExecutor, taskInfo.priority, host, containerPort);
+          containerFactory.createContainer(resourcePerExecutor, taskInfo.priority, host.getHost(),
+              host.getRpcPort());
       writeLock.lock(); // While updating local structures
       try {
-        dagStats.registerTaskAllocated(taskInfo.requestedHosts, taskInfo.requestedRacks, host);
+        dagStats.registerTaskAllocated(taskInfo.requestedHosts, taskInfo.requestedRacks,
+            host.getHost());
         taskInfo.setAssignmentInfo(host, container.getId());
         knownTasks.putIfAbsent(taskInfo.task, taskInfo);
-        containerToHostMap.put(container.getId(), host);
+        containerToInstanceMap.put(container.getId(), host.getWorkerIdentity());
       } finally {
         writeLock.unlock();
       }
@@ -683,16 +687,17 @@ public class LlapTaskSchedulerService ex
   @VisibleForTesting
   static class NodeInfo implements Delayed {
     private final float constBackOffFactor;
-    final String hostname;
+    final ServiceInstance host;
     private final Clock clock;
 
     long expireTimeMillis = -1;
     private long numSuccessfulTasks = 0;
     private long numSuccessfulTasksAtLastBlacklist = -1;
     float cumulativeBackoffFactor = 1.0f;
+    private boolean busy;
 
-    NodeInfo(String hostname, float backoffFactor, Clock clock) {
-      this.hostname = hostname;
+    NodeInfo(ServiceInstance host, float backoffFactor, Clock clock) {
+      this.host = host;
       constBackOffFactor = backoffFactor;
       this.clock = clock;
     }
@@ -716,9 +721,18 @@ public class LlapTaskSchedulerService ex
     }
 
     void registerTaskSuccess() {
+      this.busy = false; // if a task exited, we might have free slots
       numSuccessfulTasks++;
     }
 
+    public void setBusy(boolean busy) {
+      this.busy = busy;
+    }
+
+    public boolean isBusy() {
+      return busy;
+    }
+
     @Override
     public long getDelay(TimeUnit unit) {
       return expireTimeMillis - clock.getTime();
@@ -738,18 +752,13 @@ public class LlapTaskSchedulerService ex
 
     @Override
     public String toString() {
-      return "NodeInfo{" +
-          "constBackOffFactor=" + constBackOffFactor +
-          ", hostname='" + hostname + '\'' +
-          ", expireTimeMillis=" + expireTimeMillis +
-          ", numSuccessfulTasks=" + numSuccessfulTasks +
-          ", numSuccessfulTasksAtLastBlacklist=" + numSuccessfulTasksAtLastBlacklist +
-          ", cumulativeBackoffFactor=" + cumulativeBackoffFactor +
-          '}';
+      return "NodeInfo{" + "constBackOffFactor=" + constBackOffFactor + ", host=" + host
+          + ", expireTimeMillis=" + expireTimeMillis + ", numSuccessfulTasks=" + numSuccessfulTasks
+          + ", numSuccessfulTasksAtLastBlacklist=" + numSuccessfulTasksAtLastBlacklist
+          + ", cumulativeBackoffFactor=" + cumulativeBackoffFactor + '}';
     }
   }
 
-
   @VisibleForTesting
   static class StatsPerDag {
     int numRequestedAllocations = 0;
@@ -775,12 +784,13 @@ public class LlapTaskSchedulerService ex
       sb.append("NumRejectedTasks=").append(numRejectedTasks).append(", ");
       sb.append("NumCommFailures=").append(numCommFailures).append(", ");
       sb.append("NumDelayedAllocations=").append(numDelayedAllocations).append(", ");
-      sb.append("LocalityBasedAllocationsPerHost=").append(localityBasedNumAllocationsPerHost).append(", ");
+      sb.append("LocalityBasedAllocationsPerHost=").append(localityBasedNumAllocationsPerHost)
+          .append(", ");
       sb.append("NumAllocationsPerHost=").append(numAllocationsPerHost);
       return sb.toString();
     }
 
-    void registerTaskRequest(String []requestedHosts, String[] requestedRacks) {
+    void registerTaskRequest(String[] requestedHosts, String[] requestedRacks) {
       numRequestedAllocations++;
       // TODO Change after HIVE-9987. For now, there's no rack matching.
       if (requestedHosts != null && requestedHosts.length != 0) {
@@ -790,7 +800,8 @@ public class LlapTaskSchedulerService ex
       }
     }
 
-    void registerTaskAllocated(String[] requestedHosts, String [] requestedRacks, String allocatedHost) {
+    void registerTaskAllocated(String[] requestedHosts, String[] requestedRacks,
+        String allocatedHost) {
       // TODO Change after HIVE-9987. For now, there's no rack matching.
       if (requestedHosts != null && requestedHosts.length != 0) {
         Set<String> requestedHostSet = new HashSet<>(Arrays.asList(requestedHosts));
@@ -837,12 +848,11 @@ public class LlapTaskSchedulerService ex
     final String[] requestedRacks;
     final long requestTime;
     ContainerId containerId;
-    String assignedHost;
+    ServiceInstance assignedInstance;
     private boolean assigned = false;
 
-    public TaskInfo(Object task, Object clientCookie,
-                    Priority priority, Resource capability, String[] hosts, String[] racks,
-                    long requestTime) {
+    public TaskInfo(Object task, Object clientCookie, Priority priority, Resource capability,
+        String[] hosts, String[] racks, long requestTime) {
       this.task = task;
       this.clientCookie = clientCookie;
       this.priority = priority;
@@ -852,8 +862,8 @@ public class LlapTaskSchedulerService ex
       this.requestTime = requestTime;
     }
 
-    void setAssignmentInfo(String host, ContainerId containerId) {
-      this.assignedHost = host;
+    void setAssignmentInfo(ServiceInstance instance, ContainerId containerId) {
+      this.assignedInstance = instance;
       this.containerId = containerId;
       assigned = true;
     }
@@ -861,27 +871,27 @@ public class LlapTaskSchedulerService ex
 
   static class ContainerFactory {
     final ApplicationAttemptId customAppAttemptId;
-    AtomicInteger nextId;
+    AtomicLong nextId;
 
     public ContainerFactory(AppContext appContext, long appIdLong) {
-      this.nextId = new AtomicInteger(1);
-      ApplicationId appId = ApplicationId
-          .newInstance(appIdLong, appContext.getApplicationAttemptId().getApplicationId().getId());
-      this.customAppAttemptId = ApplicationAttemptId
-          .newInstance(appId, appContext.getApplicationAttemptId().getAttemptId());
+      this.nextId = new AtomicLong(1);
+      ApplicationId appId =
+          ApplicationId.newInstance(appIdLong, appContext.getApplicationAttemptId()
+              .getApplicationId().getId());
+      this.customAppAttemptId =
+          ApplicationAttemptId.newInstance(appId, appContext.getApplicationAttemptId()
+              .getAttemptId());
     }
 
-    public Container createContainer(Resource capability, Priority priority, String hostname, int port) {
-      ContainerId containerId = ContainerId.newInstance(customAppAttemptId, nextId.getAndIncrement());
+    public Container createContainer(Resource capability, Priority priority, String hostname,
+        int port) {
+      ContainerId containerId =
+          ContainerId.newContainerId(customAppAttemptId, nextId.getAndIncrement());
       NodeId nodeId = NodeId.newInstance(hostname, port);
-      String nodeHttpAddress = "hostname:0";
+      String nodeHttpAddress = "hostname:0"; // TODO: include UI ports
 
-      Container container = Container.newInstance(containerId,
-          nodeId,
-          nodeHttpAddress,
-          capability,
-          priority,
-          null);
+      Container container =
+          Container.newInstance(containerId, nodeId, nodeHttpAddress, capability, priority, null);
 
       return container;
     }

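For readers skimming the hunk above: the rewritten selectHost() now prefers a live instance on one of the requested hosts, and on a locality miss falls back to scanning all currently schedulable instances starting at a random offset. Below is a minimal, self-contained sketch of that ordering only; Instance, byHost and schedulable are toy stand-ins for ServiceInstanceSet.getByHost() and instanceToNodeMap, not the real LLAP APIs.

    // Illustrative sketch only: a toy model of the selection order in the patch
    // (locality first, then a random-offset scan over live instances).
    import java.util.*;

    final class SelectionSketch {
      interface Instance { boolean isAlive(); }   // stand-in for ServiceInstance

      static Instance pick(String[] requestedHosts,
                           Map<String, List<Instance>> byHost,   // stand-in for getByHost()
                           List<Instance> schedulable,           // stand-in for instanceToNodeMap keys
                           Random random) {
        // 1. Locality: first live, schedulable instance on any requested host wins.
        if (requestedHosts != null) {
          for (String host : requestedHosts) {
            for (Instance inst : byHost.getOrDefault(host, Collections.<Instance>emptyList())) {
              if (inst.isAlive() && schedulable.contains(inst)) {
                return inst;
              }
            }
          }
        }
        // 2. Locality miss: start at a random offset and scan the whole list once,
        //    so repeated misses do not always land on the same daemon.
        int n = schedulable.size();
        if (n > 0) {
          int start = random.nextInt(n);
          for (int i = 0; i < n; i++) {
            Instance inst = schedulable.get((start + i) % n);
            if (inst.isAlive()) {
              return inst;
            }
          }
        }
        return null; // in the patch, the caller then re-registers missing-but-live instances
      }
    }

The random starting offset is the design point worth noting: it spreads non-local assignments across the live daemons instead of biasing toward whichever instance happens to sort first.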
Modified: hive/branches/llap/llap-server/src/main/resources/package.py
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/main/resources/package.py?rev=1673556&r1=1673555&r2=1673556&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/main/resources/package.py (original)
+++ hive/branches/llap/llap-server/src/main/resources/package.py Tue Apr 14 21:06:58 2015
@@ -47,7 +47,7 @@ def zipdir(path, zip, prefix="."):
 			zip.write(src, dst)
 	
 def main(args):
-	opts, args = getopt(args,"",["instances=","output=", "input=","args=","name=","loglevel="])
+	opts, args = getopt(args,"",["instances=","output=", "input=","args=","name=","loglevel=","chaosmonkey="])
 	version = os.getenv("HIVE_VERSION")
 	if not version:
 		version = strftime("%d%b%Y", gmtime()) 
@@ -58,6 +58,7 @@ def main(args):
 	d_args = ""
 	d_loglevel = "INFO"
 	input = None
+	monkey = "0"
 	for k,v in opts:
 		if k in ("--input"):
 			input = v
@@ -71,12 +72,17 @@ def main(args):
 			d_args = v
 		elif k in ("--loglevel"):
 			d_loglevel = v
+		elif k in ("--chaosmonkey"):
+			monkey = v
 	if not input:
 		print "Cannot find input files"
 		sys.exit(1)
 		return
 	config = json_parse(open(join(input, "config.json")).read())
 	resource = LlapResource(config)
+	monkey_interval = int(monkey) 
+	# 5% container failure every monkey_interval seconds
+	monkey_percentage = 5 # 5%
 	vars = {
 		"home" : home,
 		"version" : version,
@@ -88,7 +94,10 @@ def main(args):
 		"java_home" : os.getenv("JAVA_HOME"),
 		"name" : name,
 		"daemon_args" : d_args,
-		"daemon_loglevel" : d_loglevel
+		"daemon_loglevel" : d_loglevel,
+		"monkey_interval" : monkey_interval,
+		"monkey_percentage" : monkey_percentage,
+		"monkey_enabled" : monkey_interval > 0
 	}
 	
 	if not exists(output):

Modified: hive/branches/llap/llap-server/src/main/resources/templates.py
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/main/resources/templates.py?rev=1673556&r1=1673555&r2=1673556&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/main/resources/templates.py (original)
+++ hive/branches/llap/llap-server/src/main/resources/templates.py Tue Apr 14 21:06:58 2015
@@ -78,7 +78,11 @@ appConfig = """
     "site.global.daemon_args": "%(daemon_args)s",
     "site.global.library_path": "%(hadoop_home)s/lib/native",
     "site.global.memory_val": "%(heap)d",
-    "site.global.pid_file": "${AGENT_WORK_ROOT}/app/run/llap-daemon.pid"
+    "site.global.pid_file": "${AGENT_WORK_ROOT}/app/run/llap-daemon.pid",
+    "internal.chaos.monkey.probability.amlaunchfailure": "0",
+    "internal.chaos.monkey.probability.containerfailure": "%(monkey_percentage)d",
+    "internal.chaos.monkey.interval.seconds": "%(monkey_interval)d",
+    "internal.chaos.monkey.enabled": "%(monkey_enabled)s"
   },
   "components": {
     "slider-appmaster": {
@@ -115,5 +119,4 @@ slider stop %(name)s
 slider destroy %(name)s 
 slider install-package --name LLAP --package  $BASEDIR/llap-%(version)s.zip --replacepkg
 slider create %(name)s --resources $BASEDIR/resources.json --template $BASEDIR/appConfig.json
-slider status %(name)s
 """

Modified: hive/branches/llap/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java?rev=1673556&r1=1673555&r2=1673556&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java (original)
+++ hive/branches/llap/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java Tue Apr 14 21:06:58 2015
@@ -46,6 +46,7 @@ import org.apache.tez.dag.app.Controlled
 import org.apache.tez.dag.app.rm.TaskSchedulerService.TaskSchedulerAppCallback;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
+import org.mortbay.log.Log;
 
 public class TestLlapTaskSchedulerService {
 
@@ -96,7 +97,6 @@ public class TestLlapTaskSchedulerServic
     try {
       Priority priority1 = Priority.newInstance(1);
       String[] hosts1 = new String[]{HOST1};
-
       Object task1 = new Object();
       Object clientCookie1 = new Object();
       tsWrapper.allocateTask(task1, hosts1, priority1, clientCookie1);
@@ -111,11 +111,10 @@ public class TestLlapTaskSchedulerServic
 
       // Verify that the node is blacklisted
       assertEquals(1, tsWrapper.ts.dagStats.numRejectedTasks);
-      assertEquals(2, tsWrapper.ts.activeHosts.size());
-      assertEquals(2, tsWrapper.ts.activeHostList.length);
+      assertEquals(2, tsWrapper.ts.instanceToNodeMap.size());
       LlapTaskSchedulerService.NodeInfo disabledNodeInfo = tsWrapper.ts.disabledNodes.peek();
       assertNotNull(disabledNodeInfo);
-      assertEquals(HOST1, disabledNodeInfo.hostname);
+      assertEquals(HOST1, disabledNodeInfo.host.getHost());
       assertEquals((10000l), disabledNodeInfo.getDelay(TimeUnit.NANOSECONDS));
       assertEquals((10000l + 10000l), disabledNodeInfo.expireTimeMillis);
 
@@ -164,8 +163,7 @@ public class TestLlapTaskSchedulerServic
 
       // Verify that the node is blacklisted
       assertEquals(3, tsWrapper.ts.dagStats.numRejectedTasks);
-      assertEquals(0, tsWrapper.ts.activeHosts.size());
-      assertEquals(0, tsWrapper.ts.activeHostList.length);
+      assertEquals(0, tsWrapper.ts.instanceToNodeMap.size());
       assertEquals(3, tsWrapper.ts.disabledNodes.size());
 
 

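The updated assertions exercise NodeInfo's Delayed contract: getDelay() simply returns expireTimeMillis minus the current clock value and ignores the requested TimeUnit, which is why the test can compare against 10000 even when asking for NANOSECONDS. The snippet below is a rough reconstruction of that timing from the assertions alone; FakeClock and DisabledEntry are stand-ins (the real disableNode() body is not shown in this diff, so the expiry calculation is an assumption inferred from the 10000 + 10000 values).

    // Toy reconstruction of the timing the test asserts; not the real
    // ControlledClock / NodeInfo classes.
    import java.util.concurrent.Delayed;
    import java.util.concurrent.TimeUnit;

    final class BackoffSketch {
      static final class FakeClock { long now; long getTime() { return now; } }

      static final class DisabledEntry implements Delayed {
        final FakeClock clock;
        long expireTimeMillis = -1;
        DisabledEntry(FakeClock clock) { this.clock = clock; }

        // Assumption: disabling sets the expiry relative to the current clock value,
        // which is what the asserted numbers imply.
        void disable(long timeoutMillis) { expireTimeMillis = clock.getTime() + timeoutMillis; }

        @Override public long getDelay(TimeUnit unit) { return expireTimeMillis - clock.getTime(); }
        @Override public int compareTo(Delayed o) {
          return Long.compare(getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
        }
      }

      public static void main(String[] args) {
        FakeClock clock = new FakeClock();
        clock.now = 10000L;                       // controlled clock advanced to 10s
        DisabledEntry entry = new DisabledEntry(clock);
        entry.disable(10000L);                    // node re-enable timeout used by the test
        assert entry.expireTimeMillis == 20000L;  // 10000 + 10000, as asserted
        assert entry.getDelay(TimeUnit.NANOSECONDS) == 10000L; // unit ignored, as in the diff
      }
    }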

