hadoop-yarn-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sa...@apache.org
Subject svn commit: r1549627 - in /hadoop/common/trunk/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/ hadoop-yarn/hadoop-yarn-api/src/main/proto/ hadoop-yarn/hadoop-yarn-common/src/main/java/org/ap...
Date Mon, 09 Dec 2013 17:44:29 GMT
Author: sandy
Date: Mon Dec  9 17:44:29 2013
New Revision: 1549627

URL: http://svn.apache.org/r1549627
Log:
YARN-1448. AM-RM protocol changes to support container resizing (Wangda Tan via Sandy Ryza)

Added:
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestAllocateRequest.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestAllocateResponse.java
Modified:
    hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateRequest.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateRequestPBImpl.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java

Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1549627&r1=1549626&r2=1549627&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Mon Dec  9 17:44:29 2013
@@ -46,6 +46,9 @@ Release 2.4.0 - UNRELEASED
     YARN-1447. Common PB type definitions for container resizing. (Wangda Tan
     via Sandy Ryza)
 
+    YARN-1448. AM-RM protocol changes to support container resizing (Wangda Tan
+    via Sandy Ryza)
+
   IMPROVEMENTS
 
     YARN-7. Support CPU resource for DistributedShell. (Junping Du via llu)

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateRequest.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateRequest.java?rev=1549627&r1=1549626&r2=1549627&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateRequest.java
(original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateRequest.java
Mon Dec  9 17:44:29 2013
@@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.api.Applic
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
+import org.apache.hadoop.yarn.api.records.ContainerResourceIncreaseRequest;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.util.Records;
 
@@ -60,12 +61,24 @@ public abstract class AllocateRequest {
       List<ResourceRequest> resourceAsk,
       List<ContainerId> containersToBeReleased,
       ResourceBlacklistRequest resourceBlacklistRequest) {
+    return newInstance(responseID, appProgress, resourceAsk,
+        containersToBeReleased, resourceBlacklistRequest, null);
+  }
+  
+  @Public
+  @Stable
+  public static AllocateRequest newInstance(int responseID, float appProgress,
+      List<ResourceRequest> resourceAsk,
+      List<ContainerId> containersToBeReleased,
+      ResourceBlacklistRequest resourceBlacklistRequest,
+      List<ContainerResourceIncreaseRequest> increaseRequests) {
     AllocateRequest allocateRequest = Records.newRecord(AllocateRequest.class);
     allocateRequest.setResponseId(responseID);
     allocateRequest.setProgress(appProgress);
     allocateRequest.setAskList(resourceAsk);
     allocateRequest.setReleaseList(containersToBeReleased);
     allocateRequest.setResourceBlacklistRequest(resourceBlacklistRequest);
+    allocateRequest.setIncreaseRequests(increaseRequests);
     return allocateRequest;
   }
   
@@ -170,4 +183,22 @@ public abstract class AllocateRequest {
   @Stable
   public abstract void setResourceBlacklistRequest(
       ResourceBlacklistRequest resourceBlacklistRequest);
+  
+  /**
+   * Get the <code>ContainerResourceIncreaseRequest</code> being sent by the
+   * <code>ApplicationMaster</code>
+   */
+  @Public
+  @Stable
+  public abstract List<ContainerResourceIncreaseRequest> getIncreaseRequests();
+  
+  /**
+   * Set the <code>ContainerResourceIncreaseRequest</code> to inform the
+   * <code>ResourceManager</code> about some container's resources need to be
+   * increased
+   */
+  @Public
+  @Stable
+  public abstract void setIncreaseRequests(
+      List<ContainerResourceIncreaseRequest> increaseRequests);
 }

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java?rev=1549627&r1=1549626&r2=1549627&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java
(original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java
Mon Dec  9 17:44:29 2013
@@ -28,6 +28,8 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
 import org.apache.hadoop.yarn.api.records.AMCommand;
 import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerResourceDecrease;
+import org.apache.hadoop.yarn.api.records.ContainerResourceIncrease;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NMToken;
 import org.apache.hadoop.yarn.api.records.NodeReport;
@@ -82,6 +84,23 @@ public abstract class AllocateResponse {
     response.setNMTokens(nmTokens);
     return response;
   }
+  
+  @Public
+  @Stable
+  public static AllocateResponse newInstance(int responseId,
+      List<ContainerStatus> completedContainers,
+      List<Container> allocatedContainers, List<NodeReport> updatedNodes,
+      Resource availResources, AMCommand command, int numClusterNodes,
+      PreemptionMessage preempt, List<NMToken> nmTokens,
+      List<ContainerResourceIncrease> increasedContainers,
+      List<ContainerResourceDecrease> decreasedContainers) {
+    AllocateResponse response = newInstance(responseId, completedContainers,
+        allocatedContainers, updatedNodes, availResources, command,
+        numClusterNodes, preempt, nmTokens);
+    response.setIncreasedContainers(increasedContainers);
+    response.setDecreasedContainers(decreasedContainers);
+    return response;
+  }
 
   /**
    * If the <code>ResourceManager</code> needs the
@@ -221,4 +240,34 @@ public abstract class AllocateResponse {
   @Private
   @Unstable
   public abstract void setNMTokens(List<NMToken> nmTokens);
+  
+  /**
+   * Get the list of newly increased containers by <code>ResourceManager</code>
+   */
+  @Public
+  @Stable
+  public abstract List<ContainerResourceIncrease> getIncreasedContainers();
+
+  /**
+   * Set the list of newly increased containers by <code>ResourceManager</code>
+   */
+  @Private
+  @Unstable
+  public abstract void setIncreasedContainers(
+      List<ContainerResourceIncrease> increasedContainers);
+
+  /**
+   * Get the list of newly decreased containers by <code>NodeManager</code>
+   */
+  @Public
+  @Stable
+  public abstract List<ContainerResourceDecrease> getDecreasedContainers();
+
+  /**
+   * Set the list of newly decreased containers by <code>NodeManager</code>
+   */
+  @Private
+  @Unstable
+  public abstract void setDecreasedContainers(
+      List<ContainerResourceDecrease> decreasedContainers);
 }

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto?rev=1549627&r1=1549626&r2=1549627&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
(original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
Mon Dec  9 17:44:29 2013
@@ -62,6 +62,7 @@ message AllocateRequestProto {
   optional ResourceBlacklistRequestProto blacklist_request = 3;
   optional int32 response_id = 4;
   optional float progress = 5;
+  repeated ContainerResourceIncreaseRequestProto increase_request = 6;
 }
 
 message NMTokenProto {
@@ -79,6 +80,8 @@ message AllocateResponseProto {
   optional int32 num_cluster_nodes = 7;
   optional PreemptionMessageProto preempt = 8;
   repeated NMTokenProto nm_tokens = 9;
+  repeated ContainerResourceIncreaseProto increased_containers = 10;
+  repeated ContainerResourceDecreaseProto decreased_containers = 11;
 }
 
 //////////////////////////////////////////////////////

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateRequestPBImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateRequestPBImpl.java?rev=1549627&r1=1549626&r2=1549627&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateRequestPBImpl.java
(original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateRequestPBImpl.java
Mon Dec  9 17:44:29 2013
@@ -27,12 +27,15 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerResourceIncreaseRequest;
 import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.ContainerResourceIncreaseRequestPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ResourceBlacklistRequestPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ResourceRequestPBImpl;
 import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.ContainerResourceIncreaseRequestProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ResourceBlacklistRequestProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ResourceRequestProto;
 import org.apache.hadoop.yarn.proto.YarnServiceProtos.AllocateRequestProto;
@@ -49,9 +52,9 @@ public class AllocateRequestPBImpl exten
 
   private List<ResourceRequest> ask = null;
   private List<ContainerId> release = null;
+  private List<ContainerResourceIncreaseRequest> increaseRequests = null;
   private ResourceBlacklistRequest blacklistRequest = null;
   
-  
   public AllocateRequestPBImpl() {
     builder = AllocateRequestProto.newBuilder();
   }
@@ -62,7 +65,7 @@ public class AllocateRequestPBImpl exten
   }
   
   public AllocateRequestProto getProto() {
-      mergeLocalToProto();
+    mergeLocalToProto();
     proto = viaProto ? proto : builder.build();
     viaProto = true;
     return proto;
@@ -95,6 +98,9 @@ public class AllocateRequestPBImpl exten
     if (this.release != null) {
       addReleasesToProto();
     }
+    if (this.increaseRequests != null) {
+      addIncreaseRequestsToProto();
+    }
     if (this.blacklistRequest != null) {
       builder.setBlacklistRequest(convertToProtoFormat(this.blacklistRequest));
     }
@@ -156,6 +162,23 @@ public class AllocateRequestPBImpl exten
   }
   
   @Override
+  public List<ContainerResourceIncreaseRequest> getIncreaseRequests() {
+    initIncreaseRequests();
+    return this.increaseRequests;
+  }
+
+  @Override
+  public void setIncreaseRequests(
+      List<ContainerResourceIncreaseRequest> increaseRequests) {
+    if (increaseRequests == null) {
+      return;
+    }
+    initIncreaseRequests();
+    this.increaseRequests.clear();
+    this.increaseRequests.addAll(increaseRequests);
+  }
+  
+  @Override
   public ResourceBlacklistRequest getResourceBlacklistRequest() {
     AllocateRequestProtoOrBuilder p = viaProto ? proto : builder;
     if (this.blacklistRequest != null) {
@@ -223,6 +246,57 @@ public class AllocateRequestPBImpl exten
     };
     builder.addAllAsk(iterable);
   }
+  
+  private void initIncreaseRequests() {
+    if (this.increaseRequests != null) {
+      return;
+    }
+    AllocateRequestProtoOrBuilder p = viaProto ? proto : builder;
+    List<ContainerResourceIncreaseRequestProto> list =
+        p.getIncreaseRequestList();
+    this.increaseRequests = new ArrayList<ContainerResourceIncreaseRequest>();
+
+    for (ContainerResourceIncreaseRequestProto c : list) {
+      this.increaseRequests.add(convertFromProtoFormat(c));
+    }
+  }
+  
+  private void addIncreaseRequestsToProto() {
+    maybeInitBuilder();
+    builder.clearIncreaseRequest();
+    if (increaseRequests == null) {
+      return;
+    }
+    Iterable<ContainerResourceIncreaseRequestProto> iterable =
+        new Iterable<ContainerResourceIncreaseRequestProto>() {
+          @Override
+          public Iterator<ContainerResourceIncreaseRequestProto> iterator() {
+            return new Iterator<ContainerResourceIncreaseRequestProto>() {
+
+              Iterator<ContainerResourceIncreaseRequest> iter =
+                  increaseRequests.iterator();
+
+              @Override
+              public boolean hasNext() {
+                return iter.hasNext();
+              }
+
+              @Override
+              public ContainerResourceIncreaseRequestProto next() {
+                return convertToProtoFormat(iter.next());
+              }
+
+              @Override
+              public void remove() {
+                throw new UnsupportedOperationException();
+              }
+            };
+
+          }
+        };
+    builder.addAllIncreaseRequest(iterable);
+  }
+  
   @Override
   public List<ContainerId> getReleaseList() {
     initReleases();
@@ -292,6 +366,16 @@ public class AllocateRequestPBImpl exten
   private ResourceRequestProto convertToProtoFormat(ResourceRequest t) {
     return ((ResourceRequestPBImpl)t).getProto();
   }
+  
+  private ContainerResourceIncreaseRequestPBImpl convertFromProtoFormat(
+      ContainerResourceIncreaseRequestProto p) {
+    return new ContainerResourceIncreaseRequestPBImpl(p);
+  }
+
+  private ContainerResourceIncreaseRequestProto convertToProtoFormat(
+      ContainerResourceIncreaseRequest t) {
+    return ((ContainerResourceIncreaseRequestPBImpl) t).getProto();
+  }
 
   private ContainerIdPBImpl convertFromProtoFormat(ContainerIdProto p) {
     return new ContainerIdPBImpl(p);
@@ -308,6 +392,4 @@ public class AllocateRequestPBImpl exten
   private ResourceBlacklistRequestProto convertToProtoFormat(ResourceBlacklistRequest t)
{
     return ((ResourceBlacklistRequestPBImpl)t).getProto();
   }
-
-
 }  

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java?rev=1549627&r1=1549626&r2=1549627&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java
(original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java
Mon Dec  9 17:44:29 2013
@@ -28,12 +28,16 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.records.AMCommand;
 import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerResourceDecrease;
+import org.apache.hadoop.yarn.api.records.ContainerResourceIncrease;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NMToken;
 import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.PreemptionMessage;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.ContainerResourceDecreasePBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.ContainerResourceIncreasePBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.NMTokenPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.NodeReportPBImpl;
@@ -41,6 +45,8 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
 import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
 import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.ContainerResourceDecreaseProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.ContainerResourceIncreaseProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.NodeReportProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.PreemptionMessageProto;
@@ -63,6 +69,8 @@ public class AllocateResponsePBImpl exte
   private List<Container> allocatedContainers = null;
   private List<NMToken> nmTokens = null;
   private List<ContainerStatus> completedContainersStatuses = null;
+  private List<ContainerResourceIncrease> increasedContainers = null;
+  private List<ContainerResourceDecrease> decreasedContainers = null;
 
   private List<NodeReport> updatedNodes = null;
   private PreemptionMessage preempt;
@@ -108,7 +116,7 @@ public class AllocateResponsePBImpl exte
     if (this.allocatedContainers != null) {
       builder.clearAllocatedContainers();
       Iterable<ContainerProto> iterable =
-          getProtoIterable(this.allocatedContainers);
+          getContainerProtoIterable(this.allocatedContainers);
       builder.addAllAllocatedContainers(iterable);
     }
     if (nmTokens != null) {
@@ -134,6 +142,18 @@ public class AllocateResponsePBImpl exte
     if (this.preempt != null) {
       builder.setPreempt(convertToProtoFormat(this.preempt));
     }
+    if (this.increasedContainers != null) {
+      builder.clearIncreasedContainers();
+      Iterable<ContainerResourceIncreaseProto> iterable =
+          getIncreaseProtoIterable(this.increasedContainers);
+      builder.addAllIncreasedContainers(iterable);
+    }
+    if (this.decreasedContainers != null) {
+      builder.clearDecreasedContainers();
+      Iterable<ContainerResourceDecreaseProto> iterable =
+          getChangeProtoIterable(this.decreasedContainers);
+      builder.addAllDecreasedContainers(iterable);
+    }
   }
 
   private synchronized void mergeLocalToProto() {
@@ -306,6 +326,63 @@ public class AllocateResponsePBImpl exte
     this.preempt = preempt;
   }
 
+  @Override
+  public synchronized List<ContainerResourceIncrease> getIncreasedContainers() {
+    initLocalIncreasedContainerList();
+    return increasedContainers;
+  }
+
+  @Override
+  public synchronized void setIncreasedContainers(
+      List<ContainerResourceIncrease> increasedContainers) {
+    if (increasedContainers == null)
+      return;
+    initLocalIncreasedContainerList();
+    this.increasedContainers.addAll(increasedContainers);
+  }
+
+  @Override
+  public synchronized List<ContainerResourceDecrease> getDecreasedContainers() {
+    initLocalDecreasedContainerList();
+    return decreasedContainers;
+  }
+
+  @Override
+  public synchronized void setDecreasedContainers(
+      List<ContainerResourceDecrease> decreasedContainers) {
+    if (decreasedContainers == null) {
+      return;
+    }
+    initLocalDecreasedContainerList();
+    this.decreasedContainers.addAll(decreasedContainers);
+  }
+
+  private synchronized void initLocalIncreasedContainerList() {
+    if (this.increasedContainers != null) {
+      return;
+    }
+    AllocateResponseProtoOrBuilder p = viaProto ? proto : builder;
+    List<ContainerResourceIncreaseProto> list = p.getIncreasedContainersList();
+    increasedContainers = new ArrayList<ContainerResourceIncrease>();
+
+    for (ContainerResourceIncreaseProto c : list) {
+      increasedContainers.add(convertFromProtoFormat(c));
+    }
+  }
+
+  private synchronized void initLocalDecreasedContainerList() {
+    if (this.decreasedContainers != null) {
+      return;
+    }
+    AllocateResponseProtoOrBuilder p = viaProto ? proto : builder;
+    List<ContainerResourceDecreaseProto> list = p.getDecreasedContainersList();
+    decreasedContainers = new ArrayList<ContainerResourceDecrease>();
+
+    for (ContainerResourceDecreaseProto c : list) {
+      decreasedContainers.add(convertFromProtoFormat(c));
+    }
+  }
+
   // Once this is called. updatedNodes will never be null - until a getProto is
   // called.
   private synchronized void initLocalNewNodeReportList() {
@@ -348,7 +425,71 @@ public class AllocateResponsePBImpl exte
     }
   }
 
-  private synchronized Iterable<ContainerProto> getProtoIterable(
+  private synchronized Iterable<ContainerResourceIncreaseProto>
+      getIncreaseProtoIterable(
+          final List<ContainerResourceIncrease> newContainersList) {
+    maybeInitBuilder();
+    return new Iterable<ContainerResourceIncreaseProto>() {
+      @Override
+      public synchronized Iterator<ContainerResourceIncreaseProto> iterator() {
+        return new Iterator<ContainerResourceIncreaseProto>() {
+
+          Iterator<ContainerResourceIncrease> iter = newContainersList
+              .iterator();
+
+          @Override
+          public synchronized boolean hasNext() {
+            return iter.hasNext();
+          }
+
+          @Override
+          public synchronized ContainerResourceIncreaseProto next() {
+            return convertToProtoFormat(iter.next());
+          }
+
+          @Override
+          public synchronized void remove() {
+            throw new UnsupportedOperationException();
+          }
+        };
+
+      }
+    };
+  }
+
+  private synchronized Iterable<ContainerResourceDecreaseProto>
+      getChangeProtoIterable(
+          final List<ContainerResourceDecrease> newContainersList) {
+    maybeInitBuilder();
+    return new Iterable<ContainerResourceDecreaseProto>() {
+      @Override
+      public synchronized Iterator<ContainerResourceDecreaseProto> iterator() {
+        return new Iterator<ContainerResourceDecreaseProto>() {
+
+          Iterator<ContainerResourceDecrease> iter = newContainersList
+              .iterator();
+
+          @Override
+          public synchronized boolean hasNext() {
+            return iter.hasNext();
+          }
+
+          @Override
+          public synchronized ContainerResourceDecreaseProto next() {
+            return convertToProtoFormat(iter.next());
+          }
+
+          @Override
+          public synchronized void remove() {
+            throw new UnsupportedOperationException();
+          }
+        };
+
+      }
+    };
+  }
+  
+  private synchronized Iterable<ContainerProto> getContainerProtoIterable(
       final List<Container> newContainersList) {
     maybeInitBuilder();
     return new Iterable<ContainerProto>() {
@@ -467,7 +608,6 @@ public class AllocateResponsePBImpl exte
 
           }
         };
-
       }
     };
   }
@@ -486,6 +626,26 @@ public class AllocateResponsePBImpl exte
       completedContainersStatuses.add(convertFromProtoFormat(c));
     }
   }
