hadoop-yarn-issues mailing list archives

From "Varun Vasudev (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (YARN-4676) Automatic and Asynchronous Decommissioning Nodes Status Tracking
Date Thu, 05 May 2016 15:58:13 GMT

    [ https://issues.apache.org/jira/browse/YARN-4676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15272550#comment-15272550 ]

Varun Vasudev commented on YARN-4676:
-------------------------------------

Thanks for the patch [~danzhi]. My apologies for coming in late, but I have some concerns about
the patch and the approach.

The code to read the hostnames and timeouts in HostsFileReader is a little fragile and may
lead to problems.

1.
{code}
+              // look ahead for optional timeout values
+              Integer timeout = null;
+              if (i < nodes.length - 1) {
+                timeout = tryParseInteger(nodes[i+1]);
+              }
+              map.put(nodes[i], timeout);
+              // skip the timeout if exist
+              if (timeout != null) {
+                i++;
+              }
{code}

This code assumes that the node names are non-numerical - this assumption is not correct.
As per RFC 1123, hostnames can be made up entirely of digits, so a numeric hostname following
another host would be misread as that host's timeout (see the sketch below). It also looks like we
decommission nodes based on hostname only, whereas it is possible to run multiple nodemanagers
on a node - this is probably something we can revisit later.

2.
{code}
+              map.put(nodes[i], timeout);
{code}
{code}
+  private static Integer tryParseInteger(String str) {
+    try{
+      int num = Integer.parseInt(str);
+      return num;
+    } catch (Exception e) {
+      return null;
+    }
+  }
{code}

Is it possible for us to use -1 instead of null to specify that a timeout wasn't specified?
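
For example (just a sketch of the suggestion):

{code}
// Sketch: -1 means "no timeout specified", so callers never have to handle
// null and the Integer boxing goes away.
private static int tryParseTimeout(String str) {
  try {
    return Integer.parseInt(str);
  } catch (NumberFormatException e) {
    return -1;
  }
}
{code}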

3.
{code}
+  private static void prettyLogMap(
+      String type, Map<String, Integer> excludes, String filename) {
+    if (excludes.size() == 0) {
+      return;
+    }
+    StringBuilder sb = new StringBuilder();
+    for (Entry<String, Integer> n : excludes.entrySet()) {
+      if (n.getValue() != null) {
+        sb.append(String.format("%n  %s : %d", n.getKey(), n.getValue()));
+      } else {
+        sb.append(String.format("%n  %s", n.getKey()));
+      }
+    }
+    LOG.info("List of " + type + " hosts from " + filename + sb.toString());
+  }
{code}

Instead of %n, can we just print all the hosts on a single line so that we can use grep to
filter out the lines?
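
Something like this sketch (keeping the host and timeout pairing) would produce one grep-able
line per file:

{code}
// Sketch: one log line per file so the output can be grepped.
private static void prettyLogMap(
    String type, Map<String, Integer> excludes, String filename) {
  if (excludes.isEmpty()) {
    return;
  }
  StringBuilder sb = new StringBuilder();
  for (Entry<String, Integer> n : excludes.entrySet()) {
    sb.append(' ').append(n.getKey());
    if (n.getValue() != null) {
      sb.append(':').append(n.getValue());
    }
  }
  LOG.info("List of " + type + " hosts from " + filename + ":" + sb);
}
{code}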

4.
{code}
+  @Test
+  public void testHostFileReaderWithTimeout() throws Exception {
{code}

The test needs to be updated to include numeric hostnames.
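
For example, the test's excludes file could gain entries like these (a sketch; I'm assuming efw
is the FileWriter the existing test uses for the excludes file, and the assertions will depend on
the accessors the patch adds):

{code}
// Sketch of additional excludes-file content for the test:
efw.write("host1 123\n");   // named host followed by a timeout
efw.write("12345 600\n");   // purely numeric hostname followed by a timeout
efw.write("67890\n");       // purely numeric hostname with no timeout
{code}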

5.
{code}
+
+    @Override
+    public Integer getDecommissioningTimeout() {
+      return null;
+    }
+  /**
+   * Get the DecommissionTimeout.
+   *
+   * @return decommissionTimeout
+   */
+  public abstract Integer getDecommissionTimeout();
   }
{code}

Similar to above, can we use -1 instead of null?

6.
{code}
+  public static final String DECOMMISSIONING_DEFAULT_TIMEOUT_KEY =
+      RM_PREFIX + "decommissioning.default.timeout";
+  public static final int DEFAULT_DECOMMISSIONING_TIMEOUT = 3600;
{code}

Can you please rename DECOMMISSIONING_DEFAULT_TIMEOUT_KEY to RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT,
"decommissioning.default.timeout" to "nodemanager-graceful-decommission-timeout-secs", and
DEFAULT_DECOMMISSIONING_TIMEOUT to DEFAULT_RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT?
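
i.e. something along these lines (same values, suggested names):

{code}
public static final String RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT =
    RM_PREFIX + "nodemanager-graceful-decommission-timeout-secs";
public static final int DEFAULT_RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT = 3600;
{code}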

7.
{code}
+  public static final String NM_EXIT_WAIT_MS = NM_PREFIX + "exit-wait.ms";
+  public static final long DEFAULT_NM_EXIT_WAIT_MS = 5000;
{code}

I saw your reasoning for this in your earlier comments, but I'm not convinced this should
be in the YARN nodemanager. This seems like an issue with the EMR setup. The change adds a
wait time for all shutdowns. Please remove it.

8.
{code}
+    // Additional seconds to wait before forcefully decommission nodes.
+    // This is usually not needed since RM enforces timeout automatically.
+    final int gracePeriod = 20;
{code}
Can you explain why this is needed? And why 20 seconds for the grace period?

9.
{code}
+      if ("-g".equals(args[1]) || "-graceful".equals(args[1])) {
+        if (args.length == 3) {
+          int timeout = validateTimeout(args[2]);
+          return refreshNodes(timeout);
+        } else {
+          return refreshNodes(true);
+        }
+      }
{code}

Just to clarify my understanding here -
yarn rmadmin -refreshNodes -g 1000 will decommission nodes gracefully up to a limit of 1000
seconds, after which it will forcefully shut down the nodes.
yarn rmadmin -refreshNodes -g -1 will gracefully shut down the nodes with the timeout being
the value of yarn.resourcemanager.node-graceful-decommission-timeout.
yarn rmadmin -refreshNodes -g is the same as "yarn rmadmin -refreshNodes -g -1".
Is my understanding correct?

10.
{code}
+  @Override
+  public synchronized void setDecommissionTimeout(Integer timeout) {
+    maybeInitBuilder();
+    if (timeout != null) {
+      builder.setDecommissionTimeout(timeout);
+    } else {
+      builder.clearDecommissionTimeout();
+    }
+  }
+
+  @Override
+  public synchronized Integer getDecommissionTimeout() {
+    RefreshNodesRequestProtoOrBuilder p = viaProto ? proto : builder;
+    return p.hasDecommissionTimeout()? p.getDecommissionTimeout() : null;
+  }
{code}

You've used Integer everywhere. Is there any reason you didn't go with int (apart from the
HashMap)?

11.
{code}
+    <name>yarn.resourcemanager.decommissioning.default.timeout</name>
{code}
Please change the name to match the config above.

12.
{code}
+      long exitWaitMs = conf.getLong(YarnConfiguration.NM_EXIT_WAIT_MS,
+          YarnConfiguration.DEFAULT_NM_EXIT_WAIT_MS);
+      LOG.fatal("Exit in " + exitWaitMs + " milliseconds");
+      if (exitWaitMs > 0) {
+        try {
+          Thread.sleep(exitWaitMs);
+        } catch (InterruptedException e) {
+        }
+      }
{code}

Please remove this.

13.
{code}
+import com.google.common.base.Stopwatch;
...
+  private Stopwatch pollWatch = new Stopwatch().start();
{code}

Please switch to the Hadoop stopwatch (org.apache.hadoop.util.StopWatch).
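
A sketch of the equivalent usage (if I recall the API correctly, the Hadoop class exposes
now(TimeUnit) rather than elapsedTime(TimeUnit)):

{code}
import org.apache.hadoop.util.StopWatch;
...
private final StopWatch pollWatch = new StopWatch().start();
...
if (pollWatch.now(TimeUnit.SECONDS) < 20) {   // instead of elapsedTime(...)
  return emptyNodeIdSet;
}
...
pollWatch.reset().start();
{code}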

14.
{code}
+      // keep DECOMMISSIONED node for a while for status log.
+      if (context.decommissionedTime == 0) {
+        context.decommissionedTime = now;
+      } else if (now - context.decommissionedTime > 60000L) {
+        decomNodes.remove(rmNode.getNodeID());
+        LOG.info("remove " + rmNode.getState() + " " + rmNode.getNodeID());
+      }
{code}

Can you please explain why we need to keep nodes just for logging purposes after they've been
decommissioned? Is the state change to DECOMMISSIONED not logged?

15.
{code}
+  public boolean checkReadyToBeDecommissioned(NodeId nodeId) {
...
+    removeCompletedApps(context);
{code}
Can we move removeCompletedApps outside this function? It doesn't seem intuitive that the
checkReadyToBeDecommissioned function needs to call removeCompletedApps.

16.
{code}
+    readDecommissioningTimeout(null);
{code}

Can you please remove this call and the function implementation? Creating a YarnConfiguration
object is a fairly expensive call and there's no reason to be doing it every time poll is called.
The default value for the timeout is unlikely to change so often as to require reading it
repeatedly from the config file.
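
As a sketch (the constructor signature and field name are illustrative, and the constant names
follow the renaming suggested in point 6, so they may not match the patch):

{code}
// Sketch: read the default timeout once when the watcher is created instead of
// constructing a YarnConfiguration on every poll.
DecommissioningNodesWatcher(RMContext rmContext, Configuration conf) {
  this.rmContext = rmContext;
  this.defaultTimeoutSec = conf.getInt(
      YarnConfiguration.RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT,
      YarnConfiguration.DEFAULT_RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT);
}
{code}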

17.
{code}
+        LOG.info("Consider non-existing app " + appId + " as completed");
+        LOG.info("Remove " + rmApp.getState() + " app " + appId);
{code}

Please change the log levels for these to debug.

18.
{code}
+  private int getTimeoutInSec(DecommissioningNodeContext context) {
{code}

Rename getTimeoutInSec to getTimeoutTimestampInSec.

19.
{code}
+  private void logDecommissioningNodesStatus() {
+    if (decomNodes.size() == 0) {
+      return;
+    }
+    StringBuilder sb = new StringBuilder();
+    long now = System.currentTimeMillis();
+    for (DecommissioningNodeContext d : decomNodes.values()) {
+      DecomNodeStatus s = getDecommissioningStatus(d.nodeId);
+      sb.append(String.format(
+          "%n  %-34s %4ds fresh:%3ds containers:%2d %14s",
+          d.nodeId.getHost(),
+          (now - d.decommissioningStartTime) / 1000,
+          (now - d.lastUpdateTime) / 1000,
+          d.numActiveContainers,
+          s));
+      if (s == DecomNodeStatus.WAIT_APP ||
+          s == DecomNodeStatus.WAIT_CONTAINER) {
+        sb.append(String.format(" timeout:%4ds", getTimeoutInSec(d)));
+      }
+      for (ApplicationId aid : d.appIds) {
+        sb.append("\n    " + aid);
+        RMApp rmApp = rmContext.getRMApps().get(aid);
+        if (rmApp != null) {
+          sb.append(String.format(
+              " %s %9s %5.2f%% %5ds",
+              rmApp.getState(),
+              (rmApp.getApplicationType() == null)?
+                  "" : rmApp.getApplicationType(),
+              100.0 * rmApp.getProgress(),
+              (System.currentTimeMillis() - rmApp.getStartTime()) / 1000));
+        }
+      }
+    }
+    LOG.info("Decommissioning Nodes: " + sb.toString());
+  }
{code}

Instead of logging the status every 20 seconds, can we log it only in debug mode? Is there
any benefit to logging at info level?
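
If it stays, a guard like this (sketch, at the call site in pollInternal) would keep the cost off
the info path:

{code}
// Sketch: only build and emit the status block when debug logging is on.
if (LOG.isDebugEnabled()) {
  logDecommissioningNodesStatus();   // and log with LOG.debug(...) inside
}
{code}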

20.
{code}
+  private DecommissioningNodesWatcher decomWatcher;
+
{code}

Please rename decomWatcher to decommissioningWatcher

21.
{code}
+    // Evaluate whether a DECOMMISSIONING node is ready to be DECOMMISSIONED.
+    if (rmNode.getState() == NodeState.DECOMMISSIONING &&
+        decomWatcher.checkReadyToBeDecommissioned(rmNode.getNodeID())) {
+      String message = "DECOMMISSIONING " + nodeId +
+          " is ready to be decommissioned";
+      LOG.info(message);
+      this.rmContext.getDispatcher().getEventHandler().handle(
+          new RMNodeEvent(nodeId, RMNodeEventType.DECOMMISSION));
+      return YarnServerBuilderUtils.newNodeHeartbeatResponse(
+          NodeAction.SHUTDOWN, message);
+    }
{code}

comes after 

{code}
    if (!this.nodesListManager.isValidNode(nodeId.getHost())
        && !isNodeInDecommissioning(nodeId)) {
      String message =
          "Disallowed NodeManager nodeId: " + nodeId + " hostname: "
              + nodeId.getHost();
      LOG.info(message);
      return YarnServerBuilderUtils.newNodeHeartbeatResponse(
          NodeAction.SHUTDOWN, message);
    }

{code}
Can you please clarify something here - won't this condition always be true if decomWatcher.checkReadyToBeDecommissioned(rmNode.getNodeID())
is true? When will the code you added be called?

22.
{code}
+  public Set<NodeId> poll() {
+    if (decomNodes.size() == 0 ||
+        pollWatch.elapsedTime(TimeUnit.SECONDS) < 20) {
+      return emptyNodeIdSet;
+    }
+    return pollInternal();
+  }
+
+  private synchronized Set<NodeId> pollInternal() {
+    pollWatch.reset().start();
+    readDecommissioningTimeout(null);
+    logDecommissioningNodesStatus();
+    long now = System.currentTimeMillis();
+    Set<NodeId> output = new HashSet<NodeId>();
+
+    for (Iterator<Map.Entry<NodeId, DecommissioningNodeContext>> it =
+        decomNodes.entrySet().iterator(); it.hasNext();) {
+      Map.Entry<NodeId, DecommissioningNodeContext> e = it.next();
+      DecommissioningNodeContext d = e.getValue();
+      // Skip node recently updated (NM usually updates every second).
+      if (now - d.lastUpdateTime < 30000L) {
+        continue;
+      }
+      // Remove stale non-DECOMMISSIONING node
+      if (d.nodeState != NodeState.DECOMMISSIONING) {
+        LOG.info("remove " + d.nodeState + " " + d.nodeId);
+        it.remove();
+        continue;
+      } else if (now - d.lastUpdateTime > 60000L) {
+        // Node DECOMMISSIONED could become stale, check RMNode state to remove.
+        RMNode rmNode = getRmNode(d.nodeId);
+        if (rmNode != null && rmNode.getState() == NodeState.DECOMMISSIONED) {
+          LOG.info("remove " + rmNode.getState() + " " + d.nodeId);
+          it.remove();
+          continue;
+        }
+      }
+      if (d.timeoutMs >= 0 && d.decommissioningStartTime + d.timeoutMs < now) {
+        output.add(d.nodeId);
+        LOG.info("Identified stale and timeout node " + d.nodeId);
+      }
+    }
+    return output;
+  }
{code}
{code}
+    pollDecommissioningNodesWatcher();
{code}

We shouldn't be calling this for every node heartbeat. It seems from your implementation that
this should really be in its own thread with a timer. That way you can avoid checks like the
last called time, etc. I'm not even sure you need to call pollDecommissioningNodesWatcher
in ResourceTrackerService. It seems to me that it can be contained entirely in DecommissioningNodesWatcher.
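
As a rough sketch of what I mean (a ScheduledExecutorService is just one option, a Timer would
work too), the watcher could drive itself:

{code}
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
...
// Sketch: let DecommissioningNodesWatcher drive its own polling so the
// heartbeat path and the "last called" bookkeeping go away.
private ScheduledExecutorService pollTimer;

void start() {
  pollTimer = Executors.newSingleThreadScheduledExecutor();
  pollTimer.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
      pollInternal();   // and act on any timed-out nodes here as well
    }
  }, 20, 20, TimeUnit.SECONDS);    // same 20 second period as the patch
}

void stop() {
  if (pollTimer != null) {
    pollTimer.shutdownNow();
  }
}
{code}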

23.
{code}
+import com.google.common.base.Joiner;
{code}

Please use the Apache commons join from StringUtils.
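
i.e. something like (sketch; "parts" stands in for whatever collection is being joined):

{code}
import org.apache.commons.lang.StringUtils;
...
// instead of Joiner.on(", ").join(parts)
String joined = StringUtils.join(parts, ", ");
{code}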

24.
{code}
 public class RMNodeEvent extends AbstractEvent<RMNodeEventType> {
 
   private final NodeId nodeId;
+  // Optional decommissioning timeout in second.
+  private final Integer decommissioningTimeout;
 
   public RMNodeEvent(NodeId nodeId, RMNodeEventType type) {
     super(type);
     this.nodeId = nodeId;
+    this.decommissioningTimeout = null;
+  }
+
+  // Create instance with optional timeout
+  // (timeout could be null which means use default).
+  public RMNodeEvent(NodeId nodeId, RMNodeEventType type, Integer timeout) {
+    super(type);
+    this.nodeId = nodeId;
+    this.decommissioningTimeout = timeout;
   }
 
   public NodeId getNodeId() {
     return this.nodeId;
   }
+
+  public Integer getDecommissioningTimeout() {
+    return this.decommissioningTimeout;
+  }
{code}

Instead of modifying the RMNodeEvent class, please create a new class which inherits from
RMNodeEvent for your purpose with the custom constructor.
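
i.e. a sketch along these lines (the class name is just a suggestion):

{code}
// Sketch of the suggested subclass; RMNodeEvent itself stays untouched.
public class RMNodeDecommissioningEvent extends RMNodeEvent {

  // Optional decommissioning timeout in seconds; null means use the default.
  private final Integer decommissioningTimeout;

  public RMNodeDecommissioningEvent(NodeId nodeId, RMNodeEventType type,
      Integer timeout) {
    super(nodeId, type);
    this.decommissioningTimeout = timeout;
  }

  public Integer getDecommissioningTimeout() {
    return this.decommissioningTimeout;
  }
}
{code}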

25.
With regards to work-preserving restart, I think the command should just exit with a message
saying that the feature is not supported when work-preserving restart is enabled, and not do
anything. Once support for work-preserving restart has been added, the message can be removed.

Again, my apologies for coming in late with the review.

> Automatic and Asynchronous Decommissioning Nodes Status Tracking
> ----------------------------------------------------------------
>
>                 Key: YARN-4676
>                 URL: https://issues.apache.org/jira/browse/YARN-4676
>             Project: Hadoop YARN
>          Issue Type: Sub-task
>          Components: resourcemanager
>    Affects Versions: 2.8.0
>            Reporter: Daniel Zhi
>            Assignee: Daniel Zhi
>              Labels: features
>         Attachments: GracefulDecommissionYarnNode.pdf, GracefulDecommissionYarnNode.pdf,
> YARN-4676.004.patch, YARN-4676.005.patch, YARN-4676.006.patch, YARN-4676.007.patch, YARN-4676.008.patch,
> YARN-4676.009.patch, YARN-4676.010.patch, YARN-4676.011.patch, YARN-4676.012.patch, YARN-4676.013.patch
>
>
> YARN-4676 implements an automatic, asynchronous and flexible mechanism to gracefully decommission
> YARN nodes. After the user issues the refreshNodes request, the ResourceManager automatically
> evaluates the status of all affected nodes and kicks off decommission or recommission actions.
> The RM asynchronously tracks container and application status related to DECOMMISSIONING nodes
> and decommissions the nodes immediately after they are ready to be decommissioned. Decommissioning
> timeouts at individual-node granularity are supported and can be dynamically updated. The mechanism
> naturally supports multiple independent graceful decommissioning “sessions” where each one involves
> different sets of nodes with different timeout settings. Such support is ideal and necessary for
> graceful decommission requests issued by external cluster management software instead of a human.
> DecommissioningNodeWatcher inside ResourceTrackingService tracks DECOMMISSIONING node status
> automatically and asynchronously after the client/admin makes the graceful decommission request.
> It tracks DECOMMISSIONING node status to decide when, after all running containers on the node
> have completed, the node will be transitioned into the DECOMMISSIONED state. NodesListManager
> detects and handles include and exclude list changes to kick off decommission or recommission
> as necessary.





