From common-commits-return-82009-archive-asf-public=cust-asf.ponee.io@hadoop.apache.org Thu Apr 26 23:20:17 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 19F5718067A for ; Thu, 26 Apr 2018 23:20:14 +0200 (CEST) Received: (qmail 15572 invoked by uid 500); 26 Apr 2018 21:19:51 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 11997 invoked by uid 99); 26 Apr 2018 21:19:49 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 26 Apr 2018 21:19:49 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 2F4ACF57D5; Thu, 26 Apr 2018 21:19:47 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: omalley@apache.org To: common-commits@hadoop.apache.org Date: Thu, 26 Apr 2018 21:20:43 -0000 Message-Id: <761bfa0547ec48b2979c6e21935e1e7b@git.apache.org> In-Reply-To: <22c955dda7834878b16dca8f59b4e137@git.apache.org> References: <22c955dda7834878b16dca8f59b4e137@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [58/68] [abbrv] hadoop git commit: YARN-7939. Added support to upgrade a component instance. Contributed by Chandni Singh YARN-7939. Added support to upgrade a component instance. Contributed by Chandni Singh Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4a7369b0 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4a7369b0 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4a7369b0 Branch: refs/heads/trunk Commit: 4a7369b09547a78f532af66c763c6994a38b5d68 Parents: 83e60cd Author: Eric Yang Authored: Thu Apr 26 15:47:55 2018 -0400 Committer: Owen O'Malley Committed: Thu Apr 26 13:54:40 2018 -0700 ---------------------------------------------------------------------- .../yarn/service/client/ApiServiceClient.java | 86 ++++-- .../hadoop/yarn/service/webapp/ApiServer.java | 188 +++++++++++-- .../hadoop/yarn/service/ServiceClientTest.java | 120 +++++++-- .../hadoop/yarn/service/TestApiServer.java | 87 ++++-- .../service/client/TestApiServiceClient.java | 26 ++ .../hadoop/yarn/service/ClientAMProtocol.java | 6 + .../hadoop/yarn/service/ClientAMService.java | 38 ++- .../hadoop/yarn/service/ServiceContext.java | 10 + .../hadoop/yarn/service/ServiceEvent.java | 10 + .../hadoop/yarn/service/ServiceEventType.java | 2 +- .../hadoop/yarn/service/ServiceManager.java | 156 ++++++++--- .../hadoop/yarn/service/ServiceScheduler.java | 27 ++ .../yarn/service/api/records/Component.java | 21 ++ .../service/api/records/ContainerState.java | 2 +- .../yarn/service/api/records/ServiceState.java | 3 +- .../yarn/service/client/ServiceClient.java | 87 +++++- .../yarn/service/component/Component.java | 153 +++++++++-- .../yarn/service/component/ComponentEvent.java | 10 + .../service/component/ComponentEventType.java | 2 +- .../component/instance/ComponentInstance.java | 50 +++- .../instance/ComponentInstanceEventType.java | 3 +- .../yarn/service/conf/RestApiConstants.java | 12 + .../containerlaunch/ContainerLaunchService.java | 100 ++++++- .../pb/client/ClientAMProtocolPBClientImpl.java | 14 + .../service/ClientAMProtocolPBServiceImpl.java | 12 + .../provider/AbstractProviderService.java | 25 +- .../yarn/service/provider/ProviderService.java | 7 +- .../yarn/service/provider/ProviderUtils.java | 27 +- .../yarn/service/utils/ServiceApiUtil.java | 48 +++- .../src/main/proto/ClientAMProtocol.proto | 11 + .../hadoop/yarn/service/TestServiceAM.java | 5 + .../hadoop/yarn/service/TestServiceManager.java | 122 ++++++++- .../yarn/service/TestYarnNativeServices.java | 34 ++- .../yarn/service/client/TestServiceCLI.java | 90 +++++-- .../yarn/service/client/TestServiceClient.java | 187 +++++++++---- .../yarn/service/component/TestComponent.java | 265 +++++++++++++++++++ .../instance/TestComponentInstance.java | 88 ++++++ .../service/monitor/TestServiceMonitor.java | 1 + .../hadoop/yarn/client/api/AppAdminClient.java | 27 +- .../hadoop/yarn/client/cli/ApplicationCLI.java | 65 +++++ .../hadoop/yarn/client/cli/TestYarnCLI.java | 21 ++ 41 files changed, 1961 insertions(+), 287 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a7369b0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java index e4a245d..cdba555 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java @@ -26,6 +26,7 @@ import java.util.Map; import javax.ws.rs.core.MediaType; +import com.google.common.base.Preconditions; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -40,11 +41,16 @@ import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.service.api.records.Component; +import org.apache.hadoop.yarn.service.api.records.Container; +import org.apache.hadoop.yarn.service.api.records.ContainerState; import org.apache.hadoop.yarn.service.api.records.Service; import org.apache.hadoop.yarn.service.api.records.ServiceState; import org.apache.hadoop.yarn.service.api.records.ServiceStatus; +import org.apache.hadoop.yarn.service.conf.RestApiConstants; +import org.apache.hadoop.yarn.service.utils.JsonSerDeser; import org.apache.hadoop.yarn.service.utils.ServiceApiUtil; import org.apache.hadoop.yarn.util.RMHAUtils; +import org.codehaus.jackson.map.PropertyNamingStrategy; import org.eclipse.jetty.util.UrlEncoded; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -131,7 +137,7 @@ public class ApiServiceClient extends AppAdminClient { * @return URI to API Service * @throws IOException */ - private String getApiUrl(String appName) throws IOException { + private String getServicePath(String appName) throws IOException { String url = getRMWebAddress(); StringBuilder api = new StringBuilder(); api.append(url); @@ -148,23 +154,40 @@ public class ApiServiceClient extends AppAdminClient { return api.toString(); } + private String getInstancesPath(String appName) throws IOException { + Preconditions.checkNotNull(appName); + String url = getRMWebAddress(); + StringBuilder api = new StringBuilder(); + api.append(url); + api.append("/app/v1/services/").append(appName).append("/") + .append(RestApiConstants.COMP_INSTANCES); + Configuration conf = getConfig(); + if (conf.get("hadoop.http.authentication.type").equalsIgnoreCase( + "simple")) { + api.append("?user.name=" + UrlEncoded + .encodeString(System.getProperty("user.name"))); + } + return api.toString(); + } + private Builder getApiClient() throws IOException { - return getApiClient(null); + return getApiClient(getServicePath(null)); } /** * Setup API service web request. * - * @param appName + * @param requestPath * @return * @throws IOException */ - private Builder getApiClient(String appName) throws IOException { + private Builder getApiClient(String requestPath) + throws IOException { Client client = Client.create(getClientConfig()); Configuration conf = getConfig(); client.setChunkedEncodingSize(null); Builder builder = client - .resource(getApiUrl(appName)).type(MediaType.APPLICATION_JSON); + .resource(requestPath).type(MediaType.APPLICATION_JSON); if (conf.get("hadoop.http.authentication.type").equals("kerberos")) { AuthenticatedURL.Token token = new AuthenticatedURL.Token(); builder.header("WWW-Authenticate", token); @@ -312,7 +335,7 @@ public class ApiServiceClient extends AppAdminClient { service.setName(appName); service.setState(ServiceState.STOPPED); String buffer = jsonSerDeser.toJson(service); - ClientResponse response = getApiClient(appName) + ClientResponse response = getApiClient(getServicePath(appName)) .put(ClientResponse.class, buffer); result = processResponse(response); } catch (Exception e) { @@ -335,7 +358,7 @@ public class ApiServiceClient extends AppAdminClient { service.setName(appName); service.setState(ServiceState.STARTED); String buffer = jsonSerDeser.toJson(service); - ClientResponse response = getApiClient(appName) + ClientResponse response = getApiClient(getServicePath(appName)) .put(ClientResponse.class, buffer); result = processResponse(response); } catch (Exception e) { @@ -381,7 +404,7 @@ public class ApiServiceClient extends AppAdminClient { public int actionDestroy(String appName) throws IOException, YarnException { int result = EXIT_SUCCESS; try { - ClientResponse response = getApiClient(appName) + ClientResponse response = getApiClient(getServicePath(appName)) .delete(ClientResponse.class); result = processResponse(response); } catch (Exception e) { @@ -413,7 +436,7 @@ public class ApiServiceClient extends AppAdminClient { service.addComponent(component); } String buffer = jsonSerDeser.toJson(service); - ClientResponse response = getApiClient(appName) + ClientResponse response = getApiClient(getServicePath(appName)) .put(ClientResponse.class, buffer); result = processResponse(response); } catch (Exception e) { @@ -454,7 +477,8 @@ public class ApiServiceClient extends AppAdminClient { ServiceApiUtil.validateNameFormat(appName, getConfig()); } try { - ClientResponse response = getApiClient(appName).get(ClientResponse.class); + ClientResponse response = getApiClient(getServicePath(appName)) + .get(ClientResponse.class); if (response.getStatus() != 200) { StringBuilder sb = new StringBuilder(); sb.append(appName); @@ -470,16 +494,20 @@ public class ApiServiceClient extends AppAdminClient { } @Override - public int actionUpgrade(String appName, - String fileName) throws IOException, YarnException { + public int initiateUpgrade(String appName, + String fileName, boolean autoFinalize) throws IOException, YarnException { int result; try { Service service = loadAppJsonFromLocalFS(fileName, appName, null, null); - service.setState(ServiceState.UPGRADING); + if (autoFinalize) { + service.setState(ServiceState.UPGRADING_AUTO_FINALIZE); + } else { + service.setState(ServiceState.UPGRADING); + } String buffer = jsonSerDeser.toJson(service); - ClientResponse response = getApiClient() - .post(ClientResponse.class, buffer); + ClientResponse response = getApiClient(getServicePath(appName)) + .put(ClientResponse.class, buffer); result = processResponse(response); } catch (Exception e) { LOG.error("Failed to upgrade application: ", e); @@ -487,4 +515,32 @@ public class ApiServiceClient extends AppAdminClient { } return result; } + + @Override + public int actionUpgradeInstances(String appName, List compInstances) + throws IOException, YarnException { + int result; + Container[] toUpgrade = new Container[compInstances.size()]; + try { + int idx = 0; + for (String instanceName : compInstances) { + Container container = new Container(); + container.setComponentInstanceName(instanceName); + container.setState(ContainerState.UPGRADING); + toUpgrade[idx++] = container; + } + String buffer = containerJsonSerde.toJson(toUpgrade); + ClientResponse response = getApiClient(getInstancesPath(appName)) + .put(ClientResponse.class, buffer); + result = processResponse(response); + } catch (Exception e) { + LOG.error("Failed to upgrade component instance: ", e); + result = EXIT_EXCEPTION_THROWN; + } + return result; + } + + private static JsonSerDeser containerJsonSerde = + new JsonSerDeser<>(Container[].class, + PropertyNamingStrategy.CAMEL_CASE_TO_LOWER_CASE_WITH_UNDERSCORES); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a7369b0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java index 14c77f6..6f32598 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java @@ -17,6 +17,7 @@ package org.apache.hadoop.yarn.service.webapp; +import com.google.common.collect.Lists; import com.google.inject.Inject; import com.google.inject.Singleton; @@ -28,10 +29,14 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.service.api.records.Component; +import org.apache.hadoop.yarn.service.api.records.Container; +import org.apache.hadoop.yarn.service.api.records.ContainerState; import org.apache.hadoop.yarn.service.api.records.Service; import org.apache.hadoop.yarn.service.api.records.ServiceState; import org.apache.hadoop.yarn.service.api.records.ServiceStatus; import org.apache.hadoop.yarn.service.client.ServiceClient; +import org.apache.hadoop.yarn.service.conf.RestApiConstants; +import org.apache.hadoop.yarn.service.utils.ServiceApiUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,9 +58,12 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.lang.reflect.UndeclaredThrowableException; import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import static org.apache.hadoop.yarn.service.api.records.ServiceState.ACCEPTED; import static org.apache.hadoop.yarn.service.conf.RestApiConstants.*; @@ -177,17 +185,7 @@ public class ApiServer { } UserGroupInformation ugi = getProxyUser(request); LOG.info("GET: getService for appName = {} user = {}", appName, ugi); - Service app = ugi.doAs(new PrivilegedExceptionAction() { - @Override - public Service run() throws IOException, YarnException { - ServiceClient sc = getServiceClient(); - sc.init(YARN_CONFIG); - sc.start(); - Service app = sc.getStatus(appName); - sc.close(); - return app; - } - }); + Service app = getServiceFromClient(ugi, appName); return Response.ok(app).build(); } catch (AccessControlException e) { return formatResponse(Status.FORBIDDEN, e.getMessage()); @@ -393,17 +391,19 @@ public class ApiServer { return startService(appName, ugi); } + // If an UPGRADE is requested + if (updateServiceData.getState() != null && ( + updateServiceData.getState() == ServiceState.UPGRADING || + updateServiceData.getState() == + ServiceState.UPGRADING_AUTO_FINALIZE)) { + return upgradeService(updateServiceData, ugi); + } + // If new lifetime value specified then update it if (updateServiceData.getLifetime() != null && updateServiceData.getLifetime() > 0) { return updateLifetime(appName, updateServiceData, ugi); } - - // If an UPGRADE is requested - if (updateServiceData.getState() != null && - updateServiceData.getState() == ServiceState.UPGRADING) { - return upgradeService(updateServiceData, ugi); - } } catch (UndeclaredThrowableException e) { return formatResponse(Status.BAD_REQUEST, e.getCause().getMessage()); @@ -427,6 +427,103 @@ public class ApiServer { return Response.status(Status.NO_CONTENT).build(); } + @PUT + @Path(COMP_INSTANCE_LONG_PATH) + @Consumes({MediaType.APPLICATION_JSON}) + @Produces({RestApiConstants.MEDIA_TYPE_JSON_UTF8, MediaType.TEXT_PLAIN}) + public Response updateComponentInstance(@Context HttpServletRequest request, + @PathParam(SERVICE_NAME) String serviceName, + @PathParam(COMPONENT_NAME) String componentName, + @PathParam(COMP_INSTANCE_NAME) String compInstanceName, + Container reqContainer) { + + try { + UserGroupInformation ugi = getProxyUser(request); + LOG.info("PUT: update component instance {} for component = {}" + + " service = {} user = {}", compInstanceName, componentName, + serviceName, ugi); + if (reqContainer == null) { + throw new YarnException("No container data provided."); + } + Service service = getServiceFromClient(ugi, serviceName); + Component component = service.getComponent(componentName); + if (component == null) { + throw new YarnException(String.format( + "The component name in the URI path (%s) is invalid.", + componentName)); + } + + Container liveContainer = component.getComponentInstance( + compInstanceName); + if (liveContainer == null) { + throw new YarnException(String.format( + "The component (%s) does not have a component instance (%s).", + componentName, compInstanceName)); + } + + if (reqContainer.getState() != null + && reqContainer.getState().equals(ContainerState.UPGRADING)) { + return processContainerUpgrade(ugi, service, + Lists.newArrayList(liveContainer)); + } + } catch (AccessControlException e) { + return formatResponse(Response.Status.FORBIDDEN, e.getMessage()); + } catch (YarnException e) { + return formatResponse(Response.Status.BAD_REQUEST, e.getMessage()); + } catch (IOException | InterruptedException e) { + return formatResponse(Response.Status.INTERNAL_SERVER_ERROR, + e.getMessage()); + } catch (UndeclaredThrowableException e) { + return formatResponse(Response.Status.INTERNAL_SERVER_ERROR, + e.getCause().getMessage()); + } + return Response.status(Status.NO_CONTENT).build(); + } + + @PUT + @Path(COMP_INSTANCES_PATH) + @Consumes({MediaType.APPLICATION_JSON}) + @Produces({RestApiConstants.MEDIA_TYPE_JSON_UTF8, MediaType.TEXT_PLAIN}) + public Response updateComponentInstances(@Context HttpServletRequest request, + @PathParam(SERVICE_NAME) String serviceName, + List requestContainers) { + + try { + if (requestContainers == null || requestContainers.isEmpty()) { + throw new YarnException("No containers provided."); + } + UserGroupInformation ugi = getProxyUser(request); + List toUpgrade = new ArrayList<>(); + for (Container reqContainer : requestContainers) { + if (reqContainer.getState() != null && + reqContainer.getState().equals(ContainerState.UPGRADING)) { + toUpgrade.add(reqContainer.getComponentInstanceName()); + } + } + + if (!toUpgrade.isEmpty()) { + Service service = getServiceFromClient(ugi, serviceName); + LOG.info("PUT: upgrade component instances {} for service = {} " + + "user = {}", toUpgrade, serviceName, ugi); + List liveContainers = ServiceApiUtil + .getLiveContainers(service, toUpgrade); + + return processContainerUpgrade(ugi, service, liveContainers); + } + } catch (AccessControlException e) { + return formatResponse(Response.Status.FORBIDDEN, e.getMessage()); + } catch (YarnException e) { + return formatResponse(Response.Status.BAD_REQUEST, e.getMessage()); + } catch (IOException | InterruptedException e) { + return formatResponse(Response.Status.INTERNAL_SERVER_ERROR, + e.getMessage()); + } catch (UndeclaredThrowableException e) { + return formatResponse(Response.Status.INTERNAL_SERVER_ERROR, + e.getCause().getMessage()); + } + return Response.status(Status.NO_CONTENT).build(); + } + private Response flexService(Service service, UserGroupInformation ugi) throws IOException, InterruptedException { String appName = service.getName(); @@ -511,17 +608,70 @@ public class ApiServer { ServiceClient sc = getServiceClient(); sc.init(YARN_CONFIG); sc.start(); - sc.actionUpgrade(service); + sc.initiateUpgrade(service); sc.close(); return null; }); - LOG.info("Service {} version {} upgrade initialized"); + LOG.info("Service {} version {} upgrade initialized", service.getName(), + service.getVersion()); status.setDiagnostics("Service " + service.getName() + " version " + service.getVersion() + " saved."); status.setState(ServiceState.ACCEPTED); return formatResponse(Status.ACCEPTED, status); } + private Response processContainerUpgrade(UserGroupInformation ugi, + Service service, List containers) throws YarnException, + IOException, InterruptedException { + + if (service.getState() != ServiceState.UPGRADING) { + throw new YarnException( + String.format("The upgrade of service %s has not been initiated.", + service.getName())); + } + for (Container liveContainer : containers) { + if (liveContainer.getState() != ContainerState.NEEDS_UPGRADE) { + // Nothing to upgrade + throw new YarnException(String.format( + "The component instance (%s) does not need an upgrade.", + liveContainer.getComponentInstanceName())); + } + } + + Integer result = ugi.doAs((PrivilegedExceptionAction) () -> { + int result1; + ServiceClient sc = getServiceClient(); + sc.init(YARN_CONFIG); + sc.start(); + result1 = sc.actionUpgrade(service, containers); + sc.close(); + return result1; + }); + + if (result == EXIT_SUCCESS) { + ServiceStatus status = new ServiceStatus(); + status.setDiagnostics( + "Upgrading component instances " + containers.stream() + .map(Container::getId).collect(Collectors.joining(",")) + "."); + return formatResponse(Response.Status.ACCEPTED, status); + } + // If result is not a success, consider it a no-op + return Response.status(Response.Status.NO_CONTENT).build(); + } + + private Service getServiceFromClient(UserGroupInformation ugi, + String serviceName) throws IOException, InterruptedException { + + return ugi.doAs((PrivilegedExceptionAction) () -> { + ServiceClient sc = getServiceClient(); + sc.init(YARN_CONFIG); + sc.start(); + Service app1 = sc.getStatus(serviceName); + sc.close(); + return app1; + }); + } + /** * Used by negative test case. * http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a7369b0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/service/ServiceClientTest.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/service/ServiceClientTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/service/ServiceClientTest.java index 543c583..cff3e39 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/service/ServiceClientTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/service/ServiceClientTest.java @@ -17,17 +17,24 @@ package org.apache.hadoop.yarn.service; -import java.io.IOException; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.service.api.records.Artifact; +import org.apache.hadoop.yarn.service.api.records.Component; +import org.apache.hadoop.yarn.service.api.records.Container; +import org.apache.hadoop.yarn.service.api.records.ContainerState; +import org.apache.hadoop.yarn.service.api.records.Resource; import org.apache.hadoop.yarn.service.api.records.Service; import org.apache.hadoop.yarn.service.client.ServiceClient; import org.apache.hadoop.yarn.service.utils.ServiceApiUtil; import org.apache.hadoop.yarn.service.utils.SliderFileSystem; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + /** * A mock version of ServiceClient - This class is design * to simulate various error conditions that will happen @@ -36,15 +43,32 @@ import org.apache.hadoop.yarn.service.utils.SliderFileSystem; public class ServiceClientTest extends ServiceClient { private Configuration conf = new Configuration(); - - protected static void init() { - } + private Service goodServiceStatus = buildLiveGoodService(); + private boolean initialized; public ServiceClientTest() { super(); } @Override + public void init(Configuration conf) { + if (!initialized) { + super.init(conf); + initialized = true; + } + } + + @Override + public void stop() { + // This is needed for testing API Server which use client to get status + // and then perform an action. + } + + public void forceStop() { + super.stop(); + } + + @Override public Configuration getConfig() { return conf; } @@ -58,11 +82,8 @@ public class ServiceClientTest extends ServiceClient { @Override public Service getStatus(String appName) { - if (appName == null) { - throw new NullPointerException(); - } - if (appName.equals("jenkins")) { - return new Service(); + if (appName != null && appName.equals("jenkins")) { + return goodServiceStatus; } else { throw new IllegalArgumentException(); } @@ -71,10 +92,7 @@ public class ServiceClientTest extends ServiceClient { @Override public int actionStart(String serviceName) throws YarnException, IOException { - if (serviceName == null) { - throw new NullPointerException(); - } - if (serviceName.equals("jenkins")) { + if (serviceName != null && serviceName.equals("jenkins")) { return EXIT_SUCCESS; } else { throw new ApplicationNotFoundException(""); @@ -98,19 +116,77 @@ public class ServiceClientTest extends ServiceClient { @Override public int actionDestroy(String serviceName) { - if (serviceName == null) { - throw new NullPointerException(); + if (serviceName != null) { + if (serviceName.equals("jenkins")) { + return EXIT_SUCCESS; + } else if (serviceName.equals("jenkins-already-stopped")) { + return EXIT_SUCCESS; + } else if (serviceName.equals("jenkins-doesn't-exist")) { + return EXIT_NOT_FOUND; + } else if (serviceName.equals("jenkins-error-cleaning-registry")) { + return EXIT_OTHER_FAILURE; + } } - if (serviceName.equals("jenkins")) { + throw new IllegalArgumentException(); + } + + @Override + public int initiateUpgrade(Service service) throws YarnException, + IOException { + if (service.getName() != null && service.getName().equals("jenkins")) { return EXIT_SUCCESS; - } else if (serviceName.equals("jenkins-already-stopped")) { + } else { + throw new IllegalArgumentException(); + } + } + + @Override + public int actionUpgrade(Service service, List compInstances) + throws IOException, YarnException { + if (service.getName() != null && service.getName().equals("jenkins")) { return EXIT_SUCCESS; - } else if (serviceName.equals("jenkins-doesn't-exist")) { - return EXIT_NOT_FOUND; - } else if (serviceName.equals("jenkins-error-cleaning-registry")) { - return EXIT_OTHER_FAILURE; } else { throw new IllegalArgumentException(); } } + + Service getGoodServiceStatus() { + return goodServiceStatus; + } + + static Service buildGoodService() { + Service service = new Service(); + service.setName("jenkins"); + service.setVersion("v1"); + Artifact artifact = new Artifact(); + artifact.setType(Artifact.TypeEnum.DOCKER); + artifact.setId("jenkins:latest"); + Resource resource = new Resource(); + resource.setCpus(1); + resource.setMemory("2048"); + List components = new ArrayList<>(); + Component c = new Component(); + c.setName("jenkins"); + c.setNumberOfContainers(2L); + c.setArtifact(artifact); + c.setLaunchCommand(""); + c.setResource(resource); + components.add(c); + service.setComponents(components); + return service; + } + + static Service buildLiveGoodService() { + Service service = buildGoodService(); + Component comp = service.getComponents().iterator().next(); + List containers = new ArrayList<>(); + for (int i = 0; i < comp.getNumberOfContainers(); i++) { + Container container = new Container(); + container.setComponentInstanceName(comp.getName() + "-" + (i + 1)); + container.setState(ContainerState.READY); + containers.add(container); + } + comp.setContainers(containers); + return service; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a7369b0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/service/TestApiServer.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/service/TestApiServer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/service/TestApiServer.java index 72c6e2f..85c3cd4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/service/TestApiServer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/service/TestApiServer.java @@ -35,12 +35,14 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.service.api.records.Artifact; import org.apache.hadoop.yarn.service.api.records.Artifact.TypeEnum; import org.apache.hadoop.yarn.service.api.records.Component; +import org.apache.hadoop.yarn.service.api.records.Container; +import org.apache.hadoop.yarn.service.api.records.ContainerState; import org.apache.hadoop.yarn.service.api.records.Resource; import org.apache.hadoop.yarn.service.api.records.Service; import org.apache.hadoop.yarn.service.api.records.ServiceState; import org.apache.hadoop.yarn.service.api.records.ServiceStatus; -import org.apache.hadoop.yarn.service.client.ServiceClient; import org.apache.hadoop.yarn.service.webapp.ApiServer; +import org.junit.After; import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; @@ -52,13 +54,14 @@ import org.mockito.Mockito; public class TestApiServer { private ApiServer apiServer; private HttpServletRequest request; + private ServiceClientTest mockServerClient; @Before public void setup() throws Exception { request = Mockito.mock(HttpServletRequest.class); Mockito.when(request.getRemoteUser()) .thenReturn(System.getProperty("user.name")); - ServiceClient mockServerClient = new ServiceClientTest(); + mockServerClient = new ServiceClientTest(); Configuration conf = new Configuration(); conf.set("yarn.api-service.service.client.class", ServiceClientTest.class.getName()); @@ -66,6 +69,11 @@ public class TestApiServer { apiServer.setServiceClient(mockServerClient); } + @After + public void teardown() { + mockServerClient.forceStop(); + } + @Test public void testPathAnnotation() { assertNotNull(this.apiServer.getClass().getAnnotation(Path.class)); @@ -107,24 +115,7 @@ public class TestApiServer { BufferedWriter bw = new BufferedWriter(new FileWriter(dockerConfig)); bw.write(json); bw.close(); - Service service = new Service(); - service.setName("jenkins"); - service.setVersion("v1"); - Artifact artifact = new Artifact(); - artifact.setType(TypeEnum.DOCKER); - artifact.setId("jenkins:latest"); - Resource resource = new Resource(); - resource.setCpus(1); - resource.setMemory("2048"); - List components = new ArrayList(); - Component c = new Component(); - c.setName("jenkins"); - c.setNumberOfContainers(1L); - c.setArtifact(artifact); - c.setLaunchCommand(""); - c.setResource(resource); - components.add(c); - service.setComponents(components); + Service service = ServiceClientTest.buildGoodService(); final Response actual = apiServer.createService(request, service); assertEquals("Create service is ", Response.status(Status.ACCEPTED).build().getStatus(), @@ -495,4 +486,60 @@ public class TestApiServer { + "that in the URI path (jenkins-master)", serviceStatus.getDiagnostics()); } + + @Test + public void testInitiateUpgrade() { + Service goodService = ServiceClientTest.buildLiveGoodService(); + goodService.setVersion("v2"); + goodService.setState(ServiceState.UPGRADING); + final Response actual = apiServer.updateService(request, + goodService.getName(), goodService); + assertEquals("Initiate upgrade is ", + Response.status(Status.ACCEPTED).build().getStatus(), + actual.getStatus()); + } + + @Test + public void testUpgradeSingleInstance() { + Service goodService = ServiceClientTest.buildLiveGoodService(); + Component comp = goodService.getComponents().iterator().next(); + Container container = comp.getContainers().iterator().next(); + container.setState(ContainerState.UPGRADING); + + // To be able to upgrade, the service needs to be in UPGRADING + // and container state needs to be in NEEDS_UPGRADE. + Service serviceStatus = mockServerClient.getGoodServiceStatus(); + serviceStatus.setState(ServiceState.UPGRADING); + serviceStatus.getComponents().iterator().next().getContainers().iterator() + .next().setState(ContainerState.NEEDS_UPGRADE); + + final Response actual = apiServer.updateComponentInstance(request, + goodService.getName(), comp.getName(), + container.getComponentInstanceName(), container); + assertEquals("Instance upgrade is ", + Response.status(Status.ACCEPTED).build().getStatus(), + actual.getStatus()); + } + + @Test + public void testUpgradeMultipleInstances() { + Service goodService = ServiceClientTest.buildLiveGoodService(); + Component comp = goodService.getComponents().iterator().next(); + comp.getContainers().forEach(container -> + container.setState(ContainerState.UPGRADING)); + + // To be able to upgrade, the service needs to be in UPGRADING + // and container state needs to be in NEEDS_UPGRADE. + Service serviceStatus = mockServerClient.getGoodServiceStatus(); + serviceStatus.setState(ServiceState.UPGRADING); + serviceStatus.getComponents().iterator().next().getContainers().forEach( + container -> container.setState(ContainerState.NEEDS_UPGRADE) + ); + + final Response actual = apiServer.updateComponentInstances(request, + goodService.getName(), comp.getContainers()); + assertEquals("Instance upgrade is ", + Response.status(Status.ACCEPTED).build().getStatus(), + actual.getStatus()); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a7369b0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/service/client/TestApiServiceClient.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/service/client/TestApiServiceClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/service/client/TestApiServiceClient.java index ffd9328..a245144 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/service/client/TestApiServiceClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/service/client/TestApiServiceClient.java @@ -26,6 +26,7 @@ import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; +import com.google.common.collect.Lists; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.eclipse.jetty.server.Server; @@ -256,4 +257,29 @@ public class TestApiServiceClient { } } + @Test + public void testInitiateServiceUpgrade() { + String appName = "example-app"; + String upgradeFileName = "target/test-classes/example-app.json"; + try { + int result = asc.initiateUpgrade(appName, upgradeFileName, false); + assertEquals(EXIT_SUCCESS, result); + } catch (IOException | YarnException e) { + fail(); + } + } + + @Test + public void testInstancesUpgrade() { + String appName = "example-app"; + try { + int result = asc.actionUpgradeInstances(appName, Lists.newArrayList( + "comp-1", "comp-2")); + assertEquals(EXIT_SUCCESS, result); + } catch (IOException | YarnException e) { + fail(); + } + } + + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a7369b0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMProtocol.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMProtocol.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMProtocol.java index 4422451..45ff98a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMProtocol.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMProtocol.java @@ -19,6 +19,8 @@ package org.apache.hadoop.yarn.service; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeResponseProto; +import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeRequestProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsRequestProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsResponseProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusResponseProto; @@ -49,4 +51,8 @@ public interface ClientAMProtocol { RestartServiceResponseProto restart(RestartServiceRequestProto request) throws IOException, YarnException; + + CompInstancesUpgradeResponseProto upgrade( + CompInstancesUpgradeRequestProto request) throws IOException, + YarnException; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a7369b0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java index 3d037e7..d5d6fa4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java @@ -26,8 +26,11 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.util.ExitUtil; import org.apache.hadoop.yarn.api.ApplicationConstants; +import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.ipc.YarnRPC; +import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeRequestProto; +import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeResponseProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.ComponentCountProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsRequestProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsResponseProto; @@ -40,6 +43,8 @@ import org.apache.hadoop.yarn.proto.ClientAMProtocol.StopResponseProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.UpgradeServiceRequestProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.UpgradeServiceResponseProto; import org.apache.hadoop.yarn.service.component.ComponentEvent; +import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEvent; +import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType; import org.apache.hadoop.yarn.service.utils.ServiceApiUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -151,12 +156,16 @@ public class ClientAMService extends AbstractService @Override public UpgradeServiceResponseProto upgrade( UpgradeServiceRequestProto request) throws IOException { - ServiceEvent event = new ServiceEvent(ServiceEventType.UPGRADE); - event.setVersion(request.getVersion()); - context.scheduler.getDispatcher().getEventHandler().handle(event); - LOG.info("Upgrading service to version {} by {}", request.getVersion(), - UserGroupInformation.getCurrentUser()); - return UpgradeServiceResponseProto.newBuilder().build(); + try { + context.getServiceManager().processUpgradeRequest(request.getVersion(), + request.getAutoFinalize()); + LOG.info("Upgrading service to version {} by {}", request.getVersion(), + UserGroupInformation.getCurrentUser()); + return UpgradeServiceResponseProto.newBuilder().build(); + } catch (Exception ex) { + return UpgradeServiceResponseProto.newBuilder().setError(ex.getMessage()) + .build(); + } } @Override @@ -167,4 +176,21 @@ public class ClientAMService extends AbstractService LOG.info("Restart service by {}", UserGroupInformation.getCurrentUser()); return RestartServiceResponseProto.newBuilder().build(); } + + @Override + public CompInstancesUpgradeResponseProto upgrade( + CompInstancesUpgradeRequestProto request) + throws IOException, YarnException { + if (!request.getContainerIdsList().isEmpty()) { + + for (String containerId : request.getContainerIdsList()) { + ComponentInstanceEvent event = + new ComponentInstanceEvent(ContainerId.fromString(containerId), + ComponentInstanceEventType.UPGRADE); + LOG.info("Upgrade container {}", containerId); + context.scheduler.getDispatcher().getEventHandler().handle(event); + } + } + return CompInstancesUpgradeResponseProto.newBuilder().build(); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a7369b0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceContext.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceContext.java index cd41ab7..6c91b9c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceContext.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.service; +import com.google.common.base.Preconditions; import com.google.common.cache.LoadingCache; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.security.client.ClientToAMTokenSecretManager; @@ -42,8 +43,17 @@ public class ServiceContext { public String principal; // AM keytab location public String keytab; + private ServiceManager serviceManager; public ServiceContext() { } + + public ServiceManager getServiceManager() { + return serviceManager; + } + + void setServiceManager(ServiceManager serviceManager) { + this.serviceManager = Preconditions.checkNotNull(serviceManager); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a7369b0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEvent.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEvent.java index 9e7d442..0196be2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEvent.java @@ -28,6 +28,7 @@ public class ServiceEvent extends AbstractEvent { private final ServiceEventType type; private String version; + private boolean autoFinalize; public ServiceEvent(ServiceEventType serviceEventType) { super(serviceEventType); @@ -46,4 +47,13 @@ public class ServiceEvent extends AbstractEvent { this.version = version; return this; } + + public boolean isAutoFinalize() { + return autoFinalize; + } + + public ServiceEvent setAutoFinalize(boolean autoFinalize) { + this.autoFinalize = autoFinalize; + return this; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a7369b0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEventType.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEventType.java index 2162eb5..4fc420b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEventType.java @@ -24,5 +24,5 @@ package org.apache.hadoop.yarn.service; public enum ServiceEventType { START, UPGRADE, - STOP_UPGRADE + CHECK_STABLE } http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a7369b0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceManager.java index a3fbe89..869d7f3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceManager.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -24,6 +24,7 @@ import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.service.api.records.Service; import org.apache.hadoop.yarn.service.api.records.ServiceState; +import org.apache.hadoop.yarn.service.component.Component; import org.apache.hadoop.yarn.service.component.ComponentEvent; import org.apache.hadoop.yarn.service.component.ComponentEventType; import org.apache.hadoop.yarn.service.utils.ServiceApiUtil; @@ -39,10 +40,13 @@ import java.io.IOException; import java.text.MessageFormat; import java.util.EnumSet; import java.util.List; +import java.util.Map; import java.util.concurrent.locks.ReentrantReadWriteLock; +import static org.apache.hadoop.yarn.service.utils.ServiceApiUtil.jsonSerDeser; + /** - * Manages the state of the service. + * Manages the state of Service. */ public class ServiceManager implements EventHandler { private static final Logger LOG = LoggerFactory.getLogger( @@ -56,10 +60,10 @@ public class ServiceManager implements EventHandler { private final StateMachine stateMachine; + private final UpgradeComponentsFinder componentsFinder; private final AsyncDispatcher dispatcher; private final SliderFileSystem fs; - private final UpgradeComponentsFinder componentsFinder; private String upgradeVersion; @@ -72,9 +76,16 @@ public class ServiceManager implements EventHandler { State.UPGRADING), ServiceEventType.UPGRADE, new StartUpgradeTransition()) + .addTransition(State.STABLE, EnumSet.of(State.STABLE), + ServiceEventType.CHECK_STABLE, new CheckStableTransition()) + .addTransition(State.UPGRADING, EnumSet.of(State.STABLE, State.UPGRADING), ServiceEventType.START, - new StopUpgradeTransition()) + new CheckStableTransition()) + + .addTransition(State.UPGRADING, EnumSet.of(State.STABLE, + State.UPGRADING), ServiceEventType.CHECK_STABLE, + new CheckStableTransition()) .installTopology(); public ServiceManager(ServiceContext context) { @@ -102,7 +113,7 @@ public class ServiceManager implements EventHandler { stateMachine.doTransition(event.getType(), event); } catch (InvalidStateTransitionException e) { LOG.error(MessageFormat.format( - "[SERVICE]: Invalid event {0} at {1}.", event.getType(), + "[SERVICE]: Invalid event {1} at {2}.", event.getType(), oldState), e); } if (oldState != getState()) { @@ -130,22 +141,11 @@ public class ServiceManager implements EventHandler { public State transition(ServiceManager serviceManager, ServiceEvent event) { try { - Service targetSpec = ServiceApiUtil.loadServiceUpgrade( - serviceManager.fs, serviceManager.getName(), event.getVersion()); - - serviceManager.serviceSpec.setState(ServiceState.UPGRADING); - List - compsThatNeedUpgrade = serviceManager.componentsFinder. - findTargetComponentSpecs(serviceManager.serviceSpec, targetSpec); - - if (compsThatNeedUpgrade != null && !compsThatNeedUpgrade.isEmpty()) { - compsThatNeedUpgrade.forEach(component -> { - ComponentEvent needUpgradeEvent = new ComponentEvent( - component.getName(), ComponentEventType.UPGRADE). - setTargetSpec(component); - serviceManager.dispatcher.getEventHandler().handle( - needUpgradeEvent); - }); + if (!event.isAutoFinalize()) { + serviceManager.serviceSpec.setState(ServiceState.UPGRADING); + } else { + serviceManager.serviceSpec.setState( + ServiceState.UPGRADING_AUTO_FINALIZE); } serviceManager.upgradeVersion = event.getVersion(); return State.UPGRADING; @@ -157,22 +157,29 @@ public class ServiceManager implements EventHandler { } } - private static class StopUpgradeTransition implements + private static class CheckStableTransition implements MultipleArcTransition { @Override public State transition(ServiceManager serviceManager, ServiceEvent event) { - //abort is not supported currently - //trigger re-check of service state - ServiceMaster.checkAndUpdateServiceState(serviceManager.scheduler, - true); - if (serviceManager.serviceSpec.getState().equals(ServiceState.STABLE)) { - return serviceManager.finalizeUpgrade() ? State.STABLE : - State.UPGRADING; - } else { - return State.UPGRADING; + //trigger check of service state + ServiceState currState = serviceManager.serviceSpec.getState(); + if (currState.equals(ServiceState.STABLE)) { + return State.STABLE; } + if (currState.equals(ServiceState.UPGRADING_AUTO_FINALIZE) || + event.getType().equals(ServiceEventType.START)) { + ServiceState targetState = checkIfStable(serviceManager.serviceSpec); + if (targetState.equals(ServiceState.STABLE)) { + if (serviceManager.finalizeUpgrade()) { + LOG.info("Service def state changed from {} -> {}", currState, + serviceManager.serviceSpec.getState()); + return State.STABLE; + } + } + } + return State.UPGRADING; } } @@ -181,12 +188,21 @@ public class ServiceManager implements EventHandler { */ private boolean finalizeUpgrade() { try { - Service upgradeSpec = ServiceApiUtil.loadServiceUpgrade( + // save the application id and state to + Service targetSpec = ServiceApiUtil.loadServiceUpgrade( fs, getName(), upgradeVersion); - ServiceApiUtil.writeAppDefinition(fs, - ServiceApiUtil.getServiceJsonPath(fs, getName()), upgradeSpec); + targetSpec.setId(serviceSpec.getId()); + targetSpec.setState(ServiceState.STABLE); + Map allComps = scheduler.getAllComponents(); + targetSpec.getComponents().forEach(compSpec -> { + Component comp = allComps.get(compSpec.getName()); + compSpec.setState(comp.getComponentSpec().getState()); + }); + jsonSerDeser.save(fs.getFileSystem(), + ServiceApiUtil.getServiceJsonPath(fs, getName()), targetSpec, true); + fs.deleteClusterUpgradeDir(getName(), upgradeVersion); } catch (IOException e) { - LOG.error("Upgrade did not complete because unable to overwrite the" + + LOG.error("Upgrade did not complete because unable to re-write the" + " service definition", e); return false; } @@ -195,13 +211,79 @@ public class ServiceManager implements EventHandler { fs.deleteClusterUpgradeDir(getName(), upgradeVersion); } catch (IOException e) { LOG.warn("Unable to delete upgrade definition for service {} " + - "version {}", getName(), upgradeVersion); + "version {}", getName(), upgradeVersion); } + serviceSpec.setState(ServiceState.STABLE); serviceSpec.setVersion(upgradeVersion); upgradeVersion = null; return true; } + private static ServiceState checkIfStable(Service service) { + // if desired == running + for (org.apache.hadoop.yarn.service.api.records.Component comp : + service.getComponents()) { + if (!comp.getState().equals( + org.apache.hadoop.yarn.service.api.records.ComponentState.STABLE)) { + return service.getState(); + } + } + return ServiceState.STABLE; + } + + /** + * Service state gets directly modified by ServiceMaster and Component. + * This is a problem for upgrade and flexing. For now, invoking + * ServiceMaster.checkAndUpdateServiceState here to make it easy to fix + * this in future. + */ + public void checkAndUpdateServiceState(boolean isIncrement) { + writeLock.lock(); + try { + if (!getState().equals(State.UPGRADING)) { + ServiceMaster.checkAndUpdateServiceState(this.scheduler, + isIncrement); + } + } finally { + writeLock.unlock(); + } + } + + void processUpgradeRequest(String upgradeVersion, + boolean autoFinalize) throws IOException { + Service targetSpec = ServiceApiUtil.loadServiceUpgrade( + context.fs, context.service.getName(), upgradeVersion); + + List + compsThatNeedUpgrade = componentsFinder. + findTargetComponentSpecs(context.service, targetSpec); + ServiceEvent event = new ServiceEvent(ServiceEventType.UPGRADE) + .setVersion(upgradeVersion) + .setAutoFinalize(autoFinalize); + context.scheduler.getDispatcher().getEventHandler().handle(event); + + if (compsThatNeedUpgrade != null && !compsThatNeedUpgrade.isEmpty()) { + if (autoFinalize) { + event.setAutoFinalize(true); + } + compsThatNeedUpgrade.forEach(component -> { + ComponentEvent needUpgradeEvent = new ComponentEvent( + component.getName(), ComponentEventType.UPGRADE) + .setTargetSpec(component) + .setUpgradeVersion(event.getVersion()); + context.scheduler.getDispatcher().getEventHandler().handle( + needUpgradeEvent); + }); + } else { + // nothing to upgrade if upgrade auto finalize is requested, trigger a + // state check. + if (autoFinalize) { + context.scheduler.getDispatcher().getEventHandler().handle( + new ServiceEvent(ServiceEventType.CHECK_STABLE)); + } + } + } + /** * Returns the name of the service. */ @@ -216,10 +298,8 @@ public class ServiceManager implements EventHandler { STABLE, UPGRADING } - @VisibleForTesting Service getServiceSpec() { return serviceSpec; } - } http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a7369b0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java index 8d01410..ee0a1a7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java @@ -329,6 +329,7 @@ public class ServiceScheduler extends CompositeService { // Since AM has been started and registered, the service is in STARTED state app.setState(ServiceState.STARTED); serviceManager = new ServiceManager(context); + context.setServiceManager(serviceManager); // recover components based on containers sent from RM recoverComponents(response); @@ -757,6 +758,32 @@ public class ServiceScheduler extends CompositeService { // automatically which will trigger stopping COMPONENT INSTANCE } + @Override + public void onContainerReInitialize(ContainerId containerId) { + ComponentInstance instance = liveInstances.get(containerId); + if (instance == null) { + LOG.error("No component instance exists for {}", containerId); + return; + } + ComponentInstanceEvent becomeReadyEvent = new ComponentInstanceEvent( + containerId, ComponentInstanceEventType.BECOME_READY); + dispatcher.getEventHandler().handle(becomeReadyEvent); + } + + @Override + public void onContainerReInitializeError(ContainerId containerId, + Throwable t) { + ComponentInstance instance = liveInstances.get(containerId); + if (instance == null) { + LOG.error("No component instance exists for {}", containerId); + return; + } + ComponentEvent event = new ComponentEvent(instance.getCompName(), + ComponentEventType.CONTAINER_COMPLETED) + .setInstance(instance).setContainerId(containerId); + dispatcher.getEventHandler().handle(event); + } + @Override public void onContainerResourceIncreased(ContainerId containerId, Resource resource) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a7369b0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/Component.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/Component.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/Component.java index 667b1aa..7deb076 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/Component.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/Component.java @@ -250,6 +250,15 @@ public class Component implements Serializable { return null; } + public Container getComponentInstance(String compInstanceName) { + for (Container container : containers) { + if (compInstanceName.equals(container.getComponentInstanceName())) { + return container; + } + } + return null; + } + /** * Run all containers of this component in privileged mode (YARN-4262). **/ @@ -441,4 +450,16 @@ public class Component implements Serializable { this.setReadinessCheck(that.getReadinessCheck()); } } + + public void overwrite(Component that) { + setArtifact(that.getArtifact()); + setResource(that.resource); + setNumberOfContainers(that.getNumberOfContainers()); + setLaunchCommand(that.getLaunchCommand()); + setConfiguration(that.configuration); + setRunPrivilegedContainer(that.getRunPrivilegedContainer()); + setDependencies(that.getDependencies()); + setPlacementPolicy(that.getPlacementPolicy()); + setReadinessCheck(that.getReadinessCheck()); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a7369b0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ContainerState.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ContainerState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ContainerState.java index bf09ff2..6e39073 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ContainerState.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ContainerState.java @@ -26,5 +26,5 @@ import org.apache.hadoop.classification.InterfaceStability; @InterfaceAudience.Public @InterfaceStability.Unstable public enum ContainerState { - RUNNING_BUT_UNREADY, READY, STOPPED + RUNNING_BUT_UNREADY, READY, STOPPED, NEEDS_UPGRADE, UPGRADING; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a7369b0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ServiceState.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ServiceState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ServiceState.java index 286eaa2..b6ae38b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ServiceState.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ServiceState.java @@ -29,5 +29,6 @@ import org.apache.hadoop.classification.InterfaceStability; @ApiModel(description = "The current state of an service.") @javax.annotation.Generated(value = "class io.swagger.codegen.languages.JavaClientCodegen", date = "2016-06-02T08:15:05.615-07:00") public enum ServiceState { - ACCEPTED, STARTED, STABLE, STOPPED, FAILED, FLEX, UPGRADING; + ACCEPTED, STARTED, STABLE, STOPPED, FAILED, FLEX, UPGRADING, + UPGRADING_AUTO_FINALIZE; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a7369b0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java index 453619b..52cd369 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java @@ -52,6 +52,7 @@ import org.apache.hadoop.yarn.client.util.YarnClientUtils; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.ipc.YarnRPC; +import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeRequestProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.ComponentCountProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsRequestProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusRequestProto; @@ -59,8 +60,10 @@ import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusResponseProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.RestartServiceRequestProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.StopRequestProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.UpgradeServiceRequestProto; +import org.apache.hadoop.yarn.proto.ClientAMProtocol.UpgradeServiceResponseProto; import org.apache.hadoop.yarn.service.ClientAMProtocol; import org.apache.hadoop.yarn.service.ServiceMaster; +import org.apache.hadoop.yarn.service.api.records.Container; import org.apache.hadoop.yarn.service.api.records.Component; import org.apache.hadoop.yarn.service.api.records.Service; import org.apache.hadoop.yarn.service.api.records.ServiceState; @@ -206,15 +209,21 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes, } @Override - public int actionUpgrade(String appName, String fileName) + public int initiateUpgrade(String appName, String fileName, + boolean autoFinalize) throws IOException, YarnException { - checkAppExistOnHdfs(appName); Service upgradeService = loadAppJsonFromLocalFS(fileName, appName, null, null); - return actionUpgrade(upgradeService); + if (autoFinalize) { + upgradeService.setState(ServiceState.UPGRADING_AUTO_FINALIZE); + } else { + upgradeService.setState(ServiceState.UPGRADING); + } + return initiateUpgrade(upgradeService); } - public int actionUpgrade(Service service) throws YarnException, IOException { + public int initiateUpgrade(Service service) throws YarnException, + IOException { Service persistedService = ServiceApiUtil.loadService(fs, service.getName()); if (!StringUtils.isEmpty(persistedService.getId())) { @@ -231,6 +240,15 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes, throw new YarnException(message); } + Service liveService = getStatus(service.getName()); + if (!liveService.getState().equals(ServiceState.STABLE)) { + String message = service.getName() + " is at " + + liveService.getState() + + " state, upgrade can not be invoked when service is STABLE."; + LOG.error(message); + throw new YarnException(message); + } + Path serviceUpgradeDir = checkAppNotExistOnHdfs(service, true); ServiceApiUtil.validateAndResolveService(service, fs, getConfig()); ServiceApiUtil.createDirAndPersistApp(fs, serviceUpgradeDir, service); @@ -245,8 +263,56 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes, UpgradeServiceRequestProto.Builder requestBuilder = UpgradeServiceRequestProto.newBuilder(); requestBuilder.setVersion(service.getVersion()); + if (service.getState().equals(ServiceState.UPGRADING_AUTO_FINALIZE)) { + requestBuilder.setAutoFinalize(true); + } + UpgradeServiceResponseProto responseProto = proxy.upgrade( + requestBuilder.build()); + if (responseProto.hasError()) { + LOG.error("Service {} upgrade to version {} failed because {}", + service.getName(), service.getVersion(), responseProto.getError()); + throw new YarnException("Failed to upgrade service " + service.getName() + + " to version " + service.getVersion() + " because " + + responseProto.getError()); + } + return EXIT_SUCCESS; + } + + @Override + public int actionUpgradeInstances(String appName, + List componentInstances) throws IOException, YarnException { + checkAppExistOnHdfs(appName); + Service persistedService = ServiceApiUtil.loadService(fs, appName); + List containersToUpgrade = ServiceApiUtil. + getLiveContainers(persistedService, componentInstances); + return actionUpgrade(persistedService, containersToUpgrade); + } - proxy.upgrade(requestBuilder.build()); + public int actionUpgrade(Service service, List compInstances) + throws IOException, YarnException { + ApplicationReport appReport = + yarnClient.getApplicationReport(getAppId(service.getName())); + + if (appReport.getYarnApplicationState() != RUNNING) { + String message = service.getName() + " is at " + + appReport.getYarnApplicationState() + + " state, upgrade can only be invoked when service is running."; + LOG.error(message); + throw new YarnException(message); + } + if (StringUtils.isEmpty(appReport.getHost())) { + throw new YarnException(service.getName() + " AM hostname is empty."); + } + ClientAMProtocol proxy = createAMProxy(service.getName(), appReport); + + List containerIdsToUpgrade = new ArrayList<>(); + compInstances.forEach(compInst -> + containerIdsToUpgrade.add(compInst.getId())); + LOG.info("instances to upgrade {}", containerIdsToUpgrade); + CompInstancesUpgradeRequestProto.Builder upgradeRequestBuilder = + CompInstancesUpgradeRequestProto.newBuilder(); + upgradeRequestBuilder.addAllContainerIds(containerIdsToUpgrade); + proxy.upgrade(upgradeRequestBuilder.build()); return EXIT_SUCCESS; } @@ -391,6 +457,17 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes, LOG.error(message); throw new YarnException(message); } + + Service liveService = getStatus(serviceName); + if (liveService.getState().equals(ServiceState.UPGRADING) || + liveService.getState().equals(ServiceState.UPGRADING_AUTO_FINALIZE)) { + String message = serviceName + " is at " + + liveService.getState() + + " state, flex can not be invoked when service is upgrading. "; + LOG.error(message); + throw new YarnException(message); + } + if (StringUtils.isEmpty(appReport.getHost())) { throw new YarnException(serviceName + " AM hostname is empty"); } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org