hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ey...@apache.org
Subject [2/2] hadoop git commit: YARN-8298. Added express upgrade for YARN service. Contributed by Chandni Singh
Date Tue, 21 Aug 2018 23:53:43 GMT
YARN-8298.  Added express upgrade for YARN service.
            Contributed by Chandni Singh


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e557c6bd
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e557c6bd
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e557c6bd

Branch: refs/heads/trunk
Commit: e557c6bd8de2811a561210f672f47b4d07a9d5c6
Parents: 9c3fc3e
Author: Eric Yang <eyang@apache.org>
Authored: Tue Aug 21 19:49:26 2018 -0400
Committer: Eric Yang <eyang@apache.org>
Committed: Tue Aug 21 19:49:26 2018 -0400

----------------------------------------------------------------------
 .../yarn/service/client/ApiServiceClient.java   |  20 +
 .../hadoop/yarn/service/webapp/ApiServer.java   |  12 +-
 .../hadoop/yarn/service/ClientAMService.java    |   2 +-
 .../hadoop/yarn/service/ServiceEvent.java       |  25 +
 .../hadoop/yarn/service/ServiceManager.java     | 127 +++-
 .../hadoop/yarn/service/ServiceScheduler.java   |  15 +-
 .../yarn/service/api/records/ServiceState.java  |   2 +-
 .../yarn/service/client/ServiceClient.java      | 100 ++-
 .../yarn/service/component/Component.java       |  16 +-
 .../yarn/service/component/ComponentEvent.java  |  10 +
 .../component/instance/ComponentInstance.java   |   5 +
 .../yarn/service/utils/ServiceApiUtil.java      |  44 ++
 .../src/main/proto/ClientAMProtocol.proto       |   1 +
 .../hadoop/yarn/service/TestServiceApiUtil.java | 653 ----------------
 .../hadoop/yarn/service/TestServiceManager.java | 299 +++++---
 .../yarn/service/TestYarnNativeServices.java    |  35 +
 .../yarn/service/utils/TestServiceApiUtil.java  | 743 +++++++++++++++++++
 .../hadoop/yarn/client/cli/ApplicationCLI.java  |  20 +-
 .../hadoop/yarn/client/cli/TestYarnCLI.java     |   4 +
 .../hadoop/yarn/client/api/AppAdminClient.java  |  12 +
 20 files changed, 1308 insertions(+), 837 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/e557c6bd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java
index 9229446..ca6cc50 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java
@@ -601,6 +601,26 @@ public class ApiServiceClient extends AppAdminClient {
   }
 
   @Override
+  public int actionUpgradeExpress(String appName, File path)
+      throws IOException, YarnException {
+    int result;
+    try {
+      Service service =
+          loadAppJsonFromLocalFS(path.getAbsolutePath(), appName, null, null);
+      service.setState(ServiceState.EXPRESS_UPGRADING);
+      String buffer = jsonSerDeser.toJson(service);
+      LOG.info("Upgrade in progress. Please wait..");
+      ClientResponse response = getApiClient(getServicePath(appName))
+          .put(ClientResponse.class, buffer);
+      result = processResponse(response);
+    } catch (Exception e) {
+      LOG.error("Failed to upgrade application: ", e);
+      result = EXIT_EXCEPTION_THROWN;
+    }
+    return result;
+  }
+
+  @Override
   public int initiateUpgrade(String appName,
       String fileName, boolean autoFinalize) throws IOException, YarnException {
     int result;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e557c6bd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java
index 4db0ac8..cd6f0d7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java
@@ -440,7 +440,8 @@ public class ApiServer {
       if (updateServiceData.getState() != null && (
           updateServiceData.getState() == ServiceState.UPGRADING ||
               updateServiceData.getState() ==
-                  ServiceState.UPGRADING_AUTO_FINALIZE)) {
+                  ServiceState.UPGRADING_AUTO_FINALIZE) ||
+          updateServiceData.getState() == ServiceState.EXPRESS_UPGRADING) {
         return upgradeService(updateServiceData, ugi);
       }
 
@@ -690,7 +691,11 @@ public class ApiServer {
       ServiceClient sc = getServiceClient();
       sc.init(YARN_CONFIG);
       sc.start();
-      sc.initiateUpgrade(service);
+      if (service.getState().equals(ServiceState.EXPRESS_UPGRADING)) {
+        sc.actionUpgradeExpress(service);
+      } else {
+        sc.initiateUpgrade(service);
+      }
       sc.close();
       return null;
     });
