hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject svn commit: r1376283 [12/22] - in /hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client: ./ hadoop-mapreduce-client-app2/ hadoop-mapreduce-client-app2/src/ hadoop-mapreduce-client-app2/src/main/ hadoop-mapreduce-client-app2/s...
Date Wed, 22 Aug 2012 22:11:48 GMT
Added: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeMap.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeMap.java?rev=1376283&view=auto
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeMap.java (added)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeMap.java Wed Aug 22 22:11:39 2012
@@ -0,0 +1,68 @@
+package org.apache.hadoop.mapreduce.v2.app2.rm.node;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.mapreduce.v2.app2.AppContext;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.event.EventHandler;
+
+// TODO Seems a little strange, extending ConcurrentHashMap like this.
+// TODO This needs to extend AbstractService to get a handle on the conf.
+/**
+ * Concurrent map from {@link NodeId} to {@link AMNode} that also acts as the
+ * {@link EventHandler} for {@link AMNodeEvent}s, forwarding each event to
+ * the node it refers to.
+ */
+@SuppressWarnings("serial")
+public class AMNodeMap extends ConcurrentHashMap<NodeId, AMNode> implements
+    EventHandler<AMNodeEvent> {
+
+  private final EventHandler eventHandler;
+  private final AppContext appContext;
+
+  /**
+   * @param eventHandler handler passed on to every AMNodeImpl created here
+   * @param appContext application context passed on to every AMNodeImpl
+   */
+  public AMNodeMap(EventHandler eventHandler, AppContext appContext) {
+    this.eventHandler = eventHandler;
+    this.appContext = appContext;
+
+    // TODO XXX: Get a handle of allowed failures.
+  }
+
+  /**
+   * Registers a node the first time it is seen; later calls for the same
+   * node are no-ops.
+   *
+   * @param nodeId id of the node that was seen
+   */
+  public void nodeSeen(NodeId nodeId) {
+    // Only build an AMNodeImpl for a node that is not registered yet.
+    // putIfAbsent still arbitrates when two threads race past this check.
+    if (!containsKey(nodeId)) {
+      // TODO Replace 3 with correct value.
+      putIfAbsent(nodeId, new AMNodeImpl(nodeId, 3, eventHandler, appContext));
+    }
+  }
+
+  /**
+   * Not implemented yet: always reports the host as not blacklisted.
+   *
+   * @param hostname host to check
+   * @return always {@code false}
+   */
+  public boolean isHostBlackListed(String hostname) {
+    return false;
+    // Node versus host blacklisting.
+    // TODO XXX -> Maintain a map of host to NodeList (case of multiple NMs)
+    // Provide functionality to say isHostBlacklisted(hostname) -> all hosts.
+    // ... blacklisted means don't ask for containers on this host.
+    // Same list to be used for computing forcedUnblacklisting.
+  }
+
+  /**
+   * Routes an event to the node it names. N_NODE_WAS_BLACKLISTED events are
+   * currently ignored.
+   *
+   * @param event the node event to dispatch
+   * @throws NullPointerException if the event names a node that was never
+   *         registered through {@link #nodeSeen(NodeId)}
+   */
+  @Override
+  public void handle(AMNodeEvent event) {
+    if (event.getType() == AMNodeEventType.N_NODE_WAS_BLACKLISTED) {
+      // TODO Handle blacklisting.
+      return;
+    }
+
+    NodeId nodeId = event.getNodeId();
+    AMNode node = get(nodeId);
+    if (node == null) {
+      // Same exception type as the bare dereference, but says which node.
+      throw new NullPointerException("Received " + event.getType()
+          + " for unknown node " + nodeId);
+    }
+    node.handle(event);
+  }
+
+
+
+//nodeBlacklistingEnabled = 
+//conf.getBoolean(MRJobConfig.MR_AM_JOB_NODE_BLACKLISTING_ENABLE, true);
+//LOG.info("nodeBlacklistingEnabled:" + nodeBlacklistingEnabled);
+//maxTaskFailuresPerNode = 
+//conf.getInt(MRJobConfig.MAX_TASK_FAILURES_PER_TRACKER, 3);
+//blacklistDisablePercent =
+//  conf.getInt(
+//      MRJobConfig.MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERECENT,
+//      MRJobConfig.DEFAULT_MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERCENT);
+//LOG.info("maxTaskFailuresPerNode is " + maxTaskFailuresPerNode);
+//if (blacklistDisablePercent < -1 || blacklistDisablePercent > 100) {
+//throw new YarnException("Invalid blacklistDisablePercent: "
+//    + blacklistDisablePercent
+//    + ". Should be an integer between 0 and 100 or -1 to disabled");
+//}
+//LOG.info("blacklistDisablePercent is " + blacklistDisablePercent);
+}

Added: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeState.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeState.java?rev=1376283&view=auto
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeState.java (added)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeState.java Wed Aug 22 22:11:39 2012
@@ -0,0 +1,8 @@
+package org.apache.hadoop.mapreduce.v2.app2.rm.node;
+
+/**
+ * State values for a node in the rm.node package.
+ * NOTE(review): presumably the states of an AMNode; the transitions between
+ * them are not visible in this file -- confirm against AMNodeImpl.
+ */
+public enum AMNodeState {
+  ACTIVE,
+  FORCED_ACTIVE,
+  BLACKLISTED,
+  UNHEALTHY,
+}

Added: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/package-info.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/package-info.java?rev=1376283&view=auto
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/package-info.java (added)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/package-info.java Wed Aug 22 22:11:39 2012
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+@InterfaceAudience.Private
+package org.apache.hadoop.mapreduce.v2.app2.rm;
+import org.apache.hadoop.classification.InterfaceAudience;

Added: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/security/authorize/ClientHSPolicyProvider.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/security/authorize/ClientHSPolicyProvider.java?rev=1376283&view=auto
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/security/authorize/ClientHSPolicyProvider.java (added)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/security/authorize/ClientHSPolicyProvider.java Wed Aug 22 22:11:39 2012
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.v2.app2.security.authorize;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.mapreduce.v2.api.HSClientProtocolPB;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
+import org.apache.hadoop.security.authorize.PolicyProvider;
+import org.apache.hadoop.security.authorize.Service;
+
+/**
+ * {@link PolicyProvider} that exposes the service-authorization entry for
+ * {@link HSClientProtocolPB}.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class ClientHSPolicyProvider extends PolicyProvider {
+
+  // Single entry: the ACL key MR_HS_SECURITY_SERVICE_AUTHORIZATION guards
+  // HSClientProtocolPB.
+  private static final Service[] mrHSServices =
+      new Service[] {
+    new Service(
+        JHAdminConfig.MR_HS_SECURITY_SERVICE_AUTHORIZATION,
+        HSClientProtocolPB.class)
+  };
+
+  /**
+   * @return a fresh copy of the service table, so callers cannot modify the
+   *         shared static array
+   */
+  @Override
+  public Service[] getServices() {
+    return mrHSServices.clone();
+  }
+}

Added: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/security/authorize/MRAMPolicyProvider.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/security/authorize/MRAMPolicyProvider.java?rev=1376283&view=auto
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/security/authorize/MRAMPolicyProvider.java (added)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/security/authorize/MRAMPolicyProvider.java Wed Aug 22 22:11:39 2012
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.v2.app2.security.authorize;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.v2.api.MRClientProtocolPB;
+import org.apache.hadoop.security.authorize.PolicyProvider;
+import org.apache.hadoop.security.authorize.Service;
+
+/**
+ * {@link PolicyProvider} that exposes the service-authorization entries for
+ * {@link TaskUmbilicalProtocol} and {@link MRClientProtocolPB}.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class MRAMPolicyProvider extends PolicyProvider {
+
+  // One entry per protocol, each paired with its MRJobConfig ACL key.
+  private static final Service[] mapReduceApplicationMasterServices =
+      new Service[] {
+    new Service(
+        MRJobConfig.MR_AM_SECURITY_SERVICE_AUTHORIZATION_TASK_UMBILICAL,
+        TaskUmbilicalProtocol.class),
+    new Service(
+        MRJobConfig.MR_AM_SECURITY_SERVICE_AUTHORIZATION_CLIENT,
+        MRClientProtocolPB.class)
+  };
+
+  /**
+   * @return a fresh copy of the service table, so callers cannot modify the
+   *         shared static array
+   */
+  @Override
+  public Service[] getServices() {
+    return mapReduceApplicationMasterServices.clone();
+  }
+
+}

Added: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/security/authorize/package-info.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/security/authorize/package-info.java?rev=1376283&view=auto
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/security/authorize/package-info.java (added)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/security/authorize/package-info.java Wed Aug 22 22:11:39 2012
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+@InterfaceAudience.Private
+package org.apache.hadoop.mapreduce.v2.app2.security.authorize;
+import org.apache.hadoop.classification.InterfaceAudience;

