ambari-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From avija...@apache.org
Subject [2/3] ambari git commit: AMBARI-18487 : Test and refine Collector writes w.r.t sharing and timeouts. (avijayan)
Date Tue, 04 Oct 2016 18:32:43 GMT
AMBARI-18487 : Test and refine Collector writes w.r.t sharing and timeouts. (avijayan)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/c10053b0
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/c10053b0
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/c10053b0

Branch: refs/heads/trunk
Commit: c10053b086ae1b4f41c3d4cdba18349144ef7ec4
Parents: 06f3b8e
Author: Aravindan Vijayan <avijayan@hortonworks.com>
Authored: Tue Oct 4 11:10:52 2016 -0700
Committer: Aravindan Vijayan <avijayan@hortonworks.com>
Committed: Tue Oct 4 11:10:52 2016 -0700

----------------------------------------------------------------------
 .../timeline/AbstractTimelineMetricsSink.java   | 75 +++++++++++++-------
 .../availability/MetricCollectorHAHelper.java   |  9 +--
 .../src/main/python/core/blacklisted_set.py     | 14 ++++
 .../src/main/python/core/config_reader.py       |  3 +-
 .../src/main/python/core/emitter.py             | 74 +++++++++++++------
 5 files changed, 122 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/c10053b0/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
