hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ey...@apache.org
Subject [2/3] hadoop git commit: YARN-7939. Added support to upgrade a component instance. Contributed by Chandni Singh
Date Thu, 26 Apr 2018 19:54:41 GMT
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b39d183d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java
index 782cc3b..5a85e8f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java
@@ -34,20 +34,23 @@ import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
 import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.service.ServiceEvent;
+import org.apache.hadoop.yarn.service.ServiceEventType;
+import org.apache.hadoop.yarn.service.api.records.ContainerState;
+import org.apache.hadoop.yarn.service.api.records.ResourceInformation;
+import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
+import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceId;
 import org.apache.hadoop.yarn.service.ContainerFailureTracker;
 import org.apache.hadoop.yarn.service.ServiceContext;
-import org.apache.hadoop.yarn.service.ServiceMaster;
 import org.apache.hadoop.yarn.service.ServiceMetrics;
 import org.apache.hadoop.yarn.service.ServiceScheduler;
 import org.apache.hadoop.yarn.service.api.records.PlacementPolicy;
-import org.apache.hadoop.yarn.service.api.records.ResourceInformation;
 import org.apache.hadoop.yarn.service.api.records.ServiceState;
-import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
 import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEvent;
-import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceId;
 import org.apache.hadoop.yarn.service.conf.YarnServiceConf;
 import org.apache.hadoop.yarn.service.monitor.probe.MonitorUtils;
 import org.apache.hadoop.yarn.service.monitor.probe.Probe;
+import org.apache.hadoop.yarn.service.containerlaunch.ContainerLaunchService;
 import org.apache.hadoop.yarn.service.provider.ProviderUtils;
 import org.apache.hadoop.yarn.service.utils.ServiceUtils;
 import org.apache.hadoop.yarn.state.InvalidStateTransitionException;