+  
+  private synchronized ContainerResourceIncrease convertFromProtoFormat(
+      ContainerResourceIncreaseProto p) {
+    return new ContainerResourceIncreasePBImpl(p);
+  }
+
+  private synchronized ContainerResourceIncreaseProto convertToProtoFormat(
+      ContainerResourceIncrease t) {
+    return ((ContainerResourceIncreasePBImpl) t).getProto();
+  }
+
+  private synchronized ContainerResourceDecrease convertFromProtoFormat(
+      ContainerResourceDecreaseProto p) {
+    return new ContainerResourceDecreasePBImpl(p);
+  }
+
+  private synchronized ContainerResourceDecreaseProto convertToProtoFormat(
+      ContainerResourceDecrease t) {
+    return ((ContainerResourceDecreasePBImpl) t).getProto();
+  }
 
   private synchronized NodeReportPBImpl convertFromProtoFormat(
       NodeReportProto p) {
@@ -500,8 +660,9 @@ public class AllocateResponsePBImpl exte
       ContainerProto p) {
     return new ContainerPBImpl(p);
   }
-
-  private synchronized ContainerProto convertToProtoFormat(Container t) {
+  
+  private synchronized ContainerProto convertToProtoFormat(
+      Container t) {
     return ((ContainerPBImpl)t).getProto();
   }
 

Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestAllocateRequest.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestAllocateRequest.java?rev=1549627&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestAllocateRequest.java
(added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestAllocateRequest.java
Mon Dec  9 17:44:29 2013
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateRequestPBImpl;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ContainerResourceIncreaseRequest;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.AllocateRequestProto;
+import org.junit.Test;
+
+public class TestAllocateRequest {
+  @Test
+  public void testAllcoateRequestWithIncrease() {
+    List<ContainerResourceIncreaseRequest> incRequests =
+        new ArrayList<ContainerResourceIncreaseRequest>();
+    for (int i = 0; i < 3; i++) {
+      incRequests.add(ContainerResourceIncreaseRequest.newInstance(null,
+          Resource.newInstance(0, i)));
+    }
+    AllocateRequest r =
+        AllocateRequest.newInstance(123, 0f, null, null, null, incRequests);
+
+    // serde
+    AllocateRequestProto p = ((AllocateRequestPBImpl) r).getProto();
+    r = new AllocateRequestPBImpl(p);
+
+    // check value
+    Assert.assertEquals(123, r.getResponseId());
+    Assert.assertEquals(incRequests.size(), r.getIncreaseRequests().size());
+
+    for (int i = 0; i < incRequests.size(); i++) {
+      Assert.assertEquals(r.getIncreaseRequests().get(i).getCapability()
+          .getVirtualCores(), incRequests.get(i).getCapability()
+          .getVirtualCores());
+    }
+  }
+
+  @Test
+  public void testAllcoateRequestWithoutIncrease() {
+    AllocateRequest r =
+        AllocateRequest.newInstance(123, 0f, null, null, null, null);
+
+    // serde
+    AllocateRequestProto p = ((AllocateRequestPBImpl) r).getProto();
+    r = new AllocateRequestPBImpl(p);
+
+    // check value
+    Assert.assertEquals(123, r.getResponseId());
+    Assert.assertEquals(0, r.getIncreaseRequests().size());
+  }
+}

Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestAllocateResponse.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestAllocateResponse.java?rev=1549627&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestAllocateResponse.java
(added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestAllocateResponse.java
Mon Dec  9 17:44:29 2013
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateResponsePBImpl;
+import org.apache.hadoop.yarn.api.records.AMCommand;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerResourceDecrease;
+import org.apache.hadoop.yarn.api.records.ContainerResourceIncrease;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NMToken;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.AllocateResponseProto;
+import org.junit.Test;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+public class TestAllocateResponse {
+  @Test
+  public void testAllocateResponseWithIncDecContainers() {
+    List<ContainerResourceIncrease> incContainers =
+        new ArrayList<ContainerResourceIncrease>();
+    List<ContainerResourceDecrease> decContainers =
+        new ArrayList<ContainerResourceDecrease>();
+    for (int i = 0; i < 3; i++) {
+      incContainers.add(ContainerResourceIncrease.newInstance(null,
+          Resource.newInstance(1024, i), null));
+    }
+    for (int i = 0; i < 5; i++) {
+      decContainers.add(ContainerResourceDecrease.newInstance(null,
+          Resource.newInstance(1024, i)));
+    }
+
+    AllocateResponse r =
+        AllocateResponse.newInstance(3, new ArrayList<ContainerStatus>(),
+            new ArrayList<Container>(), new ArrayList<NodeReport>(), null,
+            AMCommand.AM_RESYNC, 3, null, new ArrayList<NMToken>(),
+            incContainers, decContainers);
+
+    // serde
+    AllocateResponseProto p = ((AllocateResponsePBImpl) r).getProto();
+    r = new AllocateResponsePBImpl(p);
+
+    // check value
+    Assert
+        .assertEquals(incContainers.size(), r.getIncreasedContainers().size());
+    Assert
+        .assertEquals(decContainers.size(), r.getDecreasedContainers().size());
+
+    for (int i = 0; i < incContainers.size(); i++) {
+      Assert.assertEquals(i, r.getIncreasedContainers().get(i).getCapability()
+          .getVirtualCores());
+    }
+
+    for (int i = 0; i < decContainers.size(); i++) {
+      Assert.assertEquals(i, r.getDecreasedContainers().get(i).getCapability()
+          .getVirtualCores());
+    }
+  }
+
+  @Test
+  public void testAllocateResponseWithoutIncDecContainers() {
+    AllocateResponse r =
+        AllocateResponse.newInstance(3, new ArrayList<ContainerStatus>(),
+            new ArrayList<Container>(), new ArrayList<NodeReport>(), null,
+            AMCommand.AM_RESYNC, 3, null, new ArrayList<NMToken>(), null, null);
+
+    // serde
+    AllocateResponseProto p = ((AllocateResponsePBImpl) r).getProto();
+    r = new AllocateResponsePBImpl(p);
+
+    // check value
+    Assert.assertEquals(0, r.getIncreasedContainers().size());
+    Assert.assertEquals(0, r.getDecreasedContainers().size());
+  }
+}



Mime
View raw message