hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From naganarasimha...@apache.org
Subject hadoop git commit: MAPREDUCE-6809. Create ContainerRequestor interface and refactor RMContainerRequestor to use it. Contributed by Devaraj K.
Date Sat, 12 Nov 2016 07:00:40 GMT
Repository: hadoop
Updated Branches:
  refs/heads/MR-6749 83b9cdf0b -> 06dcc886a


MAPREDUCE-6809. Create ContainerRequestor interface and refactor RMContainerRequestor to use
it. Contributed by Devaraj K.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/06dcc886
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/06dcc886
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/06dcc886

Branch: refs/heads/MR-6749
Commit: 06dcc886a96e33cac4ddb5bea2aeab741aa6fea3
Parents: 83b9cdf
Author: Naganarasimha <naganarasimha_gr@apache.org>
Authored: Sat Nov 12 12:28:42 2016 +0530
Committer: Naganarasimha <naganarasimha_gr@apache.org>
Committed: Sat Nov 12 12:28:42 2016 +0530

----------------------------------------------------------------------
 .../hadoop/mapreduce/v2/app/MRAppMaster.java    |  3 +-
 .../mapreduce/v2/app/rm/ContainerRequestor.java | 50 ++++++++++
 .../v2/app/rm/RMContainerAllocator.java         | 96 +++++++++++++-------
 .../v2/app/rm/RMContainerRequestor.java         | 54 +++++++----
 .../v2/app/rm/TestRMContainerAllocator.java     | 41 ++++-----
 5 files changed, 165 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/06dcc886/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
index 4a8a90e..85e4b50 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
@@ -110,7 +110,6 @@ import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
 import org.apache.hadoop.mapreduce.v2.app.rm.RMCommunicator;
 import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator;
-import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerRequestor;
 import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
 import org.apache.hadoop.mapreduce.v2.app.rm.preemption.AMPreemptionPolicy;
 import org.apache.hadoop.mapreduce.v2.app.rm.preemption.NoopAMPreemptionPolicy;
