hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From asur...@apache.org
Subject hadoop git commit: YARN-6777. Support for ApplicationMasterService processing chain of interceptors. (asuresh)
Date Wed, 19 Jul 2017 19:28:06 GMT
Repository: hadoop
Updated Branches:
  refs/heads/trunk 3556e36be -> 077fcf6a9


YARN-6777. Support for ApplicationMasterService processing chain of interceptors. (asuresh)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/077fcf6a
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/077fcf6a
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/077fcf6a

Branch: refs/heads/trunk
Commit: 077fcf6a96e420e7f36350931722b8603d010cf1
Parents: 3556e36
Author: Arun Suresh <asuresh@apache.org>
Authored: Mon Jul 17 17:02:22 2017 -0700
Committer: Arun Suresh <asuresh@apache.org>
Committed: Wed Jul 19 12:26:40 2017 -0700

----------------------------------------------------------------------
 .../ams/ApplicationMasterServiceContext.java    |  29 ++++
 .../ams/ApplicationMasterServiceProcessor.java  |  30 ++--
 .../hadoop/yarn/conf/YarnConfiguration.java     |   5 +-
 .../src/main/resources/yarn-default.xml         |  10 ++
 .../resourcemanager/AMSProcessingChain.java     | 102 ++++++++++++
 .../ApplicationMasterService.java               |  49 ++++--
 .../resourcemanager/DefaultAMSProcessor.java    |  69 ++++----
 ...pportunisticContainerAllocatorAMService.java |  67 +++++---
 .../yarn/server/resourcemanager/RMContext.java  |   3 +-
 .../TestApplicationMasterService.java           | 163 ++++++++++++++++++-
 10 files changed, 446 insertions(+), 81 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/077fcf6a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/ams/ApplicationMasterServiceContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/ams/ApplicationMasterServiceContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/ams/ApplicationMasterServiceContext.java
