Subject: svn commit: r1572233 - in /hadoop/common/branches/branch-2.4/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ had...
Date: Wed, 26 Feb 2014 20:21:00 -0000
To: yarn-commits@hadoop.apache.org
From: vinodkv@apache.org
X-Mailer: svnmailer-1.0.9
Message-Id: <20140226202100.C0BBC2388A33@eris.apache.org>
X-Virus-Checked: Checked by ClamAV on apache.org

Author: vinodkv
Date: Wed Feb 26 20:20:59 2014
New Revision: 1572233

URL: http://svn.apache.org/r1572233
Log:
YARN-1588. Enhanced RM and the scheduling protocol to also send NMTokens of transferred containers from previous app-attempts to new AMs after YARN-1490. Contributed by Jian He.
svn merge --ignore-ancestry -c 1572230 ../../trunk/

Modified:
hadoop/common/branches/branch-2.4/hadoop-yarn-project/CHANGES.txt
hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java
hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NMToken.java
hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RegisterApplicationMasterResponsePBImpl.java
hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/NMTokenPBImpl.java
hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java

Modified: hadoop/common/branches/branch-2.4/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.4/hadoop-yarn-project/CHANGES.txt?rev=1572233&r1=1572232&r2=1572233&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.4/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2.4/hadoop-yarn-project/CHANGES.txt Wed Feb 26 20:20:59 2014
@@ -107,6 +107,10 @@ Release 2.4.0 - UNRELEASED
     YARN-1497. Command line additions for moving apps between queues (Sandy
     Ryza)
 
+    YARN-1588. Enhanced RM and the scheduling protocol to also send NMTokens of
+    transferred containers from previous app-attempts to new AMs after YARN-1490.
+    (Jian He via vinodkv)
+
   IMPROVEMENTS
 
     YARN-1007. Enhance History Reader interface for Containers. (Mayank Bansal via

Modified: hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java?rev=1572233&r1=1572232&r2=1572233&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java (original)
+++ hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java Wed Feb 26 20:20:59 2014
@@ -29,6 +29,7 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.NMToken;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.util.Records;
 
@@ -55,13 +56,15 @@ public abstract class RegisterApplicatio
   public static RegisterApplicationMasterResponse newInstance(
       Resource minCapability, Resource maxCapability,
       Map<ApplicationAccessType, String> acls, ByteBuffer key,
-      List<Container> containersFromPreviousAttempt, String queue) {
+      List<Container> containersFromPreviousAttempt, String queue,
+      List<NMToken> nmTokensFromPreviousAttempts) {
     RegisterApplicationMasterResponse response =
         Records.newRecord(RegisterApplicationMasterResponse.class);
     response.setMaximumResourceCapability(maxCapability);
     response.setApplicationACLs(acls);
     response.setClientToAMTokenMasterKey(key);
-    response.setContainersFromPreviousAttempt(containersFromPreviousAttempt);
+    response.setContainersFromPreviousAttempts(containersFromPreviousAttempt);
+    response.setNMTokensFromPreviousAttempts(nmTokensFromPreviousAttempts);
     response.setQueue(queue);
     return response;
   }
@@ -129,26 +132,52 @@ public abstract class RegisterApplicatio
   /**
    *
    * Get the list of running containers as viewed by
-   * ResourceManager from previous application attempt.
+   * ResourceManager from previous application attempts.
    *
    *
    * @return the list of running containers as viewed by
-   *         ResourceManager from previous application attempt
+   *         ResourceManager from previous application attempts
+   * @see RegisterApplicationMasterResponse#getNMTokensFromPreviousAttempts()
    */
   @Public
   @Unstable
-  public abstract List<Container> getContainersFromPreviousAttempt();
+  public abstract List<Container> getContainersFromPreviousAttempts();
 
   /**
    * Set the list of running containers as viewed by
-   * ResourceManager from previous application attempt.
+   * ResourceManager from previous application attempts.
    *
    * @param containersFromPreviousAttempt
    *          the list of running containers as viewed by
-   *          ResourceManager from previous application attempt.
+   *          ResourceManager from previous application attempts.
    */
   @Private
   @Unstable
-  public abstract void setContainersFromPreviousAttempt(
+  public abstract void setContainersFromPreviousAttempts(
       List<Container> containersFromPreviousAttempt);
+
+  /**
+   * Get the list of NMTokens for communicating with the NMs where the
+   * containers of previous application attempts are running.
+   *
+   * @return the list of NMTokens for communicating with the NMs where the
+   *         containers of previous application attempts are running.
+   *
+   * @see RegisterApplicationMasterResponse#getContainersFromPreviousAttempts()
+   */
+  @Public
+  @Stable
+  public abstract List<NMToken> getNMTokensFromPreviousAttempts();
+
+  /**
+   * Set the list of NMTokens for communicating with the NMs where the the
+   * containers of previous application attempts are running.
+   *
+   * @param nmTokens
+   *          the list of NMTokens for communicating with the NMs where the
+   *          containers of previous application attempts are running.
+   */
+  @Private
+  @Unstable
+  public abstract void setNMTokensFromPreviousAttempts(List<NMToken> nmTokens);
 }

Modified: hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NMToken.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NMToken.java?rev=1572233&r1=1572232&r2=1572233&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NMToken.java (original)
+++ hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NMToken.java Wed Feb 26 20:20:59 2014
@@ -72,4 +72,37 @@ public abstract class NMToken {
   @Stable
   public abstract void setToken(Token token);
 
+
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result =
+        prime * result + ((getNodeId() == null) ? 0 : getNodeId().hashCode());
+    result =
+        prime * result + ((getToken() == null) ? 0 : getToken().hashCode());
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    NMToken other = (NMToken) obj;
+    if (getNodeId() == null) {
+      if (other.getNodeId() != null)
+        return false;
+    } else if (!getNodeId().equals(other.getNodeId()))
+      return false;
+    if (getToken() == null) {
+      if (other.getToken() != null)
+        return false;
+    } else if (!getToken().equals(other.getToken()))
+      return false;
+    return true;
+  }
 }

Modified: hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto?rev=1572233&r1=1572232&r2=1572233&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto (original)
+++ hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto Wed Feb 26 20:20:59 2014
@@ -44,8 +44,9 @@ message RegisterApplicationMasterRespons
   optional ResourceProto maximumCapability = 1;
   optional bytes client_to_am_token_master_key = 2;
   repeated ApplicationACLMapProto application_ACLs = 3;
-  repeated ContainerProto containers_from_previous_attempt = 4;
+  repeated ContainerProto containers_from_previous_attempts = 4;
   optional string queue = 5;
+  repeated NMTokenProto nm_tokens_from_previous_attempts = 6;
 }
 
 message FinishApplicationMasterRequestProto {

Modified: hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java?rev=1572233&r1=1572232&r2=1572233&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java (original)
+++ hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java Wed Feb 26 20:20:59 2014
@@ -74,6 +74,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.NMToken;
 import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -542,7 +543,7 @@ public class ApplicationMaster {
     }
 
     List<Container> previousAMRunningContainers =
-        response.getContainersFromPreviousAttempt();
+        response.getContainersFromPreviousAttempts();
     LOG.info("Received " + previousAMRunningContainers.size()
         + " previous AM's running containers on AM registration.");
     numAllocatedContainers.addAndGet(previousAMRunningContainers.size());

Modified: hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java?rev=1572233&r1=1572232&r2=1572233&view=diff ============================================================================== --- hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java (original) +++ hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java Wed Feb 26 20:20:59 2014 @@ -195,6 +195,12 @@ public class AMRMClientImpl nmTokens) { + for (NMToken token : nmTokens) { String nodeId = token.getNodeId().toString(); if (getNMTokenCache().containsToken(nodeId)) { - LOG.debug("Replacing token for : " + nodeId); + if (LOG.isDebugEnabled()) { + LOG.debug("Replacing token for : " + nodeId); + } } else { - LOG.debug("Received new token for : " + nodeId); + if (LOG.isDebugEnabled()) { + LOG.debug("Received new token for : " + nodeId); + } } getNMTokenCache().setToken(nodeId, token.getToken()); } Modified: hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RegisterApplicationMasterResponsePBImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RegisterApplicationMasterResponsePBImpl.java?rev=1572233&r1=1572232&r2=1572233&view=diff ============================================================================== --- hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RegisterApplicationMasterResponsePBImpl.java (original) +++ 
hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RegisterApplicationMasterResponsePBImpl.java Wed Feb 26 20:20:59 2014 @@ -31,13 +31,16 @@ import org.apache.hadoop.classification. import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.NMTokenPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils; import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationACLMapProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto; import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.NMTokenProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterResponseProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterResponseProtoOrBuilder; @@ -56,7 +59,8 @@ public class RegisterApplicationMasterRe private Resource maximumResourceCapability; private Map applicationACLS = null; - private List containersFromPreviousAttempt = null; + private List containersFromPreviousAttempts = null; + private List nmTokens = null; public RegisterApplicationMasterResponsePBImpl() { builder = RegisterApplicationMasterResponseProto.newBuilder(); @@ -110,8 +114,13 @@ public class RegisterApplicationMasterRe if (this.applicationACLS != null) { addApplicationACLs(); } - if (this.containersFromPreviousAttempt != null) { - addRunningContainersToProto(); + if (this.containersFromPreviousAttempts != null) { + 
addContainersFromPreviousAttemptToProto(); + } + if (nmTokens != null) { + builder.clearNmTokensFromPreviousAttempts(); + Iterable iterable = getTokenProtoIterable(nmTokens); + builder.addAllNmTokensFromPreviousAttempts(iterable); } } @@ -236,21 +245,22 @@ public class RegisterApplicationMasterRe } @Override - public List getContainersFromPreviousAttempt() { - if (this.containersFromPreviousAttempt != null) { - return this.containersFromPreviousAttempt; + public List getContainersFromPreviousAttempts() { + if (this.containersFromPreviousAttempts != null) { + return this.containersFromPreviousAttempts; } - initRunningContainersList(); - return this.containersFromPreviousAttempt; + initContainersPreviousAttemptList(); + return this.containersFromPreviousAttempts; } @Override - public void setContainersFromPreviousAttempt(final List containers) { + public void + setContainersFromPreviousAttempts(final List containers) { if (containers == null) { return; } - this.containersFromPreviousAttempt = new ArrayList(); - this.containersFromPreviousAttempt.addAll(containers); + this.containersFromPreviousAttempts = new ArrayList(); + this.containersFromPreviousAttempts.addAll(containers); } @Override @@ -272,25 +282,88 @@ public class RegisterApplicationMasterRe } } - private void initRunningContainersList() { - RegisterApplicationMasterResponseProtoOrBuilder p = viaProto ? proto : builder; - List list = p.getContainersFromPreviousAttemptList(); - containersFromPreviousAttempt = new ArrayList(); + + private void initContainersPreviousAttemptList() { + RegisterApplicationMasterResponseProtoOrBuilder p = + viaProto ? 
proto : builder; + List list = p.getContainersFromPreviousAttemptsList(); + containersFromPreviousAttempts = new ArrayList(); for (ContainerProto c : list) { - containersFromPreviousAttempt.add(convertFromProtoFormat(c)); + containersFromPreviousAttempts.add(convertFromProtoFormat(c)); } } - private void addRunningContainersToProto() { + private void addContainersFromPreviousAttemptToProto() { maybeInitBuilder(); - builder.clearContainersFromPreviousAttempt(); + builder.clearContainersFromPreviousAttempts(); List list = new ArrayList(); - for (Container c : containersFromPreviousAttempt) { + for (Container c : containersFromPreviousAttempts) { list.add(convertToProtoFormat(c)); } - builder.addAllContainersFromPreviousAttempt(list); + builder.addAllContainersFromPreviousAttempts(list); + } + + + @Override + public List getNMTokensFromPreviousAttempts() { + if (nmTokens != null) { + return nmTokens; + } + initLocalNewNMTokenList(); + return nmTokens; } + @Override + public void setNMTokensFromPreviousAttempts(final List nmTokens) { + if (nmTokens == null || nmTokens.isEmpty()) { + if (this.nmTokens != null) { + this.nmTokens.clear(); + } + builder.clearNmTokensFromPreviousAttempts(); + return; + } + this.nmTokens = new ArrayList(); + this.nmTokens.addAll(nmTokens); + } + + private synchronized void initLocalNewNMTokenList() { + RegisterApplicationMasterResponseProtoOrBuilder p = viaProto ? 
proto : builder; + List list = p.getNmTokensFromPreviousAttemptsList(); + nmTokens = new ArrayList(); + for (NMTokenProto t : list) { + nmTokens.add(convertFromProtoFormat(t)); + } + } + + private synchronized Iterable getTokenProtoIterable( + final List nmTokenList) { + maybeInitBuilder(); + return new Iterable() { + @Override + public synchronized Iterator iterator() { + return new Iterator() { + + Iterator iter = nmTokenList.iterator(); + + @Override + public boolean hasNext() { + return iter.hasNext(); + } + + @Override + public NMTokenProto next() { + return convertToProtoFormat(iter.next()); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + }; + } + private Resource convertFromProtoFormat(ResourceProto resource) { return new ResourcePBImpl(resource); } @@ -306,4 +379,12 @@ public class RegisterApplicationMasterRe private ContainerProto convertToProtoFormat(Container t) { return ((ContainerPBImpl) t).getProto(); } + + private NMTokenProto convertToProtoFormat(NMToken token) { + return ((NMTokenPBImpl) token).getProto(); + } + + private NMToken convertFromProtoFormat(NMTokenProto proto) { + return new NMTokenPBImpl(proto); + } } Modified: hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/NMTokenPBImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/NMTokenPBImpl.java?rev=1572233&r1=1572232&r2=1572233&view=diff ============================================================================== --- hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/NMTokenPBImpl.java (original) +++ 
hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/NMTokenPBImpl.java Wed Feb 26 20:20:59 2014
@@ -47,7 +47,7 @@ public class NMTokenPBImpl extends NMTok
     this.proto = proto;
     viaProto = true;
   }
-  
+
   @Override
   public synchronized NodeId getNodeId() {
     NMTokenProtoOrBuilder p = viaProto ? proto : builder;

Modified: hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java?rev=1572233&r1=1572232&r2=1572233&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java (original)
+++ hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java Wed Feb 26 20:20:59 2014
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.re
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
@@ -51,6 +52,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NMToken;
 import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.PreemptionContainer;
 import org.apache.hadoop.yarn.api.records.PreemptionContract;
@@ -280,10 +282,32 @@ public class ApplicationMasterService ex
           .getMasterKey(applicationAttemptId).getEncoded()));
     }
 
-    List<Container> containerList =
+    // For work-preserving AM restart, retrieve previous attempts' containers
+    // and corresponding NM tokens.
+    List<Container> transferredContainers =
         ((AbstractYarnScheduler) rScheduler)
           .getTransferredContainers(applicationAttemptId);
-    response.setContainersFromPreviousAttempt(containerList);
+    if (!transferredContainers.isEmpty()) {
+      response.setContainersFromPreviousAttempts(transferredContainers);
+      List<NMToken> nmTokens = new ArrayList<NMToken>();
+      for (Container container : transferredContainers) {
+        try {
+          nmTokens.add(rmContext.getNMTokenSecretManager()
+            .createAndGetNMToken(app.getUser(), applicationAttemptId,
+              container));
+        } catch (IllegalArgumentException e) {
+          // if it's a DNS issue, throw UnknowHostException directly and that
+          // will be automatically retried by RMProxy in RPC layer.
+          if (e.getCause() instanceof UnknownHostException) {
+            throw (UnknownHostException) e.getCause();
+          }
+        }
+      }
+      response.setNMTokensFromPreviousAttempts(nmTokens);
+      LOG.info("Application " + appID + " retrieved "
+          + transferredContainers.size() + " containers from previous"
+          + " attempts and " + nmTokens.size() + " NM tokens.");
+    }
     return response;
   }
 }

Modified: hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java?rev=1572233&r1=1572232&r2=1572233&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java (original)
+++ hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java Wed Feb 26 20:20:59 2014
@@ -385,9 +385,8 @@ public class SchedulerApplicationAttempt
           }
         } catch (IllegalArgumentException e) {
           // DNS might be down, skip returning this container.
-          LOG.error(
-              "Error trying to assign container token to allocated container "
-                  + container.getId(), e);
+          LOG.error("Error trying to assign container token and NM token to"
+              + " an allocated container " + container.getId(), e);
           continue;
         }
         returnContainerList.add(container);

Modified: hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java?rev=1572233&r1=1572232&r2=1572233&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java (original)
+++ hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java Wed Feb 26 20:20:59 2014
@@ -486,6 +486,7 @@ public class MockRM extends ResourceMana
 
   public static MockAM launchAM(RMApp app, MockRM rm, MockNM nm)
       throws Exception {
+    rm.waitForState(app.getApplicationId(), RMAppState.ACCEPTED);
     RMAppAttempt attempt = app.getCurrentAppAttempt();
     nm.nodeHeartbeat(true);
     MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId());

Modified: hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java?rev=1572233&r1=1572232&r2=1572233&view=diff ============================================================================== --- hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java (original) +++ hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java Wed Feb 26 20:20:59 2014 @@ -24,6 +24,7 @@ import java.util.List; import junit.framework.Assert; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -31,6 +32,7 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.MockAM; @@ -160,11 +162,11 @@ public class TestAMRestart { am2.registerAppAttempt(); // Assert two containers are running: container2 and container3; - Assert.assertEquals(2, registerResponse.getContainersFromPreviousAttempt() + Assert.assertEquals(2, registerResponse.getContainersFromPreviousAttempts() .size()); boolean containerId2Exists 
= false, containerId3Exists = false; for (Container container : registerResponse - .getContainersFromPreviousAttempt()) { + .getContainersFromPreviousAttempts()) { if (container.getId().equals(containerId2)) { containerId2Exists = true; } @@ -232,4 +234,100 @@ public class TestAMRestart { rm1.stop(); } + + @Test + public void testNMTokensRebindOnAMRestart() throws Exception { + YarnConfiguration conf = new YarnConfiguration(); + conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 3); + + MockRM rm1 = new MockRM(conf); + rm1.start(); + RMApp app1 = + rm1.submitApp(200, "myname", "myuser", + new HashMap(), false, "default", -1, + null, "MAPREDUCE", false, true); + MockNM nm1 = + new MockNM("127.0.0.1:1234", 8000, rm1.getResourceTrackerService()); + nm1.registerNode(); + MockNM nm2 = + new MockNM("127.1.1.1:4321", 8000, rm1.getResourceTrackerService()); + nm2.registerNode(); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + + int NUM_CONTAINERS = 1; + List containers = new ArrayList(); + // nmTokens keeps track of all the nmTokens issued in the allocate call. + List expectedNMTokens = new ArrayList(); + + // am1 allocate 1 container on nm1. 
+ while (true) { + AllocateResponse response = + am1.allocate("127.0.0.1", 2000, NUM_CONTAINERS, + new ArrayList()); + nm1.nodeHeartbeat(true); + containers.addAll(response.getAllocatedContainers()); + expectedNMTokens.addAll(response.getNMTokens()); + if (containers.size() == NUM_CONTAINERS) { + break; + } + Thread.sleep(200); + System.out.println("Waiting for container to be allocated."); + } + // launch the container + nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 2, ContainerState.RUNNING); + ContainerId containerId2 = + ContainerId.newInstance(am1.getApplicationAttemptId(), 2); + rm1.waitForState(nm1, containerId2, RMContainerState.RUNNING); + + // fail am1 + nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 1, ContainerState.COMPLETE); + am1.waitForState(RMAppAttemptState.FAILED); + rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED); + + // restart the am + MockAM am2 = MockRM.launchAM(app1, rm1, nm1); + RegisterApplicationMasterResponse registerResponse = + am2.registerAppAttempt(); + rm1.waitForState(app1.getApplicationId(), RMAppState.RUNNING); + + // check am2 get the nm token from am1. + Assert.assertEquals(expectedNMTokens, + registerResponse.getNMTokensFromPreviousAttempts()); + + // am2 allocate 1 container on nm2 + containers = new ArrayList(); + while (true) { + AllocateResponse allocateResponse = + am2.allocate("127.1.1.1", 4000, NUM_CONTAINERS, + new ArrayList()); + nm2.nodeHeartbeat(true); + containers.addAll(allocateResponse.getAllocatedContainers()); + expectedNMTokens.addAll(allocateResponse.getNMTokens()); + if (containers.size() == NUM_CONTAINERS) { + break; + } + Thread.sleep(200); + System.out.println("Waiting for container to be allocated."); + } + nm1.nodeHeartbeat(am2.getApplicationAttemptId(), 2, ContainerState.RUNNING); + ContainerId am2ContainerId2 = + ContainerId.newInstance(am2.getApplicationAttemptId(), 2); + rm1.waitForState(nm1, am2ContainerId2, RMContainerState.RUNNING); + + // fail am2. 
+ nm1.nodeHeartbeat(am2.getApplicationAttemptId(), 1, ContainerState.COMPLETE); + am2.waitForState(RMAppAttemptState.FAILED); + rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED); + + // restart am + MockAM am3 = MockRM.launchAM(app1, rm1, nm1); + registerResponse = am3.registerAppAttempt(); + rm1.waitForState(app1.getApplicationId(), RMAppState.RUNNING); + + // check am3 get the NM token from both am1 and am2; + List transferredTokens = registerResponse.getNMTokensFromPreviousAttempts(); + Assert.assertEquals(2, transferredTokens.size()); + Assert.assertTrue(transferredTokens.containsAll(expectedNMTokens)); + rm1.stop(); + } }
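
Below is a minimal, Hadoop-free sketch of the AM-side bookkeeping this change relies on. It is a sketch under stated assumptions, not the YARN implementation. The AMRMClientImpl hunk above loops over a list of NM tokens and stores each one in the client's NM token cache, keyed by the node id string. If the cache already holds a token for that node, the new token replaces it. NMToken also gains value-based equals/hashCode, which TestAMRestart needs in order to compare token lists with assertEquals and containsAll. NodeToken, TokenCache and NMTokenCacheSketch are hypothetical stand-ins, not YARN classes, and plain strings stand in for the real Token payload.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical, Hadoop-free sketch. NodeToken stands in for YARN's NMToken and
// TokenCache for the client's NM token cache; neither is the real YARN API.
class NMTokenCacheSketch {

  // Value object: two tokens are equal when node id and payload are equal,
  // mirroring the equals/hashCode pair this commit adds to NMToken.
  static final class NodeToken {
    final String nodeId;
    final String token;

    NodeToken(String nodeId, String token) {
      this.nodeId = nodeId;
      this.token = token;
    }

    @Override
    public boolean equals(Object obj) {
      if (this == obj) {
        return true;
      }
      if (obj == null || getClass() != obj.getClass()) {
        return false;
      }
      NodeToken other = (NodeToken) obj;
      return Objects.equals(nodeId, other.nodeId)
          && Objects.equals(token, other.token);
    }

    @Override
    public int hashCode() {
      return Objects.hash(nodeId, token);
    }
  }

  // Holds at most one token per node; a newer token for a known node replaces
  // the cached one, like the replace-or-add loop in the AMRMClientImpl hunk.
  static final class TokenCache {
    private final Map<String, String> tokensByNode = new HashMap<>();
    private int replaced = 0;

    void populate(List<NodeToken> nmTokens) {
      for (NodeToken t : nmTokens) {
        if (tokensByNode.containsKey(t.nodeId)) {
          replaced++; // the real client logs "Replacing token for : ..." here
        }
        tokensByNode.put(t.nodeId, t.token);
      }
    }

    String get(String nodeId) {
      return tokensByNode.get(nodeId);
    }

    int size() {
      return tokensByNode.size();
    }

    int replacements() {
      return replaced;
    }
  }

  public static void main(String[] args) {
    TokenCache cache = new TokenCache();
    // Tokens a new AM would receive at registration for transferred containers.
    List<NodeToken> fromPreviousAttempts = Arrays.asList(
        new NodeToken("127.0.0.1:1234", "t1"),
        new NodeToken("127.1.1.1:4321", "t2"));
    cache.populate(fromPreviousAttempts);
    // A later allocate response re-issues a token for an already-known node.
    cache.populate(Arrays.asList(new NodeToken("127.0.0.1:1234", "t3")));

    // Value equality is what makes list comparisons like the test's work.
    List<NodeToken> copy = new ArrayList<>(fromPreviousAttempts);
    System.out.println(cache.size() + " " + cache.replacements() + " "
        + cache.get("127.0.0.1:1234") + " " + copy.equals(fromPreviousAttempts));
  }
}
```

Running main prints `2 1 t3 true`. Because the sketch keys the cache by node id, a token re-issued in a later allocate response replaces the one handed over at registration, so at most one token is held per NodeManager.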