Added: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/speculate/DataStatistics.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/speculate/DataStatistics.java?rev=1376283&view=auto
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/speculate/DataStatistics.java (added)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/speculate/DataStatistics.java Wed Aug 22 22:11:39 2012
@@ -0,0 +1,78 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file distributed
+* with this work for additional information regarding copyright ownership.
+* The ASF licenses this file to you under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app2.speculate;
+
+/**
+ * Running count, sum and sum of squares of a stream of numbers, from which
+ * mean, variance and standard deviation are derived. All accessors and
+ * mutators are synchronized on the instance.
+ */
+public class DataStatistics {
+  private int count = 0;
+  private double sum = 0;
+  private double sumSquares = 0;
+
+  /** Creates an empty statistics object. */
+  public DataStatistics() {
+  }
+
+  /**
+   * Creates a statistics object that already holds one sample.
+   *
+   * @param initNum the first sample
+   */
+  public DataStatistics(double initNum) {
+    this.count = 1;
+    this.sum = initNum;
+    this.sumSquares = initNum * initNum;
+  }
+
+  /**
+   * Adds one sample.
+   *
+   * @param newNum the sample to add
+   */
+  public synchronized void add(double newNum) {
+    this.count++;
+    this.sum += newNum;
+    this.sumSquares += newNum * newNum;
+  }
+
+  /**
+   * Replaces a previously added sample with a new value; the count is
+   * unchanged.
+   *
+   * @param old the value being replaced
+   * @param update the value that replaces it
+   */
+  public synchronized void updateStatistics(double old, double update) {
+    this.sum += update - old;
+    this.sumSquares += (update * update) - (old * old);
+  }
+
+  /** @return the mean of the samples, or 0.0 when there are none */
+  public synchronized double mean() {
+    return count == 0 ? 0.0 : sum / count;
+  }
+
+  /**
+   * @return the population variance, or 0.0 with fewer than two samples;
+   *         clamped at 0.0 so rounding error never yields a negative value
+   */
+  public synchronized double var() {
+    // E(X^2) - E(X)^2
+    if (count <= 1) {
+      return 0.0;
+    }
+    double mean = mean();
+    return Math.max((sumSquares / count) - mean * mean, 0.0d);
+  }
+
+  /** @return the square root of {@link #var()} */
+  public synchronized double std() {
+    return Math.sqrt(this.var());
+  }
+
+  /**
+   * @param sigma number of standard deviations above the mean
+   * @return {@code mean() + std() * sigma}, or 0.0 when there are no samples
+   */
+  public synchronized double outlier(float sigma) {
+    if (count != 0) {
+      return mean() + std() * sigma;
+    }
+
+    return 0.0;
+  }
+
+  /** @return the number of samples, widened to double */
+  public synchronized double count() {
+    return count;
+  }
+
+  /**
+   * Synchronized like the other accessors so the reported count, sum and
+   * sumSquares all come from the same snapshot.
+   */
+  @Override
+  public synchronized String toString() {
+    return "DataStatistics: count is " + count + ", sum is " + sum +
+    ", sumSquares is " + sumSquares + " mean is " + mean() + " std() is " + std();
+  }
+}