new file mode 100644
index 0000000..988c727
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/ams/ApplicationMasterServiceContext.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.ams;
+
+/**
+ * This is a marker interface for a context object that is injected into
+ * the ApplicationMasterService processor. The processor implementation
+ * is free to type cast this based on the availability of the context's
+ * implementation in the classpath.
+ */
+public interface ApplicationMasterServiceContext {
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/077fcf6a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/ams/ApplicationMasterServiceProcessor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/ams/ApplicationMasterServiceProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/ams/ApplicationMasterServiceProcessor.java
index b426f48..b7d925a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/ams/ApplicationMasterServiceProcessor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/ams/ApplicationMasterServiceProcessor.java
@@ -38,34 +38,44 @@ import java.io.IOException;
 public interface ApplicationMasterServiceProcessor {
 
   /**
+   * Initialize with an ApplicationMasterService Context as well as the
+   * next processor in the chain.
+   * @param amsContext AMSContext.
+   * @param nextProcessor next ApplicationMasterServiceProcessor
+   */
+  void init(ApplicationMasterServiceContext amsContext,
+      ApplicationMasterServiceProcessor nextProcessor);
+
+  /**
    * Register AM attempt.
    * @param applicationAttemptId applicationAttemptId.
    * @param request Register Request.
-   * @return Register Response.
+   * @param response Register Response.
    * @throws IOException IOException.
    */
-  RegisterApplicationMasterResponse registerApplicationMaster(
+  void registerApplicationMaster(
       ApplicationAttemptId applicationAttemptId,
-      RegisterApplicationMasterRequest request) throws IOException;
+      RegisterApplicationMasterRequest request,
+      RegisterApplicationMasterResponse response) throws IOException;
 
   /**
    * Allocate call.
    * @param appAttemptId appAttemptId.
    * @param request Allocate Request.
-   * @return Allocate Response.
+   * @param response Allocate Response.
    * @throws YarnException YarnException.
    */
-  AllocateResponse allocate(ApplicationAttemptId appAttemptId,
-      AllocateRequest request) throws YarnException;
+  void allocate(ApplicationAttemptId appAttemptId,
+      AllocateRequest request, AllocateResponse response) throws YarnException;
 
   /**
    * Finish AM.
    * @param applicationAttemptId applicationAttemptId.
    * @param request Finish AM Request.
-   * @return Finish AM response.
+   * @param response Finish AM Response.
    */
-  FinishApplicationMasterResponse finishApplicationMaster(
+  void finishApplicationMaster(
       ApplicationAttemptId applicationAttemptId,
-      FinishApplicationMasterRequest request);
-
+      FinishApplicationMasterRequest request,
+      FinishApplicationMasterResponse response);
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/077fcf6a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 01eff64..93437e3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -103,7 +103,7 @@ public class YarnConfiguration extends Configuration {
       YarnConfiguration.NM_PREFIX + "log-container-debug-info.enabled";
 
   public static final boolean DEFAULT_NM_LOG_CONTAINER_DEBUG_INFO = false;
-  
+
   ////////////////////////////////
   // IPC Configs
   ////////////////////////////////
@@ -150,6 +150,9 @@ public class YarnConfiguration extends Configuration {
   public static final String DEFAULT_RM_ADDRESS =
     "0.0.0.0:" + DEFAULT_RM_PORT;
 
+  public static final String RM_APPLICATION_MASTER_SERVICE_PROCESSORS =
+      RM_PREFIX + "application-master-service.processors";
+
   /** The actual bind address for the RM.*/
   public static final String RM_BIND_HOST =
     RM_PREFIX + "bind-host";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/077fcf6a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 0588c6c..7ddcfcd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -123,6 +123,16 @@
   </property>
 
   <property>
+    <description>
+      Comma separated class names of ApplicationMasterServiceProcessor
+      implementations. The processors will be applied in the order
+      they are specified.
+    </description>
+    <name>yarn.resourcemanager.application-master-service.processors</name>
+    <value></value>
+  </property>
+
+  <property>
       <description>
         This configures the HTTP endpoint for Yarn Daemons.The following
         values are supported:

http://git-wip-us.apache.org/repos/asf/hadoop/blob/077fcf6a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AMSProcessingChain.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AMSProcessingChain.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AMSProcessingChain.java
new file mode 100644
index 0000000..931b1c8
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AMSProcessingChain.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.ams.ApplicationMasterServiceContext;
+import org.apache.hadoop.yarn.ams.ApplicationMasterServiceProcessor;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+
+import java.io.IOException;
+
+/**
+ * This maintains a chain of {@link ApplicationMasterServiceProcessor}s.
+ */
+class AMSProcessingChain implements ApplicationMasterServiceProcessor {
+
+  private static final Log LOG = LogFactory.getLog(AMSProcessingChain.class);
+
+  private ApplicationMasterServiceProcessor head;
+  private RMContext rmContext;
+
+  /**
+   * This has to be initialized with at least 1 Processor.
+   * @param rootProcessor Root processor.
+   */
+  AMSProcessingChain(ApplicationMasterServiceProcessor rootProcessor) {
+    if (rootProcessor == null) {
+      throw new YarnRuntimeException("No root ApplicationMasterService" +
+          "Processor specified for the processing chain..");
+    }
+    this.head = rootProcessor;
+  }
+
+  @Override
+  public void init(ApplicationMasterServiceContext amsContext,
+      ApplicationMasterServiceProcessor nextProcessor) {
+    LOG.info("Initializing AMS Processing chain. Root Processor=["
+        + this.head.getClass().getName() + "].");
+    this.rmContext = (RMContext)amsContext;
+    // The head is initialized with a null 'next' processor
+    this.head.init(amsContext, null);
+  }
+
+  /**
+   * Add a processor to the top of the chain.
+   * @param processor ApplicationMasterServiceProcessor
+   */
+  public synchronized void addProcessor(
+      ApplicationMasterServiceProcessor processor) {
+    LOG.info("Adding [" + processor.getClass().getName() + "] tp top of" +
+        " AMS Processing chain. ");
+    processor.init(this.rmContext, this.head);
+    this.head = processor;
+  }
+
+  @Override
+  public void registerApplicationMaster(
+      ApplicationAttemptId applicationAttemptId,
+      RegisterApplicationMasterRequest request,
+      RegisterApplicationMasterResponse resp) throws IOException {
+    this.head.registerApplicationMaster(applicationAttemptId, request, resp);
+  }
+
+  @Override
+  public void allocate(ApplicationAttemptId appAttemptId,
+      AllocateRequest request, AllocateResponse response) throws YarnException {
+    this.head.allocate(appAttemptId, request, response);
+  }
+
+  @Override
+  public void finishApplicationMaster(
+      ApplicationAttemptId applicationAttemptId,
+      FinishApplicationMasterRequest request,
+      FinishApplicationMasterResponse response) {
+    this.head.finishApplicationMaster(applicationAttemptId, request, response);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/077fcf6a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
index fe8b83c..76a1640 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -88,7 +89,7 @@ public class ApplicationMasterService extends AbstractService implements
   private final ConcurrentMap<ApplicationAttemptId, AllocateResponseLock> responseMap =
       new ConcurrentHashMap<ApplicationAttemptId, AllocateResponseLock>();
   protected final RMContext rmContext;
-  private final ApplicationMasterServiceProcessor amsProcessor;
+  private final AMSProcessingChain amsProcessingChain;
 
   public ApplicationMasterService(RMContext rmContext,
       YarnScheduler scheduler) {
@@ -101,11 +102,7 @@ public class ApplicationMasterService extends AbstractService implements
     this.amLivelinessMonitor = rmContext.getAMLivelinessMonitor();
     this.rScheduler = scheduler;
     this.rmContext = rmContext;
-    this.amsProcessor = createProcessor();
-  }
-
-  protected ApplicationMasterServiceProcessor createProcessor() {
-    return new DefaultAMSProcessor(rmContext, rScheduler);
+    this.amsProcessingChain = new AMSProcessingChain(new DefaultAMSProcessor());
   }
 
   @Override
@@ -115,6 +112,21 @@ public class ApplicationMasterService extends AbstractService implements
         YarnConfiguration.RM_SCHEDULER_ADDRESS,
         YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
         YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
+    amsProcessingChain.init(rmContext, null);
+    List<ApplicationMasterServiceProcessor> processors = getProcessorList(conf);
+    if (processors != null) {
+      Collections.reverse(processors);
+      for (ApplicationMasterServiceProcessor p : processors) {
+        this.amsProcessingChain.addProcessor(p);
+      }
+    }
+  }
+
+  protected List<ApplicationMasterServiceProcessor> getProcessorList(
+      Configuration conf) {
+    return conf.getInstances(
+        YarnConfiguration.RM_APPLICATION_MASTER_SERVICE_PROCESSORS,
+        ApplicationMasterServiceProcessor.class);
   }
 
   @Override
@@ -165,6 +177,10 @@ public class ApplicationMasterService extends AbstractService implements
             YarnConfiguration.DEFAULT_RM_SCHEDULER_CLIENT_THREAD_COUNT));
   }
 
+  protected AMSProcessingChain getProcessingChain() {
+    return this.amsProcessingChain;
+  }
+
   @Private
   public InetSocketAddress getBindAddress() {
     return this.masterServiceAddress;
@@ -214,8 +230,12 @@ public class ApplicationMasterService extends AbstractService implements
       lastResponse.setResponseId(0);
       lock.setAllocateResponse(lastResponse);
 
-      return this.amsProcessor.registerApplicationMaster(
-          amrmTokenIdentifier.getApplicationAttemptId(), request);
+      RegisterApplicationMasterResponse response =
+          recordFactory.newRecordInstance(
+              RegisterApplicationMasterResponse.class);
+      this.amsProcessingChain.registerApplicationMaster(
+          amrmTokenIdentifier.getApplicationAttemptId(), request, response);
+      return response;
     }
   }
 
@@ -265,8 +285,11 @@ public class ApplicationMasterService extends AbstractService implements
       }
 
       this.amLivelinessMonitor.receivedPing(applicationAttemptId);