@@ -706,7 +711,8 @@ public class ApiServer {
       String serviceName, Set<String> compNames) throws YarnException,
       IOException, InterruptedException {
     Service service = getServiceFromClient(ugi, serviceName);
-    if (service.getState() != ServiceState.UPGRADING) {
+    if (!service.getState().equals(ServiceState.UPGRADING) &&
+        !service.getState().equals(ServiceState.UPGRADING_AUTO_FINALIZE)) {
       throw new YarnException(
           String.format("The upgrade of service %s has not been initiated.",
               service.getName()));

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e557c6bd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java
index 5bf1833..2ef8f7e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java
@@ -166,7 +166,7 @@ public class ClientAMService extends AbstractService
       LOG.info("Upgrading service to version {} by {}", request.getVersion(),
           UserGroupInformation.getCurrentUser());
       context.getServiceManager().processUpgradeRequest(request.getVersion(),
-          request.getAutoFinalize());
+          request.getAutoFinalize(), request.getExpressUpgrade());
       return UpgradeServiceResponseProto.newBuilder().build();
     } catch (Exception ex) {
       return UpgradeServiceResponseProto.newBuilder().setError(ex.getMessage())

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e557c6bd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEvent.java
index 0196be2..3a55472 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEvent.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEvent.java
@@ -19,6 +19,9 @@
 package org.apache.hadoop.yarn.service;
 
 import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.hadoop.yarn.service.api.records.Component;
+
+import java.util.Queue;
 
 /**
  * Events are handled by {@link ServiceManager} to manage the service
@@ -29,6 +32,8 @@ public class ServiceEvent extends AbstractEvent<ServiceEventType> {
   private final ServiceEventType type;
   private String version;
   private boolean autoFinalize;
+  private boolean expressUpgrade;
+  private Queue<Component> compsToUpgradeInOrder;
 
   public ServiceEvent(ServiceEventType serviceEventType) {
     super(serviceEventType);
@@ -56,4 +61,24 @@ public class ServiceEvent extends AbstractEvent<ServiceEventType> {
     this.autoFinalize = autoFinalize;
     return this;
   }
+
+  public boolean isExpressUpgrade() {
+    return expressUpgrade;
+  }
+
+  public ServiceEvent setExpressUpgrade(boolean expressUpgrade) {
+    this.expressUpgrade = expressUpgrade;
+    return this;
+  }
+
+  public Queue<Component> getCompsToUpgradeInOrder() {
+    return compsToUpgradeInOrder;
+  }
+
+  public ServiceEvent setCompsToUpgradeInOrder(
+      Queue<Component> compsToUpgradeInOrder) {
+    this.compsToUpgradeInOrder = compsToUpgradeInOrder;
+    return this;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e557c6bd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceManager.java
index 05ecb3f..04454b1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceManager.java
@@ -22,6 +22,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.service.api.records.ComponentState;
 import org.apache.hadoop.yarn.service.api.records.Service;
 import org.apache.hadoop.yarn.service.api.records.ServiceState;
 import org.apache.hadoop.yarn.service.component.Component;
@@ -40,8 +41,11 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.text.MessageFormat;
 import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Queue;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import static org.apache.hadoop.yarn.service.utils.ServiceApiUtil.jsonSerDeser;
@@ -67,6 +71,8 @@ public class ServiceManager implements EventHandler<ServiceEvent> {
   private final SliderFileSystem fs;
 
   private String upgradeVersion;
+  private Queue<org.apache.hadoop.yarn.service.api.records
+        .Component> compsToUpgradeInOrder;
 
   private static final StateMachineFactory<ServiceManager, State,
       ServiceEventType, ServiceEvent> STATE_MACHINE_FACTORY =
@@ -141,14 +147,20 @@ public class ServiceManager implements EventHandler<ServiceEvent> {
     @Override
     public State transition(ServiceManager serviceManager,
         ServiceEvent event) {
+      serviceManager.upgradeVersion = event.getVersion();
       try {
-        if (!event.isAutoFinalize()) {
-          serviceManager.serviceSpec.setState(ServiceState.UPGRADING);
+        if (event.isExpressUpgrade()) {
+          serviceManager.serviceSpec.setState(ServiceState.EXPRESS_UPGRADING);
+          serviceManager.compsToUpgradeInOrder = event
+              .getCompsToUpgradeInOrder();
+          serviceManager.upgradeNextCompIfAny();
+        } else if (event.isAutoFinalize()) {
+          serviceManager.serviceSpec.setState(ServiceState
+              .UPGRADING_AUTO_FINALIZE);
         } else {
           serviceManager.serviceSpec.setState(
-              ServiceState.UPGRADING_AUTO_FINALIZE);
+              ServiceState.UPGRADING);
         }
-        serviceManager.upgradeVersion = event.getVersion();
         return State.UPGRADING;
       } catch (Throwable e) {
         LOG.error("[SERVICE]: Upgrade to version {} failed", event.getVersion(),
@@ -169,8 +181,19 @@ public class ServiceManager implements EventHandler<ServiceEvent> {
       if (currState.equals(ServiceState.STABLE)) {
         return State.STABLE;
       }
+      if (currState.equals(ServiceState.EXPRESS_UPGRADING)) {
+        org.apache.hadoop.yarn.service.api.records.Component component =
+            serviceManager.compsToUpgradeInOrder.peek();
+        if (!component.getState().equals(ComponentState.NEEDS_UPGRADE) &&
+            !component.getState().equals(ComponentState.UPGRADING)) {
+          serviceManager.compsToUpgradeInOrder.remove();
+        }
+        serviceManager.upgradeNextCompIfAny();
+      }
       if (currState.equals(ServiceState.UPGRADING_AUTO_FINALIZE) ||
-          event.getType().equals(ServiceEventType.START)) {
+          event.getType().equals(ServiceEventType.START) ||
+          (currState.equals(ServiceState.EXPRESS_UPGRADING) &&
+              serviceManager.compsToUpgradeInOrder.isEmpty())) {
         ServiceState targetState = checkIfStable(serviceManager.serviceSpec);
         if (targetState.equals(ServiceState.STABLE)) {
           if (serviceManager.finalizeUpgrade()) {
@@ -184,6 +207,19 @@ public class ServiceManager implements EventHandler<ServiceEvent> {
     }
   }
 
+  private void upgradeNextCompIfAny() {
+    if (!compsToUpgradeInOrder.isEmpty()) {
+      org.apache.hadoop.yarn.service.api.records.Component component =
+          compsToUpgradeInOrder.peek();
+
+      ComponentEvent needUpgradeEvent = new ComponentEvent(
+          component.getName(), ComponentEventType.UPGRADE).setTargetSpec(
+          component).setUpgradeVersion(upgradeVersion).setExpressUpgrade(true);
+      context.scheduler.getDispatcher().getEventHandler().handle(
+          needUpgradeEvent);
+    }
+  }
+
   /**
    * @return whether finalization of upgrade was successful.
    */
@@ -250,23 +286,18 @@ public class ServiceManager implements EventHandler<ServiceEvent> {
   }
 
   void processUpgradeRequest(String upgradeVersion,
-      boolean autoFinalize) throws IOException {
+      boolean autoFinalize, boolean expressUpgrade) throws IOException {
     Service targetSpec = ServiceApiUtil.loadServiceUpgrade(
         context.fs, context.service.getName(), upgradeVersion);
 
     List<org.apache.hadoop.yarn.service.api.records.Component>
-        compsThatNeedUpgrade = componentsFinder.
+        compsNeedUpgradeList = componentsFinder.
         findTargetComponentSpecs(context.service, targetSpec);
-    ServiceEvent event = new ServiceEvent(ServiceEventType.UPGRADE)
-        .setVersion(upgradeVersion)
-        .setAutoFinalize(autoFinalize);
-    context.scheduler.getDispatcher().getEventHandler().handle(event);
 
-    if (compsThatNeedUpgrade != null && !compsThatNeedUpgrade.isEmpty()) {
-      if (autoFinalize) {
-        event.setAutoFinalize(true);
-      }
-      compsThatNeedUpgrade.forEach(component -> {
+    // remove all components from need upgrade list if there restart policy
+    // doesn't all upgrade.
+    if (compsNeedUpgradeList != null) {
+      compsNeedUpgradeList.removeIf(component -> {
         org.apache.hadoop.yarn.service.api.records.Component.RestartPolicyEnum
             restartPolicy = component.getRestartPolicy();
 
@@ -274,25 +305,65 @@ public class ServiceManager implements EventHandler<ServiceEvent> {
             Component.getRestartPolicyHandler(restartPolicy);
         // Do not allow upgrades for components which have NEVER/ON_FAILURE
         // restart policy
-        if (restartPolicyHandler.allowUpgrades()) {
+        if (!restartPolicyHandler.allowUpgrades()) {
+          LOG.info("The component {} has a restart policy that doesnt " +
+                  "allow upgrades {} ", component.getName(),
+              component.getRestartPolicy().toString());
+          return true;
+        }
+
+        return false;
+      });
+    }
+
+    ServiceEvent event = new ServiceEvent(ServiceEventType.UPGRADE)
+        .setVersion(upgradeVersion)
+        .setAutoFinalize(autoFinalize)
+        .setExpressUpgrade(expressUpgrade);
+
+    if (expressUpgrade) {
+      // In case of express upgrade  components need to be upgraded in order.
+      // Once the service manager gets notified that a component finished
+      // upgrading, it then issues event to upgrade the next component.
+      Map<String, org.apache.hadoop.yarn.service.api.records.Component>
+          compsNeedUpgradeByName = new HashMap<>();
+      if (compsNeedUpgradeList != null) {
+        compsNeedUpgradeList.forEach(component ->
+            compsNeedUpgradeByName.put(component.getName(), component));
+      }
+      List<String> resolvedComps = ServiceApiUtil
+          .resolveCompsDependency(targetSpec);
+
+      Queue<org.apache.hadoop.yarn.service.api.records.Component>
+          orderedCompUpgrade = new LinkedList<>();
+      resolvedComps.forEach(compName -> {
+        org.apache.hadoop.yarn.service.api.records.Component component =
+            compsNeedUpgradeByName.get(compName);
+        if (component != null ) {
+          orderedCompUpgrade.add(component);
+        }
+      });
+      event.setCompsToUpgradeInOrder(orderedCompUpgrade);
+    }
+
+    context.scheduler.getDispatcher().getEventHandler().handle(event);
+
+    if (compsNeedUpgradeList != null && !compsNeedUpgradeList.isEmpty()) {
+      if (!expressUpgrade) {
+        compsNeedUpgradeList.forEach(component -> {
           ComponentEvent needUpgradeEvent = new ComponentEvent(
               component.getName(), ComponentEventType.UPGRADE).setTargetSpec(
               component).setUpgradeVersion(event.getVersion());
           context.scheduler.getDispatcher().getEventHandler().handle(
               needUpgradeEvent);
-        } else {
-          LOG.info("The component {} has a restart "
-              + "policy that doesnt allow upgrades {} ", component.getName(),
-              component.getRestartPolicy().toString());
-        }
-      });
-    } else {
+
+        });
+      }
+    }  else if (autoFinalize) {
       // nothing to upgrade if upgrade auto finalize is requested, trigger a
       // state check.
-      if (autoFinalize) {
-        context.scheduler.getDispatcher().getEventHandler().handle(
-            new ServiceEvent(ServiceEventType.CHECK_STABLE));
-      }
+      context.scheduler.getDispatcher().getEventHandler().handle(
+          new ServiceEvent(ServiceEventType.CHECK_STABLE));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e557c6bd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java
index 0801ad0..384659f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java
@@ -219,7 +219,7 @@ public class ServiceScheduler extends CompositeService {
     nmClient.getClient().cleanupRunningContainersOnStop(false);
     addIfService(nmClient);
 
-    dispatcher = new AsyncDispatcher("Component  dispatcher");
+    dispatcher = createAsyncDispatcher();
     dispatcher.register(ServiceEventType.class, new ServiceEventHandler());
     dispatcher.register(ComponentEventType.class,
         new ComponentEventHandler());
@@ -253,6 +253,9 @@ public class ServiceScheduler extends CompositeService {
         YarnServiceConf.CONTAINER_RECOVERY_TIMEOUT_MS,
         YarnServiceConf.DEFAULT_CONTAINER_RECOVERY_TIMEOUT_MS,
         app.getConfiguration(), getConfig());
+
+    serviceManager = createServiceManager();
+    context.setServiceManager(serviceManager);
   }
 
   protected YarnRegistryViewForProviders createYarnRegistryOperations(
@@ -262,6 +265,14 @@ public class ServiceScheduler extends CompositeService {
         context.attemptId);
   }
 
+  protected ServiceManager createServiceManager() {
+    return new ServiceManager(context);
+  }
+
+  protected AsyncDispatcher createAsyncDispatcher() {
+    return new AsyncDispatcher("Component  dispatcher");
+  }
+
   protected NMClientAsync createNMClient() {
     return NMClientAsync.createNMClientAsync(new NMClientCallback());
   }
@@ -344,8 +355,6 @@ public class ServiceScheduler extends CompositeService {
 
     // Since AM has been started and registered, the service is in STARTED state
     app.setState(ServiceState.STARTED);
-    serviceManager = new ServiceManager(context);
-    context.setServiceManager(serviceManager);
 
     // recover components based on containers sent from RM
     recoverComponents(response);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e557c6bd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ServiceState.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ServiceState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ServiceState.java
index b6ae38b..0b3c037 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ServiceState.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ServiceState.java
@@ -30,5 +30,5 @@ import org.apache.hadoop.classification.InterfaceStability;
 @javax.annotation.Generated(value = "class io.swagger.codegen.languages.JavaClientCodegen", date = "2016-06-02T08:15:05.615-07:00")
 public enum ServiceState {
   ACCEPTED, STARTED, STABLE, STOPPED, FAILED, FLEX, UPGRADING,
-  UPGRADING_AUTO_FINALIZE;
+  UPGRADING_AUTO_FINALIZE, EXPRESS_UPGRADING;
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e557c6bd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java
index 5668d9f..a27ed87 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.service.client;
 
 import com.google.common.annotations.VisibleForTesting;
+
 import org.apache.commons.lang3.StringUtils;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
@@ -215,48 +216,31 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
     return EXIT_SUCCESS;
   }
 
-  @Override
-  public int initiateUpgrade(String appName, String fileName,
-      boolean autoFinalize)
-      throws IOException, YarnException {
-    Service upgradeService = loadAppJsonFromLocalFS(fileName, appName,
-        null, null);
-    if (autoFinalize) {
-      upgradeService.setState(ServiceState.UPGRADING_AUTO_FINALIZE);
-    } else {
-      upgradeService.setState(ServiceState.UPGRADING);
-    }
-    return initiateUpgrade(upgradeService);
-  }
-
-  public int initiateUpgrade(Service service) throws YarnException,
-      IOException {
+  private ApplicationReport upgradePrecheck(Service service)
+      throws YarnException, IOException {
     boolean upgradeEnabled = getConfig().getBoolean(
-        YARN_SERVICE_UPGRADE_ENABLED,
-        YARN_SERVICE_UPGRADE_ENABLED_DEFAULT);
+        YARN_SERVICE_UPGRADE_ENABLED, YARN_SERVICE_UPGRADE_ENABLED_DEFAULT);
     if (!upgradeEnabled) {
       throw new YarnException(ErrorStrings.SERVICE_UPGRADE_DISABLED);
     }
-    Service persistedService =
-        ServiceApiUtil.loadService(fs, service.getName());
+    Service persistedService = ServiceApiUtil.loadService(fs,
+        service.getName());
     if (!StringUtils.isEmpty(persistedService.getId())) {
-      cachedAppInfo.put(persistedService.getName(), new AppInfo(
-          ApplicationId.fromString(persistedService.getId()),
-          persistedService.getKerberosPrincipal().getPrincipalName()));
+      cachedAppInfo.put(persistedService.getName(),
+          new AppInfo(ApplicationId.fromString(persistedService.getId()),
+              persistedService.getKerberosPrincipal().getPrincipalName()));
     }
 
     if (persistedService.getVersion().equals(service.getVersion())) {
-      String message =
-          service.getName() + " is already at version " + service.getVersion()
-              + ". There is nothing to upgrade.";
+      String message = service.getName() + " is already at version "
+          + service.getVersion() + ". There is nothing to upgrade.";
       LOG.error(message);
       throw new YarnException(message);
     }
 
     Service liveService = getStatus(service.getName());
     if (!liveService.getState().equals(ServiceState.STABLE)) {
-      String message = service.getName() + " is at " +
-          liveService.getState()
+      String message = service.getName() + " is at " + liveService.getState()
           + " state and upgrade can only be initiated when service is STABLE.";
       LOG.error(message);
       throw new YarnException(message);
@@ -266,11 +250,67 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
     ServiceApiUtil.validateAndResolveService(service, fs, getConfig());
     ServiceApiUtil.createDirAndPersistApp(fs, serviceUpgradeDir, service);
 
-    ApplicationReport appReport =
-        yarnClient.getApplicationReport(getAppId(service.getName()));
+    ApplicationReport appReport = yarnClient
+        .getApplicationReport(getAppId(service.getName()));
     if (StringUtils.isEmpty(appReport.getHost())) {
       throw new YarnException(service.getName() + " AM hostname is empty");
     }
+    return appReport;
+  }
+
+  @Override
+  public int actionUpgradeExpress(String appName, File path)
+      throws IOException, YarnException {
+    Service service =
+        loadAppJsonFromLocalFS(path.getAbsolutePath(), appName, null, null);
+    service.setState(ServiceState.UPGRADING_AUTO_FINALIZE);
+    actionUpgradeExpress(service);
+    return EXIT_SUCCESS;
+  }
+
+  public int actionUpgradeExpress(Service service) throws YarnException,
+      IOException {
+    ApplicationReport appReport = upgradePrecheck(service);
+    ClientAMProtocol proxy = createAMProxy(service.getName(), appReport);
+    UpgradeServiceRequestProto.Builder requestBuilder =
+        UpgradeServiceRequestProto.newBuilder();
+    requestBuilder.setVersion(service.getVersion());
+    if (service.getState().equals(ServiceState.UPGRADING_AUTO_FINALIZE)) {
+      requestBuilder.setAutoFinalize(true);
+    }
+    if (service.getState().equals(ServiceState.EXPRESS_UPGRADING)) {
+      requestBuilder.setExpressUpgrade(true);
+      requestBuilder.setAutoFinalize(true);
+    }
+    UpgradeServiceResponseProto responseProto = proxy.upgrade(
+        requestBuilder.build());
+    if (responseProto.hasError()) {
+      LOG.error("Service {} express upgrade to version {} failed because {}",
+          service.getName(), service.getVersion(), responseProto.getError());
+      throw new YarnException("Failed to express upgrade service " +
+          service.getName() + " to version " + service.getVersion() +
+          " because " + responseProto.getError());
+    }
+    return EXIT_SUCCESS;
+  }
+
+  @Override
+  public int initiateUpgrade(String appName, String fileName,
+      boolean autoFinalize)
+      throws IOException, YarnException {
+    Service upgradeService = loadAppJsonFromLocalFS(fileName, appName,
+        null, null);
+    if (autoFinalize) {
+      upgradeService.setState(ServiceState.UPGRADING_AUTO_FINALIZE);
+    } else {
+      upgradeService.setState(ServiceState.UPGRADING);
+    }
+    return initiateUpgrade(upgradeService);
+  }
+
+  public int initiateUpgrade(Service service) throws YarnException,
+      IOException {
+    ApplicationReport appReport = upgradePrecheck(service);
     ClientAMProtocol proxy = createAMProxy(service.getName(), appReport);
 
     UpgradeServiceRequestProto.Builder requestBuilder =

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e557c6bd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java
index 41a2fcd..acf3404 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.service.component;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.ExecutionType;
 import static org.apache.hadoop.yarn.service.api.records.Component
@@ -43,6 +44,7 @@ import org.apache.hadoop.yarn.service.ServiceEventType;
 import org.apache.hadoop.yarn.service.api.records.ContainerState;
 import org.apache.hadoop.yarn.service.api.records.ResourceInformation;
 import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
+import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType;
 import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceId;
 import org.apache.hadoop.yarn.service.ContainerFailureTracker;
 import org.apache.hadoop.yarn.service.ServiceContext;
@@ -546,13 +548,21 @@ public class Component implements EventHandler<ComponentEvent> {
     @Override
     public void transition(Component component, ComponentEvent event) {
       component.upgradeInProgress.set(true);
+      component.upgradeEvent = event;
       component.componentSpec.setState(org.apache.hadoop.yarn.service.api.
           records.ComponentState.NEEDS_UPGRADE);
       component.numContainersThatNeedUpgrade.set(
           component.componentSpec.getNumberOfContainers());
-      component.componentSpec.getContainers().forEach(container ->
-          container.setState(ContainerState.NEEDS_UPGRADE));
-      component.upgradeEvent = event;
+      component.componentSpec.getContainers().forEach(container -> {
+        container.setState(ContainerState.NEEDS_UPGRADE);
+        if (event.isExpressUpgrade()) {
+          ComponentInstanceEvent upgradeEvent = new ComponentInstanceEvent(
+              ContainerId.fromString(container.getId()),
+                  ComponentInstanceEventType.UPGRADE);
+          LOG.info("Upgrade container {}", container.getId());
+          component.dispatcher.getEventHandler().handle(upgradeEvent);
+        }
+      });
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e557c6bd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEvent.java
index 84caa77..643961d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEvent.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEvent.java
@@ -35,6 +35,7 @@ public class ComponentEvent extends AbstractEvent<ComponentEventType> {
   private ContainerId containerId;
   private org.apache.hadoop.yarn.service.api.records.Component targetSpec;
   private String upgradeVersion;
+  private boolean expressUpgrade;
 
   public ContainerId getContainerId() {
     return containerId;
@@ -113,4 +114,13 @@ public class ComponentEvent extends AbstractEvent<ComponentEventType> {
     this.upgradeVersion = upgradeVersion;
     return this;
   }
+
+  public boolean isExpressUpgrade() {
+    return expressUpgrade;
+  }
+
+  public ComponentEvent setExpressUpgrade(boolean expressUpgrade) {
+    this.expressUpgrade = expressUpgrade;
+    return this;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e557c6bd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java
index 11a6caa..ed5e68e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java
@@ -380,6 +380,11 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
     @Override
     public void transition(ComponentInstance compInstance,
         ComponentInstanceEvent event) {
+      if (!compInstance.containerSpec.getState().equals(
+          ContainerState.NEEDS_UPGRADE)) {
+        //nothing to upgrade. this may happen with express upgrade.
+        return;
+      }
       compInstance.containerSpec.setState(ContainerState.UPGRADING);
       compInstance.component.decContainersReady(false);
       ComponentEvent upgradeEvent = compInstance.component.getUpgradeEvent();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e557c6bd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceApiUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceApiUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceApiUtil.java
index 9219569..b588e88 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceApiUtil.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceApiUtil.java
@@ -638,6 +638,32 @@ public class ServiceApiUtil {
     return containerNeedUpgrade;
   }
 
+  /**
+   * Validates the components that are requested are stable for upgrade.
+   * It returns the instances of the components which are in ready state.
+   */
+  public static List<Container> validateAndResolveCompsStable(
+      Service liveService, Collection<String> compNames) throws YarnException {
+    Preconditions.checkNotNull(compNames);
+    HashSet<String> requestedComps = Sets.newHashSet(compNames);
+    List<Container> containerNeedUpgrade = new ArrayList<>();
+    for (Component liveComp : liveService.getComponents()) {
+      if (requestedComps.contains(liveComp.getName())) {
+        if (!liveComp.getState().equals(ComponentState.STABLE)) {
+          // Nothing to upgrade
+          throw new YarnException(String.format(
+              ERROR_COMP_DOES_NOT_NEED_UPGRADE, liveComp.getName()));
+        }
+        liveComp.getContainers().forEach(liveContainer -> {
+          if (liveContainer.getState().equals(ContainerState.READY)) {
+            containerNeedUpgrade.add(liveContainer);
+          }
+        });
+      }
+    }
+    return containerNeedUpgrade;
+  }
+
   private static String parseComponentName(String componentInstanceName)
       throws YarnException {
     int idx = componentInstanceName.lastIndexOf('-');
@@ -651,4 +677,22 @@ public class ServiceApiUtil {
   public static String $(String s) {
     return "${" + s +"}";
   }
+
+  public static List<String> resolveCompsDependency(Service service) {
+    List<String> components = new ArrayList<String>();
+    for (Component component : service.getComponents()) {
+      int depSize = component.getDependencies().size();
+      if (!components.contains(component.getName())) {
+        components.add(component.getName());
+      }
+      if (depSize != 0) {
+        for (String depComp : component.getDependencies()) {
+          if (!components.contains(depComp)) {
+            components.add(0, depComp);
+          }
+        }
+      }
+    }
+    return components;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e557c6bd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/proto/ClientAMProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/proto/ClientAMProtocol.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/proto/ClientAMProtocol.proto
index 6166ded..169f765 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/proto/ClientAMProtocol.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/proto/ClientAMProtocol.proto
@@ -66,6 +66,7 @@ message StopResponseProto {
 message UpgradeServiceRequestProto {
   optional string version = 1;
   optional bool autoFinalize = 2;
+  optional bool expressUpgrade = 3;
 }
 
 message UpgradeServiceResponseProto {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e557c6bd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceApiUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceApiUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceApiUtil.java
deleted file mode 100644
index c2a80e7..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceApiUtil.java
+++ /dev/null
@@ -1,653 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.service;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.registry.client.api.RegistryConstants;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.service.api.records.Artifact;
-import org.apache.hadoop.yarn.service.api.records.Component;
-import org.apache.hadoop.yarn.service.api.records.KerberosPrincipal;
-import org.apache.hadoop.yarn.service.api.records.PlacementConstraint;
-import org.apache.hadoop.yarn.service.api.records.PlacementPolicy;
-import org.apache.hadoop.yarn.service.api.records.PlacementScope;
-import org.apache.hadoop.yarn.service.api.records.PlacementType;
-import org.apache.hadoop.yarn.service.api.records.Resource;
-import org.apache.hadoop.yarn.service.api.records.Service;
-import org.apache.hadoop.yarn.service.exceptions.RestApiErrorMessages;
-import org.apache.hadoop.yarn.service.utils.ServiceApiUtil;
-import org.apache.hadoop.yarn.service.utils.SliderFileSystem;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-
-import static org.apache.hadoop.yarn.service.conf.RestApiConstants.DEFAULT_UNLIMITED_LIFETIME;
-import static org.apache.hadoop.yarn.service.exceptions.RestApiErrorMessages.*;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
-/**
- * Test for ServiceApiUtil helper methods.
- */
-public class TestServiceApiUtil {
-  private static final Logger LOG = LoggerFactory
-      .getLogger(TestServiceApiUtil.class);
-  private static final String EXCEPTION_PREFIX = "Should have thrown " +
-      "exception: ";
-  private static final String NO_EXCEPTION_PREFIX = "Should not have thrown " +
-      "exception: ";
-
-  private static final String LEN_64_STR =
-      "abcdefghijklmnopqrstuvwxyz0123456789abcdefghijklmnopqrstuvwxyz01";
-
-  private static final YarnConfiguration CONF_DEFAULT_DNS = new
-      YarnConfiguration();
-  private static final YarnConfiguration CONF_DNS_ENABLED = new
-      YarnConfiguration();
-
-  @BeforeClass
-  public static void init() {
-    CONF_DNS_ENABLED.setBoolean(RegistryConstants.KEY_DNS_ENABLED, true);
-  }
-
-  @Test(timeout = 90000)
-  public void testResourceValidation() throws Exception {
-    assertEquals(RegistryConstants.MAX_FQDN_LABEL_LENGTH + 1, LEN_64_STR
-        .length());
-
-    SliderFileSystem sfs = ServiceTestUtils.initMockFs();
-
-    Service app = new Service();
-
-    // no name
-    try {
-      ServiceApiUtil.validateAndResolveService(app, sfs, CONF_DNS_ENABLED);
-      Assert.fail(EXCEPTION_PREFIX + "service with no name");
-    } catch (IllegalArgumentException e) {
-      assertEquals(ERROR_APPLICATION_NAME_INVALID, e.getMessage());
-    }
-
-    app.setName("test");
-    // no version
-    try {
-      ServiceApiUtil.validateAndResolveService(app, sfs, CONF_DNS_ENABLED);
-      Assert.fail(EXCEPTION_PREFIX + " service with no version");
-    } catch (IllegalArgumentException e) {
-      assertEquals(String.format(ERROR_APPLICATION_VERSION_INVALID,
-          app.getName()), e.getMessage());
-    }
-
-    app.setVersion("v1");
-    // bad format name
-    String[] badNames = {"4finance", "Finance", "finance@home", LEN_64_STR};
-    for (String badName : badNames) {
-      app.setName(badName);
-      try {
-        ServiceApiUtil.validateAndResolveService(app, sfs, CONF_DNS_ENABLED);
-        Assert.fail(EXCEPTION_PREFIX + "service with bad name " + badName);
-      } catch (IllegalArgumentException e) {
-
-      }
-    }
-
-    // launch command not specified
-    app.setName(LEN_64_STR);
-    Component comp = new Component().name("comp1");
-    app.addComponent(comp);
-    try {
-      ServiceApiUtil.validateAndResolveService(app, sfs, CONF_DEFAULT_DNS);
-      Assert.fail(EXCEPTION_PREFIX + "service with no launch command");
-    } catch (IllegalArgumentException e) {
-      assertEquals(RestApiErrorMessages.ERROR_ABSENT_LAUNCH_COMMAND,
-          e.getMessage());
-    }
-
-    // launch command not specified
-    app.setName(LEN_64_STR.substring(0, RegistryConstants
-        .MAX_FQDN_LABEL_LENGTH));
-    try {
-      ServiceApiUtil.validateAndResolveService(app, sfs, CONF_DNS_ENABLED);
-      Assert.fail(EXCEPTION_PREFIX + "service with no launch command");
-    } catch (IllegalArgumentException e) {
-      assertEquals(RestApiErrorMessages.ERROR_ABSENT_LAUNCH_COMMAND,
-          e.getMessage());
-    }
-
-    // memory not specified
-    comp.setLaunchCommand("sleep 1");
-    Resource res = new Resource();
-    app.setResource(res);
-    try {
-      ServiceApiUtil.validateAndResolveService(app, sfs, CONF_DNS_ENABLED);
-      Assert.fail(EXCEPTION_PREFIX + "service with no memory");
-    } catch (IllegalArgumentException e) {
-      assertEquals(String.format(
-          RestApiErrorMessages.ERROR_RESOURCE_MEMORY_FOR_COMP_INVALID,
-          comp.getName()), e.getMessage());
-    }
-
-    // invalid no of cpus
-    res.setMemory("100mb");
-    res.setCpus(-2);
-    try {
-      ServiceApiUtil.validateAndResolveService(app, sfs, CONF_DNS_ENABLED);
-      Assert.fail(
-          EXCEPTION_PREFIX + "service with invalid no of cpus");
-    } catch (IllegalArgumentException e) {
-      assertEquals(String.format(
-          RestApiErrorMessages.ERROR_RESOURCE_CPUS_FOR_COMP_INVALID_RANGE,
-          comp.getName()), e.getMessage());
-    }
-
-    // number of containers not specified
-    res.setCpus(2);
-    try {
-      ServiceApiUtil.validateAndResolveService(app, sfs, CONF_DNS_ENABLED);
-      Assert.fail(EXCEPTION_PREFIX + "service with no container count");
-    } catch (IllegalArgumentException e) {
-      Assert.assertTrue(e.getMessage()
-          .contains(ERROR_CONTAINERS_COUNT_INVALID));
-    }
-
-    // specifying profile along with cpus/memory raises exception
-    res.setProfile("hbase_finance_large");
-    try {
-      ServiceApiUtil.validateAndResolveService(app, sfs, CONF_DNS_ENABLED);
-      Assert.fail(EXCEPTION_PREFIX
-          + "service with resource profile along with cpus/memory");
-    } catch (IllegalArgumentException e) {
-      assertEquals(String.format(RestApiErrorMessages
-              .ERROR_RESOURCE_PROFILE_MULTIPLE_VALUES_FOR_COMP_NOT_SUPPORTED,
-          comp.getName()),
-          e.getMessage());
-    }
-
-    // currently resource profile alone is not supported.
-    // TODO: remove the next test once resource profile alone is supported.
-    res.setCpus(null);
-    res.setMemory(null);
-    try {
-      ServiceApiUtil.validateAndResolveService(app, sfs, CONF_DNS_ENABLED);
-      Assert.fail(EXCEPTION_PREFIX + "service with resource profile only");
-    } catch (IllegalArgumentException e) {
-      assertEquals(ERROR_RESOURCE_PROFILE_NOT_SUPPORTED_YET,
-          e.getMessage());
-    }
-
-    // unset profile here and add cpus/memory back
-    res.setProfile(null);
-    res.setCpus(2);
-    res.setMemory("2gb");
-
-    // null number of containers
-    try {
-      ServiceApiUtil.validateAndResolveService(app, sfs, CONF_DNS_ENABLED);
-      Assert.fail(EXCEPTION_PREFIX + "null number of containers");
-    } catch (IllegalArgumentException e) {
-      Assert.assertTrue(e.getMessage()
-          .startsWith(ERROR_CONTAINERS_COUNT_INVALID));
-    }
-  }
-
-  @Test
-  public void testArtifacts() throws IOException {
-    SliderFileSystem sfs = ServiceTestUtils.initMockFs();
-
-    Service app = new Service();
-    app.setName("service1");
-    app.setVersion("v1");
-    Resource res = new Resource();
-    app.setResource(res);
-    res.setMemory("512M");
-
-    // no artifact id fails with default type
-    Artifact artifact = new Artifact();
-    app.setArtifact(artifact);
-    String compName = "comp1";
-    Component comp = ServiceTestUtils.createComponent(compName);
-
-    app.setComponents(Collections.singletonList(comp));
-    try {
-      ServiceApiUtil.validateAndResolveService(app, sfs, CONF_DNS_ENABLED);
-      Assert.fail(EXCEPTION_PREFIX + "service with no artifact id");
-    } catch (IllegalArgumentException e) {
-      assertEquals(String.format(ERROR_ARTIFACT_ID_FOR_COMP_INVALID, compName),
-          e.getMessage());
-    }
-
-    // no artifact id fails with SERVICE type
-    artifact.setType(Artifact.TypeEnum.SERVICE);
-    try {
-      ServiceApiUtil.validateAndResolveService(app, sfs, CONF_DNS_ENABLED);
-      Assert.fail(EXCEPTION_PREFIX + "service with no artifact id");
-    } catch (IllegalArgumentException e) {
-      assertEquals(ERROR_ARTIFACT_ID_INVALID, e.getMessage());
-    }
-
-    // no artifact id fails with TARBALL type
-    artifact.setType(Artifact.TypeEnum.TARBALL);
-    try {
-      ServiceApiUtil.validateAndResolveService(app, sfs, CONF_DNS_ENABLED);
-      Assert.fail(EXCEPTION_PREFIX + "service with no artifact id");
-    } catch (IllegalArgumentException e) {
-      assertEquals(String.format(ERROR_ARTIFACT_ID_FOR_COMP_INVALID, compName),
-          e.getMessage());
-    }
-
-    // everything valid here
-    artifact.setType(Artifact.TypeEnum.DOCKER);
-    artifact.setId("docker.io/centos:centos7");
-    try {
-      ServiceApiUtil.validateAndResolveService(app, sfs, CONF_DNS_ENABLED);
-    } catch (IllegalArgumentException e) {
-      LOG.error("service attributes specified should be valid here", e);
-      Assert.fail(NO_EXCEPTION_PREFIX + e.getMessage());
-    }
-
-    assertEquals(app.getLifetime(), DEFAULT_UNLIMITED_LIFETIME);
-  }
-
-  private static Resource createValidResource() {
-    Resource res = new Resource();
-    res.setMemory("512M");
-    return res;
-  }
-
-  private static Component createValidComponent(String compName) {
-    Component comp = new Component();
-    comp.setName(compName);
-    comp.setResource(createValidResource());
-    comp.setNumberOfContainers(1L);
-    comp.setLaunchCommand("sleep 1");
-    return comp;
-  }
-
-  private static Service createValidApplication(String compName) {
-    Service app = new Service();
-    app.setName("name");
-    app.setVersion("v1");
-    app.setResource(createValidResource());
-    if (compName != null) {
-      app.addComponent(createValidComponent(compName));
-    }
-    return app;
-  }
-
-  @Test
-  public void testExternalApplication() throws IOException {
-    Service ext = createValidApplication("comp1");
-    SliderFileSystem sfs = ServiceTestUtils.initMockFs(ext);
-
-    Service app = createValidApplication(null);
-
-    Artifact artifact = new Artifact();
-    artifact.setType(Artifact.TypeEnum.SERVICE);
-    artifact.setId("id");
-    app.setArtifact(artifact);
-    app.addComponent(ServiceTestUtils.createComponent("comp2"));
-    try {
-      ServiceApiUtil.validateAndResolveService(app, sfs, CONF_DNS_ENABLED);
-    } catch (IllegalArgumentException e) {
-      Assert.fail(NO_EXCEPTION_PREFIX + e.getMessage());
-    }
-
-    assertEquals(1, app.getComponents().size());
-    assertNotNull(app.getComponent("comp2"));
-  }
-
-  @Test
-  public void testDuplicateComponents() throws IOException {
-    SliderFileSystem sfs = ServiceTestUtils.initMockFs();
-
-    String compName = "comp1";
-    Service app = createValidApplication(compName);
-    app.addComponent(createValidComponent(compName));
-
-    // duplicate component name fails
-    try {
-      ServiceApiUtil.validateAndResolveService(app, sfs, CONF_DNS_ENABLED);
-      Assert.fail(EXCEPTION_PREFIX + "service with component collision");
-    } catch (IllegalArgumentException e) {
-      assertEquals("Component name collision: " + compName, e.getMessage());
-    }
-  }
-
-  @Test
-  public void testComponentNameSameAsServiceName() throws IOException {
-    SliderFileSystem sfs = ServiceTestUtils.initMockFs();
-    Service app = new Service();
-    app.setName("test");
-    app.setVersion("v1");
-    app.addComponent(createValidComponent("test"));
-
-    //component name same as service name
-    try {
-      ServiceApiUtil.validateAndResolveService(app, sfs, CONF_DNS_ENABLED);
-      Assert.fail(EXCEPTION_PREFIX + "component name matches service name");
-    } catch (IllegalArgumentException e) {
-      assertEquals("Component name test must not be same as service name test",
-          e.getMessage());
-    }
-  }
-
-  @Test
-  public void testExternalDuplicateComponent() throws IOException {
-    Service ext = createValidApplication("comp1");
-    SliderFileSystem sfs = ServiceTestUtils.initMockFs(ext);
-
-    Service app = createValidApplication("comp1");
-    Artifact artifact = new Artifact();
-    artifact.setType(Artifact.TypeEnum.SERVICE);
-    artifact.setId("id");
-    app.getComponent("comp1").setArtifact(artifact);
-
-    // duplicate component name okay in the case of SERVICE component
-    try {
-      ServiceApiUtil.validateAndResolveService(app, sfs, CONF_DNS_ENABLED);
-    } catch (IllegalArgumentException e) {
-      Assert.fail(NO_EXCEPTION_PREFIX + e.getMessage());
-    }
-  }
-
-  @Test
-  public void testExternalComponent() throws IOException {
-    Service ext = createValidApplication("comp1");
-    SliderFileSystem sfs = ServiceTestUtils.initMockFs(ext);
-
-    Service app = createValidApplication("comp2");
-    Artifact artifact = new Artifact();
-    artifact.setType(Artifact.TypeEnum.SERVICE);
-    artifact.setId("id");
-    app.setArtifact(artifact);
-
-    try {
-      ServiceApiUtil.validateAndResolveService(app, sfs, CONF_DNS_ENABLED);
-    } catch (IllegalArgumentException e) {
-      Assert.fail(NO_EXCEPTION_PREFIX + e.getMessage());
-    }
-
-    assertEquals(1, app.getComponents().size());
-    // artifact ID not inherited from global
-    assertNotNull(app.getComponent("comp2"));
-
-    // set SERVICE artifact id on component
-    app.getComponent("comp2").setArtifact(artifact);
-
-    try {
-      ServiceApiUtil.validateAndResolveService(app, sfs, CONF_DNS_ENABLED);
-    } catch (IllegalArgumentException e) {
-      Assert.fail(NO_EXCEPTION_PREFIX + e.getMessage());
-    }
-
-    assertEquals(1, app.getComponents().size());
-    // original component replaced by external component
-    assertNotNull(app.getComponent("comp1"));
-  }
-
-  public static void verifyDependencySorting(List<Component> components,
-      Component... expectedSorting) {
-    Collection<Component> actualSorting = ServiceApiUtil.sortByDependencies(
-        components);
-    assertEquals(expectedSorting.length, actualSorting.size());
-    int i = 0;
-    for (Component component : actualSorting) {
-      assertEquals(expectedSorting[i++], component);
-    }
-  }
-
-  @Test
-  public void testDependencySorting() throws IOException {
-    Component a = ServiceTestUtils.createComponent("a");
-    Component b = ServiceTestUtils.createComponent("b");
-    Component c = ServiceTestUtils.createComponent("c");
-    Component d =
-        ServiceTestUtils.createComponent("d").dependencies(Arrays.asList("c"));
-    Component e = ServiceTestUtils.createComponent("e")
-        .dependencies(Arrays.asList("b", "d"));
-
-    verifyDependencySorting(Arrays.asList(a, b, c), a, b, c);
-    verifyDependencySorting(Arrays.asList(c, a, b), c, a, b);
-    verifyDependencySorting(Arrays.asList(a, b, c, d, e), a, b, c, d, e);
-    verifyDependencySorting(Arrays.asList(e, d, c, b, a), c, b, a, d, e);
-
-    c.setDependencies(Arrays.asList("e"));
-    try {
-      verifyDependencySorting(Arrays.asList(a, b, c, d, e));
-      Assert.fail(EXCEPTION_PREFIX + "components with dependency cycle");
-    } catch (IllegalArgumentException ex) {
-      assertEquals(String.format(
-          RestApiErrorMessages.ERROR_DEPENDENCY_CYCLE, Arrays.asList(c, d,
-              e)), ex.getMessage());
-    }
-
-    SliderFileSystem sfs = ServiceTestUtils.initMockFs();
-    Service service = createValidApplication(null);
-    service.setComponents(Arrays.asList(c, d, e));
-    try {
-      ServiceApiUtil.validateAndResolveService(service, sfs,
-          CONF_DEFAULT_DNS);
-      Assert.fail(EXCEPTION_PREFIX + "components with bad dependencies");
-    } catch (IllegalArgumentException ex) {
-      assertEquals(String.format(
-          RestApiErrorMessages.ERROR_DEPENDENCY_INVALID, "b", "e"), ex
-          .getMessage());
-    }
-  }
-
-  @Test
-  public void testInvalidComponent() throws IOException {
-    SliderFileSystem sfs = ServiceTestUtils.initMockFs();
-    testComponent(sfs);
-  }
-
-  @Test
-  public void testValidateCompName() {
-    String[] invalidNames = {
-        "EXAMPLE", // UPPER case not allowed
-        "example_app" // underscore not allowed.
-    };
-    for (String name : invalidNames) {
-      try {
-        ServiceApiUtil.validateNameFormat(name, new Configuration());
-        Assert.fail();
-      } catch (IllegalArgumentException ex) {
-        ex.printStackTrace();
-      }
-    }
-  }
-
-  private static void testComponent(SliderFileSystem sfs)
-      throws IOException {
-    int maxLen = RegistryConstants.MAX_FQDN_LABEL_LENGTH;
-    assertEquals(19, Long.toString(Long.MAX_VALUE).length());
-    maxLen = maxLen - Long.toString(Long.MAX_VALUE).length();
-
-    String compName = LEN_64_STR.substring(0, maxLen + 1);
-    Service app = createValidApplication(null);
-    app.addComponent(createValidComponent(compName));
-
-    // invalid component name fails if dns is enabled
-    try {
-      ServiceApiUtil.validateAndResolveService(app, sfs, CONF_DNS_ENABLED);
-      Assert.fail(EXCEPTION_PREFIX + "service with invalid component name");
-    } catch (IllegalArgumentException e) {
-      assertEquals(String.format(RestApiErrorMessages
-          .ERROR_COMPONENT_NAME_INVALID, maxLen, compName), e.getMessage());
-    }
-
-    // does not fail if dns is disabled
-    try {
-      ServiceApiUtil.validateAndResolveService(app, sfs, CONF_DEFAULT_DNS);
-    } catch (IllegalArgumentException e) {
-      Assert.fail(NO_EXCEPTION_PREFIX + e.getMessage());
-    }
-
-    compName = LEN_64_STR.substring(0, maxLen);
-    app = createValidApplication(null);
-    app.addComponent(createValidComponent(compName));
-
-    // does not fail
-    try {
-      ServiceApiUtil.validateAndResolveService(app, sfs, CONF_DNS_ENABLED);
-    } catch (IllegalArgumentException e) {
-      Assert.fail(NO_EXCEPTION_PREFIX + e.getMessage());
-    }
-  }
-
-  @Test
-  public void testPlacementPolicy() throws IOException {
-    SliderFileSystem sfs = ServiceTestUtils.initMockFs();
-    Service app = createValidApplication("comp-a");
-    Component comp = app.getComponents().get(0);
-    PlacementPolicy pp = new PlacementPolicy();
-    PlacementConstraint pc = new PlacementConstraint();
-    pc.setName("CA1");
-    pp.setConstraints(Collections.singletonList(pc));
-    comp.setPlacementPolicy(pp);
-
-    try {
-      ServiceApiUtil.validateAndResolveService(app, sfs, CONF_DNS_ENABLED);
-      Assert.fail(EXCEPTION_PREFIX + "constraint with no type");
-    } catch (IllegalArgumentException e) {
-      assertEquals(String.format(
-          RestApiErrorMessages.ERROR_PLACEMENT_POLICY_CONSTRAINT_TYPE_NULL,
-          "CA1 ", "comp-a"), e.getMessage());
-    }
-
-    // Set the type
-    pc.setType(PlacementType.ANTI_AFFINITY);
-
-    try {
-      ServiceApiUtil.validateAndResolveService(app, sfs, CONF_DNS_ENABLED);
-      Assert.fail(EXCEPTION_PREFIX + "constraint with no scope");
-    } catch (IllegalArgumentException e) {
-      assertEquals(String.format(
-          RestApiErrorMessages.ERROR_PLACEMENT_POLICY_CONSTRAINT_SCOPE_NULL,
-          "CA1 ", "comp-a"), e.getMessage());
-    }
-
-    // Set the scope
-    pc.setScope(PlacementScope.NODE);
-
-    try {
-      ServiceApiUtil.validateAndResolveService(app, sfs, CONF_DNS_ENABLED);
-      Assert.fail(EXCEPTION_PREFIX + "constraint with no tag(s)");
-    } catch (IllegalArgumentException e) {
-      assertEquals(String.format(
-          RestApiErrorMessages.ERROR_PLACEMENT_POLICY_CONSTRAINT_TAGS_NULL,
-          "CA1 ", "comp-a"), e.getMessage());
-    }
-
-    // Set a target tag - but an invalid one
-    pc.setTargetTags(Collections.singletonList("comp-invalid"));
-
-    try {
-      ServiceApiUtil.validateAndResolveService(app, sfs, CONF_DNS_ENABLED);
-      Assert.fail(EXCEPTION_PREFIX + "constraint with invalid tag name");
-    } catch (IllegalArgumentException e) {
-      assertEquals(
-          String.format(
-              RestApiErrorMessages.ERROR_PLACEMENT_POLICY_TAG_NAME_NOT_SAME,
-              "comp-invalid", "comp-a", "comp-a", "comp-a"),
-          e.getMessage());
-    }
-
-    // Set valid target tags now
-    pc.setTargetTags(Collections.singletonList("comp-a"));
-
-    // Finally it should succeed
-    try {
-      ServiceApiUtil.validateAndResolveService(app, sfs, CONF_DNS_ENABLED);
-    } catch (IllegalArgumentException e) {
-      Assert.fail(NO_EXCEPTION_PREFIX + e.getMessage());
-    }
-  }
-
-  @Test
-  public void testKerberosPrincipal() throws IOException {
-    SliderFileSystem sfs = ServiceTestUtils.initMockFs();
-    Service app = createValidApplication("comp-a");
-    KerberosPrincipal kp = new KerberosPrincipal();
-    kp.setKeytab("/some/path");
-    kp.setPrincipalName("user/_HOST@domain.com");
-    app.setKerberosPrincipal(kp);
-
-    try {
-      ServiceApiUtil.validateKerberosPrincipal(app.getKerberosPrincipal());
-      Assert.fail(EXCEPTION_PREFIX + "service with invalid keytab URI scheme");
-    } catch (IllegalArgumentException e) {
-      assertEquals(
-          String.format(RestApiErrorMessages.ERROR_KEYTAB_URI_SCHEME_INVALID,
-              kp.getKeytab()),
-          e.getMessage());
-    }
-
-    kp.setKeytab("/ blank / in / paths");
-    try {
-      ServiceApiUtil.validateKerberosPrincipal(app.getKerberosPrincipal());
-      Assert.fail(EXCEPTION_PREFIX + "service with invalid keytab");
-    } catch (IllegalArgumentException e) {
-      // strip out the %s at the end of the RestApiErrorMessages string constant
-      assertTrue(e.getMessage().contains(
-          RestApiErrorMessages.ERROR_KEYTAB_URI_INVALID.substring(0,
-              RestApiErrorMessages.ERROR_KEYTAB_URI_INVALID.length() - 2)));
-    }
-
-    kp.setKeytab("file:///tmp/a.keytab");
-    // now it should succeed
-    try {
-      ServiceApiUtil.validateKerberosPrincipal(app.getKerberosPrincipal());
-    } catch (IllegalArgumentException e) {
-      Assert.fail(NO_EXCEPTION_PREFIX + e.getMessage());
-    }
-  }
-
-  @Test
-  public void testKerberosPrincipalNameFormat() throws IOException {
-    Service app = createValidApplication("comp-a");
-    KerberosPrincipal kp = new KerberosPrincipal();
-    kp.setPrincipalName("user@domain.com");
-    app.setKerberosPrincipal(kp);
-
-    try {
-      ServiceApiUtil.validateKerberosPrincipal(app.getKerberosPrincipal());
-      Assert.fail(EXCEPTION_PREFIX + "service with invalid principal name format.");
-    } catch (IllegalArgumentException e) {
-      assertEquals(
-          String.format(RestApiErrorMessages.ERROR_KERBEROS_PRINCIPAL_NAME_FORMAT,
-              kp.getPrincipalName()),
-          e.getMessage());
-    }
-
-    kp.setPrincipalName("user/_HOST@domain.com");
-    try {
-      ServiceApiUtil.validateKerberosPrincipal(app.getKerberosPrincipal());
-    } catch (IllegalArgumentException e) {
-      Assert.fail(NO_EXCEPTION_PREFIX + e.getMessage());
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e557c6bd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceManager.java
index fc509f1..a37cabe 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceManager.java
@@ -19,23 +19,26 @@
 package org.apache.hadoop.yarn.service;
 
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.registry.client.api.RegistryOperations;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.service.api.records.Artifact;
 import org.apache.hadoop.yarn.service.api.records.ComponentState;
+import org.apache.hadoop.yarn.service.api.records.ContainerState;
 import org.apache.hadoop.yarn.service.api.records.Service;
 import org.apache.hadoop.yarn.service.api.records.ServiceState;
+import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
+import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEvent;
+import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType;
 import org.apache.hadoop.yarn.service.exceptions.SliderException;
-import org.apache.hadoop.yarn.service.registry.YarnRegistryViewForProviders;
 import org.apache.hadoop.yarn.service.utils.ServiceApiUtil;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
 
 import java.io.IOException;
-import java.util.Map;
-
-import static org.mockito.Mockito.mock;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.TimeoutException;
 
 /**
  * Tests for {@link ServiceManager}.
@@ -46,117 +49,120 @@ public class TestServiceManager {
   public ServiceTestUtils.ServiceFSWatcher rule =
       new ServiceTestUtils.ServiceFSWatcher();
 
-  @Test
-  public void testUpgrade() throws IOException, SliderException {
-    ServiceManager serviceManager = createTestServiceManager("testUpgrade");
-    upgrade(serviceManager, "v2", false, false);
+  @Test (timeout = TIMEOUT)
+  public void testUpgrade() throws Exception {
+    ServiceContext context = createServiceContext("testUpgrade");
+    initUpgrade(context, "v2", false, false, false);
     Assert.assertEquals("service not upgraded", ServiceState.UPGRADING,
-        serviceManager.getServiceSpec().getState());
+        context.getServiceManager().getServiceSpec().getState());
   }
 
-  @Test
+  @Test (timeout = TIMEOUT)
   public void testRestartNothingToUpgrade()
-      throws IOException, SliderException {
-    ServiceManager serviceManager = createTestServiceManager(
+      throws Exception {
+    ServiceContext context = createServiceContext(
         "testRestartNothingToUpgrade");
-    upgrade(serviceManager, "v2", false, false);
-
-    //make components stable
-    serviceManager.getServiceSpec().getComponents().forEach(comp -> {
-      comp.setState(ComponentState.STABLE);
-    });
-    serviceManager.handle(new ServiceEvent(ServiceEventType.START));
+    initUpgrade(context, "v2", false, false, false);
+    ServiceManager manager = context.getServiceManager();
+    //make components stable by upgrading all instances
+    upgradeAllInstances(context);
+
+    context.scheduler.getDispatcher().getEventHandler().handle(
+        new ServiceEvent(ServiceEventType.START));
+    GenericTestUtils.waitFor(()->
+        context.service.getState().equals(ServiceState.STABLE),
+        CHECK_EVERY_MILLIS, TIMEOUT);
     Assert.assertEquals("service not re-started", ServiceState.STABLE,
-        serviceManager.getServiceSpec().getState());
+        manager.getServiceSpec().getState());
   }
 
-  @Test
-  public void testAutoFinalizeNothingToUpgrade() throws IOException,
-      SliderException {
-    ServiceManager serviceManager = createTestServiceManager(
+  @Test(timeout = TIMEOUT)
+  public void testAutoFinalizeNothingToUpgrade() throws Exception {
+    ServiceContext context = createServiceContext(
         "testAutoFinalizeNothingToUpgrade");
-    upgrade(serviceManager, "v2", false, true);
-
-    //make components stable
-    serviceManager.getServiceSpec().getComponents().forEach(comp ->
-        comp.setState(ComponentState.STABLE));
-    serviceManager.handle(new ServiceEvent(ServiceEventType.CHECK_STABLE));
+    initUpgrade(context, "v2", false, true, false);
+    ServiceManager manager = context.getServiceManager();
+    //make components stable by upgrading all instances
+    upgradeAllInstances(context);
+
+    GenericTestUtils.waitFor(()->
+        context.service.getState().equals(ServiceState.STABLE),
+        CHECK_EVERY_MILLIS, TIMEOUT);
     Assert.assertEquals("service stable", ServiceState.STABLE,
-        serviceManager.getServiceSpec().getState());
+        manager.getServiceSpec().getState());
   }
 
-  @Test
+  @Test(timeout = TIMEOUT)
   public void testRestartWithPendingUpgrade()
-      throws IOException, SliderException {
-    ServiceManager serviceManager = createTestServiceManager("testRestart");
-    upgrade(serviceManager, "v2", true, false);
-    serviceManager.handle(new ServiceEvent(ServiceEventType.START));
+      throws Exception {
+    ServiceContext context = createServiceContext("testRestart");
+    initUpgrade(context, "v2", true, false, false);
+    ServiceManager manager = context.getServiceManager();
+
+    context.scheduler.getDispatcher().getEventHandler().handle(
+        new ServiceEvent(ServiceEventType.START));
+    context.scheduler.getDispatcher().stop();
     Assert.assertEquals("service should still be upgrading",
-        ServiceState.UPGRADING, serviceManager.getServiceSpec().getState());
+        ServiceState.UPGRADING, manager.getServiceSpec().getState());
   }
 
-  @Test
-  public void testCheckState() throws IOException, SliderException {
-    ServiceManager serviceManager = createTestServiceManager(
-        "testCheckState");
-    upgrade(serviceManager, "v2", true, false);
+  @Test(timeout = TIMEOUT)
+  public void testFinalize() throws Exception {
+    ServiceContext context = createServiceContext("testCheckState");
+    initUpgrade(context, "v2", true, false, false);
+    ServiceManager manager = context.getServiceManager();
     Assert.assertEquals("service not upgrading", ServiceState.UPGRADING,
-        serviceManager.getServiceSpec().getState());
+        manager.getServiceSpec().getState());
 
-    // make components stable
-    serviceManager.getServiceSpec().getComponents().forEach(comp -> {
-      comp.setState(ComponentState.STABLE);
-    });
-    ServiceEvent checkStable = new ServiceEvent(ServiceEventType.CHECK_STABLE);
-    serviceManager.handle(checkStable);
-    Assert.assertEquals("service should still be upgrading",
-        ServiceState.UPGRADING, serviceManager.getServiceSpec().getState());
+    //make components stable by upgrading all instances
+    upgradeAllInstances(context);
 
     // finalize service
-    ServiceEvent restart = new ServiceEvent(ServiceEventType.START);
-    serviceManager.handle(restart);
-    Assert.assertEquals("service not stable",
-        ServiceState.STABLE, serviceManager.getServiceSpec().getState());
+    context.scheduler.getDispatcher().getEventHandler().handle(
+        new ServiceEvent(ServiceEventType.START));
+    GenericTestUtils.waitFor(()->
+        context.service.getState().equals(ServiceState.STABLE),
+        CHECK_EVERY_MILLIS, TIMEOUT);
+    Assert.assertEquals("service not re-started", ServiceState.STABLE,
+        manager.getServiceSpec().getState());
 
-    validateUpgradeFinalization(serviceManager.getName(), "v2");
+    validateUpgradeFinalization(manager.getName(), "v2");
   }
 
-  @Test
-  public void testCheckStateAutoFinalize() throws IOException, SliderException {
-    ServiceManager serviceManager = createTestServiceManager(
-        "testCheckState");
-    serviceManager.getServiceSpec().setState(
+  @Test(timeout = TIMEOUT)
+  public void testAutoFinalize() throws Exception {
+    ServiceContext context = createServiceContext("testCheckStateAutoFinalize");
+    ServiceManager manager = context.getServiceManager();
+    manager.getServiceSpec().setState(
         ServiceState.UPGRADING_AUTO_FINALIZE);
-    upgrade(serviceManager, "v2", true, true);
-    Assert.assertEquals("service not upgrading",
-        ServiceState.UPGRADING_AUTO_FINALIZE,
-        serviceManager.getServiceSpec().getState());
+    initUpgrade(context, "v2", true, true, false);
 
     // make components stable
-    serviceManager.getServiceSpec().getComponents().forEach(comp ->
-        comp.setState(ComponentState.STABLE));
-    ServiceEvent checkStable = new ServiceEvent(ServiceEventType.CHECK_STABLE);
-    serviceManager.handle(checkStable);
+    upgradeAllInstances(context);
+
+    GenericTestUtils.waitFor(() ->
+        context.service.getState().equals(ServiceState.STABLE),
+        CHECK_EVERY_MILLIS, TIMEOUT);
     Assert.assertEquals("service not stable",
-        ServiceState.STABLE, serviceManager.getServiceSpec().getState());
+        ServiceState.STABLE, manager.getServiceSpec().getState());
 
-    validateUpgradeFinalization(serviceManager.getName(), "v2");
+    validateUpgradeFinalization(manager.getName(), "v2");
   }
 
   @Test
-  public void testInvalidUpgrade() throws IOException, SliderException {
-    ServiceManager serviceManager = createTestServiceManager(
-        "testInvalidUpgrade");
-    serviceManager.getServiceSpec().setState(
+  public void testInvalidUpgrade() throws Exception {
+    ServiceContext serviceContext = createServiceContext("testInvalidUpgrade");
+    ServiceManager manager = serviceContext.getServiceManager();
+    manager.getServiceSpec().setState(
         ServiceState.UPGRADING_AUTO_FINALIZE);
     Service upgradedDef = ServiceTestUtils.createExampleApplication();
-    upgradedDef.setName(serviceManager.getName());
+    upgradedDef.setName(manager.getName());
     upgradedDef.setVersion("v2");
     upgradedDef.setLifetime(2L);
     writeUpgradedDef(upgradedDef);
 
     try {
-      serviceManager.processUpgradeRequest("v2", true);
+      manager.processUpgradeRequest("v2", true, false);
     } catch (Exception ex) {
       Assert.assertTrue(ex instanceof UnsupportedOperationException);
       return;
@@ -164,6 +170,32 @@ public class TestServiceManager {
     Assert.fail();
   }
 
+  @Test(timeout = TIMEOUT)
+  public void testExpressUpgrade() throws Exception {
+    ServiceContext context = createServiceContext("testExpressUpgrade");
+    ServiceManager manager = context.getServiceManager();
+    manager.getServiceSpec().setState(
+        ServiceState.EXPRESS_UPGRADING);
+    initUpgrade(context, "v2", true, true, true);
+
+    List<String> comps = ServiceApiUtil.resolveCompsDependency(context.service);
+    // wait till instances of first component are in upgrade
+    String comp1 = comps.get(0);
+    upgradeInstancesOf(context, comp1);
+
+    // wait till instances of second component are in upgrade
+    String comp2 = comps.get(1);
+    upgradeInstancesOf(context, comp2);
+
+    GenericTestUtils.waitFor(() ->
+            context.service.getState().equals(ServiceState.STABLE),
+        CHECK_EVERY_MILLIS, TIMEOUT);
+
+    Assert.assertEquals("service not stable",
+        ServiceState.STABLE, manager.getServiceSpec().getState());
+    validateUpgradeFinalization(manager.getName(), "v2");
+  }
+
   private void validateUpgradeFinalization(String serviceName,
       String expectedVersion) throws IOException {
     Service savedSpec = ServiceApiUtil.loadService(rule.getFs(), serviceName);
@@ -172,15 +204,16 @@ public class TestServiceManager {
     Assert.assertNotNull("app id not present", savedSpec.getId());
     Assert.assertEquals("state not stable", ServiceState.STABLE,
         savedSpec.getState());
-    savedSpec.getComponents().forEach(compSpec -> {
-      Assert.assertEquals("comp not stable", ComponentState.STABLE,
-          compSpec.getState());
-    });
+    savedSpec.getComponents().forEach(compSpec ->
+        Assert.assertEquals("comp not stable", ComponentState.STABLE,
+        compSpec.getState()));
   }
 
-  private void upgrade(ServiceManager serviceManager, String version,
-      boolean upgradeArtifact, boolean autoFinalize)
-      throws IOException, SliderException {
+  private void initUpgrade(ServiceContext context, String version,
+      boolean upgradeArtifact, boolean autoFinalize, boolean expressUpgrade)
+      throws IOException, SliderException, TimeoutException,
+      InterruptedException {
+    ServiceManager serviceManager = context.getServiceManager();
     Service upgradedDef = ServiceTestUtils.createExampleApplication();
     upgradedDef.setName(serviceManager.getName());
     upgradedDef.setVersion(version);
@@ -191,39 +224,81 @@ public class TestServiceManager {
       });
     }
     writeUpgradedDef(upgradedDef);
-    serviceManager.processUpgradeRequest(version, autoFinalize);
+    serviceManager.processUpgradeRequest(version, autoFinalize, expressUpgrade);
     ServiceEvent upgradeEvent = new ServiceEvent(ServiceEventType.UPGRADE);
-    upgradeEvent.setVersion(version);
-    if (autoFinalize) {
-      upgradeEvent.setAutoFinalize(true);
-    }
-    serviceManager.handle(upgradeEvent);
+    upgradeEvent.setVersion(version).setExpressUpgrade(expressUpgrade)
+        .setAutoFinalize(autoFinalize);
+
+    GenericTestUtils.waitFor(()-> {
+      ServiceState serviceState = context.service.getState();
+      if (serviceState.equals(ServiceState.UPGRADING) ||
+          serviceState.equals(ServiceState.UPGRADING_AUTO_FINALIZE) ||
+          serviceState.equals(ServiceState.EXPRESS_UPGRADING)) {
+        return true;
+      }
+      return false;
+    }, CHECK_EVERY_MILLIS, TIMEOUT);
+  }
+
+  private void upgradeAllInstances(ServiceContext context) throws
+      TimeoutException, InterruptedException {
+    // upgrade the instances
+    context.scheduler.getLiveInstances().forEach(((containerId, instance) -> {
+      ComponentInstanceEvent event = new ComponentInstanceEvent(containerId,
+          ComponentInstanceEventType.UPGRADE);
+      context.scheduler.getDispatcher().getEventHandler().handle(event);
+    }));
+
+    // become ready
+    context.scheduler.getLiveInstances().forEach(((containerId, instance) -> {
+      ComponentInstanceEvent event = new ComponentInstanceEvent(containerId,
+          ComponentInstanceEventType.BECOME_READY);
+
+      context.scheduler.getDispatcher().getEventHandler().handle(event);
+    }));
+    GenericTestUtils.waitFor(()-> {
+      for (ComponentInstance instance:
+          context.scheduler.getLiveInstances().values()) {
+        if (!instance.getContainerState().equals(ContainerState.READY)) {
+          return false;
+        }
+      }
+      return true;
+    }, CHECK_EVERY_MILLIS, TIMEOUT);
   }
 
-  private ServiceManager createTestServiceManager(String name)
-      throws IOException {
-    ServiceContext context = new ServiceContext();
-    context.service = createBaseDef(name);
-    context.fs = rule.getFs();
-
-    context.scheduler = new ServiceScheduler(context) {
-      @Override
-      protected YarnRegistryViewForProviders createYarnRegistryOperations(
-          ServiceContext context, RegistryOperations registryClient) {
-        return mock(YarnRegistryViewForProviders.class);
+  private void upgradeInstancesOf(ServiceContext context, String compName)
+      throws TimeoutException, InterruptedException {
+    Collection<ComponentInstance> compInstances = context.scheduler
+        .getAllComponents().get(compName).getAllComponentInstances();
+    GenericTestUtils.waitFor(() -> {
+      for (ComponentInstance instance : compInstances) {
+        if (!instance.getContainerState().equals(ContainerState.UPGRADING)) {
+          return false;
+        }
       }
-    };
+      return true;
+    }, CHECK_EVERY_MILLIS, TIMEOUT);
 
-    context.scheduler.init(rule.getConf());
+    // instances of comp1 get upgraded and become ready event is triggered
+    // become ready
+    compInstances.forEach(instance -> {
+      ComponentInstanceEvent event = new ComponentInstanceEvent(
+          instance.getContainer().getId(),
+          ComponentInstanceEventType.BECOME_READY);
 
-    Map<String, org.apache.hadoop.yarn.service.component.Component>
-        componentState = context.scheduler.getAllComponents();
-    context.service.getComponents().forEach(component -> {
-      componentState.put(component.getName(),
-          new org.apache.hadoop.yarn.service.component.Component(component,
-              1L, context));
+      context.scheduler.getDispatcher().getEventHandler().handle(event);
     });
-    return new ServiceManager(context);
+  }
+
+  private ServiceContext createServiceContext(String name)
+      throws Exception {
+    Service service  = createBaseDef(name);
+    ServiceContext context = new MockRunningServiceContext(rule,
+        service);
+    context.scheduler.getDispatcher().setDrainEventsOnStop();
+    context.scheduler.getDispatcher().start();
+    return context;
   }
 
   public static Service createBaseDef(String name) {
@@ -257,4 +332,6 @@ public class TestServiceManager {
         upgradedDef);
   }
 
+  private static final int TIMEOUT = 200000;
+  private static final int CHECK_EVERY_MILLIS = 100;
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e557c6bd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java
index 8b13b24..216d88f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java
@@ -415,6 +415,41 @@ public class TestYarnNativeServices extends ServiceTestUtils {
     client.actionDestroy(service.getName());
   }
 
+  @Test(timeout = 200000)
+  public void testExpressUpgrade() throws Exception {
+    setupInternal(NUM_NMS);
+    getConf().setBoolean(YARN_SERVICE_UPGRADE_ENABLED, true);
+    ServiceClient client = createClient(getConf());
+
+    Service service = createExampleApplication();
+    client.actionCreate(service);
+    waitForServiceToBeStable(client, service);
+
+    // upgrade the service
+    Component component = service.getComponents().iterator().next();
+    service.setState(ServiceState.EXPRESS_UPGRADING);
+    service.setVersion("v2");
+    component.getConfiguration().getEnv().put("key1", "val1");
+    Component component2 = service.getComponent("compb");
+    component2.getConfiguration().getEnv().put("key2", "val2");
+    client.actionUpgradeExpress(service);
+
+    // wait for upgrade to complete
+    waitForServiceToBeStable(client, service);
+    Service active = client.getStatus(service.getName());
+    Assert.assertEquals("component not stable", ComponentState.STABLE,
+        active.getComponent(component.getName()).getState());
+    Assert.assertEquals("compa does not have new env", "val1",
+        active.getComponent(component.getName()).getConfiguration()
+            .getEnv("key1"));
+    Assert.assertEquals("compb does not have new env", "val2",
+        active.getComponent(component2.getName()).getConfiguration()
+            .getEnv("key2"));
+    LOG.info("Stop/destroy service {}", service);
+    client.actionStop(service.getName(), true);
+    client.actionDestroy(service.getName());
+  }
+
   // Test to verify ANTI_AFFINITY placement policy
   // 1. Start mini cluster with 3 NMs and scheduler placement-constraint handler
   // 2. Create an example service with 3 containers


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message