Added: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/speculate/DefaultSpeculator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/speculate/DefaultSpeculator.java?rev=1376283&view=auto
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/speculate/DefaultSpeculator.java (added)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/speculate/DefaultSpeculator.java Wed Aug 22 22:11:39 2012
@@ -0,0 +1,511 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app2.speculate;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
+import org.apache.hadoop.mapreduce.v2.app2.AppContext;
+import org.apache.hadoop.mapreduce.v2.app2.job.Job;
+import org.apache.hadoop.mapreduce.v2.app2.job.Task;
+import org.apache.hadoop.mapreduce.v2.app2.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskEvent;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskEventType;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
+import org.apache.hadoop.yarn.Clock;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.service.AbstractService;
+
+
+public class DefaultSpeculator extends AbstractService implements
+    Speculator {
+
+  // Sentinel results of speculationValue(). Each is a distinct value at the
+  //  very bottom of the long range, so all of them are negative.
+  private static final long ON_SCHEDULE = Long.MIN_VALUE;
+  private static final long ALREADY_SPECULATING = Long.MIN_VALUE + 1;
+  private static final long TOO_NEW = Long.MIN_VALUE + 2;
+  private static final long PROGRESS_IS_GOOD = Long.MIN_VALUE + 3;
+  private static final long NOT_RUNNING = Long.MIN_VALUE + 4;
+  private static final long TOO_LATE_TO_SPECULATE = Long.MIN_VALUE + 5;
+
+  // Minimum wait, in milliseconds, before the background thread scans again;
+  //  which one applies depends on whether the last scan launched speculations.
+  private static final long SOONEST_RETRY_AFTER_NO_SPECULATE = 1000L * 1L;
+  private static final long SOONEST_RETRY_AFTER_SPECULATE = 1000L * 15L;
+
+  // NOTE(review): presumably caps on how many tasks may be speculated at
+  //  once; their use is outside the visible part of this file -- confirm.
+  private static final double PROPORTION_RUNNING_TASKS_SPECULATABLE = 0.1;
+  private static final double PROPORTION_TOTAL_TASKS_SPECULATABLE = 0.01;
+  private static final int  MINIMUM_ALLOWED_SPECULATIVE_TASKS = 10;
+
+  private static final Log LOG = LogFactory.getLog(DefaultSpeculator.class);
+
+  // Tasks for which the latest status report said RUNNING; maintained by
+  //  statusUpdate().
+  private final ConcurrentMap<TaskId, Boolean> runningTasks
+      = new ConcurrentHashMap<TaskId, Boolean>();
+
+  // statusUpdate() returns early for a task whose flag here is set.
+  private final Map<Task, AtomicBoolean> pendingSpeculations
+      = new ConcurrentHashMap<Task, AtomicBoolean>();
+
+  // These are the current needs, not the initial needs.  For each job, these
+  //  record the number of attempts that exist and that are actively
+  //  waiting for a container [as opposed to running or finished]
+  private final ConcurrentMap<JobId, AtomicInteger> mapContainerNeeds
+      = new ConcurrentHashMap<JobId, AtomicInteger>();
+  private final ConcurrentMap<JobId, AtomicInteger> reduceContainerNeeds
+      = new ConcurrentHashMap<JobId, AtomicInteger>();
+
+  // Consulted by speculationValue() to decide when to run the threshold
+  //  check. NOTE(review): plain HashSet -- confirm which threads touch it.
+  private final Set<TaskId> mayHaveSpeculated = new HashSet<TaskId>();
+
+  private final Configuration conf;
+  private AppContext context;
+  private Thread speculationBackgroundThread = null;
+  private BlockingQueue<SpeculatorEvent> eventQueue
+      = new LinkedBlockingQueue<SpeculatorEvent>();
+  private TaskRuntimeEstimator estimator;
+
+  // The background thread polls this queue between scans; adding an element
+  //  (see scanForSpeculations) ends its wait early.
+  private BlockingQueue<Object> scanControl = new LinkedBlockingQueue<Object>();
+
+  private final Clock clock;
+
+  private final EventHandler<TaskEvent> eventHandler;
+
+  /**
+   * Creates a speculator that uses the clock supplied by the context.
+   *
+   * @param conf configuration used to select the runtime estimator
+   * @param context application context; also supplies the clock
+   */
+  public DefaultSpeculator(Configuration conf, AppContext context) {
+    this(conf, context, context.getClock());
+  }
+
+  /**
+   * Creates a speculator with an explicit clock; the runtime estimator is
+   * obtained from getEstimator(conf, context).
+   *
+   * @param conf configuration used to select the runtime estimator
+   * @param context application context
+   * @param clock time source for this speculator
+   */
+  public DefaultSpeculator(Configuration conf, AppContext context, Clock clock) {
+    this(conf, context, getEstimator(conf, context), clock);
+  }
+  
+  /**
+   * Instantiates the {@link TaskRuntimeEstimator} class named by
+   * {@link MRJobConfig#MR_AM_TASK_ESTIMATOR} through its no-argument
+   * constructor and contextualizes it.
+   * {@link LegacyTaskRuntimeEstimator} is passed to conf.getClass as the
+   * default class.
+   *
+   * @param conf configuration to read the estimator class from
+   * @param context application context handed to the new estimator
+   * @return the contextualized estimator
+   * @throws YarnException wrapping the reflection failure if the estimator
+   *         cannot be constructed
+   */
+  static private TaskRuntimeEstimator getEstimator
+      (Configuration conf, AppContext context) {
+    // Every reflective failure is reported the same way, so each catch
+    //  only records the cause and the reporting happens once below.
+    Exception failure;
+
+    try {
+      // "yarn.mapreduce.job.task.runtime.estimator.class"
+      Class<? extends TaskRuntimeEstimator> estimatorClass
+          = conf.getClass(MRJobConfig.MR_AM_TASK_ESTIMATOR,
+                          LegacyTaskRuntimeEstimator.class,
+                          TaskRuntimeEstimator.class);
+
+      Constructor<? extends TaskRuntimeEstimator> estimatorConstructor
+          = estimatorClass.getConstructor();
+
+      TaskRuntimeEstimator estimator = estimatorConstructor.newInstance();
+
+      estimator.contextualize(conf, context);
+      return estimator;
+    } catch (InstantiationException ex) {
+      failure = ex;
+    } catch (IllegalAccessException ex) {
+      failure = ex;
+    } catch (InvocationTargetException ex) {
+      failure = ex;
+    } catch (NoSuchMethodException ex) {
+      failure = ex;
+    }
+
+    LOG.error("Can't make a speculation runtime estimator", failure);
+    throw new YarnException(failure);
+  }
+
+  /**
+   * This constructor is designed to be called by other constructors.
+   * However, it's public because we do use it in the test cases.
+   * Normally we figure out our own estimator.
+   *
+   * @param conf configuration kept by this speculator
+   * @param context application context; also supplies the event handler
+   * @param estimator runtime estimator to use
+   * @param clock time source for this speculator
+   */
+  public DefaultSpeculator
+      (Configuration conf, AppContext context,
+       TaskRuntimeEstimator estimator, Clock clock) {
+    super(DefaultSpeculator.class.getName());
+
+    this.conf = conf;
+    this.context = context;
+    this.estimator = estimator;
+    this.clock = clock;
+    this.eventHandler = context.getEventHandler();
+  }
+
+/*   *************************************************************    */
+
+  // This is the task-mongering that creates the two new threads -- one for
+  //  processing events from the event queue and one for periodically
+  //  looking for speculation opportunities
+
+  /**
+   * Launches the background thread that repeatedly calls
+   * computeSpeculations() and then waits on scanControl, and afterwards
+   * delegates to the superclass start().
+   */
+  @Override
+  public void start() {
+    Runnable speculationBackgroundCore
+        = new Runnable() {
+            @Override
+            public void run() {
+              while (!Thread.currentThread().isInterrupted()) {
+                long backgroundRunStartTime = clock.getTime();
+                try {
+                  int speculations = computeSpeculations();
+                  long minimumRecomp
+                      = speculations > 0 ? SOONEST_RETRY_AFTER_SPECULATE
+                                         : SOONEST_RETRY_AFTER_NO_SPECULATE;
+
+                  long wait = Math.max(minimumRecomp,
+                        clock.getTime() - backgroundRunStartTime);
+
+                  if (speculations > 0) {
+                    LOG.info("We launched " + speculations
+                        + " speculations.  Sleeping " + wait + " milliseconds.");
+                  }
+
+                  // Wait out the interval, or wake early when a scan is
+                  //  requested; the polled token itself carries no data.
+                  scanControl.poll(wait, TimeUnit.MILLISECONDS);
+                } catch (InterruptedException e) {
+                  // Log the stack trace through LOG instead of stdout.
+                  LOG.error("Background thread returning, interrupted : " + e, e);
+                  // Keep the interrupt visible to whoever inspects the thread.
+                  Thread.currentThread().interrupt();
+                  return;
+                }
+              }
+            }
+          };
+    speculationBackgroundThread = new Thread
+        (speculationBackgroundCore, "DefaultSpeculator background processing");
+    speculationBackgroundThread.start();
+
+    super.start();
+  }
+
+  /**
+   * Interrupts the background speculation thread, if one was created, and
+   * then delegates to the superclass stop().
+   */
+  @Override
+  public void stop() {
+    // this could be called before background thread is established
+    if (speculationBackgroundThread != null) {
+      speculationBackgroundThread.interrupt();
+    }
+    super.stop();
+  }
+
+  /**
+   * Absorbs a status report, stamped with the current clock time.
+   *
+   * @param status the status reported by a task attempt
+   */
+  @Override
+  public void handleAttempt(TaskAttemptStatus status) {
+    statusUpdate(status, clock.getTime());
+  }
+
+  // This section is not part of the Speculator interface; it's used only for
+  //  testing
+  /** @return true when no SpeculatorEvent is waiting in eventQueue */
+  public boolean eventQueueEmpty() {
+    return eventQueue.isEmpty();
+  }
+
+  // This interface is intended to be used only for test cases.
+  /**
+   * Requests an immediate speculation scan by placing a token on
+   * scanControl, which ends the background thread's current wait, and then
+   * yields the calling thread.
+   */
+  public void scanForSpeculations() {
+    LOG.info("We got asked to run a debug speculation scan.");
+    // Debug detail goes through the logger rather than stdout.
+    LOG.info("There are " + scanControl.size()
+        + " events stacked already.");
+    scanControl.add(new Object());
+    Thread.yield();
+  }
+
+
+/*   *************************************************************    */
+
+  // This section contains the code that gets run for a SpeculatorEvent
+
+  /**
+   * Returns the container-need counter for the job and task type of the
+   * given task, creating a zero counter on first use.
+   *
+   * @param taskID task whose job and type select the counter
+   * @return the shared counter; never null
+   */
+  private AtomicInteger containerNeed(TaskId taskID) {
+    JobId jobID = taskID.getJobId();
+    TaskType taskType = taskID.getTaskType();
+
+    ConcurrentMap<JobId, AtomicInteger> relevantMap
+        = taskType == TaskType.MAP ? mapContainerNeeds : reduceContainerNeeds;
+
+    AtomicInteger result = relevantMap.get(jobID);
+
+    if (result == null) {
+      // putIfAbsent returns the counter another thread installed first, or
+      //  null when ours won, so no second lookup is needed.
+      AtomicInteger fresh = new AtomicInteger(0);
+      AtomicInteger winner = relevantMap.putIfAbsent(jobID, fresh);
+      result = winner == null ? fresh : winner;
+    }
+
+    return result;
+  }
+
+  /**
+   * Dispatches one SpeculatorEvent according to its type. Event types
+   * without a case here are ignored.
+   *
+   * @param event the event to process
+   */
+  private synchronized void processSpeculatorEvent(SpeculatorEvent event) {
+    switch (event.getType()) {
+      case JOB_CREATE:
+        LOG.info("JOB_CREATE " + event.getJobID());
+        estimator.contextualize(getConfig(), context);
+        return;
+
+      case ATTEMPT_START:
+        LOG.info("ATTEMPT_START " + event.getTaskID());
+        estimator.enrollAttempt
+            (event.getReportedStatus(), event.getTimestamp());
+        return;
+
+      case TASK_CONTAINER_NEED_UPDATE:
+        // Apply the delta to the per-job counter for this task's type.
+        containerNeed(event.getTaskID())
+            .addAndGet(event.containersNeededChange());
+        return;
+
+      case ATTEMPT_STATUS_UPDATE:
+        statusUpdate(event.getReportedStatus(), event.getTimestamp());
+        return;
+    }
+  }
+
+  /**
+   * Absorbs one TaskAttemptStatus
+   *
+   * Reports for a job or task the context does not know are dropped.
+   * Otherwise the estimator is updated and, unless a speculation is already
+   * pending for the task, the task is added to or removed from runningTasks
+   * depending on whether the reported state is RUNNING.
+   *
+   * @param reportedStatus the status report that we got from a task attempt
+   *        that we want to fold into the speculation data for this job
+   * @param timestamp the time this status corresponds to.  This matters
+   *        because statuses contain progress.
+   */
+  protected void statusUpdate(TaskAttemptStatus reportedStatus, long timestamp) {
+
+    String stateString = reportedStatus.taskState.toString();
+
+    TaskAttemptId attemptID = reportedStatus.id;
+    TaskId taskID = attemptID.getTaskId();
+    Job job = context.getJob(taskID.getJobId());
+
+    if (job == null) {
+      return;
+    }
+
+    Task task = job.getTask(taskID);
+
+    if (task == null) {
+      return;
+    }
+
+    estimator.updateAttempt(reportedStatus, timestamp);
+
+    // If the task is already known to be speculation-bait, don't do anything.
+    //  Look the flag up once so it cannot disappear between the null check
+    //  and the read.
+    AtomicBoolean pending = pendingSpeculations.get(task);
+    if (pending != null && pending.get()) {
+      return;
+    }
+
+    if (stateString.equals(TaskAttemptState.RUNNING.name())) {
+      runningTasks.putIfAbsent(taskID, Boolean.TRUE);
+    } else {
+      runningTasks.remove(taskID, Boolean.TRUE);
+    }
+  }
+
+/*   *************************************************************    */
+
+// This is the code section that runs periodically and adds speculations for
+//  those jobs that need them.
+
+
+  // Computes how much sooner a fresh attempt is estimated to finish than the
+  //  task's single running attempt: estimated end time of the running
+  //  attempt minus estimated end time of a replacement started now.
+  //
+  // This can return a few magic values for tasks that shouldn't speculate:
+  //  returns ON_SCHEDULE if thresholdRuntime(taskID) says that we should not
+  //     considering speculating this task
+  //  returns ALREADY_SPECULATING if that is true.  This has priority.
+  //  returns TOO_NEW if our companion task hasn't gotten any information
+  //  returns PROGRESS_IS_GOOD if the task is sailing through
+  //  returns NOT_RUNNING if the task is not running
+  //  returns TOO_LATE_TO_SPECULATE if a replacement attempt would not be
+  //     estimated to finish before the running one
+  //
+  // All of these values are negative.  Any value that should be allowed to
+  //  speculate is 0 or positive.
+  //
+  // NOTE(review): job and task are dereferenced without null checks here,
+  //  unlike in statusUpdate -- confirm callers only pass known tasks.
+  private long speculationValue(TaskId taskID, long now) {
+    Job job = context.getJob(taskID.getJobId());
+    Task task = job.getTask(taskID);
+    Map<TaskAttemptId, TaskAttempt> attempts = task.getAttempts();
+    // Long.MIN_VALUE marks "threshold not looked up yet".
+    long acceptableRuntime = Long.MIN_VALUE;
+    long result = Long.MIN_VALUE;
+
+    // For tasks not in mayHaveSpeculated the threshold is checked up front;
+    //  for the others it is checked after the attempt scan below.
+    if (!mayHaveSpeculated.contains(taskID)) {
+      acceptableRuntime = estimator.thresholdRuntime(taskID);
+      if (acceptableRuntime == Long.MAX_VALUE) {
+        return ON_SCHEDULE;
+      }
+    }
+
+    TaskAttemptId runningTaskAttemptID = null;
+
+    int numberRunningAttempts = 0;
+
+    for (TaskAttempt taskAttempt : attempts.values()) {
+      if (taskAttempt.getState() == TaskAttemptState.RUNNING
+          || taskAttempt.getState() == TaskAttemptState.ASSIGNED) {
+        // A second RUNNING/ASSIGNED attempt means a speculation is in flight.
+        if (++numberRunningAttempts > 1) {
+          return ALREADY_SPECULATING;
+        }
+        runningTaskAttemptID = taskAttempt.getID();
+
+        long estimatedRunTime = estimator.estimatedRuntime(runningTaskAttemptID);
+
+        long taskAttemptStartTime
+            = estimator.attemptEnrolledTime(runningTaskAttemptID);
+        if (taskAttemptStartTime > now) {
+          // This background process ran before we could process the task
+          //  attempt status change that chronicles the attempt start
+          return TOO_NEW;
+        }
+
+        long estimatedEndTime = estimatedRunTime + taskAttemptStartTime;
+
+        long estimatedReplacementEndTime
+            = now + estimator.estimatedNewAttemptRuntime(taskID);
+
+        // The attempt was estimated to have finished already.
+        if (estimatedEndTime < now) {
+          return PROGRESS_IS_GOOD;
+        }
+
+        // A replacement started now would not finish any earlier.
+        if (estimatedReplacementEndTime >= estimatedEndTime) {
+          return TOO_LATE_TO_SPECULATE;
+        }
+
+        result = estimatedEndTime - estimatedReplacementEndTime;
+      }
+    }
+
+    // If we are here, there's at most one task attempt.
+    if (numberRunningAttempts == 0) {
+      return NOT_RUNNING;
+    }
+
+
+
+    // Deferred threshold check for tasks that were in mayHaveSpeculated.
+    if (acceptableRuntime == Long.MIN_VALUE) {
+      acceptableRuntime = estimator.thresholdRuntime(taskID);
+      if (acceptableRuntime == Long.MAX_VALUE) {
+        return ON_SCHEDULE;
+      }
+    }
+
+    return result;
+  }
+
+  // Asks for a speculative attempt of the given task: logs the decision,
+  //  sends a T_ADD_SPEC_ATTEMPT task event, and then records the task in
+  //  mayHaveSpeculated.
+  protected void addSpeculativeAttempt(TaskId taskID) {
+    final String announcement
+        = "DefaultSpeculator.addSpeculativeAttempt -- we are speculating " + taskID;
+    LOG.info(announcement);
+
+    final TaskEvent addAttemptRequest
+        = new TaskEvent(taskID, TaskEventType.T_ADD_SPEC_ATTEMPT);
+    eventHandler.handle(addAttemptRequest);
+
+    mayHaveSpeculated.add(taskID);
+  }
+
+  @Override
+  public void handle(SpeculatorEvent event) {
+    processSpeculatorEvent(event);  // every event is forwarded unchanged to processSpeculatorEvent
+  }
+
+
+  private int maybeScheduleAMapSpeculation() {
+    return maybeScheduleASpeculation(TaskType.MAP);
+  }
+
+  private int maybeScheduleAReduceSpeculation() {
+    return maybeScheduleASpeculation(TaskType.REDUCE);
+  }
+
+  // Makes one pass over the jobs tracked for the given task type and, for
+  //  each job with no outstanding container needs, requests at most one
+  //  speculative attempt for the task with the highest speculation value.
+  // Returns the number of speculative attempts requested.
+  private int maybeScheduleASpeculation(TaskType type) {
+    int successes = 0;
+
+    long now = clock.getTime();
+
+    ConcurrentMap<JobId, AtomicInteger> containerNeeds
+        = type == TaskType.MAP ? mapContainerNeeds : reduceContainerNeeds;
+
+    for (ConcurrentMap.Entry<JobId, AtomicInteger> jobEntry : containerNeeds.entrySet()) {
+      // This race condition is okay.  If we skip a speculation attempt we
+      //  should have tried because the event that lowers the number of
+      //  containers needed to zero hasn't come through, it will next time.
+      // Also, if we miss the fact that the number of containers needed was
+      //  zero but increased due to a failure it's not too bad to launch one
+      //  container prematurely.
+      if (jobEntry.getValue().get() > 0) {
+        continue;
+      }
+
+      int numberSpeculationsAlready = 0;
+      int numberRunningTasks = 0;
+
+      // loop through the tasks of the kind
+      Job job = context.getJob(jobEntry.getKey());
+
+      if (job == null) {
+        // The context does not know this job, so there is nothing to scan.
+        continue;
+      }
+
+      Map<TaskId, Task> tasks = job.getTasks(type);
+
+      // First bound on concurrent speculations: a fixed minimum or a
+      //  proportion of all tasks of this type, whichever is larger.
+      int numberAllowedSpeculativeTasks
+          = (int) Math.max(MINIMUM_ALLOWED_SPECULATIVE_TASKS,
+                           PROPORTION_TOTAL_TASKS_SPECULATABLE * tasks.size());
+
+      TaskId bestTaskID = null;
+      // Magic "do not speculate" values are negative, so only values of 0 or
+      //  more can beat this starting point.
+      long bestSpeculationValue = -1L;
+
+      // this loop is potentially pricey.
+      // TODO track the tasks that are potentially worth looking at
+      for (TaskId candidateID : tasks.keySet()) {
+        long mySpeculationValue = speculationValue(candidateID, now);
+
+        if (mySpeculationValue == ALREADY_SPECULATING) {
+          ++numberSpeculationsAlready;
+        }
+
+        if (mySpeculationValue != NOT_RUNNING) {
+          ++numberRunningTasks;
+        }
+
+        if (mySpeculationValue > bestSpeculationValue) {
+          bestTaskID = candidateID;
+          bestSpeculationValue = mySpeculationValue;
+        }
+      }
+
+      // Second bound: a proportion of the tasks found running can raise the
+      //  allowance further.
+      numberAllowedSpeculativeTasks
+          = (int) Math.max(numberAllowedSpeculativeTasks,
+                           PROPORTION_RUNNING_TASKS_SPECULATABLE * numberRunningTasks);
+
+      // If we found a speculation target, fire it off
+      if (bestTaskID != null
+          && numberAllowedSpeculativeTasks > numberSpeculationsAlready) {
+        addSpeculativeAttempt(bestTaskID);
+        ++successes;
+      }
+    }
+
+    return successes;
+  }
+
+  private int computeSpeculations() {
+    // We'll try to issue one map and one reduce speculation per job per run
+    return maybeScheduleAMapSpeculation() + maybeScheduleAReduceSpeculation();
+  }
+}

