accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From els...@apache.org
Subject git commit: ACCUMULO-3054 Create a metric for overall replication "health"
Date Sat, 09 Aug 2014 03:05:08 GMT
Repository: accumulo
Updated Branches:
  refs/heads/master 9b95011be -> 3c7b3da04


ACCUMULO-3054 Create a metric for overall replication "health"


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/3c7b3da0
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/3c7b3da0
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/3c7b3da0

Branch: refs/heads/master
Commit: 3c7b3da047f508647506d8b67f3b44981e137f4f
Parents: 9b95011
Author: Josh Elser <elserj@apache.org>
Authored: Fri Aug 8 23:04:28 2014 -0400
Committer: Josh Elser <elserj@apache.org>
Committed: Fri Aug 8 23:04:28 2014 -0400

----------------------------------------------------------------------
 .../server/replication/ReplicationUtil.java     | 16 ++++++++
 .../master/metrics/ReplicationMetrics.java      | 40 ++++++++++++++++++++
 .../master/metrics/ReplicationMetricsMBean.java | 26 +++++++++++--
 .../monitor/servlets/ReplicationServlet.java    | 32 +++++++++++++---
 4 files changed, 106 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/3c7b3da0/server/base/src/main/java/org/apache/accumulo/server/replication/ReplicationUtil.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/replication/ReplicationUtil.java b/server/base/src/main/java/org/apache/accumulo/server/replication/ReplicationUtil.java
index 164df69..9f59b23 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/replication/ReplicationUtil.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/replication/ReplicationUtil.java
@@ -25,6 +25,7 @@ import java.util.NoSuchElementException;
 import java.util.Set;
 
 import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.BatchScanner;
 import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.Scanner;
@@ -36,6 +37,7 @@ import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.master.thrift.MasterMonitorInfo;
 import org.apache.accumulo.core.metadata.MetadataTable;
 import org.apache.accumulo.core.metadata.RootTable;
 import org.apache.accumulo.core.replication.ReplicationConstants;
@@ -68,6 +70,20 @@ public class ReplicationUtil {
     this.zooCache = cache;
   }
 