index fa5b694..efa5cba 100644
--- a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
+++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
@@ -55,6 +55,8 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Random;
 import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -78,8 +80,9 @@ public abstract class AbstractTimelineMetricsSink {
 
   protected static final AtomicInteger failedCollectorConnectionsCounter = new AtomicInteger(0);
   public static int NUMBER_OF_SKIPPED_COLLECTOR_EXCEPTIONS = 100;
-  public int ZK_CONNECT_TRY_TIME = 10000;
+  public int ZK_CONNECT_TRY_COUNT = 10;
   public int ZK_SLEEP_BETWEEN_RETRY_TIME = 2000;
+  public boolean shardExpired = true;
 
   private SSLSocketFactory sslSocketFactory;
 
@@ -95,7 +98,7 @@ public abstract class AbstractTimelineMetricsSink {
   // well as timed refresh
   protected Supplier<String> targetCollectorHostSupplier;
 
-  protected final List<String> allKnownLiveCollectors = new ArrayList<>();
+  protected final SortedSet<String> allKnownLiveCollectors = new TreeSet<>();
 
   private volatile boolean isInitializedForHA = false;
 
@@ -127,7 +130,7 @@ public abstract class AbstractTimelineMetricsSink {
   protected void init() {
     metricSinkWriteShardStrategy = new MetricSinkWriteShardHostnameHashingStrategy(getHostname());
     collectorHAHelper = new MetricCollectorHAHelper(getZookeeperQuorum(),
-      ZK_CONNECT_TRY_TIME, ZK_SLEEP_BETWEEN_RETRY_TIME);
+      ZK_CONNECT_TRY_COUNT, ZK_SLEEP_BETWEEN_RETRY_TIME);
     isInitializedForHA = true;
   }
 
@@ -204,6 +207,8 @@ public abstract class AbstractTimelineMetricsSink {
       collectorHost = targetCollectorHostSupplier.get();
       // Last X attempts have failed - force refresh
      if (failedCollectorConnectionsCounter.get() > RETRY_COUNT_BEFORE_COLLECTOR_FAILOVER) {
+        LOG.info("Removing collector " + collectorHost + " from allKnownLiveCollectors.");
+        allKnownLiveCollectors.remove(collectorHost);
         targetCollectorHostSupplier = null;
         collectorHost = findPreferredCollectHost();
       }
@@ -321,6 +326,7 @@ public abstract class AbstractTimelineMetricsSink {
       init();
     }
 
+    shardExpired = false;
     // Auto expire and re-calculate after 1 hour
     if (targetCollectorHostSupplier != null) {
       String targetCollector = targetCollectorHostSupplier.get();
@@ -329,32 +335,12 @@ public abstract class AbstractTimelineMetricsSink {
       }
     }
 
-    Collection<String> collectorHosts = getConfiguredCollectorHosts();
-
-    LOG.debug("Trying to find live collector host from : " + collectorHosts);
     // Reach out to all configured collectors before Zookeeper
-    if (collectorHosts != null && !collectorHosts.isEmpty()) {
-      for (String hostStr : collectorHosts) {
-        hostStr = hostStr.trim();
-        if (!hostStr.isEmpty()) {
-          try {
-            Collection<String> liveHosts = findLiveCollectorHostsFromKnownCollector(hostStr, getCollectorPort());
-            // Update live Hosts - current host will already be a part of this
-            for (String host : liveHosts) {
-              allKnownLiveCollectors.add(host);
-            }
-            break; // Found at least 1 live collector
-          } catch (MetricCollectorUnavailableException e) {
-            LOG.info("Collector " + hostStr + " is not longer live. Removing " +
-              "it from list of know live collector hosts : " + allKnownLiveCollectors);
-            allKnownLiveCollectors.remove(hostStr);
-          }
-        }
-      }
-    }
+    refreshCollectorsFromConfigured();
 
     // Lookup Zookeeper for live hosts - max 10 seconds wait time
     if (allKnownLiveCollectors.size() == 0 && getZookeeperQuorum() != null) {
+      LOG.info("No live collectors from configuration. Requesting zookeeper...");
       allKnownLiveCollectors.addAll(collectorHAHelper.findLiveCollectorHostsFromZNode());
     }
 
@@ -363,6 +349,13 @@ public abstract class AbstractTimelineMetricsSink {
         new Supplier<String>() {
           @Override
           public String get() {
+            //shardExpired flag is used to determine if the Supplier.get() is invoked through the
+            // findPreferredCollectHost method (No need to refresh collector hosts
+            // OR
+            // through Expiry (Refresh needed to pick up dead collectors that might have not become alive).
+            if (shardExpired) {
+              refreshCollectorsFromConfigured();
+            }
             return metricSinkWriteShardStrategy.findCollectorShard(new ArrayList<>(allKnownLiveCollectors));
           }
         },  // random.nextInt(max - min + 1) + min # (60 to 75 minutes)
@@ -372,12 +365,40 @@ public abstract class AbstractTimelineMetricsSink {
         TimeUnit.MINUTES
       );
 
-      return targetCollectorHostSupplier.get();
+      String collectorHost = targetCollectorHostSupplier.get();
+      shardExpired = true;
+      return collectorHost;
     }
     LOG.warn("Couldn't find any live collectors. Returning null");
+    shardExpired = true;
     return null;
   }
 
+  private void refreshCollectorsFromConfigured() {
+    Collection<String> collectorHosts = getConfiguredCollectorHosts();
+
+    LOG.debug("Trying to find live collector host from : " + collectorHosts);
+    if (collectorHosts != null && !collectorHosts.isEmpty()) {
+      for (String hostStr : collectorHosts) {
+        hostStr = hostStr.trim();
+        if (!hostStr.isEmpty()) {
+          try {
+            Collection<String> liveHosts = findLiveCollectorHostsFromKnownCollector(hostStr, getCollectorPort());
+            // Update live Hosts - current host will already be a part of this
+            for (String host : liveHosts) {
+              allKnownLiveCollectors.add(host);
+            }
+            break; // Found at least 1 live collector
+          } catch (MetricCollectorUnavailableException e) {
+            LOG.info("Collector " + hostStr + " is not longer live. Removing " +
+              "it from list of know live collector hosts : " + allKnownLiveCollectors);
+            allKnownLiveCollectors.remove(hostStr);
+          }
+        }
+      }
+    }
+  }
+
  Collection<String> findLiveCollectorHostsFromKnownCollector(String host, String port) throws MetricCollectorUnavailableException {
     List<String> collectors = new ArrayList<>();
     HttpURLConnection connection = null;
@@ -426,7 +447,7 @@ public abstract class AbstractTimelineMetricsSink {
       LOG.debug(errorMessage);
       LOG.debug(ioe);
       String warnMsg = "Unable to connect to collector to find live nodes.";
-      LOG.warn(warnMsg, ioe);
+      LOG.warn(warnMsg);
       throw new MetricCollectorUnavailableException(warnMsg);
     }
     return collectors;

http://git-wip-us.apache.org/repos/asf/ambari/blob/c10053b0/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricCollectorHAHelper.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricCollectorHAHelper.java
b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricCollectorHAHelper.java
index 4d0ec14..2254362 100644
--- a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricCollectorHAHelper.java
+++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricCollectorHAHelper.java
@@ -22,6 +22,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.curator.CuratorZookeeperClient;
 import org.apache.curator.RetryLoop;
 import org.apache.curator.RetryPolicy;
+import org.apache.curator.retry.BoundedExponentialBackoffRetry;
 import org.apache.curator.retry.RetryUntilElapsed;
 import org.apache.zookeeper.ZooKeeper;
 
@@ -38,7 +39,7 @@ import java.util.concurrent.Callable;
  */
 public class MetricCollectorHAHelper {
   private final String zookeeperQuorum;
-  private final int tryTime;
+  private final int tryCount;
   private final int sleepMsBetweenRetries;
 
   private static final int CONNECTION_TIMEOUT = 2000;
@@ -50,9 +51,9 @@ public class MetricCollectorHAHelper {
 
   private static final Log LOG = LogFactory.getLog(MetricCollectorHAHelper.class);
 
-  public MetricCollectorHAHelper(String zookeeperQuorum, int tryTime, int sleepMsBetweenRetries) {
+  public MetricCollectorHAHelper(String zookeeperQuorum, int tryCount, int sleepMsBetweenRetries) {
     this.zookeeperQuorum = zookeeperQuorum;
-    this.tryTime = tryTime;
+    this.tryCount = tryCount;
     this.sleepMsBetweenRetries = sleepMsBetweenRetries;
   }
 
@@ -63,7 +64,7 @@ public class MetricCollectorHAHelper {
   public Collection<String> findLiveCollectorHostsFromZNode() {
     Set<String> collectors = new HashSet<>();
 
-    RetryPolicy retryPolicy = new RetryUntilElapsed(tryTime, sleepMsBetweenRetries);
+    RetryPolicy retryPolicy = new BoundedExponentialBackoffRetry(sleepMsBetweenRetries, 10*sleepMsBetweenRetries,
tryCount);
     final CuratorZookeeperClient client = new CuratorZookeeperClient(zookeeperQuorum,
       SESSION_TIMEOUT, CONNECTION_TIMEOUT, null, retryPolicy);
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/c10053b0/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/blacklisted_set.py
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/blacklisted_set.py
b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/blacklisted_set.py
index 3982c4e..dab54c0 100644
--- a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/blacklisted_set.py
+++ b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/blacklisted_set.py
@@ -41,6 +41,20 @@ class BlacklistedSet(set):
       if time.time() > self.__dict.get(item):
         yield item
 
+  def get_actual_size(self):
+    size = 0
+    for item in self.__iter__():
+      size += 1
+    return size
+
+  def get_item_at_index(self, index):
+    i = 0
+    for item in self.__iter__():
+      if i == index:
+        return item
+      i += 1
+    return None
+
   def blacklist(self, item):
     self.__dict[item] = time.time() + self.__blacklist_timeout
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/c10053b0/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/config_reader.py
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/config_reader.py
b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/config_reader.py
index 3ca3a31..890d3ce 100644
--- a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/config_reader.py
+++ b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/config_reader.py
@@ -101,6 +101,7 @@ AMBARI_AGENT_CONF = '/etc/ambari-agent/conf/ambari-agent.ini'
 config_content = """
 [default]
 debug_level = INFO
+hostname = localhost
 metrics_servers = ['localhost','host1','host2']
 enable_time_threshold = false
 enable_value_threshold = false
@@ -112,7 +113,7 @@ send_interval = 60
 collector_sleep_interval = 5
 max_queue_size = 5000
 failover_strategy = round-robin
-failover_strategy_blacklisted_interval_seconds = 0
+failover_strategy_blacklisted_interval_seconds = 60
 host = localhost
 port = 6188
 https_enabled = false

http://git-wip-us.apache.org/repos/asf/ambari/blob/c10053b0/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/emitter.py
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/emitter.py
b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/emitter.py
index 050af16..ba3f18e 100644
--- a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/emitter.py
+++ b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/emitter.py
@@ -39,6 +39,8 @@ class Emitter(threading.Thread):
     logger.debug('Initializing Emitter thread.')
     self.lock = threading.Lock()
     self.send_interval = config.get_send_interval()
+    self.hostname = config.get_hostname_config()
+    self.hostname_hash = self.compute_hash(self.hostname)
     self._stop_handler = stop_handler
     self.application_metric_map = application_metric_map
     self.collector_port = config.get_server_port()
@@ -80,30 +82,42 @@ class Emitter(threading.Thread):
     self.push_metrics(json_data)
 
   def push_metrics(self, data):
+    success = False
+    while self.active_collector_hosts.get_actual_size() > 0:
+      collector_host = self.get_collector_host_shard()
+      success = self.try_with_collector_host(collector_host, data)
+      if success:
+        break
+    pass
+
+    if not success:
+      logger.info('No valid collectors found...')
+      for collector_host in self.active_collector_hosts:
+        success = self.try_with_collector_host(collector_host, data)
+      pass
+
+  def try_with_collector_host(self, collector_host, data):
     headers = {"Content-Type" : "application/json", "Accept" : "*/*"}
-    for collector_host in self.active_collector_hosts:
-      connection = self.get_connection(collector_host)
-      logger.info("server: %s" % collector_host)
-      logger.debug("message to sent: %s" % data)
-
-      retry_count = 0
-      while retry_count < self.MAX_RETRY_COUNT:
-        response = self.get_response_from_submission(connection, data, headers)
-        if response and response.status == 200:
-          return
-        else:
-          logger.warn("Retrying after {0} ...".format(self.RETRY_SLEEP_INTERVAL))
-          retry_count += 1
-          #Wait for the service stop event instead of sleeping blindly
-          if 0 == self._stop_handler.wait(self.RETRY_SLEEP_INTERVAL):
-            return
-        pass
-
-      if retry_count >= self.MAX_RETRY_COUNT:
-        self.active_collector_hosts.blacklist(collector_host)
-        logger.warn("Metric collector host {0} was blacklisted.".format(collector_host))
+    connection = self.get_connection(collector_host)
+    logger.debug("message to send: %s" % data)
+    retry_count = 0
+    while retry_count < self.MAX_RETRY_COUNT:
+      response = self.get_response_from_submission(connection, data, headers)
+      if response and response.status == 200:
+        return True
+      else:
+        logger.warn("Retrying after {0} ...".format(self.RETRY_SLEEP_INTERVAL))
+        retry_count += 1
+        #Wait for the service stop event instead of sleeping blindly
+        if 0 == self._stop_handler.wait(self.RETRY_SLEEP_INTERVAL):
+          return True
     pass
 
+    if retry_count >= self.MAX_RETRY_COUNT:
+      self.active_collector_hosts.blacklist(collector_host)
+      logger.warn("Metric collector host {0} was blacklisted.".format(collector_host))
+      return False
+
   def get_connection(self, collector_host):
     timeout = int(self.send_interval - 10)
     if self.is_server_https_enabled:
@@ -130,3 +144,21 @@ class Emitter(threading.Thread):
       logger.warn('Error sending metrics to server. %s' % str(e))
       return None
 
+  def get_collector_host_shard(self):
+    size = self.active_collector_hosts.get_actual_size()
+    index = self.hostname_hash % size
+    index = index if index >= 0 else index + size
+    hostname = self.active_collector_hosts.get_item_at_index(index)
+    logger.info('Calculated collector shard based on hostname : %s' % hostname)
+    return hostname
+
+  def compute_hash(self, hostname):
+    hash = 11987
+    length = len(hostname)
+    for i in xrange(0, length - 1):
+      hash = 31*hash + ord(hostname[i])
+    return hash
+
+
+
+


Mime
View raw message