Added: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/speculate/ExponentiallySmoothedTaskRuntimeEstimator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/speculate/ExponentiallySmoothedTaskRuntimeEstimator.java?rev=1376283&view=auto
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/speculate/ExponentiallySmoothedTaskRuntimeEstimator.java (added)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/speculate/ExponentiallySmoothedTaskRuntimeEstimator.java Wed Aug 22 22:11:39 2012
@@ -0,0 +1,192 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app2.speculate;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.app2.AppContext;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
+
+/**
+ * This estimator exponentially smooths the rate of progress versus wallclock
+ * time.  Conceivably we could write an estimator that smooths time per
+ * unit progress, and get different results.
+ *
+ * For every attempt the estimator keeps one immutable {@code EstimateVector}
+ * behind an {@code AtomicReference}; each status update replaces it with a
+ * vector that blends the newest reading into the smoothed value.  The first
+ * update of an attempt only records a baseline (progress and time), so a
+ * runtime estimate is available from the second update on.
+ */
+public class ExponentiallySmoothedTaskRuntimeEstimator extends StartEndTimesBase {
+
+  // One smoothed estimate per attempt, updated with compare-and-set.
+  private final ConcurrentMap<TaskAttemptId, AtomicReference<EstimateVector>> estimates
+      = new ConcurrentHashMap<TaskAttemptId, AtomicReference<EstimateVector>>();
+
+  // Which quantity is smoothed: progress per time, or time per progress.
+  private SmoothedValue smoothedValue;
+
+  // Smoothing time constant, in the same units as the update timestamps.
+  private long lambda;
+
+  public enum SmoothedValue {
+    RATE, TIME_PER_UNIT_PROGRESS
+  }
+
+  ExponentiallySmoothedTaskRuntimeEstimator
+      (long lambda, SmoothedValue smoothedValue) {
+    super();
+    this.smoothedValue = smoothedValue;
+    this.lambda = lambda;
+  }
+
+  public ExponentiallySmoothedTaskRuntimeEstimator() {
+    super();
+  }
+
+  // immutable
+  private class EstimateVector {
+    // Smoothed reading; a negative value means no reading has been blended
+    //  in yet and the vector is only a baseline.
+    final double value;
+    // Progress and timestamp of the most recent update folded in.
+    final float basedOnProgress;
+    final long atTime;
+
+    EstimateVector(double value, float basedOnProgress, long atTime) {
+      this.value = value;
+      this.basedOnProgress = basedOnProgress;
+      this.atTime = atTime;
+    }
+
+    // Returns a vector with the new reading blended in, or this vector
+    //  unchanged when the update is not newer or reports less progress.
+    EstimateVector incorporate(float newProgress, long newAtTime) {
+      if (newAtTime <= atTime || newProgress < basedOnProgress) {
+        return this;
+      }
+
+      // The old value's weight decays with the time elapsed since it was
+      //  taken, so it always lies in [0, 1].  A baseline vector has no old
+      //  value and gets weight 0.
+      double elapsed = (double) (newAtTime - atTime);
+      double oldWeighting
+          = value < 0.0
+              ? 0.0 : Math.exp(-elapsed / lambda);
+
+      double newRead = (newProgress - basedOnProgress) / (newAtTime - atTime);
+
+      if (smoothedValue == SmoothedValue.TIME_PER_UNIT_PROGRESS) {
+        // NOTE(review): an update without progress makes this infinite, and
+        //  the smoothed value stays infinite afterwards -- confirm whether
+        //  that is the intended behavior for stalled attempts.
+        newRead = 1.0 / newRead;
+      }
+
+      return new EstimateVector
+          (value * oldWeighting + newRead * (1.0 - oldWeighting),
+           newProgress, newAtTime);
+    }
+  }
+
+  // Folds one (progress, time) reading into the attempt's estimate.  The
+  //  compare-and-set loop retries when another thread replaced the vector
+  //  in the meantime.
+  private void incorporateReading
+      (TaskAttemptId attemptID, float newProgress, long newTime) {
+    AtomicReference<EstimateVector> vectorRef = estimates.get(attemptID);
+
+    if (vectorRef == null) {
+      AtomicReference<EstimateVector> freshRef
+          = new AtomicReference<EstimateVector>(null);
+      AtomicReference<EstimateVector> racedRef
+          = estimates.putIfAbsent(attemptID, freshRef);
+      vectorRef = racedRef == null ? freshRef : racedRef;
+    }
+
+    while (true) {
+      EstimateVector oldVector = vectorRef.get();
+
+      // The first reading becomes the baseline that later readings are
+      //  measured against; the negative value marks "no rate yet".
+      EstimateVector newVector
+          = oldVector == null
+              ? new EstimateVector(-1.0, newProgress, newTime)
+              : oldVector.incorporate(newProgress, newTime);
+
+      if (vectorRef.compareAndSet(oldVector, newVector)) {
+        return;
+      }
+    }
+  }
+
+  // Returns the current vector for the attempt, or null if there is none.
+  private EstimateVector getEstimateVector(TaskAttemptId attemptID) {
+    AtomicReference<EstimateVector> vectorRef = estimates.get(attemptID);
+
+    if (vectorRef == null) {
+      return null;
+    }
+
+    return vectorRef.get();
+  }
+
+  // Reads the smoothing constant and the smoothed quantity from the
+  //  configuration, after the superclass has been contextualized.
+  @Override
+  public void contextualize(Configuration conf, AppContext context) {
+    super.contextualize(conf, context);
+
+    lambda
+        = conf.getLong(MRJobConfig.MR_AM_TASK_ESTIMATOR_SMOOTH_LAMBDA_MS,
+            MRJobConfig.DEFAULT_MR_AM_TASK_ESTIMATOR_SMOOTH_LAMBDA_MS);
+    smoothedValue
+        = conf.getBoolean(MRJobConfig.MR_AM_TASK_ESTIMATOR_EXPONENTIAL_RATE_ENABLE, true)
+            ? SmoothedValue.RATE : SmoothedValue.TIME_PER_UNIT_PROGRESS;
+  }
+
+  // Estimates the attempt's total runtime as the time already spent plus
+  //  the remaining progress divided by the smoothed rate.  Returns -1 when
+  //  the start time is unknown, no reading exists, or no usable rate has
+  //  been established yet.
+  @Override
+  public long estimatedRuntime(TaskAttemptId id) {
+    Long startTime = startTimes.get(id);
+
+    if (startTime == null) {
+      return -1L;
+    }
+
+    EstimateVector vector = getEstimateVector(id);
+
+    if (vector == null) {
+      return -1L;
+    }
+
+    double value = vector.value;
+    float progress = vector.basedOnProgress;
+
+    // Rejects zero, the negative baseline marker, and NaN alike.
+    if (!(value > 0.0)) {
+      return -1L;
+    }
+
+    double rate = smoothedValue == SmoothedValue.RATE ? value : 1.0 / value;
+
+    if (rate == 0.0) {
+      return -1L;
+    }
+
+    long sunkTime = vector.atTime - startTime;
+
+    double remainingTime = (1.0 - progress) / rate;
+
+    return sunkTime + (long)remainingTime;
+  }
+
+  // This estimator does not compute a variance; it always returns -1.
+  @Override
+  public long runtimeEstimateVariance(TaskAttemptId id) {
+    return -1L;
+  }
+
+  // Passes the update to the superclass, then blends the reported progress
+  //  into the attempt's smoothed estimate.
+  @Override
+  public void updateAttempt(TaskAttemptStatus status, long timestamp) {
+    super.updateAttempt(status, timestamp);
+    TaskAttemptId attemptID = status.id;
+
+    float progress = status.progress;
+
+    incorporateReading(attemptID, progress, timestamp);
+  }
+}

