hive-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Work logged] (HIVE-21912) Implement BlacklistingLlapMetricsListener
Date Wed, 03 Jul 2019 17:46:02 GMT

     [ https://issues.apache.org/jira/browse/HIVE-21912?focusedWorklogId=271775&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-271775 ]

ASF GitHub Bot logged work on HIVE-21912:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 03/Jul/19 17:45
            Start Date: 03/Jul/19 17:45
    Worklog Time Spent: 10m 
      Work Description: odraese commented on pull request #698: HIVE-21912: Implement DisablingDaemonStatisticsHandler
URL: https://github.com/apache/hive/pull/698#discussion_r300077210
 
 

 ##########
 File path: llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/BlacklistingLlapMetricsListener.java
 ##########
 @@ -0,0 +1,200 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.tezplugins.metrics;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.protobuf.ServiceException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SetCapacityRequestProto;
+import org.apache.hadoop.hive.llap.impl.LlapManagementProtocolClientImpl;
+import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo;
+import org.apache.hadoop.hive.llap.registry.LlapServiceInstance;
+import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
+import org.apache.hadoop.hive.llap.registry.impl.LlapZookeeperRegistryImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Implementation of MetricsListener which blacklists slow nodes based on the statistics.
+ */
+public class BlacklistingLlapMetricsListener implements LlapMetricsListener {
+  private static final Logger LOG = LoggerFactory.getLogger(BlacklistingLlapMetricsListener.class);
+  private LlapRegistryService registry;
+  private LlapManagementProtocolClientImplFactory clientFactory;
+  private int minServedTasksNumber;
+  private int maxBlacklistedNodes;
+  private long minConfigChangeDelayMs;
+  private float timeThreshold;
+  private float emptyExecutorsThreshold;
+  @VisibleForTesting
+  long nextCheckTime = Long.MIN_VALUE;
+
+  @VisibleForTesting
+  void init(Configuration conf, LlapRegistryService registry,
+      LlapManagementProtocolClientImplFactory clientFactory) {
+    this.registry = registry;
+    this.clientFactory = clientFactory;
+    this.minServedTasksNumber =
+        HiveConf.getIntVar(conf, HiveConf.ConfVars.LLAP_TASK_SCHEDULER_BLACKLISTING_METRICS_LISTENER_MIN_SERVED_TASKS);
+    this.minConfigChangeDelayMs =
+        HiveConf.getTimeVar(conf, HiveConf.ConfVars.LLAP_TASK_SCHEDULER_BLACKLISTING_METRICS_LISTENER_MIN_CHANGE_DELAY,
+            TimeUnit.MILLISECONDS);
+    this.timeThreshold =
+        HiveConf.getFloatVar(conf, HiveConf.ConfVars.LLAP_TASK_SCHEDULER_BLACKLISTING_METRICS_LISTENER_TIME_THRESHOLD);
+    this.emptyExecutorsThreshold =
+        HiveConf.getFloatVar(conf,
+            HiveConf.ConfVars.LLAP_TASK_SCHEDULER_BLACKLISTING_METRICS_LISTENER_EMPTY_EXECUTORS);
+    this.maxBlacklistedNodes =
+        HiveConf.getIntVar(conf, HiveConf.ConfVars.LLAP_TASK_SCHEDULER_BLACKLISTING_METRICS_LISTENER_MAX_LISTED_NODES);
+
+    Preconditions.checkArgument(minServedTasksNumber > 0,
+        "Minimum served tasks should be greater than 0");
+    Preconditions.checkArgument(minConfigChangeDelayMs > 0,
+        "Minimum config change delay should be greater than 0");
+    Preconditions.checkArgument(timeThreshold > 1.0f,
+        "The time threshold should be greater than 1");
+    Preconditions.checkArgument(maxBlacklistedNodes > 0,
+        "The maximum number of blacklisted nodes should be greater than 0");
+
+    LOG.debug("BlacklistingLlapMetricsListener initialized with " +
+                  "minServedTasksNumber={}, " +
+                  "minConfigChangeDelayMs={}, " +
+                  "timeThreshold={}, " +
+                  "emptyExecutorsThreshold={}, " +
+                  "maxBlacklistedNodes={}",
+        minServedTasksNumber, minConfigChangeDelayMs, timeThreshold, emptyExecutorsThreshold,
+        maxBlacklistedNodes);
+  }
+
+  @Override
+  public void init(Configuration conf, LlapRegistryService registry) {
+    init(conf, registry, LlapManagementProtocolClientImplFactory.basicInstance(conf));
+  }
+
+  @Override
+  public void newDaemonMetrics(String workerIdentity, LlapMetricsCollector.LlapMetrics newMetrics) {
+    // no op
+  }
+
+  @Override
+  public void newClusterMetrics(Map<String, LlapMetricsCollector.LlapMetrics> newMetrics) {
+    long sumAverageTime = 0;
+    long sumEmptyExecutors = 0;
+    long maxAverageTime = 0;
+    long maxAverageTimeEmptyExecutors = 0;
+    long maxAverageTimeMaxExecutors = 0;
+    long workerNum = 0;
+    int blacklistedNodes = 0;
+    String maxAverageTimeIdentity = null;
+    for (String workerIdentity : newMetrics.keySet()) {
+      Map<String, Long> metrics = newMetrics.get(workerIdentity).getMetrics();
+      long requestHandled = metrics.get(LlapDaemonExecutorInfo.ExecutorTotalRequestsHandled.name());
+      long averageTime = metrics.get(LlapDaemonExecutorInfo.AverageResponseTime.name());
+      long emptyExecutor =
+          metrics.get(LlapDaemonExecutorInfo.ExecutorNumExecutorsAvailableAverage.name());
+      long maxExecutors = metrics.get(LlapDaemonExecutorInfo.ExecutorNumExecutorsPerInstance.name());
+
+      LOG.debug("Checking node {} with data: " +
+                    "requestHandled={}, " +
+                    "averageTime={}, " +
+                    "emptyExecutors={}, " +
+                    "maxExecutors={}",
+          workerIdentity, requestHandled, averageTime, emptyExecutor, maxExecutors);
+
+      if (maxExecutors == 0) {
+        blacklistedNodes++;
+        if (blacklistedNodes >= this.maxBlacklistedNodes) {
+          LOG.debug("Already too many blacklisted nodes. Skipping.");
+          return;
+        }
+      }
+
+      if (requestHandled > this.minServedTasksNumber) {
+        workerNum++;
+        sumAverageTime += averageTime;
+        if (averageTime > maxAverageTime) {
+          maxAverageTime = averageTime;
+          maxAverageTimeEmptyExecutors = emptyExecutor;
+          maxAverageTimeMaxExecutors = maxExecutors;
+          maxAverageTimeIdentity = workerIdentity;
+        }
+        sumEmptyExecutors += emptyExecutor;
+      }
+    }
+
+    // If we do not have enough data then return.
+    if (workerNum < 2) {
+      return;
+    }
+
+    LOG.debug("Found slowest node {} with data: " +
+                  "sumAverageTime={}, " +
+                  "sumEmptyExecutors={}, " +
+                  "maxAverageTime={}, " +
+                  "maxAverageTimeEmptyExecutors={}, " +
+                  "maxAverageTimeMaxExecutors={}, " +
+                  "workerNum={}, " +
+                  "maxAverageTimeIdentity={}, " +
+                  "blacklistedNodes={}",
+        sumAverageTime, sumEmptyExecutors, maxAverageTime, maxAverageTimeEmptyExecutors,
+        maxAverageTimeMaxExecutors, workerNum, maxAverageTimeIdentity, blacklistedNodes);
+    // Check if the slowest node is at least timeThreshold times slower than the average
+    long averageTimeWithoutSlowest = (sumAverageTime - maxAverageTime) / (workerNum - 1);
+    if (averageTimeWithoutSlowest * this.timeThreshold < maxAverageTime) {
+      // We have a candidate, let's see if we have enough empty executors.
+      long emptyExecutorsWithoutSlowest = sumEmptyExecutors - maxAverageTimeEmptyExecutors;
+      if (emptyExecutorsWithoutSlowest > maxAverageTimeMaxExecutors * this.emptyExecutorsThreshold) {
+        // Seems like a good candidate, let's try to blacklist
+        try {
+          LOG.debug("Trying to blacklist node: " + maxAverageTimeIdentity);
+          setCapacity(maxAverageTimeIdentity, 0, 0);
+        } catch (Throwable t) {
+          LOG.debug("Can not blacklist node: " + maxAverageTimeIdentity, t);
+        }
+      }
+    }
+  }
+
+  protected void setCapacity(String workerIdentity, int newExecutorNum, int newWaitQueueSize)
+      throws IOException, ServiceException {
+    long currentTime = System.currentTimeMillis();
+    if (currentTime > nextCheckTime) {
+      LlapZookeeperRegistryImpl.ConfigChangeLockResult lockResult =
 
 Review comment:
   Here is the code that opens a window for blacklisting more nodes than the maximum. If multiple AMs conclude at the same time that (different) nodes should be blacklisted, they might all still have -1L in nextCheckTime and therefore proceed to call lockForConfigChange. If they execute in the right order (increasing time), each lock attempt will succeed, because the lock only verifies that the next value is higher than the previous one, and we will potentially blacklist more daemons than allowed.
   Instead, we need to
   1) fetch the current value of the distributed long, and
   2) compare our current timestamp against it, skipping the change if ours is smaller.
   A local, cached nextCheckTime is unnecessary and can lead to false positives.
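   The two steps above can be sketched as follows. This is an illustrative snippet, not Hive code: `ConfigChangeGate`, `tryAcquire`, and the use of an `AtomicLong` as a stand-in for the ZooKeeper-backed distributed long are all hypothetical; the point is that each caller fetches the shared value and compares-and-advances it atomically, so concurrent AMs cannot all pass the check.

```java
import java.util.concurrent.atomic.AtomicLong;

// Sketch of the reviewer's suggestion: no locally cached nextCheckTime;
// callers compare their timestamp against the shared value itself.
public class ConfigChangeGate {
    // Stand-in for the distributed "next allowed change time" value.
    private final AtomicLong nextAllowedChangeTime = new AtomicLong(Long.MIN_VALUE);
    private final long minChangeDelayMs;

    public ConfigChangeGate(long minChangeDelayMs) {
        this.minChangeDelayMs = minChangeDelayMs;
    }

    // Returns true iff this caller wins the right to apply a config change.
    public boolean tryAcquire(long nowMs) {
        long current = nextAllowedChangeTime.get();  // 1) fetch the shared value
        if (nowMs <= current) {
            return false;                            // 2) too early: skip the change
        }
        // Advance the shared value; compareAndSet ensures that of several
        // concurrent callers with valid timestamps, only one succeeds.
        return nextAllowedChangeTime.compareAndSet(current, nowMs + minChangeDelayMs);
    }
}
```

   With this shape, two AMs racing with the same timestamp cannot both win: the loser's compareAndSet fails because the shared value has already moved, and a later caller inside the delay window is rejected by the fetch-and-compare alone.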
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 271775)
    Time Spent: 1h 50m  (was: 1h 40m)

> Implement BlacklistingLlapMetricsListener
> -----------------------------------------
>
>                 Key: HIVE-21912
>                 URL: https://issues.apache.org/jira/browse/HIVE-21912
>             Project: Hive
>          Issue Type: Sub-task
>          Components: llap, Tez
>            Reporter: Peter Vary
>            Assignee: Peter Vary
>            Priority: Major
>              Labels: pull-request-available
>         Attachments: HIVE-21912.patch, HIVE-21912.wip-2.patch, HIVE-21912.wip.patch
>
>          Time Spent: 1h 50m
>  Remaining Estimate: 0h
>
> We should implement a DaemonStatisticsHandler which disables a limping node when:
>  * the node's average response time is bigger than 150% (configurable) of the other nodes', and
>  * the other nodes have enough empty executors to handle the requests.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
