hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jia...@apache.org
Subject [09/38] hadoop git commit: YARN-5461. Initial code ported from slider-core module. (jianhe)
Date Thu, 04 Aug 2016 15:11:02 GMT
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c8b0c5de/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/ProviderAppState.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/ProviderAppState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/ProviderAppState.java
new file mode 100644
index 0000000..37e9a7f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/ProviderAppState.java
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.appmaster.state;
+
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.slider.api.ClusterDescription;
+import org.apache.slider.api.ClusterNode;
+import org.apache.slider.api.types.ApplicationLivenessInformation;
+import org.apache.slider.api.types.ComponentInformation;
+import org.apache.slider.api.types.NodeInformation;
+import org.apache.slider.api.types.RoleStatistics;
+import org.apache.slider.core.conf.AggregateConf;
+import org.apache.slider.core.conf.ConfTreeOperations;
+import org.apache.slider.core.exceptions.NoSuchNodeException;
+import org.apache.slider.core.registry.docstore.PublishedConfigSet;
+import org.apache.slider.core.registry.docstore.PublishedExportsSet;
+import org.apache.slider.server.appmaster.web.rest.RestPaths;
+import org.apache.slider.server.services.utility.PatternValidator;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Implementation of {@link StateAccessForProviders}, which means
+ * state access for providers, web UI and IPC/REST views.
+ */
+public class ProviderAppState implements StateAccessForProviders {
+
+
+  private final Map<String, PublishedConfigSet> publishedConfigSets =
+      new ConcurrentHashMap<>(5);
+  private final PublishedExportsSet publishedExportsSets = new PublishedExportsSet();
+  private static final PatternValidator validator = new PatternValidator(
+      RestPaths.PUBLISHED_CONFIGURATION_SET_REGEXP);
+  private String applicationName;
+
+  private final AppState appState;
+
+  public ProviderAppState(String applicationName, AppState appState) {
+    this.appState = appState;
+    this.applicationName = applicationName;
+  }
+
+  public void setApplicationName(String applicationName) {
+    this.applicationName = applicationName;
+  }
+
+  @Override
+  public String getApplicationName() {
+    return applicationName;
+  }
+
+  @Override
+  public PublishedConfigSet getPublishedSliderConfigurations() {
+    return getOrCreatePublishedConfigSet(RestPaths.SLIDER_CONFIGSET);
+  }
+
+  @Override
+  public PublishedExportsSet getPublishedExportsSet() {
+    return publishedExportsSets;
+  }
+
+  @Override
+  public PublishedConfigSet getPublishedConfigSet(String name) {
+    return publishedConfigSets.get(name);
+  }
+
+  @Override
+  public PublishedConfigSet getOrCreatePublishedConfigSet(String name) {
+    PublishedConfigSet set = publishedConfigSets.get(name);
+    if (set == null) {
+      validator.validate(name);
+      synchronized (publishedConfigSets) {
+        // synchronized double check to ensure that there is never an overridden
+        // config set created
+        set = publishedConfigSets.get(name);
+        if (set == null) {
+          set = new PublishedConfigSet();
+          publishedConfigSets.put(name, set);
+        }
+      }
+    }
+    return set;
+  }
+
+  @Override
+  public List<String> listConfigSets() {
+
+    synchronized (publishedConfigSets) {
+      List<String> sets = new ArrayList<>(publishedConfigSets.keySet());
+      return sets;
+    }
+  }
+
+  @Override
+  public Map<Integer, RoleStatus> getRoleStatusMap() {
+    return appState.getRoleStatusMap();
+  }
+
+
+  @Override
+  public Map<ContainerId, RoleInstance> getFailedContainers() {
+    return appState.getFailedContainers();
+  }
+
+  @Override
+  public Map<ContainerId, RoleInstance> getLiveContainers() {
+    return appState.getLiveContainers();
+  }
+
+  @Override
+  public ClusterDescription getClusterStatus() {
+    return appState.getClusterStatus();
+  }
+
+  @Override
+  public ConfTreeOperations getResourcesSnapshot() {
+    return appState.getResourcesSnapshot();
+  }
+
+  @Override
+  public ConfTreeOperations getAppConfSnapshot() {
+    return appState.getAppConfSnapshot();
+  }
+
+  @Override
+  public ConfTreeOperations getInternalsSnapshot() {
+    return appState.getInternalsSnapshot();
+  }
+
+  @Override
+  public boolean isApplicationLive() {
+    return appState.isApplicationLive();
+  }
+
+  @Override
+  public long getSnapshotTime() {
+    return appState.getSnapshotTime();
+  }
+
+  @Override
+  public AggregateConf getInstanceDefinitionSnapshot() {
+    return appState.getInstanceDefinitionSnapshot();
+  }
+  
+  @Override
+  public AggregateConf getUnresolvedInstanceDefinition() {
+    return appState.getUnresolvedInstanceDefinition();
+  }
+
+  @Override
+  public RoleStatus lookupRoleStatus(int key) {
+    return appState.lookupRoleStatus(key);
+  }
+
+  @Override
+  public RoleStatus lookupRoleStatus(Container c) throws YarnRuntimeException {
+    return appState.lookupRoleStatus(c);
+  }
+
+  @Override
+  public RoleStatus lookupRoleStatus(String name) throws YarnRuntimeException {
+    return appState.lookupRoleStatus(name);
+  }
+
+  @Override
+  public List<RoleInstance> cloneOwnedContainerList() {
+    return appState.cloneOwnedContainerList();
+  }
+
+  @Override
+  public int getNumOwnedContainers() {
+    return appState.getNumOwnedContainers();
+  }
+
+  @Override
+  public RoleInstance getOwnedContainer(ContainerId id) {
+    return appState.getOwnedContainer(id);
+  }
+
+  @Override
+  public RoleInstance getOwnedContainer(String id) throws NoSuchNodeException {
+    return appState.getOwnedInstanceByContainerID(id);
+  }
+
+  @Override
+  public List<RoleInstance> cloneLiveContainerInfoList() {
+    return appState.cloneLiveContainerInfoList();
+  }
+
+  @Override
+  public RoleInstance getLiveInstanceByContainerID(String containerId) throws
+      NoSuchNodeException {
+    return appState.getLiveInstanceByContainerID(containerId);
+  }
+
+  @Override
+  public List<RoleInstance> getLiveInstancesByContainerIDs(Collection<String> containerIDs) {
+    return appState.getLiveInstancesByContainerIDs(containerIDs);
+  }
+
+  @Override
+  public ClusterDescription refreshClusterStatus() {
+    return appState.refreshClusterStatus();
+  }
+
+  @Override
+  public List<RoleStatus> cloneRoleStatusList() {
+    return appState.cloneRoleStatusList();
+  }
+
+  @Override
+  public ApplicationLivenessInformation getApplicationLivenessInformation() {
+    return appState.getApplicationLivenessInformation();
+  }
+
+  @Override
+  public Map<String, Integer> getLiveStatistics() {
+    return appState.getLiveStatistics();
+  }
+
+  @Override
+  public Map<String, ComponentInformation> getComponentInfoSnapshot() {
+    return appState.getComponentInfoSnapshot();
+  }
+
+  @Override
+  public Map<String, Map<String, ClusterNode>> getRoleClusterNodeMapping() {
+    return appState.createRoleToClusterNodeMap();
+  }
+
+  @Override
+  public List<RoleInstance> enumLiveInstancesInRole(String role) {
+    List<RoleInstance> nodes = new ArrayList<>();
+    Collection<RoleInstance> allRoleInstances = cloneLiveContainerInfoList();
+    for (RoleInstance node : allRoleInstances) {
+      if (role.isEmpty() || role.equals(node.role)) {
+        nodes.add(node);
+      }
+    }
+    return nodes;
+  }
+
+  @Override
+  public List<RoleInstance> lookupRoleContainers(String component) {
+    RoleStatus roleStatus = lookupRoleStatus(component);
+    List<RoleInstance> ownedContainerList = cloneOwnedContainerList();
+    List<RoleInstance> matching = new ArrayList<>(ownedContainerList.size());
+    int roleId = roleStatus.getPriority();
+    for (RoleInstance instance : ownedContainerList) {
+      if (instance.roleId == roleId) {
+        matching.add(instance);
+      }
+    }
+    return matching;
+  }
+  
+  @Override
+  public ComponentInformation getComponentInformation(String component) {
+    RoleStatus roleStatus = lookupRoleStatus(component);
+    ComponentInformation info = roleStatus.serialize();
+    List<RoleInstance> containers = lookupRoleContainers(component);
+    info.containers = new ArrayList<>(containers.size());
+    for (RoleInstance container : containers) {
+      info.containers.add(container.id);
+    }
+    return info;
+  }
+
+  @Override
+  public Map<String, NodeInformation> getNodeInformationSnapshot() {
+    return appState.getRoleHistory()
+      .getNodeInformationSnapshot(appState.buildNamingMap());
+  }
+
+  @Override
+  public NodeInformation getNodeInformation(String hostname) {
+    return appState.getRoleHistory()
+      .getNodeInformation(hostname, appState.buildNamingMap());
+  }
+
+  @Override
+  public RoleStatistics getRoleStatistics() {
+    return appState.getRoleStatistics();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c8b0c5de/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java
new file mode 100644
index 0000000..4e8a4d7
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java
@@ -0,0 +1,1101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.appmaster.state;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.slider.api.types.NodeInformation;
+import org.apache.slider.common.tools.SliderUtils;
+import org.apache.slider.core.exceptions.BadConfigException;
+import org.apache.slider.providers.ProviderRole;
+import org.apache.slider.server.appmaster.management.BoolMetric;
+import org.apache.slider.server.appmaster.management.MetricsAndMonitoring;
+import org.apache.slider.server.appmaster.management.Timestamp;
+import org.apache.slider.server.appmaster.operations.AbstractRMOperation;
+import org.apache.slider.server.avro.LoadedRoleHistory;
+import org.apache.slider.server.avro.NodeEntryRecord;
+import org.apache.slider.server.avro.RoleHistoryHeader;
+import org.apache.slider.server.avro.RoleHistoryWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * The Role History.
+ * <p>
+ * Synchronization policy: all public operations are synchronized.
+ * Protected methods are in place for testing -no guarantees are made.
+ * <p>
+ * Inner classes have no synchronization guarantees; they should be manipulated 
+ * in these classes and not externally.
+ * <p>
+ * Note that as well as some methods marked visible for testing, there
+ * is the option for the time generator method, {@link #now()} to
+ * be overridden so that a repeatable time series can be used.
+ * 
+ */
+public class RoleHistory {
+  protected static final Logger log =
+    LoggerFactory.getLogger(RoleHistory.class);
+  /** Provider roles in the order they were added; searched linearly by lookupRole(). */
+  private final List<ProviderRole> providerRoles;
+  /** the roles in here are shared with App State */
+  private final Map<Integer, RoleStatus> roleStatusMap = new HashMap<>();
+  /** Factory used to create the YARN resource records attached to requests. */
+  private final AbstractClusterServices recordFactory;
+
+  /** Time at which onStart() was invoked. */
+  private long startTime;
+
+  /** Time when saved */
+  private final Timestamp saveTime = new Timestamp(0);
+
+  /** If the history was loaded, the time at which the history was saved.
+   * That is: the time the data was valid */
+  private final Timestamp thawedDataTime = new Timestamp(0);
+  
+  /** Hostname to node instance map; replaced on every reset(). */
+  private NodeMap nodemap;
+  /**
+   * Number of roles passed to the constructor. Also used as the (exclusive)
+   * upper bound of role IDs scanned when rebuilding the recent node lists.
+   * NOTE(review): addNewRole() does not update this value — confirm that
+   * roles added after construction are expected to be excluded.
+   */
+  private int roleSize;
+  /** Set when the history has changes that have not yet been saved. */
+  private final BoolMetric dirty = new BoolMetric(false);
+  /** Filesystem for history files; set in onStart(). */
+  private FileSystem filesystem;
+  /** Directory for history files; set in onStart(). */
+  private Path historyPath;
+  /** Loads, writes and purges the persisted history files. */
+  private RoleHistoryWriter historyWriter = new RoleHistoryWriter();
+
+  /**
+   * When were the nodes updated in a {@link #onNodesUpdated(List)} call?
+   * If zero: never.
+   */
+  private final Timestamp nodesUpdatedTime = new Timestamp(0);
+  // Published as the "nodes-updated.flag" metric; presumably set once a node
+  // update has been received (the code setting it is outside this view).
+  private final BoolMetric nodeUpdateReceived = new BoolMetric(false);
+
+  /** Tracker of issued container requests; replaced on every reset(). */
+  private OutstandingRequestTracker outstandingRequests =
+    new OutstandingRequestTracker();
+
+  /**
+   * For each role, lists nodes that are available for data-local allocation,
+   * ordered by more recently released - to accelerate node selection.
+   * That is, they are "recently used nodes"
+   */
+  private Map<Integer, LinkedList<NodeInstance>> recentNodes;
+
+  /**
+   * Create a role history for a set of roles.
+   * Each role is validated and registered through
+   * {@link #addNewRole(RoleStatus)}; afterwards the node map, the recent
+   * node lists and the request tracker are initialized by {@link #reset()}.
+   * @param roles initial role list
+   * @param recordFactory yarn record factory
+   * @throws BadConfigException if a role has a negative key, or a key
+   * already used by another role
+   */
+  public RoleHistory(Collection<RoleStatus> roles,
+      AbstractClusterServices recordFactory) throws BadConfigException {
+    this.recordFactory = recordFactory;
+    this.roleSize = roles.size();
+    this.providerRoles = new ArrayList<>(this.roleSize);
+    for (RoleStatus initialRole : roles) {
+      addNewRole(initialRole);
+    }
+    reset();
+  }
+
+  /**
+   * Reset the per-run state: a fresh node map sized for the role count,
+   * empty recent node lists and a new outstanding request tracker.
+   * The fixed attributes of the history (roles, role size, paths) are
+   * left untouched.
+   * @throws BadConfigException on a configuration problem
+   */
+  protected synchronized void reset() throws BadConfigException {
+    this.nodemap = new NodeMap(this.roleSize);
+    resetAvailableNodeLists();
+    this.outstandingRequests = new OutstandingRequestTracker();
+  }
+
+  /**
+   * Publish the history's metrics (dirty flag, node update time and flag,
+   * thaw time and save time) with the metrics infrastructure.
+   * @param metrics metrics
+   */
+  public void register(MetricsAndMonitoring metrics) {
+    final Class<RoleHistory> source = RoleHistory.class;
+    metrics.register(source, dirty, "dirty");
+    metrics.register(source, nodesUpdatedTime, "nodes-updated.time");
+    metrics.register(source, nodeUpdateReceived, "nodes-updated.flag");
+    metrics.register(source, thawedDataTime, "thawed.time");
+    metrics.register(source, saveTime, "saved.time");
+  }
+
+  /**
+   * Safety check and registration: the role's key must be non-negative
+   * and not already used by another registered role.
+   * @param roleStatus role
+   * @throws BadConfigException if the key is negative or duplicates the
+   * key of an already registered role
+   */
+  protected void putRole(RoleStatus roleStatus) throws BadConfigException {
+    int index = roleStatus.getKey();
+    if (index < 0) {
+      throw new BadConfigException("Provider " + roleStatus + " id is out of range");
+    }
+    RoleStatus existing = roleStatusMap.get(index);
+    if (existing != null) {
+      throw new BadConfigException(
+          roleStatus.toString() + " id duplicates that of " + existing);
+    }
+    roleStatusMap.put(index, roleStatus);
+  }
+
+  /**
+   * Add a new role: validate and register its status, then record its
+   * provider role.
+   * <p>
+   * Synchronized, per the class policy for public operations: it mutates
+   * the role collections which other synchronized operations read.
+   * @param roleStatus new role
+   * @throws BadConfigException if the role's key is negative or duplicates
+   * that of an existing role
+   */
+  public synchronized void addNewRole(RoleStatus roleStatus) throws BadConfigException {
+    log.debug("Validating/adding new role to role history: {} ", roleStatus);
+    putRole(roleStatus);
+    this.providerRoles.add(roleStatus.getProviderRole());
+  }
+
+  /**
+   * Lookup a role by ID.
+   * <p>
+   * Synchronized so that the linear scan cannot race with
+   * {@link #addNewRole(RoleStatus)} appending to the same list.
+   * @param roleId role Id
+   * @return role or null if not found
+   */
+  public synchronized ProviderRole lookupRole(int roleId) {
+    for (ProviderRole role : providerRoles) {
+      if (role.id == roleId) {
+        return role;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Discard every recent node list by swapping in a new, empty concurrent
+   * map sized for the current role count.
+   */
+  private synchronized void resetAvailableNodeLists() {
+    Map<Integer, LinkedList<NodeInstance>> emptyLists =
+        new ConcurrentHashMap<>(roleSize);
+    recentNodes = emptyLists;
+  }
+
+  /**
+   * Prepare the history for re-reading its state: reset it, warn if the
+   * saved role count differs from the current one, and record the save
+   * time of the loaded data as the thaw time.
+   * <p>
+   * This intended for use by the RoleWriter logic.
+   * @param header header of the loaded history
+   * @throws BadConfigException if there is a problem rebuilding the state
+   */
+  private void prepareForReading(RoleHistoryHeader header)
+      throws BadConfigException {
+    reset();
+    int expected = roleSize;
+    int roleCountInSource = header.getRoles();
+    if (roleCountInSource != expected) {
+      log.warn("Number of roles in source {}"
+              + " does not match the expected number of {}",
+          roleCountInSource, expected);
+    }
+    // the loaded data was valid as of the time it was saved
+    setThawedDataTime(header.getSaved());
+  }
+
+  /**
+   * Rebuild the placement history from a loaded role history.
+   * Entries whose role is not known to this history are discarded; an
+   * entry that was active when the history was saved gets the save time as
+   * its last-used time.
+   * @param loadedRoleHistory loaded history
+   * @return the number of entries discarded
+   * @throws BadConfigException if there is a problem rebuilding the state
+   */
+  @VisibleForTesting
+  public synchronized int rebuild(LoadedRoleHistory loadedRoleHistory)
+      throws BadConfigException {
+    RoleHistoryHeader header = loadedRoleHistory.getHeader();
+    prepareForReading(header);
+    Long savedTime = header.getSaved();
+    int discardCount = 0;
+    for (NodeEntryRecord rec : loadedRoleHistory.records) {
+      Integer roleId = rec.getRole();
+      NodeEntry entry = new NodeEntry(roleId);
+      entry.setLastUsed(rec.getLastUsed());
+      if (rec.getActive()) {
+        // still active when saved: it was last used at the save time
+        entry.setLastUsed(savedTime);
+      }
+      String hostname = SliderUtils.sequenceToString(rec.getHost());
+      if (lookupRole(roleId) != null) {
+        getOrCreateNodeInstance(hostname).set(roleId, entry);
+      } else {
+        // the role is not (or no longer) known: drop the entry
+        log.info("Discarding history entry with unknown role: {} on host {}",
+            roleId, hostname);
+        discardCount++;
+      }
+    }
+    return discardCount;
+  }
+
+  /** @return the time recorded when {@code onStart()} was invoked */
+  public synchronized long getStartTime() {
+    return this.startTime;
+  }
+
+  /** @return the timestamp of the last save, or 0 if never saved */
+  public synchronized long getSaveTime() {
+    return this.saveTime.get();
+  }
+
+  /** @return the save time of the loaded history, or 0 if none was loaded */
+  public long getThawedDataTime() {
+    return this.thawedDataTime.get();
+  }
+
+  public void setThawedDataTime(long thawedDataTime) {
+    this.thawedDataTime.set(thawedDataTime);
+  }
+
+  /** @return the number of roles supplied at construction */
+  public synchronized int getRoleSize() {
+    return this.roleSize;
+  }
+
+  /**
+   * Get the total size of the cluster as known to the history.
+   * @return the number of NodeInstances in the node map
+   */
+  public synchronized int getClusterSize() {
+    return this.nodemap.size();
+  }
+
+  /** @return true if there are changes which have not yet been saved */
+  public synchronized boolean isDirty() {
+    return this.dirty.get();
+  }
+
+  public synchronized void setDirty(boolean dirty) {
+    this.dirty.set(dirty);
+  }
+
+  /**
+   * Notify the history that it has been saved: it is marked clean and the
+   * save time is updated.
+   * @param timestamp time of the save
+   */
+  public synchronized void saved(long timestamp) {
+    this.setDirty(false);
+    this.saveTime.set(timestamp);
+  }
+
+  /**
+   * Get a clone of the nodemap.
+   * The instances inside are not cloned
+   * @return the map
+   */
+  public synchronized NodeMap cloneNodemap() {
+    return (NodeMap) nodemap.clone();
+  }
+
+  /**
+   * Get a snapshot of the node map, serializing every node instance.
+   * @param naming naming map of priority to entry name; entries must be unique.
+   * It's OK to be incomplete, for those the list falls back to numbers.
+   * @return a new map of hostname to serialized node information
+   */
+  public synchronized Map<String, NodeInformation> getNodeInformationSnapshot(
+      Map<Integer, String> naming) {
+    Map<String, NodeInformation> snapshot = new HashMap<>(nodemap.size());
+    for (Map.Entry<String, NodeInstance> hostEntry : nodemap.entrySet()) {
+      NodeInstance node = hostEntry.getValue();
+      snapshot.put(hostEntry.getKey(), node.serialize(naming));
+    }
+    return snapshot;
+  }
+
+  /**
+   * Get the serialized information on a single node.
+   * @param hostname hostname
+   * @param naming naming map of priority to entry name; entries must be unique.
+   * It's OK to be incomplete, for those the list falls back to numbers.
+   * @return the information about that host, or null if there is none
+   */
+  public synchronized NodeInformation getNodeInformation(String hostname,
+      Map<Integer, String> naming) {
+    NodeInstance node = nodemap.get(hostname);
+    if (node == null) {
+      return null;
+    }
+    return node.serialize(naming);
+  }
+
+  /**
+   * Get the node instance for the specific node -creating it if needed
+   * @param hostname node address
+   * @return the instance
+   */
+  public synchronized NodeInstance getOrCreateNodeInstance(String hostname) {
+    //convert to a string
+    return nodemap.getOrCreate(hostname);
+  }
+
+  /**
+   * Bulk-insert nodes into the node map, replacing any with the same name.
+   * Intended for testing.
+   * Important: the recent node lists are not updated; call
+   * {@link #buildRecentNodeLists()} afterwards to rebuild them.
+   * @param nodes collection of nodes.
+   */
+  @VisibleForTesting
+  public synchronized void insert(Collection<NodeInstance> nodes) {
+    this.nodemap.insert(nodes);
+  }
+
+  /**
+   * Time source for the history. Test subclasses may override this to
+   * produce a repeatable time series.
+   * @return current time in millis
+   */
+  protected long now() {
+    long millis = System.currentTimeMillis();
+    return millis;
+  }
+
+  /**
+   * Mark the history as dirty and then immediately try to save it.
+   * A failure to save is logged as a warning rather than thrown.
+   */
+  public void touch() {
+    setDirty(true);
+    try {
+      saveHistoryIfDirty();
+    } catch (IOException saveFailure) {
+      log.warn("Failed to save history file ", saveFailure);
+    }
+  }
+
+  /**
+   * Reset the "failed recently" counters of every node in the node map.
+   */
+  public synchronized void resetFailedRecently() {
+    log.info("Resetting failure history");
+    this.nodemap.resetFailedRecently();
+  }
+
+  /**
+   * @return the directory used for history files; null until
+   * {@code onStart()} has been called
+   */
+  public Path getHistoryPath() {
+    return this.historyPath;
+  }
+
+  /**
+   * Save the history into the history directory, in a file whose name is
+   * derived from the timestamp. On success the history is marked clean and
+   * its save time updated.
+   * @param time timestamp to use as the save time
+   * @return the path saved to
+   * @throws IOException IO problems
+   */
+  @VisibleForTesting
+  public synchronized Path saveHistory(long time) throws IOException {
+    Path target = historyWriter.createHistoryFilename(historyPath, time);
+    historyWriter.write(filesystem, target, true, this, time);
+    saved(time);
+    return target;
+  }
+
+  /**
+   * Save the history, stamped with the current time, but only if it has
+   * unsaved changes.
+   * @return the path saved to, or null if the history was clean
+   * @throws IOException failed to save for some reason
+   */
+  public synchronized Path saveHistoryIfDirty() throws IOException {
+    return isDirty() ? saveHistory(now()) : null;
+  }
+
+  /**
+   * Start up: bind the history to its filesystem and directory, record the
+   * start time and then attempt to thaw a previously saved history.
+   * @param fs filesystem
+   * @param historyDir path in FS for history
+   * @return true if the history was thawed
+   * @throws BadConfigException if a loaded history cannot be rebuilt
+   */
+  public boolean onStart(FileSystem fs, Path historyDir) throws BadConfigException {
+    assert filesystem == null;
+    this.filesystem = fs;
+    this.historyPath = historyDir;
+    this.startTime = now();
+    // assume the history is being thawed; onThaw() downgrades to a
+    // bootstrap when there is nothing to load
+    return onThaw();
+  }
+  
+  /**
+   * Bootstrap handler, invoked when no saved history could be loaded.
+   * Only logs the event.
+   */
+  public void onBootstrap() {
+    final String message = "Role history bootstrapped";
+    log.debug(message);
+  }
+
+  /**
+   * Try to load the saved history from the history directory.
+   * On success the history is rebuilt from it, older history files are
+   * purged (failures there are only logged) and the recent node lists are
+   * rebuilt. If nothing could be loaded, {@link #onBootstrap()} is invoked.
+   * @return true if a saved history was loaded and rebuilt
+   * @throws BadConfigException if the loaded history cannot be rebuilt
+   */
+  public synchronized boolean onThaw() throws BadConfigException {
+    assert filesystem != null;
+    assert historyPath != null;
+
+    // load in files from the data dir; an IO failure means "nothing loaded"
+    LoadedRoleHistory loaded = null;
+    try {
+      loaded = historyWriter.loadFromHistoryDir(filesystem, historyPath);
+    } catch (IOException e) {
+      log.warn("Exception trying to load history from {}", historyPath, e);
+    }
+
+    if (loaded == null) {
+      // fallback to bootstrap procedure
+      onBootstrap();
+      return false;
+    }
+
+    rebuild(loaded);
+    Path loadPath = loaded.getPath();
+    log.debug("loaded history from {}", loadPath);
+    // delete any old entries; best effort only
+    try {
+      int purged = historyWriter.purgeOlderHistoryEntries(filesystem, loadPath);
+      log.debug("Deleted {} old history entries", purged);
+    } catch (IOException e) {
+      log.info("Ignoring exception raised while trying to delete old entries",
+               e);
+    }
+    // start is then completed
+    buildRecentNodeLists();
+    return true;
+  }
+
+
+  /**
+   * (After the start), rebuild the recent node lists from the node map.
+   * For each role ID in the range [0, roleSize), every node whose entry for
+   * that role is available is added to the role's list; each list is then
+   * sorted with {@link NodeInstance.Preferred}.
+   */
+  @VisibleForTesting
+  public synchronized void buildRecentNodeLists() {
+    resetAvailableNodeLists();
+    for (NodeInstance node : nodemap.values()) {
+      for (int roleId = 0; roleId < roleSize; roleId++) {
+        NodeEntry entry = node.get(roleId);
+        if (entry == null || !entry.isAvailable()) {
+          continue;
+        }
+        log.debug("Adding {} for role {}", node, roleId);
+        listRecentNodesForRoleId(roleId).add(node);
+      }
+    }
+    for (int roleId = 0; roleId < roleSize; roleId++) {
+      sortRecentNodeList(roleId);
+    }
+  }
+
+  /**
+   * Get the live recent node list for a role ID, without creating one.
+   * @param id role ID
+   * @return the list, or null if the role has no list
+   */
+  @VisibleForTesting
+  public List<NodeInstance> getRecentNodesForRoleId(int id) {
+    List<NodeInstance> nodes = recentNodes.get(id);
+    return nodes;
+  }
+
+  /**
+   * Get the recent node list for a role ID, creating an empty one on
+   * first use. Creation is re-checked while holding this object's lock,
+   * so only one list is ever registered per role.
+   * @param id role ID
+   * @return the (possibly empty) live list
+   */
+  private LinkedList<NodeInstance> listRecentNodesForRoleId(int id) {
+    LinkedList<NodeInstance> nodes = recentNodes.get(id);
+    if (nodes != null) {
+      return nodes;
+    }
+    synchronized (this) {
+      nodes = recentNodes.get(id);
+      if (nodes == null) {
+        nodes = new LinkedList<>();
+        recentNodes.put(id, nodes);
+      }
+      return nodes;
+    }
+  }
+
+  /**
+   * Sort the recent node list of a single role, in place, into the order
+   * defined by {@link NodeInstance.Preferred}. Roles without a list are
+   * skipped.
+   * @param role role ID whose list is sorted
+   */
+  private void sortRecentNodeList(int role) {
+    List<NodeInstance> nodes = getRecentNodesForRoleId(role);
+    if (nodes == null) {
+      return;
+    }
+    Collections.sort(nodes, new NodeInstance.Preferred(role));
+  }
+
+  /**
+   * Find a recently used node on which to place a new instance of a role.
+   * A chosen node is removed from the role's recent node list.
+   * <p>
+   * Only nodes with no active instances of the role are candidates. Under
+   * strict placement the first such candidate is taken; otherwise the node
+   * must also be online and must not exceed the role's failure threshold.
+   * @param role role
+   * @return the instance, or null if the role does not want placement or
+   * no suitable node was found
+   */
+  @VisibleForTesting
+  public synchronized NodeInstance findRecentNodeForNewInstance(RoleStatus role) {
+    if (!role.isPlacementDesired()) {
+      // no data locality policy
+      return null;
+    }
+    int roleId = role.getKey();
+    boolean strictPlacement = role.isStrictPlacement();
+    NodeInstance nodeInstance = null;
+    // Get the list of possible targets.
+    // This is a live list: changes here are preserved
+    List<NodeInstance> targets = getRecentNodesForRoleId(roleId);
+    if (targets == null) {
+      // nothing to allocate on
+      return null;
+    }
+
+    log.debug("There are {} node(s) to consider for {}", targets.size(), role.getName());
+    // The lists are LinkedLists: walk with an iterator so that each step and
+    // the removal are O(1), instead of O(n) indexed get()/remove() calls.
+    Iterator<NodeInstance> candidates = targets.iterator();
+    while (nodeInstance == null && candidates.hasNext()) {
+      NodeInstance candidate = candidates.next();
+      if (candidate.getActiveRoleInstances(roleId) != 0) {
+        // already hosting an instance of this role
+        continue;
+      }
+      boolean usable;
+      if (strictPlacement) {
+        // strict placement ignores node state and failure statistics
+        usable = true;
+      } else if (!candidate.isOnline()) {
+        usable = false;
+        log.info("Host {} is not online. Not requesting it for {}",
+            candidate.hostname, role.getName());
+      } else if (candidate.exceedsFailureThreshold(role)) {
+        // too many failures for this node
+        usable = false;
+        log.info("Recent node failures is higher than threshold {}. Not requesting host {}",
+            role.getNodeFailureThreshold(), candidate.hostname);
+      } else {
+        usable = true;
+      }
+      if (usable) {
+        // claim the node: it is no longer a recent-node candidate
+        candidates.remove();
+        nodeInstance = candidate;
+      }
+    }
+
+    if (nodeInstance == null) {
+      log.info("No node found for {}", role.getName());
+    }
+    return nodeInstance;
+  }
+
+  /**
+   * Find a node for use
+   * @param role role
+   * @return the instance, or null for none
+   */
+  @VisibleForTesting
+  public synchronized List<NodeInstance> findNodeForNewAAInstance(RoleStatus role) {
+    // all nodes that are live and can host the role; no attempt to exclude ones
+    // considered failing
+    return nodemap.findAllNodesForRole(role.getKey(), role.getLabelExpression());
+  }
+
+  /**
+   * Request an instance on a given node.
+   * An outstanding request is created & tracked, with the 
+   * relevant node entry for that role updated.
+   *<p>
+   * The role status entries will also be tracked
+   * <p>
+   * Returns the request that is now being tracked.
+   * If the node instance is not null, it's details about the role is incremented
+   *
+   * @param node node to target or null for "any"
+   * @param role role to request
+   * @return the request
+   */
+  public synchronized OutstandingRequest requestInstanceOnNode(
+      NodeInstance node, RoleStatus role, Resource resource) {
+    OutstandingRequest outstanding = outstandingRequests.newRequest(node, role.getKey());
+    outstanding.buildContainerRequest(resource, role, now());
+    return outstanding;
+  }
+
+  /**
+   * Request a container for a role.
+   * Anti-affine roles are delegated to {@link #requestContainerForAARole(RoleStatus)}.
+   * Other roles are targeted at a recently used node when one is found;
+   * otherwise the request is location-less.
+   * @param role role status
+   * @return a request ready to go, or null if this is an AA request and no
+   * location can be found.
+   */
+  public synchronized OutstandingRequest requestContainerForRole(RoleStatus role) {
+    if (role.isAntiAffinePlacement()) {
+      return requestContainerForAARole(role);
+    }
+    Resource resource = recordFactory.newResource();
+    role.copyResourceRequirements(resource);
+    // a null target means "any node" to requestInstanceOnNode
+    NodeInstance target = findRecentNodeForNewInstance(role);
+    return requestInstanceOnNode(target, role, resource);
+  }
+
+  /**
+   * Request a container for an anti-affine role.
+   * Every candidate node is listed as a possible location for the request.
+   * @param role role status
+   * @return a request ready to go, or null if no location can be found.
+   */
+  public synchronized OutstandingRequest requestContainerForAARole(RoleStatus role) {
+    List<NodeInstance> candidates = findNodeForNewAAInstance(role);
+    if (candidates.isEmpty()) {
+      log.warn("No suitable location for {}", role.getName());
+      return null;
+    }
+    OutstandingRequest request = outstandingRequests.newAARequest(
+        role.getKey(), candidates, role.getLabelExpression());
+    Resource resource = recordFactory.newResource();
+    role.copyResourceRequirements(resource);
+    request.buildContainerRequest(resource, role, now());
+    return request;
+  }
+  /**
+   * List the nodes which currently have an active instance of a role.
+   * This walks the whole node map, so it costs {@code O(nodes)}.
+   * @param role role index
+   * @return a possibly empty list of nodes hosting an instance of the role
+   */
+  public synchronized List<NodeInstance> listActiveNodes(int role) {
+    List<NodeInstance> active = nodemap.listActiveNodes(role);
+    return active;
+  }
+
+  /**
+   * Get the node entry of a container
+   * @param container container to look up
+   * @return the entry
+   * @throws RuntimeException if the container has no hostname
+   */
+  public NodeEntry getOrCreateNodeEntry(Container container) {
+    return getOrCreateNodeInstance(container).getOrCreate(container);
+  }
+
+  /**
+   * Get the node instance of a container -always returns something
+   * @param container container to look up
+   * @return a (possibly new) node instance
+   * @throws RuntimeException if the container has no hostname
+   */
+  public synchronized NodeInstance getOrCreateNodeInstance(Container container) {
+    return nodemap.getOrCreate(RoleHistoryUtils.hostnameOf(container));
+  }
+
+  /**
+   * Get the node instance of a host if defined.
+   * No new entry is created for an unknown host.
+   * @param hostname hostname to look up
+   * @return a node instance, or null if there is no entry for the host
+   */
+  public synchronized NodeInstance getExistingNodeInstance(String hostname) {
+    return nodemap.get(hostname);
+  }
+
+  /**
+   * Get the node instance of a container <i>if there's an entry in the history</i>
+   * @param container container to look up
+   * @return a node instance or null
+   * @throws RuntimeException if the container has no hostname
+   */
+  public synchronized NodeInstance getExistingNodeInstance(Container container) {
+    return nodemap.get(RoleHistoryUtils.hostnameOf(container));
+  }
+
+  /**
+   * Perform any pre-allocation operations on the list of allocated containers
+   * based on knowledge of system state. 
+   * Currently this places requested hosts ahead of unrequested ones.
+   * @param allocatedContainers list of allocated containers
+   * @return list of containers potentially reordered
+   */
+  public synchronized List<Container> prepareAllocationList(List<Container> allocatedContainers) {
+
+    //partition into requested and unrequested
+    List<Container> requested =
+      new ArrayList<>(allocatedContainers.size());
+    List<Container> unrequested =
+      new ArrayList<>(allocatedContainers.size());
+    outstandingRequests.partitionRequests(this, allocatedContainers, requested, unrequested);
+
+    //give the unrequested ones lower priority
+    requested.addAll(unrequested);
+    return requested;
+  }
+
+  /**
+   * Update the data structures after a container has been allocated on a node.
+   * <p>
+   * The allocation is first reported to the outstanding request tracker.
+   * Once the role has at least its desired number of instances, all of its
+   * outstanding requests are reset. Any hosts they held are appended to the
+   * role's recent-node list, which is then re-sorted.
+   * @param container container
+   * @param desiredCount desired #of instances
+   * @param actualCount current count of instances
+   * @return The allocation outcome
+   */
+  public synchronized ContainerAllocationResults onContainerAllocated(Container container,
+      long desiredCount,
+      long actualCount) {
+    int roleId = ContainerPriority.extractRole(container);
+    String hostname = RoleHistoryUtils.hostnameOf(container);
+    List<NodeInstance> recentNodes = listRecentNodesForRoleId(roleId);
+    ContainerAllocationResults results =
+        outstandingRequests.onContainerAllocated(roleId, hostname, container);
+    if (desiredCount > actualCount) {
+      // still below the target: keep the outstanding requests
+      return results;
+    }
+    // all outstanding requests have been satisfied: clear them, returning
+    // their nodes to the available set
+    List<NodeInstance> freed = outstandingRequests.resetOutstandingRequests(roleId);
+    if (!freed.isEmpty()) {
+      log.info("Adding {} hosts for role {}", freed.size(), roleId);
+      recentNodes.addAll(freed);
+      sortRecentNodeList(roleId);
+    }
+    return results;
+  }
+
+  /**
+   * Update the data structures once a container has been assigned to a role
+   * instance on a node.
+   * {@code onStarting()} is invoked on the container's node entry.
+   * @param container container
+   */
+  public void onContainerAssigned(Container container) {
+    NodeInstance nodeInstance = getOrCreateNodeInstance(container);
+    NodeEntry entry = nodeInstance.getOrCreate(container);
+    entry.onStarting();
+    log.debug("Node {} has updated NodeEntry {}", nodeInstance, entry);
+  }
+
+  /**
+   * Event callback: a container start request has been submitted.
+   * This is currently a no-op.
+   * @param container container being started
+   * @param instance instance bound to the container
+   */
+  public void onContainerStartSubmitted(Container container,
+      RoleInstance instance) {
+    // intentionally empty: the history does not change at this point
+  }
+
+  /**
+   * Event callback: a container has started.
+   * Its node entry is notified that the start completed, and the history
+   * is touched.
+   * @param container container that just started
+   */
+  public void onContainerStarted(Container container) {
+    getOrCreateNodeEntry(container).onStartCompleted();
+    touch();
+  }
+
+  /**
+   * A container failed to start on its node manager.
+   * It is marked as finished: not released, short-lived, with a
+   * {@code Failed} outcome.
+   * @param container container that failed
+   * @return the result of {@link #markContainerFinished}; with the current
+   * implementation a short-lived container always yields false
+   */
+  public boolean onNodeManagerContainerStartFailed(Container container) {
+    final boolean wasReleased = false;
+    final boolean shortLived = true;
+    return markContainerFinished(container, wasReleased, shortLived, ContainerOutcome.Failed);
+  }
+
+  /**
+   * Does the RoleHistory have enough information about the YARN cluster
+   * to start placing AA requests? That is: has it the node map and
+   * any label information needed?
+   * @return true if the caller can start requesting AA nodes
+   */
+  public boolean canPlaceAANodes() {
+    return nodeUpdateReceived.get();
+  }
+
+  /**
+   * Get the last time the nodes were updated from YARN
+   * @return the update time or zero if never updated.
+   */
+  public long getNodesUpdatedTime() {
+    return nodesUpdatedTime.get();
+  }
+
+  /**
+   * Update the node map from a list of YARN node reports.
+   * The update time is recorded and the "node update received" flag is set.
+   * Reports without a hostname or node state are skipped with a warning.
+   *
+   * @param updatedNodes list of updated nodes
+   * @return true if any node update means a review should be triggered.
+   */
+  public synchronized boolean onNodesUpdated(List<NodeReport> updatedNodes) {
+    log.debug("Updating {} nodes", updatedNodes.size());
+    nodesUpdatedTime.set(now());
+    nodeUpdateReceived.set(true);
+    int printed = 0;
+    boolean triggerReview = false;
+    for (NodeReport updatedNode : updatedNodes) {
+      String hostname = updatedNode.getNodeId() == null
+          ? null
+          : updatedNode.getNodeId().getHost();
+      NodeState nodeState = updatedNode.getNodeState();
+      // defensive: a node ID may carry a null as well as an empty host;
+      // either would otherwise break the isEmpty() check or the node map
+      if (hostname == null || hostname.isEmpty() || nodeState == null) {
+        log.warn("Ignoring incomplete update");
+        continue;
+      }
+      if (log.isDebugEnabled() && printed++ < 10) {
+        // log the first few, but avoid overloading the logs for a full cluster
+        // update
+        log.debug("Node \"{}\" is in state {}", hostname, nodeState);
+      }
+      // update the node; this also creates an instance if needed
+      boolean updated = nodemap.updateNode(hostname, updatedNode);
+      triggerReview |= updated;
+    }
+    return triggerReview;
+  }
+
+  /**
+   * Record that a release request has been issued for a container.
+   * {@code release()} is called on the container's node entry.
+   * @param container container submitted
+   */
+  public void onContainerReleaseSubmitted(Container container) {
+    getOrCreateNodeEntry(container).release();
+  }
+
+  /**
+   * App state notification that the release of a container has completed.
+   * The container is marked finished as released and not short-lived.
+   * @param container completed container
+   * @return true if the node was queued for further work
+   */
+  public boolean onReleaseCompleted(Container container) {
+    // NOTE(review): released containers are reported with a Failed outcome;
+    // confirm how NodeEntry.containerCompleted interprets that for releases.
+    final boolean wasReleased = true;
+    final boolean shortLived = false;
+    return markContainerFinished(container, wasReleased, shortLived, ContainerOutcome.Failed);
+  }
+
+  /**
+   * App state notification that a container completed without having been
+   * released, so it is treated as failed.
+   *
+   * @param container completed container
+   * @param shortLived was the container short lived?
+   * @param outcome outcome of the container's completion
+   * @return true if the node is considered available for work
+   */
+  public boolean onFailedContainer(Container container,
+      boolean shortLived,
+      ContainerOutcome outcome) {
+    final boolean wasReleased = false;
+    return markContainerFinished(container, wasReleased, shortLived, outcome);
+  }
+
+  /**
+   * Mark a container finished and {@code touch()} the history.
+   * <p>
+   * A short-lived container is recorded as a start failure on its node entry,
+   * and the node is not considered available. Otherwise the node entry is told
+   * the container completed, passing the released flag and outcome. If that
+   * leaves the node available, it is queued for further work.
+   *
+   * @param container completed container
+   * @param wasReleased was the container released?
+   * @param shortLived was the container short lived?
+   * @param outcome outcome of the container's completion
+   * @return true if the node is now available for work (and so was queued)
+   */
+  protected synchronized boolean markContainerFinished(Container container,
+      boolean wasReleased,
+      boolean shortLived,
+      ContainerOutcome outcome) {
+    NodeEntry nodeEntry = getOrCreateNodeEntry(container);
+    // log the actual host: the entry's role priority does not identify a node
+    log.info("Finished container for node {}, role priority={}, released={}, shortlived={}",
+        RoleHistoryUtils.hostnameOf(container), nodeEntry.rolePriority,
+        wasReleased, shortLived);
+    boolean available;
+    if (shortLived) {
+      nodeEntry.onStartFailed();
+      available = false;
+    } else {
+      available = nodeEntry.containerCompleted(wasReleased, outcome);
+      maybeQueueNodeForWork(container, nodeEntry, available);
+    }
+    touch();
+    return available;
+  }
+
+  /**
+   * If the node is available, stamp its last-used time and push its node
+   * instance onto the front of the role's recent-node list.
+   * Unsynchronized: the caller must already hold the lock.
+   * @param container completed container
+   * @param nodeEntry node entry
+   * @param available available flag
+   * @return the {@code available} flag, unchanged
+   */
+  private boolean maybeQueueNodeForWork(Container container,
+      NodeEntry nodeEntry,
+      boolean available) {
+    if (!available) {
+      return false;
+    }
+    // node is free
+    nodeEntry.setLastUsed(now());
+    NodeInstance instance = getOrCreateNodeInstance(container);
+    int roleId = ContainerPriority.extractRole(container);
+    log.debug("Node {} is now available for role id {}", instance, roleId);
+    listRecentNodesForRoleId(roleId).addFirst(instance);
+    return true;
+  }
+
+  /**
+   * Print the history to the log. This is for testing and diagnostics 
+   */
+  public synchronized void dump() {
+    for (ProviderRole role : providerRoles) {
+      log.info(role.toString());
+      List<NodeInstance> instances = listRecentNodesForRoleId(role.id);
+      log.info("  available: " + instances.size()
+               + " " + SliderUtils.joinWithInnerSeparator(" ", instances));
+    }
+
+    log.info("Nodes in Cluster: {}", getClusterSize());
+    for (NodeInstance node : nodemap.values()) {
+      log.info(node.toFullString());
+    }
+  }
+
+  /**
+   * Build the role name to role id mapping persisted in the role history.
+   * @return a mapping object
+   */
+  public synchronized Map<CharSequence, Integer> buildMappingForHistoryFile() {
+    Map<CharSequence, Integer> nameToId = new HashMap<>(getRoleSize());
+    for (ProviderRole providerRole : providerRoles) {
+      nameToId.put(providerRole.name, providerRole.id);
+    }
+    return nameToId;
+  }
+
+  /**
+   * Get a copy of a role's recent-node list.
+   * This method is synchronized, like the other methods using the live
+   * recent-node lists. The copy therefore cannot be taken while another
+   * thread is modifying the list.
+   * @param role role index
+   * @return a new list holding the same node instances
+   */
+  @VisibleForTesting
+  public synchronized List<NodeInstance> cloneRecentNodeList(int role) {
+    return new LinkedList<>(listRecentNodesForRoleId(role));
+  }
+
+  /**
+   * Snapshot the outstanding placed requests.
+   * @return a list of the placed requests outstanding at the time of the call
+   */
+  @VisibleForTesting
+  public List<OutstandingRequest> listPlacedRequests() {
+    List<OutstandingRequest> placed = outstandingRequests.listPlacedRequests();
+    return placed;
+  }
+
+  /**
+   * Get a snapshot of the outstanding open (unplaced) request list.
+   * @return a list of the open requests outstanding at the time of requesting
+   */
+  @VisibleForTesting
+  public List<OutstandingRequest> listOpenRequests() {
+    return outstandingRequests.listOpenRequests();
+  }
+
+  /**
+   * Escalate operation as triggered by external timer.
+   * @return a (usually empty) list of cancel/request operations.
+   */
+  public synchronized List<AbstractRMOperation> escalateOutstandingRequests() {
+    return outstandingRequests.escalateOutstandingRequests(now());
+  }
+  /**
+   * Cancel the outstanding anti-affine placement requests.
+   * The work is delegated to the outstanding request tracker.
+   * @return the operations produced by the tracker; presumably cancellations
+   * of the AA requests it was holding
+   */
+  public List<AbstractRMOperation> cancelOutstandingAARequests() {
+    return outstandingRequests.cancelOutstandingAARequests();
+  }
+
+  /**
+   * Cancel a number of outstanding requests for a role.
+   * Only requests for new containers are cancelled, never actual containers.
+   * Anti-affine roles and simple roles use different selection strategies.
+   * @param role role
+   * @param toCancel number to cancel
+   * @return a list of cancellable operations.
+   */
+  public List<AbstractRMOperation> cancelRequestsForRole(RoleStatus role, int toCancel) {
+    if (role.isAntiAffinePlacement()) {
+      return cancelRequestsForAARole(role, toCancel);
+    }
+    return cancelRequestsForSimpleRole(role, toCancel);
+  }
+
+  /**
+   * Build the cancellation operations for a role without anti-affine placement.
+   * Open (unplaced) requests are chosen first; any shortfall is made up from
+   * placed requests.
+   * @param role role
+   * @param toCancel number to cancel; must be positive
+   * @return a list of cancellable operations.
+   * @throws IllegalArgumentException if {@code toCancel} is not positive
+   */
+  private synchronized List<AbstractRMOperation> cancelRequestsForSimpleRole(RoleStatus role, int toCancel) {
+    Preconditions.checkArgument(toCancel > 0,
+        "trying to cancel invalid number of requests: " + toCancel);
+    List<AbstractRMOperation> operations = new ArrayList<>(toCancel);
+    int roleId = role.getKey();
+    // unplaced requests are cancelled in preference to placed ones
+    List<OutstandingRequest> victims =
+        outstandingRequests.extractOpenRequestsForRole(roleId, toCancel);
+    int shortfall = toCancel - victims.size();
+    victims.addAll(outstandingRequests.extractPlacedRequestsForRole(roleId, shortfall));
+
+    for (OutstandingRequest victim : victims) {
+      operations.add(victim.createCancelOperation());
+    }
+    return operations;
+  }
+
+  /**
+   * Build the list of requests to cancel for an AA role.
+   * The count of pending AA requests held in the role status is reduced
+   * first. Then any active AA request is cancelled. Finally, placed requests
+   * make up the remainder.
+   * @param role role
+   * @param toCancel number to cancel
+   * @return a list of cancellable operations.
+   */
+  private synchronized List<AbstractRMOperation> cancelRequestsForAARole(RoleStatus role, int toCancel) {
+    List<AbstractRMOperation> results = new ArrayList<>(toCancel);
+    int roleId = role.getKey();
+    List<OutstandingRequest> requests = new ArrayList<>(toCancel);
+    // there may be pending requests which can be cancelled here
+    long pending = role.getPendingAntiAffineRequests();
+    if (pending > 0) {
+      // there are some pending ones which can be cancelled first
+      long pendingToCancel = Math.min(pending, toCancel);
+      long pendingLeft = pending - pendingToCancel;
+      // report how many are cancelled and how many remain
+      log.info("Cancelling {} pending AA allocations, leaving {}", pendingToCancel,
+          pendingLeft);
+      role.setPendingAntiAffineRequests(pendingLeft);
+      toCancel -= pendingToCancel;
+    }
+    if (toCancel > 0 && role.isAARequestOutstanding()) {
+      // not enough
+      log.info("Cancelling current AA request");
+      // find the single entry which may be running
+      requests = outstandingRequests.extractOpenRequestsForRole(roleId, toCancel);
+      role.cancelOutstandingAARequest();
+      // NOTE(review): this counts one cancellation regardless of how many
+      // open requests were extracted; confirm only one can exist for AA roles
+      toCancel--;
+    }
+
+    // ask for some excess nodes
+    if (toCancel > 0) {
+      requests.addAll(outstandingRequests.extractPlacedRequestsForRole(roleId, toCancel));
+    }
+
+    // build cancellations
+    for (OutstandingRequest request : requests) {
+      results.add(request.createCancelOperation());
+    }
+    return results;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c8b0c5de/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistoryUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistoryUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistoryUtils.java
new file mode 100644
index 0000000..ea6197b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistoryUtils.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.appmaster.state;
+
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.slider.common.tools.SliderUtils;
+
+/**
+ * Static helpers for the role history.
+ */
+public class RoleHistoryUtils {
+
+  /**
+   * Get the hostname of the node a container was allocated on.
+   * @param container container
+   * @return the host of the container's node ID
+   * @throws RuntimeException if the container has no node ID
+   */
+  public static String hostnameOf(Container container) {
+    NodeId nodeId = container.getNodeId();
+    if (nodeId == null) {
+      // plain concatenation: the message is never passed through a formatter
+      throw new RuntimeException("Container has no node ID: " +
+         SliderUtils.containerToString(container));
+    }
+    return nodeId.getHost();
+  }
+
+  /**
+   * Decrement a value but hold it at zero. Usually a sanity check
+   * on counters tracking outstanding operations
+   * @param val value
+   * @return {@code val - 1}, or 0 if {@code val} is zero or negative
+   */
+  public static int decToFloor(int val) {
+    // compare before subtracting so Integer.MIN_VALUE cannot wrap around
+    return val > 0 ? val - 1 : 0;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c8b0c5de/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHostnamePair.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHostnamePair.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHostnamePair.java
new file mode 100644
index 0000000..920887a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHostnamePair.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.appmaster.state;
+
+import java.util.Objects;
+
+/**
+ * An immutable (role ID, hostname) pair with value-based equality,
+ * so it can be used as a map or set key.
+ */
+public class RoleHostnamePair {
+
+  /**
+   * requested role
+   */
+  public final int roleId;
+
+  /**
+   * hostname; may be null
+   */
+  public final String hostname;
+
+  public RoleHostnamePair(int roleId, String hostname) {
+    this.roleId = roleId;
+    this.hostname = hostname;
+  }
+
+  public int getRoleId() {
+    return roleId;
+  }
+
+  public String getHostname() {
+    return hostname;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof RoleHostnamePair)) {
+      return false;
+    }
+    RoleHostnamePair that = (RoleHostnamePair) o;
+    // compare the primitive directly; Objects.equals would box both ints
+    return roleId == that.roleId &&
+        Objects.equals(hostname, that.hostname);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(roleId, hostname);
+  }
+
+  @Override
+  public String toString() {
+    final StringBuilder sb = new StringBuilder(
+        "RoleHostnamePair{");
+    sb.append("roleId=").append(roleId);
+    sb.append(", hostname='").append(hostname).append('\'');
+    sb.append('}');
+    return sb.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c8b0c5de/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleInstance.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleInstance.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleInstance.java
new file mode 100644
index 0000000..30cfec9
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleInstance.java
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.slider.server.appmaster.state;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.registry.client.binding.RegistryTypeUtils;
+import org.apache.hadoop.registry.client.types.Endpoint;
+import org.apache.hadoop.registry.client.types.ProtocolTypes;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.slider.api.ClusterNode;
+import org.apache.slider.api.proto.Messages;
+import org.apache.slider.api.types.ContainerInformation;
+import org.apache.slider.common.tools.SliderUtils;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Tracking information about a container and the role instance in it.
+ */
+public final class RoleInstance implements Cloneable {
+
+  /**
+   * The container this instance is tracking.
+   */
+  public Container container;
+  /**
+   * Container ID
+   */
+  public final String id;
+  public long createTime;
+  public long startTime;
+  /**
+   * flag set when it is released, to know if it has
+   * already been targeted for termination
+   */
+  public boolean released;
+
+  /**
+   * Name of the role
+   */
+  public String role;
+  public String group;
+
+  /**
+   * Version of the app
+   */
+  public String appVersion;
+
+  /**
+   * Role Id; matches priority in resources.json
+   */
+  public int roleId;
+
+  /**
+   * state from StateValues
+   */
+  public int state;
+
+  /**
+   * Exit code: only valid if the state >= STOPPED
+   */
+  public int exitCode;
+
+  /**
+   * what was the command executed?
+   */
+  public String command;
+
+  /**
+   * Any diagnostics
+   */
+  public String diagnostics;
+
+  /**
+   * What is the tail output from the executed process (or [] if not started
+   * or the log cannot be picked up)
+   */
+  public String[] output;
+
+  /**
+   * Any environment details
+   */
+  public String[] environment;
+
+  public String ip;
+  public String hostname;
+  /**
+   * Host of the container's node. Set by the constructor only when the
+   * container has a node ID, so may be null.
+   */
+  public String host;
+  /**
+   * HTTP URL of the container's node. Set by the constructor only when the
+   * container has an HTTP address, so may be null.
+   */
+  public String hostURL;
+  public ContainerAllocationOutcome placement;
+
+
+  /**
+   * A list of registered endpoints.
+   */
+  private List<Endpoint> endpoints =
+      new ArrayList<>(2);
+
+  /**
+   * Create an instance from a container assignment, recording the
+   * assignment's placement outcome.
+   * @param assignment assignment; its container must satisfy the
+   * preconditions of {@link #RoleInstance(Container)}
+   */
+  public RoleInstance(ContainerAssignment assignment) {
+    this(assignment.container);
+    placement = assignment.placement;
+  }
+
+  /**
+   * Create an instance to track an allocated container
+   * @param container a container which must be non null, and have a non-null Id field.
+   */
+  public RoleInstance(Container container) {
+    Preconditions.checkNotNull(container, "Null container");
+    Preconditions.checkState(container.getId() != null,
+      "Null container ID");
+
+    this.container = container;
+    id = container.getId().toString();
+    if (container.getNodeId() != null) {
+      host = container.getNodeId().getHost();
+    }
+    if (container.getNodeHttpAddress() != null) {
+      hostURL = "http://" + container.getNodeHttpAddress();
+    }
+  }
+
+  public ContainerId getId() {
+    return container.getId();
+  }
+
+  public NodeId getHost() {
+    return container.getNodeId();
+  }
+
+  @Override
+  public String toString() {
+    final StringBuilder sb =
+      new StringBuilder("RoleInstance{");
+    sb.append("role='").append(role).append('\'');
+    sb.append(", id='").append(id).append('\'');
+    sb.append(", container=").append(SliderUtils.containerToString(container));
+    sb.append(", createTime=").append(createTime);
+    sb.append(", startTime=").append(startTime);
+    sb.append(", released=").append(released);
+    sb.append(", roleId=").append(roleId);
+    sb.append(", host=").append(host);
+    sb.append(", hostURL=").append(hostURL);
+    sb.append(", state=").append(state);
+    sb.append(", placement=").append(placement);
+    sb.append(", exitCode=").append(exitCode);
+    sb.append(", command='").append(command).append('\'');
+    sb.append(", diagnostics='").append(diagnostics).append('\'');
+    sb.append(", output=").append(Arrays.toString(output));
+    sb.append(", environment=").append(Arrays.toString(environment));
+    sb.append('}');
+    return sb.toString();
+  }
+
+  public ContainerId getContainerId() {
+    return container != null ? container.getId() : null;
+  }
+
+  /**
+   * Generate the protobuf format of a request
+   * @return protobuf format. This excludes the Container info
+   */
+  public Messages.RoleInstanceState toProtobuf() {
+    Messages.RoleInstanceState.Builder builder =
+      Messages.RoleInstanceState.newBuilder();
+    if (container != null) {
+      builder.setName(container.getId().toString());
+    } else {
+      builder.setName("unallocated instance");
+    }
+    if (command != null) {
+      builder.setCommand(command);
+    }
+    if (environment != null) {
+      builder.addAllEnvironment(Arrays.asList(environment));
+    }
+    if (diagnostics != null) {
+      builder.setDiagnostics(diagnostics);
+    }
+    builder.setExitCode(exitCode);
+
+    if (output != null) {
+      builder.addAllOutput(Arrays.asList(output));
+    }
+    if (role != null) {
+      builder.setRole(role);
+    }
+    builder.setRoleId(roleId);
+    builder.setState(state);
+
+    builder.setReleased(released);
+    builder.setCreateTime(createTime);
+    builder.setStartTime(startTime);
+    // generated protobuf setters throw NullPointerException on null, and
+    // host/hostURL are null when the container had no node ID / HTTP address.
+    // An empty string is used instead so the message can always be built.
+    // NOTE(review): these fields may be declared required in the schema.
+    builder.setHost(host != null ? host : "");
+    builder.setHostURL(hostURL != null ? hostURL : "");
+    if (appVersion != null) {
+      builder.setAppVersion(appVersion);
+    }
+    return builder.build();
+  }
+
+  /**
+   * Build a serializable ClusterNode structure from this instance.
+   * This operation is unsynchronized.
+   * @return a serialized value.
+   */
+  public ClusterNode toClusterNode() {
+    ClusterNode node;
+    if (container != null) {
+      node = new ClusterNode(container.getId());
+    } else {
+      node = new ClusterNode();
+      node.name = "unallocated instance";
+    }
+    node.command = command;
+    node.createTime = createTime;
+    node.diagnostics = diagnostics;
+    if (environment != null) {
+      node.environment = Arrays.copyOf(environment, environment.length);
+    }
+    node.exitCode = exitCode;
+    node.ip = ip;
+    node.hostname = hostname;
+    node.host = host;
+    node.hostUrl = hostURL;
+    if (output != null) {
+      node.output = Arrays.copyOf(output, output.length);
+    }
+    node.released = released;
+    node.role = role;
+    node.roleId = roleId;
+    node.startTime = startTime;
+    node.state = state;
+
+    return node;
+  }
+
+  /**
+   * Clone operation clones all the simple values but shares the
+   * Container object into the cloned copy -same with the output,
+   * diagnostics and env arrays. The endpoint list is copied, but the
+   * endpoints in it are shared.
+   * @return a clone of the object
+   * @throws CloneNotSupportedException
+   */
+  @Override
+  public Object clone() throws CloneNotSupportedException {
+    RoleInstance cloned = (RoleInstance) super.clone();
+    // clone the endpoint list, but not the values
+    cloned.endpoints = new ArrayList<>(this.endpoints);
+    return cloned;
+  }
+
+  /**
+   * Get the list of endpoints.
+   * @return the live endpoint list (not a copy).
+   */
+  public List<Endpoint> getEndpoints() {
+    return endpoints;
+  }
+
+  /**
+   * Add an endpoint registration
+   * @param endpoint endpoint (non-null)
+   * @throws IllegalArgumentException if the endpoint is null
+   */
+  public void addEndpoint(Endpoint endpoint) {
+    Preconditions.checkArgument(endpoint != null);
+    endpoints.add(endpoint);
+  }
+
+  /**
+   * Register a port endpoint as an inet-addr formatted endpoint, using the
+   * host as the first part of the address
+   * @param port port
+   * @param api API name
+   */
+  public void registerPortEndpoint(int port, String api) {
+    Endpoint epr =
+        RegistryTypeUtils.inetAddrEndpoint(api,
+            ProtocolTypes.PROTOCOL_TCP, host, port);
+    addEndpoint(epr);
+  }
+
+  /**
+   * Serialize. Some data structures (e.g output)
+   * may be shared
+   * @return a serialized form for marshalling as JSON
+   */
+  public ContainerInformation serialize() {
+    ContainerInformation info = new ContainerInformation();
+    info.containerId = id;
+    info.component = role;
+    info.appVersion = appVersion;
+    info.startTime = startTime;
+    info.createTime = createTime;
+    info.diagnostics = diagnostics;
+    info.state = state;
+    info.host = host;
+    info.hostURL = hostURL;
+    info.released = released ? Boolean.TRUE : null;
+    if (placement != null) {
+      info.placement = placement.toString();
+    }
+    if (output != null) {
+      info.output = output;
+    }
+    return info;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c8b0c5de/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleStatus.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleStatus.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleStatus.java
new file mode 100644
index 0000000..0a3a3c9
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleStatus.java
@@ -0,0 +1,563 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.appmaster.state;
+
+import com.codahale.metrics.Metric;
+import com.codahale.metrics.MetricSet;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.slider.api.types.ComponentInformation;
+import org.apache.slider.api.types.RoleStatistics;
+import org.apache.slider.providers.PlacementPolicy;
+import org.apache.slider.providers.ProviderRole;
+import org.apache.slider.server.appmaster.management.BoolMetricPredicate;
+import org.apache.slider.server.appmaster.management.LongGauge;
+
+import java.io.Serializable;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Models the ongoing status of a single role (component) of an application.
+ *
+ * These structures are shared across the {@link AppState} and {@link RoleHistory} structures,
+ * and must be designed for concurrent access. Atomic counters are preferred to anything which
+ * requires synchronization; synchronized methods are used where the whole instance needs to be
+ * locked, for example to update or read several entries consistently.
+ */
+public final class RoleStatus implements Cloneable, MetricSet {
+
+  private final String name;
+  private final String group;
+
+  /**
+   * Role priority
+   */
+  private final int key;
+  private final ProviderRole providerRole;
+
+  private final LongGauge actual = new LongGauge();
+  private final LongGauge completed = new LongGauge();
+  private final LongGauge desired = new LongGauge();
+  private final LongGauge failed = new LongGauge();
+  private final LongGauge failedRecently = new LongGauge(0);
+  private final LongGauge limitsExceeded = new LongGauge(0);
+  private final LongGauge nodeFailed = new LongGauge(0);
+  /** Number of AA requests queued. */
+  private final LongGauge pendingAntiAffineRequests = new LongGauge(0);
+  private final LongGauge preempted = new LongGauge(0);
+  private final LongGauge releasing = new LongGauge();
+  private final LongGauge requested = new LongGauge();
+  private final LongGauge started = new LongGauge();
+  private final LongGauge startFailed = new LongGauge();
+  private final LongGauge totalRequested = new LongGauge();
+
+  /** resource requirements */
+  private Resource resourceRequirements;
+
+  /** any pending AA request */
+  private volatile OutstandingRequest outstandingAArequest = null;
+
+  /** Text of the most recent failure reported to noteFailed() with a message. */
+  private String failureMessage = "";
+
+  public RoleStatus(ProviderRole providerRole) {
+    this.providerRole = providerRole;
+    this.name = providerRole.name;
+    this.group = providerRole.group;
+    this.key = providerRole.id;
+  }
+
+  @Override
+  public Map<String, Metric> getMetrics() {
+    // Every gauge is registered exactly once; the initial capacity leaves
+    // room for all entries below without a rehash.
+    Map<String, Metric> metrics = new HashMap<>(32);
+    metrics.put("actual", actual);
+    metrics.put("completed", completed);
+    metrics.put("desired", desired);
+    metrics.put("failed", failed);
+    metrics.put("failedRecently", failedRecently);
+    metrics.put("limitsExceeded", limitsExceeded);
+    metrics.put("nodeFailed", nodeFailed);
+    metrics.put("pendingAntiAffineRequests", pendingAntiAffineRequests);
+    metrics.put("preempted", preempted);
+    metrics.put("releasing", releasing);
+    metrics.put("requested", requested);
+    metrics.put("started", started);
+    metrics.put("startFailed", startFailed);
+    metrics.put("totalRequested", totalRequested);
+
+    metrics.put("outstandingAArequest",
+      new BoolMetricPredicate(new BoolMetricPredicate.Eval() {
+        @Override
+        public boolean eval() {
+          return isAARequestOutstanding();
+        }
+      }));
+    return metrics;
+  }
+
+  public String getName() {
+    return name;
+  }
+
+  public String getGroup() {
+    return group;
+  }
+
+  public int getKey() {
+    return key;
+  }
+
+  public int getPriority() {
+    return getKey();
+  }
+
+  /**
+   * Get the placement policy enum, from the values in
+   * {@link PlacementPolicy}
+   * @return the placement policy for this role
+   */
+  public int getPlacementPolicy() {
+    return providerRole.placementPolicy;
+  }
+
+  public long getPlacementTimeoutSeconds() {
+    return providerRole.placementTimeoutSeconds;
+  }
+
+  /**
+   * The number of failures on a specific node that can be tolerated
+   * before selecting a different node for placement
+   * @return the node failure threshold declared by the provider role
+   */
+  public int getNodeFailureThreshold() {
+    return providerRole.nodeFailureThreshold;
+  }
+
+  public boolean isExcludeFromFlexing() {
+    return hasPlacementPolicy(PlacementPolicy.EXCLUDE_FROM_FLEXING);
+  }
+
+  public boolean isStrictPlacement() {
+    return hasPlacementPolicy(PlacementPolicy.STRICT);
+  }
+
+  public boolean isAntiAffinePlacement() {
+    return hasPlacementPolicy(PlacementPolicy.ANTI_AFFINITY_REQUIRED);
+  }
+
+  public boolean hasPlacementPolicy(int policy) {
+    return 0 != (getPlacementPolicy() & policy);
+  }
+
+  public boolean isPlacementDesired() {
+    return !hasPlacementPolicy(PlacementPolicy.ANYWHERE);
+  }
+
+  public long getDesired() {
+    return desired.get();
+  }
+
+  public void setDesired(long desired) {
+    this.desired.set(desired);
+  }
+
+  public long getActual() {
+    return actual.get();
+  }
+
+  public long incActual() {
+    return actual.incrementAndGet();
+  }
+
+  public long decActual() {
+    return actual.decToFloor(1);
+  }
+
+  /**
+   * Get the request count.
+   * @return a count of requested containers
+   */
+  public long getRequested() {
+    return requested.get();
+  }
+
+  public long incRequested() {
+    totalRequested.incrementAndGet();
+    return requested.incrementAndGet();
+  }
+
+  public void cancel(long count) {
+    requested.decToFloor(count);
+  }
+
+  public void decRequested() {
+    cancel(1);
+  }
+
+  public long getReleasing() {
+    return releasing.get();
+  }
+
+  public long incReleasing() {
+    return releasing.incrementAndGet();
+  }
+
+  public long decReleasing() {
+    return releasing.decToFloor(1);
+  }
+
+  public long getFailed() {
+    return failed.get();
+  }
+
+  public long getFailedRecently() {
+    return failedRecently.get();
+  }
+
+  /**
+   * Reset the recent failure
+   * @return the number of failures in the "recent" window
+   */
+  public long resetFailedRecently() {
+    return failedRecently.getAndSet(0);
+  }
+
+  public long getLimitsExceeded() {
+    return limitsExceeded.get();
+  }
+
+  public long incPendingAntiAffineRequests(long v) {
+    return pendingAntiAffineRequests.addAndGet(v);
+  }
+
+  /**
+   * Probe for an outstanding AA request being true
+   * @return true if there is an outstanding AA Request
+   */
+  public boolean isAARequestOutstanding() {
+    return outstandingAArequest != null;
+  }
+
+  /**
+   * expose the predicate {@link #isAARequestOutstanding()} as an integer,
+   * which is very convenient in tests
+   * @return 1 if there is an outstanding request; 0 if not
+   */
+  public int getOutstandingAARequestCount() {
+    return isAARequestOutstanding()? 1: 0;
+  }
+  /**
+   * Note that a role failed, text will
+   * be used in any diagnostics if an exception
+   * is later raised.
+   * @param startupFailure flag to indicate this was a startup event
+   * @param text text about the failure
+   * @param outcome outcome of the container
+   */
+  public synchronized void noteFailed(boolean startupFailure, String text,
+      ContainerOutcome outcome) {
+    if (text != null) {
+      failureMessage = text;
+    }
+    switch (outcome) {
+      case Preempted:
+        preempted.incrementAndGet();
+        break;
+
+      case Node_failure:
+        nodeFailed.incrementAndGet();
+        failed.incrementAndGet();
+        break;
+
+      case Failed_limits_exceeded: // exceeded memory or CPU; app/configuration related
+        limitsExceeded.incrementAndGet();
+        // fall through
+      case Failed: // application failure, possibly node related, possibly not
+      default: // anything else (future-proofing)
+        failed.incrementAndGet();
+        failedRecently.incrementAndGet();
+        //have a look to see if it short lived
+        if (startupFailure) {
+          incStartFailed();
+        }
+        break;
+    }
+  }
+
+  public long getStartFailed() {
+    return startFailed.get();
+  }
+
+  public synchronized void incStartFailed() {
+    startFailed.getAndIncrement();
+  }
+
+  public synchronized String getFailureMessage() {
+    return failureMessage;
+  }
+
+  public long getCompleted() {
+    return completed.get();
+  }
+
+  public synchronized void setCompleted(int completed) {
+    this.completed.set(completed);
+  }
+
+  public long incCompleted() {
+    return completed.incrementAndGet();
+  }
+  public long getStarted() {
+    return started.get();
+  }
+
+  public synchronized void incStarted() {
+    started.incrementAndGet();
+  }
+
+  public long getTotalRequested() {
+    return totalRequested.get();
+  }
+
+  public long getPreempted() {
+    return preempted.get();
+  }
+
+  public long getNodeFailed() {
+    return nodeFailed.get();
+  }
+
+  public long getPendingAntiAffineRequests() {
+    return pendingAntiAffineRequests.get();
+  }
+
+  public void setPendingAntiAffineRequests(long pendingAntiAffineRequests) {
+    this.pendingAntiAffineRequests.set(pendingAntiAffineRequests);
+  }
+
+  public long decPendingAntiAffineRequests() {
+    return pendingAntiAffineRequests.decToFloor(1);
+  }
+
+  public OutstandingRequest getOutstandingAArequest() {
+    return outstandingAArequest;
+  }
+
+  public void setOutstandingAArequest(OutstandingRequest outstandingAArequest) {
+    this.outstandingAArequest = outstandingAArequest;
+  }
+
+  /**
+   * Complete the outstanding AA request (there's no check for one in progress, caller
+   * expected to have done that).
+   */
+  public void completeOutstandingAARequest() {
+    setOutstandingAArequest(null);
+  }
+
+  /**
+   * Cancel any outstanding AA request. Harmless if the role is non-AA, or
+   * if there are no outstanding requests.
+   */
+  public void cancelOutstandingAARequest() {
+    if (outstandingAArequest != null) {
+      setOutstandingAArequest(null);
+      setPendingAntiAffineRequests(0);
+      decRequested();
+    }
+  }
+
+  /**
+   * Get the number of roles we are short of.
+   * nodes released are ignored.
+   * @return the positive or negative number of roles to add/release.
+   * 0 means "do nothing".
+   */
+  public long getDelta() {
+    long inuse = getActualAndRequested();
+    long delta = desired.get() - inuse;
+    if (delta < 0) {
+      //if we are releasing, remove the number that are already released.
+      delta += releasing.get();
+      //but never switch to a positive
+      delta = Math.min(delta, 0);
+    }
+    return delta;
+  }
+
+  /**
+   * Get count of actual and requested containers. This includes pending ones
+   * @return the size of the application when outstanding requests are included.
+   */
+  public long getActualAndRequested() {
+    return actual.get() + requested.get();
+  }
+
+  @Override
+  public String toString() {
+    final StringBuilder sb = new StringBuilder("RoleStatus{");
+    sb.append("name='").append(name).append('\'');
+    sb.append(", group=").append(group);
+    sb.append(", key=").append(key);
+    sb.append(", desired=").append(desired);
+    sb.append(", actual=").append(actual);
+    sb.append(", requested=").append(requested);
+    sb.append(", releasing=").append(releasing);
+    sb.append(", failed=").append(failed);
+    sb.append(", startFailed=").append(startFailed);
+    sb.append(", started=").append(started);
+    sb.append(", completed=").append(completed);
+    sb.append(", totalRequested=").append(totalRequested);
+    sb.append(", preempted=").append(preempted);
+    sb.append(", nodeFailed=").append(nodeFailed);
+    sb.append(", failedRecently=").append(failedRecently);
+    sb.append(", limitsExceeded=").append(limitsExceeded);
+    sb.append(", resourceRequirements=").append(resourceRequirements);
+    sb.append(", isAntiAffinePlacement=").append(isAntiAffinePlacement());
+    if (isAntiAffinePlacement()) {
+      sb.append(", pendingAntiAffineRequests=").append(pendingAntiAffineRequests);
+      sb.append(", outstandingAArequest=").append(outstandingAArequest);
+    }
+    sb.append(", failureMessage='").append(failureMessage).append('\'');
+    sb.append(", providerRole=").append(providerRole);
+    sb.append('}');
+    return sb.toString();
+  }
+
+  @Override
+  public synchronized Object clone() throws CloneNotSupportedException {
+    // Shallow copy: the clone shares this instance's final LongGauge counters.
+    return super.clone();
+  }
+
+  /**
+   * Get the provider role
+   * @return the provider role
+   */
+  public ProviderRole getProviderRole() {
+    return providerRole;
+  }
+
+  /**
+   * Build the statistics map from the current data
+   * @return a map for use in statistics reports
+   */
+  public Map<String, Integer> buildStatistics() {
+    ComponentInformation componentInformation = serialize();
+    return componentInformation.buildStatistics();
+  }
+
+  /**
+   * Produce a serialized form which can be served up as JSON
+   * @return a summary of the current role status.
+   */
+  public synchronized ComponentInformation serialize() {
+    ComponentInformation info = new ComponentInformation();
+    info.name = name;
+    info.priority = getPriority();
+    info.desired = desired.intValue();
+    info.actual = actual.intValue();
+    info.requested = requested.intValue();
+    info.releasing = releasing.intValue();
+    info.failed = failed.intValue();
+    info.startFailed = startFailed.intValue();
+    info.placementPolicy = getPlacementPolicy();
+    info.failureMessage = failureMessage;
+    info.totalRequested = totalRequested.intValue();
+    info.failedRecently = failedRecently.intValue();
+    info.nodeFailed = nodeFailed.intValue();
+    info.preempted = preempted.intValue();
+    info.pendingAntiAffineRequestCount = pendingAntiAffineRequests.intValue();
+    info.isAARequestOutstanding = isAARequestOutstanding();
+    return info;
+  }
+
+  /**
+   * Get the (possibly null) label expression for this role
+   * @return a string or null
+   */
+  public String getLabelExpression() {
+    return providerRole.labelExpression;
+  }
+
+  public Resource getResourceRequirements() {
+    return resourceRequirements;
+  }
+
+  public void setResourceRequirements(Resource resourceRequirements) {
+    this.resourceRequirements = resourceRequirements;
+  }
+
+  /**
+   * Compare two role status entries by name
+   */
+  public static class CompareByName implements Comparator<RoleStatus>,
+      Serializable {
+    @Override
+    public int compare(RoleStatus o1, RoleStatus o2) {
+      return o1.getName().compareTo(o2.getName());
+    }
+  }
+
+  /**
+   * Compare two role status entries by key
+   */
+  public static class CompareByKey implements Comparator<RoleStatus>,
+      Serializable {
+    @Override
+    public int compare(RoleStatus o1, RoleStatus o2) {
+      return Integer.compare(o1.getKey(), o2.getKey());
+    }
+  }
+
+  /**
+   * Given a resource, set its requirements to those this role needs
+   * @param resource resource to configure
+   * @return the resource
+   */
+  public Resource copyResourceRequirements(Resource resource) {
+    Preconditions.checkNotNull(resourceRequirements,
+        "Role resource requirements have not been set");
+    resource.setMemory(resourceRequirements.getMemory());
+    resource.setVirtualCores(resourceRequirements.getVirtualCores());
+    return resource;
+  }
+
+  public synchronized RoleStatistics getStatistics() {
+    RoleStatistics stats = new RoleStatistics();
+    stats.activeAA = getOutstandingAARequestCount();
+    stats.actual = actual.get();
+    stats.desired = desired.get();
+    stats.failed = failed.get();
+    stats.limitsExceeded = limitsExceeded.get();
+    stats.nodeFailed = nodeFailed.get();
+    stats.preempted = preempted.get();
+    stats.releasing = releasing.get();
+    stats.requested = requested.get();
+    stats.started = started.get();
+    stats.startFailed = startFailed.get();
+    stats.totalRequested = totalRequested.get();
+    return stats;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c8b0c5de/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/SimpleReleaseSelector.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/SimpleReleaseSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/SimpleReleaseSelector.java
new file mode 100644
index 0000000..b848096
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/SimpleReleaseSelector.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.appmaster.state;
+
+import java.util.List;
+
+/**
+ * The simplest {@link ContainerReleaseSelector}: the candidate list is returned
+ * as-is (same list instance, same order) and {@code roleId} is not consulted.
+ */
+public class SimpleReleaseSelector implements ContainerReleaseSelector {
+
+  @Override
+  public List<RoleInstance> sortCandidates(int roleId, List<RoleInstance> candidates) {
+    return candidates;
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message