Added: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/speculate/LegacyTaskRuntimeEstimator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/speculate/LegacyTaskRuntimeEstimator.java?rev=1376283&view=auto
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/speculate/LegacyTaskRuntimeEstimator.java (added)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/speculate/LegacyTaskRuntimeEstimator.java Wed Aug 22 22:11:39 2012
@@ -0,0 +1,150 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app2.speculate;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.app2.job.Job;
+import org.apache.hadoop.mapreduce.v2.app2.job.Task;
+import org.apache.hadoop.mapreduce.v2.app2.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
+
+
+
+
+public class LegacyTaskRuntimeEstimator extends StartEndTimesBase {
+
+  private final Map<TaskAttempt, AtomicLong> attemptRuntimeEstimates
+      = new ConcurrentHashMap<TaskAttempt, AtomicLong>();
+  private final ConcurrentHashMap<TaskAttempt, AtomicLong> attemptRuntimeEstimateVariances
+      = new ConcurrentHashMap<TaskAttempt, AtomicLong>();
+
+  @Override
+  public void updateAttempt(TaskAttemptStatus status, long timestamp) {
+    super.updateAttempt(status, timestamp);
+    
+
+    TaskAttemptId attemptID = status.id;
+    TaskId taskID = attemptID.getTaskId();
+    JobId jobID = taskID.getJobId();
+    Job job = context.getJob(jobID);
+
+    if (job == null) {
+      return;
+    }
+
+    Task task = job.getTask(taskID);
+
+    if (task == null) {
+      return;
+    }
+
+    TaskAttempt taskAttempt = task.getAttempt(attemptID);
+
+    if (taskAttempt == null) {
+      return;
+    }
+
+    Long boxedStart = startTimes.get(attemptID);
+    long start = boxedStart == null ? Long.MIN_VALUE : boxedStart;
+
+    // We need to do two things.
+    //  1: If this is a completion, we accumulate statistics in the superclass
+    //  2: If this is not a completion, we learn more about it.
+
+    // This is not a completion, but we're cooking.
+    //
+    if (taskAttempt.getState() == TaskAttemptState.RUNNING) {
+      // See if this task is already in the registry
+      AtomicLong estimateContainer = attemptRuntimeEstimates.get(taskAttempt);
+      AtomicLong estimateVarianceContainer
+          = attemptRuntimeEstimateVariances.get(taskAttempt);
+
+      if (estimateContainer == null) {
+        if (attemptRuntimeEstimates.get(taskAttempt) == null) {
+          attemptRuntimeEstimates.put(taskAttempt, new AtomicLong());
+
+          estimateContainer = attemptRuntimeEstimates.get(taskAttempt);
+        }
+      }
+
+      if (estimateVarianceContainer == null) {
+        attemptRuntimeEstimateVariances.putIfAbsent(taskAttempt, new AtomicLong());
+        estimateVarianceContainer = attemptRuntimeEstimateVariances.get(taskAttempt);
+      }
+
+
+      long estimate = -1;
+      long varianceEstimate = -1;
+
+      // This code assumes that we'll never consider starting a third
+      //  speculative task attempt if two are already running for this task
+      if (start > 0 && timestamp > start) {
+        estimate = (long) ((timestamp - start) / Math.max(0.0001, status.progress));
+        varianceEstimate = (long) (estimate * status.progress / 10);
+      }
+      if (estimateContainer != null) {
+        estimateContainer.set(estimate);
+      }
+      if (estimateVarianceContainer != null) {
+        estimateVarianceContainer.set(varianceEstimate);
+      }
+    }
+  }
+
+  private long storedPerAttemptValue
+       (Map<TaskAttempt, AtomicLong> data, TaskAttemptId attemptID) {
+    TaskId taskID = attemptID.getTaskId();
+    JobId jobID = taskID.getJobId();
+    Job job = context.getJob(jobID);
+
+    Task task = job.getTask(taskID);
+
+    if (task == null) {
+      return -1L;
+    }
+
+    TaskAttempt taskAttempt = task.getAttempt(attemptID);
+
+    if (taskAttempt == null) {
+      return -1L;
+    }
+
+    AtomicLong estimate = data.get(taskAttempt);
+
+    return estimate == null ? -1L : estimate.get();
+
+  }
+
+  @Override
+  public long estimatedRuntime(TaskAttemptId attemptID) {
+    return storedPerAttemptValue(attemptRuntimeEstimates, attemptID);
+  }
+
+  @Override
+  public long runtimeEstimateVariance(TaskAttemptId attemptID) {
+    return storedPerAttemptValue(attemptRuntimeEstimateVariances, attemptID);
+  }
+}

