hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ey...@apache.org
Subject [3/3] hadoop git commit: YARN-7939. Added support to upgrade a component instance. Contributed by Chandni Singh
Date Thu, 26 Apr 2018 19:51:44 GMT
YARN-7939. Added support to upgrade a component instance.
Contributed by Chandni Singh


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/86ae380e
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/86ae380e
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/86ae380e

Branch: refs/heads/trunk
Commit: 86ae380e55ca3b1b12744b9338d60b34e8c717a6
Parents: 583a2f4
Author: Eric Yang <eyang@apache.org>
Authored: Thu Apr 26 15:47:55 2018 -0400
Committer: Eric Yang <eyang@apache.org>
Committed: Thu Apr 26 15:47:55 2018 -0400

----------------------------------------------------------------------
 .../yarn/service/client/ApiServiceClient.java   |  86 ++++--
 .../hadoop/yarn/service/webapp/ApiServer.java   | 188 +++++++++++--
 .../hadoop/yarn/service/ServiceClientTest.java  | 120 +++++++--
 .../hadoop/yarn/service/TestApiServer.java      |  87 ++++--
 .../service/client/TestApiServiceClient.java    |  26 ++
 .../hadoop/yarn/service/ClientAMProtocol.java   |   6 +
 .../hadoop/yarn/service/ClientAMService.java    |  38 ++-
 .../hadoop/yarn/service/ServiceContext.java     |  10 +
 .../hadoop/yarn/service/ServiceEvent.java       |  10 +
 .../hadoop/yarn/service/ServiceEventType.java   |   2 +-
 .../hadoop/yarn/service/ServiceManager.java     | 156 ++++++++---
 .../hadoop/yarn/service/ServiceScheduler.java   |  27 ++
 .../yarn/service/api/records/Component.java     |  21 ++
 .../service/api/records/ContainerState.java     |   2 +-
 .../yarn/service/api/records/ServiceState.java  |   3 +-
 .../yarn/service/client/ServiceClient.java      |  87 +++++-
 .../yarn/service/component/Component.java       | 153 +++++++++--
 .../yarn/service/component/ComponentEvent.java  |  10 +
 .../service/component/ComponentEventType.java   |   2 +-
 .../component/instance/ComponentInstance.java   |  50 +++-
 .../instance/ComponentInstanceEventType.java    |   3 +-
 .../yarn/service/conf/RestApiConstants.java     |  12 +
 .../containerlaunch/ContainerLaunchService.java | 100 ++++++-
 .../pb/client/ClientAMProtocolPBClientImpl.java |  14 +
 .../service/ClientAMProtocolPBServiceImpl.java  |  12 +
 .../provider/AbstractProviderService.java       |  25 +-
 .../yarn/service/provider/ProviderService.java  |   7 +-
 .../yarn/service/provider/ProviderUtils.java    |  27 +-
 .../yarn/service/utils/ServiceApiUtil.java      |  48 +++-
 .../src/main/proto/ClientAMProtocol.proto       |  11 +
 .../hadoop/yarn/service/TestServiceAM.java      |   5 +
 .../hadoop/yarn/service/TestServiceManager.java | 122 ++++++++-
 .../yarn/service/TestYarnNativeServices.java    |  34 ++-
 .../yarn/service/client/TestServiceCLI.java     |  90 +++++--
 .../yarn/service/client/TestServiceClient.java  | 187 +++++++++----
 .../yarn/service/component/TestComponent.java   | 265 +++++++++++++++++++
 .../instance/TestComponentInstance.java         |  88 ++++++
 .../service/monitor/TestServiceMonitor.java     |   1 +
 .../hadoop/yarn/client/api/AppAdminClient.java  |  27 +-
 .../hadoop/yarn/client/cli/ApplicationCLI.java  |  65 +++++
 .../hadoop/yarn/client/cli/TestYarnCLI.java     |  21 ++
 41 files changed, 1961 insertions(+), 287 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/86ae380e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java
index e4a245d..cdba555 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java
@@ -26,6 +26,7 @@ import java.util.Map;
 
 import javax.ws.rs.core.MediaType;
 
+import com.google.common.base.Preconditions;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -40,11 +41,16 @@ import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.service.api.records.Component;
+import org.apache.hadoop.yarn.service.api.records.Container;
+import org.apache.hadoop.yarn.service.api.records.ContainerState;
 import org.apache.hadoop.yarn.service.api.records.Service;
 import org.apache.hadoop.yarn.service.api.records.ServiceState;
 import org.apache.hadoop.yarn.service.api.records.ServiceStatus;
+import org.apache.hadoop.yarn.service.conf.RestApiConstants;
+import org.apache.hadoop.yarn.service.utils.JsonSerDeser;
 import org.apache.hadoop.yarn.service.utils.ServiceApiUtil;
 import org.apache.hadoop.yarn.util.RMHAUtils;