@@ -1141,7 +1140,7 @@ public class MRAppMaster extends CompositeService {
 
     @Override
     public Set<String> getBlacklistedNodes() {
-      return ((RMContainerRequestor) containerAllocator).getBlacklistedNodes();
+      return ((RMContainerAllocator) containerAllocator).getBlacklistedNodes();
     }
     
     @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/06dcc886/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerRequestor.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerRequestor.java
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerRequestor.java
new file mode 100644
index 0000000..2d54633
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerRequestor.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.v2.app.rm;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerRequestor.ContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+
+/**
+ * Interface for ContainerReqestor.
+ */
+public interface ContainerRequestor {
+
+  AllocateResponse makeRemoteRequest()
+      throws YarnRuntimeException, YarnException, IOException;
+
+  void addContainerReq(ContainerRequest request);
+
+  void decContainerReq(ContainerRequest request);
+
+  void release(ContainerId containerId);
+
+  boolean isNodeBlacklisted(String hostname);
+
+  Resource getAvailableResources();
+
+  void containerFailedOnHost(String hostName);
+
+  ContainerRequest filterRequest(ContainerRequest orig);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/06dcc886/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
index 4cb3cbe..9f34ea4 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
@@ -60,6 +60,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdate
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent;
+import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerRequestor.ContainerRequest;
 import org.apache.hadoop.mapreduce.v2.app.rm.preemption.AMPreemptionPolicy;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.StringInterner;
@@ -94,7 +95,7 @@ import com.google.common.annotations.VisibleForTesting;
 /**
  * Allocates the container from the ResourceManager scheduler.
  */
-public class RMContainerAllocator extends RMContainerRequestor
+public class RMContainerAllocator extends RMCommunicator
     implements ContainerAllocator {
 
   static final Log LOG = LogFactory.getLog(RMContainerAllocator.class);
@@ -114,6 +115,9 @@ public class RMContainerAllocator extends RMContainerRequestor
   private Thread eventHandlingThread;
   private final AtomicBoolean stopped;
 
+  @VisibleForTesting
+  protected RMContainerRequestor containerRequestor;
+
   static {
     PRIORITY_FAST_FAIL_MAP = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(Priority.class);
     PRIORITY_FAST_FAIL_MAP.setPriority(5);
@@ -207,6 +211,7 @@ public class RMContainerAllocator extends RMContainerRequestor
     return new AssignedRequests();
   }
 
+  @SuppressWarnings("unchecked")
   @Override
   protected void serviceInit(Configuration conf) throws Exception {
     super.serviceInit(conf);
@@ -240,6 +245,8 @@ public class RMContainerAllocator extends RMContainerRequestor
     this.scheduledRequests.setNumOpportunisticMapsPer100(
         conf.getInt(MRJobConfig.MR_NUM_OPPORTUNISTIC_MAPS_PER_100,
             MRJobConfig.DEFAULT_MR_NUM_OPPORTUNISTIC_MAPS_PER_100));
+    containerRequestor = new RMContainerRequestor(this);
+    containerRequestor.init(conf);
   }
 
   @Override
@@ -450,8 +457,8 @@ public class RMContainerAllocator extends RMContainerRequestor
           removed = true;
           assignedRequests.remove(aId);
           containersReleased++;
-          pendingRelease.add(containerId);
-          release(containerId);
+          containerRequestor.pendingRelease.add(containerId);
+          containerRequestor.release(containerId);
         }
       }
       if (!removed) {
@@ -463,7 +470,7 @@ public class RMContainerAllocator extends RMContainerRequestor
         event.getType() == ContainerAllocator.EventType.CONTAINER_FAILED) {
       ContainerFailedEvent fEv = (ContainerFailedEvent) event;
       String host = getHost(fEv.getContMgrAddress());
-      containerFailedOnHost(host);
+      containerRequestor.containerFailedOnHost(host);
       // propagate failures to preemption policy to discard checkpoints for
       // failed tasks
       preemptionPolicy.handleFailedContainer(event.getAttemptID());
@@ -522,9 +529,11 @@ public class RMContainerAllocator extends RMContainerRequestor
 
     // The pending mappers haven't been waiting for too long. Let us see if
     // the headroom can fit a mapper.
-    Resource availableResourceForMap = getAvailableResources();
-    if (ResourceCalculatorUtils.computeAvailableContainers(availableResourceForMap,
-        mapResourceRequest, getSchedulerResourceTypes()) > 0) {
+    Resource availableResourceForMap = containerRequestor
+        .getAvailableResources();
+    if (ResourceCalculatorUtils.computeAvailableContainers(
+        availableResourceForMap, mapResourceRequest,
+        getSchedulerResourceTypes()) > 0) {
       // the available headroom is enough to run a mapper
       return false;
     }
@@ -601,7 +610,7 @@ public class RMContainerAllocator extends RMContainerRequestor
     }
     
     // get available resources for this job
-    Resource headRoom = getAvailableResources();
+    Resource headRoom = containerRequestor.getAvailableResources();
 
     LOG.info("Recalculating schedule, headroom=" + headRoom);
     
@@ -732,7 +741,8 @@ public class RMContainerAllocator extends RMContainerRequestor
     applyConcurrentTaskLimits();
 
     // will be null the first time
-    Resource headRoom = Resources.clone(getAvailableResources());
+    Resource headRoom = Resources
+        .clone(containerRequestor.getAvailableResources());
     AllocateResponse response;
     /*
      * If contact with RM is lost, the AM will wait MR_AM_TO_RM_WAIT_INTERVAL_MS
@@ -740,7 +750,7 @@ public class RMContainerAllocator extends RMContainerRequestor
      * to contact the RM.
      */
     try {
-      response = makeRemoteRequest();
+      response = containerRequestor.makeRemoteRequest();
       // Reset retry count if no exception occurred.
       retrystartTime = System.currentTimeMillis();
     } catch (ApplicationAttemptNotFoundException e ) {
@@ -755,9 +765,9 @@ public class RMContainerAllocator extends RMContainerRequestor
       LOG.info("ApplicationMaster is out of sync with ResourceManager,"
           + " hence resync and send outstanding requests.");
       // RM may have restarted, re-register with RM.
-      lastResponseID = 0;
+      containerRequestor.lastResponseID = 0;
       register();
-      addOutstandingRequestOnResync();
+      containerRequestor.addOutstandingRequestOnResync();
       return null;
     } catch (InvalidLabelResourceRequestException e) {
       // If Invalid label exception is received means the requested label doesnt
@@ -783,7 +793,7 @@ public class RMContainerAllocator extends RMContainerRequestor
       // continue to attempt to contact the RM.
       throw e;
     }
-    Resource newHeadRoom = getAvailableResources();
+    Resource newHeadRoom = containerRequestor.getAvailableResources();
     List<Container> newContainers = response.getAllocatedContainers();
     // Setting NMTokens
     if (response.getNMTokens() != null) {
@@ -823,7 +833,7 @@ public class RMContainerAllocator extends RMContainerRequestor
     }
 
     //Called on each allocation. Will know about newly blacklisted/added hosts.
-    computeIgnoreBlacklisting();
+    containerRequestor.computeIgnoreBlacklisting();
 
     handleUpdatedNodes(response);
     handleJobPriorityChange(response);
@@ -852,7 +862,7 @@ public class RMContainerAllocator extends RMContainerRequestor
       LOG.error("Container complete event for unknown container "
           + container.getContainerId());
     } else {
-      pendingRelease.remove(container.getContainerId());
+      containerRequestor.pendingRelease.remove(container.getContainerId());
       assignedRequests.remove(attemptID);
 
       // Send the diagnostics
@@ -878,11 +888,12 @@ public class RMContainerAllocator extends RMContainerRequestor
       int normalMapRequestLimit = Math.min(
           maxRequestedMaps - failedMapRequestLimit,
           numScheduledMaps - numScheduledFailMaps);
-      setRequestLimit(PRIORITY_FAST_FAIL_MAP, mapResourceRequest,
-          failedMapRequestLimit);
-      setRequestLimit(PRIORITY_MAP, mapResourceRequest, normalMapRequestLimit);
-      setRequestLimit(PRIORITY_OPPORTUNISTIC_MAP, mapResourceRequest,
+      containerRequestor.setRequestLimit(PRIORITY_FAST_FAIL_MAP,
+          mapResourceRequest, failedMapRequestLimit);
+      containerRequestor.setRequestLimit(PRIORITY_MAP, mapResourceRequest,
           normalMapRequestLimit);
+      containerRequestor.setRequestLimit(PRIORITY_OPPORTUNISTIC_MAP,
+          mapResourceRequest, normalMapRequestLimit);
     }
 
     int numScheduledReduces = scheduledRequests.reduces.size();
@@ -891,7 +902,7 @@ public class RMContainerAllocator extends RMContainerRequestor
           maxRunningReduces - assignedRequests.reduces.size());
       int reduceRequestLimit = Math.min(maxRequestedReduces,
           numScheduledReduces);
-      setRequestLimit(PRIORITY_REDUCE, reduceResourceRequest,
+      containerRequestor.setRequestLimit(PRIORITY_REDUCE, reduceResourceRequest,
           reduceRequestLimit);
     }
   }