Added: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/speculate/NullTaskRuntimesEngine.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/speculate/NullTaskRuntimesEngine.java?rev=1376283&view=auto
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/speculate/NullTaskRuntimesEngine.java (added)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/speculate/NullTaskRuntimesEngine.java Wed Aug 22 22:11:39 2012
@@ -0,0 +1,72 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app2.speculate;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.app2.AppContext;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
+
+
+/*
+ * This class is provided solely as an exemplar of the values that mean
+ *  that nothing needs to be computed.  It's not currently used.
+ *
+ * Every notification is ignored and every query answers with a fixed value.
+ */
+public class NullTaskRuntimesEngine implements TaskRuntimeEstimator {
+
+  // Answer given by the estimate and variance queries.
+  private static final long NO_ESTIMATE = -1L;
+
+  // Answer given by the enrolled-time and threshold queries.
+  private static final long NEVER = Long.MAX_VALUE;
+
+  // ---- notifications: all of them are deliberately ignored ----
+
+  @Override
+  public void contextualize(Configuration conf, AppContext context) {
+    // nothing to configure
+  }
+
+  @Override
+  public void enrollAttempt(TaskAttemptStatus status, long timestamp) {
+    // nothing is recorded
+  }
+
+  @Override
+  public void updateAttempt(TaskAttemptStatus status, long timestamp) {
+    // nothing is recorded
+  }
+
+  // ---- queries: all of them answer with a constant ----
+
+  @Override
+  public long attemptEnrolledTime(TaskAttemptId attemptID) {
+    return NEVER;
+  }
+
+  @Override
+  public long thresholdRuntime(TaskId id) {
+    return NEVER;
+  }
+
+  @Override
+  public long estimatedRuntime(TaskAttemptId id) {
+    return NO_ESTIMATE;
+  }
+
+  @Override
+  public long estimatedNewAttemptRuntime(TaskId id) {
+    return NO_ESTIMATE;
+  }
+
+  @Override
+  public long runtimeEstimateVariance(TaskAttemptId id) {
+    return NO_ESTIMATE;
+  }
+
+}

Added: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/speculate/Speculator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/speculate/Speculator.java?rev=1376283&view=auto
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/speculate/Speculator.java (added)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/speculate/Speculator.java Wed Aug 22 22:11:39 2012
@@ -0,0 +1,45 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app2.speculate;
+
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
+import org.apache.hadoop.yarn.event.EventHandler;
+
+/**
+ * Speculator component. Task Attempts' status updates are sent to this
+ * component. Concrete implementation runs the speculative algorithm and
+ * sends the TaskEventType.T_ADD_SPEC_ATTEMPT.
+ *
+ * An implementation also has to arrange for the jobs to be scanned from
+ * time to time, to launch the speculations.
+ */
+public interface Speculator
+              extends EventHandler<SpeculatorEvent> {
+
+  enum EventType {
+    ATTEMPT_STATUS_UPDATE,       // event carries a reported TaskAttemptStatus
+    ATTEMPT_START,               // event carries the id of the started attempt
+    TASK_CONTAINER_NEED_UPDATE,  // event carries a task id and a container-need delta
+    JOB_CREATE                   // event carries the id of the created job
+  }
+
+  // This will be implemented if we go to a model where the events are
+  //  processed within the TaskAttempts' state transitions' code.
+  public void handleAttempt(TaskAttemptStatus status);
+}

Added: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/speculate/SpeculatorEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/speculate/SpeculatorEvent.java?rev=1376283&view=auto
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/speculate/SpeculatorEvent.java (added)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/speculate/SpeculatorEvent.java Wed Aug 22 22:11:39 2012
@@ -0,0 +1,87 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app2.speculate;
+
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
+import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+
+public class SpeculatorEvent extends AbstractEvent<Speculator.EventType> {
+
+  // valid for ATTEMPT_STATUS_UPDATE
+  private TaskAttemptStatus reportedStatus;
+
+  // valid for TASK_CONTAINER_NEED_UPDATE
+  private TaskId taskID;
+  private int containersNeededChange;
+  
+  // valid for CREATE_JOB
+  private JobId jobID;
+
+  public SpeculatorEvent(JobId jobID, long timestamp) {
+    super(Speculator.EventType.JOB_CREATE, timestamp);
+    this.jobID = jobID;
+  }
+
+  public SpeculatorEvent(TaskAttemptStatus reportedStatus, long timestamp) {
+    super(Speculator.EventType.ATTEMPT_STATUS_UPDATE, timestamp);
+    this.reportedStatus = reportedStatus;
+  }
+
+  public SpeculatorEvent(TaskAttemptId attemptID, boolean flag, long timestamp) {
+    super(Speculator.EventType.ATTEMPT_START, timestamp);
+    this.reportedStatus = new TaskAttemptStatus();
+    this.reportedStatus.id = attemptID;
+    this.taskID = attemptID.getTaskId();
+  }
+
+  /*
+   * This c'tor creates a TASK_CONTAINER_NEED_UPDATE event .
+   * We send a +1 event when a task enters a state where it wants a container,
+   *  and a -1 event when it either gets one or withdraws the request.
+   * The per job sum of all these events is the number of containers requested
+   *  but not granted.  The intent is that we only do speculations when the
+   *  speculation wouldn't compete for containers with tasks which need
+   *  to be run.
+   */
+  // TODO XXX: This statement sounds like it's going to be broken by ContainerRe-use.
+  public SpeculatorEvent(TaskId taskID, int containersNeededChange) {
+    super(Speculator.EventType.TASK_CONTAINER_NEED_UPDATE);
+    this.taskID = taskID;
+    this.containersNeededChange = containersNeededChange;
+  }
+
+  public TaskAttemptStatus getReportedStatus() {
+    return reportedStatus;
+  }
+
+  public int containersNeededChange() {
+    return containersNeededChange;
+  }
+
+  public TaskId getTaskID() {
+    return taskID;
+  }
+  
+  public JobId getJobID() {
+    return jobID;
+  }
+}