-      return this.amsProcessor.finishApplicationMaster(
-          applicationAttemptId, request);
+      FinishApplicationMasterResponse response =
+          FinishApplicationMasterResponse.newInstance(false);
+      this.amsProcessingChain.finishApplicationMaster(
+          applicationAttemptId, request, response);
+      return response;
     }
   }
 
@@ -346,8 +369,10 @@ public class ApplicationMasterService extends AbstractService implements
         throw new InvalidApplicationMasterRequestException(message);
       }
 
-      AllocateResponse response = this.amsProcessor.allocate(
-          amrmTokenIdentifier.getApplicationAttemptId(), request);
+      AllocateResponse response =
+          recordFactory.newRecordInstance(AllocateResponse.class);
+      this.amsProcessingChain.allocate(
+          amrmTokenIdentifier.getApplicationAttemptId(), request, response);
 
       // update AMRMToken if the token is rolled-up
       MasterKeyData nextMasterKey =

http://git-wip-us.apache.org/repos/asf/hadoop/blob/077fcf6a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java
index 6eb1fba..052ec22 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.ams.ApplicationMasterServiceContext;
 import org.apache.hadoop.yarn.ams.ApplicationMasterServiceUtils;
 import org.apache.hadoop.yarn.ams.ApplicationMasterServiceProcessor;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