@@ -980,7 +991,7 @@ public class RMContainerAllocator extends RMContainerRequestor
 
   @Private
   public Resource getResourceLimit() {
-    Resource headRoom = getAvailableResources();
+    Resource headRoom = containerRequestor.getAvailableResources();
     Resource assignedMapResource =
         Resources.multiply(mapResourceRequest, assignedRequests.maps.size());
     Resource assignedReduceResource =
@@ -1032,7 +1043,7 @@ public class RMContainerAllocator extends RMContainerRequestor
       if (req == null) {
         return false;
       } else {
-        decContainerReq(req);
+        containerRequestor.decContainerReq(req);
         return true;
       }
     }
@@ -1042,7 +1053,7 @@ public class RMContainerAllocator extends RMContainerRequestor
       if (it.hasNext()) {
         Entry<TaskAttemptId, ContainerRequest> entry = it.next();
         it.remove();
-        decContainerReq(entry.getValue());
+        containerRequestor.decContainerReq(entry.getValue());
         return entry.getValue();
       }
       return null;
@@ -1059,14 +1070,15 @@ public class RMContainerAllocator extends RMContainerRequestor
         LOG.info("Added "+event.getAttemptID()+" to list of failed maps");
         // If its an earlier Failed attempt, do not retry as OPPORTUNISTIC
         maps.put(event.getAttemptID(), request);