@@ -70,6 +73,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -109,6 +113,10 @@ public class Component implements EventHandler<ComponentEvent> {
   // disk_failed containers etc. This will be reset to 0 periodically.
   public AtomicInteger currentContainerFailure = new AtomicInteger(0);
 
+  private AtomicBoolean upgradeInProgress = new AtomicBoolean(false);
+  private ComponentEvent upgradeEvent;
+  private AtomicLong numContainersThatNeedUpgrade = new AtomicLong(0);
+
   private StateMachine<ComponentState, ComponentEventType, ComponentEvent>
       stateMachine;
   private AsyncDispatcher dispatcher;
@@ -131,7 +139,7 @@ public class Component implements EventHandler<ComponentEvent> {
           .addTransition(FLEXING, FLEXING, CONTAINER_ALLOCATED,
               new ContainerAllocatedTransition())
           // container launched on NM
-          .addTransition(FLEXING, EnumSet.of(STABLE, FLEXING),
+          .addTransition(FLEXING, EnumSet.of(STABLE, FLEXING, UPGRADING),
               CONTAINER_STARTED, new ContainerStartedTransition())
           // container failed while flexing
           .addTransition(FLEXING, FLEXING, CONTAINER_COMPLETED,
@@ -151,12 +159,19 @@ public class Component implements EventHandler<ComponentEvent> {
           // For flex down, go to STABLE state
           .addTransition(STABLE, EnumSet.of(STABLE, FLEXING),
               FLEX, new FlexComponentTransition())
-          .addTransition(STABLE, UPGRADING, UPGRADE,
-              new ComponentNeedsUpgradeTransition())
-          .addTransition(FLEXING, UPGRADING, UPGRADE,
+          .addTransition(STABLE, UPGRADING, ComponentEventType.UPGRADE,
               new ComponentNeedsUpgradeTransition())
-          .addTransition(UPGRADING, UPGRADING, UPGRADE,
+          //Upgrade while previous upgrade is still in progress
+          .addTransition(UPGRADING, UPGRADING, ComponentEventType.UPGRADE,
               new ComponentNeedsUpgradeTransition())
+          .addTransition(UPGRADING, EnumSet.of(UPGRADING, FLEXING, STABLE),
+              CHECK_STABLE, new CheckStableTransition())
+          .addTransition(FLEXING, EnumSet.of(UPGRADING, FLEXING, STABLE),
+              CHECK_STABLE, new CheckStableTransition())
+          .addTransition(STABLE, EnumSet.of(STABLE), CHECK_STABLE,
+              new CheckStableTransition())
+          .addTransition(UPGRADING, FLEXING, CONTAINER_COMPLETED,
+              new ContainerCompletedTransition())
           .installTopology();
 
   public Component(
@@ -291,7 +306,10 @@ public class Component implements EventHandler<ComponentEvent> {
 
       component.pendingInstances.remove(instance);
       instance.setContainer(container);
-      ProviderUtils.initCompInstanceDir(component.getContext().fs, instance);
+
+      ProviderUtils.initCompInstanceDir(component.getContext().fs,
+          component.createLaunchContext(component.componentSpec,
+              component.scheduler.getApp().getVersion()), instance);
       component.getScheduler().addLiveCompInstance(container.getId(), instance);
       LOG.info("[COMPONENT {}]: Recovered {} for component instance {} on " +
               "host {}, num pending component instances reduced to {} ",
@@ -317,14 +335,21 @@ public class Component implements EventHandler<ComponentEvent> {
   private static ComponentState checkIfStable(Component component) {
     // if desired == running
     if (component.componentMetrics.containersReady.value() == component
-        .getComponentSpec().getNumberOfContainers()) {
+        .getComponentSpec().getNumberOfContainers() &&
+        component.numContainersThatNeedUpgrade.get() == 0) {
       component.componentSpec.setState(
           org.apache.hadoop.yarn.service.api.records.ComponentState.STABLE);
       return STABLE;
-    } else {
+    } else if (component.componentMetrics.containersReady.value() != component
+        .getComponentSpec().getNumberOfContainers()) {
       component.componentSpec.setState(
           org.apache.hadoop.yarn.service.api.records.ComponentState.FLEXING);
       return FLEXING;
+    } else {
+      //  component.numContainersThatNeedUpgrade.get() > 0
+      component.componentSpec.setState(org.apache.hadoop.yarn.service.api.
+          records.ComponentState.NEEDS_UPGRADE);
+      return UPGRADING;
     }
   }
 
@@ -336,8 +361,9 @@ public class Component implements EventHandler<ComponentEvent> {
         component.componentSpec.getState();
     if (isIncrement) {
       // check if all containers are in READY state
-      if (component.componentMetrics.containersReady
-          .value() == component.componentMetrics.containersDesired.value()) {
+      if (component.numContainersThatNeedUpgrade.get() == 0 &&
+          component.componentMetrics.containersReady.value() ==
+              component.componentMetrics.containersDesired.value()) {
         component.componentSpec.setState(
             org.apache.hadoop.yarn.service.api.records.ComponentState.STABLE);
         if (curState != component.componentSpec.getState()) {
@@ -346,8 +372,7 @@ public class Component implements EventHandler<ComponentEvent> {
               component.componentSpec.getState());
         }
         // component state change will trigger re-check of service state
-        ServiceMaster.checkAndUpdateServiceState(component.scheduler,
-            isIncrement);
+        component.context.getServiceManager().checkAndUpdateServiceState(true);
       }
     } else {
       // container moving out of READY state could be because of FLEX down so
@@ -362,10 +387,13 @@ public class Component implements EventHandler<ComponentEvent> {
               component.componentSpec.getState());
         }
         // component state change will trigger re-check of service state
-        ServiceMaster.checkAndUpdateServiceState(component.scheduler,
-            isIncrement);
+        component.context.getServiceManager().checkAndUpdateServiceState(false);
       }
     }
+    // when the service is stable then the state of component needs to
+    // transition to stable
+    component.dispatcher.getEventHandler().handle(new ComponentEvent(
+        component.getName(), ComponentEventType.CHECK_STABLE));
   }
 
   private static class ContainerCompletedTransition extends BaseTransition {
@@ -377,15 +405,52 @@ public class Component implements EventHandler<ComponentEvent> {
               STOP).setStatus(event.getStatus()));
       component.componentSpec.setState(
           org.apache.hadoop.yarn.service.api.records.ComponentState.FLEXING);
-      component.getScheduler().getApp().setState(ServiceState.STARTED);
+      if (component.context.service.getState().equals(ServiceState.STABLE)) {
+        component.getScheduler().getApp().setState(ServiceState.STARTED);
+        LOG.info("Service def state changed from {} -> {}",
+            ServiceState.STABLE, ServiceState.STARTED);
+      }
     }
   }
 
   private static class ComponentNeedsUpgradeTransition extends BaseTransition {
     @Override
     public void transition(Component component, ComponentEvent event) {
+      component.upgradeInProgress.set(true);
       component.componentSpec.setState(org.apache.hadoop.yarn.service.api.
           records.ComponentState.NEEDS_UPGRADE);
+      component.numContainersThatNeedUpgrade.set(
+          component.componentSpec.getNumberOfContainers());
+      component.componentSpec.getContainers().forEach(container ->
+          container.setState(ContainerState.NEEDS_UPGRADE));
+      component.upgradeEvent = event;
+    }
+  }
+
+  private static class CheckStableTransition implements MultipleArcTransition
+      <Component, ComponentEvent, ComponentState> {
+
+    @Override
+    public ComponentState transition(Component component,
+        ComponentEvent componentEvent) {
+      org.apache.hadoop.yarn.service.api.records.ComponentState currState =
+          component.componentSpec.getState();
+      if (currState.equals(org.apache.hadoop.yarn.service.api.records
+          .ComponentState.STABLE)) {
+        return ComponentState.STABLE;
+      }
+      // checkIfStable also updates the state in definition when STABLE
+      ComponentState targetState = checkIfStable(component);
+      if (targetState.equals(STABLE) && component.upgradeInProgress.get()) {
+        component.componentSpec.overwrite(
+            component.upgradeEvent.getTargetSpec());
+        component.upgradeEvent = null;
+        ServiceEvent checkStable = new ServiceEvent(ServiceEventType.
+            CHECK_STABLE);
+        component.dispatcher.getEventHandler().handle(checkStable);
+        component.upgradeInProgress.set(false);
+      }
+      return targetState;
     }
   }
 
@@ -421,8 +486,28 @@ public class Component implements EventHandler<ComponentEvent> {
         "[COMPONENT {}]: Assigned {} to component instance {} and launch on host {} ",
         getName(), container.getId(), instance.getCompInstanceName(),
         container.getNodeId());
-    scheduler.getContainerLaunchService()
-        .launchCompInstance(scheduler.getApp(), instance, container);
+    if (upgradeInProgress.get()) {
+      scheduler.getContainerLaunchService()
+          .launchCompInstance(scheduler.getApp(), instance, container,
+              createLaunchContext(upgradeEvent.getTargetSpec(),
+                  upgradeEvent.getUpgradeVersion()));
+    } else {
+      scheduler.getContainerLaunchService().launchCompInstance(
+          scheduler.getApp(), instance, container,
+          createLaunchContext(componentSpec, scheduler.getApp().getVersion()));
+    }
+  }
+
+  public ContainerLaunchService.ComponentLaunchContext createLaunchContext(
+      org.apache.hadoop.yarn.service.api.records.Component compSpec,
+      String version) {
+    ContainerLaunchService.ComponentLaunchContext launchContext =
+        new ContainerLaunchService.ComponentLaunchContext(compSpec.getName(),
+            version);
+    launchContext.setArtifact(compSpec.getArtifact())
+        .setConfiguration(compSpec.getConfiguration())
+        .setLaunchCommand(compSpec.getLaunchCommand());
+    return launchContext;
   }
 
   @SuppressWarnings({ "unchecked" })
@@ -661,16 +746,24 @@ public class Component implements EventHandler<ComponentEvent> {
     scheduler.getServiceMetrics().containersRunning.decr();
   }
 
-  public void incContainersReady() {
+  public void incContainersReady(boolean updateDefinition) {
     componentMetrics.containersReady.incr();
     scheduler.getServiceMetrics().containersReady.incr();
-    checkAndUpdateComponentState(this, true);
+    if (updateDefinition) {
+      checkAndUpdateComponentState(this, true);
+    }
   }
 
-  public void decContainersReady() {
+  public void decContainersReady(boolean updateDefinition) {
     componentMetrics.containersReady.decr();
     scheduler.getServiceMetrics().containersReady.decr();
-    checkAndUpdateComponentState(this, false);
+    if (updateDefinition) {
+      checkAndUpdateComponentState(this, false);
+    }
+  }
+
+  public void decContainersThatNeedUpgrade() {
+    numContainersThatNeedUpgrade.decrementAndGet();
   }
 
   public int getNumReadyInstances() {
@@ -729,6 +822,16 @@ public class Component implements EventHandler<ComponentEvent> {
       this.readLock.unlock();
     }
   }
+
+  public ComponentEvent getUpgradeEvent() {
+    this.readLock.lock();
+    try {
+      return upgradeEvent;
+    } finally {
+      this.readLock.unlock();
+    }
+  }
+
   public ServiceScheduler getScheduler() {
     return scheduler;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b39d183d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEvent.java
index 7bd5cb9..84caa77 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEvent.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEvent.java
@@ -34,6 +34,7 @@ public class ComponentEvent extends AbstractEvent<ComponentEventType> {
   private ContainerStatus status;
   private ContainerId containerId;
   private org.apache.hadoop.yarn.service.api.records.Component targetSpec;
+  private String upgradeVersion;
 
   public ContainerId getContainerId() {
     return containerId;
@@ -103,4 +104,13 @@ public class ComponentEvent extends AbstractEvent<ComponentEventType> {
     this.targetSpec = Preconditions.checkNotNull(targetSpec);
     return this;
   }
+
+  public String getUpgradeVersion() {
+    return upgradeVersion;
+  }
+
+  public ComponentEvent setUpgradeVersion(String upgradeVersion) {
+    this.upgradeVersion = upgradeVersion;
+    return this;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b39d183d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEventType.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEventType.java
index 970788a..44d781f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEventType.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEventType.java
@@ -25,5 +25,5 @@ public enum ComponentEventType {
   CONTAINER_STARTED,
   CONTAINER_COMPLETED,
   UPGRADE,
-  STOP_UPGRADE
+  CHECK_STABLE
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b39d183d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java
index c57d888..ffb9d76 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java
@@ -41,6 +41,8 @@ import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.service.ServiceScheduler;
 import org.apache.hadoop.yarn.service.api.records.ContainerState;
 import org.apache.hadoop.yarn.service.component.Component;
+import org.apache.hadoop.yarn.service.component.ComponentEvent;
+import org.apache.hadoop.yarn.service.component.ComponentEventType;
 import org.apache.hadoop.yarn.service.monitor.probe.ProbeStatus;
 import org.apache.hadoop.yarn.service.registry.YarnRegistryViewForProviders;
 import org.apache.hadoop.yarn.service.timelineservice.ServiceTimelinePublisher;
@@ -116,10 +118,15 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
       .addTransition(READY, STARTED, BECOME_NOT_READY,
           new ContainerBecomeNotReadyTransition())
       .addTransition(READY, INIT, STOP, new ContainerStoppedTransition())
+      .addTransition(READY, UPGRADING, UPGRADE,
+          new ContainerUpgradeTransition())
+      .addTransition(UPGRADING, UPGRADING, UPGRADE,
+          new ContainerUpgradeTransition())
+      .addTransition(UPGRADING, READY, BECOME_READY,
+          new ContainerBecomeReadyTransition())
+      .addTransition(UPGRADING, INIT, STOP, new ContainerStoppedTransition())
       .installTopology();
 
-
-
   public ComponentInstance(Component component,
       ComponentInstanceId compInstanceId) {
     this.stateMachine = stateMachineFactory.make(this);
@@ -186,7 +193,17 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
     public void transition(ComponentInstance compInstance,
         ComponentInstanceEvent event) {
       compInstance.containerSpec.setState(ContainerState.READY);
-      compInstance.component.incContainersReady();
+      if (compInstance.getState().equals(ComponentInstanceState.UPGRADING)) {
+        compInstance.component.incContainersReady(false);
+        compInstance.component.decContainersThatNeedUpgrade();
+        ComponentEvent checkState = new ComponentEvent(
+            compInstance.component.getName(), ComponentEventType.CHECK_STABLE);
+        compInstance.scheduler.getDispatcher().getEventHandler().handle(
+            checkState);
+
+      } else {
+        compInstance.component.incContainersReady(true);
+      }
       if (compInstance.timelineServiceEnabled) {
         compInstance.serviceTimelinePublisher
             .componentInstanceBecomeReady(compInstance.containerSpec);
@@ -199,7 +216,7 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
     public void transition(ComponentInstance compInstance,
         ComponentInstanceEvent event) {
       compInstance.containerSpec.setState(ContainerState.RUNNING_BUT_UNREADY);
-      compInstance.component.decContainersReady();
+      compInstance.component.decContainersReady(true);
     }
   }
 
@@ -225,9 +242,11 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
               .getDiagnostics();
       compInstance.diagnostics.append(containerDiag + System.lineSeparator());
       compInstance.cancelContainerStatusRetriever();
-
+      if (compInstance.getState().equals(ComponentInstanceState.UPGRADING)) {
+        compInstance.component.decContainersThatNeedUpgrade();
+      }
       if (compInstance.getState().equals(READY)) {
-        compInstance.component.decContainersReady();
+        compInstance.component.decContainersReady(true);
       }
       compInstance.component.decRunningContainers();
       boolean shouldExit = false;
@@ -287,6 +306,23 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
     }
   }
 
+  private static class ContainerUpgradeTransition extends BaseTransition {
+
+    @Override
+    public void transition(ComponentInstance compInstance,
+        ComponentInstanceEvent event) {
+      compInstance.containerSpec.setState(ContainerState.UPGRADING);
+      compInstance.component.decContainersReady(false);
+      ComponentEvent upgradeEvent = compInstance.component.getUpgradeEvent();
+      compInstance.scheduler.getContainerLaunchService()
+          .reInitCompInstance(compInstance.scheduler.getApp(), compInstance,
+              compInstance.container,
+              compInstance.component.createLaunchContext(
+                  upgradeEvent.getTargetSpec(),
+                  upgradeEvent.getUpgradeVersion()));
+    }
+  }
+
   public ComponentInstanceState getState() {
     this.readLock.lock();
 
@@ -422,7 +458,7 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
       component.decRunningContainers();
     }
     if (getState() == READY) {
-      component.decContainersReady();
+      component.decContainersReady(true);
       component.decRunningContainers();
     }
     getCompSpec().removeContainer(containerSpec);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b39d183d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstanceEventType.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstanceEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstanceEventType.java
index 1a880ba..665b8fa 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstanceEventType.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstanceEventType.java
@@ -22,5 +22,6 @@ public enum ComponentInstanceEventType {
   START,
   STOP,
   BECOME_READY,
-  BECOME_NOT_READY
+  BECOME_NOT_READY,
+  UPGRADE
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b39d183d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/RestApiConstants.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/RestApiConstants.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/RestApiConstants.java
index 243fc52..bd1e9e7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/RestApiConstants.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/RestApiConstants.java
@@ -17,6 +17,8 @@
 
 package org.apache.hadoop.yarn.service.conf;
 
+import javax.ws.rs.core.MediaType;
+
 public interface RestApiConstants {
 
   // Rest endpoints
@@ -26,9 +28,19 @@ public interface RestApiConstants {
   String SERVICE_PATH = "/services/{service_name}";
   String COMPONENT_PATH = "/services/{service_name}/components/{component_name}";
 
+  String COMP_INSTANCE_PATH = SERVICE_PATH +
+      "/component-instances/{component_instance_name}";
+  String COMP_INSTANCE_LONG_PATH = COMPONENT_PATH +
+      "/component-instances/{component_instance_name}";
+  String COMP_INSTANCES = "component-instances";
+  String COMP_INSTANCES_PATH = SERVICE_PATH + "/" + COMP_INSTANCES;
+
   // Query param
   String SERVICE_NAME = "service_name";
   String COMPONENT_NAME = "component_name";
+  String COMP_INSTANCE_NAME = "component_instance_name";
+
+  String MEDIA_TYPE_JSON_UTF8 = MediaType.APPLICATION_JSON + ";charset=utf-8";
 
   Long DEFAULT_UNLIMITED_LIFETIME = -1l;
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b39d183d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/containerlaunch/ContainerLaunchService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/containerlaunch/ContainerLaunchService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/containerlaunch/ContainerLaunchService.java
index e07661b..084c721 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/containerlaunch/ContainerLaunchService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/containerlaunch/ContainerLaunchService.java
@@ -18,11 +18,12 @@
 
 package org.apache.hadoop.yarn.service.containerlaunch;
 
+import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.service.ServiceContext;
-import org.apache.hadoop.yarn.service.api.records.Component;
+import org.apache.hadoop.yarn.service.api.records.Artifact;
 import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
 import org.apache.hadoop.yarn.service.provider.ProviderService;
 import org.apache.hadoop.yarn.service.provider.ProviderFactory;
@@ -63,36 +64,57 @@ public class ContainerLaunchService extends AbstractService{
   }
 
   public void launchCompInstance(Service service,
-      ComponentInstance instance, Container container) {
+      ComponentInstance instance, Container container,
+      ComponentLaunchContext componentLaunchContext) {
     ContainerLauncher launcher =
-        new ContainerLauncher(service, instance, container);
+        new ContainerLauncher(service, instance, container,
+            componentLaunchContext, false);
     executorService.execute(launcher);
   }
 
+  public void reInitCompInstance(Service service,
+      ComponentInstance instance, Container container,
+      ComponentLaunchContext componentLaunchContext) {
+    ContainerLauncher reInitializer = new ContainerLauncher(service, instance,
+        container, componentLaunchContext, true);
+    executorService.execute(reInitializer);
+  }
+
   private class ContainerLauncher implements Runnable {
     public final Container container;
     public final Service service;
     public ComponentInstance instance;
+    private final ComponentLaunchContext componentLaunchContext;
+    private final boolean reInit;
 
-    public ContainerLauncher(
-        Service service,
-        ComponentInstance instance, Container container) {
+    ContainerLauncher(Service service, ComponentInstance instance,
+        Container container, ComponentLaunchContext componentLaunchContext,
+        boolean reInit) {
       this.container = container;
       this.service = service;
       this.instance = instance;
+      this.componentLaunchContext = componentLaunchContext;
+      this.reInit = reInit;
     }
 
     @Override public void run() {
-      Component compSpec = instance.getCompSpec();
       ProviderService provider = ProviderFactory.getProviderService(
-          compSpec.getArtifact());
+          componentLaunchContext.getArtifact());
       AbstractLauncher launcher = new AbstractLauncher(context);
       try {
         provider.buildContainerLaunchContext(launcher, service,
-            instance, fs, getConfig(), container);
-        instance.getComponent().getScheduler().getNmClient()
-            .startContainerAsync(container,
-                launcher.completeContainerLaunch());
+            instance, fs, getConfig(), container, componentLaunchContext);
+        if (!reInit) {
+          LOG.info("launching container {}", container.getId());
+          instance.getComponent().getScheduler().getNmClient()
+              .startContainerAsync(container,
+                  launcher.completeContainerLaunch());
+        } else {
+          LOG.info("reInitializing container {}", container.getId());
+          instance.getComponent().getScheduler().getNmClient()
+              .reInitializeContainerAsync(container.getId(),
+                  launcher.completeContainerLaunch(), true);
+        }
       } catch (Exception e) {
         LOG.error(instance.getCompInstanceId()
             + ": Failed to launch container. ", e);
@@ -100,4 +122,58 @@ public class ContainerLaunchService extends AbstractService{
       }
     }
   }
+
+  /**
+   * Launch context of a component.
+   */
+  public static class ComponentLaunchContext {
+    private final String name;
+    private final String serviceVersion;
+    private Artifact artifact;
+    private org.apache.hadoop.yarn.service.api.records.Configuration
+        configuration;
+    private String launchCommand;
+
+    public ComponentLaunchContext(String name, String serviceVersion) {
+      this.name = Preconditions.checkNotNull(name);
+      this.serviceVersion = Preconditions.checkNotNull(serviceVersion);
+    }
+
+    public String getName() {
+      return name;
+    }
+
+    public String getServiceVersion() {
+      return serviceVersion;
+    }
+
+    public Artifact getArtifact() {
+      return artifact;
+    }
+
+    public org.apache.hadoop.yarn.service.api.records.
+        Configuration getConfiguration() {
+      return configuration;
+    }
+
+    public String getLaunchCommand() {
+      return launchCommand;
+    }
+
+    public ComponentLaunchContext setArtifact(Artifact artifact) {
+      this.artifact = artifact;
+      return this;
+    }
+
+    public ComponentLaunchContext setConfiguration(org.apache.hadoop.yarn.
+        service.api.records.Configuration configuration) {
+      this.configuration = configuration;
+      return this;
+    }
+
+    public ComponentLaunchContext setLaunchCommand(String launchCommand) {
+      this.launchCommand = launchCommand;
+      return this;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b39d183d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/impl/pb/client/ClientAMProtocolPBClientImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/impl/pb/client/ClientAMProtocolPBClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/impl/pb/client/ClientAMProtocolPBClientImpl.java
index 8152225..e82181e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/impl/pb/client/ClientAMProtocolPBClientImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/impl/pb/client/ClientAMProtocolPBClientImpl.java
@@ -30,6 +30,8 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 
+import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeResponseProto;
+import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeRequestProto;
 import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsRequestProto;
 import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsResponseProto;
 import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusRequestProto;
@@ -114,4 +116,16 @@ public class ClientAMProtocolPBClientImpl
     }
     return null;
   }
+
+  @Override
+  public CompInstancesUpgradeResponseProto upgrade(
+      CompInstancesUpgradeRequestProto request)
+      throws IOException, YarnException {
+    try {
+      return proxy.upgrade(null, request);
+    } catch (ServiceException e) {
+      RPCUtil.unwrapAndThrowException(e);
+    }
+    return null;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b39d183d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/impl/pb/service/ClientAMProtocolPBServiceImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/impl/pb/service/ClientAMProtocolPBServiceImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/impl/pb/service/ClientAMProtocolPBServiceImpl.java
index 1a1a1ef..50a678b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/impl/pb/service/ClientAMProtocolPBServiceImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/impl/pb/service/ClientAMProtocolPBServiceImpl.java
@@ -21,6 +21,8 @@ package org.apache.hadoop.yarn.service.impl.pb.service;
 import com.google.protobuf.RpcController;
 import com.google.protobuf.ServiceException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeRequestProto;
+import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeResponseProto;
 import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsRequestProto;
 import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsResponseProto;
 import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusRequestProto;
@@ -91,4 +93,14 @@ public class ClientAMProtocolPBServiceImpl implements ClientAMProtocolPB {
       throw new ServiceException(e);
     }
   }
+
+  @Override
+  public CompInstancesUpgradeResponseProto upgrade(RpcController controller,
+      CompInstancesUpgradeRequestProto request) throws ServiceException {
+    try {
+      return real.upgrade(request);
+    } catch (IOException | YarnException e) {
+      throw new ServiceException(e);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b39d183d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/AbstractProviderService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/AbstractProviderService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/AbstractProviderService.java
index ee27686..560f421 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/AbstractProviderService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/AbstractProviderService.java
@@ -23,8 +23,8 @@ import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.service.api.records.Service;
 import org.apache.hadoop.yarn.service.conf.YarnServiceConf;
-import org.apache.hadoop.yarn.service.api.records.Component;
 import org.apache.hadoop.yarn.service.conf.YarnServiceConstants;
+import org.apache.hadoop.yarn.service.containerlaunch.ContainerLaunchService;
 import org.apache.hadoop.yarn.service.utils.SliderFileSystem;
 import org.apache.hadoop.yarn.service.utils.ServiceUtils;
 import org.apache.hadoop.yarn.service.exceptions.SliderException;
@@ -60,9 +60,9 @@ public abstract class AbstractProviderService implements ProviderService,
 
   public void buildContainerLaunchContext(AbstractLauncher launcher,
       Service service, ComponentInstance instance,
-      SliderFileSystem fileSystem, Configuration yarnConf, Container container)
+      SliderFileSystem fileSystem, Configuration yarnConf, Container container,
+      ContainerLaunchService.ComponentLaunchContext compLaunchContext)
       throws IOException, SliderException {
-    Component component = instance.getComponent().getComponentSpec();;
     processArtifact(launcher, instance, fileSystem, service);
 
     ServiceContext context =
@@ -72,11 +72,12 @@ public abstract class AbstractProviderService implements ProviderService,
     Map<String, String> globalTokens =
         instance.getComponent().getScheduler().globalTokens;
     Map<String, String> tokensForSubstitution = ProviderUtils
-        .initCompTokensForSubstitute(instance, container);
+        .initCompTokensForSubstitute(instance, container,
+            compLaunchContext);
     tokensForSubstitution.putAll(globalTokens);
     // Set the environment variables in launcher
-    launcher.putEnv(ServiceUtils
-        .buildEnvMap(component.getConfiguration(), tokensForSubstitution));
+    launcher.putEnv(ServiceUtils.buildEnvMap(
+        compLaunchContext.getConfiguration(), tokensForSubstitution));
     launcher.setEnv("WORK_DIR", ApplicationConstants.Environment.PWD.$());
     launcher.setEnv("LOG_DIR", ApplicationConstants.LOG_DIR_EXPANSION_VAR);
     if (System.getenv(HADOOP_USER_NAME) != null) {
@@ -94,10 +95,10 @@ public abstract class AbstractProviderService implements ProviderService,
 
     // create config file on hdfs and add local resource
     ProviderUtils.createConfigFileAndAddLocalResource(launcher, fileSystem,
-        component, tokensForSubstitution, instance, context);
+        compLaunchContext, tokensForSubstitution, instance, context);
 
     // substitute launch command
-    String launchCommand = component.getLaunchCommand();
+    String launchCommand = compLaunchContext.getLaunchCommand();
     // docker container may have empty commands
     if (!StringUtils.isEmpty(launchCommand)) {
       launchCommand = ProviderUtils
@@ -111,12 +112,12 @@ public abstract class AbstractProviderService implements ProviderService,
     // By default retry forever every 30 seconds
     launcher.setRetryContext(
         YarnServiceConf.getInt(CONTAINER_RETRY_MAX, DEFAULT_CONTAINER_RETRY_MAX,
-            component.getConfiguration(), yarnConf),
+            compLaunchContext.getConfiguration(), yarnConf),
         YarnServiceConf.getInt(CONTAINER_RETRY_INTERVAL,
-            DEFAULT_CONTAINER_RETRY_INTERVAL, component.getConfiguration(),
-            yarnConf),
+            DEFAULT_CONTAINER_RETRY_INTERVAL,
+            compLaunchContext.getConfiguration(), yarnConf),
         YarnServiceConf.getLong(CONTAINER_FAILURES_VALIDITY_INTERVAL,
             DEFAULT_CONTAINER_FAILURES_VALIDITY_INTERVAL,
-            component.getConfiguration(), yarnConf));
+            compLaunchContext.getConfiguration(), yarnConf));
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b39d183d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/ProviderService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/ProviderService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/ProviderService.java
index 11015ea..fe765de 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/ProviderService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/ProviderService.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.service.provider;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.service.api.records.Service;
+import org.apache.hadoop.yarn.service.containerlaunch.ContainerLaunchService;
 import org.apache.hadoop.yarn.service.utils.SliderFileSystem;
 import org.apache.hadoop.yarn.service.exceptions.SliderException;
 import org.apache.hadoop.yarn.service.containerlaunch.AbstractLauncher;
@@ -35,6 +36,8 @@ public interface ProviderService {
    */
   void buildContainerLaunchContext(AbstractLauncher containerLauncher,
       Service service, ComponentInstance instance,
-      SliderFileSystem sliderFileSystem, Configuration yarnConf, Container
-      container) throws IOException, SliderException;
+      SliderFileSystem sliderFileSystem, Configuration yarnConf,
+      Container container,
+      ContainerLaunchService.ComponentLaunchContext componentLaunchContext)
+      throws IOException, SliderException;
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b39d183d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/ProviderUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/ProviderUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/ProviderUtils.java
index d65a196..2fc8cfb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/ProviderUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/ProviderUtils.java
@@ -27,12 +27,12 @@ import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.service.ServiceContext;
-import org.apache.hadoop.yarn.service.api.records.Component;
 import org.apache.hadoop.yarn.service.api.records.ConfigFile;
 import org.apache.hadoop.yarn.service.api.records.ConfigFormat;
 import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
 import org.apache.hadoop.yarn.service.conf.YarnServiceConstants;
 import org.apache.hadoop.yarn.service.containerlaunch.AbstractLauncher;
+import org.apache.hadoop.yarn.service.containerlaunch.ContainerLaunchService;
 import org.apache.hadoop.yarn.service.exceptions.BadCommandArgumentsException;
 import org.apache.hadoop.yarn.service.exceptions.SliderException;
 import org.apache.hadoop.yarn.service.utils.PublishedConfiguration;
@@ -51,7 +51,11 @@ import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.regex.Pattern;
 
-import static org.apache.hadoop.yarn.service.api.ServiceApiConstants.*;
+import static org.apache.hadoop.yarn.service.api.ServiceApiConstants.COMPONENT_ID;
+import static org.apache.hadoop.yarn.service.api.ServiceApiConstants.COMPONENT_INSTANCE_NAME;
+import static org.apache.hadoop.yarn.service.api.ServiceApiConstants.COMPONENT_NAME;
+import static org.apache.hadoop.yarn.service.api.ServiceApiConstants.COMPONENT_NAME_LC;
+import static org.apache.hadoop.yarn.service.api.ServiceApiConstants.CONTAINER_ID;
 
 /**
  * This is a factoring out of methods handy for providers. It's bonded to a log
@@ -160,9 +164,11 @@ public class ProviderUtils implements YarnServiceConstants {
   }
 
   public static Path initCompInstanceDir(SliderFileSystem fs,
+      ContainerLaunchService.ComponentLaunchContext compLaunchContext,
       ComponentInstance instance) {
     Path compDir = new Path(new Path(fs.getAppDir(), "components"),
-        instance.getCompName());
+        compLaunchContext.getServiceVersion() + "/" +
+            compLaunchContext.getName());
     Path compInstanceDir = new Path(compDir, instance.getCompInstanceName());
     instance.setCompInstanceDir(compInstanceDir);
     return compInstanceDir;
@@ -171,10 +177,11 @@ public class ProviderUtils implements YarnServiceConstants {
   // 1. Create all config files for a component on hdfs for localization
   // 2. Add the config file to localResource
   public static synchronized void createConfigFileAndAddLocalResource(
-      AbstractLauncher launcher, SliderFileSystem fs, Component component,
+      AbstractLauncher launcher, SliderFileSystem fs,
+      ContainerLaunchService.ComponentLaunchContext compLaunchContext,
       Map<String, String> tokensForSubstitution, ComponentInstance instance,
       ServiceContext context) throws IOException {
-    Path compInstanceDir = initCompInstanceDir(fs, instance);
+    Path compInstanceDir = initCompInstanceDir(fs, compLaunchContext, instance);
     if (!fs.getFileSystem().exists(compInstanceDir)) {
       log.info(instance.getCompInstanceId() + ": Creating dir on hdfs: " + compInstanceDir);
       fs.getFileSystem().mkdirs(compInstanceDir,
@@ -189,7 +196,8 @@ public class ProviderUtils implements YarnServiceConstants {
           + tokensForSubstitution);
     }
 
-    for (ConfigFile originalFile : component.getConfiguration().getFiles()) {
+    for (ConfigFile originalFile : compLaunchContext.getConfiguration()
+        .getFiles()) {
       ConfigFile configFile = originalFile.copy();
       String fileName = new Path(configFile.getDestFile()).getName();
 
@@ -343,11 +351,12 @@ public class ProviderUtils implements YarnServiceConstants {
    * @return tokens to replace
    */
   public static Map<String, String> initCompTokensForSubstitute(
-      ComponentInstance instance, Container container) {
+      ComponentInstance instance, Container container,
+      ContainerLaunchService.ComponentLaunchContext componentLaunchContext) {
     Map<String, String> tokens = new HashMap<>();
-    tokens.put(COMPONENT_NAME, instance.getCompSpec().getName());
+    tokens.put(COMPONENT_NAME, componentLaunchContext.getName());
     tokens
-        .put(COMPONENT_NAME_LC, instance.getCompSpec().getName().toLowerCase());
+        .put(COMPONENT_NAME_LC, componentLaunchContext.getName().toLowerCase());
     tokens.put(COMPONENT_INSTANCE_NAME, instance.getCompInstanceName());
     tokens.put(CONTAINER_ID, container.getId().toString());
     tokens.put(COMPONENT_ID,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b39d183d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceApiUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceApiUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceApiUtil.java
index 194ae83..33919ba 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceApiUtil.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceApiUtil.java
@@ -19,6 +19,8 @@
 package org.apache.hadoop.yarn.service.utils;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -27,12 +29,13 @@ import org.apache.hadoop.registry.client.api.RegistryConstants;
 import org.apache.hadoop.registry.client.binding.RegistryUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.service.api.records.Container;
+import org.apache.hadoop.yarn.service.api.records.Service;
 import org.apache.hadoop.yarn.service.api.records.Artifact;
 import org.apache.hadoop.yarn.service.api.records.Component;
 import org.apache.hadoop.yarn.service.api.records.Configuration;
 import org.apache.hadoop.yarn.service.api.records.PlacementConstraint;
 import org.apache.hadoop.yarn.service.api.records.Resource;
-import org.apache.hadoop.yarn.service.api.records.Service;
 import org.apache.hadoop.yarn.service.exceptions.SliderException;
 import org.apache.hadoop.yarn.service.conf.RestApiConstants;
 import org.apache.hadoop.yarn.service.exceptions.RestApiErrorMessages;
@@ -66,6 +69,8 @@ public class ServiceApiUtil {
   private static final PatternValidator userNamePattern
       = new PatternValidator("[a-z][a-z0-9-.]*");
 
+
+
   @VisibleForTesting
   public static void setJsonSerDeser(JsonSerDeser jsd) {
     jsonSerDeser = jsd;
@@ -496,6 +501,47 @@ public class ServiceApiUtil {
     return appJson;
   }
 
+  public static List<Container> getLiveContainers(Service service,
+      List<String> componentInstances)
+      throws YarnException {
+    List<Container> result = new ArrayList<>();
+
+    // In order to avoid iterating over all the containers of all components,
+    // first find the affected components by parsing the instance name.
+    Multimap<String, String> affectedComps = ArrayListMultimap.create();
+    for (String instanceName : componentInstances) {
+      affectedComps.put(
+          ServiceApiUtil.parseComponentName(instanceName), instanceName);
+    }
+
+    service.getComponents().forEach(comp -> {
+      // Iterating once over the containers of the affected component to
+      // find all the containers. Avoiding multiple calls to
+      // service.getComponent(...) and component.getContainer(...) because they
+      // iterate over all the components of the service and all the containers
+      // of the components respectively.
+      if (affectedComps.get(comp.getName()) != null) {
+        Collection<String> instanceNames = affectedComps.get(comp.getName());
+        comp.getContainers().forEach(container -> {
+          if (instanceNames.contains(container.getComponentInstanceName())) {
+            result.add(container);
+          }
+        });
+      }
+    });
+    return result;
+  }
+
+  private static String parseComponentName(String componentInstanceName)
+      throws YarnException {
+    int idx = componentInstanceName.lastIndexOf('-');
+    if (idx == -1) {
+      throw new YarnException("Invalid component instance (" +
+          componentInstanceName + ") name.");
+    }
+    return componentInstanceName.substring(0, idx);
+  }
+
   public static String $(String s) {
     return "${" + s +"}";
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b39d183d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/proto/ClientAMProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/proto/ClientAMProtocol.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/proto/ClientAMProtocol.proto
index 3677593..91721b0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/proto/ClientAMProtocol.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/proto/ClientAMProtocol.proto
@@ -30,6 +30,8 @@ service ClientAMProtocolService {
     returns (UpgradeServiceResponseProto);
   rpc restartService(RestartServiceRequestProto)
     returns (RestartServiceResponseProto);
+  rpc upgrade(CompInstancesUpgradeRequestProto) returns
+    (CompInstancesUpgradeResponseProto);
 }
 
 message FlexComponentsRequestProto {
@@ -61,13 +63,22 @@ message StopResponseProto {
 
 message UpgradeServiceRequestProto {
   optional string version = 1;
+  optional bool autoFinalize = 2;
 }
 
 message UpgradeServiceResponseProto {
+  optional string error = 1;
 }
 
 message RestartServiceRequestProto {
 }
 
 message RestartServiceResponseProto {
+}
+
+message CompInstancesUpgradeRequestProto {
+    repeated string containerIds = 1;
+}
+
+message CompInstancesUpgradeResponseProto {
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b39d183d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceAM.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceAM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceAM.java
index 57cf367..260976a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceAM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceAM.java
@@ -108,6 +108,7 @@ public class TestServiceAM extends ServiceTestUtils{
     ApplicationId applicationId = ApplicationId.newInstance(123456, 1);
     Service exampleApp = new Service();
     exampleApp.setId(applicationId.toString());
+    exampleApp.setVersion("v1");
     exampleApp.setName("testContainerCompleted");
     exampleApp.addComponent(createComponent("compa", 1, "pwd"));
 
@@ -146,6 +147,7 @@ public class TestServiceAM extends ServiceTestUtils{
         System.currentTimeMillis(), 1);
     Service exampleApp = new Service();
     exampleApp.setId(applicationId.toString());
+    exampleApp.setVersion("v1");
     exampleApp.setName("testContainersRecovers");
     String comp1Name = "comp1";
     String comp1InstName = "comp1-0";
@@ -189,6 +191,7 @@ public class TestServiceAM extends ServiceTestUtils{
     Service exampleApp = new Service();
     exampleApp.setId(applicationId.toString());
     exampleApp.setName("testContainersRecovers");
+    exampleApp.setVersion("v1");
     String comp1Name = "comp1";
     String comp1InstName = "comp1-0";
 
@@ -230,6 +233,7 @@ public class TestServiceAM extends ServiceTestUtils{
     Service exampleApp = new Service();
     exampleApp.setId(applicationId.toString());
     exampleApp.setName("testContainersFromDifferentApp");
+    exampleApp.setVersion("v1");
     String comp1Name = "comp1";
     String comp1InstName = "comp1-0";
 
@@ -270,6 +274,7 @@ public class TestServiceAM extends ServiceTestUtils{
     Service exampleApp = new Service();
     exampleApp.setId(applicationId.toString());
     exampleApp.setName("testScheduleWithMultipleResourceTypes");
+    exampleApp.setVersion("v1");
 
     List<ResourceTypeInfo> resourceTypeInfos = new ArrayList<>(
         ResourceUtils.getResourcesTypeInfo());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b39d183d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceManager.java
index c65a5d4..56a0c71 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceManager.java
@@ -49,7 +49,7 @@ public class TestServiceManager {
   @Test
   public void testUpgrade() throws IOException, SliderException {
     ServiceManager serviceManager = createTestServiceManager("testUpgrade");
-    upgrade(serviceManager, "v2", false);
+    upgrade(serviceManager, "v2", false, false);
     Assert.assertEquals("service not upgraded", ServiceState.UPGRADING,
         serviceManager.getServiceSpec().getState());
   }
@@ -57,8 +57,9 @@ public class TestServiceManager {
   @Test
   public void testRestartNothingToUpgrade()
       throws IOException, SliderException {
-    ServiceManager serviceManager = createTestServiceManager("testRestart");
-    upgrade(serviceManager, "v2", false);
+    ServiceManager serviceManager = createTestServiceManager(
+        "testRestartNothingToUpgrade");
+    upgrade(serviceManager, "v2", false, false);
 
     //make components stable
     serviceManager.getServiceSpec().getComponents().forEach(comp -> {
@@ -70,21 +71,118 @@ public class TestServiceManager {
   }
 
   @Test
+  public void testAutoFinalizeNothingToUpgrade() throws IOException,
+      SliderException {
+    ServiceManager serviceManager = createTestServiceManager(
+        "testAutoFinalizeNothingToUpgrade");
+    upgrade(serviceManager, "v2", false, true);
+
+    //make components stable
+    serviceManager.getServiceSpec().getComponents().forEach(comp ->
+        comp.setState(ComponentState.STABLE));
+    serviceManager.handle(new ServiceEvent(ServiceEventType.CHECK_STABLE));
+    Assert.assertEquals("service stable", ServiceState.STABLE,
+        serviceManager.getServiceSpec().getState());
+  }
+
+  @Test
   public void testRestartWithPendingUpgrade()
       throws IOException, SliderException {
     ServiceManager serviceManager = createTestServiceManager("testRestart");
-    upgrade(serviceManager, "v2", true);
+    upgrade(serviceManager, "v2", true, false);
     serviceManager.handle(new ServiceEvent(ServiceEventType.START));
     Assert.assertEquals("service should still be upgrading",
         ServiceState.UPGRADING, serviceManager.getServiceSpec().getState());
   }
 
+  @Test
+  public void testCheckState() throws IOException, SliderException {
+    ServiceManager serviceManager = createTestServiceManager(
+        "testCheckState");
+    upgrade(serviceManager, "v2", true, false);
+    Assert.assertEquals("service not upgrading", ServiceState.UPGRADING,
+        serviceManager.getServiceSpec().getState());
+
+    // make components stable
+    serviceManager.getServiceSpec().getComponents().forEach(comp -> {
+      comp.setState(ComponentState.STABLE);
+    });
+    ServiceEvent checkStable = new ServiceEvent(ServiceEventType.CHECK_STABLE);
+    serviceManager.handle(checkStable);
+    Assert.assertEquals("service should still be upgrading",
+        ServiceState.UPGRADING, serviceManager.getServiceSpec().getState());
+
+    // finalize service
+    ServiceEvent restart = new ServiceEvent(ServiceEventType.START);
+    serviceManager.handle(restart);
+    Assert.assertEquals("service not stable",
+        ServiceState.STABLE, serviceManager.getServiceSpec().getState());
+
+    validateUpgradeFinalization(serviceManager.getName(), "v2");
+  }
 
-  private void upgrade(ServiceManager service, String version,
-      boolean upgradeArtifact)
+  @Test
+  public void testCheckStateAutoFinalize() throws IOException, SliderException {
+    ServiceManager serviceManager = createTestServiceManager(
+        "testCheckState");
+    serviceManager.getServiceSpec().setState(
+        ServiceState.UPGRADING_AUTO_FINALIZE);
+    upgrade(serviceManager, "v2", true, true);
+    Assert.assertEquals("service not upgrading",
+        ServiceState.UPGRADING_AUTO_FINALIZE,
+        serviceManager.getServiceSpec().getState());
+
+    // make components stable
+    serviceManager.getServiceSpec().getComponents().forEach(comp ->
+        comp.setState(ComponentState.STABLE));
+    ServiceEvent checkStable = new ServiceEvent(ServiceEventType.CHECK_STABLE);
+    serviceManager.handle(checkStable);
+    Assert.assertEquals("service not stable",
+        ServiceState.STABLE, serviceManager.getServiceSpec().getState());
+
+    validateUpgradeFinalization(serviceManager.getName(), "v2");
+  }
+
+  @Test
+  public void testInvalidUpgrade() throws IOException, SliderException {
+    ServiceManager serviceManager = createTestServiceManager(
+        "testInvalidUpgrade");
+    serviceManager.getServiceSpec().setState(
+        ServiceState.UPGRADING_AUTO_FINALIZE);
+    Service upgradedDef = ServiceTestUtils.createExampleApplication();
+    upgradedDef.setName(serviceManager.getName());
+    upgradedDef.setVersion("v2");
+    upgradedDef.setLifetime(2L);
+    writeUpgradedDef(upgradedDef);
+
+    try {
+      serviceManager.processUpgradeRequest("v2", true);
+    } catch (Exception ex) {
+      Assert.assertTrue(ex instanceof UnsupportedOperationException);
+      return;
+    }
+    Assert.fail();
+  }
+
+  private void validateUpgradeFinalization(String serviceName,
+      String expectedVersion) throws IOException {
+    Service savedSpec = ServiceApiUtil.loadService(rule.getFs(), serviceName);
+    Assert.assertEquals("service def not re-written", expectedVersion,
+        savedSpec.getVersion());
+    Assert.assertNotNull("app id not present", savedSpec.getId());
+    Assert.assertEquals("state not stable", ServiceState.STABLE,
+        savedSpec.getState());
+    savedSpec.getComponents().forEach(compSpec -> {
+      Assert.assertEquals("comp not stable", ComponentState.STABLE,
+          compSpec.getState());
+    });
+  }
+
+  private void upgrade(ServiceManager serviceManager, String version,
+      boolean upgradeArtifact, boolean autoFinalize)
       throws IOException, SliderException {
     Service upgradedDef = ServiceTestUtils.createExampleApplication();
-    upgradedDef.setName(service.getName());
+    upgradedDef.setName(serviceManager.getName());
     upgradedDef.setVersion(version);
     if (upgradeArtifact) {
       Artifact upgradedArtifact = createTestArtifact("2");
@@ -93,9 +191,13 @@ public class TestServiceManager {
       });
     }
     writeUpgradedDef(upgradedDef);
+    serviceManager.processUpgradeRequest(version, autoFinalize);
     ServiceEvent upgradeEvent = new ServiceEvent(ServiceEventType.UPGRADE);
-    upgradeEvent.setVersion("v2");
-    service.handle(upgradeEvent);
+    upgradeEvent.setVersion(version);
+    if (autoFinalize) {
+      upgradeEvent.setAutoFinalize(true);
+    }
+    serviceManager.handle(upgradeEvent);
   }
 
   private ServiceManager createTestServiceManager(String name)
@@ -124,7 +226,7 @@ public class TestServiceManager {
     return new ServiceManager(context);
   }
 
-  static Service createBaseDef(String name) {
+  public static Service createBaseDef(String name) {
     ApplicationId applicationId = ApplicationId.newInstance(
         System.currentTimeMillis(), 1);
     Service serviceDef = ServiceTestUtils.createExampleApplication();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b39d183d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java
index 443ba0b..32ea6e5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java
@@ -31,9 +31,9 @@ import org.apache.hadoop.yarn.api.records.*;
 import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.service.api.records.ComponentState;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.service.api.records.Component;
-import org.apache.hadoop.yarn.service.api.records.ComponentState;
 import org.apache.hadoop.yarn.service.api.records.Container;
 import org.apache.hadoop.yarn.service.api.records.ContainerState;
 import org.apache.hadoop.yarn.service.api.records.PlacementConstraint;
@@ -372,25 +372,47 @@ public class TestYarnNativeServices extends ServiceTestUtils {
   }
 
   @Test(timeout = 200000)
-  public void testUpgradeService() throws Exception {
+  public void testUpgrade() throws Exception {
     setupInternal(NUM_NMS);
     ServiceClient client = createClient(getConf());
 
     Service service = createExampleApplication();
     client.actionCreate(service);
-    waitForServiceToBeStarted(client, service);
+    waitForServiceToBeStable(client, service);
 
-    //upgrade the service
+    // upgrade the service
+    Component component = service.getComponents().iterator().next();
+    service.setState(ServiceState.UPGRADING);
     service.setVersion("v2");
-    client.actionUpgrade(service);
+    component.getConfiguration().getEnv().put("key1", "val1");
+    client.initiateUpgrade(service);
 
-    //wait for service to be in upgrade state
+    // wait for service to be in upgrade state
     waitForServiceToBeInState(client, service, ServiceState.UPGRADING);
     SliderFileSystem fs = new SliderFileSystem(getConf());
     Service fromFs = ServiceApiUtil.loadServiceUpgrade(fs,
         service.getName(), service.getVersion());
     Assert.assertEquals(service.getName(), fromFs.getName());
     Assert.assertEquals(service.getVersion(), fromFs.getVersion());
+
+    // upgrade containers
+    Service liveService = client.getStatus(service.getName());
+    client.actionUpgrade(service,
+        liveService.getComponent(component.getName()).getContainers());
+    waitForAllCompToBeReady(client, service);
+
+    // finalize the upgrade
+    client.actionStart(service.getName());
+    waitForServiceToBeStable(client, service);
+    Service active = client.getStatus(service.getName());
+    Assert.assertEquals("component not stable", ComponentState.STABLE,
+        active.getComponent(component.getName()).getState());
+    Assert.assertEquals("comp does not have new env", "val1",
+        active.getComponent(component.getName()).getConfiguration()
+            .getEnv("key1"));
+    LOG.info("Stop/destroy service {}", service);
+    client.actionStop(service.getName(), true);
+    client.actionDestroy(service.getName());
   }
 
   // Test to verify ANTI_AFFINITY placement policy

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b39d183d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/client/TestServiceCLI.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/client/TestServiceCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/client/TestServiceCLI.java
index a95818f..7290962 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/client/TestServiceCLI.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/client/TestServiceCLI.java
@@ -21,9 +21,9 @@ package org.apache.hadoop.yarn.service.client;
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.util.ToolRunner;
-import org.apache.hadoop.yarn.client.api.AppAdminClient;
 import org.apache.hadoop.yarn.client.cli.ApplicationCLI;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.service.api.records.Component;
 import org.apache.hadoop.yarn.service.api.records.Service;
 import org.apache.hadoop.yarn.service.conf.ExampleAppJson;
@@ -36,12 +36,15 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.IOException;
-import java.util.Arrays;
+import java.io.PrintStream;
 import java.util.List;
 
+import static org.apache.hadoop.yarn.client.api.AppAdminClient.YARN_APP_ADMIN_CLIENT_PREFIX;
 import static org.apache.hadoop.yarn.service.conf.YarnServiceConf.YARN_SERVICE_BASE_PATH;
+import static org.mockito.Mockito.spy;
 
 public class TestServiceCLI {
   private static final Logger LOG = LoggerFactory.getLogger(TestServiceCLI
@@ -51,33 +54,36 @@ public class TestServiceCLI {
   private File basedir;
   private SliderFileSystem fs;
   private String basedirProp;
+  private ApplicationCLI cli;
 
-  private void runCLI(String[] args) throws Exception {
-    LOG.info("running CLI: yarn {}", Arrays.asList(args));
-    ApplicationCLI cli = new ApplicationCLI();
-    cli.setSysOutPrintStream(System.out);
-    cli.setSysErrPrintStream(System.err);
-    int res = ToolRunner.run(cli, ApplicationCLI.preProcessArgs(args));
-    cli.stop();
+  private void createCLI() {
+    cli = new ApplicationCLI();
+    PrintStream sysOut = spy(new PrintStream(new ByteArrayOutputStream()));
+    PrintStream sysErr = spy(new PrintStream(new ByteArrayOutputStream()));
+    cli.setSysOutPrintStream(sysOut);
+    cli.setSysErrPrintStream(sysErr);
+    conf.set(YARN_APP_ADMIN_CLIENT_PREFIX + DUMMY_APP_TYPE,
+        DummyServiceClient.class.getName());
+    cli.setConf(conf);
   }
 
   private void buildApp(String serviceName, String appDef) throws Throwable {
     String[] args = {"app",
         "-D", basedirProp, "-save", serviceName,
         ExampleAppJson.resourceName(appDef),
-        "-appTypes", AppAdminClient.UNIT_TEST_TYPE};
-    runCLI(args);
+        "-appTypes", DUMMY_APP_TYPE};
+    ToolRunner.run(cli, ApplicationCLI.preProcessArgs(args));
   }
 
-  private void buildApp(String serviceName, String appDef, String lifetime,
-      String queue) throws Throwable {
+  private void buildApp(String serviceName, String appDef,
+      String lifetime, String queue) throws Throwable {
     String[] args = {"app",
         "-D", basedirProp, "-save", serviceName,
         ExampleAppJson.resourceName(appDef),
-        "-appTypes", AppAdminClient.UNIT_TEST_TYPE,
+        "-appTypes", DUMMY_APP_TYPE,
         "-updateLifetime", lifetime,
         "-changeQueue", queue};
-    runCLI(args);
+    ToolRunner.run(cli, ApplicationCLI.preProcessArgs(args));
   }
 
   @Before
@@ -91,6 +97,7 @@ public class TestServiceCLI {
     } else {
       basedir.mkdirs();
     }
+    createCLI();
   }
 
   @After
@@ -98,6 +105,7 @@ public class TestServiceCLI {
     if (basedir != null) {
       FileUtils.deleteDirectory(basedir);
     }
+    cli.stop();
   }
 
   @Test
@@ -114,6 +122,38 @@ public class TestServiceCLI {
     checkApp(serviceName, "master", 1L, 1000L, "qname");
   }
 
+  @Test
+  public void testInitiateServiceUpgrade() throws Exception {
+    String[] args = {"app", "-upgrade", "app-1",
+        "-initiate", ExampleAppJson.resourceName(ExampleAppJson.APP_JSON),
+        "-appTypes", DUMMY_APP_TYPE};
+    int result = cli.run(ApplicationCLI.preProcessArgs(args));
+    Assert.assertEquals(result, 0);
+  }
+
+  @Test
+  public void testInitiateAutoFinalizeServiceUpgrade() throws Exception {
+    String[] args =  {"app", "-upgrade", "app-1",
+        "-initiate", ExampleAppJson.resourceName(ExampleAppJson.APP_JSON),
+        "-autoFinalize",
+        "-appTypes", DUMMY_APP_TYPE};
+    int result = cli.run(ApplicationCLI.preProcessArgs(args));
+    Assert.assertEquals(result, 0);
+  }
+
+  @Test
+  public void testUpgradeInstances() throws Exception {
+    conf.set(YARN_APP_ADMIN_CLIENT_PREFIX + DUMMY_APP_TYPE,
+        DummyServiceClient.class.getName());
+    cli.setConf(conf);
+    String[] args = {"app", "-upgrade", "app-1",
+        "-instances", "comp1-0,comp1-1",
+        "-appTypes", DUMMY_APP_TYPE};
+    int result = cli.run(ApplicationCLI.preProcessArgs(args));
+    Assert.assertEquals(result, 0);
+  }
+
+
   private void checkApp(String serviceName, String compName, long count, Long
       lifetime, String queue) throws IOException {
     Service service = ServiceApiUtil.loadService(fs, serviceName);
@@ -130,4 +170,24 @@ public class TestServiceCLI {
     }
     Assert.fail();
   }
+
+  private static final String DUMMY_APP_TYPE = "dummy";
+
+  /**
+   * Dummy service client for test purpose.
+   */
+  public static class DummyServiceClient extends ServiceClient {
+
+    @Override
+    public int initiateUpgrade(String appName, String fileName,
+        boolean autoFinalize) throws IOException, YarnException {
+      return 0;
+    }
+
+    @Override
+    public int actionUpgradeInstances(String appName,
+        List<String> componentInstances) throws IOException, YarnException {
+      return 0;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b39d183d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/client/TestServiceClient.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/client/TestServiceClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/client/TestServiceClient.java
index cc5b6ec..3e3280b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/client/TestServiceClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/client/TestServiceClient.java
@@ -24,17 +24,29 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.service.ClientAMProtocol;
+import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeRequestProto;
+import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeResponseProto;
+import org.apache.hadoop.yarn.proto.ClientAMProtocol.UpgradeServiceRequestProto;
+import org.apache.hadoop.yarn.proto.ClientAMProtocol.UpgradeServiceResponseProto;
 import org.apache.hadoop.yarn.service.ServiceTestUtils;
+import org.apache.hadoop.yarn.service.api.records.Component;
+import org.apache.hadoop.yarn.service.api.records.Container;
 import org.apache.hadoop.yarn.service.api.records.Service;
+import org.apache.hadoop.yarn.service.api.records.ServiceState;
 import org.apache.hadoop.yarn.service.utils.ServiceApiUtil;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
 import org.mockito.Matchers;
+import org.mockito.stubbing.Answer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -47,79 +59,152 @@ import static org.mockito.Mockito.when;
  */
 public class TestServiceClient {
 
+  private static final Logger LOG = LoggerFactory.getLogger(
+      TestServiceClient.class);
+
   @Rule
   public ServiceTestUtils.ServiceFSWatcher rule =
       new ServiceTestUtils.ServiceFSWatcher();
 
   @Test
-  public void testActionUpgrade() throws Exception {
-    ApplicationId applicationId = ApplicationId.newInstance(
-        System.currentTimeMillis(), 1);
-    ServiceClient client = createServiceClient(applicationId);
-
-    Service service = ServiceTestUtils.createExampleApplication();
-    service.setVersion("v1");
-    client.actionCreate(service);
+  public void testActionServiceUpgrade() throws Exception {
+    Service service = createService();
+    ServiceClient client = MockServiceClient.create(rule, service);
 
     //upgrade the service
     service.setVersion("v2");
-    client.actionUpgrade(service);
+    client.initiateUpgrade(service);
 
-    //wait for service to be in upgrade state
     Service fromFs = ServiceApiUtil.loadServiceUpgrade(rule.getFs(),
         service.getName(), service.getVersion());
     Assert.assertEquals(service.getName(), fromFs.getName());
     Assert.assertEquals(service.getVersion(), fromFs.getVersion());
+    client.stop();
   }
 
+  @Test
+  public void testActionCompInstanceUpgrade() throws Exception {
+    Service service = createService();
+    MockServiceClient client = MockServiceClient.create(rule, service);
 
-  private ServiceClient createServiceClient(ApplicationId applicationId)
-      throws Exception {
-    ClientAMProtocol amProxy = mock(ClientAMProtocol.class);
-    YarnClient yarnClient = createMockYarnClient();
-    ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(
-        applicationId, 1);
-    ApplicationAttemptReport attemptReport =
-        ApplicationAttemptReport.newInstance(attemptId, "localhost", 0,
-            null, null, null,
-        YarnApplicationAttemptState.RUNNING, null);
-
-    ApplicationReport appReport = mock(ApplicationReport.class);
-    when(appReport.getHost()).thenReturn("localhost");
-
-    when(yarnClient.getApplicationAttemptReport(Matchers.any()))
-        .thenReturn(attemptReport);
-    when(yarnClient.getApplicationReport(applicationId)).thenReturn(appReport);
-
-    ServiceClient client = new ServiceClient() {
-      @Override
-      protected void serviceInit(Configuration configuration) throws Exception {
-      }
+    //upgrade the service
+    service.setVersion("v2");
+    client.initiateUpgrade(service);
+
+    //add containers to the component that needs to be upgraded.
+    Component comp = service.getComponents().iterator().next();
+    ContainerId containerId = ContainerId.newContainerId(client.attemptId, 1L);
+    comp.addContainer(new Container().id(containerId.toString()));
+
+    client.actionUpgrade(service, comp.getContainers());
+    CompInstancesUpgradeResponseProto response = client.getLastProxyResponse(
+        CompInstancesUpgradeResponseProto.class);
+    Assert.assertNotNull("upgrade did not complete", response);
+    client.stop();
+  }
 
-      @Override
-      protected ClientAMProtocol createAMProxy(String serviceName,
-          ApplicationReport appReport) throws IOException, YarnException {
-        return amProxy;
-      }
+  private Service createService() throws IOException,
+      YarnException {
+    Service service = ServiceTestUtils.createExampleApplication();
+    service.setVersion("v1");
+    service.setState(ServiceState.UPGRADING);
+    return service;
+  }
 
-      @Override
-      ApplicationId submitApp(Service app) throws IOException, YarnException {
-        return applicationId;
+  private static final class MockServiceClient extends ServiceClient {
+
+    private final ApplicationId appId;
+    private final ApplicationAttemptId attemptId;
+    private final ClientAMProtocol amProxy;
+    private Object proxyResponse;
+    private Service service;
+
+    private MockServiceClient()  {
+      amProxy = mock(ClientAMProtocol.class);
+      appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
+      LOG.debug("mocking service client for {}", appId);
+      attemptId = ApplicationAttemptId.newInstance(appId, 1);
+    }
+
+    static MockServiceClient create(ServiceTestUtils.ServiceFSWatcher rule,
+        Service service)
+        throws IOException, YarnException {
+      MockServiceClient client = new MockServiceClient();
+
+      YarnClient yarnClient = createMockYarnClient();
+      ApplicationReport appReport = mock(ApplicationReport.class);
+      when(appReport.getHost()).thenReturn("localhost");
+      when(appReport.getYarnApplicationState()).thenReturn(
+          YarnApplicationState.RUNNING);
+
+      ApplicationAttemptReport attemptReport =
+          ApplicationAttemptReport.newInstance(client.attemptId, "localhost", 0,
+              null, null, null,
+              YarnApplicationAttemptState.RUNNING, null);
+      when(yarnClient.getApplicationAttemptReport(Matchers.any()))
+          .thenReturn(attemptReport);
+      when(yarnClient.getApplicationReport(client.appId)).thenReturn(appReport);
+      when(client.amProxy.upgrade(
+          Matchers.any(UpgradeServiceRequestProto.class))).thenAnswer(
+          (Answer<UpgradeServiceResponseProto>) invocation -> {
+              UpgradeServiceResponseProto response =
+                  UpgradeServiceResponseProto.newBuilder().build();
+              client.proxyResponse = response;
+              return response;
+            });
+      when(client.amProxy.upgrade(Matchers.any(
+          CompInstancesUpgradeRequestProto.class))).thenAnswer(
+          (Answer<CompInstancesUpgradeResponseProto>) invocation -> {
+              CompInstancesUpgradeResponseProto response =
+                CompInstancesUpgradeResponseProto.newBuilder().build();
+              client.proxyResponse = response;
+              return response;
+            });
+      client.setFileSystem(rule.getFs());
+      client.setYarnClient(yarnClient);
+      client.service = service;
+
+      client.init(rule.getConf());
+      client.start();
+      client.actionCreate(service);
+      return client;
+    }
+
+    @Override
+    protected void serviceInit(Configuration configuration) throws Exception {
+    }
+
+    @Override
+    protected ClientAMProtocol createAMProxy(String serviceName,
+        ApplicationReport appReport) throws IOException, YarnException {
+      return amProxy;
+    }
+
+    @Override
+    ApplicationId submitApp(Service app) throws IOException, YarnException {
+      return appId;
+    }
+
+    @Override
+    public Service getStatus(String serviceName) throws IOException,
+        YarnException {
+      service.setState(ServiceState.STABLE);
+      return service;
+    }
+
+    private <T> T getLastProxyResponse(Class<T> clazz) {
+      if (clazz.isInstance(proxyResponse)) {
+        return clazz.cast(proxyResponse);
       }
-    };
-
-    client.setFileSystem(rule.getFs());
-    client.setYarnClient(yarnClient);
-
-    client.init(rule.getConf());
-    client.start();
-    return client;
+      return null;
+    }
   }
 
-  private YarnClient createMockYarnClient() throws IOException, YarnException {
+  private static YarnClient createMockYarnClient() throws IOException,
+      YarnException {
     YarnClient yarnClient = mock(YarnClient.class);
-    when(yarnClient.getApplications(Matchers.any(GetApplicationsRequest.class)))
-        .thenReturn(new ArrayList<>());
+    when(yarnClient.getApplications(Matchers.any(
+        GetApplicationsRequest.class))).thenReturn(new ArrayList<>());
     return yarnClient;
   }
 }
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message