Added: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/speculate/StartEndTimesBase.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/speculate/StartEndTimesBase.java?rev=1376283&view=auto
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/speculate/StartEndTimesBase.java (added)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/speculate/StartEndTimesBase.java Wed Aug 22 22:11:39 2012
@@ -0,0 +1,213 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app2.speculate;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.v2.app2.AppContext;
+import org.apache.hadoop.mapreduce.v2.app2.job.Job;
+import org.apache.hadoop.mapreduce.v2.app2.job.Task;
+import org.apache.hadoop.mapreduce.v2.app2.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
+
+/**
+ * Partial {@link TaskRuntimeEstimator} that remembers when each attempt was
+ * enrolled and, per job, accumulates the duration of the first successful
+ * attempt of every map and reduce task into {@link DataStatistics}.
+ *
+ * The estimator methods not implemented here are left to subclasses.
+ */
+abstract class StartEndTimesBase implements TaskRuntimeEstimator {
+  static final float MINIMUM_COMPLETE_PROPORTION_TO_SPECULATE
+      = 0.05F;
+  static final int MINIMUM_COMPLETE_NUMBER_TO_SPECULATE
+      = 1;
+
+  protected Configuration conf = null;
+  protected AppContext context = null;
+
+  // Enrollment timestamp of every attempt seen by enrollAttempt(...).
+  protected final Map<TaskAttemptId, Long> startTimes
+      = new ConcurrentHashMap<TaskAttemptId, Long>();
+
+  // XXXX This class design assumes that the contents of AppContext.getAllJobs
+  //   never changes.  Is that right?
+  //
+  // This assumption comes in in several places, mostly in data structure that
+  //   can grow without limit if a AppContext gets new Job's when the old ones
+  //   run out.  Also, these mapper statistics blocks won't cover the Job's
+  //   we don't know about.
+  protected final Map<Job, DataStatistics> mapperStatistics
+      = new HashMap<Job, DataStatistics>();
+  protected final Map<Job, DataStatistics> reducerStatistics
+      = new HashMap<Job, DataStatistics>();
+
+
+  // Per-job value of MRJobConfig.SPECULATIVE_SLOWTASK_THRESHOLD, captured in
+  // contextualize(...).
+  private final Map<Job, Float> slowTaskRelativeTresholds
+      = new HashMap<Job, Float>();
+
+  // Tasks for which a successful attempt has already been counted.
+  protected final Set<Task> doneTasks = new HashSet<Task>();
+
+  /**
+   * Records {@code timestamp} as the enrollment time of the attempt
+   * identified by {@code status.id}.
+   */
+  @Override
+  public void enrollAttempt(TaskAttemptStatus status, long timestamp) {
+    startTimes.put(status.id,timestamp);
+  }
+
+  /**
+   * @param attemptID the attempt we are asking about
+   * @return the timestamp recorded by {@link #enrollAttempt}, or
+   *         {@code Long.MAX_VALUE} if the attempt was never enrolled
+   */
+  @Override
+  public long attemptEnrolledTime(TaskAttemptId attemptID) {
+    Long result = startTimes.get(attemptID);
+
+    return result == null ? Long.MAX_VALUE : result;
+  }
+
+
+  /**
+   * Stores the configuration and context, and creates the statistics and
+   * slow-task threshold entries for every job the context currently knows.
+   *
+   * @param conf configuration the slow-task threshold is read from
+   * @param context application context the jobs are looked up in
+   */
+  @Override
+  public void contextualize(Configuration conf, AppContext context) {
+    this.conf = conf;
+    this.context = context;
+
+    // The threshold is the same for every job, so read it only once.
+    final float slowTaskThreshold
+        = conf.getFloat(MRJobConfig.SPECULATIVE_SLOWTASK_THRESHOLD,1.0f);
+
+    for (Job job : context.getAllJobs().values()) {
+      mapperStatistics.put(job, new DataStatistics());
+      reducerStatistics.put(job, new DataStatistics());
+      slowTaskRelativeTresholds.put(job, slowTaskThreshold);
+    }
+  }
+
+  /**
+   * Finds the statistics block that durations of the given task feed into.
+   *
+   * @param taskID the task we are asking about
+   * @return the mapper or reducer statistics of the task's job, or
+   *         {@code null} if the job or task is unknown, the task is neither
+   *         a map nor a reduce, or the job has no statistics registered
+   */
+  protected DataStatistics dataStatisticsForTask(TaskId taskID) {
+    JobId jobID = taskID.getJobId();
+    Job job = context.getJob(jobID);
+
+    if (job == null) {
+      return null;
+    }
+
+    Task task = job.getTask(taskID);
+
+    if (task == null) {
+      return null;
+    }
+
+    TaskType type = task.getType();
+
+    if (type == TaskType.MAP) {
+      return mapperStatistics.get(job);
+    }
+
+    if (type == TaskType.REDUCE) {
+      return reducerStatistics.get(job);
+    }
+
+    return null;
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * Returns {@code Long.MAX_VALUE} when the job is unknown, when fewer than
+   * {@link #MINIMUM_COMPLETE_NUMBER_TO_SPECULATE} tasks of this type have
+   * completed, when the completed fraction is below
+   * {@link #MINIMUM_COMPLETE_PROPORTION_TO_SPECULATE}, or when no statistics
+   * or slow-task threshold are registered for the job.
+   */
+  @Override
+  public long thresholdRuntime(TaskId taskID) {
+    JobId jobID = taskID.getJobId();
+    Job job = context.getJob(jobID);
+
+    if (job == null) {
+      // Nothing is known about this job, so no runtime can be ruled out.
+      return Long.MAX_VALUE;
+    }
+
+    TaskType type = taskID.getTaskType();
+
+    DataStatistics statistics
+        = dataStatisticsForTask(taskID);
+
+    int completedTasksOfType
+        = type == TaskType.MAP
+            ? job.getCompletedMaps() : job.getCompletedReduces();
+
+    int totalTasksOfType
+        = type == TaskType.MAP
+            ? job.getTotalMaps() : job.getTotalReduces();
+
+    if (completedTasksOfType < MINIMUM_COMPLETE_NUMBER_TO_SPECULATE
+        || (((float)completedTasksOfType) / totalTasksOfType)
+              < MINIMUM_COMPLETE_PROPORTION_TO_SPECULATE ) {
+      return Long.MAX_VALUE;
+    }
+
+    // A job that appeared after contextualize(...) has no threshold entry;
+    // treat it like missing statistics instead of failing on unboxing null.
+    Float slowTaskThreshold = slowTaskRelativeTresholds.get(job);
+
+    if (statistics == null || slowTaskThreshold == null) {
+      return Long.MAX_VALUE;
+    }
+
+    return (long)statistics.outlier(slowTaskThreshold);
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * This implementation returns the mean of the recorded durations for the
+   * task's job and type, or {@code -1} when no statistics are available.
+   */
+  @Override
+  public long estimatedNewAttemptRuntime(TaskId id) {
+    DataStatistics statistics = dataStatisticsForTask(id);
+
+    if (statistics == null) {
+      return -1L;
+    }
+
+    return (long)statistics.mean();
+  }
+
+  /**
+   * When the reported attempt has succeeded and it is the first success seen
+   * for its task, adds the attempt's duration (enrollment time to
+   * {@code timestamp}) to the matching statistics block.  Updates for an
+   * unknown job, task or attempt are ignored.
+   *
+   * @param status status identifying the attempt
+   * @param timestamp time used as the finish time of a succeeded attempt
+   */
+  @Override
+  public void updateAttempt(TaskAttemptStatus status, long timestamp) {
+
+    TaskAttemptId attemptID = status.id;
+    TaskId taskID = attemptID.getTaskId();
+    JobId jobID = taskID.getJobId();
+    Job job = context.getJob(jobID);
+
+    if (job == null) {
+      return;
+    }
+
+    Task task = job.getTask(taskID);
+
+    if (task == null) {
+      return;
+    }
+
+    Long boxedStart = startTimes.get(attemptID);
+    long start = boxedStart == null ? Long.MIN_VALUE : boxedStart;
+
+    TaskAttempt taskAttempt = task.getAttempt(attemptID);
+
+    if (taskAttempt == null) {
+      // The task does not know this attempt; nothing to record.
+      return;
+    }
+
+    if (taskAttempt.getState() != TaskAttemptState.SUCCEEDED) {
+      return;
+    }
+
+    // Is this a new success?  Set.add reports whether the task was absent.
+    boolean isNew;
+    synchronized (doneTasks) {
+      isNew = doneTasks.add(task);
+    }
+
+    // Note that if a task completes twice [because of a previous speculation
+    //  and a race, or a success followed by loss of the machine with the
+    //  local data] we only count the first one.
+    if (!isNew) {
+      return;
+    }
+
+    long finish = timestamp;
+    // Skip attempts without a usable start time (never enrolled) and
+    // timestamps that would give a negative duration.
+    if (start > 1L && finish > 1L && start <= finish) {
+      long duration = finish - start;
+
+      DataStatistics statistics
+          = dataStatisticsForTask(taskID);
+
+      if (statistics != null) {
+        statistics.add(duration);
+      }
+    }
+  }
+}

Added: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/speculate/TaskRuntimeEstimator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/speculate/TaskRuntimeEstimator.java?rev=1376283&view=auto
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/speculate/TaskRuntimeEstimator.java (added)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/speculate/TaskRuntimeEstimator.java Wed Aug 22 22:11:39 2012
@@ -0,0 +1,90 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app2.speculate;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.app2.AppContext;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
+
+
+
+/**
+ * Estimates task and task-attempt runtimes from the status updates it is
+ * given.
+ */
+public interface TaskRuntimeEstimator {
+  /**
+   * Enrolls an attempt with this estimator.
+   *
+   * @param reportedStatus status identifying the attempt
+   * @param timestamp time to associate with the enrollment
+   */
+  void enrollAttempt(TaskAttemptStatus reportedStatus, long timestamp);
+
+  /**
+   * @param attemptID the {@link TaskAttemptId} of the attempt we are asking
+   *        about
+   * @return the time at which the attempt was enrolled
+   */
+  long attemptEnrolledTime(TaskAttemptId attemptID);
+
+  /**
+   * Feeds a new status report for an attempt into this estimator.
+   *
+   * @param reportedStatus status identifying the attempt
+   * @param timestamp time to associate with the update
+   */
+  void updateAttempt(TaskAttemptStatus reportedStatus, long timestamp);
+
+  /**
+   * Supplies the configuration and application context to this estimator.
+   *
+   * @param conf the configuration to read settings from
+   * @param context the application context to look jobs and tasks up in
+   */
+  void contextualize(Configuration conf, AppContext context);
+
+  /**
+   * Finds a maximum reasonable execution wallclock time for a task.  The
+   * value includes the time already elapsed.  If the projected total
+   * execution time for the task ever exceeds this reasonable execution time,
+   * we may speculate it.
+   *
+   * @param id the {@link TaskId} of the task we are asking about
+   * @return the task's maximum reasonable runtime, or MAX_VALUE if
+   *         we don't have enough information to rule out any runtime,
+   *         however long.
+   */
+  long thresholdRuntime(TaskId id);
+
+  /**
+   * Estimates a task attempt's total runtime, including the time already
+   * elapsed.
+   *
+   * @param id the {@link TaskAttemptId} of the attempt we are asking about
+   * @return our best estimate of the attempt's runtime, or {@code -1} if
+   *         we don't have enough information yet to produce an estimate.
+   */
+  long estimatedRuntime(TaskAttemptId id);
+
+  /**
+   * Estimates how long a new attempt on this task will take if we start
+   * one now.
+   *
+   * @param id the {@link TaskId} of the task we are asking about
+   * @return our best estimate of a new attempt's runtime, or {@code -1} if
+   *         we don't have enough information yet to produce an estimate.
+   */
+  long estimatedNewAttemptRuntime(TaskId id);
+
+  /**
+   * Computes the width of the error band of our estimate of the task
+   * runtime as returned by {@link #estimatedRuntime(TaskAttemptId)}.
+   *
+   * @param id the {@link TaskAttemptId} of the attempt we are asking about
+   * @return the width of that error band, or {@code -1} if
+   *         we don't have enough information yet to produce an estimate.
+   */
+  long runtimeEstimateVariance(TaskAttemptId id);
+}

Added: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/speculate/TaskSpeculationPredicate.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/speculate/TaskSpeculationPredicate.java?rev=1376283&view=auto
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/speculate/TaskSpeculationPredicate.java (added)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/speculate/TaskSpeculationPredicate.java Wed Aug 22 22:11:39 2012
@@ -0,0 +1,39 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app2.speculate;
+
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.app2.AppContext;
+import org.apache.hadoop.mapreduce.v2.app2.job.Job;
+import org.apache.hadoop.mapreduce.v2.app2.job.Task;
+
+
+/**
+ * Base policy deciding whether a task may be speculated.
+ */
+public class TaskSpeculationPredicate {
+  /**
+   * Decides whether the given task is eligible for speculation.
+   *
+   * @param context the application context the task's job is looked up in
+   * @param taskID the task we are asking about
+   * @return {@code true} only if the job and task are known to
+   *         {@code context} and the task has exactly one attempt
+   */
+  boolean canSpeculate(AppContext context, TaskId taskID) {
+    // This class rejects speculating any task that already has speculations,
+    //  or isn't running.
+    //  Subclasses should call TaskSpeculationPredicate.canSpeculate(...) , but
+    //  can be even more restrictive.
+    JobId jobID = taskID.getJobId();
+    Job job = context.getJob(jobID);
+
+    if (job == null) {
+      // Unknown job: there is nothing to speculate.
+      return false;
+    }
+
+    Task task = job.getTask(taskID);
+
+    if (task == null) {
+      // Unknown task: there is nothing to speculate.
+      return false;
+    }
+
+    return task.getAttempts().size() == 1;
+  }
+}

Added: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/speculate/package-info.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/speculate/package-info.java?rev=1376283&view=auto
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/speculate/package-info.java (added)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/speculate/package-info.java Wed Aug 22 22:11:39 2012
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+@InterfaceAudience.Private
+package org.apache.hadoop.mapreduce.v2.app2.speculate;
+import org.apache.hadoop.classification.InterfaceAudience;

Added: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/taskclean/TaskAttemptCleanupEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/taskclean/TaskAttemptCleanupEvent.java?rev=1376283&view=auto
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/taskclean/TaskAttemptCleanupEvent.java (added)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/taskclean/TaskAttemptCleanupEvent.java Wed Aug 22 22:11:39 2012
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app2.taskclean;
+
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.event.AbstractEvent;
+
+/**
+ * Event of type {@link TaskCleaner.EventType#TASK_CLEAN} carrying what is
+ * needed to clean up one task attempt: the attempt id, its container id, and
+ * the committer and attempt context.
+ */
+public class TaskAttemptCleanupEvent extends
+    AbstractEvent<TaskCleaner.EventType> {
+
+  private final TaskAttemptId attemptID;
+  private final OutputCommitter committer;
+  private final TaskAttemptContext attemptContext;
+  private final ContainerId containerId;
+
+  /**
+   * @param attemptID the attempt to clean up
+   * @param containerId the attempt's container; may be null, see
+   *        {@link #getContainerId()}
+   * @param committer the output committer for the attempt
+   * @param attemptContext the attempt's context
+   */
+  public TaskAttemptCleanupEvent(TaskAttemptId attemptID,
+      ContainerId containerId, OutputCommitter committer,
+      TaskAttemptContext attemptContext) {
+    super(TaskCleaner.EventType.TASK_CLEAN);
+    this.attemptID = attemptID;
+    this.committer = committer;
+    this.attemptContext = attemptContext;
+    this.containerId = containerId;
+  }
+
+  /**
+   * @return the id of the attempt to clean up
+   */
+  public TaskAttemptId getAttemptID() {
+    return this.attemptID;
+  }
+
+  /**
+   * @return the output committer supplied for the attempt
+   */
+  public OutputCommitter getCommitter() {
+    return this.committer;
+  }
+
+  /**
+   * @return the attempt context supplied for the attempt
+   */
+  public TaskAttemptContext getAttemptContext() {
+    return this.attemptContext;
+  }
+
+  /**
+   * containerId could be null if the container task attempt had not started.
+   * @return the attempt's container id, possibly null
+   */
+  public ContainerId getContainerId() {
+    return this.containerId;
+  }
+
+}

Added: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/taskclean/TaskCleaner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/taskclean/TaskCleaner.java?rev=1376283&view=auto
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/taskclean/TaskCleaner.java (added)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/taskclean/TaskCleaner.java Wed Aug 22 22:11:39 2012
@@ -0,0 +1,30 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app2.taskclean;
+
+import org.apache.hadoop.yarn.event.EventHandler;
+
+public interface TaskCleaner extends EventHandler<TaskCleanupEvent> {
+
+  enum EventType {
+    // TODO XXX Rename this event once the code is more stable.
+    TASK_CLEAN,
+    CONTAINER_COMPLETED,
+  }
+}

Added: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/taskclean/TaskCleanerContainerCompletedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/taskclean/TaskCleanerContainerCompletedEvent.java?rev=1376283&view=auto
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/taskclean/TaskCleanerContainerCompletedEvent.java (added)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/taskclean/TaskCleanerContainerCompletedEvent.java Wed Aug 22 22:11:39 2012
@@ -0,0 +1,18 @@
+package org.apache.hadoop.mapreduce.v2.app2.taskclean;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.event.AbstractEvent;
+
+/**
+ * Event of type {@link TaskCleaner.EventType#CONTAINER_COMPLETED} carrying
+ * the id of a container.
+ */
+public class TaskCleanerContainerCompletedEvent extends AbstractEvent<TaskCleaner.EventType> {
+
+  // Assigned once in the constructor, hence final like the fields of the
+  // other task-cleaner event.
+  private final ContainerId containerId;
+
+  /**
+   * @param containerId the id of the container this event refers to
+   */
+  public TaskCleanerContainerCompletedEvent(ContainerId containerId) {
+    super(TaskCleaner.EventType.CONTAINER_COMPLETED);
+    this.containerId = containerId;
+  }
+
+  /**
+   * @return the id of the container this event refers to
+   */
+  public ContainerId getContainerId() {
+    return this.containerId;
+  }
+}

Added: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/taskclean/TaskCleanerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/taskclean/TaskCleanerImpl.java?rev=1376283&view=auto
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/taskclean/TaskCleanerImpl.java (added)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/taskclean/TaskCleanerImpl.java Wed Aug 22 22:11:39 2012
@@ -0,0 +1,115 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app2.taskclean;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapreduce.v2.app2.AppContext;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventType;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.service.AbstractService;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * {@link TaskCleaner} service that queues incoming cleanup events and
+ * processes them on a thread pool by calling {@code abortTask} on the event's
+ * committer.
+ */
+public class TaskCleanerImpl extends AbstractService implements TaskCleaner {
+
+  private static final Log LOG = LogFactory.getLog(TaskCleanerImpl.class);
+
+  private final AppContext context;
+  // Both are created in start() and stay null until then.
+  private ThreadPoolExecutor launcherPool;
+  private Thread eventHandlingThread;
+  private final BlockingQueue<TaskCleanupEvent> eventQueue =
+      new LinkedBlockingQueue<TaskCleanupEvent>();
+
+  /**
+   * @param context the application context of this service
+   */
+  public TaskCleanerImpl(AppContext context) {
+    super("TaskCleaner");
+    this.context = context;
+  }
+
+  /**
+   * Creates the five-thread worker pool and starts the thread that moves
+   * events from the queue to that pool, then starts the service itself.
+   */
+  @Override
+  public void start() {
+    ThreadFactory tf = new ThreadFactoryBuilder()
+      .setNameFormat("TaskCleaner #%d")
+      .build();
+    launcherPool = new ThreadPoolExecutor(5, 5, 1,
+        TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>(), tf);
+    eventHandlingThread = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        TaskCleanupEvent event = null;
+        while (!Thread.currentThread().isInterrupted()) {
+          try {
+            event = eventQueue.take();
+          } catch (InterruptedException e) {
+            LOG.error("Returning, interrupted : " + e);
+            return;
+          }
+          // the events from the queue are handled in parallel
+          // using a thread pool
+          launcherPool.execute(new EventProcessor(event));
+        }
+      }
+    });
+    eventHandlingThread.setName("TaskCleaner Event Handler");
+    eventHandlingThread.start();
+    super.start();
+  }
+
+  /**
+   * Interrupts the event-handling thread and shuts the worker pool down, then
+   * stops the service.  Safe to call even if {@link #start()} never ran.
+   */
+  @Override
+  public void stop() {
+    // start() may not have been called; the thread and pool are null then.
+    if (eventHandlingThread != null) {
+      eventHandlingThread.interrupt();
+    }
+    if (launcherPool != null) {
+      launcherPool.shutdown();
+    }
+    super.stop();
+  }
+
+  /** Runs the cleanup for a single queued event on a pool thread. */
+  private class EventProcessor implements Runnable {
+    private final TaskCleanupEvent event;
+
+    EventProcessor(TaskCleanupEvent event) {
+      this.event = event;
+    }
+
+    @Override
+    public void run() {
+      LOG.info("Processing the event " + event.toString());
+      try {
+        event.getCommitter().abortTask(event.getAttemptContext());
+      } catch (Exception e) {
+        // Best effort: a failed cleanup is logged and otherwise ignored.
+        LOG.warn("Task cleanup failed for attempt " + event.getAttemptID(), e);
+      }
+//      context.getEventHandler().handle(
+//          new TaskAttemptEvent(event.getAttemptID(), 
+//              TaskAttemptEventType.TA_CLEANUP_DONE));
+    }
+  }
+
+  /**
+   * Queues the event for asynchronous processing.
+   *
+   * @param event the cleanup event to queue
+   * @throws YarnException if the calling thread is interrupted while queueing;
+   *         the thread's interrupt status is restored first
+   */
+  @Override
+  public void handle(TaskCleanupEvent event) {
+    try {
+      eventQueue.put(event);
+    } catch (InterruptedException e) {
+      // Restore the interrupt status so callers further up can still see it.
+      Thread.currentThread().interrupt();
+      throw new YarnException(e);
+    }
+  }
+
+}



Mime
View raw message