@@ -81,7 +82,11 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
-class DefaultAMSProcessor implements ApplicationMasterServiceProcessor {
+/**
+ * This is the default Application Master Service processor. It has to be the
+ * last processor in the {@link AMSProcessingChain}.
+ */
+final class DefaultAMSProcessor implements ApplicationMasterServiceProcessor {
 
   private static final Log LOG = LogFactory.getLog(DefaultAMSProcessor.class);
 
@@ -93,17 +98,19 @@ class DefaultAMSProcessor implements ApplicationMasterServiceProcessor {
   private final RecordFactory recordFactory =
       RecordFactoryProvider.getRecordFactory(null);
 
-  private final RMContext rmContext;
-  private final YarnScheduler scheduler;
+  private RMContext rmContext;
 
-  DefaultAMSProcessor(RMContext rmContext, YarnScheduler scheduler) {
-    this.rmContext = rmContext;
-    this.scheduler = scheduler;
+  @Override
+  public void init(ApplicationMasterServiceContext amsContext,
+      ApplicationMasterServiceProcessor nextProcessor) {
+    this.rmContext = (RMContext)amsContext;
   }
 
-  public RegisterApplicationMasterResponse registerApplicationMaster(
+  @Override
+  public void registerApplicationMaster(
       ApplicationAttemptId applicationAttemptId,
-      RegisterApplicationMasterRequest request) throws IOException {
+      RegisterApplicationMasterRequest request,
+      RegisterApplicationMasterResponse response) throws IOException {
 
     RMApp app = getRmContext().getRMApps().get(
         applicationAttemptId.getApplicationId());
@@ -116,8 +123,6 @@ class DefaultAMSProcessor implements ApplicationMasterServiceProcessor {
         RMAuditLogger.AuditConstants.REGISTER_AM,
         "ApplicationMasterService", app.getApplicationId(),
         applicationAttemptId);
-    RegisterApplicationMasterResponse response = recordFactory
-        .newRecordInstance(RegisterApplicationMasterResponse.class);
     response.setMaximumResourceCapability(getScheduler()
         .getMaximumResourceCapability(app.getQueue()));
     response.setApplicationACLs(app.getRMAppAttempt(applicationAttemptId)
@@ -165,11 +170,11 @@ class DefaultAMSProcessor implements ApplicationMasterServiceProcessor {
 
     response.setSchedulerResourceTypes(getScheduler()
         .getSchedulingResourceTypes());
-    return response;
   }
 
-  public AllocateResponse allocate(ApplicationAttemptId appAttemptId,
-      AllocateRequest request) throws YarnException {
+  @Override
+  public void allocate(ApplicationAttemptId appAttemptId,
+      AllocateRequest request, AllocateResponse response) throws YarnException {
 
     handleProgress(appAttemptId, request);
 
@@ -259,50 +264,46 @@ class DefaultAMSProcessor implements ApplicationMasterServiceProcessor {
           "blacklistRemovals: " + blacklistRemovals);
     }
     RMAppAttempt appAttempt = app.getRMAppAttempt(appAttemptId);
-    AllocateResponse allocateResponse =
-        recordFactory.newRecordInstance(AllocateResponse.class);
 
     if (allocation.getNMTokens() != null &&
         !allocation.getNMTokens().isEmpty()) {
-      allocateResponse.setNMTokens(allocation.getNMTokens());
+      response.setNMTokens(allocation.getNMTokens());
     }
 
     // Notify the AM of container update errors
     ApplicationMasterServiceUtils.addToUpdateContainerErrors(
-        allocateResponse, updateErrors);
+        response, updateErrors);
 
     // update the response with the deltas of node status changes
-    handleNodeUpdates(app, allocateResponse);
+    handleNodeUpdates(app, response);
 
     ApplicationMasterServiceUtils.addToAllocatedContainers(
-        allocateResponse, allocation.getContainers());
+        response, allocation.getContainers());
 
-    allocateResponse.setCompletedContainersStatuses(appAttempt
+    response.setCompletedContainersStatuses(appAttempt
         .pullJustFinishedContainers());
-    allocateResponse.setAvailableResources(allocation.getResourceLimit());
+    response.setAvailableResources(allocation.getResourceLimit());
 
-    addToContainerUpdates(allocateResponse, allocation,
+    addToContainerUpdates(response, allocation,
         ((AbstractYarnScheduler)getScheduler())
             .getApplicationAttempt(appAttemptId).pullUpdateContainerErrors());
 
-    allocateResponse.setNumClusterNodes(getScheduler().getNumClusterNodes());
+    response.setNumClusterNodes(getScheduler().getNumClusterNodes());
 
     // add collector address for this application
     if (YarnConfiguration.timelineServiceV2Enabled(
         getRmContext().getYarnConfiguration())) {
-      allocateResponse.setCollectorAddr(
+      response.setCollectorAddr(
           getRmContext().getRMApps().get(appAttemptId.getApplicationId())
               .getCollectorAddr());
     }
 
     // add preemption to the allocateResponse message (if any)
-    allocateResponse
-        .setPreemptionMessage(generatePreemptionMessage(allocation));
+    response.setPreemptionMessage(generatePreemptionMessage(allocation));
 
     // Set application priority
-    allocateResponse.setApplicationPriority(app
+    response.setApplicationPriority(app
         .getApplicationPriority());
-    return allocateResponse;
   }
 
   private void handleNodeUpdates(RMApp app, AllocateResponse allocateResponse) {
@@ -351,20 +352,20 @@ class DefaultAMSProcessor implements ApplicationMasterServiceProcessor {
             .getProgress()));
   }
 
-  public FinishApplicationMasterResponse finishApplicationMaster(
+  @Override
+  public void finishApplicationMaster(
       ApplicationAttemptId applicationAttemptId,
-      FinishApplicationMasterRequest request) {
+      FinishApplicationMasterRequest request,
+      FinishApplicationMasterResponse response) {
     RMApp app =
         getRmContext().getRMApps().get(applicationAttemptId.getApplicationId());
     // For UnmanagedAMs, return true so they don't retry
-    FinishApplicationMasterResponse response =
-        FinishApplicationMasterResponse.newInstance(
+    response.setIsUnregistered(
             app.getApplicationSubmissionContext().getUnmanagedAM());
     getRmContext().getDispatcher().getEventHandler().handle(
         new RMAppAttemptUnregistrationEvent(applicationAttemptId, request
             .getTrackingUrl(), request.getFinalApplicationStatus(), request
             .getDiagnostics()));
-    return response;
   }
 
   private PreemptionMessage generatePreemptionMessage(Allocation allocation){
@@ -424,7 +425,7 @@ class DefaultAMSProcessor implements ApplicationMasterServiceProcessor {
   }
 
   protected YarnScheduler getScheduler() {
-    return scheduler;
+    return rmContext.getScheduler();
   }
 
   private static void addToContainerUpdates(AllocateResponse allocateResponse,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/077fcf6a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java
index e03d944..3c278de 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java
@@ -23,9 +23,13 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.yarn.ams.ApplicationMasterServiceContext;
 import org.apache.hadoop.yarn.ams.ApplicationMasterServiceProcessor;
 import org.apache.hadoop.yarn.ams.ApplicationMasterServiceUtils;
 import org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB;
+import org.apache.hadoop.yarn.api.protocolrecords
+    .FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.NodeId;
@@ -101,17 +105,29 @@ public class OpportunisticContainerAllocatorAMService
   private volatile List<RemoteNode> cachedNodes;
   private volatile long lastCacheUpdateTime;
 
-  class OpportunisticAMSProcessor extends DefaultAMSProcessor {
+  class OpportunisticAMSProcessor implements
+      ApplicationMasterServiceProcessor {
 
-    OpportunisticAMSProcessor(RMContext rmContext, YarnScheduler
-        scheduler) {
-      super(rmContext, scheduler);
+    private ApplicationMasterServiceContext context;
+    private ApplicationMasterServiceProcessor nextProcessor;
+
+    private YarnScheduler getScheduler() {
+      return ((RMContext)context).getScheduler();
     }
 
     @Override
-    public RegisterApplicationMasterResponse registerApplicationMaster(
+    public void init(ApplicationMasterServiceContext amsContext,
+        ApplicationMasterServiceProcessor next) {
+      this.context = amsContext;
+      // The AMSProcessingChain guarantees that 'next' is not null.
+      this.nextProcessor = next;
+    }
+
+    @Override
+    public void registerApplicationMaster(
         ApplicationAttemptId applicationAttemptId,
-        RegisterApplicationMasterRequest request) throws IOException {
+        RegisterApplicationMasterRequest request,
+        RegisterApplicationMasterResponse response) throws IOException {
       SchedulerApplicationAttempt appAttempt = ((AbstractYarnScheduler)
           getScheduler()).getApplicationAttempt(applicationAttemptId);
       if (appAttempt.getOpportunisticContainerContext() == null) {
@@ -135,12 +151,14 @@ public class OpportunisticContainerAllocatorAMService
             tokenExpiryInterval);
         appAttempt.setOpportunisticContainerContext(opCtx);
       }
-      return super.registerApplicationMaster(applicationAttemptId, request);
+      nextProcessor.registerApplicationMaster(
+          applicationAttemptId, request, response);
     }
 
     @Override
-    public AllocateResponse allocate(ApplicationAttemptId appAttemptId,
-        AllocateRequest request) throws YarnException {
+    public void allocate(ApplicationAttemptId appAttemptId,
+        AllocateRequest request, AllocateResponse response)
+        throws YarnException {
       // Partition requests to GUARANTEED and OPPORTUNISTIC.
       OpportunisticContainerAllocator.PartitionedResourceRequests
           partitionedAsks =
@@ -165,17 +183,22 @@ public class OpportunisticContainerAllocatorAMService
       if (!oppContainers.isEmpty()) {
         handleNewContainers(oppContainers, false);
         appAttempt.updateNMTokens(oppContainers);
+        ApplicationMasterServiceUtils.addToAllocatedContainers(
+            response, oppContainers);
       }
 
       // Allocate GUARANTEED containers.
       request.setAskList(partitionedAsks.getGuaranteed());
+      nextProcessor.allocate(appAttemptId, request, response);
+    }
 
-      AllocateResponse response = super.allocate(appAttemptId, request);
-      if (!oppContainers.isEmpty()) {
-        ApplicationMasterServiceUtils.addToAllocatedContainers(
-            response, oppContainers);
-      }
-      return response;
+    @Override
+    public void finishApplicationMaster(
+        ApplicationAttemptId applicationAttemptId,
+        FinishApplicationMasterRequest request,
+        FinishApplicationMasterResponse response) {
+      nextProcessor.finishApplicationMaster(applicationAttemptId,
+          request, response);
     }
   }
 
@@ -237,11 +260,6 @@ public class OpportunisticContainerAllocatorAMService
   }
 
   @Override
-  protected ApplicationMasterServiceProcessor createProcessor() {
-    return new OpportunisticAMSProcessor(rmContext, rmContext.getScheduler());
-  }
-
-  @Override
   public Server getServer(YarnRPC rpc, Configuration serverConf,
       InetSocketAddress addr, AMRMTokenSecretManager secretManager) {
     if (YarnConfiguration.isDistSchedulingEnabled(serverConf)) {
@@ -262,6 +280,15 @@ public class OpportunisticContainerAllocatorAMService
   }
 
   @Override
+  protected List<ApplicationMasterServiceProcessor> getProcessorList(
+      Configuration conf) {
+    List<ApplicationMasterServiceProcessor> retVal =
+        super.getProcessorList(conf);
+    retVal.add(new OpportunisticAMSProcessor());
+    return retVal;
+  }
+
+  @Override
   public RegisterDistributedSchedulingAMResponse
       registerApplicationMasterForDistributedScheduling(
       RegisterApplicationMasterRequest request) throws YarnException,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/077fcf6a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
index ba6b915..0ea9516 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
@@ -23,6 +23,7 @@ import java.util.concurrent.ConcurrentMap;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
+import org.apache.hadoop.yarn.ams.ApplicationMasterServiceContext;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.conf.ConfigurationProvider;
@@ -53,7 +54,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineC
 /**
  * Context of the ResourceManager.
  */
-public interface RMContext {
+public interface RMContext extends ApplicationMasterServiceContext {
 
   Dispatcher getDispatcher();
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/077fcf6a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java
index 18c49bd..85a36e7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java
@@ -20,20 +20,29 @@ package org.apache.hadoop.yarn.server.resourcemanager;
 
 import static java.lang.Thread.sleep;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.ams.ApplicationMasterServiceContext;
+import org.apache.hadoop.yarn.ams.ApplicationMasterServiceProcessor;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords
+    .RegisterApplicationMasterRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateRequestPBImpl;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
@@ -44,6 +53,7 @@ import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
 import org.apache.hadoop.yarn.exceptions.InvalidContainerReleaseException;
+import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
@@ -61,7 +71,7 @@ import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.junit.Assert;
-import org.junit.BeforeClass;
+import org.junit.Before;
 import org.junit.Test;
 
 public class TestApplicationMasterService {
@@ -71,13 +81,160 @@ public class TestApplicationMasterService {
   private final int GB = 1024;
   private static YarnConfiguration conf;
 
-  @BeforeClass
-  public static void setup() {
+  private static AtomicInteger beforeRegCount = new AtomicInteger(0);
+  private static AtomicInteger afterRegCount = new AtomicInteger(0);
+  private static AtomicInteger beforeAllocCount = new AtomicInteger(0);
+  private static AtomicInteger afterAllocCount = new AtomicInteger(0);
+  private static AtomicInteger beforeFinishCount = new AtomicInteger(0);
+  private static AtomicInteger afterFinishCount = new AtomicInteger(0);
+  private static AtomicInteger initCount = new AtomicInteger(0);
+
+  static class TestInterceptor1 implements
+      ApplicationMasterServiceProcessor {
+
+    private ApplicationMasterServiceProcessor nextProcessor;
+
+    @Override
+    public void init(ApplicationMasterServiceContext amsContext,
+        ApplicationMasterServiceProcessor next) {
+      initCount.incrementAndGet();
+      this.nextProcessor = next;
+    }
+
+    @Override
+    public void registerApplicationMaster(ApplicationAttemptId
+        applicationAttemptId, RegisterApplicationMasterRequest request,
+        RegisterApplicationMasterResponse response) throws IOException {
+      nextProcessor.registerApplicationMaster(
+          applicationAttemptId, request, response);
+    }
+
+    @Override
+    public void allocate(ApplicationAttemptId appAttemptId,
+        AllocateRequest request,
+        AllocateResponse response) throws YarnException {
+      beforeAllocCount.incrementAndGet();
+      nextProcessor.allocate(appAttemptId, request, response);
+      afterAllocCount.incrementAndGet();
+    }
+
+    @Override
+    public void finishApplicationMaster(
+        ApplicationAttemptId applicationAttemptId,
+        FinishApplicationMasterRequest request,
+        FinishApplicationMasterResponse response) {
+      beforeFinishCount.incrementAndGet();
+      afterFinishCount.incrementAndGet();
+    }
+  }
+
+  static class TestInterceptor2 implements
+      ApplicationMasterServiceProcessor {
+
+    private ApplicationMasterServiceProcessor nextProcessor;
+
+    @Override
+    public void init(ApplicationMasterServiceContext amsContext,
+        ApplicationMasterServiceProcessor next) {
+      initCount.incrementAndGet();
+      this.nextProcessor = next;
+    }
+
+    @Override
+    public void registerApplicationMaster(
+        ApplicationAttemptId applicationAttemptId,
+        RegisterApplicationMasterRequest request,
+        RegisterApplicationMasterResponse response) throws IOException {
+      beforeRegCount.incrementAndGet();
+      nextProcessor.registerApplicationMaster(applicationAttemptId,
+              request, response);
+      afterRegCount.incrementAndGet();
+    }
+
+    @Override
+    public void allocate(ApplicationAttemptId appAttemptId,
+        AllocateRequest request, AllocateResponse response)
+        throws YarnException {
+      beforeAllocCount.incrementAndGet();
+      nextProcessor.allocate(appAttemptId, request, response);
+      afterAllocCount.incrementAndGet();
+    }
+
+    @Override
+    public void finishApplicationMaster(
+        ApplicationAttemptId applicationAttemptId,
+        FinishApplicationMasterRequest request,
+        FinishApplicationMasterResponse response) {
+      beforeFinishCount.incrementAndGet();
+      nextProcessor.finishApplicationMaster(
+          applicationAttemptId, request, response);
+      afterFinishCount.incrementAndGet();
+    }
+  }
+
+  @Before
+  public void setup() {
     conf = new YarnConfiguration();
     conf.setClass(YarnConfiguration.RM_SCHEDULER, FifoScheduler.class,
       ResourceScheduler.class);
   }
 
+  @Test(timeout = 300000)
+  public void testApplicationMasterInterceptor() throws Exception {
+    conf.set(YarnConfiguration.RM_APPLICATION_MASTER_SERVICE_PROCESSORS,
+        TestInterceptor1.class.getName() + ","
+            + TestInterceptor2.class.getName());
+    MockRM rm = new MockRM(conf);
+    rm.start();
+
+    // Register node1
+    MockNM nm1 = rm.registerNode("127.0.0.1:1234", 6 * GB);
+
+    // Submit an application
+    RMApp app1 = rm.submitApp(2048);
+
+    // kick the scheduler
+    nm1.nodeHeartbeat(true);
+    RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
+    MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
+    am1.registerAppAttempt();
+    int allocCount = 0;
+
+    am1.addRequests(new String[] {"127.0.0.1"}, GB, 1, 1);
+    AllocateResponse alloc1Response = am1.schedule(); // send the request
+    allocCount++;
+
+    // kick the scheduler
+    nm1.nodeHeartbeat(true);
+    while (alloc1Response.getAllocatedContainers().size() < 1) {
+      LOG.info("Waiting for containers to be created for app 1...");
+      sleep(1000);
+      alloc1Response = am1.schedule();
+      allocCount++;
+    }
+
+    // Decode the token identifier of the first allocated container
+    Container allocatedContainer =
+        alloc1Response.getAllocatedContainers().get(0);
+    ContainerTokenIdentifier tokenId =
+        BuilderUtils.newContainerTokenIdentifier(allocatedContainer
+            .getContainerToken());
+    am1.unregisterAppAttempt();
+
+    Assert.assertEquals(1, beforeRegCount.get());
+    Assert.assertEquals(1, afterRegCount.get());
+
+    // Both interceptors count every allocate call, hence the factor of two
+    Assert.assertEquals(allocCount * 2, beforeAllocCount.get());
+    Assert.assertEquals(allocCount * 2, afterAllocCount.get());
+
+    // Finish should only be counted once, since TestInterceptor1
+    // does not forward the call down the chain.
+    Assert.assertEquals(1, beforeFinishCount.get());
+    Assert.assertEquals(1, afterFinishCount.get());
+    rm.stop();
+  }
+
   @Test(timeout = 3000000)
   public void testRMIdentifierOnContainerAllocation() throws Exception {
     MockRM rm = new MockRM(conf);


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message