+  public int getMaxReplicationThreads(Map<String,String> systemProperties, MasterMonitorInfo mmi) {
+    int activeTservers = mmi.getTServerInfoSize();
+
+    // The number of threads each tserver will use at most to replicate data
+    int replicationThreadsPerServer = Integer.parseInt(systemProperties.get(Property.REPLICATION_WORKER_THREADS.getKey()));
+
+    // The total number of "slots" we have to replicate data
+    return activeTservers * replicationThreadsPerServer;
+  }
+
+  public int getMaxReplicationThreads(Connector conn, MasterMonitorInfo mmi) throws AccumuloException, AccumuloSecurityException {
+    return getMaxReplicationThreads(conn.instanceOperations().getSystemConfiguration(), mmi);
+  }
+
   /**
    * Extract replication peers from system configuration
    * @param systemProperties System properties, typically from Connector.instanceOperations().getSystemConfiguration()

http://git-wip-us.apache.org/repos/asf/accumulo/blob/3c7b3da0/server/master/src/main/java/org/apache/accumulo/master/metrics/ReplicationMetrics.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/metrics/ReplicationMetrics.java b/server/master/src/main/java/org/apache/accumulo/master/metrics/ReplicationMetrics.java
index eae6c23..06a653d 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/metrics/ReplicationMetrics.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/metrics/ReplicationMetrics.java
@@ -26,10 +26,16 @@ import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.admin.TableOperations;
+import org.apache.accumulo.core.client.impl.MasterClient;
+import org.apache.accumulo.core.master.thrift.MasterClientService;
+import org.apache.accumulo.core.master.thrift.MasterMonitorInfo;
 import org.apache.accumulo.core.replication.ReplicationConstants;
 import org.apache.accumulo.core.replication.ReplicationTarget;
+import org.apache.accumulo.server.client.HdfsZooInstance;
 import org.apache.accumulo.server.metrics.AbstractMetricsImpl;
 import org.apache.accumulo.server.replication.ReplicationUtil;
+import org.apache.accumulo.server.security.SystemCredentials;
+import org.apache.accumulo.trace.instrument.Tracer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -110,6 +116,40 @@ public class ReplicationMetrics extends AbstractMetricsImpl implements Replicati
   }
 
   @Override
+  public int getMaxReplicationThreads() {
+    MasterMonitorInfo mmi = null;
+    for (int i = 0; i < 10; i++) {
+      MasterClientService.Iface client = null;
+      try {
+        client = MasterClient.getConnection(HdfsZooInstance.getInstance());
+        if (client != null) {
+          mmi = client.getMasterStats(Tracer.traceInfo(), SystemCredentials.get().toThrift(HdfsZooInstance.getInstance()));
+          break;
+        }
+      } catch (Exception e) {
+        log.debug("Error fetching stats: " + e);
+      } finally {
+        if (client != null) {
+          MasterClient.close(client);
+        }
+      }
+    }
+
+    if (null != mmi) {
+      try {
+        return replicationUtil.getMaxReplicationThreads(conn, mmi);
+      } catch (AccumuloException e) {
+        log.warn("Failed to fetch replication work queue size", e);
+      } catch (AccumuloSecurityException e) {
+        log.warn("Failed to fetch replication work queue size", e);
+      }
+    }
+
+    log.warn("Could not fetch metrics information from Master");
+    return -1;
+  }
+
+  @Override
   protected ObjectName getObjectName() {
     return objectName;
   }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/3c7b3da0/server/master/src/main/java/org/apache/accumulo/master/metrics/ReplicationMetricsMBean.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/metrics/ReplicationMetricsMBean.java b/server/master/src/main/java/org/apache/accumulo/master/metrics/ReplicationMetricsMBean.java
index 2430466..84f8142 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/metrics/ReplicationMetricsMBean.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/metrics/ReplicationMetricsMBean.java
@@ -20,9 +20,29 @@ package org.apache.accumulo.master.metrics;
  * 
  */
 public interface ReplicationMetricsMBean {
-
+  
+  /**
+   * A system may have multiple Replication targets, each of which have a queue of files to be replicated. This returns the sum across all targets, not
+   * de-duplicating files.
+   * 
+   * @return The number of files pending replication across all targets
+   */
   public int getNumFilesPendingReplication();
-
+  
+  /**
+   * The total number of threads available to replicate data to peers. Each TabletServer has a number of threads devoted to replication, so this value is
+   * affected by the number of currently active TabletServers.
+   * 
+   * @return The number of threads available to replicate data across the instance
+   */
+  public int getMaxReplicationThreads();
+  
+  /**
+   * Peers are systems which data can be replicated to. This is the number of peers that are defined, but this is not necessarily the number of peers which are
+   * actively being replicated to.
+   * 
+   * @return The number of peers/targets which are defined for data to be replicated to.
+   */
   public int getNumConfiguredPeers();
-
+  
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/3c7b3da0/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ReplicationServlet.java
----------------------------------------------------------------------
diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ReplicationServlet.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ReplicationServlet.java
index 896ac20..7a7941a 100644
--- a/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ReplicationServlet.java
+++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ReplicationServlet.java
@@ -26,10 +26,13 @@ import javax.servlet.http.HttpServletResponse;
 import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.client.admin.TableOperations;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.master.thrift.MasterMonitorInfo;
 import org.apache.accumulo.core.replication.ReplicationConstants;
 import org.apache.accumulo.core.replication.ReplicationTarget;
 import org.apache.accumulo.core.security.Credentials;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.monitor.Monitor;
 import org.apache.accumulo.monitor.util.Table;
 import org.apache.accumulo.monitor.util.celltypes.NumberType;
 import org.apache.accumulo.server.client.HdfsZooInstance;
@@ -63,9 +66,14 @@ public class ReplicationServlet extends BasicServlet {
   
   @Override
   protected void pageBody(HttpServletRequest req, HttpServletResponse response, StringBuilder sb) throws Exception {
-    Instance inst = HdfsZooInstance.getInstance();
-    Credentials creds = SystemCredentials.get();
-    Connector conn = inst.getConnector(creds.getPrincipal(), creds.getToken());
+    final Instance inst = HdfsZooInstance.getInstance();
+    final Credentials creds = SystemCredentials.get();
+    final Connector conn = inst.getConnector(creds.getPrincipal(), creds.getToken());
+    final Map<String,String> systemProps = conn.instanceOperations().getSystemConfiguration();
+    final MasterMonitorInfo mmi = Monitor.getMmi();
+
+    // The total number of "slots" we have to replicate data
+    int totalWorkQueueSize = replicationUtil.getMaxReplicationThreads(systemProps, mmi);
 
     TableOperations tops = conn.tableOperations();
     if (!tops.exists(ReplicationConstants.TABLE_NAME)) {
@@ -80,7 +88,7 @@ public class ReplicationServlet extends BasicServlet {
     replicationStats.addSortableColumn("ReplicaSystem Type");
     replicationStats.addSortableColumn("Files needing replication", new NumberType<Long>(), null);
 
-    Map<String,String> peers = replicationUtil.getPeers(conn.instanceOperations().getSystemConfiguration());
+    Map<String,String> peers = replicationUtil.getPeers(systemProps);
 
     // The total set of configured targets
     Set<ReplicationTarget> allConfiguredTargets = replicationUtil.getReplicationTargets(tops);
@@ -91,6 +99,7 @@ public class ReplicationServlet extends BasicServlet {
     Map<String,String> tableNameToId = tops.tableIdMap();
     Map<String,String> tableIdToName = replicationUtil.invert(tableNameToId);
 
+    long filesPendingOverAllTargets = 0l;
     for (ReplicationTarget configuredTarget : allConfiguredTargets) {
       String tableName = tableIdToName.get(configuredTarget.getSourceTableId());
       if (null == tableName) {
@@ -106,9 +115,22 @@ public class ReplicationServlet extends BasicServlet {
 
       Long numFiles = targetCounts.get(configuredTarget);
 
-      replicationStats.addRow(tableName, configuredTarget.getPeerName(), configuredTarget.getRemoteIdentifier(), replicaSystemClass, (null == numFiles) ? 0 : numFiles); 
+      if (null == numFiles) {
+        replicationStats.addRow(tableName, configuredTarget.getPeerName(), configuredTarget.getRemoteIdentifier(), replicaSystemClass, 0); 
+      } else {
+        replicationStats.addRow(tableName, configuredTarget.getPeerName(), configuredTarget.getRemoteIdentifier(), replicaSystemClass, numFiles);
+        filesPendingOverAllTargets += numFiles;
+      }
     }
 
+    // Up to 2x the number of slots for replication available, WARN
+    // More than 2x the number of slots for replication available, ERROR
+    NumberType<Long> filesPendingFormat = new NumberType<Long>(Long.valueOf(0), Long.valueOf(2 * totalWorkQueueSize), Long.valueOf(0), Long.valueOf(4 * totalWorkQueueSize));
+
+    String utilization = filesPendingFormat.format(filesPendingOverAllTargets);
+
+    sb.append("<div><center><br/><span class=\"table-caption\">Total files pending replication: ").append(utilization).append("</span></center></div>");
+
     replicationStats.generate(req, sb);
 
     // Make a table for the replication data in progress


Mime
View raw message