apex-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vro...@apache.org
Subject apex-core git commit: APEXCORE-624 decrement unallocated containers and also released containers so that the exit condition for the shutdown check is satisfied.
Date Fri, 17 Feb 2017 01:51:37 GMT
Repository: apex-core
Updated Branches:
  refs/heads/release-3.2 332810722 -> f84c035c2


APEXCORE-624 decrement unallocated containers and also released containers so that the exit
condition for the shutdown check is satisfied.


Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/f84c035c
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/f84c035c
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/f84c035c

Branch: refs/heads/release-3.2
Commit: f84c035c205526a7412a228661a9c7d45c5b7f15
Parents: 3328107
Author: Sanjay Pujare <sanjaypujare@Sanjay-DT-Mac2.local>
Authored: Fri Jan 27 10:35:09 2017 -0800
Committer: Sanjay Pujare <sanjaypujare@Sanjay-DT-Mac2.local>
Committed: Wed Feb 15 15:00:29 2017 -0800

----------------------------------------------------------------------
 .../stram/StreamingAppMasterService.java            | 16 ++++++++--------
 1 file changed, 8 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-core/blob/f84c035c/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
index d3f674a..10b78f2 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
@@ -770,6 +770,7 @@ public class StreamingAppMasterService extends CompositeService
         for (Map.Entry<StreamingContainerAgent.ContainerStartRequest, MutablePair<Integer,
ContainerRequest>> entry : requestedResources.entrySet()) {
           if ((loopCounter - entry.getValue().getKey()) > NUMBER_MISSED_HEARTBEATS) {
             StreamingContainerAgent.ContainerStartRequest csr = entry.getKey();
+            LOG.debug("Request for container {} timed out. Re-requesting container", csr.container);
             removedContainerRequests.add(entry.getValue().getRight());
             ContainerRequest cr = resourceRequestor.createContainerRequest(csr, false);
             entry.getValue().setLeft(loopCounter);
@@ -779,7 +780,7 @@ public class StreamingAppMasterService extends CompositeService
         }
       }
 
-     /* Remove nodes from blacklist after timeout */
+      /* Remove nodes from blacklist after timeout */
       long currentTime = System.currentTimeMillis();
       List<String> blacklistRemovals = new ArrayList<String>();
       for (Iterator<Pair<Long, List<String>>> it = blacklistedNodesQueueWithTimeStamp.iterator();
it.hasNext();) {
@@ -797,7 +798,7 @@ public class StreamingAppMasterService extends CompositeService
       }
 
       numTotalContainers += containerRequests.size();
-      numRequestedContainers += containerRequests.size();
+      numRequestedContainers += containerRequests.size() - removedContainerRequests.size();
       AllocateResponse amResp = sendContainerAskToRM(containerRequests, removedContainerRequests,
releasedContainers);
       if (amResp.getAMCommand() != null) {
         LOG.info(" statement executed:{}", amResp.getAMCommand());
@@ -836,7 +837,7 @@ public class StreamingAppMasterService extends CompositeService
           LOG.info("Releasing {} as resource with priority {} was already assigned", allocatedContainer.getId(),
allocatedContainer.getPriority());
           releasedContainers.add(allocatedContainer.getId());
           numReleasedContainers++;
-          numRequestedContainers++;
+          numRequestedContainers--;
           continue;
         }
         if (csr != null) {
@@ -964,7 +965,8 @@ public class StreamingAppMasterService extends CompositeService
         appDone = true;
       }
 
-      LOG.debug("Current application state: loop=" + loopCounter + ", appDone=" + appDone
+ ", total=" + numTotalContainers + ", requested=" + numRequestedContainers + ", released="
+ numReleasedContainers + ", completed=" + numCompletedContainers + ", failed=" + numFailedContainers
+ ", currentAllocated=" + allocatedContainers.size());
+      LOG.debug("Current application state: loop={}, appDone={}, total={}, requested={},
released={}, completed={}, failed={}, currentAllocated={}, dnmgr.containerStartRequests={}",
+            loopCounter, appDone, numTotalContainers, numRequestedContainers, numReleasedContainers,
numCompletedContainers, numFailedContainers, allocatedContainers.size(), dnmgr.containerStartRequests);
 
       // monitor child containers
       dnmgr.monitorHeartbeat();
@@ -1038,16 +1040,14 @@ public class StreamingAppMasterService extends CompositeService
   private AllocateResponse sendContainerAskToRM(List<ContainerRequest> containerRequests,
List<ContainerRequest> removedContainerRequests, List<ContainerId> releasedContainers)
throws YarnException, IOException
   {
     if (removedContainerRequests.size() > 0) {
-      LOG.info(" Removing container request: " + removedContainerRequests);
+      LOG.debug("Removing container request: {}", removedContainerRequests);
       for (ContainerRequest cr : removedContainerRequests) {
-        LOG.info("Removed container: {}", cr.toString());
         amRmClient.removeContainerRequest(cr);
       }
     }
     if (containerRequests.size() > 0) {
-      LOG.info("Asking RM for containers: " + containerRequests);
+      LOG.debug("Asking RM for containers: {}", containerRequests);
       for (ContainerRequest cr : containerRequests) {
-        LOG.info("Requested container: {}", cr.toString());
         amRmClient.addContainerRequest(cr);
       }
     }


Mime
View raw message