-        addContainerReq(request);
+        containerRequestor.addContainerReq(request);
       } else {
         if (mapsMod100 < numOpportunisticMapsPer100) {
           request =
               new ContainerRequest(event, PRIORITY_OPPORTUNISTIC_MAP,
                   mapNodeLabelExpression);
           maps.put(event.getAttemptID(), request);
-          addOpportunisticResourceRequest(request.priority, request.capability);
+          containerRequestor.addOpportunisticResourceRequest(request.priority,
+              request.capability);
         } else {
           request =
               new ContainerRequest(event, PRIORITY_MAP, mapNodeLabelExpression);
@@ -1093,7 +1105,7 @@ public class RMContainerAllocator extends RMContainerRequestor
             }
           }
           maps.put(event.getAttemptID(), request);
-          addContainerReq(request);
+          containerRequestor.addContainerReq(request);
         }
         mapsMod100++;
         mapsMod100 %= 100;
@@ -1103,7 +1115,7 @@ public class RMContainerAllocator extends RMContainerRequestor
     
     void addReduce(ContainerRequest req) {
       reduces.put(req.attemptID, req);
-      addContainerReq(req);
+      containerRequestor.addContainerReq(req);
     }
     
     // this method will change the list of allocatedContainers.
@@ -1168,7 +1180,7 @@ public class RMContainerAllocator extends RMContainerRequestor
         // do not assign if allocated container is on a  
         // blacklisted host
         String allocatedHost = allocated.getNodeId().getHost();
