hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jia...@apache.org
Subject [15/50] [abbrv] hadoop git commit: YARN-5461. Initial code ported from slider-core module. (jianhe)
Date Tue, 11 Oct 2016 20:37:19 GMT
http://git-wip-us.apache.org/repos/asf/hadoop/blob/82fdd408/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppStateBindingInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppStateBindingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppStateBindingInfo.java
new file mode 100644
index 0000000..a8aa1a2
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppStateBindingInfo.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.appmaster.state;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.slider.core.conf.AggregateConf;
+import org.apache.slider.providers.ProviderRole;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Binding information for application states; designed to be extensible
+ * so that tests don't have to be massively reworked when new arguments
+ * are added.
+ * <p>
+ * All fields are public and mutable. {@link #validate()} checks that the
+ * mandatory ones have been set.
+ */
+public class AppStateBindingInfo {
+  public AggregateConf instanceDefinition;
+  public Configuration serviceConfig = new Configuration();
+  public Configuration publishedProviderConf = new Configuration(false);
+  public List<ProviderRole> roles = new ArrayList<>();
+  public FileSystem fs;
+  public Path historyPath;
+  public List<Container> liveContainers = new ArrayList<>(0);
+  public Map<String, String> applicationInfo = new HashMap<>();
+  public ContainerReleaseSelector releaseSelector = new SimpleReleaseSelector();
+  /** node reports off the RM. */
+  public List<NodeReport> nodeReports = new ArrayList<>(0);
+
+  /**
+   * Verify that the mandatory binding fields are non-null.
+   * {@code liveContainers} and {@code applicationInfo} are not checked.
+   * @throws IllegalArgumentException naming the first field found to be null
+   */
+  public void validate() throws IllegalArgumentException {
+    // Each message names the actual field, so a failure points straight
+    // at what the caller forgot to set.
+    Preconditions.checkArgument(instanceDefinition != null, "null instanceDefinition");
+    Preconditions.checkArgument(serviceConfig != null, "null serviceConfig");
+    Preconditions.checkArgument(publishedProviderConf != null, "null publishedProviderConf");
+    Preconditions.checkArgument(releaseSelector != null, "null releaseSelector");
+    Preconditions.checkArgument(roles != null, "null roles");
+    Preconditions.checkArgument(fs != null, "null fs");
+    Preconditions.checkArgument(historyPath != null, "null historyPath");
+    Preconditions.checkArgument(nodeReports != null, "null nodeReports");
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/82fdd408/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/ContainerAllocationOutcome.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/ContainerAllocationOutcome.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/ContainerAllocationOutcome.java
new file mode 100644
index 0000000..5b3a93c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/ContainerAllocationOutcome.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.appmaster.state;
+
+/**
+ * Classification of how an allocated container was matched up
+ * against the requests the AM had outstanding.
+ */
+public enum ContainerAllocationOutcome {
+  /** No request was outstanding for this allocation. */
+  Unallocated,
+  /** Open placement. */
+  Open,
+  /** Allocated explicitly where it was requested. */
+  Placed,
+  /** An escalated placement. */
+  Escalated
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/82fdd408/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/ContainerAllocationResults.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/ContainerAllocationResults.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/ContainerAllocationResults.java
new file mode 100644
index 0000000..e80639e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/ContainerAllocationResults.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.appmaster.state;
+
+import org.apache.slider.server.appmaster.operations.AbstractRMOperation;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Plain holder tuple describing what happened to a single container allocation.
+ */
+public class ContainerAllocationResults {
+
+  /** Outcome classification of the allocation: placed, escalated, ... */
+  public ContainerAllocationOutcome outcome;
+
+  /**
+   * Outstanding request that originated this allocation. Null when the
+   * outcome is {@link ContainerAllocationOutcome#Unallocated}, because the
+   * allocation was not expected.
+   */
+  public OutstandingRequest origin;
+
+  /** Possibly empty list of follow-up operations; starts out as an empty list. */
+  public List<AbstractRMOperation> operations = new ArrayList<AbstractRMOperation>(0);
+
+  /** Create a result with every field at its initial value. */
+  public ContainerAllocationResults() {
+    // nothing beyond the field initialisers
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/82fdd408/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/ContainerAssignment.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/ContainerAssignment.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/ContainerAssignment.java
new file mode 100644
index 0000000..3e8a3c3
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/ContainerAssignment.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.appmaster.state;
+
+import org.apache.hadoop.yarn.api.records.Container;
+
+/**
+ * Static assignment structure: a container, the role chosen for it and
+ * the placement outcome, held in final fields.
+ */
+public class ContainerAssignment {
+
+  /** The container that has been allocated. */
+  public final Container container;
+
+  /** The role the container is to be assigned to. */
+  public final RoleStatus role;
+
+  /** Placement outcome of the allocation. */
+  public final ContainerAllocationOutcome placement;
+
+  public ContainerAssignment(Container container,
+      RoleStatus role,
+      ContainerAllocationOutcome placement) {
+    this.container = container;
+    this.role = role;
+    this.placement = placement;
+  }
+
+  @Override
+  public String toString() {
+    return "ContainerAssignment{"
+        + "container=" + container
+        + ", role=" + role
+        + ", placement=" + placement
+        + '}';
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/82fdd408/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/ContainerOutcome.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/ContainerOutcome.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/ContainerOutcome.java
new file mode 100644
index 0000000..59ab30b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/ContainerOutcome.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.appmaster.state;
+
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
+
+/**
+ * Container outcomes we care about; slightly simplified from
+ * {@link ContainerExitStatus} -and hopefully able to handle
+ * any new exit codes.
+ */
+public enum ContainerOutcome {
+  Completed,
+  Failed,
+  Failed_limits_exceeded,
+  Node_failure,
+  Preempted;
+
+  /**
+   * Build a container outcome from an exit status.
+   * The values in {@link ContainerExitStatus} are used
+   * here.
+   * @param exitStatus exit status
+   * @return an enumeration of the outcome.
+   */
+  public static ContainerOutcome fromExitStatus(int exitStatus) {
+    switch (exitStatus) {
+      case ContainerExitStatus.ABORTED:
+      case ContainerExitStatus.KILLED_BY_APPMASTER:
+      case ContainerExitStatus.KILLED_BY_RESOURCEMANAGER:
+      case ContainerExitStatus.KILLED_AFTER_APP_COMPLETION:
+        // could either be a release or node failure. Treat as completion
+        return Completed;
+      case ContainerExitStatus.DISKS_FAILED:
+        return Node_failure;
+      case ContainerExitStatus.PREEMPTED:
+        return Preempted;
+      case ContainerExitStatus.KILLED_EXCEEDED_PMEM:
+      case ContainerExitStatus.KILLED_EXCEEDED_VMEM:
+        return Failed_limits_exceeded;
+      default:
+        return exitStatus == 0 ? Completed : Failed;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/82fdd408/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/ContainerPriority.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/ContainerPriority.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/ContainerPriority.java
new file mode 100644
index 0000000..df222fa
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/ContainerPriority.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.appmaster.state;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.util.Records;
+
+import java.util.Locale;
+
+/**
+ * Class containing the logic to build/split container priorities into the
+ * different fields used by Slider.
+ * <p>
+ * A priority is a role ID with an optional marker bit, {@link #NOLOCATION}
+ * (bit 30), OR-ed in. The bit is set when the request did <i>not</i>
+ * specify a location.
+ * <p>
+ * The original design had a requestID merged with the role, to
+ * track outstanding requests. However, this isn't possible, so
+ * the request ID has been dropped. A "location specified" flag was
+ * added to indicate whether or not the request was for a specific location
+ * -though this is currently unused.
+ * <p>
+ * The methods are effectively surplus -but retained to preserve the
+ * option of changing behavior in future.
+ */
+public final class ContainerPriority {
+
+  /** Bit set in a priority when no location was specified. */
+  static final int NOLOCATION = 1 << 30;
+
+  /**
+   * Build an integer priority for a role.
+   * @param role role ID; assumed to be below {@link #NOLOCATION} -TODO confirm
+   * @param locationSpecified true if the request is for a specific location
+   * @return the role, with {@link #NOLOCATION} set if no location was specified
+   */
+  public static int buildPriority(int role,
+                                  boolean locationSpecified) {
+    int location = locationSpecified ? 0 : NOLOCATION;
+    return role | location;
+  }
+
+  /**
+   * Build a YARN priority record for a role.
+   * @param role role ID
+   * @param locationSpecified true if the request is for a specific location
+   * @return a new priority record holding {@link #buildPriority(int, boolean)}
+   */
+  public static Priority createPriority(int role,
+                                        boolean locationSpecified) {
+    Priority pri = Records.newRecord(Priority.class);
+    pri.setPriority(ContainerPriority.buildPriority(role,
+                                                    locationSpecified));
+    return pri;
+  }
+
+  /**
+   * Extract the role ID from a priority by clearing the {@link #NOLOCATION} bit.
+   * @param priority priority value
+   * @return the role ID
+   */
+  public static int extractRole(int priority) {
+    return priority >= NOLOCATION ? priority ^ NOLOCATION : priority;
+  }
+
+  /**
+   * Does the priority have a location?
+   * @param priority priority index
+   * @return true if the {@link #NOLOCATION} marker bit is clear, i.e. the
+   * priority was built with a location specified
+   */
+  public static boolean hasLocation(int priority) {
+    // Test only the marker bit. The former XOR-and-compare returned true
+    // solely when the priority was exactly NOLOCATION (role 0, no location).
+    return (priority & NOLOCATION) == 0;
+  }
+
+  /**
+   * Map from a container to a role key by way of its priority.
+   * @param container container
+   * @return role key
+   */
+  public static int extractRole(Container container) {
+    Priority priority = container.getPriority();
+    return extractRole(priority);
+  }
+
+  /**
+   * Priority record to role mapper.
+   * @param priorityRecord priority record; must not be null
+   * @return the role #
+   * @throws NullPointerException if the record is null
+   */
+  public static int extractRole(Priority priorityRecord) {
+    Preconditions.checkNotNull(priorityRecord);
+    return extractRole(priorityRecord.getPriority());
+  }
+
+  /**
+   * Convert a priority record to a string, extracting role and locality.
+   * @param priorityRecord priority record. May be null
+   * @return a string value
+   */
+  public static String toString(Priority priorityRecord) {
+    if (priorityRecord==null) {
+      return "(null)";
+    } else {
+      return String.format(Locale.ENGLISH,
+          "role %d (locality=%b)",
+          extractRole(priorityRecord),
+          hasLocation(priorityRecord.getPriority()));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/82fdd408/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/ContainerReleaseSelector.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/ContainerReleaseSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/ContainerReleaseSelector.java
new file mode 100644
index 0000000..fafbada
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/ContainerReleaseSelector.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.appmaster.state;
+
+import java.util.List;
+
+/**
+ * Strategy interface for anything that must choose which containers to release.
+ */
+public interface ContainerReleaseSelector {
+
+  /**
+   * Order a list of candidate containers by the priority in which they
+   * should be released.
+   * @param roleId the role ID
+   * @param candidates candidate list ... everything considered suitable
+   * @return the list of candidates, in release order
+   */
+  List<RoleInstance> sortCandidates(int roleId, List<RoleInstance> candidates);
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/82fdd408/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/MostRecentContainerReleaseSelector.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/MostRecentContainerReleaseSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/MostRecentContainerReleaseSelector.java
new file mode 100644
index 0000000..38c5b8e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/MostRecentContainerReleaseSelector.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.appmaster.state;
+
+import org.apache.slider.common.tools.Comparators;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+/**
+ * Release selector that orders candidates with the most recently
+ * created container first.
+ */
+public class MostRecentContainerReleaseSelector implements ContainerReleaseSelector {
+
+  /**
+   * Sort the candidates in place, newest first.
+   * @param roleId role ID (not consulted by this selector)
+   * @param candidates candidate list; it is sorted in place
+   * @return the same list instance, now sorted
+   */
+  @Override
+  public List<RoleInstance> sortCandidates(int roleId,
+      List<RoleInstance> candidates) {
+    Comparator<RoleInstance> newestFirst = new NewerThan();
+    Collections.sort(candidates, newestFirst);
+    return candidates;
+  }
+
+  /**
+   * Compares role instances by {@code createTime} using a reversed long
+   * comparator, so newer instances sort ahead of older ones.
+   */
+  private static class NewerThan implements Comparator<RoleInstance>, Serializable {
+    private final Comparator<Long> reversedTimes =
+        new Comparators.ComparatorReverser<>(new Comparators.LongComparator());
+
+    @Override
+    public int compare(RoleInstance first, RoleInstance second) {
+      return reversedTimes.compare(first.createTime, second.createTime);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/82fdd408/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/NodeEntry.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/NodeEntry.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/NodeEntry.java
new file mode 100644
index 0000000..eb8ff03
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/NodeEntry.java
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.appmaster.state;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.slider.api.types.NodeEntryInformation;
+
+/**
+ * Information about the state of a role on a specific node instance.
+ * <p>
+ * The counters are plain (non-volatile) fields. Every public accessor and
+ * mutator here synchronizes on the instance; callers doing compound
+ * operations should also sync on the instance.
+ * <p>
+ * The two fields `releasing` and `requested` are used to track the ongoing
+ * state of YARN requests; they do not need to be persisted across stop/start
+ * cycles. They may be relevant across AM restart, but without other data
+ * structures in the AM, not enough to track what the AM was up to before
+ * it was restarted. The strategy will be to ignore unexpected allocation
+ * responses (which may come from pre-restart requests), while treating
+ * unexpected container release responses as failures.
+ * <p>
+ * The `live` counter is only decremented in
+ * {@link #containerCompleted(boolean, ContainerOutcome)}, i.e. once a
+ * completion has been reported.
+ */
+public class NodeEntry implements Cloneable {
+
+  /** Role priority this entry tracks. */
+  public final int rolePriority;
+
+  /**
+   * Instances explicitly requested on this node. It's OK if an allocation
+   * comes in that has not been requested (and when that happens, this count
+   * should not drop).
+   */
+  private int requested;
+
+  /** Number of starting instances. */
+  private int starting;
+
+  /** Incrementing counter of instances that failed to start. */
+  private int startFailed;
+
+  /** Incrementing counter of instances that failed. */
+  private int failed;
+
+  /**
+   * Counter of "failed recently" events. These are all failures
+   * which have happened since it was last reset.
+   */
+  private int failedRecently;
+
+  /** Incrementing counter of instances that have been pre-empted. */
+  private int preempted;
+
+  /** Number of live instances. */
+  private int live;
+
+  /** Number of containers being released off this node. */
+  private int releasing;
+
+  /** Timestamp of last use. */
+  private long lastUsed;
+
+  public NodeEntry(int rolePriority) {
+    this.rolePriority = rolePriority;
+  }
+
+  /**
+   * Is the node available for assignments? That is, it is
+   * not running any instances of this type, nor are there
+   * any requests outstanding for it.
+   * @return true if a new request could be issued without taking
+   * the number of instances &gt; 1.
+   */
+  public synchronized boolean isAvailable() {
+    return live + requested + starting - releasing <= 0;
+  }
+
+  /**
+   * Are the anti-affinity constraints held? That is, zero or one
+   * instance running or starting.
+   * @return true if the constraint holds.
+   */
+  public synchronized boolean isAntiAffinityConstraintHeld() {
+    return (live - releasing + starting) <= 1;
+  }
+
+  /**
+   * Return the number of active instances: those that could be released
+   * as they are live and not already being released.
+   * @return a number, possibly 0
+   */
+  public synchronized int getActive() {
+    return (live - releasing);
+  }
+
+  /**
+   * Return true if the node is not busy, and it
+   * has not been used since the absolute time.
+   * @param absoluteTime time
+   * @return true if the node could be cleaned up
+   */
+  public synchronized boolean notUsedSince(long absoluteTime) {
+    return isAvailable() && lastUsed < absoluteTime;
+  }
+
+  public synchronized int getLive() {
+    return live;
+  }
+
+  /**
+   * Number of starting instances. Synchronized like every other accessor,
+   * so a reader on another thread sees the latest value.
+   */
+  public synchronized int getStarting() {
+    return starting;
+  }
+
+  /**
+   * Set the live value directly -used on AM restart.
+   * @param v value
+   */
+  public synchronized void setLive(int v) {
+    live = v;
+  }
+
+  private synchronized void incLive() {
+    ++live;
+  }
+
+  private synchronized void decLive() {
+    live = RoleHistoryUtils.decToFloor(live);
+  }
+
+  public synchronized void onStarting() {
+    ++starting;
+  }
+
+  /** Decrement the starting count; only called with the instance lock held. */
+  private void decStarting() {
+    starting = RoleHistoryUtils.decToFloor(starting);
+  }
+
+  public synchronized void onStartCompleted() {
+    decStarting();
+    incLive();
+  }
+
+  /**
+   * Start failed: decrement the starting count, count the start failure,
+   * and record it as an unplanned {@link ContainerOutcome#Failed} completion.
+   * <p>
+   * NOTE(review): the completion path also decrements {@code live}, although
+   * a container that failed to start was never counted as live by
+   * {@link #onStartCompleted()}. Left unchanged pending confirmation of how
+   * callers pair start-failure and completion events.
+   * @return true if the node is now available
+   */
+  public synchronized boolean onStartFailed() {
+    decStarting();
+    ++startFailed;
+    return containerCompleted(false, ContainerOutcome.Failed);
+  }
+
+  /**
+   * Number of requests made of this role on this node. If it goes above
+   * 1 there's a problem.
+   */
+  public synchronized int getRequested() {
+    return requested;
+  }
+
+  /**
+   * Request a node.
+   */
+  public synchronized void request() {
+    ++requested;
+  }
+
+  /**
+   * A request made explicitly to this node has completed.
+   */
+  public synchronized void requestCompleted() {
+    requested = RoleHistoryUtils.decToFloor(requested);
+  }
+
+  /**
+   * Number of instances in release state.
+   */
+  public synchronized int getReleasing() {
+    return releasing;
+  }
+
+  /**
+   * Release an instance -which is no longer marked as active.
+   */
+  public synchronized void release() {
+    releasing++;
+  }
+
+  /**
+   * Completion event, which can be planned or unplanned.
+   * Planned: decrement the release count.
+   * Unplanned: update the failure/preemption counters according to the outcome.
+   * In both cases the live count is then decremented.
+   * @param wasReleased true if this was planned
+   * @param outcome outcome of the container; only consulted when
+   * {@code wasReleased} is false
+   * @return true if this node is now available
+   */
+  public synchronized boolean containerCompleted(boolean wasReleased, ContainerOutcome outcome) {
+    if (wasReleased) {
+      releasing = RoleHistoryUtils.decToFloor(releasing);
+    } else {
+      // for the node, we use the outcome of the failure to decide
+      // whether this is potentially "node-related"
+      switch(outcome) {
+        // general "any reason" app failure
+        case Failed:
+        // specific node failure
+        case Node_failure:
+          ++failed;
+          ++failedRecently;
+          break;
+
+        case Preempted:
+          preempted++;
+          break;
+
+        // failures which are node-independent
+        case Failed_limits_exceeded:
+        case Completed:
+        default:
+          break;
+      }
+    }
+    decLive();
+    return isAvailable();
+  }
+
+  /**
+   * Time last used.
+   */
+  public synchronized long getLastUsed() {
+    return lastUsed;
+  }
+
+  public synchronized void setLastUsed(long lastUsed) {
+    this.lastUsed = lastUsed;
+  }
+
+  public synchronized int getStartFailed() {
+    return startFailed;
+  }
+
+  public synchronized int getFailed() {
+    return failed;
+  }
+
+  public synchronized int getFailedRecently() {
+    return failedRecently;
+  }
+
+  @VisibleForTesting
+  public synchronized void setFailedRecently(int failedRecently) {
+    this.failedRecently = failedRecently;
+  }
+
+  public synchronized int getPreempted() {
+    return preempted;
+  }
+
+  /**
+   * Reset the failed recently count.
+   */
+  public synchronized void resetFailedRecently() {
+    failedRecently = 0;
+  }
+
+  /**
+   * Describe every counter. Synchronized so the values reported are a
+   * consistent snapshot.
+   */
+  @Override
+  public synchronized String toString() {
+    final StringBuilder sb = new StringBuilder("NodeEntry{");
+    sb.append("priority=").append(rolePriority);
+    sb.append(", requested=").append(requested);
+    sb.append(", starting=").append(starting);
+    sb.append(", live=").append(live);
+    sb.append(", releasing=").append(releasing);
+    sb.append(", lastUsed=").append(lastUsed);
+    sb.append(", failed=").append(failed);
+    sb.append(", failedRecently=").append(failedRecently);
+    sb.append(", preempted=").append(preempted);
+    sb.append(", startFailed=").append(startFailed);
+    sb.append('}');
+    return sb.toString();
+  }
+
+  /**
+   * Produce a serialized form which can be served up as JSON.
+   * @return a summary of the current role status.
+   */
+  public synchronized NodeEntryInformation serialize() {
+    NodeEntryInformation info = new NodeEntryInformation();
+    info.priority = rolePriority;
+    info.requested = requested;
+    info.releasing = releasing;
+    info.starting = starting;
+    info.startFailed = startFailed;
+    info.failed = failed;
+    info.failedRecently = failedRecently;
+    info.preempted = preempted;
+    info.live = live;
+    info.lastUsed = lastUsed;
+    return info;
+  }
+
+  @Override
+  public Object clone() throws CloneNotSupportedException {
+    return super.clone();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/82fdd408/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/NodeInstance.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/NodeInstance.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/NodeInstance.java
new file mode 100644
index 0000000..cc17cf0
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/NodeInstance.java
@@ -0,0 +1,409 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.appmaster.state;
+
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.slider.api.types.NodeInformation;
+import org.apache.slider.common.tools.Comparators;
+import org.apache.slider.common.tools.SliderUtils;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+
+/**
+ * A node instance -stores information about a node in the cluster.
+ * <p>
+ * Operations on the array/set of roles are synchronized.
+ */
+public class NodeInstance {
+
+  /**
+   * Hostname of the node; equality and hashing are based solely on it.
+   */
+  public final String hostname;
+
+  /**
+   * Last known state of the node. Starts off as {@link NodeState#RUNNING},
+   * on the assumption that it is live.
+   */
+  private NodeState nodeState = NodeState.RUNNING;
+
+  /**
+   * Last node report received. If null: none has been received yet.
+   */
+  private NodeReport nodeReport = null;
+
+  /**
+   * Time of the last state update, taken from the last health report
+   * time of the most recent node report.
+   */
+  private long nodeStateUpdateTime = 0;
+
+  /**
+   * Node labels.
+   *
+   * IMPORTANT: we assume that there is one label/node, which is the policy
+   * for Hadoop as of November 2015
+   */
+  private String nodeLabels = "";
+
+  /**
+   * An unordered list of node entries of specific roles. There's nothing
+   * indexed so as to support sparser datastructures.
+   */
+  private final List<NodeEntry> nodeEntries;
+
+  /**
+   * Create an instance with an (empty) list of role entries.
+   * @param hostname hostname of the node
+   * @param roles role count -the no. of roles; only used as the initial
+   * capacity of the entry list
+   */
+  public NodeInstance(String hostname, int roles) {
+    this.hostname = hostname;
+    nodeEntries = new ArrayList<>(roles);
+  }
+
+  /**
+   * Update the node status.
+   * The return code is true if the node state changed enough to
+   * trigger a re-evaluation of pending requests. That is, either a node
+   * became available when it was previously not, or the label changed
+   * on an available node.
+   *
+   * Transitions of a node from live to dead aren't treated as significant,
+   * nor label changes on a dead node.
+   *
+   * @param report latest node report
+   * @return true if the node state changed enough for a request evaluation.
+   */
+  public synchronized boolean updateNode(NodeReport report) {
+    nodeStateUpdateTime = report.getLastHealthReportTime();
+    nodeReport = report;
+    NodeState oldState = nodeState;
+    boolean oldStateUnusable = oldState.isUnusable();
+    nodeState = report.getNodeState();
+    boolean newUsable = !nodeState.isUnusable();
+    // unusable -> usable is the only state transition that matters
+    boolean nodeNowAvailable = oldStateUnusable && newUsable;
+    String labels = this.nodeLabels;
+    nodeLabels = SliderUtils.extractNodeLabel(report);
+    // a label change only matters if the node can actually host containers
+    return nodeNowAvailable
+        || newUsable && !this.nodeLabels.equals(labels);
+  }
+
+  /**
+   * Get the label of this node, as extracted from the last node report.
+   * @return the node label; "" until a report has been received
+   */
+  public String getNodeLabels() {
+    return nodeLabels;
+  }
+
+  /**
+   * Get the entry for a role -if present.
+   * @param role role index
+   * @return the entry, or null if there is no entry for that role
+   */
+  public synchronized NodeEntry get(int role) {
+    for (NodeEntry nodeEntry : nodeEntries) {
+      if (nodeEntry.rolePriority == role) {
+        return nodeEntry;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Get the entry for a role, creating and adding a new one if there is
+   * no entry for that role yet.
+   * @param role role index
+   * @return the existing or newly created entry; never null
+   */
+  public synchronized NodeEntry getOrCreate(int role) {
+    NodeEntry entry = get(role);
+    if (entry == null) {
+      entry = new NodeEntry(role);
+      nodeEntries.add(entry);
+    }
+    return entry;
+  }
+
+  /**
+   * Get (or create) the node entry for the role of a container on this node.
+   * The role is extracted from the container via
+   * {@code ContainerPriority.extractRole(container)}.
+   * @param container container
+   * @return matching node entry for the role
+   */
+  public NodeEntry getOrCreate(Container container) {
+    return getOrCreate(ContainerPriority.extractRole(container));
+  }
+
+  /**
+   * Count the number of active role instances on this node
+   * @param role role index
+   * @return 0 if there is no entry for the role, otherwise the entry's
+   * active count (instances running and not already being released).
+   */
+  public int getActiveRoleInstances(int role) {
+    NodeEntry nodeEntry = get(role);
+    return (nodeEntry != null) ? nodeEntry.getActive() : 0;
+  }
+
+  /**
+   * Count the number of live role instances on this node
+   * @param role role index
+   * @return 0 if there is no entry for the role, otherwise the entry's
+   * live count.
+   */
+  public int getLiveRoleInstances(int role) {
+    NodeEntry nodeEntry = get(role);
+    return (nodeEntry != null) ? nodeEntry.getLive() : 0;
+  }
+
+  /**
+   * Is the node considered online?
+   * @return true if the last known node state is not unusable
+   */
+  public boolean isOnline() {
+    return !nodeState.isUnusable();
+  }
+
+  /**
+   * Query for a node being considered unreliable
+   * @param role role key
+   * @param threshold threshold above which a node is considered unreliable
+   * @return true if there is an entry for the role and its recent failure
+   * count is strictly greater than the threshold
+   */
+  public boolean isConsideredUnreliable(int role, int threshold) {
+    NodeEntry entry = get(role);
+    return entry != null && entry.getFailedRecently() > threshold;
+  }
+
+  /**
+   * Get the entry for a role -and remove it if present
+   * @param role the role index
+   * @return the entry that WAS there, or null if there was none
+   */
+  public synchronized NodeEntry remove(int role) {
+    NodeEntry nodeEntry = get(role);
+    if (nodeEntry != null) {
+      nodeEntries.remove(nodeEntry);
+    }
+    return nodeEntry;
+  }
+
+  /**
+   * Replace any existing entry for a role with the supplied entry.
+   * @param role role index whose existing entry (if any) is removed
+   * @param nodeEntry entry to add; presumably its rolePriority should
+   * equal {@code role} -this is not checked here
+   */
+  public synchronized void set(int role, NodeEntry nodeEntry) {
+    remove(role);
+    nodeEntries.add(nodeEntry);
+  }
+
+  /**
+   * Run through each entry, removing those which have not been used since
+   * the given time and which have no recent failures (we keep those, as
+   * the failure history matters).
+   * @param absoluteTime time in millis; entries for which
+   * {@code notUsedSince(absoluteTime)} holds are candidates for removal
+   * @return true if there are still entries left
+   */
+  public synchronized boolean purgeUnusedEntries(long absoluteTime) {
+    boolean active = false;
+    ListIterator<NodeEntry> entries = nodeEntries.listIterator();
+    while (entries.hasNext()) {
+      NodeEntry entry = entries.next();
+      if (entry.notUsedSince(absoluteTime) && entry.getFailedRecently() == 0) {
+        entries.remove();
+      } else {
+        active = true;
+      }
+    }
+    return active;
+  }
+
+  /**
+   * Run through each entry resetting the recent failure count.
+   */
+  public synchronized void resetFailedRecently() {
+    for (NodeEntry entry : nodeEntries) {
+      entry.resetFailedRecently();
+    }
+  }
+
+  @Override
+  public String toString() {
+    return hostname;
+  }
+
+  /**
+   * Full dump of the node including its role entries.
+   * Synchronized like every other traversal of the entry list, so that a
+   * concurrent update cannot break the iteration.
+   * @return a multi-line description of the node
+   */
+  public synchronized String toFullString() {
+    final StringBuilder sb =
+      new StringBuilder(toString());
+    sb.append("{ ");
+    for (NodeEntry entry : nodeEntries) {
+      sb.append(String.format("\n  [%02d]  ", entry.rolePriority));
+      sb.append(entry.toString());
+    }
+    sb.append("} ");
+    return sb.toString();
+  }
+
+  /**
+   * Equality test is purely on the hostname of the node address
+   * @param o other
+   * @return true if the hostnames are equal
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    NodeInstance that = (NodeInstance) o;
+    return hostname.equals(that.hostname);
+  }
+
+  @Override
+  public int hashCode() {
+    return hostname.hashCode();
+  }
+
+  /**
+   * Predicate to query if the number of recent failures of a role
+   * on this node exceeds that role's failure threshold.
+   * If there is no record of a deployment of that role on this
+   * node, the failure count is taken as "0".
+   * A negative threshold makes this return true unconditionally.
+   * @param role role to look up
+   * @return true if the failure rate is above the threshold.
+   */
+  public boolean exceedsFailureThreshold(RoleStatus role) {
+    NodeEntry entry = get(role.getKey());
+    int numFailuresOnLastHost = entry != null ? entry.getFailedRecently() : 0;
+    int failureThreshold = role.getNodeFailureThreshold();
+    return failureThreshold < 0 || numFailuresOnLastHost > failureThreshold;
+  }
+
+  /**
+   * Produce a serialized form which can be served up as JSON
+   * @param naming map of priority -> value for naming entries; entries
+   * with no name are keyed by their priority as a decimal string
+   * @return a summary of the current node status.
+   */
+  public synchronized NodeInformation serialize(Map<Integer, String> naming) {
+    NodeInformation info = new NodeInformation();
+    info.hostname = hostname;
+    // null-handling state constructor
+    info.state = "" + nodeState;
+    info.lastUpdated = nodeStateUpdateTime;
+    info.labels = nodeLabels;
+    if (nodeReport != null) {
+      info.httpAddress = nodeReport.getHttpAddress();
+      info.rackName = nodeReport.getRackName();
+      info.healthReport = nodeReport.getHealthReport();
+    }
+    info.entries = new HashMap<>(nodeEntries.size());
+    for (NodeEntry nodeEntry : nodeEntries) {
+      String name = naming.get(nodeEntry.rolePriority);
+      if (name == null) {
+        name = Integer.toString(nodeEntry.rolePriority);
+      }
+      info.entries.put(name, nodeEntry.serialize());
+    }
+    return info;
+  }
+
+  /**
+   * Is this node instance a suitable candidate for the specific role?
+   * <p>
+   * Note: this calls {@link #getOrCreate(int)}, so it adds an empty entry
+   * for the role if there was none.
+   * @param role role ID
+   * @param label label which must match, or "" for no label checks
+   * @return true if the node has space for this role, is running and the labels
+   * match.
+   */
+  public boolean canHost(int role, String label) {
+    return isOnline()
+        && (SliderUtils.isUnset(label) || label.equals(nodeLabels))   // label match
+        && getOrCreate(role).isAvailable();                          // no live role
+  }
+
+  /**
+   * A comparator for sorting entries where the node is preferred over another.
+   *
+   * The exact algorithm may change: current policy is "most recent first", so sorted
+   * on the lastUsed field of each node's entry for the role. A node with no
+   * entry for the role is treated as having a lastUsed of -1.
+   *
+   * The ordering itself is delegated to
+   * {@code Comparators.InvertedLongComparator}; presumably it sorts larger
+   * (more recent) values first -confirm against that class.
+   */
+  public static class Preferred implements Comparator<NodeInstance>, Serializable {
+
+    private static final Comparators.InvertedLongComparator comparator =
+        new Comparators.InvertedLongComparator();
+    private final int role;
+
+    public Preferred(int role) {
+      this.role = role;
+    }
+
+    @Override
+    public int compare(NodeInstance o1, NodeInstance o2) {
+      NodeEntry left = o1.get(role);
+      NodeEntry right = o2.get(role);
+      long ageL = left != null ? left.getLastUsed() : -1;
+      long ageR = right != null ? right.getLastUsed() : -1;
+      return comparator.compare(ageL, ageR);
+    }
+  }
+
+  /**
+   * A comparator ordering nodes by descending number of active instances
+   * of a role: nodes with more active instances sort first.
+   */
+  public static class MoreActiveThan implements Comparator<NodeInstance>,
+                                           Serializable {
+
+    private final int role;
+
+    public MoreActiveThan(int role) {
+      this.role = role;
+    }
+
+    @Override
+    public int compare(NodeInstance left, NodeInstance right) {
+      int activeLeft = left.getActiveRoleInstances(role);
+      int activeRight = right.getActiveRoleInstances(role);
+      // arguments swapped for descending order; Integer.compare cannot overflow
+      return Integer.compare(activeRight, activeLeft);
+    }
+  }
+
+  /**
+   * A comparator for sorting entries lexicographically by hostname.
+   */
+  public static class CompareNames implements Comparator<NodeInstance>,
+                                           Serializable {
+
+    public CompareNames() {
+    }
+
+    @Override
+    public int compare(NodeInstance left, NodeInstance right) {
+      return left.hostname.compareTo(right.hostname);
+    }
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/82fdd408/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/NodeMap.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/NodeMap.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/NodeMap.java
new file mode 100644
index 0000000..3858b68
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/NodeMap.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.appmaster.state;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A map of hostname to {@link NodeInstance}, plus helper methods to work
+ * with it.
+ * Not synchronized: callers are expected to lock access.
+ */
+public class NodeMap extends HashMap<String, NodeInstance> {
+  protected static final Logger log =
+    LoggerFactory.getLogger(NodeMap.class);
+
+  /**
+   * Number of roles; handed to every newly created {@link NodeInstance}.
+   */
+  private final int roleSize;
+
+  /**
+   * Create an empty map.
+   * @param roleSize number of roles
+   */
+  public NodeMap(int roleSize) {
+    this.roleSize = roleSize;
+  }
+
+  /**
+   * Look up the node instance for a host, creating and registering a new
+   * one when there is none yet.
+   * @param hostname node
+   * @return the existing or newly created instance
+   */
+  public NodeInstance getOrCreate(String hostname) {
+    NodeInstance existing = get(hostname);
+    if (existing != null) {
+      return existing;
+    }
+    NodeInstance created = new NodeInstance(hostname, roleSize);
+    put(hostname, created);
+    return created;
+  }
+
+  /**
+   * List the nodes which have at least one active instance of a role.
+   * @param role role
+   * @return a possibly empty list, ordered by
+   * {@link NodeInstance.MoreActiveThan} for that role
+   */
+  public List<NodeInstance> listActiveNodes(int role) {
+    List<NodeInstance> active = new ArrayList<>();
+    for (NodeInstance candidate : values()) {
+      if (candidate.getActiveRoleInstances(role) <= 0) {
+        continue;
+      }
+      active.add(candidate);
+    }
+    Collections.sort(active, new NodeInstance.MoreActiveThan(role));
+    return active;
+  }
+
+  /**
+   * Reset the "failed recently" counters on every node.
+   */
+  public void resetFailedRecently() {
+    for (Map.Entry<String, NodeInstance> mapping : entrySet()) {
+      mapping.getValue().resetFailedRecently();
+    }
+  }
+
+  /**
+   * Update the state of a node. Returns true if the node state changed:
+   * either because the node had to be created, or because its internal
+   * state changed as defined by {@link NodeInstance#updateNode(NodeReport)}.
+   *
+   * @param hostname host name
+   * @param report latest node report
+   * @return true if the node state changed enough for a request evaluation.
+   */
+  public boolean updateNode(String hostname, NodeReport report) {
+    boolean isNew = get(hostname) == null;
+    boolean stateChanged = getOrCreate(hostname).updateNode(report);
+    return isNew || stateChanged;
+  }
+
+  /**
+   * Clone point.
+   * @return a shallow clone, as produced by {@link HashMap#clone()}
+   */
+  @Override
+  public Object clone() {
+    return super.clone();
+  }
+
+  /**
+   * Insert a collection of nodes into the map, keyed by hostname and
+   * overwriting any existing entry with the same name.
+   * This is a bulk operation for testing.
+   * @param nodes collection of nodes.
+   */
+  @VisibleForTesting
+  public void insert(Collection<NodeInstance> nodes) {
+    for (NodeInstance instance : nodes) {
+      put(instance.hostname, instance);
+    }
+  }
+
+  /**
+   * Test helper: build or update a cluster from a list of node reports.
+   * Every report is applied; unlike {@link #updateNode(String, NodeReport)},
+   * creating a node does not by itself count as a change.
+   * @param reports the list of reports
+   * @return true if any node's {@link NodeInstance#updateNode(NodeReport)}
+   * reported a significant change
+   */
+  @VisibleForTesting
+  public boolean buildOrUpdate(List<NodeReport> reports) {
+    boolean changed = false;
+    for (NodeReport report : reports) {
+      String host = report.getNodeId().getHost();
+      if (getOrCreate(host).updateNode(report)) {
+        changed = true;
+      }
+    }
+    return changed;
+  }
+
+  /**
+   * Scan the current node map for all nodes capable of hosting an instance
+   * @param role role ID
+   * @param label label which must match, or "" for no label checks
+   * @return a possibly empty list of matching node instances, sorted by
+   * hostname.
+   */
+  public List<NodeInstance> findAllNodesForRole(int role, String label) {
+    List<NodeInstance> hosts = new ArrayList<>(size());
+    for (NodeInstance candidate : values()) {
+      if (!candidate.canHost(role, label)) {
+        continue;
+      }
+      hosts.add(candidate);
+    }
+    Collections.sort(hosts, new NodeInstance.CompareNames());
+    return hosts;
+  }
+
+  @Override
+  public synchronized String toString() {
+    List<String> sortedHosts = new ArrayList<>(keySet());
+    Collections.sort(sortedHosts);
+    final StringBuilder out = new StringBuilder("NodeMap{");
+    for (String host : sortedHosts) {
+      out.append(host)
+          .append(": ")
+          .append(get(host).toFullString())
+          .append("\n");
+    }
+    return out.append('}').toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/82fdd408/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequest.java
new file mode 100644
index 0000000..4357ef8
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequest.java
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.appmaster.state;
+
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.InvalidContainerRequestException;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.apache.slider.common.tools.SliderUtils;
+import org.apache.slider.server.appmaster.operations.CancelSingleRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Tracks an outstanding request. This is used to correlate an allocation response
+ * with the node and role used in the request.
+ * <p>
+ * The node identifier may be null -which indicates that a request was made without
+ * a specific target node
+ * <p>
+ * Equality and the hash code are based <i>only</i> on the role and hostname,
+ * which are fixed in the constructor. This means that a simple
+ * instance constructed with (role, hostname) can be used to look up
+ * a complete request instance in the {@link OutstandingRequestTracker} map
+ */
+public final class OutstandingRequest extends RoleHostnamePair {
+  protected static final Logger log =
+    LoggerFactory.getLogger(OutstandingRequest.class);
+
+  /**
+   * Node the request is for -may be null
+   */
+  public final NodeInstance node;
+
+  /**
+   * A list of all possible nodes to list in an AA request.
+   * For a request built with {@link #OutstandingRequest(int, NodeInstance)},
+   * element 0 is that node -which is a null element if the node was null.
+   * The lookup-only {@link #OutstandingRequest(int, String)} constructor
+   * leaves the list empty.
+   */
+  public final List<NodeInstance> nodes = new ArrayList<>(1);
+
+  /**
+   * Optional label. This is cached as the request option (explicit-location + label) is forbidden,
+   * yet the label needs to be retained for escalation.
+   */
+  public String label;
+
+  /**
+   * The container request most recently issued: set by
+   * {@link #buildContainerRequest(Resource, RoleStatus, long)} and replaced
+   * by {@link #escalate()}.
+   * <p>
+   * Only valid after {@link #buildContainerRequest(Resource, RoleStatus, long)}
+   */
+  private AMRMClient.ContainerRequest issuedRequest;
+
+  /**
+   * Requested time in millis.
+   * <p>
+   * Only valid after {@link #buildContainerRequest(Resource, RoleStatus, long)}
+   */
+  private long requestedTimeMillis;
+
+  /**
+   * Time in millis after which escalation should be triggered.
+   * <p>
+   * Only valid after {@link #buildContainerRequest(Resource, RoleStatus, long)}
+   */
+  private long escalationTimeoutMillis;
+
+  /**
+   * Has the placement request been escalated?
+   */
+  private boolean escalated;
+
+  /**
+   * Flag to indicate that escalation is allowed
+   */
+  private boolean mayEscalate;
+
+  /**
+   * Priority of request; only valid after the request is built up
+   */
+  private int priority = -1;
+
+  /**
+   * Is this an Anti-affine request which should be cancelled on
+   * a cluster resize?
+   */
+  private boolean antiAffine = false;
+
+  /**
+   * Create a request
+   * @param roleId role
+   * @param node node -can be null; it is always added to {@link #nodes}
+   */
+  public OutstandingRequest(int roleId,
+                            NodeInstance node) {
+    super(roleId, node != null ? node.hostname : null);
+    this.node = node;
+    nodes.add(node);
+  }
+
+  /**
+   * Create an outstanding request with the given role and hostname
+   * Important: this is useful only for map lookups -the other constructor
+   * with the NodeInstance parameter is needed to generate node-specific
+   * container requests
+   * @param roleId role
+   * @param hostname hostname
+   */
+  public OutstandingRequest(int roleId, String hostname) {
+    super(roleId, hostname);
+    this.node = null;
+  }
+
+  /**
+   * Create an Anti-affine request, including all listed nodes (there must be
+   * at least one) as targets.
+   * @param roleId role
+   * @param nodes list of nodes; the first node's hostname becomes the
+   * hostname of this request
+   * @throws IllegalArgumentException if the list is null or empty
+   */
+  public OutstandingRequest(int roleId, List<NodeInstance> nodes) {
+    super(roleId, firstHostname(nodes));
+    this.node = null;
+    this.antiAffine = true;
+    this.nodes.addAll(nodes);
+  }
+
+  /**
+   * Get the hostname of the first node of an anti-affine node list,
+   * failing fast with a meaningful message rather than an index error.
+   * Static so that it can be evaluated before the superclass constructor.
+   * @param nodes candidate nodes
+   * @return the hostname of element 0
+   * @throws IllegalArgumentException if the list is null or empty
+   */
+  private static String firstHostname(List<NodeInstance> nodes) {
+    Preconditions.checkArgument(nodes != null && !nodes.isEmpty(),
+        "anti-affine request needs at least one node");
+    return nodes.get(0).hostname;
+  }
+
+  /**
+   * Is the request located in the cluster, that is: does it have a node.
+   * @return true if a node instance was supplied in the constructor
+   */
+  public boolean isLocated() {
+    return node != null;
+  }
+
+  public long getRequestedTimeMillis() {
+    return requestedTimeMillis;
+  }
+
+  public long getEscalationTimeoutMillis() {
+    return escalationTimeoutMillis;
+  }
+
+  public synchronized boolean isEscalated() {
+    return escalated;
+  }
+
+  public boolean mayEscalate() {
+    return mayEscalate;
+  }
+
+  public AMRMClient.ContainerRequest getIssuedRequest() {
+    return issuedRequest;
+  }
+
+  public int getPriority() {
+    return priority;
+  }
+
+  public boolean isAntiAffine() {
+    return antiAffine;
+  }
+
+  public void setAntiAffine(boolean antiAffine) {
+    this.antiAffine = antiAffine;
+  }
+
+  /**
+   * Build a container request.
+   * <p>
+   *  The value of {@link #node} is used to direct a lot of policy. If null,
+   *  placement is relaxed.
+   *  If not null, the choice of whether to use the suggested node
+   *  is based on the placement policy and failure history.
+   * <p>
+   * If the request has an address, it is set in the container request
+   * (with a flag to enable relaxed priorities).
+   * <p>
+   * This operation sets the requested time flag, used for tracking timeouts
+   * on outstanding requests
+   * @param resource resource
+   * @param role role
+   * @param time time in millis to record as request time
+   * @return the request to raise
+   * @throws IllegalArgumentException if resource or role is null
+   */
+  public synchronized AMRMClient.ContainerRequest buildContainerRequest(
+      Resource resource, RoleStatus role, long time) {
+    Preconditions.checkArgument(resource != null, "null `resource` arg");
+    Preconditions.checkArgument(role != null, "null `role` arg");
+
+    // cache label for escalation
+    label = role.getLabelExpression();
+    requestedTimeMillis = time;
+    // multiply as a long so a large timeout cannot overflow int arithmetic
+    escalationTimeoutMillis = time + role.getPlacementTimeoutSeconds() * 1000L;
+    String[] hosts;
+    boolean relaxLocality;
+    boolean strictPlacement = role.isStrictPlacement();
+    NodeInstance target = this.node;
+    String nodeLabels;
+
+    if (isAntiAffine()) {
+      int size = nodes.size();
+      log.info("Creating anti-affine request across {} nodes; first node = {}",
+          size, hostname);
+      hosts = new String[size];
+      int c = 0;
+      for (NodeInstance nodeInstance : nodes) {
+        hosts[c++] = nodeInstance.hostname;
+      }
+      // only build the (potentially long) host list when it will be logged
+      if (log.isDebugEnabled()) {
+        StringBuilder builder = new StringBuilder(size * 16);
+        for (String host : hosts) {
+          builder.append(host).append(" ");
+        }
+        log.debug("Full host list: [ {}]", builder);
+      }
+      escalated = false;
+      mayEscalate = false;
+      relaxLocality = false;
+      nodeLabels = null;
+    } else if (target != null) {
+      // placed request. Hostname is used in request
+      hosts = new String[1];
+      hosts[0] = target.hostname;
+      // and locality flag is set to false; Slider will decide when
+      // to relax things
+      relaxLocality = false;
+
+      log.info("Submitting request for container on {}", hosts[0]);
+      // enable escalation for all but strict placements.
+      escalated = false;
+      mayEscalate = !strictPlacement;
+      nodeLabels = null;
+    } else {
+      // no hosts
+      hosts = null;
+      // relax locality is mandatory on an unconstrained placement
+      relaxLocality = true;
+      // declare that the the placement is implicitly escalated.
+      escalated = true;
+      // and forbid it happening
+      mayEscalate = false;
+      nodeLabels = label;
+    }
+    Priority pri = ContainerPriority.createPriority(roleId, !relaxLocality);
+    priority = pri.getPriority();
+    issuedRequest = new AMRMClient.ContainerRequest(resource,
+                                      hosts,
+                                      null,
+                                      pri,
+                                      relaxLocality,
+                                      nodeLabels);
+    validate();
+    return issuedRequest;
+  }
+
+
+  /**
+   * Build an escalated container request, updating {@link #issuedRequest} with
+   * the new value.
+   * @return the new container request, which has the same resource and label requirements
+   * as the original one, and the same host, but: relaxed placement, and a changed priority
+   * so as to place it into the relaxed list.
+   * @throws NullPointerException if no request has been issued yet
+   */
+  public synchronized AMRMClient.ContainerRequest escalate() {
+    // template form: the (costly) toString() only runs if the check fails
+    Preconditions.checkNotNull(issuedRequest,
+        "cannot escalate if request not issued %s", this);
+    log.debug("Escalating {}", this);
+    escalated = true;
+
+    // this is now the priority
+    // it is tagged as unlocated because it needs to go into a different
+    // set of outstanding requests from the strict placements
+    Priority pri = ContainerPriority.createPriority(roleId, false);
+    // update the field
+    priority = pri.getPriority();
+
+    // hosts are only carried over when there is no label, as a
+    // label and an explicit host list may not be combined
+    String[] escalatedHosts;
+    List<String> issuedRequestNodes = issuedRequest.getNodes();
+    if (SliderUtils.isUnset(label) && issuedRequestNodes != null) {
+      escalatedHosts =
+          issuedRequestNodes.toArray(new String[issuedRequestNodes.size()]);
+    } else {
+      escalatedHosts = null;
+    }
+
+    issuedRequest = new AMRMClient.ContainerRequest(issuedRequest.getCapability(),
+        escalatedHosts,
+        null,
+        pri,
+        true,
+        label);
+    validate();
+    return issuedRequest;
+  }
+
+  /**
+   * Mark the request as completed (or canceled).
+   * <p>
+   *   Current action: if a node is defined, its request count is decremented
+   */
+  public void completed() {
+    if (node != null) {
+      node.getOrCreate(roleId).requestCompleted();
+    }
+  }
+
+  /**
+   * Query to see if the request is available and ready to be escalated
+   * @param time time to check against
+   * @return true if escalation should begin
+   */
+  public synchronized boolean shouldEscalate(long time) {
+    return mayEscalate
+           && !escalated
+           && issuedRequest != null
+           && escalationTimeoutMillis < time;
+  }
+
+  /**
+   * Query for the resource requirements matching; always false before a request is issued
+   * @param resource resource to compare the issued request's capability against
+   * @return true if a request has been issued and its capability fits in
+   * {@code resource}
+   */
+  public synchronized boolean resourceRequirementsMatch(Resource resource) {
+    return issuedRequest != null && Resources.fitsIn(issuedRequest.getCapability(), resource);
+  }
+
+  @Override
+  public String toString() {
+    boolean requestHasLocation = ContainerPriority.hasLocation(getPriority());
+    final StringBuilder sb = new StringBuilder("OutstandingRequest{");
+    sb.append("roleId=").append(roleId);
+    if (hostname != null) {
+      sb.append(", hostname='").append(hostname).append('\'');
+    }
+    sb.append(", node=").append(node);
+    sb.append(", hasLocation=").append(requestHasLocation);
+    sb.append(", label=").append(label);
+    sb.append(", requestedTimeMillis=").append(requestedTimeMillis);
+    sb.append(", mayEscalate=").append(mayEscalate);
+    sb.append(", escalated=").append(escalated);
+    sb.append(", escalationTimeoutMillis=").append(escalationTimeoutMillis);
+    sb.append(", issuedRequest=").append(
+        issuedRequest != null ? SliderUtils.requestToString(issuedRequest) : "(null)");
+    sb.append('}');
+    return sb.toString();
+  }
+
+  /**
+   * Create a cancel operation
+   * @return an operation that can be used to cancel the request
+   * @throws IllegalStateException if no request has been issued
+   */
+  public CancelSingleRequest createCancelOperation() {
+    Preconditions.checkState(issuedRequest != null, "No issued request to cancel");
+    return new CancelSingleRequest(issuedRequest);
+  }
+
+  /**
+   * Validate whether the node label expression specified on the issued
+   * container request is valid. Mimics the logic in AMRMClientImpl, so can be
+   * used for preflight checking and in mock tests
+   * @throws NullPointerException if the request has not yet been built up
+   * @throws InvalidContainerRequestException if the request is invalid
+   */
+  public void validate() throws InvalidContainerRequestException {
+    Preconditions.checkNotNull(issuedRequest, "request has not yet been built up");
+    AMRMClient.ContainerRequest containerRequest = issuedRequest;
+    String requestDetails = this.toString();
+    validateContainerRequest(containerRequest, priority, requestDetails);
+  }
+
+  /**
+   * Inner Validation logic for container request
+   * @param containerRequest request
+   * @param priority raw priority of role
+   * @param requestDetails details for error messages
+   * @throws InvalidContainerRequestException if the label expression combines
+   * labels with "&&"/"||", or a label is combined with racks or nodes
+   */
+  @VisibleForTesting
+  public static void validateContainerRequest(AMRMClient.ContainerRequest containerRequest,
+    int priority, String requestDetails) {
+    String exp = containerRequest.getNodeLabelExpression();
+    boolean hasRacks = containerRequest.getRacks() != null &&
+      (!containerRequest.getRacks().isEmpty());
+    boolean hasNodes = containerRequest.getNodes() != null &&
+      (!containerRequest.getNodes().isEmpty());
+
+    boolean hasLabel = SliderUtils.isSet(exp);
+
+    // Don't support specifying >= 2 node labels in a node label expression now
+    if (hasLabel && (exp.contains("&&") || exp.contains("||"))) {
+      throw new InvalidContainerRequestException(
+          "Cannot specify more than two node labels"
+              + " in a single node label expression: " + requestDetails);
+    }
+
+    // Don't allow specify node label against ANY request listing hosts or racks
+    if (hasLabel && ( hasRacks || hasNodes)) {
+      throw new InvalidContainerRequestException(
+          "Cannot specify node label with rack or node: " + requestDetails);
+    }
+  }
+
+  /**
+   * Create a new role/hostname pair for indexing.
+   * @return a new index.
+   */
+  public RoleHostnamePair getIndex() {
+    return new RoleHostnamePair(roleId, hostname);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/82fdd408/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequestTracker.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequestTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequestTracker.java
new file mode 100644
index 0000000..c16aa3c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequestTracker.java
@@ -0,0 +1,482 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.appmaster.state;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.slider.common.tools.SliderUtils;
+import org.apache.slider.server.appmaster.operations.AbstractRMOperation;
+import org.apache.slider.server.appmaster.operations.CancelSingleRequest;
+import org.apache.slider.server.appmaster.operations.ContainerRequestOperation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Tracks outstanding requests made with a specific placement option.
+ * <p>
+ *   <ol>
+ *     <li>Used to decide when to return a node to 'can request containers here' list</li>
+ *     <li>Used to identify requests where placement has timed out, and so issue relaxed requests</li>
+ *   </ol>
+ * <p>
+ * If an allocation comes in that is not in the map: either the allocation
+ * was unplaced, or the placed allocation could not be met on the specified
+ * host, and the RM/scheduler fell back to another location. 
+ */
+
+public class OutstandingRequestTracker {
+  protected static final Logger log =
+    LoggerFactory.getLogger(OutstandingRequestTracker.class);
+
+  /**
+   * no requests; saves creating a new list if not needed
+   */
+  private final List<AbstractRMOperation> NO_REQUESTS = new ArrayList<>(0);
+ 
+  private Map<RoleHostnamePair, OutstandingRequest> placedRequests = new HashMap<>();
+
+  /**
+   * List of open requests; no specific details on them.
+   */
+  private List<OutstandingRequest> openRequests = new ArrayList<>();
+
+  /**
+   * Create a new request for the specific role.
+   * <p>
+   * If a location is set, the request is added to {@link #placedRequests}.
+   * If not, it is added to {@link #openRequests}
+   * <p>
+   * This does not update the node instance's role's request count
+   * @param instance node instance to manager
+   * @param role role index
+   * @return a new request
+   */
+  public synchronized OutstandingRequest newRequest(NodeInstance instance, int role) {
+    OutstandingRequest request = new OutstandingRequest(role, instance);
+    if (request.isLocated()) {
+      placedRequests.put(request.getIndex(), request);
+    } else {
+      openRequests.add(request);
+    }
+    return request;
+  }
+
+  /**
+   * Create a new Anti-affine request for the specific role
+   * <p>
+   * It is added to {@link #openRequests}
+   * <p>
+   * This does not update the node instance's role's request count
+   * @param role role index
+   * @param nodes list of suitable nodes
+   * @param label label to use
+   * @return a new request
+   */
+  public synchronized OutstandingRequest newAARequest(int role,
+      List<NodeInstance> nodes,
+      String label) {
+    Preconditions.checkArgument(!nodes.isEmpty());
+    // safety check to verify the allocation will hold
+    for (NodeInstance node : nodes) {
+      Preconditions.checkState(node.canHost(role, label),
+        "Cannot allocate role ID %d to node %s", role, node);
+    }
+    OutstandingRequest request = new OutstandingRequest(role, nodes);
+    openRequests.add(request);
+    return request;
+  }
+
+  /**
+   * Look up any oustanding request to a (role, hostname). 
+   * @param role role index
+   * @param hostname hostname
+   * @return the request or null if there was no outstanding one in the {@link #placedRequests}
+   */
+  @VisibleForTesting
+  public synchronized OutstandingRequest lookupPlacedRequest(int role, String hostname) {
+    Preconditions.checkArgument(hostname != null, "null hostname");
+    return placedRequests.get(new RoleHostnamePair(role, hostname));
+  }
+
+  /**
+   * Remove a request
+   * @param request matching request to find
+   * @return the request or null for no match in the {@link #placedRequests}
+   */
+  @VisibleForTesting
+  public synchronized OutstandingRequest removePlacedRequest(OutstandingRequest request) {
+    return placedRequests.remove(request);
+  }
+
+  /**
+   * Notification that a container has been allocated
+   *
+   * <ol>
+   *   <li>drop it from the {@link #placedRequests} structure.</li>
+   *   <li>generate the cancellation request</li>
+   *   <li>for AA placement, any actions needed</li>
+   * </ol>
+   *
+   * @param role role index
+   * @param hostname hostname
+   * @return the allocation outcome
+   */
+  public synchronized ContainerAllocationResults onContainerAllocated(int role,
+      String hostname,
+      Container container) {
+    final String containerDetails = SliderUtils.containerToString(container);
+    log.debug("Processing allocation for role {}  on {}", role,
+        containerDetails);
+    ContainerAllocationResults allocation = new ContainerAllocationResults();
+    ContainerAllocationOutcome outcome;
+    OutstandingRequest request = placedRequests.remove(new OutstandingRequest(role, hostname));
+    if (request != null) {
+      //satisfied request
+      log.debug("Found oustanding placed request for container: {}", request);
+      request.completed();
+      // derive outcome from status of tracked request
+      outcome = request.isEscalated()
+          ? ContainerAllocationOutcome.Escalated
+          : ContainerAllocationOutcome.Placed;
+    } else {
+      // not in the list; this is an open placement
+      // scan through all containers in the open request list
+      request = removeOpenRequest(container);
+      if (request != null) {
+        log.debug("Found open outstanding request for container: {}", request);
+        request.completed();
+        outcome = ContainerAllocationOutcome.Open;
+      } else {
+        log.warn("No oustanding request found for container {}, outstanding queue has {} entries ",
+            containerDetails,
+            openRequests.size());
+        outcome = ContainerAllocationOutcome.Unallocated;
+      }
+    }
+    if (request != null && request.getIssuedRequest() != null) {
+      allocation.operations.add(request.createCancelOperation());
+    } else {
+      // there's a request, but no idea what to cancel.
+      // rather than try to recover from it inelegantly, (and cause more confusion),
+      // log the event, but otherwise continue
+      log.warn("Unexpected allocation of container " + SliderUtils.containerToString(container));
+    }
+
+    allocation.origin = request;
+    allocation.outcome = outcome;
+    return allocation;
+  }
+
+  /**
+   * Find and remove an open request. Determine it by scanning open requests
+   * for one whose priority & resource requirements match that of the container
+   * allocated.
+   * @param container container allocated
+   * @return a request which matches the allocation, or null for "no match"
+   */
+  private OutstandingRequest removeOpenRequest(Container container) {
+    int pri = container.getPriority().getPriority();
+    Resource resource = container.getResource();
+    OutstandingRequest request = null;
+    ListIterator<OutstandingRequest> openlist = openRequests.listIterator();
+    while (openlist.hasNext() && request == null) {
+      OutstandingRequest r = openlist.next();
+      if (r.getPriority() == pri) {
+        // matching resource
+        if (r.resourceRequirementsMatch(resource)) {
+          // match of priority and resources
+          request = r;
+          openlist.remove();
+        } else {
+          log.debug("Matched priorities but resources different");
+        }
+      }
+    }
+    return request;
+  }
+  
+  /**
+   * Determine which host was a role type most recently used on, so that
+   * if a choice is made of which (potentially surplus) containers to use,
+   * the most recent one is picked first. This operation <i>does not</i>
+   * change the role history, though it queries it.
+   */
+  static class newerThan implements Comparator<Container>, Serializable {
+    private RoleHistory rh;
+    
+    public newerThan(RoleHistory rh) {
+      this.rh = rh;
+    }
+
+    /**
+     * Get the age of a node hosting container. If it is not known in the history, 
+     * return 0.
+     * @param c container
+     * @return age, null if there's no entry for it. 
+     */
+    private long getAgeOf(Container c) {
+      long age = 0;
+      NodeInstance node = rh.getExistingNodeInstance(c);
+      int role = ContainerPriority.extractRole(c);
+      if (node != null) {
+        NodeEntry nodeEntry = node.get(role);
+        if (nodeEntry != null) {
+          age = nodeEntry.getLastUsed();
+        }
+      }
+      return age;
+    }
+
+    /**
+     * Comparator: which host is more recent?
+     * @param c1 container 1
+     * @param c2 container 2
+     * @return 1 if c2 older-than c1, 0 if equal; -1 if c1 older-than c2
+     */
+    @Override
+    public int compare(Container c1, Container c2) {
+      int role1 = ContainerPriority.extractRole(c1);
+      int role2 = ContainerPriority.extractRole(c2);
+      if (role1 < role2) return -1;
+      if (role1 > role2) return 1;
+
+      long age = getAgeOf(c1);
+      long age2 = getAgeOf(c2);
+
+      if (age > age2) {
+        return -1;
+      } else if (age < age2) {
+        return 1;
+      }
+      // equal
+      return 0;
+    }
+  }
+
+  /**
+   * Take a list of requests and split them into specific host requests and
+   * generic assignments. This is to give requested hosts priority
+   * in container assignments if more come back than expected
+   * @param rh RoleHistory instance
+   * @param inAllocated the list of allocated containers
+   * @param outPlaceRequested initially empty list of requested locations 
+   * @param outUnplaced initially empty list of unrequested hosts
+   */
+  public synchronized void partitionRequests(RoleHistory rh,
+      List<Container> inAllocated,
+      List<Container> outPlaceRequested,
+      List<Container> outUnplaced) {
+    Collections.sort(inAllocated, new newerThan(rh));
+    for (Container container : inAllocated) {
+      int role = ContainerPriority.extractRole(container);
+      String hostname = RoleHistoryUtils.hostnameOf(container);
+      if (placedRequests.containsKey(new OutstandingRequest(role, hostname))) {
+        outPlaceRequested.add(container);
+      } else {
+        outUnplaced.add(container);
+      }
+    }
+  }
+  
+
+  /**
+   * Reset list all outstanding requests for a role: return the hostnames
+   * of any canceled requests
+   *
+   * @param role role to cancel
+   * @return possibly empty list of hostnames
+   */
+  public synchronized List<NodeInstance> resetOutstandingRequests(int role) {
+    List<NodeInstance> hosts = new ArrayList<>();
+    Iterator<Map.Entry<RoleHostnamePair, OutstandingRequest>> iterator =
+      placedRequests.entrySet().iterator();
+    while (iterator.hasNext()) {
+      Map.Entry<RoleHostnamePair, OutstandingRequest> next =
+        iterator.next();
+      OutstandingRequest request = next.getValue();
+      if (request.roleId == role) {
+        iterator.remove();
+        request.completed();
+        hosts.add(request.node);
+      }
+    }
+    ListIterator<OutstandingRequest> openlist = openRequests.listIterator();
+    while (openlist.hasNext()) {
+      OutstandingRequest next = openlist.next();
+      if (next.roleId == role) {
+        openlist.remove();
+      }
+    }
+    return hosts;
+  }
+
+  /**
+   * Get a list of outstanding requests. The list is cloned, but the contents
+   * are shared
+   * @return a list of the current outstanding requests
+   */
+  public synchronized List<OutstandingRequest> listPlacedRequests() {
+    return new ArrayList<>(placedRequests.values());
+  }
+
+  /**
+   * Get a list of outstanding requests. The list is cloned, but the contents
+   * are shared
+   * @return a list of the current outstanding requests
+   */
+  public synchronized List<OutstandingRequest> listOpenRequests() {
+    return new ArrayList<>(openRequests);
+  }
+
+  /**
+   * Escalate operation as triggered by external timer.
+   * @return a (usually empty) list of cancel/request operations.
+   */
+  @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
+  public synchronized List<AbstractRMOperation> escalateOutstandingRequests(long now) {
+    if (placedRequests.isEmpty()) {
+      return NO_REQUESTS;
+    }
+
+    List<AbstractRMOperation> operations = new ArrayList<>();
+    for (OutstandingRequest outstandingRequest : placedRequests.values()) {
+      synchronized (outstandingRequest) {
+        // sync escalation check with operation so that nothing can happen to state
+        // of the request during the escalation
+        if (outstandingRequest.shouldEscalate(now)) {
+
+          // time to escalate
+          CancelSingleRequest cancel = outstandingRequest.createCancelOperation();
+          operations.add(cancel);
+          AMRMClient.ContainerRequest escalated = outstandingRequest.escalate();
+          operations.add(new ContainerRequestOperation(escalated));
+        }
+      }
+      
+    }
+    return operations;
+  }
+
+  /**
+   * Cancel all outstanding AA requests from the lists of requests.
+   *
+   * This does not remove them from the role status; they must be reset
+   * by the caller.
+   *
+   */
+  @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
+  public synchronized List<AbstractRMOperation> cancelOutstandingAARequests() {
+
+    log.debug("Looking for AA request to cancel");
+    List<AbstractRMOperation> operations = new ArrayList<>();
+
+    // first, all placed requests
+    List<RoleHostnamePair> requestsToRemove = new ArrayList<>(placedRequests.size());
+    for (Map.Entry<RoleHostnamePair, OutstandingRequest> entry : placedRequests.entrySet()) {
+      OutstandingRequest outstandingRequest = entry.getValue();
+      synchronized (outstandingRequest) {
+        if (outstandingRequest.isAntiAffine()) {
+          // time to escalate
+          operations.add(outstandingRequest.createCancelOperation());
+          requestsToRemove.add(entry.getKey());
+        }
+      }
+    }
+    for (RoleHostnamePair keys : requestsToRemove) {
+      placedRequests.remove(keys);
+    }
+
+    // second, all open requests
+    ListIterator<OutstandingRequest> orit = openRequests.listIterator();
+    while (orit.hasNext()) {
+      OutstandingRequest outstandingRequest =  orit.next();
+      synchronized (outstandingRequest) {
+        if (outstandingRequest.isAntiAffine()) {
+          // time to escalate
+          operations.add(outstandingRequest.createCancelOperation());
+          orit.remove();
+        }
+      }
+    }
+    log.info("Cancelling {} outstanding AA requests", operations.size());
+
+    return operations;
+  }
+
+  /**
+   * Extract a specific number of open requests for a role
+   * @param roleId role Id
+   * @param count count to extract
+   * @return a list of requests which are no longer in the open request list
+   */
+  public synchronized List<OutstandingRequest> extractOpenRequestsForRole(int roleId, int count) {
+    List<OutstandingRequest> results = new ArrayList<>();
+    ListIterator<OutstandingRequest> openlist = openRequests.listIterator();
+    while (openlist.hasNext() && count > 0) {
+      OutstandingRequest openRequest = openlist.next();
+      if (openRequest.roleId == roleId) {
+        results.add(openRequest);
+        openlist.remove();
+        count--;
+      }
+    }
+    return results;
+  }
+
+  /**
+   * Extract a specific number of placed requests for a role
+   * @param roleId role Id
+   * @param count count to extract
+   * @return a list of requests which are no longer in the placed request data structure
+   */
+  public synchronized List<OutstandingRequest> extractPlacedRequestsForRole(int roleId, int count) {
+    List<OutstandingRequest> results = new ArrayList<>();
+    Iterator<Map.Entry<RoleHostnamePair, OutstandingRequest>>
+        iterator = placedRequests.entrySet().iterator();
+    while (iterator.hasNext() && count > 0) {
+      OutstandingRequest request = iterator.next().getValue();
+      if (request.roleId == roleId) {
+        results.add(request);
+        count--;
+      }
+    }
+    // now cull them from the map
+    for (OutstandingRequest result : results) {
+      placedRequests.remove(result);
+    }
+
+    return results;
+  }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message