Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 335C1200B85 for ; Thu, 1 Sep 2016 05:23:54 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 254FD160AB4; Thu, 1 Sep 2016 03:23:39 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 07563160AB5 for ; Thu, 1 Sep 2016 05:23:34 +0200 (CEST) Received: (qmail 92799 invoked by uid 500); 1 Sep 2016 03:23:23 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 92530 invoked by uid 99); 1 Sep 2016 03:23:23 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 01 Sep 2016 03:23:23 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 50EACE055E; Thu, 1 Sep 2016 03:23:23 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: asuresh@apache.org To: common-commits@hadoop.apache.org Date: Thu, 01 Sep 2016 03:23:25 -0000 Message-Id: <885b5cf7f2be4cdda5d3d4fe0d00bb0d@git.apache.org> In-Reply-To: <7cc426b311594f198335732b0c630564@git.apache.org> References: <7cc426b311594f198335732b0c630564@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [3/4] hadoop git commit: YARN-5221. Expose UpdateResourceRequest API to allow AM to request for change in container properties. (asuresh) archived-at: Thu, 01 Sep 2016 03:23:54 -0000 http://git-wip-us.apache.org/repos/asf/hadoop/blob/979b29a0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java index da87465..8625e25 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java @@ -36,6 +36,8 @@ import org.apache.hadoop.yarn.api.records.PreemptionMessage; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Token; +import org.apache.hadoop.yarn.api.records.UpdateContainerError; +import org.apache.hadoop.yarn.api.records.UpdatedContainer; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.NMTokenPBImpl; @@ -45,12 +47,14 @@ import org.apache.hadoop.yarn.api.records.impl.pb.PriorityPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils; import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.TokenPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.UpdatedContainerPBImpl; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto; import org.apache.hadoop.yarn.proto.YarnProtos.NodeReportProto; import org.apache.hadoop.yarn.proto.YarnProtos.PreemptionMessageProto; import org.apache.hadoop.yarn.proto.YarnProtos.PriorityProto; import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos; import org.apache.hadoop.yarn.proto.YarnServiceProtos.AllocateResponseProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.AllocateResponseProtoOrBuilder; import org.apache.hadoop.yarn.proto.YarnServiceProtos.NMTokenProto; @@ -69,10 +73,10 @@ public class AllocateResponsePBImpl extends AllocateResponse { private List allocatedContainers = null; private List nmTokens = null; private List completedContainersStatuses = null; - private List increasedContainers = null; - private List decreasedContainers = null; + private List updatedContainers = null; private List updatedNodes = null; + private List updateErrors = null; private PreemptionMessage preempt; private Token amrmToken = null; private Priority appPriority = null; @@ -143,17 +147,17 @@ public class AllocateResponsePBImpl extends AllocateResponse { if (this.preempt != null) { builder.setPreempt(convertToProtoFormat(this.preempt)); } - if (this.increasedContainers != null) { - builder.clearIncreasedContainers(); - Iterable iterable = - getContainerProtoIterable(this.increasedContainers); - builder.addAllIncreasedContainers(iterable); + if (this.updatedContainers != null) { + builder.clearUpdatedContainers(); + Iterable iterable = + getUpdatedContainerProtoIterable(this.updatedContainers); + builder.addAllUpdatedContainers(iterable); } - if (this.decreasedContainers != null) { - builder.clearDecreasedContainers(); - Iterable iterable = - getContainerProtoIterable(this.decreasedContainers); - builder.addAllDecreasedContainers(iterable); + if (this.updateErrors != null) { + builder.clearUpdateErrors(); + Iterable iterable = + getUpdateErrorsIterable(this.updateErrors); + builder.addAllUpdateErrors(iterable); } if (this.amrmToken != null) { builder.setAmRmToken(convertToProtoFormat(this.amrmToken)); @@ -248,49 +252,52 @@ public class AllocateResponsePBImpl extends AllocateResponse { } @Override - public synchronized List getAllocatedContainers() { - initLocalNewContainerList(); - return this.allocatedContainers; + public synchronized List getUpdateErrors() { + initLocalUpdateErrorsList(); + return this.updateErrors; } @Override - public synchronized void setAllocatedContainers( - final List containers) { - if (containers == null) + public synchronized void setUpdateErrors( + List updateErrors) { + if (updateErrors == null) { + this.updateErrors.clear(); return; - // this looks like a bug because it results in append and not set - initLocalNewContainerList(); - allocatedContainers.addAll(containers); + } + this.updateErrors = new ArrayList<>( + updateErrors.size()); + this.updateErrors.addAll(updateErrors); } @Override - public synchronized List getIncreasedContainers() { - initLocalIncreasedContainerList(); - return this.increasedContainers; + public synchronized List getAllocatedContainers() { + initLocalNewContainerList(); + return this.allocatedContainers; } @Override - public synchronized void setIncreasedContainers( + public synchronized void setAllocatedContainers( final List containers) { if (containers == null) return; - initLocalIncreasedContainerList(); - increasedContainers.addAll(containers); + // this looks like a bug because it results in append and not set + initLocalNewContainerList(); + allocatedContainers.addAll(containers); } @Override - public synchronized List getDecreasedContainers() { - initLocalDecreasedContainerList(); - return this.decreasedContainers; + public synchronized List getUpdatedContainers() { + initLocalUpdatedContainerList(); + return this.updatedContainers; } @Override - public synchronized void setDecreasedContainers( - final List containers) { + public synchronized void setUpdatedContainers( + final List containers) { if (containers == null) return; - initLocalDecreasedContainerList(); - decreasedContainers.addAll(containers); + initLocalUpdatedContainerList(); + updatedContainers.addAll(containers); } //// Finished containers @@ -406,29 +413,17 @@ public class AllocateResponsePBImpl extends AllocateResponse { this.appPriority = priority; } - private synchronized void initLocalIncreasedContainerList() { - if (this.increasedContainers != null) { + private synchronized void initLocalUpdatedContainerList() { + if (this.updatedContainers != null) { return; } AllocateResponseProtoOrBuilder p = viaProto ? proto : builder; - List list = p.getIncreasedContainersList(); - increasedContainers = new ArrayList<>(); + List list = + p.getUpdatedContainersList(); + updatedContainers = new ArrayList<>(); - for (ContainerProto c : list) { - increasedContainers.add(convertFromProtoFormat(c)); - } - } - - private synchronized void initLocalDecreasedContainerList() { - if (this.decreasedContainers != null) { - return; - } - AllocateResponseProtoOrBuilder p = viaProto ? proto : builder; - List list = p.getDecreasedContainersList(); - decreasedContainers = new ArrayList<>(); - - for (ContainerProto c : list) { - decreasedContainers.add(convertFromProtoFormat(c)); + for (YarnServiceProtos.UpdatedContainerProto c : list) { + updatedContainers.add(convertFromProtoFormat(c)); } } @@ -474,6 +469,53 @@ public class AllocateResponsePBImpl extends AllocateResponse { } } + private synchronized void initLocalUpdateErrorsList() { + if (updateErrors != null) { + return; + } + AllocateResponseProtoOrBuilder p = viaProto ? proto : builder; + List list = + p.getUpdateErrorsList(); + this.updateErrors = new ArrayList(); + for (YarnServiceProtos.UpdateContainerErrorProto t : list) { + updateErrors.add(ProtoUtils.convertFromProtoFormat(t)); + } + } + + private synchronized Iterable + getUpdateErrorsIterable( + final List updateErrorsList) { + maybeInitBuilder(); + return new Iterable() { + @Override + public synchronized Iterator iterator() { + return new Iterator() { + + private Iterator iter = + updateErrorsList.iterator(); + + @Override + public synchronized boolean hasNext() { + return iter.hasNext(); + } + + @Override + public synchronized YarnServiceProtos.UpdateContainerErrorProto + next() { + return ProtoUtils.convertToProtoFormat(iter.next()); + } + + @Override + public synchronized void remove() { + throw new UnsupportedOperationException(); + } + }; + + } + }; + } + private synchronized Iterable getContainerProtoIterable( final List newContainersList) { maybeInitBuilder(); @@ -505,6 +547,40 @@ public class AllocateResponsePBImpl extends AllocateResponse { }; } + private synchronized Iterable + getUpdatedContainerProtoIterable( + final List newUpdatedContainersList) { + maybeInitBuilder(); + return new Iterable() { + @Override + public synchronized Iterator + iterator() { + return new Iterator() { + + private Iterator iter = + newUpdatedContainersList.iterator(); + + @Override + public synchronized boolean hasNext() { + return iter.hasNext(); + } + + @Override + public synchronized YarnServiceProtos.UpdatedContainerProto next() { + return convertToProtoFormat(iter.next()); + } + + @Override + public synchronized void remove() { + throw new UnsupportedOperationException(); + + } + }; + + } + }; + } + private synchronized Iterable getTokenProtoIterable( final List nmTokenList) { maybeInitBuilder(); @@ -631,6 +707,16 @@ public class AllocateResponsePBImpl extends AllocateResponse { return ((ContainerPBImpl)t).getProto(); } + private synchronized UpdatedContainerPBImpl convertFromProtoFormat( + YarnServiceProtos.UpdatedContainerProto p) { + return new UpdatedContainerPBImpl(p); + } + + private synchronized YarnServiceProtos.UpdatedContainerProto + convertToProtoFormat(UpdatedContainer t) { + return ((UpdatedContainerPBImpl)t).getProto(); + } + private synchronized ContainerStatusPBImpl convertFromProtoFormat( ContainerStatusProto p) { return new ContainerStatusPBImpl(p); http://git-wip-us.apache.org/repos/asf/hadoop/blob/979b29a0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerPBImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerPBImpl.java index 1700068..f27e16a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerPBImpl.java @@ -248,6 +248,18 @@ public class ContainerPBImpl extends Container { this.containerToken = containerToken; } + @Override + public int getVersion() { + ContainerProtoOrBuilder p = viaProto ? proto : builder; + return p.getVersion(); + } + + @Override + public void setVersion(int version) { + maybeInitBuilder(); + builder.setVersion(version); + } + private ContainerIdPBImpl convertFromProtoFormat(ContainerIdProto p) { return new ContainerIdPBImpl(p); } @@ -292,6 +304,7 @@ public class ContainerPBImpl extends Container { StringBuilder sb = new StringBuilder(); sb.append("Container: ["); sb.append("ContainerId: ").append(getId()).append(", "); + sb.append("Version: ").append(getVersion()).append(", "); sb.append("NodeId: ").append(getNodeId()).append(", "); sb.append("NodeHttpAddress: ").append(getNodeHttpAddress()).append(", "); sb.append("Resource: ").append(getResource()).append(", "); http://git-wip-us.apache.org/repos/asf/hadoop/blob/979b29a0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerResourceChangeRequestPBImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerResourceChangeRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerResourceChangeRequestPBImpl.java deleted file mode 100644 index f382b8c..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerResourceChangeRequestPBImpl.java +++ /dev/null @@ -1,141 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.api.records.impl.pb; - -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest; -import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto; -import org.apache.hadoop.yarn.proto.YarnProtos.ContainerResourceChangeRequestProto; -import org.apache.hadoop.yarn.proto.YarnProtos.ContainerResourceChangeRequestProtoOrBuilder; -import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto; - - -public class ContainerResourceChangeRequestPBImpl extends - ContainerResourceChangeRequest { - ContainerResourceChangeRequestProto proto = - ContainerResourceChangeRequestProto.getDefaultInstance(); - ContainerResourceChangeRequestProto.Builder builder = null; - boolean viaProto = false; - - private ContainerId existingContainerId = null; - private Resource targetCapability = null; - - public ContainerResourceChangeRequestPBImpl() { - builder = ContainerResourceChangeRequestProto.newBuilder(); - } - - public ContainerResourceChangeRequestPBImpl( - ContainerResourceChangeRequestProto proto) { - this.proto = proto; - viaProto = true; - } - - public ContainerResourceChangeRequestProto getProto() { - mergeLocalToProto(); - proto = viaProto ? proto : builder.build(); - viaProto = true; - return proto; - } - - @Override - public ContainerId getContainerId() { - ContainerResourceChangeRequestProtoOrBuilder p = viaProto ? proto - : builder; - if (this.existingContainerId != null) { - return this.existingContainerId; - } - if (p.hasContainerId()) { - this.existingContainerId = convertFromProtoFormat(p.getContainerId()); - } - return this.existingContainerId; - } - - @Override - public void setContainerId(ContainerId existingContainerId) { - maybeInitBuilder(); - if (existingContainerId == null) { - builder.clearContainerId(); - } - this.existingContainerId = existingContainerId; - } - - @Override - public Resource getCapability() { - ContainerResourceChangeRequestProtoOrBuilder p = viaProto ? proto - : builder; - if (this.targetCapability != null) { - return this.targetCapability; - } - if (p.hasCapability()) { - this.targetCapability = convertFromProtoFormat(p.getCapability()); - } - return this.targetCapability; - } - - @Override - public void setCapability(Resource targetCapability) { - maybeInitBuilder(); - if (targetCapability == null) { - builder.clearCapability(); - } - this.targetCapability = targetCapability; - } - - private ContainerIdPBImpl convertFromProtoFormat(ContainerIdProto p) { - return new ContainerIdPBImpl(p); - } - - private ContainerIdProto convertToProtoFormat(ContainerId t) { - return ((ContainerIdPBImpl) t).getProto(); - } - - private Resource convertFromProtoFormat(ResourceProto p) { - return new ResourcePBImpl(p); - } - - private ResourceProto convertToProtoFormat(Resource t) { - return ((ResourcePBImpl) t).getProto(); - } - - private void mergeLocalToProto() { - if (viaProto) { - maybeInitBuilder(); - } - mergeLocalToBuilder(); - proto = builder.build(); - viaProto = true; - } - - private void maybeInitBuilder() { - if (viaProto || builder == null) { - builder = ContainerResourceChangeRequestProto.newBuilder(proto); - } - viaProto = false; - } - - private void mergeLocalToBuilder() { - if (this.existingContainerId != null) { - builder.setContainerId(convertToProtoFormat(this.existingContainerId)); - } - if (this.targetCapability != null) { - builder.setCapability(convertToProtoFormat(this.targetCapability)); - } - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/979b29a0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java index e742f4c..590c088 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java @@ -26,7 +26,10 @@ import org.apache.hadoop.yarn.api.protocolrecords.ApplicationsRequestScope; import org.apache.hadoop.yarn.api.records.AMCommand; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.ContainerUpdateType; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; @@ -36,11 +39,16 @@ import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueState; import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.UpdateContainerError; +import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState; import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.proto.YarnProtos; import org.apache.hadoop.yarn.proto.YarnProtos.AMCommandProto; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationAccessTypeProto; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationResourceUsageReportProto; +import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStateProto; import org.apache.hadoop.yarn.proto.YarnProtos.FinalApplicationStatusProto; import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceTypeProto; @@ -51,10 +59,12 @@ import org.apache.hadoop.yarn.proto.YarnProtos.NodeStateProto; import org.apache.hadoop.yarn.proto.YarnProtos.QueueACLProto; import org.apache.hadoop.yarn.proto.YarnProtos.QueueStateProto; import org.apache.hadoop.yarn.proto.YarnProtos.ReservationRequestInterpreterProto; +import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto; import org.apache.hadoop.yarn.proto.YarnProtos.YarnApplicationAttemptStateProto; import org.apache.hadoop.yarn.proto.YarnProtos.YarnApplicationStateProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerTypeProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.ContainerUpdateTypeProto; import org.apache.hadoop.yarn.server.api.ContainerType; import com.google.protobuf.ByteString; @@ -282,4 +292,84 @@ public class ProtoUtils { public static ContainerType convertFromProtoFormat(ContainerTypeProto e) { return ContainerType.valueOf(e.name()); } + + /* + * ContainerUpdateType + */ + public static ContainerUpdateTypeProto convertToProtoFormat( + ContainerUpdateType e) { + return ContainerUpdateTypeProto.valueOf(e.name()); + } + public static ContainerUpdateType convertFromProtoFormat( + ContainerUpdateTypeProto e) { + return ContainerUpdateType.valueOf(e.name()); + } + + /* + * Resource + */ + public static synchronized ResourceProto convertToProtoFormat(Resource r) { + return ((ResourcePBImpl) r).getProto(); + } + + public static Resource convertFromProtoFormat(ResourceProto resource) { + return new ResourcePBImpl(resource); + } + + /* + * Container + */ + public static YarnProtos.ContainerProto convertToProtoFormat( + Container t) { + return ((ContainerPBImpl)t).getProto(); + } + + public static ContainerPBImpl convertFromProtoFormat( + YarnProtos.ContainerProto t) { + return new ContainerPBImpl(t); + } + + public static ContainerStatusPBImpl convertFromProtoFormat( + YarnProtos.ContainerStatusProto p) { + return new ContainerStatusPBImpl(p); + } + + /* + * ContainerId + */ + public static ContainerIdPBImpl convertFromProtoFormat(ContainerIdProto p) { + return new ContainerIdPBImpl(p); + } + + public static ContainerIdProto convertToProtoFormat(ContainerId t) { + return ((ContainerIdPBImpl) t).getProto(); + } + + /* + * UpdateContainerRequest + */ + public static UpdateContainerRequestPBImpl convertFromProtoFormat( + YarnServiceProtos.UpdateContainerRequestProto p) { + return new UpdateContainerRequestPBImpl(p); + } + + public static YarnServiceProtos.UpdateContainerRequestProto + convertToProtoFormat(UpdateContainerRequest t) { + return ((UpdateContainerRequestPBImpl) t).getProto(); + } + + /* + * UpdateContainerError + */ + public static UpdateContainerErrorPBImpl convertFromProtoFormat( + YarnServiceProtos.UpdateContainerErrorProto p) { + return new UpdateContainerErrorPBImpl(p); + } + + public static YarnServiceProtos.UpdateContainerErrorProto + convertToProtoFormat(UpdateContainerError t) { + return ((UpdateContainerErrorPBImpl) t).getProto(); + } } + + http://git-wip-us.apache.org/repos/asf/hadoop/blob/979b29a0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/UpdateContainerErrorPBImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/UpdateContainerErrorPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/UpdateContainerErrorPBImpl.java new file mode 100644 index 0000000..fb6c1a7 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/UpdateContainerErrorPBImpl.java @@ -0,0 +1,125 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.records.impl.pb; + +import org.apache.hadoop.yarn.api.records.UpdateContainerError; +import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; +import org.apache.hadoop.yarn.proto.YarnServiceProtos; + +/** + * Implementation of UpdateContainerError. + */ +public class UpdateContainerErrorPBImpl extends UpdateContainerError { + private YarnServiceProtos.UpdateContainerErrorProto proto = + YarnServiceProtos.UpdateContainerErrorProto.getDefaultInstance(); + private YarnServiceProtos.UpdateContainerErrorProto.Builder builder = null; + private boolean viaProto = false; + + private String reason = null; + private UpdateContainerRequest updateRequest = null; + + public UpdateContainerErrorPBImpl() { + builder = YarnServiceProtos.UpdateContainerErrorProto.newBuilder(); + } + + public UpdateContainerErrorPBImpl(YarnServiceProtos + .UpdateContainerErrorProto proto) { + this.proto = proto; + viaProto = true; + } + + public YarnServiceProtos.UpdateContainerErrorProto getProto() { + mergeLocalToProto(); + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + @Override + public String getReason() { + YarnServiceProtos.UpdateContainerErrorProtoOrBuilder p = viaProto ? proto + : builder; + if (this.reason != null) { + return this.reason; + } + if (p.hasReason()) { + this.reason = p.getReason(); + } + return this.reason; + } + + @Override + public void setReason(String reason) { + maybeInitBuilder(); + if (reason == null) { + builder.clearReason(); + } + this.reason = reason; + } + + @Override + public UpdateContainerRequest getUpdateContainerRequest() { + YarnServiceProtos.UpdateContainerErrorProtoOrBuilder p = viaProto ? proto + : builder; + if (this.updateRequest != null) { + return this.updateRequest; + } + if (p.hasUpdateRequest()) { + this.updateRequest = + ProtoUtils.convertFromProtoFormat(p.getUpdateRequest()); + } + return this.updateRequest; + } + + @Override + public void setUpdateContainerRequest( + UpdateContainerRequest updateContainerRequest) { + maybeInitBuilder(); + if (updateContainerRequest == null) { + builder.clearUpdateRequest(); + } + this.updateRequest = updateContainerRequest; + } + + private void mergeLocalToProto() { + if (viaProto) { + maybeInitBuilder(); + } + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = YarnServiceProtos.UpdateContainerErrorProto.newBuilder(proto); + } + viaProto = false; + } + + private void mergeLocalToBuilder() { + if (this.reason != null) { + builder.setReason(this.reason); + } + if (this.updateRequest != null) { + builder.setUpdateRequest( + ProtoUtils.convertToProtoFormat(this.updateRequest)); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/979b29a0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/UpdateContainerRequestPBImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/UpdateContainerRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/UpdateContainerRequestPBImpl.java new file mode 100644 index 0000000..934638b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/UpdateContainerRequestPBImpl.java @@ -0,0 +1,166 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.records.impl.pb; + +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerUpdateType; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; +import org.apache.hadoop.yarn.proto.YarnServiceProtos; + +/** + * Implementation of UpdateContainerRequest. + */ +public class UpdateContainerRequestPBImpl extends UpdateContainerRequest { + private YarnServiceProtos.UpdateContainerRequestProto proto = + YarnServiceProtos.UpdateContainerRequestProto.getDefaultInstance(); + private YarnServiceProtos.UpdateContainerRequestProto.Builder builder = null; + private boolean viaProto = false; + + private ContainerId existingContainerId = null; + private Resource targetCapability = null; + + public UpdateContainerRequestPBImpl() { + builder = YarnServiceProtos.UpdateContainerRequestProto.newBuilder(); + } + + public UpdateContainerRequestPBImpl(YarnServiceProtos + .UpdateContainerRequestProto proto) { + this.proto = proto; + viaProto = true; + } + + public YarnServiceProtos.UpdateContainerRequestProto getProto() { + mergeLocalToProto(); + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + @Override + public int getContainerVersion() { + YarnServiceProtos.UpdateContainerRequestProtoOrBuilder p = + viaProto ? proto : builder; + if (!p.hasContainerVersion()) { + return 0; + } + return p.getContainerVersion(); + } + + @Override + public void setContainerVersion(int containerVersion) { + maybeInitBuilder(); + builder.setContainerVersion(containerVersion); + } + + @Override + public ContainerId getContainerId() { + YarnServiceProtos.UpdateContainerRequestProtoOrBuilder p = + viaProto ? proto : builder; + if (this.existingContainerId != null) { + return this.existingContainerId; + } + if (p.hasContainerId()) { + this.existingContainerId = + ProtoUtils.convertFromProtoFormat(p.getContainerId()); + } + return this.existingContainerId; + } + + @Override + public void setContainerId(ContainerId containerId) { + maybeInitBuilder(); + if (containerId == null) { + builder.clearContainerId(); + } + this.existingContainerId = containerId; + } + + @Override + public Resource getCapability() { + YarnServiceProtos.UpdateContainerRequestProtoOrBuilder p = viaProto ? proto + : builder; + if (this.targetCapability != null) { + return this.targetCapability; + } + if (p.hasCapability()) { + this.targetCapability = + ProtoUtils.convertFromProtoFormat(p.getCapability()); + } + return this.targetCapability; + } + + @Override + public void setCapability(Resource capability) { + maybeInitBuilder(); + if (capability == null) { + builder.clearCapability(); + } + this.targetCapability = capability; + } + + @Override + public ContainerUpdateType getContainerUpdateType() { + YarnServiceProtos.UpdateContainerRequestProtoOrBuilder p = + viaProto ? proto : builder; + if (!p.hasUpdateType()) { + return null; + } + return ProtoUtils.convertFromProtoFormat(p.getUpdateType()); + } + + @Override + public void setContainerUpdateType(ContainerUpdateType updateType) { + maybeInitBuilder(); + if (updateType == null) { + builder.clearUpdateType(); + return; + } + builder.setUpdateType(ProtoUtils.convertToProtoFormat(updateType)); + } + + private void mergeLocalToProto() { + if (viaProto) { + maybeInitBuilder(); + } + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = YarnServiceProtos.UpdateContainerRequestProto.newBuilder(proto); + } + viaProto = false; + } + + private void mergeLocalToBuilder() { + if (this.existingContainerId != null) { + builder.setContainerId( + ProtoUtils.convertToProtoFormat(this.existingContainerId)); + } + if (this.targetCapability != null) { + builder.setCapability( + ProtoUtils.convertToProtoFormat(this.targetCapability)); + } + } + + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/979b29a0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/UpdatedContainerPBImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/UpdatedContainerPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/UpdatedContainerPBImpl.java new file mode 100644 index 0000000..0cd1903 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/UpdatedContainerPBImpl.java @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.records.impl.pb; + +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerUpdateType; +import org.apache.hadoop.yarn.api.records.UpdatedContainer; +import org.apache.hadoop.yarn.proto.YarnServiceProtos; + +/** + * Implementation of UpdatedContainer. + */ +public class UpdatedContainerPBImpl extends UpdatedContainer { + private YarnServiceProtos.UpdatedContainerProto proto = + YarnServiceProtos.UpdatedContainerProto.getDefaultInstance(); + private YarnServiceProtos.UpdatedContainerProto.Builder builder = null; + private boolean viaProto = false; + + private Container container = null; + + public UpdatedContainerPBImpl() { + builder = YarnServiceProtos.UpdatedContainerProto.newBuilder(); + } + + public UpdatedContainerPBImpl(YarnServiceProtos.UpdatedContainerProto proto) { + this.proto = proto; + viaProto = true; + } + + private void mergeLocalToBuilder() { + if (this.container != null) { + builder.setContainer(ProtoUtils.convertToProtoFormat(this.container)); + } + } + + private void mergeLocalToProto() { + if (viaProto) { + maybeInitBuilder(); + } + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = YarnServiceProtos.UpdatedContainerProto.newBuilder(proto); + } + viaProto = false; + } + + public YarnServiceProtos.UpdatedContainerProto getProto() { + mergeLocalToProto(); + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + @Override + public ContainerUpdateType getUpdateType() { + YarnServiceProtos.UpdatedContainerProtoOrBuilder p = + viaProto ? proto : builder; + if (!p.hasUpdateType()) { + return null; + } + return ProtoUtils.convertFromProtoFormat(p.getUpdateType()); + } + + @Override + public void setUpdateType(ContainerUpdateType updateType) { + maybeInitBuilder(); + if (updateType == null) { + builder.clearUpdateType(); + return; + } + builder.setUpdateType(ProtoUtils.convertToProtoFormat(updateType)); + } + + @Override + public Container getContainer() { + YarnServiceProtos.UpdatedContainerProtoOrBuilder p = + viaProto ? proto : builder; + if (this.container != null) { + return this.container; + } + if (!p.hasContainer()) { + return null; + } + this.container = ProtoUtils.convertFromProtoFormat(p.getContainer()); + return this.container; + } + + @Override + public void setContainer(Container container) { + maybeInitBuilder(); + if (container == null) { + builder.clearContainer(); + } + this.container = container; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/979b29a0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerTokenIdentifier.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerTokenIdentifier.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerTokenIdentifier.java index 106e6d5..ee90c05 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerTokenIdentifier.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerTokenIdentifier.java @@ -85,11 +85,22 @@ public class ContainerTokenIdentifier extends TokenIdentifier { long rmIdentifier, Priority priority, long creationTime, LogAggregationContext logAggregationContext, String nodeLabelExpression, ContainerType containerType) { + this(containerID, 0, hostName, appSubmitter, r, expiryTimeStamp, + masterKeyId, rmIdentifier, priority, creationTime, + logAggregationContext, nodeLabelExpression, containerType); + } + + public ContainerTokenIdentifier(ContainerId containerID, int containerVersion, + String hostName, String appSubmitter, Resource r, long expiryTimeStamp, + int masterKeyId, long rmIdentifier, Priority priority, long creationTime, + LogAggregationContext logAggregationContext, String nodeLabelExpression, + ContainerType containerType) { ContainerTokenIdentifierProto.Builder builder = ContainerTokenIdentifierProto.newBuilder(); if (containerID != null) { builder.setContainerId(((ContainerIdPBImpl)containerID).getProto()); } + builder.setVersion(containerVersion); builder.setNmHostAddr(hostName); builder.setAppSubmitter(appSubmitter); if (r != null) { @@ -171,7 +182,7 @@ public class ContainerTokenIdentifier extends TokenIdentifier { } /** - * Get the ContainerType of container to allocate + * Get the ContainerType of container to allocate. * @return ContainerType */ public ContainerType getContainerType(){ @@ -217,7 +228,18 @@ public class ContainerTokenIdentifier extends TokenIdentifier { return UserGroupInformation.createRemoteUser( containerId); } - + + /** + * Get the Container version + * @return container version + */ + public int getVersion() { + if (proto.hasVersion()) { + return proto.getVersion(); + } else { + return 0; + } + } /** * Get the node-label-expression in the original ResourceRequest */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/979b29a0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/proto/yarn_security_token.proto ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/proto/yarn_security_token.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/proto/yarn_security_token.proto index 339e99e..60cda24 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/proto/yarn_security_token.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/proto/yarn_security_token.proto @@ -51,6 +51,7 @@ message ContainerTokenIdentifierProto { optional LogAggregationContextProto logAggregationContext = 10; optional string nodeLabelExpression = 11; optional ContainerTypeProto containerType = 12; + optional int32 version = 14 [default = 0]; } message ClientToAMTokenIdentifierProto { http://git-wip-us.apache.org/repos/asf/hadoop/blob/979b29a0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java index 15d3aa3..7e3abf2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java @@ -119,7 +119,6 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerReport; -import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LogAggregationContext; @@ -151,6 +150,9 @@ import org.apache.hadoop.yarn.api.records.SerializedException; import org.apache.hadoop.yarn.api.records.StrictPreemptionContract; import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.api.records.URL; +import org.apache.hadoop.yarn.api.records.UpdateContainerError; +import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; +import org.apache.hadoop.yarn.api.records.UpdatedContainer; import org.apache.hadoop.yarn.api.records.YarnClusterMetrics; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptReportPBImpl; @@ -162,7 +164,6 @@ import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerLaunchContextPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerReportPBImpl; -import org.apache.hadoop.yarn.api.records.impl.pb.ContainerResourceChangeRequestPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.LocalResourcePBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.NMTokenPBImpl; @@ -184,6 +185,7 @@ import org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.StrictPreemptionContractPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.TokenPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.URLPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.UpdateContainerRequestPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.YarnClusterMetricsPBImpl; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationAttemptIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationAttemptReportProto; @@ -195,7 +197,6 @@ import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerLaunchContextProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerReportProto; -import org.apache.hadoop.yarn.proto.YarnProtos.ContainerResourceChangeRequestProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto; import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto; import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto; @@ -240,6 +241,8 @@ import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.Repla import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ReplaceLabelsOnNodeResponseProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.UpdateNodeResourceRequestProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.UpdateNodeResourceResponseProto; + +import org.apache.hadoop.yarn.proto.YarnServiceProtos; import org.apache.hadoop.yarn.proto.YarnServiceProtos.AllocateRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.AllocateResponseProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.FinishApplicationMasterRequestProto; @@ -478,7 +481,8 @@ public class TestPBImplRecords { generateByNewInstance(ContainerLaunchContext.class); generateByNewInstance(ApplicationSubmissionContext.class); generateByNewInstance(ContainerReport.class); - generateByNewInstance(ContainerResourceChangeRequest.class); + generateByNewInstance(UpdateContainerRequest.class); + generateByNewInstance(UpdateContainerError.class); generateByNewInstance(IncreaseContainersResourceRequest.class); generateByNewInstance(IncreaseContainersResourceResponse.class); generateByNewInstance(ContainerStatus.class); @@ -490,6 +494,7 @@ public class TestPBImplRecords { generateByNewInstance(PreemptionMessage.class); generateByNewInstance(StartContainerRequest.class); generateByNewInstance(NodeLabel.class); + generateByNewInstance(UpdatedContainer.class); // genByNewInstance does not apply to QueueInfo, cause // it is recursive(has sub queues) typeValueCache.put(QueueInfo.class, QueueInfo.newInstance("root", 1.0f, @@ -981,9 +986,9 @@ public class TestPBImplRecords { } @Test - public void testContainerResourceChangeRequestPBImpl() throws Exception { - validatePBImplRecord(ContainerResourceChangeRequestPBImpl.class, - ContainerResourceChangeRequestProto.class); + public void testUpdateContainerRequestPBImpl() throws Exception { + validatePBImplRecord(UpdateContainerRequestPBImpl.class, + YarnServiceProtos.UpdateContainerRequestProto.class); } @Test http://git-wip-us.apache.org/repos/asf/hadoop/blob/979b29a0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/security/TestYARNTokenIdentifier.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/security/TestYARNTokenIdentifier.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/security/TestYARNTokenIdentifier.java index 68f0b9d..a6f948d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/security/TestYARNTokenIdentifier.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/security/TestYARNTokenIdentifier.java @@ -152,7 +152,7 @@ public class TestYARNTokenIdentifier { long creationTime = 1000; ContainerTokenIdentifier token = new ContainerTokenIdentifier( - containerID, hostName, appSubmitter, r, expiryTimeStamp, + containerID, hostName, appSubmitter, r, expiryTimeStamp, masterKeyId, rmIdentifier, priority, creationTime); ContainerTokenIdentifier anotherToken = new ContainerTokenIdentifier(); @@ -385,7 +385,7 @@ public class TestYARNTokenIdentifier { anotherToken.getContainerType()); token = - new ContainerTokenIdentifier(containerID, hostName, appSubmitter, r, + new ContainerTokenIdentifier(containerID, 0, hostName, appSubmitter, r, expiryTimeStamp, masterKeyId, rmIdentifier, priority, creationTime, null, CommonNodeLabelsManager.NO_LABEL, ContainerType.TASK); http://git-wip-us.apache.org/repos/asf/hadoop/blob/979b29a0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NMContainerStatus.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NMContainerStatus.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NMContainerStatus.java index 4067c11..ed950ce 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NMContainerStatus.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NMContainerStatus.java @@ -35,21 +35,22 @@ public abstract class NMContainerStatus { // Used by tests only public static NMContainerStatus newInstance(ContainerId containerId, - ContainerState containerState, Resource allocatedResource, + int version, ContainerState containerState, Resource allocatedResource, String diagnostics, int containerExitStatus, Priority priority, long creationTime) { - return newInstance(containerId, containerState, allocatedResource, + return newInstance(containerId, version, containerState, allocatedResource, diagnostics, containerExitStatus, priority, creationTime, CommonNodeLabelsManager.NO_LABEL); } public static NMContainerStatus newInstance(ContainerId containerId, - ContainerState containerState, Resource allocatedResource, + int version, ContainerState containerState, Resource allocatedResource, String diagnostics, int containerExitStatus, Priority priority, long creationTime, String nodeLabelExpression) { NMContainerStatus status = Records.newRecord(NMContainerStatus.class); status.setContainerId(containerId); + status.setVersion(version); status.setContainerState(containerState); status.setAllocatedResource(allocatedResource); status.setDiagnostics(diagnostics); @@ -125,4 +126,12 @@ public abstract class NMContainerStatus { public abstract void setNodeLabelExpression( String nodeLabelExpression); + + public int getVersion() { + return 0; + } + + public void setVersion(int version) { + + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/979b29a0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NMContainerStatusPBImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NMContainerStatusPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NMContainerStatusPBImpl.java index 921c9d9..2d1046f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NMContainerStatusPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NMContainerStatusPBImpl.java @@ -83,6 +83,7 @@ public class NMContainerStatusPBImpl extends NMContainerStatus { StringBuilder sb = new StringBuilder(); sb.append("[").append(getContainerId()).append(", ") .append("CreateTime: ").append(getCreationTime()).append(", ") + .append("Version: ").append(getVersion()).append(", ") .append("State: ").append(getContainerState()).append(", ") .append("Capability: ").append(getAllocatedResource()).append(", ") .append("Diagnostics: ").append(getDiagnostics()).append(", ") @@ -185,6 +186,18 @@ public class NMContainerStatusPBImpl extends NMContainerStatus { } @Override + public int getVersion() { + NMContainerStatusProtoOrBuilder p = viaProto ? proto : builder; + return p.getVersion(); + } + + @Override + public void setVersion(int version) { + maybeInitBuilder(); + builder.setVersion(version); + } + + @Override public Priority getPriority() { NMContainerStatusProtoOrBuilder p = viaProto ? proto : builder; if (this.priority != null) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/979b29a0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java index 9ad1d4d..14caedc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java @@ -62,9 +62,11 @@ import org.apache.hadoop.yarn.api.records.URL; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.yarn.server.api.ContainerType; /** * Builder utilities to construct various objects. @@ -154,12 +156,14 @@ public class BuilderUtils { return cId; } - public static Token newContainerToken(ContainerId cId, String host, - int port, String user, Resource r, long expiryTime, int masterKeyId, - byte[] password, long rmIdentifier) throws IOException { + public static Token newContainerToken(ContainerId cId, int containerVersion, + String host, int port, String user, Resource r, long expiryTime, + int masterKeyId, byte[] password, long rmIdentifier) throws IOException { ContainerTokenIdentifier identifier = - new ContainerTokenIdentifier(cId, host + ":" + port, user, r, - expiryTime, masterKeyId, rmIdentifier, Priority.newInstance(0), 0); + new ContainerTokenIdentifier(cId, containerVersion, host + ":" + port, + user, r, expiryTime, masterKeyId, rmIdentifier, + Priority.newInstance(0), 0, null, CommonNodeLabelsManager.NO_LABEL, + ContainerType.TASK); return newContainerToken(BuilderUtils.newNodeId(host, port), password, identifier); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/979b29a0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto index a54bbdb..4c03864 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto @@ -101,6 +101,7 @@ message NMContainerStatusProto { optional int32 container_exit_status = 6; optional int64 creation_time = 7; optional string nodeLabelExpression = 8; + optional int32 version = 9; } message SCMUploaderNotifyRequestProto { http://git-wip-us.apache.org/repos/asf/hadoop/blob/979b29a0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestProtocolRecords.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestProtocolRecords.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestProtocolRecords.java index 86e49f0..68f9bc8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestProtocolRecords.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestProtocolRecords.java @@ -55,7 +55,7 @@ public class TestProtocolRecords { Resource resource = Resource.newInstance(1000, 200); NMContainerStatus report = - NMContainerStatus.newInstance(containerId, + NMContainerStatus.newInstance(containerId, 0, ContainerState.COMPLETE, resource, "diagnostics", ContainerExitStatus.ABORTED, Priority.newInstance(10), 1234); NMContainerStatus reportProto = @@ -79,7 +79,7 @@ public class TestProtocolRecords { ContainerId containerId = ContainerId.newContainerId(attemptId, 1); NMContainerStatus containerReport = - NMContainerStatus.newInstance(containerId, + NMContainerStatus.newInstance(containerId, 0, ContainerState.RUNNING, Resource.newInstance(1024, 1), "diagnostics", 0, Priority.newInstance(10), 1234); List reports = Arrays.asList(containerReport); http://git-wip-us.apache.org/repos/asf/hadoop/blob/979b29a0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestRegisterNodeManagerRequest.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestRegisterNodeManagerRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestRegisterNodeManagerRequest.java index 947dec1..9f91b87 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestRegisterNodeManagerRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestRegisterNodeManagerRequest.java @@ -40,7 +40,7 @@ public class TestRegisterNodeManagerRequest { "version", Arrays.asList(NMContainerStatus.newInstance( ContainerId.newContainerId( ApplicationAttemptId.newInstance( - ApplicationId.newInstance(1234L, 1), 1), 1), + ApplicationId.newInstance(1234L, 1), 1), 1), 0, ContainerState.RUNNING, Resource.newInstance(1024, 1), "good", -1, Priority.newInstance(0), 1234)), Arrays.asList( ApplicationId.newInstance(1234L, 1), http://git-wip-us.apache.org/repos/asf/hadoop/blob/979b29a0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index 8ff1e73..0e9d86a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -345,8 +345,7 @@ public class ContainerManagerImpl extends CompositeService implements YarnServerSecurityUtils.parseCredentials(launchContext); Container container = new ContainerImpl(getConfig(), dispatcher, req.getContainerLaunchContext(), - credentials, metrics, token, rcs.getStatus(), rcs.getExitCode(), - rcs.getDiagnostics(), rcs.getKilled(), rcs.getCapability(), context); + credentials, metrics, token, context, rcs); context.getContainers().put(containerId, container); dispatcher.getEventHandler().handle( new ApplicationContainerInitEvent(container)); @@ -937,7 +936,8 @@ public class ContainerManagerImpl extends CompositeService implements logAggregationContext)); } - this.context.getNMStateStore().storeContainer(containerId, request); + this.context.getNMStateStore().storeContainer(containerId, + containerTokenIdentifier.getVersion(), request); dispatcher.getEventHandler().handle( new ApplicationContainerInitEvent(container)); @@ -1021,7 +1021,8 @@ public class ContainerManagerImpl extends CompositeService implements // an updated NMToken. updateNMTokenIdentifier(nmTokenIdentifier); Resource resource = containerTokenIdentifier.getResource(); - changeContainerResourceInternal(containerId, resource, true); + changeContainerResourceInternal(containerId, + containerTokenIdentifier.getVersion(), resource, true); successfullyIncreasedContainers.add(containerId); } catch (YarnException | InvalidToken e) { failedContainers.put(containerId, SerializedException.newInstance(e)); @@ -1035,8 +1036,8 @@ public class ContainerManagerImpl extends CompositeService implements } @SuppressWarnings("unchecked") - private void changeContainerResourceInternal( - ContainerId containerId, Resource targetResource, boolean increase) + private void changeContainerResourceInternal(ContainerId containerId, + int containerVersion, Resource targetResource, boolean increase) throws YarnException, IOException { Container container = context.getContainers().get(containerId); // Check container existence @@ -1103,7 +1104,7 @@ public class ContainerManagerImpl extends CompositeService implements if (!serviceStopped) { // Persist container resource change for recovery this.context.getNMStateStore().storeContainerResourceChanged( - containerId, targetResource); + containerId, containerVersion, targetResource); getContainersMonitor().handle( new ChangeMonitoringContainerResourceEvent( containerId, targetResource)); @@ -1338,7 +1339,7 @@ public class ContainerManagerImpl extends CompositeService implements : containersDecreasedEvent.getContainersToDecrease()) { try { changeContainerResourceInternal(container.getId(), - container.getResource(), false); + container.getVersion(), container.getResource(), false); } catch (YarnException e) { LOG.error("Unable to decrease container resource", e); } catch (IOException e) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/979b29a0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java index 74f581b..5891682 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java @@ -70,6 +70,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.Contai import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerState; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.state.InvalidStateTransitionException; @@ -78,7 +79,6 @@ import org.apache.hadoop.yarn.state.SingleArcTransition; import org.apache.hadoop.yarn.state.StateMachine; import org.apache.hadoop.yarn.state.StateMachineFactory; import org.apache.hadoop.yarn.util.Clock; -import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.SystemClock; import org.apache.hadoop.yarn.util.resource.Resources; @@ -95,6 +95,7 @@ public class ContainerImpl implements Container { private final ContainerId containerId; private volatile Resource resource; private final String user; + private int version; private int exitCode = ContainerExitStatus.INVALID; private final StringBuilder diagnostics; private boolean wasLaunched; @@ -135,6 +136,7 @@ public class ContainerImpl implements Container { this.daemonConf = conf; this.dispatcher = dispatcher; this.stateStore = context.getNMStateStore(); + this.version = containerTokenIdentifier.getVersion(); this.launchContext = launchContext; this.containerTokenIdentifier = containerTokenIdentifier; this.containerId = containerTokenIdentifier.getContainerID(); @@ -155,22 +157,22 @@ public class ContainerImpl implements Container { public ContainerImpl(Configuration conf, Dispatcher dispatcher, ContainerLaunchContext launchContext, Credentials creds, NodeManagerMetrics metrics, - ContainerTokenIdentifier containerTokenIdentifier, - RecoveredContainerStatus recoveredStatus, int exitCode, - String diagnostics, boolean wasKilled, Resource recoveredCapability, - Context context) { + ContainerTokenIdentifier containerTokenIdentifier, Context context, + RecoveredContainerState rcs) { this(conf, dispatcher, launchContext, creds, metrics, containerTokenIdentifier, context); - this.recoveredStatus = recoveredStatus; - this.exitCode = exitCode; - this.recoveredAsKilled = wasKilled; + this.recoveredStatus = rcs.getStatus(); + this.exitCode = rcs.getExitCode(); + this.recoveredAsKilled = rcs.getKilled(); this.diagnostics.append(diagnostics); + Resource recoveredCapability = rcs.getCapability(); if (recoveredCapability != null && !this.resource.equals(recoveredCapability)) { // resource capability had been updated before NM was down this.resource = Resource.newInstance(recoveredCapability.getMemorySize(), recoveredCapability.getVirtualCores()); } + this.version = rcs.getVersion(); } private static final ContainerDiagnosticsUpdateTransition UPDATE_DIAGNOSTICS_TRANSITION = @@ -446,8 +448,8 @@ public class ContainerImpl implements Container { public NMContainerStatus getNMContainerStatus() { this.readLock.lock(); try { - return NMContainerStatus.newInstance(this.containerId, getCurrentState(), - getResource(), diagnostics.toString(), exitCode, + return NMContainerStatus.newInstance(this.containerId, this.version, + getCurrentState(), getResource(), diagnostics.toString(), exitCode, containerTokenIdentifier.getPriority(), containerTokenIdentifier.getCreationTime(), containerTokenIdentifier.getNodeLabelExpression()); http://git-wip-us.apache.org/repos/asf/hadoop/blob/979b29a0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java index e8708c6..3d23334 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java @@ -104,6 +104,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { private static final String CONTAINERS_KEY_PREFIX = "ContainerManager/containers/"; private static final String CONTAINER_REQUEST_KEY_SUFFIX = "/request"; + private static final String CONTAINER_VERSION_KEY_SUFFIX = "/version"; private static final String CONTAINER_DIAGS_KEY_SUFFIX = "/diagnostics"; private static final String CONTAINER_LAUNCHED_KEY_SUFFIX = "/launched"; private static final String CONTAINER_RESOURCE_CHANGED_KEY_SUFFIX = @@ -233,6 +234,8 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { if (suffix.equals(CONTAINER_REQUEST_KEY_SUFFIX)) { rcs.startRequest = new StartContainerRequestPBImpl( StartContainerRequestProto.parseFrom(entry.getValue())); + } else if (suffix.equals(CONTAINER_VERSION_KEY_SUFFIX)) { + rcs.version = Integer.parseInt(asString(entry.getValue())); } else if (suffix.equals(CONTAINER_DIAGS_KEY_SUFFIX)) { rcs.diagnostics = asString(entry.getValue()); } else if (suffix.equals(CONTAINER_LAUNCHED_KEY_SUFFIX)) { @@ -255,13 +258,27 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { } @Override - public void storeContainer(ContainerId containerId, + public void storeContainer(ContainerId containerId, int containerVersion, StartContainerRequest startRequest) throws IOException { - String key = CONTAINERS_KEY_PREFIX + containerId.toString() + if (LOG.isDebugEnabled()) { + LOG.debug("storeContainer: containerId= " + containerId + + ", startRequest= " + startRequest); + } + String keyRequest = CONTAINERS_KEY_PREFIX + containerId.toString() + CONTAINER_REQUEST_KEY_SUFFIX; + String keyVersion = CONTAINERS_KEY_PREFIX + containerId.toString() + + CONTAINER_VERSION_KEY_SUFFIX; try { - db.put(bytes(key), - ((StartContainerRequestPBImpl) startRequest).getProto().toByteArray()); + WriteBatch batch = db.createWriteBatch(); + try { + batch.put(bytes(keyRequest), + ((StartContainerRequestPBImpl) startRequest) + .getProto().toByteArray()); + batch.put(bytes(keyVersion), bytes(Integer.toString(containerVersion))); + db.write(batch); + } finally { + batch.close(); + } } catch (DBException e) { throw new IOException(e); } @@ -293,13 +310,27 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { @Override public void storeContainerResourceChanged(ContainerId containerId, - Resource capability) throws IOException { - String key = CONTAINERS_KEY_PREFIX + containerId.toString() + int containerVersion, Resource capability) throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("storeContainerResourceChanged: containerId=" + containerId + + ", capability=" + capability); + } + + String keyResChng = CONTAINERS_KEY_PREFIX + containerId.toString() + CONTAINER_RESOURCE_CHANGED_KEY_SUFFIX; + String keyVersion = CONTAINERS_KEY_PREFIX + containerId.toString() + + CONTAINER_VERSION_KEY_SUFFIX; try { - // New value will overwrite old values for the same key - db.put(bytes(key), - ((ResourcePBImpl) capability).getProto().toByteArray()); + WriteBatch batch = db.createWriteBatch(); + try { + // New value will overwrite old values for the same key + batch.put(bytes(keyResChng), + ((ResourcePBImpl) capability).getProto().toByteArray()); + batch.put(bytes(keyVersion), bytes(Integer.toString(containerVersion))); + db.write(batch); + } finally { + batch.close(); + } } catch (DBException e) { throw new IOException(e); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/979b29a0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java index a887e71..6ff9151 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java @@ -70,7 +70,7 @@ public class NMNullStateStoreService extends NMStateStoreService { } @Override - public void storeContainer(ContainerId containerId, + public void storeContainer(ContainerId containerId, int version, StartContainerRequest startRequest) throws IOException { } @@ -86,7 +86,7 @@ public class NMNullStateStoreService extends NMStateStoreService { @Override public void storeContainerResourceChanged(ContainerId containerId, - Resource capability) throws IOException { + int version, Resource capability) throws IOException { } @Override http://git-wip-us.apache.org/repos/asf/hadoop/blob/979b29a0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java index a9e8a8a..9fca5be 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java @@ -72,6 +72,7 @@ public abstract class NMStateStoreService extends AbstractService { String diagnostics = ""; StartContainerRequest startRequest; Resource capability; + int version; public RecoveredContainerStatus getStatus() { return status; @@ -89,6 +90,10 @@ public abstract class NMStateStoreService extends AbstractService { return diagnostics; } + public int getVersion() { + return version; + } + public StartContainerRequest getStartRequest() { return startRequest; } @@ -96,6 +101,18 @@ public abstract class NMStateStoreService extends AbstractService { public Resource getCapability() { return capability; } + + @Override + public String toString() { + return new StringBuffer("Status: ").append(getStatus()) + .append(", Exit code: ").append(exitCode) + .append(", Version: ").append(version) + .append(", Killed: ").append(getKilled()) + .append(", Diagnostics: ").append(getDiagnostics()) + .append(", Capability: ").append(getCapability()) + .append(", StartRequest: ").append(getStartRequest()) + .toString(); + } } public static class LocalResourceTrackerState { @@ -263,11 +280,13 @@ public abstract class NMStateStoreService extends AbstractService { /** * Record a container start request * @param containerId the container ID + * @param containerVersion the container Version * @param startRequest the container start request * @throws IOException */ public abstract void storeContainer(ContainerId containerId, - StartContainerRequest startRequest) throws IOException; + int containerVersion, StartContainerRequest startRequest) + throws IOException; /** * Record that a container has been launched @@ -280,11 +299,12 @@ public abstract class NMStateStoreService extends AbstractService { /** * Record that a container resource has been changed * @param containerId the container ID + * @param containerVersion the container version * @param capability the container resource capability * @throws IOException */ public abstract void storeContainerResourceChanged(ContainerId containerId, - Resource capability) throws IOException; + int containerVersion, Resource capability) throws IOException; /** * Record that a container has completed http://git-wip-us.apache.org/repos/asf/hadoop/blob/979b29a0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java index ee2677c..f6593f9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java @@ -875,7 +875,7 @@ public class TestNodeManagerResync { ApplicationAttemptId.newInstance(applicationId, 1); ContainerId containerId = ContainerId.newContainerId(applicationAttemptId, id); NMContainerStatus containerReport = - NMContainerStatus.newInstance(containerId, containerState, + NMContainerStatus.newInstance(containerId, 0, containerState, Resource.newInstance(1024, 1), "recover container", 0, Priority.newInstance(10), 0); return containerReport; http://git-wip-us.apache.org/repos/asf/hadoop/blob/979b29a0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java index 3d72375..c348d16 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java @@ -249,7 +249,7 @@ public class TestNodeStatusUpdater { String user = "testUser"; ContainerTokenIdentifier containerToken = BuilderUtils .newContainerTokenIdentifier(BuilderUtils.newContainerToken( - firstContainerID, InetAddress.getByName("localhost") + firstContainerID, 0, InetAddress.getByName("localhost") .getCanonicalHostName(), 1234, user, resource, currentTime + 10000, 123, "password".getBytes(), currentTime)); Context context = mock(Context.class); @@ -290,7 +290,7 @@ public class TestNodeStatusUpdater { Resource resource = BuilderUtils.newResource(3, 1); ContainerTokenIdentifier containerToken = BuilderUtils .newContainerTokenIdentifier(BuilderUtils.newContainerToken( - secondContainerID, InetAddress.getByName("localhost") + secondContainerID, 0, InetAddress.getByName("localhost") .getCanonicalHostName(), 1234, user, resource, currentTime + 10000, 123, "password".getBytes(), currentTime)); Context context = mock(Context.class); @@ -1009,7 +1009,7 @@ public class TestNodeStatusUpdater { ContainerId cId = ContainerId.newContainerId(appAttemptId, 1); Token containerToken = - BuilderUtils.newContainerToken(cId, "anyHost", 1234, "anyUser", + BuilderUtils.newContainerToken(cId, 0, "anyHost", 1234, "anyUser", BuilderUtils.newResource(1024, 1), 0, 123, "password".getBytes(), 0); Container anyCompletedContainer = new ContainerImpl(conf, null, @@ -1031,7 +1031,7 @@ public class TestNodeStatusUpdater { ContainerId runningContainerId = ContainerId.newContainerId(appAttemptId, 3); Token runningContainerToken = - BuilderUtils.newContainerToken(runningContainerId, "anyHost", + BuilderUtils.newContainerToken(runningContainerId, 0, "anyHost", 1234, "anyUser", BuilderUtils.newResource(1024, 1), 0, 123, "password".getBytes(), 0); Container runningContainer = @@ -1090,7 +1090,7 @@ public class TestNodeStatusUpdater { ApplicationAttemptId.newInstance(appId, 0); ContainerId containerId = ContainerId.newContainerId(appAttemptId, 1); Token containerToken = - BuilderUtils.newContainerToken(containerId, "host", 1234, "user", + BuilderUtils.newContainerToken(containerId, 0, "host", 1234, "user", BuilderUtils.newResource(1024, 1), 0, 123, "password".getBytes(), 0); Container completedContainer = new ContainerImpl(conf, null, @@ -1128,7 +1128,7 @@ public class TestNodeStatusUpdater { ContainerId cId = ContainerId.newContainerId(appAttemptId, 1); Token containerToken = - BuilderUtils.newContainerToken(cId, "anyHost", 1234, "anyUser", + BuilderUtils.newContainerToken(cId, 0, "anyHost", 1234, "anyUser", BuilderUtils.newResource(1024, 1), 0, 123, "password".getBytes(), 0); Container anyCompletedContainer = new ContainerImpl(conf, null, --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org