-        if (isNodeBlacklisted(allocatedHost)) {
+        if (containerRequestor.isNodeBlacklisted(allocatedHost)) {
           // we need to request for a new container 
           // and release the current one
           LOG.info("Got allocated container on a blacklisted "
@@ -1182,9 +1194,9 @@ public class RMContainerAllocator extends RMContainerRequestor
           if (toBeReplacedReq != null) {
             LOG.info("Placing a new container request for task attempt " 
                 + toBeReplacedReq.attemptID);
-            ContainerRequest newReq = 
-                getFilteredContainerRequest(toBeReplacedReq);
-            decContainerReq(toBeReplacedReq);
+            ContainerRequest newReq = containerRequestor
+                .filterRequest(toBeReplacedReq);
+            containerRequestor.decContainerReq(toBeReplacedReq);
             if (toBeReplacedReq.attemptID.getTaskId().getTaskType() ==
                 TaskType.MAP) {
               maps.put(newReq.attemptID, newReq);
@@ -1192,7 +1204,7 @@ public class RMContainerAllocator extends RMContainerRequestor
             else {
               reduces.put(newReq.attemptID, newReq);
             }
-            addContainerReq(newReq);
+            containerRequestor.addContainerReq(newReq);
           }
           else {
             LOG.info("Could not map allocated container to a valid request."
@@ -1221,7 +1233,7 @@ public class RMContainerAllocator extends RMContainerRequestor
     private void containerAssigned(Container allocated, 
                                     ContainerRequest assigned) {
       // Update resource requests
-      decContainerReq(assigned);
+      containerRequestor.decContainerReq(assigned);
 
       // send the container-assigned event to task attempt
       eventHandler.handle(new TaskAttemptContainerAssignedEvent(
@@ -1238,8 +1250,8 @@ public class RMContainerAllocator extends RMContainerRequestor
     
     private void containerNotAssigned(Container allocated) {
       containersReleased++;
-      pendingRelease.add(allocated.getId());
-      release(allocated.getId());      
+      containerRequestor.pendingRelease.add(allocated.getId());
+      containerRequestor.release(allocated.getId());
     }
     
     private ContainerRequest assignWithoutLocality(Container allocated) {
@@ -1604,4 +1616,18 @@ public class RMContainerAllocator extends RMContainerRequestor
 
   }
 
+  public Set<String> getBlacklistedNodes() {
+    return containerRequestor.getBlacklistedNodes();
+  }
+
+  public RMContainerRequestor getContainerRequestor() {
+    return containerRequestor;
+  }
+
+  public void resetContainerForReuse(ContainerId containerId) {
+    TaskAttemptId attemptId = assignedRequests.get(containerId);
+    if (attemptId != null) {
+      assignedRequests.remove(attemptId);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/06dcc886/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
index f4579ab..3475d75 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
@@ -36,10 +36,10 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
-import org.apache.hadoop.mapreduce.v2.app.AppContext;
-import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
+import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
 import org.apache.hadoop.yarn.api.records.ExecutionType;
@@ -60,7 +60,8 @@ import org.apache.hadoop.yarn.util.resource.Resources;
 /**
  * Keeps the data structures to send container requests to RM.
  */
-public abstract class RMContainerRequestor extends RMCommunicator {
+public class RMContainerRequestor extends AbstractService
+    implements ContainerRequestor {
   
   private static final Log LOG = LogFactory.getLog(RMContainerRequestor.class);
   private static final ResourceRequestComparator RESOURCE_REQUEST_COMPARATOR =
@@ -110,9 +111,13 @@ public abstract class RMContainerRequestor extends RMCommunicator {
       .newSetFromMap(new ConcurrentHashMap<String, Boolean>());
   private final Set<String> blacklistRemovals = Collections
       .newSetFromMap(new ConcurrentHashMap<String, Boolean>());
+  private final ApplicationId applicationId;
+  private final RMCommunicator rmCommunicator;
 
-  public RMContainerRequestor(ClientService clientService, AppContext context) {
-    super(clientService, context);
+  public RMContainerRequestor(RMCommunicator rmCommunicator) {
+    super(RMContainerRequestor.class.getName());
+    this.rmCommunicator = rmCommunicator;
+    applicationId = rmCommunicator.applicationId;
   }
 
   @Private
@@ -193,17 +198,19 @@ public abstract class RMContainerRequestor extends RMCommunicator {
     LOG.info("blacklistDisablePercent is " + blacklistDisablePercent);
   }
 
-  protected AllocateResponse makeRemoteRequest() throws YarnException,
-      IOException {
+  @Override
+  public AllocateResponse makeRemoteRequest()
+      throws YarnException, IOException {
     applyRequestLimits();
     ResourceBlacklistRequest blacklistRequest =
         ResourceBlacklistRequest.newInstance(new ArrayList<String>(blacklistAdditions),
             new ArrayList<String>(blacklistRemovals));
-    AllocateRequest allocateRequest =
-        AllocateRequest.newInstance(lastResponseID,
-          super.getApplicationProgress(), new ArrayList<ResourceRequest>(ask),
-          new ArrayList<ContainerId>(release), blacklistRequest);
-    AllocateResponse allocateResponse = scheduler.allocate(allocateRequest);
+    AllocateRequest allocateRequest = AllocateRequest.newInstance(
+        lastResponseID, rmCommunicator.getApplicationProgress(),
+        new ArrayList<ResourceRequest>(ask),
+        new ArrayList<ContainerId>(release), blacklistRequest);
+    AllocateResponse allocateResponse = rmCommunicator.scheduler
+        .allocate(allocateRequest);
     lastResponseID = allocateResponse.getResponseId();
     availableResources = allocateResponse.getAvailableResources();
     lastClusterNmCount = clusterNmCount;
@@ -323,7 +330,8 @@ public abstract class RMContainerRequestor extends RMCommunicator {
     }
   }
   
-  protected void containerFailedOnHost(String hostName) {
+  @Override
+  public void containerFailedOnHost(String hostName) {
     if (!nodeBlacklistingEnabled) {
       return;
     }
@@ -388,11 +396,13 @@ public abstract class RMContainerRequestor extends RMCommunicator {
     }
   }
 
-  protected Resource getAvailableResources() {
+  @Override
+  public Resource getAvailableResources() {
     return availableResources == null ? Resources.none() : availableResources;
   }
 
-  protected void addContainerReq(ContainerRequest req) {
+  @Override
+  public void addContainerReq(ContainerRequest req) {
     // Create resource requests
     for (String host : req.hosts) {
       // Data-local
@@ -413,7 +423,8 @@ public abstract class RMContainerRequestor extends RMCommunicator {
         req.nodeLabelExpression);
   }
 
-  protected void decContainerReq(ContainerRequest req) {
+  @Override
+  public void decContainerReq(ContainerRequest req) {
     // Update resource requests
     for (String hostName : req.hosts) {
       decResourceRequest(req.priority, hostName, req.capability);
@@ -539,18 +550,21 @@ public abstract class RMContainerRequestor extends RMCommunicator {
     ask.add(remoteRequest);    
   }
 
-  protected void release(ContainerId containerId) {
+  @Override
+  public void release(ContainerId containerId) {
     release.add(containerId);
   }
   
-  protected boolean isNodeBlacklisted(String hostname) {
+  @Override
+  public boolean isNodeBlacklisted(String hostname) {
     if (!nodeBlacklistingEnabled || ignoreBlacklisting.get()) {
       return false;
     }
     return blacklistedNodes.contains(hostname);
   }
-  
-  protected ContainerRequest getFilteredContainerRequest(ContainerRequest orig) {
+
+  @Override
+  public ContainerRequest filterRequest(ContainerRequest orig) {
     ArrayList<String> newHosts = new ArrayList<String>();
     for (String host : orig.hosts) {
       if (!isNodeBlacklisted(host)) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/06dcc886/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
index 38a9731..1570c9b 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
@@ -24,6 +24,7 @@ import static org.mockito.Matchers.anyInt;
 import static org.mockito.Matchers.isA;
 import static org.mockito.Mockito.doCallRealMethod;
 import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
@@ -664,7 +665,7 @@ public class TestRMContainerAllocator {
     final String[] locations = new String[] { host };
     allocator.sendRequest(createReq(jobId, 0, 1024, locations, false, true));
     allocator.scheduleAllReduces();
-    allocator.makeRemoteRequest();
+    allocator.containerRequestor.makeRemoteRequest();
     nm.nodeHeartbeat(true);
     dispatcher.await();
     allocator.sendRequest(createReq(jobId, 1, 1024, locations, false, false));
@@ -2059,24 +2060,18 @@ public class TestRMContainerAllocator {
     public void updateSchedulerProxy(MyResourceManager rm) {
       scheduler = rm.getApplicationMasterService();
     }
-
-    @Override
-    protected AllocateResponse makeRemoteRequest() throws IOException,
-        YarnException {
-      allocateResponse = super.makeRemoteRequest();
-      return allocateResponse;
-    }
   }
 
   private static class MyContainerAllocator2 extends MyContainerAllocator {
     public MyContainerAllocator2(MyResourceManager rm, Configuration conf,
-      ApplicationAttemptId appAttemptId, Job job) {
+        ApplicationAttemptId appAttemptId, Job job)
+        throws YarnException, IOException {
       super(rm, conf, appAttemptId, job);
-    }
-    @Override
-    protected AllocateResponse makeRemoteRequest() throws IOException,
-      YarnException {
-      throw new IOException("for testing");
+      containerRequestor = mock(RMContainerRequestor.class);
+      doThrow(new IOException("for testing")).when(containerRequestor)
+          .makeRemoteRequest();
+      doReturn(Resource.newInstance(2048, 1)).when(containerRequestor)
+          .getAvailableResources();
     }
   }
 
@@ -2100,6 +2095,7 @@ public class TestRMContainerAllocator {
         any(Resource.class), anyInt(), anyFloat(), anyFloat());
     doReturn(EnumSet.of(SchedulerResourceTypes.MEMORY)).when(allocator)
       .getSchedulerResourceTypes();
+    allocator.containerRequestor = mock(RMContainerRequestor.class);
 
     // Test slow-start
     allocator.scheduleReduces(
@@ -2983,8 +2979,8 @@ public class TestRMContainerAllocator {
     Assert.assertEquals(1, allocator.getScheduledRequests().maps.size());
     Assert.assertEquals(0, allocator.getAssignedRequests().maps.size());
 
-    Assert.assertEquals(6, allocator.getAsk().size());
-    for (ResourceRequest req : allocator.getAsk()) {
+    Assert.assertEquals(6, allocator.containerRequestor.getAsk().size());
+    for (ResourceRequest req : allocator.containerRequestor.getAsk()) {
       boolean isReduce =
           req.getPriority().equals(RMContainerAllocator.PRIORITY_REDUCE);
       if (isReduce) {
@@ -3009,8 +3005,8 @@ public class TestRMContainerAllocator {
     // indicate ramping down of reduces to scheduler.
     Assert.assertEquals(0, allocator.getScheduledRequests().reduces.size());
     Assert.assertEquals(2, allocator.getNumOfPendingReduces());
-    Assert.assertEquals(3, allocator.getAsk().size());
-    for (ResourceRequest req : allocator.getAsk()) {
+    Assert.assertEquals(3, allocator.containerRequestor.getAsk().size());
+    for (ResourceRequest req : allocator.containerRequestor.getAsk()) {
       Assert.assertEquals(
           RMContainerAllocator.PRIORITY_REDUCE, req.getPriority());
       Assert.assertTrue(req.getResourceName().equals("*") ||
@@ -3037,6 +3033,7 @@ public class TestRMContainerAllocator {
     RMContainerAllocator containerAllocator =
         new RMContainerAllocatorForFinishedContainer(null, context,
             mock(AMPreemptionPolicy.class));
+    containerAllocator.init(new Configuration());
 
     ContainerStatus finishedContainer = ContainerStatus.newInstance(
         mock(ContainerId.class), ContainerState.COMPLETE, "", 0);
@@ -3149,8 +3146,8 @@ public class TestRMContainerAllocator {
     Assert.assertEquals(1, allocator.getScheduledRequests().maps.size());
     Assert.assertEquals(0, allocator.getAssignedRequests().maps.size());
 
-    Assert.assertEquals(6, allocator.getAsk().size());
-    for (ResourceRequest req : allocator.getAsk()) {
+    Assert.assertEquals(6, allocator.containerRequestor.getAsk().size());
+    for (ResourceRequest req : allocator.containerRequestor.getAsk()) {
       boolean isReduce =
           req.getPriority().equals(RMContainerAllocator.PRIORITY_REDUCE);
       if (isReduce) {
@@ -3178,8 +3175,8 @@ public class TestRMContainerAllocator {
     // indicate ramping down of reduces to scheduler.
     Assert.assertEquals(0, allocator.getScheduledRequests().reduces.size());
     Assert.assertEquals(2, allocator.getNumOfPendingReduces());
-    Assert.assertEquals(3, allocator.getAsk().size());
-    for (ResourceRequest req : allocator.getAsk()) {
+    Assert.assertEquals(3, allocator.containerRequestor.getAsk().size());
+    for (ResourceRequest req : allocator.containerRequestor.getAsk()) {
       Assert.assertEquals(
           RMContainerAllocator.PRIORITY_REDUCE, req.getPriority());
       Assert.assertTrue(req.getResourceName().equals("*") ||


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message