+import org.codehaus.jackson.map.PropertyNamingStrategy;
 import org.eclipse.jetty.util.UrlEncoded;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -131,7 +137,7 @@ public class ApiServiceClient extends AppAdminClient {
    * @return URI to API Service
    * @throws IOException
    */
-  private String getApiUrl(String appName) throws IOException {
+  private String getServicePath(String appName) throws IOException {
     String url = getRMWebAddress();
     StringBuilder api = new StringBuilder();
     api.append(url);
@@ -148,23 +154,40 @@ public class ApiServiceClient extends AppAdminClient {
     return api.toString();
   }
 
+  private String getInstancesPath(String appName) throws IOException {
+    Preconditions.checkNotNull(appName);
+    String url = getRMWebAddress();
+    StringBuilder api = new StringBuilder();
+    api.append(url);
+    api.append("/app/v1/services/").append(appName).append("/")
+        .append(RestApiConstants.COMP_INSTANCES);
+    Configuration conf = getConfig();
+    if (conf.get("hadoop.http.authentication.type").equalsIgnoreCase(
+        "simple")) {
+      api.append("?user.name=" + UrlEncoded
+          .encodeString(System.getProperty("user.name")));
+    }
+    return api.toString();
+  }
+
   private Builder getApiClient() throws IOException {
-    return getApiClient(null);
+    return getApiClient(getServicePath(null));
   }
 
   /**
    * Setup API service web request.
    *
-   * @param appName
+   * @param requestPath
    * @return
    * @throws IOException
    */
-  private Builder getApiClient(String appName) throws IOException {
+  private Builder getApiClient(String requestPath)
+      throws IOException {
     Client client = Client.create(getClientConfig());
     Configuration conf = getConfig();
     client.setChunkedEncodingSize(null);
     Builder builder = client
-        .resource(getApiUrl(appName)).type(MediaType.APPLICATION_JSON);
+        .resource(requestPath).type(MediaType.APPLICATION_JSON);
     if (conf.get("hadoop.http.authentication.type").equals("kerberos")) {
       AuthenticatedURL.Token token = new AuthenticatedURL.Token();
       builder.header("WWW-Authenticate", token);
@@ -312,7 +335,7 @@ public class ApiServiceClient extends AppAdminClient {
       service.setName(appName);
       service.setState(ServiceState.STOPPED);
       String buffer = jsonSerDeser.toJson(service);
-      ClientResponse response = getApiClient(appName)
+      ClientResponse response = getApiClient(getServicePath(appName))
           .put(ClientResponse.class, buffer);
       result = processResponse(response);
     } catch (Exception e) {
@@ -335,7 +358,7 @@ public class ApiServiceClient extends AppAdminClient {
       service.setName(appName);
       service.setState(ServiceState.STARTED);
       String buffer = jsonSerDeser.toJson(service);
-      ClientResponse response = getApiClient(appName)
+      ClientResponse response = getApiClient(getServicePath(appName))
           .put(ClientResponse.class, buffer);
       result = processResponse(response);
     } catch (Exception e) {
@@ -381,7 +404,7 @@ public class ApiServiceClient extends AppAdminClient {
   public int actionDestroy(String appName) throws IOException, YarnException {
     int result = EXIT_SUCCESS;
     try {
-      ClientResponse response = getApiClient(appName)
+      ClientResponse response = getApiClient(getServicePath(appName))
           .delete(ClientResponse.class);
       result = processResponse(response);
     } catch (Exception e) {
@@ -413,7 +436,7 @@ public class ApiServiceClient extends AppAdminClient {
         service.addComponent(component);
       }
       String buffer = jsonSerDeser.toJson(service);
-      ClientResponse response = getApiClient(appName)
+      ClientResponse response = getApiClient(getServicePath(appName))
           .put(ClientResponse.class, buffer);
       result = processResponse(response);
     } catch (Exception e) {
@@ -454,7 +477,8 @@ public class ApiServiceClient extends AppAdminClient {
       ServiceApiUtil.validateNameFormat(appName, getConfig());
     }
     try {
-      ClientResponse response = getApiClient(appName).get(ClientResponse.class);
+      ClientResponse response = getApiClient(getServicePath(appName))
+          .get(ClientResponse.class);
       if (response.getStatus() != 200) {
         StringBuilder sb = new StringBuilder();
         sb.append(appName);
@@ -470,16 +494,20 @@ public class ApiServiceClient extends AppAdminClient {
   }
 
   @Override
-  public int actionUpgrade(String appName,
-      String fileName) throws IOException, YarnException {
+  public int initiateUpgrade(String appName,
+      String fileName, boolean autoFinalize) throws IOException, YarnException {
     int result;
     try {
       Service service =
           loadAppJsonFromLocalFS(fileName, appName, null, null);
-      service.setState(ServiceState.UPGRADING);
+      if (autoFinalize) {
+        service.setState(ServiceState.UPGRADING_AUTO_FINALIZE);
+      } else {
+        service.setState(ServiceState.UPGRADING);
+      }
       String buffer = jsonSerDeser.toJson(service);
-      ClientResponse response = getApiClient()
-          .post(ClientResponse.class, buffer);
+      ClientResponse response = getApiClient(getServicePath(appName))
+          .put(ClientResponse.class, buffer);
       result = processResponse(response);
     } catch (Exception e) {
       LOG.error("Failed to upgrade application: ", e);
@@ -487,4 +515,32 @@ public class ApiServiceClient extends AppAdminClient {
     }
     return result;
   }
+
+  @Override
+  public int actionUpgradeInstances(String appName, List<String> compInstances)
+      throws IOException, YarnException {
+    int result;
+    Container[] toUpgrade = new Container[compInstances.size()];
+    try {
+      int idx = 0;
+      for (String instanceName : compInstances) {
+        Container container = new Container();
+        container.setComponentInstanceName(instanceName);
+        container.setState(ContainerState.UPGRADING);
+        toUpgrade[idx++] = container;
+      }
+      String buffer = containerJsonSerde.toJson(toUpgrade);
+      ClientResponse response = getApiClient(getInstancesPath(appName))
+          .put(ClientResponse.class, buffer);
+      result = processResponse(response);
+    } catch (Exception e) {
+      LOG.error("Failed to upgrade component instance: ", e);
+      result = EXIT_EXCEPTION_THROWN;
+    }
+    return result;
+  }
+
+  private static JsonSerDeser<Container[]> containerJsonSerde =
+      new JsonSerDeser<>(Container[].class,
+      PropertyNamingStrategy.CAMEL_CASE_TO_LOWER_CASE_WITH_UNDERSCORES);
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/86ae380e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java
index 14c77f6..6f32598 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java
@@ -17,6 +17,7 @@
 
 package org.apache.hadoop.yarn.service.webapp;
 
+import com.google.common.collect.Lists;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 
@@ -28,10 +29,14 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.service.api.records.Component;
+import org.apache.hadoop.yarn.service.api.records.Container;
+import org.apache.hadoop.yarn.service.api.records.ContainerState;
 import org.apache.hadoop.yarn.service.api.records.Service;
 import org.apache.hadoop.yarn.service.api.records.ServiceState;
 import org.apache.hadoop.yarn.service.api.records.ServiceStatus;
 import org.apache.hadoop.yarn.service.client.ServiceClient;
+import org.apache.hadoop.yarn.service.conf.RestApiConstants;
+import org.apache.hadoop.yarn.service.utils.ServiceApiUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -53,9 +58,12 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.lang.reflect.UndeclaredThrowableException;
 import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 import static org.apache.hadoop.yarn.service.api.records.ServiceState.ACCEPTED;
 import static org.apache.hadoop.yarn.service.conf.RestApiConstants.*;
@@ -177,17 +185,7 @@ public class ApiServer {
       }
       UserGroupInformation ugi = getProxyUser(request);
       LOG.info("GET: getService for appName = {} user = {}", appName, ugi);
-      Service app = ugi.doAs(new PrivilegedExceptionAction<Service>() {
-        @Override
-        public Service run() throws IOException, YarnException {
-          ServiceClient sc = getServiceClient();
-          sc.init(YARN_CONFIG);
-          sc.start();
-          Service app = sc.getStatus(appName);
-          sc.close();
-          return app;
-        }
-      });
+      Service app = getServiceFromClient(ugi, appName);
       return Response.ok(app).build();
     } catch (AccessControlException e) {
       return formatResponse(Status.FORBIDDEN, e.getMessage());
@@ -393,17 +391,19 @@ public class ApiServer {
         return startService(appName, ugi);
       }
 
+      // If an UPGRADE is requested
+      if (updateServiceData.getState() != null && (
+          updateServiceData.getState() == ServiceState.UPGRADING ||
+              updateServiceData.getState() ==
+                  ServiceState.UPGRADING_AUTO_FINALIZE)) {
+        return upgradeService(updateServiceData, ugi);
+      }
+
       // If new lifetime value specified then update it
       if (updateServiceData.getLifetime() != null
           && updateServiceData.getLifetime() > 0) {
         return updateLifetime(appName, updateServiceData, ugi);
       }
-
-      // If an UPGRADE is requested
-      if (updateServiceData.getState() != null &&
-          updateServiceData.getState() == ServiceState.UPGRADING) {
-        return upgradeService(updateServiceData, ugi);
-      }
     } catch (UndeclaredThrowableException e) {
       return formatResponse(Status.BAD_REQUEST,
           e.getCause().getMessage());
@@ -427,6 +427,103 @@ public class ApiServer {
     return Response.status(Status.NO_CONTENT).build();
   }
 
+  @PUT
+  @Path(COMP_INSTANCE_LONG_PATH)
+  @Consumes({MediaType.APPLICATION_JSON})
+  @Produces({RestApiConstants.MEDIA_TYPE_JSON_UTF8, MediaType.TEXT_PLAIN})
+  public Response updateComponentInstance(@Context HttpServletRequest request,
+      @PathParam(SERVICE_NAME) String serviceName,
+      @PathParam(COMPONENT_NAME) String componentName,
+      @PathParam(COMP_INSTANCE_NAME) String compInstanceName,
+      Container reqContainer) {
+
+    try {
+      UserGroupInformation ugi = getProxyUser(request);
+      LOG.info("PUT: update component instance {} for component = {}" +
+              " service = {} user = {}", compInstanceName, componentName,
+          serviceName, ugi);
+      if (reqContainer == null) {
+        throw new YarnException("No container data provided.");
+      }
+      Service service = getServiceFromClient(ugi, serviceName);
+      Component component = service.getComponent(componentName);
+      if (component == null) {
+        throw new YarnException(String.format(
+            "The component name in the URI path (%s) is invalid.",
+            componentName));
+      }
+
+      Container liveContainer = component.getComponentInstance(
+          compInstanceName);
+      if (liveContainer == null) {
+        throw new YarnException(String.format(
+            "The component (%s) does not have a component instance (%s).",
+            componentName, compInstanceName));
+      }
+
+      if (reqContainer.getState() != null
+          && reqContainer.getState().equals(ContainerState.UPGRADING)) {
+        return processContainerUpgrade(ugi, service,
+            Lists.newArrayList(liveContainer));
+      }
+    } catch (AccessControlException e) {
+      return formatResponse(Response.Status.FORBIDDEN, e.getMessage());
+    } catch (YarnException e) {
+      return formatResponse(Response.Status.BAD_REQUEST, e.getMessage());
+    } catch (IOException | InterruptedException e) {
+      return formatResponse(Response.Status.INTERNAL_SERVER_ERROR,
+          e.getMessage());
+    } catch (UndeclaredThrowableException e) {
+      return formatResponse(Response.Status.INTERNAL_SERVER_ERROR,
+          e.getCause().getMessage());
+    }
+    return Response.status(Status.NO_CONTENT).build();
+  }
+
+  @PUT
+  @Path(COMP_INSTANCES_PATH)
+  @Consumes({MediaType.APPLICATION_JSON})
+  @Produces({RestApiConstants.MEDIA_TYPE_JSON_UTF8, MediaType.TEXT_PLAIN})
+  public Response updateComponentInstances(@Context HttpServletRequest request,
+      @PathParam(SERVICE_NAME) String serviceName,
+      List<Container> requestContainers) {
+
+    try {
+      if (requestContainers == null || requestContainers.isEmpty()) {
+        throw new YarnException("No containers provided.");
+      }
+      UserGroupInformation ugi = getProxyUser(request);
+      List<String> toUpgrade = new ArrayList<>();
+      for (Container reqContainer : requestContainers) {
+        if (reqContainer.getState() != null &&
+            reqContainer.getState().equals(ContainerState.UPGRADING)) {
+          toUpgrade.add(reqContainer.getComponentInstanceName());
+        }
+      }
+
+      if (!toUpgrade.isEmpty()) {
+        Service service = getServiceFromClient(ugi, serviceName);
+        LOG.info("PUT: upgrade component instances {} for service = {} " +
+            "user = {}", toUpgrade, serviceName, ugi);
+        List<Container> liveContainers = ServiceApiUtil
+            .getLiveContainers(service, toUpgrade);
+
+        return processContainerUpgrade(ugi, service, liveContainers);
+      }
+    } catch (AccessControlException e) {
+      return formatResponse(Response.Status.FORBIDDEN, e.getMessage());
+    } catch (YarnException e) {
+      return formatResponse(Response.Status.BAD_REQUEST, e.getMessage());
+    } catch (IOException | InterruptedException e) {
+      return formatResponse(Response.Status.INTERNAL_SERVER_ERROR,
+          e.getMessage());
+    } catch (UndeclaredThrowableException e) {
+      return formatResponse(Response.Status.INTERNAL_SERVER_ERROR,
+          e.getCause().getMessage());
+    }
+    return Response.status(Status.NO_CONTENT).build();
+  }
+
   private Response flexService(Service service, UserGroupInformation ugi)
       throws IOException, InterruptedException {
     String appName = service.getName();
@@ -511,17 +608,70 @@ public class ApiServer {
       ServiceClient sc = getServiceClient();
       sc.init(YARN_CONFIG);
       sc.start();
-      sc.actionUpgrade(service);
+      sc.initiateUpgrade(service);
       sc.close();
       return null;
     });
-    LOG.info("Service {} version {} upgrade initialized");
+    LOG.info("Service {} version {} upgrade initialized", service.getName(),
+        service.getVersion());
     status.setDiagnostics("Service " + service.getName() +
         " version " + service.getVersion() + " saved.");
     status.setState(ServiceState.ACCEPTED);
     return formatResponse(Status.ACCEPTED, status);
   }
 
+  private Response processContainerUpgrade(UserGroupInformation ugi,
+      Service service, List<Container> containers) throws YarnException,
+      IOException, InterruptedException {
+
+    if (service.getState() != ServiceState.UPGRADING) {
+      throw new YarnException(
+          String.format("The upgrade of service %s has not been initiated.",
+              service.getName()));
+    }
+    for (Container liveContainer : containers) {
+      if (liveContainer.getState() != ContainerState.NEEDS_UPGRADE) {
+        // Nothing to upgrade
+        throw new YarnException(String.format(
+            "The component instance (%s) does not need an upgrade.",
+            liveContainer.getComponentInstanceName()));
+      }
+    }
+
+    Integer result = ugi.doAs((PrivilegedExceptionAction<Integer>) () -> {
+      int result1;
+      ServiceClient sc = getServiceClient();
+      sc.init(YARN_CONFIG);
+      sc.start();
+      result1 = sc.actionUpgrade(service, containers);
+      sc.close();
+      return result1;
+    });
+
+    if (result == EXIT_SUCCESS) {
+      ServiceStatus status = new ServiceStatus();
+      status.setDiagnostics(
+          "Upgrading component instances " + containers.stream()
+              .map(Container::getId).collect(Collectors.joining(",")) + ".");
+      return formatResponse(Response.Status.ACCEPTED, status);
+    }
+    // If result is not a success, consider it a no-op
+    return Response.status(Response.Status.NO_CONTENT).build();
+  }
+
+  private Service getServiceFromClient(UserGroupInformation ugi,
+      String serviceName) throws IOException, InterruptedException {
+
+    return ugi.doAs((PrivilegedExceptionAction<Service>) () -> {
+      ServiceClient sc = getServiceClient();
+      sc.init(YARN_CONFIG);
+      sc.start();
+      Service app1 = sc.getStatus(serviceName);
+      sc.close();
+      return app1;
+    });
+  }
+
   /**
    * Used by negative test case.
    *

http://git-wip-us.apache.org/repos/asf/hadoop/blob/86ae380e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/service/ServiceClientTest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/service/ServiceClientTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/service/ServiceClientTest.java
index 543c583..cff3e39 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/service/ServiceClientTest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/service/ServiceClientTest.java
@@ -17,17 +17,24 @@
 
 package org.apache.hadoop.yarn.service;
 
-import java.io.IOException;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.service.api.records.Artifact;
+import org.apache.hadoop.yarn.service.api.records.Component;
+import org.apache.hadoop.yarn.service.api.records.Container;
+import org.apache.hadoop.yarn.service.api.records.ContainerState;
+import org.apache.hadoop.yarn.service.api.records.Resource;
 import org.apache.hadoop.yarn.service.api.records.Service;
 import org.apache.hadoop.yarn.service.client.ServiceClient;
 import org.apache.hadoop.yarn.service.utils.ServiceApiUtil;
 import org.apache.hadoop.yarn.service.utils.SliderFileSystem;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
 /**
  * A mock version of ServiceClient - This class is design
  * to simulate various error conditions that will happen
@@ -36,15 +43,32 @@ import org.apache.hadoop.yarn.service.utils.SliderFileSystem;
 public class ServiceClientTest extends ServiceClient {
 
   private Configuration conf = new Configuration();
-
-  protected static void init() {
-  }
+  private Service goodServiceStatus = buildLiveGoodService();
+  private boolean initialized;
 
   public ServiceClientTest() {
     super();
   }
 
   @Override
+  public void init(Configuration conf) {
+    if (!initialized) {
+      super.init(conf);
+      initialized = true;
+    }
+  }
+
+  @Override
+  public void stop() {
+    // This is needed for testing  API Server which use client to get status
+    // and then perform an action.
+  }
+
+  public void forceStop() {
+    super.stop();
+  }
+
+  @Override
   public Configuration getConfig() {
     return conf;
   }
@@ -58,11 +82,8 @@ public class ServiceClientTest extends ServiceClient {
 
   @Override
   public Service getStatus(String appName) {
-    if (appName == null) {
-      throw new NullPointerException();
-    }
-    if (appName.equals("jenkins")) {
-      return new Service();
+    if (appName != null && appName.equals("jenkins")) {
+      return goodServiceStatus;
     } else {
       throw new IllegalArgumentException();
     }
@@ -71,10 +92,7 @@ public class ServiceClientTest extends ServiceClient {
   @Override
   public int actionStart(String serviceName)
       throws YarnException, IOException {
-    if (serviceName == null) {
-      throw new NullPointerException();
-    }
-    if (serviceName.equals("jenkins")) {
+    if (serviceName != null && serviceName.equals("jenkins")) {
       return EXIT_SUCCESS;
     } else {
       throw new ApplicationNotFoundException("");
@@ -98,19 +116,77 @@ public class ServiceClientTest extends ServiceClient {
 
   @Override
   public int actionDestroy(String serviceName) {
-    if (serviceName == null) {
-      throw new NullPointerException();
+    if (serviceName != null) {
+      if (serviceName.equals("jenkins")) {
+        return EXIT_SUCCESS;
+      } else if (serviceName.equals("jenkins-already-stopped")) {
+        return EXIT_SUCCESS;
+      } else if (serviceName.equals("jenkins-doesn't-exist")) {
+        return EXIT_NOT_FOUND;
+      } else if (serviceName.equals("jenkins-error-cleaning-registry")) {
+        return EXIT_OTHER_FAILURE;
+      }
     }
-    if (serviceName.equals("jenkins")) {
+    throw new IllegalArgumentException();
+  }
+
+  @Override
+  public int initiateUpgrade(Service service) throws YarnException,
+      IOException {
+    if (service.getName() != null && service.getName().equals("jenkins")) {
       return EXIT_SUCCESS;
-    } else if (serviceName.equals("jenkins-already-stopped")) {
+    } else {
+      throw new IllegalArgumentException();
+    }
+  }
+
+  @Override
+  public int actionUpgrade(Service service, List<Container> compInstances)
+      throws IOException, YarnException {
+    if (service.getName() != null && service.getName().equals("jenkins")) {
       return EXIT_SUCCESS;
-    } else if (serviceName.equals("jenkins-doesn't-exist")) {
-      return EXIT_NOT_FOUND;
-    } else if (serviceName.equals("jenkins-error-cleaning-registry")) {
-      return EXIT_OTHER_FAILURE;
     } else {
       throw new IllegalArgumentException();
     }
   }
+
+  Service getGoodServiceStatus() {
+    return goodServiceStatus;
+  }
+
+  static Service buildGoodService() {
+    Service service = new Service();
+    service.setName("jenkins");
+    service.setVersion("v1");
+    Artifact artifact = new Artifact();
+    artifact.setType(Artifact.TypeEnum.DOCKER);
+    artifact.setId("jenkins:latest");
+    Resource resource = new Resource();
+    resource.setCpus(1);
+    resource.setMemory("2048");
+    List<Component> components = new ArrayList<>();
+    Component c = new Component();
+    c.setName("jenkins");
+    c.setNumberOfContainers(2L);
+    c.setArtifact(artifact);
+    c.setLaunchCommand("");
+    c.setResource(resource);
+    components.add(c);
+    service.setComponents(components);
+    return service;
+  }
+
+  static Service buildLiveGoodService() {
+    Service service = buildGoodService();
+    Component comp = service.getComponents().iterator().next();
+    List<Container> containers = new ArrayList<>();
+    for (int i = 0; i < comp.getNumberOfContainers(); i++) {
+      Container container = new Container();
+      container.setComponentInstanceName(comp.getName() + "-" + (i + 1));
+      container.setState(ContainerState.READY);
+      containers.add(container);
+    }
+    comp.setContainers(containers);
+    return service;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/86ae380e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/service/TestApiServer.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/service/TestApiServer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/service/TestApiServer.java
index 72c6e2f..85c3cd4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/service/TestApiServer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/service/TestApiServer.java
@@ -35,12 +35,14 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.service.api.records.Artifact;
 import org.apache.hadoop.yarn.service.api.records.Artifact.TypeEnum;
 import org.apache.hadoop.yarn.service.api.records.Component;
+import org.apache.hadoop.yarn.service.api.records.Container;
+import org.apache.hadoop.yarn.service.api.records.ContainerState;
 import org.apache.hadoop.yarn.service.api.records.Resource;
 import org.apache.hadoop.yarn.service.api.records.Service;
 import org.apache.hadoop.yarn.service.api.records.ServiceState;
 import org.apache.hadoop.yarn.service.api.records.ServiceStatus;
-import org.apache.hadoop.yarn.service.client.ServiceClient;
 import org.apache.hadoop.yarn.service.webapp.ApiServer;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
@@ -52,13 +54,14 @@ import org.mockito.Mockito;
 public class TestApiServer {
   private ApiServer apiServer;
   private HttpServletRequest request;
+  private ServiceClientTest mockServerClient;
 
   @Before
   public void setup() throws Exception {
     request = Mockito.mock(HttpServletRequest.class);
     Mockito.when(request.getRemoteUser())
         .thenReturn(System.getProperty("user.name"));
-    ServiceClient mockServerClient = new ServiceClientTest();
+    mockServerClient = new ServiceClientTest();
     Configuration conf = new Configuration();
     conf.set("yarn.api-service.service.client.class",
         ServiceClientTest.class.getName());
@@ -66,6 +69,11 @@ public class TestApiServer {
     apiServer.setServiceClient(mockServerClient);
   }
 
+  @After
+  public void teardown() {
+    mockServerClient.forceStop();
+  }
+
   @Test
   public void testPathAnnotation() {
     assertNotNull(this.apiServer.getClass().getAnnotation(Path.class));
@@ -107,24 +115,7 @@ public class TestApiServer {
     BufferedWriter bw = new BufferedWriter(new FileWriter(dockerConfig));
     bw.write(json);
     bw.close();
-    Service service = new Service();
-    service.setName("jenkins");
-    service.setVersion("v1");
-    Artifact artifact = new Artifact();
-    artifact.setType(TypeEnum.DOCKER);
-    artifact.setId("jenkins:latest");
-    Resource resource = new Resource();
-    resource.setCpus(1);
-    resource.setMemory("2048");
-    List<Component> components = new ArrayList<Component>();
-    Component c = new Component();
-    c.setName("jenkins");
-    c.setNumberOfContainers(1L);
-    c.setArtifact(artifact);
-    c.setLaunchCommand("");
-    c.setResource(resource);
-    components.add(c);
-    service.setComponents(components);
+    Service service = ServiceClientTest.buildGoodService();
     final Response actual = apiServer.createService(request, service);
     assertEquals("Create service is ",
         Response.status(Status.ACCEPTED).build().getStatus(),
@@ -495,4 +486,60 @@ public class TestApiServer {
             + "that in the URI path (jenkins-master)",
         serviceStatus.getDiagnostics());
   }
+
+  @Test
+  public void testInitiateUpgrade() {
+    Service goodService = ServiceClientTest.buildLiveGoodService();
+    goodService.setVersion("v2");
+    goodService.setState(ServiceState.UPGRADING);
+    final Response actual = apiServer.updateService(request,
+        goodService.getName(), goodService);
+    assertEquals("Initiate upgrade is ",
+        Response.status(Status.ACCEPTED).build().getStatus(),
+        actual.getStatus());
+  }
+
+  @Test
+  public void testUpgradeSingleInstance() {
+    Service goodService = ServiceClientTest.buildLiveGoodService();
+    Component comp = goodService.getComponents().iterator().next();
+    Container container = comp.getContainers().iterator().next();
+    container.setState(ContainerState.UPGRADING);
+
+    // To be able to upgrade, the service needs to be in UPGRADING
+    // and container state needs to be in NEEDS_UPGRADE.
+    Service serviceStatus = mockServerClient.getGoodServiceStatus();
+    serviceStatus.setState(ServiceState.UPGRADING);
+    serviceStatus.getComponents().iterator().next().getContainers().iterator()
+        .next().setState(ContainerState.NEEDS_UPGRADE);
+
+    final Response actual = apiServer.updateComponentInstance(request,
+        goodService.getName(), comp.getName(),
+        container.getComponentInstanceName(), container);
+    assertEquals("Instance upgrade is ",
+        Response.status(Status.ACCEPTED).build().getStatus(),
+        actual.getStatus());
+  }
+
+  @Test
+  public void testUpgradeMultipleInstances() {
+    Service goodService = ServiceClientTest.buildLiveGoodService();
+    Component comp = goodService.getComponents().iterator().next();
+    comp.getContainers().forEach(container ->
+        container.setState(ContainerState.UPGRADING));
+
+    // To be able to upgrade, the service needs to be in UPGRADING
+    // and container state needs to be in NEEDS_UPGRADE.
+    Service serviceStatus = mockServerClient.getGoodServiceStatus();
+    serviceStatus.setState(ServiceState.UPGRADING);
+    serviceStatus.getComponents().iterator().next().getContainers().forEach(
+        container -> container.setState(ContainerState.NEEDS_UPGRADE)
+    );
+
+    final Response actual = apiServer.updateComponentInstances(request,
+        goodService.getName(), comp.getContainers());
+    assertEquals("Instance upgrade is ",
+        Response.status(Status.ACCEPTED).build().getStatus(),
+        actual.getStatus());
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/86ae380e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/service/client/TestApiServiceClient.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/service/client/TestApiServiceClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/service/client/TestApiServiceClient.java
index ffd9328..a245144 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/service/client/TestApiServiceClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/service/client/TestApiServiceClient.java
@@ -26,6 +26,7 @@ import javax.servlet.http.HttpServlet;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
+import com.google.common.collect.Lists;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.eclipse.jetty.server.Server;
@@ -256,4 +257,29 @@ public class TestApiServiceClient {
     }
   }
 
+  @Test
+  public void testInitiateServiceUpgrade() {
+    String appName = "example-app";
+    String upgradeFileName = "target/test-classes/example-app.json";
+    try {
+      int result = asc.initiateUpgrade(appName, upgradeFileName, false);
+      assertEquals(EXIT_SUCCESS, result);
+    } catch (IOException | YarnException e) {
+      fail();
+    }
+  }
+
+  @Test
+  public void testInstancesUpgrade() {
+    String appName = "example-app";
+    try {
+      int result = asc.actionUpgradeInstances(appName, Lists.newArrayList(
+          "comp-1", "comp-2"));
+      assertEquals(EXIT_SUCCESS, result);
+    } catch (IOException | YarnException e) {
+      fail();
+    }
+  }
+
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/86ae380e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMProtocol.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMProtocol.java
index 4422451..45ff98a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMProtocol.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMProtocol.java
@@ -19,6 +19,8 @@
 package org.apache.hadoop.yarn.service;
 
 import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeResponseProto;
+import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeRequestProto;
 import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsRequestProto;
 import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsResponseProto;
 import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusResponseProto;
@@ -49,4 +51,8 @@ public interface ClientAMProtocol {
 
   RestartServiceResponseProto restart(RestartServiceRequestProto request)
       throws IOException, YarnException;
+
+  CompInstancesUpgradeResponseProto upgrade(
+      CompInstancesUpgradeRequestProto request) throws IOException,
+      YarnException;
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/86ae380e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java
index 3d037e7..d5d6fa4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java
@@ -26,8 +26,11 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.util.ExitUtil;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeRequestProto;
+import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeResponseProto;
 import org.apache.hadoop.yarn.proto.ClientAMProtocol.ComponentCountProto;
 import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsRequestProto;
 import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsResponseProto;
@@ -40,6 +43,8 @@ import org.apache.hadoop.yarn.proto.ClientAMProtocol.StopResponseProto;
 import org.apache.hadoop.yarn.proto.ClientAMProtocol.UpgradeServiceRequestProto;
 import org.apache.hadoop.yarn.proto.ClientAMProtocol.UpgradeServiceResponseProto;
 import org.apache.hadoop.yarn.service.component.ComponentEvent;
+import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEvent;
+import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType;
 import org.apache.hadoop.yarn.service.utils.ServiceApiUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -151,12 +156,16 @@ public class ClientAMService extends AbstractService
   @Override
   public UpgradeServiceResponseProto upgrade(
       UpgradeServiceRequestProto request) throws IOException {
-    ServiceEvent event = new ServiceEvent(ServiceEventType.UPGRADE);
-    event.setVersion(request.getVersion());
-    context.scheduler.getDispatcher().getEventHandler().handle(event);
-    LOG.info("Upgrading service to version {} by {}", request.getVersion(),
-        UserGroupInformation.getCurrentUser());
-    return UpgradeServiceResponseProto.newBuilder().build();
+    try {
+      context.getServiceManager().processUpgradeRequest(request.getVersion(),
+          request.getAutoFinalize());
+      LOG.info("Upgrading service to version {} by {}", request.getVersion(),
+          UserGroupInformation.getCurrentUser());
+      return UpgradeServiceResponseProto.newBuilder().build();
+    } catch (Exception ex) {
+      return UpgradeServiceResponseProto.newBuilder().setError(ex.getMessage())
+          .build();
+    }
   }
 
   @Override
@@ -167,4 +176,21 @@ public class ClientAMService extends AbstractService
     LOG.info("Restart service by {}", UserGroupInformation.getCurrentUser());
     return RestartServiceResponseProto.newBuilder().build();
   }
+
+  @Override
+  public CompInstancesUpgradeResponseProto upgrade(
+      CompInstancesUpgradeRequestProto request)
+      throws IOException, YarnException {
+    if (!request.getContainerIdsList().isEmpty()) {
+
+      for (String containerId : request.getContainerIdsList()) {
+        ComponentInstanceEvent event =
+            new ComponentInstanceEvent(ContainerId.fromString(containerId),
+                ComponentInstanceEventType.UPGRADE);
+        LOG.info("Upgrade container {}", containerId);
+        context.scheduler.getDispatcher().getEventHandler().handle(event);
+      }
+    }
+    return CompInstancesUpgradeResponseProto.newBuilder().build();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/86ae380e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceContext.java
index cd41ab7..6c91b9c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceContext.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.service;
 
+import com.google.common.base.Preconditions;
 import com.google.common.cache.LoadingCache;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.security.client.ClientToAMTokenSecretManager;
@@ -42,8 +43,17 @@ public class ServiceContext {
   public String principal;
   // AM keytab location
   public String keytab;
+  private ServiceManager serviceManager;
 
   public ServiceContext() {
 
   }
+
+  public ServiceManager getServiceManager() {
+    return serviceManager;
+  }
+
+  void setServiceManager(ServiceManager serviceManager) {
+    this.serviceManager = Preconditions.checkNotNull(serviceManager);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/86ae380e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEvent.java
index 9e7d442..0196be2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEvent.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEvent.java
@@ -28,6 +28,7 @@ public class ServiceEvent extends AbstractEvent<ServiceEventType> {
 
   private final ServiceEventType type;
   private String version;
+  private boolean autoFinalize;
 
   public ServiceEvent(ServiceEventType serviceEventType) {
     super(serviceEventType);
@@ -46,4 +47,13 @@ public class ServiceEvent extends AbstractEvent<ServiceEventType> {
     this.version = version;
     return this;
   }
+
+  public boolean isAutoFinalize() {
+    return autoFinalize;
+  }
+
+  public ServiceEvent setAutoFinalize(boolean autoFinalize) {
+    this.autoFinalize = autoFinalize;
+    return this;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/86ae380e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEventType.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEventType.java
index 2162eb5..4fc420b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEventType.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEventType.java
@@ -24,5 +24,5 @@ package org.apache.hadoop.yarn.service;
 public enum ServiceEventType {
   START,
   UPGRADE,
-  STOP_UPGRADE
+  CHECK_STABLE
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/86ae380e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceManager.java
index a3fbe89..869d7f3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceManager.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -24,6 +24,7 @@ import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.service.api.records.Service;
 import org.apache.hadoop.yarn.service.api.records.ServiceState;
+import org.apache.hadoop.yarn.service.component.Component;
 import org.apache.hadoop.yarn.service.component.ComponentEvent;
 import org.apache.hadoop.yarn.service.component.ComponentEventType;
 import org.apache.hadoop.yarn.service.utils.ServiceApiUtil;
@@ -39,10 +40,13 @@ import java.io.IOException;
 import java.text.MessageFormat;
 import java.util.EnumSet;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import static org.apache.hadoop.yarn.service.utils.ServiceApiUtil.jsonSerDeser;
+
 /**
- * Manages the state of the service.
+ * Manages the state of Service.
  */
 public class ServiceManager implements EventHandler<ServiceEvent> {
   private static final Logger LOG = LoggerFactory.getLogger(
@@ -56,10 +60,10 @@ public class ServiceManager implements EventHandler<ServiceEvent> {
 
   private final StateMachine<State, ServiceEventType, ServiceEvent>
       stateMachine;
+  private final UpgradeComponentsFinder componentsFinder;
 
   private final AsyncDispatcher dispatcher;
   private final SliderFileSystem fs;
-  private final UpgradeComponentsFinder componentsFinder;
 
   private String upgradeVersion;
 
@@ -72,9 +76,16 @@ public class ServiceManager implements EventHandler<ServiceEvent> {
               State.UPGRADING), ServiceEventType.UPGRADE,
               new StartUpgradeTransition())
 
+          .addTransition(State.STABLE, EnumSet.of(State.STABLE),
+              ServiceEventType.CHECK_STABLE, new CheckStableTransition())
+
           .addTransition(State.UPGRADING, EnumSet.of(State.STABLE,
               State.UPGRADING), ServiceEventType.START,
-              new StopUpgradeTransition())
+              new CheckStableTransition())
+
+          .addTransition(State.UPGRADING, EnumSet.of(State.STABLE,
+              State.UPGRADING), ServiceEventType.CHECK_STABLE,
+              new CheckStableTransition())
           .installTopology();
 
   public ServiceManager(ServiceContext context) {
@@ -102,7 +113,7 @@ public class ServiceManager implements EventHandler<ServiceEvent> {
         stateMachine.doTransition(event.getType(), event);
       } catch (InvalidStateTransitionException e) {
         LOG.error(MessageFormat.format(
-            "[SERVICE]: Invalid event {0} at {1}.", event.getType(),
+            "[SERVICE]: Invalid event {1} at {2}.", event.getType(),
             oldState), e);
       }
       if (oldState != getState()) {
@@ -130,22 +141,11 @@ public class ServiceManager implements EventHandler<ServiceEvent> {
     public State transition(ServiceManager serviceManager,
         ServiceEvent event) {
       try {
-        Service targetSpec = ServiceApiUtil.loadServiceUpgrade(
-            serviceManager.fs, serviceManager.getName(), event.getVersion());
-
-        serviceManager.serviceSpec.setState(ServiceState.UPGRADING);
-        List<org.apache.hadoop.yarn.service.api.records.Component>
-            compsThatNeedUpgrade = serviceManager.componentsFinder.
-            findTargetComponentSpecs(serviceManager.serviceSpec, targetSpec);
-
-        if (compsThatNeedUpgrade != null && !compsThatNeedUpgrade.isEmpty()) {
-          compsThatNeedUpgrade.forEach(component -> {
-            ComponentEvent needUpgradeEvent = new ComponentEvent(
-                component.getName(), ComponentEventType.UPGRADE).
-                setTargetSpec(component);
-            serviceManager.dispatcher.getEventHandler().handle(
-                needUpgradeEvent);
-          });
+        if (!event.isAutoFinalize()) {
+          serviceManager.serviceSpec.setState(ServiceState.UPGRADING);
+        } else {
+          serviceManager.serviceSpec.setState(
+              ServiceState.UPGRADING_AUTO_FINALIZE);
         }
         serviceManager.upgradeVersion = event.getVersion();
         return State.UPGRADING;
@@ -157,22 +157,29 @@ public class ServiceManager implements EventHandler<ServiceEvent> {
     }
   }
 
-  private static class StopUpgradeTransition implements
+  private static class CheckStableTransition implements
       MultipleArcTransition<ServiceManager, ServiceEvent, State> {
 
     @Override
     public State transition(ServiceManager serviceManager,
         ServiceEvent event) {
-      //abort is not supported currently
-      //trigger re-check of service state
-      ServiceMaster.checkAndUpdateServiceState(serviceManager.scheduler,
-          true);
-      if (serviceManager.serviceSpec.getState().equals(ServiceState.STABLE)) {
-        return serviceManager.finalizeUpgrade() ? State.STABLE :
-            State.UPGRADING;
-      } else {
-        return State.UPGRADING;
+      //trigger check of service state
+      ServiceState currState = serviceManager.serviceSpec.getState();
+      if (currState.equals(ServiceState.STABLE)) {
+        return State.STABLE;
       }
+      if (currState.equals(ServiceState.UPGRADING_AUTO_FINALIZE) ||
+          event.getType().equals(ServiceEventType.START)) {
+        ServiceState targetState = checkIfStable(serviceManager.serviceSpec);
+        if (targetState.equals(ServiceState.STABLE)) {
+          if (serviceManager.finalizeUpgrade()) {
+            LOG.info("Service def state changed from {} -> {}", currState,
+                serviceManager.serviceSpec.getState());
+            return State.STABLE;
+          }
+        }
+      }
+      return State.UPGRADING;
     }
   }
 
@@ -181,12 +188,21 @@ public class ServiceManager implements EventHandler<ServiceEvent> {
    */
   private boolean finalizeUpgrade() {
     try {
-      Service upgradeSpec = ServiceApiUtil.loadServiceUpgrade(
+      // save the application id and state to the target spec
+      Service targetSpec = ServiceApiUtil.loadServiceUpgrade(
           fs, getName(), upgradeVersion);
-      ServiceApiUtil.writeAppDefinition(fs,
-          ServiceApiUtil.getServiceJsonPath(fs, getName()), upgradeSpec);
+      targetSpec.setId(serviceSpec.getId());
+      targetSpec.setState(ServiceState.STABLE);
+      Map<String, Component> allComps = scheduler.getAllComponents();
+      targetSpec.getComponents().forEach(compSpec -> {
+        Component comp = allComps.get(compSpec.getName());
+        compSpec.setState(comp.getComponentSpec().getState());
+      });
+      jsonSerDeser.save(fs.getFileSystem(),
+          ServiceApiUtil.getServiceJsonPath(fs, getName()), targetSpec, true);
+      fs.deleteClusterUpgradeDir(getName(), upgradeVersion);
     } catch (IOException e) {
-      LOG.error("Upgrade did not complete because unable to overwrite the" +
+      LOG.error("Upgrade did not complete because unable to re-write the" +
           " service definition", e);
       return false;
     }
@@ -195,13 +211,79 @@ public class ServiceManager implements EventHandler<ServiceEvent> {
       fs.deleteClusterUpgradeDir(getName(), upgradeVersion);
     } catch (IOException e) {
       LOG.warn("Unable to delete upgrade definition for service {} " +
-              "version {}", getName(), upgradeVersion);
+          "version {}", getName(), upgradeVersion);
     }
+    serviceSpec.setState(ServiceState.STABLE);
     serviceSpec.setVersion(upgradeVersion);
     upgradeVersion = null;
     return true;
   }
 
+  private static ServiceState checkIfStable(Service service) {
+    // stable only if every component is in STABLE state
+    for (org.apache.hadoop.yarn.service.api.records.Component comp :
+        service.getComponents()) {
+      if (!comp.getState().equals(
+          org.apache.hadoop.yarn.service.api.records.ComponentState.STABLE)) {
+        return service.getState();
+      }
+    }
+    return ServiceState.STABLE;
+  }
+
+  /**
+   * Service state gets directly modified by ServiceMaster and Component.
+   * This is a problem for upgrade and flexing. For now, invoking
+   * ServiceMaster.checkAndUpdateServiceState here to make it easy to fix
+   * this in future.
+   */
+  public void checkAndUpdateServiceState(boolean isIncrement) {
+    writeLock.lock();
+    try {
+      if (!getState().equals(State.UPGRADING)) {
+        ServiceMaster.checkAndUpdateServiceState(this.scheduler,
+            isIncrement);
+      }
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  void processUpgradeRequest(String upgradeVersion,
+      boolean autoFinalize) throws IOException {
+    Service targetSpec = ServiceApiUtil.loadServiceUpgrade(
+        context.fs, context.service.getName(), upgradeVersion);
+
+    List<org.apache.hadoop.yarn.service.api.records.Component>
+        compsThatNeedUpgrade = componentsFinder.
+        findTargetComponentSpecs(context.service, targetSpec);
+    ServiceEvent event = new ServiceEvent(ServiceEventType.UPGRADE)
+        .setVersion(upgradeVersion)
+        .setAutoFinalize(autoFinalize);
+    context.scheduler.getDispatcher().getEventHandler().handle(event);
+
+    if (compsThatNeedUpgrade != null && !compsThatNeedUpgrade.isEmpty()) {
+      if (autoFinalize) {
+        event.setAutoFinalize(true);
+      }
+      compsThatNeedUpgrade.forEach(component -> {
+        ComponentEvent needUpgradeEvent = new ComponentEvent(
+            component.getName(), ComponentEventType.UPGRADE)
+            .setTargetSpec(component)
+            .setUpgradeVersion(event.getVersion());
+        context.scheduler.getDispatcher().getEventHandler().handle(
+            needUpgradeEvent);
+      });
+    } else {
+      // Nothing to upgrade. If upgrade auto finalize is requested, trigger a
+      // state check.
+      if (autoFinalize) {
+        context.scheduler.getDispatcher().getEventHandler().handle(
+            new ServiceEvent(ServiceEventType.CHECK_STABLE));
+      }
+    }
+  }
+
   /**
    * Returns the name of the service.
    */
@@ -216,10 +298,8 @@ public class ServiceManager implements EventHandler<ServiceEvent> {
     STABLE, UPGRADING
   }
 
-
   @VisibleForTesting
   Service getServiceSpec() {
     return serviceSpec;
   }
-
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/86ae380e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java
index 8d01410..ee0a1a7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java
@@ -329,6 +329,7 @@ public class ServiceScheduler extends CompositeService {
     // Since AM has been started and registered, the service is in STARTED state
     app.setState(ServiceState.STARTED);
     serviceManager = new ServiceManager(context);
+    context.setServiceManager(serviceManager);
 
     // recover components based on containers sent from RM
     recoverComponents(response);
@@ -757,6 +758,32 @@ public class ServiceScheduler extends CompositeService {
       // automatically which will trigger stopping COMPONENT INSTANCE
     }
 
+    @Override
+    public void onContainerReInitialize(ContainerId containerId) {
+      ComponentInstance instance = liveInstances.get(containerId);
+      if (instance == null) {
+        LOG.error("No component instance exists for {}", containerId);
+        return;
+      }
+      ComponentInstanceEvent becomeReadyEvent = new ComponentInstanceEvent(
+          containerId, ComponentInstanceEventType.BECOME_READY);
+      dispatcher.getEventHandler().handle(becomeReadyEvent);
+    }
+
+    @Override
+    public void onContainerReInitializeError(ContainerId containerId,
+        Throwable t) {
+      ComponentInstance instance = liveInstances.get(containerId);
+      if (instance == null) {
+        LOG.error("No component instance exists for {}", containerId);
+        return;
+      }
+      ComponentEvent event = new ComponentEvent(instance.getCompName(),
+          ComponentEventType.CONTAINER_COMPLETED)
+          .setInstance(instance).setContainerId(containerId);
+      dispatcher.getEventHandler().handle(event);
+    }
+
     @Override public void onContainerResourceIncreased(ContainerId containerId,
         Resource resource) {
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/86ae380e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/Component.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/Component.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/Component.java
index 667b1aa..7deb076 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/Component.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/Component.java
@@ -250,6 +250,15 @@ public class Component implements Serializable {
     return null;
   }
 
+  public Container getComponentInstance(String compInstanceName) {
+    for (Container container : containers) {
+      if (compInstanceName.equals(container.getComponentInstanceName())) {
+        return container;
+      }
+    }
+    return null;
+  }
+
   /**
    * Run all containers of this component in privileged mode (YARN-4262).
    **/
@@ -441,4 +450,16 @@ public class Component implements Serializable {
       this.setReadinessCheck(that.getReadinessCheck());
     }
   }
+
+  public void overwrite(Component that) {
+    setArtifact(that.getArtifact());
+    setResource(that.resource);
+    setNumberOfContainers(that.getNumberOfContainers());
+    setLaunchCommand(that.getLaunchCommand());
+    setConfiguration(that.configuration);
+    setRunPrivilegedContainer(that.getRunPrivilegedContainer());
+    setDependencies(that.getDependencies());
+    setPlacementPolicy(that.getPlacementPolicy());
+    setReadinessCheck(that.getReadinessCheck());
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/86ae380e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ContainerState.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ContainerState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ContainerState.java
index bf09ff2..6e39073 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ContainerState.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ContainerState.java
@@ -26,5 +26,5 @@ import org.apache.hadoop.classification.InterfaceStability;
 @InterfaceAudience.Public
 @InterfaceStability.Unstable
 public enum ContainerState {
-  RUNNING_BUT_UNREADY, READY, STOPPED
+  RUNNING_BUT_UNREADY, READY, STOPPED, NEEDS_UPGRADE, UPGRADING;
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/86ae380e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ServiceState.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ServiceState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ServiceState.java
index 286eaa2..b6ae38b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ServiceState.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ServiceState.java
@@ -29,5 +29,6 @@ import org.apache.hadoop.classification.InterfaceStability;
 @ApiModel(description = "The current state of an service.")
 @javax.annotation.Generated(value = "class io.swagger.codegen.languages.JavaClientCodegen", date = "2016-06-02T08:15:05.615-07:00")
 public enum ServiceState {
-  ACCEPTED, STARTED, STABLE, STOPPED, FAILED, FLEX, UPGRADING;
+  ACCEPTED, STARTED, STABLE, STOPPED, FAILED, FLEX, UPGRADING,
+  UPGRADING_AUTO_FINALIZE;
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/86ae380e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java
index 453619b..52cd369 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java
@@ -52,6 +52,7 @@ import org.apache.hadoop.yarn.client.util.YarnClientUtils;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeRequestProto;
 import org.apache.hadoop.yarn.proto.ClientAMProtocol.ComponentCountProto;
 import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsRequestProto;
 import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusRequestProto;
@@ -59,8 +60,10 @@ import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusResponseProto;
 import org.apache.hadoop.yarn.proto.ClientAMProtocol.RestartServiceRequestProto;
 import org.apache.hadoop.yarn.proto.ClientAMProtocol.StopRequestProto;
 import org.apache.hadoop.yarn.proto.ClientAMProtocol.UpgradeServiceRequestProto;
+import org.apache.hadoop.yarn.proto.ClientAMProtocol.UpgradeServiceResponseProto;
 import org.apache.hadoop.yarn.service.ClientAMProtocol;
 import org.apache.hadoop.yarn.service.ServiceMaster;
+import org.apache.hadoop.yarn.service.api.records.Container;
 import org.apache.hadoop.yarn.service.api.records.Component;
 import org.apache.hadoop.yarn.service.api.records.Service;
 import org.apache.hadoop.yarn.service.api.records.ServiceState;
@@ -206,15 +209,21 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
   }
 
   @Override
-  public int actionUpgrade(String appName, String fileName)
+  public int initiateUpgrade(String appName, String fileName,
+      boolean autoFinalize)
       throws IOException, YarnException {
-    checkAppExistOnHdfs(appName);
     Service upgradeService = loadAppJsonFromLocalFS(fileName, appName,
         null, null);
-    return actionUpgrade(upgradeService);
+    if (autoFinalize) {
+      upgradeService.setState(ServiceState.UPGRADING_AUTO_FINALIZE);
+    } else {
+      upgradeService.setState(ServiceState.UPGRADING);
+    }
+    return initiateUpgrade(upgradeService);
   }
 
-  public int actionUpgrade(Service service) throws YarnException, IOException {
+  public int initiateUpgrade(Service service) throws YarnException,
+      IOException {
     Service persistedService =
         ServiceApiUtil.loadService(fs, service.getName());
     if (!StringUtils.isEmpty(persistedService.getId())) {
@@ -231,6 +240,15 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
       throw new YarnException(message);
     }
 
+    // An upgrade may only start from a STABLE service; reject other states.
+    Service liveService = getStatus(service.getName());
+    if (!liveService.getState().equals(ServiceState.STABLE)) {
+      String message = service.getName() + " is at " + liveService.getState()
+          + " state, upgrade can only be invoked when service is STABLE.";
+      LOG.error(message);
+      throw new YarnException(message);
+    }
+
     Path serviceUpgradeDir = checkAppNotExistOnHdfs(service, true);
     ServiceApiUtil.validateAndResolveService(service, fs, getConfig());
     ServiceApiUtil.createDirAndPersistApp(fs, serviceUpgradeDir, service);
@@ -245,8 +263,56 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
     UpgradeServiceRequestProto.Builder requestBuilder =
         UpgradeServiceRequestProto.newBuilder();
     requestBuilder.setVersion(service.getVersion());
+    if (service.getState().equals(ServiceState.UPGRADING_AUTO_FINALIZE)) {
+      requestBuilder.setAutoFinalize(true);
+    }
+    UpgradeServiceResponseProto responseProto = proxy.upgrade(
+        requestBuilder.build());
+    if (responseProto.hasError()) {
+      LOG.error("Service {} upgrade to version {} failed because {}",
+          service.getName(), service.getVersion(), responseProto.getError());
+      throw new YarnException("Failed to upgrade service " + service.getName()
+          + " to version " + service.getVersion() + " because " +
+          responseProto.getError());
+    }
+    return EXIT_SUCCESS;
+  }
+
+  @Override
+  public int actionUpgradeInstances(String appName,
+      List<String> componentInstances) throws IOException, YarnException {
+    checkAppExistOnHdfs(appName);
+    Service persistedService = ServiceApiUtil.loadService(fs, appName);
+    List<Container> containersToUpgrade = ServiceApiUtil.
+        getLiveContainers(persistedService, componentInstances);
+    return actionUpgrade(persistedService, containersToUpgrade);
+  }
 
-    proxy.upgrade(requestBuilder.build());
+  public int actionUpgrade(Service service, List<Container> compInstances)
+      throws IOException, YarnException {
+    ApplicationReport appReport =
+        yarnClient.getApplicationReport(getAppId(service.getName()));
+
+    if (appReport.getYarnApplicationState() != RUNNING) {
+      String message = service.getName() + " is at " +
+          appReport.getYarnApplicationState()
+          + " state, upgrade can only be invoked when service is running.";
+      LOG.error(message);
+      throw new YarnException(message);
+    }
+    if (StringUtils.isEmpty(appReport.getHost())) {
+      throw new YarnException(service.getName() + " AM hostname is empty.");
+    }
+    ClientAMProtocol proxy = createAMProxy(service.getName(), appReport);
+
+    List<String> containerIdsToUpgrade = new ArrayList<>();
+    compInstances.forEach(compInst ->
+        containerIdsToUpgrade.add(compInst.getId()));
+    LOG.info("instances to upgrade {}", containerIdsToUpgrade);
+    CompInstancesUpgradeRequestProto.Builder upgradeRequestBuilder =
+        CompInstancesUpgradeRequestProto.newBuilder();
+    upgradeRequestBuilder.addAllContainerIds(containerIdsToUpgrade);
+    proxy.upgrade(upgradeRequestBuilder.build());
     return EXIT_SUCCESS;
   }
 
@@ -391,6 +457,17 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
       LOG.error(message);
       throw new YarnException(message);
     }
+
+    Service liveService = getStatus(serviceName);
+    if (liveService.getState().equals(ServiceState.UPGRADING) ||
+        liveService.getState().equals(ServiceState.UPGRADING_AUTO_FINALIZE)) {
+      String message = serviceName + " is at " +
+          liveService.getState()
+          + " state, flex can not be invoked when service is upgrading. ";
+      LOG.error(message);
+      throw new YarnException(message);
+    }
+
     if (StringUtils.isEmpty(appReport.getHost())) {
       throw new YarnException(serviceName + " AM hostname is empty");
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message