accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From els...@apache.org
Subject git commit: ACCUMULO-3040 Create a JMX MBean for high-level Replication Metrics
Date Tue, 05 Aug 2014 20:00:55 GMT
Repository: accumulo
Updated Branches:
  refs/heads/master f5984b35a -> 84100142e


ACCUMULO-3040 Create a JMX MBean for high-level Replication Metrics

Consolidates creation of replication metrics into a class that both
the replication servlet for the monitor and the JMX endpoint can
reuse.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/84100142
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/84100142
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/84100142

Branch: refs/heads/master
Commit: 84100142e56e011b39089430b19d334f9d5d0e96
Parents: f5984b3
Author: Josh Elser <elserj@apache.org>
Authored: Tue Aug 5 12:10:40 2014 -0400
Committer: Josh Elser <elserj@apache.org>
Committed: Tue Aug 5 15:50:19 2014 -0400

----------------------------------------------------------------------
 .../server/replication/ReplicationUtil.java     | 257 +++++++++++++++++++
 .../java/org/apache/accumulo/master/Master.java |  11 +
 .../master/metrics/ReplicationMetrics.java      | 122 +++++++++
 .../master/metrics/ReplicationMetricsMBean.java |  28 ++
 .../monitor/servlets/ReplicationServlet.java    | 162 +-----------
 5 files changed, 432 insertions(+), 148 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/84100142/server/base/src/main/java/org/apache/accumulo/server/replication/ReplicationUtil.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/replication/ReplicationUtil.java
b/server/base/src/main/java/org/apache/accumulo/server/replication/ReplicationUtil.java
new file mode 100644
index 0000000..164df69
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/replication/ReplicationUtil.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.replication;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NoSuchElementException;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.BatchScanner;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.admin.TableOperations;
+import org.apache.accumulo.core.client.replication.ReplicaSystem;
+import org.apache.accumulo.core.client.replication.ReplicaSystemFactory;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.RootTable;
+import org.apache.accumulo.core.replication.ReplicationConstants;
+import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
+import org.apache.accumulo.core.replication.ReplicationTarget;
+import org.apache.accumulo.core.replication.StatusUtil;
+import org.apache.accumulo.core.replication.proto.Replication.Status;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.server.zookeeper.ZooCache;
+import org.apache.hadoop.io.Text;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.google.protobuf.InvalidProtocolBufferException;
+
+public class ReplicationUtil {
+  private static final Logger log = LoggerFactory.getLogger(ReplicationUtil.class);
+
+  private static final String REPLICATION_TARGET_PREFIX = Property.TABLE_REPLICATION_TARGET.getKey();
+
+  private ZooCache zooCache;
+
+  public ReplicationUtil() {
+    this(new ZooCache());
+  }
+
+  public ReplicationUtil(ZooCache cache) {
+    this.zooCache = cache;
+  }
+
+  /**
+   * Extract replication peers from system configuration
+   * @param systemProperties System properties, typically from Connector.instanceOperations().getSystemConfiguration()
+   * @return Configured replication peers
+   */
+  public Map<String,String> getPeers(Map<String,String> systemProperties) {
+    Map<String,String> peers = new HashMap<>();
+    String definedPeersPrefix = Property.REPLICATION_PEERS.getKey();
+
+    // Get the defined peers and what ReplicaSystem impl they're using
+    for (Entry<String,String> property : systemProperties.entrySet()) {
+      String key = property.getKey();
+      // Filter out cruft that we don't want
+      if (key.startsWith(definedPeersPrefix) && !key.startsWith(Property.REPLICATION_PEER_USER.getKey())
&& !key.startsWith(Property.REPLICATION_PEER_PASSWORD.getKey())) {
+        String peerName = property.getKey().substring(definedPeersPrefix.length());
+        ReplicaSystem replica;
+        try {
+         replica = ReplicaSystemFactory.get(property.getValue());
+        } catch (Exception e) {
+          log.warn("Could not instantiate ReplicaSystem for {} with configuration {}", property.getKey(),
property.getValue(), e);
+          continue;
+        }
+
+        peers.put(peerName, replica.getClass().getName());
+      }
+    }
+
+    return peers;
+  }
+
+  public Set<ReplicationTarget> getReplicationTargets(TableOperations tops) {
+    // The total set of configured targets
+    final Set<ReplicationTarget> allConfiguredTargets = new HashSet<>();
+    final Map<String,String> tableNameToId = tops.tableIdMap();
+
+    for (String table : tops.list()) {
+      if (MetadataTable.NAME.equals(table) || RootTable.NAME.equals(table)) {
+        continue;
+      }
+      String localId = tableNameToId.get(table);
+      if (null == localId) {
+        log.trace("Could not determine ID for {}", table);
+        continue;
+      }
+
+      Iterable<Entry<String,String>> propertiesForTable;
+      try {
+        propertiesForTable = tops.getProperties(table);
+      } catch (AccumuloException e) {
+        log.debug("Could not fetch properties for " + table, e);
+        continue;
+      } catch (TableNotFoundException e) {
+        log.debug("Could not fetch properties for " + table, e);
+        continue;
+      }
+
+      for (Entry<String,String> prop : propertiesForTable) {
+        if (prop.getKey().startsWith(REPLICATION_TARGET_PREFIX)) {
+          String peerName = prop.getKey().substring(REPLICATION_TARGET_PREFIX.length());
+          String remoteIdentifier = prop.getValue();
+          ReplicationTarget target = new ReplicationTarget(peerName, remoteIdentifier, localId);
+
+          allConfiguredTargets.add(target);
+        }
+      }
+    }
+
+    return allConfiguredTargets;
+  }
+
+  public Map<ReplicationTarget,Long> getPendingReplications(Connector conn) {
+    final Map<ReplicationTarget,Long> counts = new HashMap<>();
+
+    // Read over the queued work
+    BatchScanner bs;
+    try {
+      bs = conn.createBatchScanner(ReplicationConstants.TABLE_NAME, Authorizations.EMPTY,
4);
+    } catch (TableNotFoundException e) {
+      log.debug("No replication table exists", e);
+      return counts;
+    }
+
+    bs.setRanges(Collections.singleton(new Range()));
+    WorkSection.limit(bs);
+    try {
+      Text buffer = new Text();
+      for (Entry<Key,Value> entry : bs) {
+        Key k = entry.getKey();
+        k.getColumnQualifier(buffer);
+        ReplicationTarget target = ReplicationTarget.from(buffer);
+
+        // TODO ACCUMULO-2835 once explicit lengths are tracked, we can give size-based estimates
instead of just file-based
+        Long count = counts.get(target);
+        if (null == count) {
+          counts.put(target, Long.valueOf(1l));
+        } else {
+          counts.put(target, count + 1);
+        }
+      }
+    } finally {
+      bs.close();
+    }
+
+    return counts;
+  }
+
+  /**
+   * Fetches the absolute path of the file to be replicated.
+   * @param conn Accumulo Connector
+   * @param workQueuePath Root path for the Replication WorkQueue
+   * @param queueKey The Replication work queue key
+   * @return The absolute path for the file, or null if the key is no longer in ZooKeeper
+   */
+  public String getAbsolutePath(Connector conn, String workQueuePath, String queueKey) {
+    byte[] data = zooCache.get(workQueuePath + "/" + queueKey);
+    if (null != data) {
+      return new String(data);
+    }
+
+    return null;
+  }
+
+  /**
+   * Compute a progress string for the replication of the given WAL
+   * @param conn Accumulo Connector
+   * @param path Absolute path to a WAL, or null
+   * @param target ReplicationTarget the WAL is being replicated to
+   * @return A status message for a file being replicated
+   */
+  public String getProgress(Connector conn, String path, ReplicationTarget target) {
+    // We could try to grep over the table, but without knowing the full file path, we
+    // can't find the status quickly
+    String status = "Unknown";
+    if (null != path) {
+      Scanner s;
+      try {
+        s = conn.createScanner(ReplicationConstants.TABLE_NAME, Authorizations.EMPTY);
+      } catch (TableNotFoundException e) {
+        log.debug("Replication table no long exists", e);
+        return status;
+      }
+
+      s.setRange(Range.exact(path));
+      s.fetchColumn(WorkSection.NAME, target.toText());
+
+      // Fetch the work entry for this item
+      Entry<Key,Value> kv = null;
+      try {
+        kv = Iterables.getOnlyElement(s);
+      } catch (NoSuchElementException e) {
+       log.trace("Could not find status of {} replicating to {}", path, target);
+       status = "Unknown";
+      } finally {
+        s.close();
+      }
+
+      // If we found the work entry for it, try to compute some progress
+      if (null != kv) {
+        try {
+          Status stat = Status.parseFrom(kv.getValue().get());
+          if (StatusUtil.isFullyReplicated(stat)) {
+            status = "Finished";
+          } else {
+            if (stat.getInfiniteEnd()) {
+              status = stat.getBegin() + "/&infin; records";
+            } else {
+              status = stat.getBegin() + "/" + stat.getEnd() + " records";
+            }
+          }
+        } catch (InvalidProtocolBufferException e) {
+          log.warn("Could not deserialize protobuf for {}", kv.getKey(), e);
+          status = "Unknown";
+        }
+      }
+    }
+
+    return status;
+  }
+
+  public Map<String,String> invert(Map<String,String> map) {
+    Map<String,String> newMap = Maps.newHashMapWithExpectedSize(map.size());
+    for(Entry<String,String> entry : map.entrySet()) {
+      newMap.put(entry.getValue(), entry.getKey());
+    }
+    return newMap;
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/84100142/server/master/src/main/java/org/apache/accumulo/master/Master.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/Master.java b/server/master/src/main/java/org/apache/accumulo/master/Master.java
index d528550..11b5345 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/Master.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/Master.java
@@ -33,6 +33,8 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import javax.management.StandardMBean;
+
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
@@ -77,6 +79,7 @@ import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
 import org.apache.accumulo.fate.zookeeper.ZooLock.LockLossReason;
 import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
 import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
+import org.apache.accumulo.master.metrics.ReplicationMetrics;
 import org.apache.accumulo.master.recovery.RecoveryManager;
 import org.apache.accumulo.master.replication.MasterReplicationCoordinator;
 import org.apache.accumulo.master.replication.ReplicationDriver;
@@ -1074,6 +1077,14 @@ public class Master implements LiveTServerSet.Listener, TableObserver,
CurrentSt
     ZooReaderWriter.getInstance().putPersistentData(ZooUtil.getRoot(instance) + Constants.ZMASTER_REPLICATION_COORDINATOR_ADDR,
         replAddress.address.toString().getBytes(StandardCharsets.UTF_8), NodeExistsPolicy.OVERWRITE);
 
+    final SystemCredentials creds = SystemCredentials.get();
+    try {
+      ReplicationMetrics beanImpl = new ReplicationMetrics(this.instance.getConnector(creds.getPrincipal(),
creds.getToken()));
+      beanImpl.register();
+    } catch (Exception e) {
+      log.error("Error registering Replication metrics with JMX", e);
+    }
+
     while (clientService.isServing()) {
       UtilWaitThread.sleep(500);
     }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/84100142/server/master/src/main/java/org/apache/accumulo/master/metrics/ReplicationMetrics.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/metrics/ReplicationMetrics.java
b/server/master/src/main/java/org/apache/accumulo/master/metrics/ReplicationMetrics.java
new file mode 100644
index 0000000..eae6c23
--- /dev/null
+++ b/server/master/src/main/java/org/apache/accumulo/master/metrics/ReplicationMetrics.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.master.metrics;
+
+import java.util.Map;
+import java.util.Set;
+
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.admin.TableOperations;
+import org.apache.accumulo.core.replication.ReplicationConstants;
+import org.apache.accumulo.core.replication.ReplicationTarget;
+import org.apache.accumulo.server.metrics.AbstractMetricsImpl;
+import org.apache.accumulo.server.replication.ReplicationUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * JMX bindings to expose 'high-level' metrics about Replication
+ */
+public class ReplicationMetrics extends AbstractMetricsImpl implements ReplicationMetricsMBean
{
+  private static final Logger log = LoggerFactory.getLogger(ReplicationMetrics.class);
+  private static final String METRICS_PREFIX = "replication";
+
+  private Connector conn;
+  private TableOperations tops;
+  private ObjectName objectName = null;
+  private ReplicationUtil replicationUtil;
+
+  public ReplicationMetrics(Connector conn) throws MalformedObjectNameException {
+    super();
+    this.conn = conn;
+    this.tops = conn.tableOperations();
+    objectName = new ObjectName("accumulo.server.metrics:service=Replication Metrics,name=ReplicationMBean,instance="
+ Thread.currentThread().getName());
+    replicationUtil = new ReplicationUtil();
+  }
+
+  @Override
+  public int getNumFilesPendingReplication() {
+    if (!tops.exists(ReplicationConstants.TABLE_NAME)) {
+      return 0;
+    }
+
+    Map<String,String> properties;
+    try {
+      properties = conn.instanceOperations().getSystemConfiguration();
+    } catch (AccumuloException | AccumuloSecurityException e) {
+      log.debug("Could not extract system configuration", e);
+      return 0;
+    }
+
+    // Get all of the configured replication peers
+    Map<String,String> peers = replicationUtil.getPeers(properties);
+
+    // A quick lookup to see if have any replication peer configured
+    if (peers.isEmpty()) {
+      return 0;
+    }
+
+    // The total set of configured targets
+    Set<ReplicationTarget> allConfiguredTargets = replicationUtil.getReplicationTargets(conn.tableOperations());
+
+    // Number of files per target we have to replicate
+    Map<ReplicationTarget,Long> targetCounts = replicationUtil.getPendingReplications(conn);
+
+    int filesPending = 0;
+
+    // Sum pending replication over all targets
+    for (ReplicationTarget configuredTarget : allConfiguredTargets) {
+      Long numFiles = targetCounts.get(configuredTarget);
+
+      if (null != numFiles) {
+        filesPending += numFiles;
+      }
+    }
+
+    return filesPending;
+  }
+
+  @Override
+  public int getNumConfiguredPeers() {
+    Map<String,String> properties;
+    try {
+      properties = conn.instanceOperations().getSystemConfiguration();
+    } catch (AccumuloException | AccumuloSecurityException e) {
+      log.debug("Could not extract system configuration", e);
+      return 0;
+    }
+
+    // Get all of the configured replication peers
+    return replicationUtil.getPeers(properties).size();
+  }
+
+  @Override
+  protected ObjectName getObjectName() {
+    return objectName;
+  }
+
+  @Override
+  protected String getMetricsPrefix() {
+    return METRICS_PREFIX;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/84100142/server/master/src/main/java/org/apache/accumulo/master/metrics/ReplicationMetricsMBean.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/metrics/ReplicationMetricsMBean.java
b/server/master/src/main/java/org/apache/accumulo/master/metrics/ReplicationMetricsMBean.java
new file mode 100644
index 0000000..2430466
--- /dev/null
+++ b/server/master/src/main/java/org/apache/accumulo/master/metrics/ReplicationMetricsMBean.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.master.metrics;
+
+/**
+ * 
+ */
+public interface ReplicationMetricsMBean {
+
+  public int getNumFilesPendingReplication();
+
+  public int getNumConfiguredPeers();
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/84100142/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ReplicationServlet.java
----------------------------------------------------------------------
diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ReplicationServlet.java
b/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ReplicationServlet.java
index 2249f98..896ac20 100644
--- a/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ReplicationServlet.java
+++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ReplicationServlet.java
@@ -16,36 +16,18 @@
  */
 package org.apache.accumulo.monitor.servlets;
 
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.NoSuchElementException;
 import java.util.Set;
 
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
-import org.apache.accumulo.core.client.BatchScanner;
 import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.Instance;
-import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.admin.TableOperations;
-import org.apache.accumulo.core.client.replication.ReplicaSystem;
-import org.apache.accumulo.core.client.replication.ReplicaSystemFactory;
-import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Range;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.metadata.MetadataTable;
-import org.apache.accumulo.core.metadata.RootTable;
 import org.apache.accumulo.core.replication.ReplicationConstants;
-import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
 import org.apache.accumulo.core.replication.ReplicationTarget;
-import org.apache.accumulo.core.replication.StatusUtil;
-import org.apache.accumulo.core.replication.proto.Replication.Status;
-import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.Credentials;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
 import org.apache.accumulo.monitor.util.Table;
@@ -53,18 +35,13 @@ import org.apache.accumulo.monitor.util.celltypes.NumberType;
 import org.apache.accumulo.server.client.HdfsZooInstance;
 import org.apache.accumulo.server.conf.ServerConfigurationFactory;
 import org.apache.accumulo.server.replication.DistributedWorkQueueWorkAssignerHelper;
+import org.apache.accumulo.server.replication.ReplicationUtil;
 import org.apache.accumulo.server.security.SystemCredentials;
 import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
-import org.apache.accumulo.server.zookeeper.ZooCache;
-import org.apache.hadoop.io.Text;
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Maps;
-import com.google.protobuf.InvalidProtocolBufferException;
-
 /**
  * 
  */
@@ -73,7 +50,11 @@ public class ReplicationServlet extends BasicServlet {
 
   private static final long serialVersionUID = 1L;
 
-  private ZooCache zooCache = new ZooCache();
+  private ReplicationUtil replicationUtil;
+
+  public ReplicationServlet() {
+    replicationUtil = new ReplicationUtil();
+  }
 
   @Override
   protected String getTitle(HttpServletRequest req) {
@@ -99,83 +80,16 @@ public class ReplicationServlet extends BasicServlet {
     replicationStats.addSortableColumn("ReplicaSystem Type");
     replicationStats.addSortableColumn("Files needing replication", new NumberType<Long>(),
null);
 
-    Map<String,String> properties = conn.instanceOperations().getSystemConfiguration();
-    Map<String,String> peers = new HashMap<>();
-    String definedPeersPrefix = Property.REPLICATION_PEERS.getKey();
-
-    // Get the defined peers and what ReplicaSystem impl they're using
-    for (Entry<String,String> property : properties.entrySet()) {
-      String key = property.getKey();
-      // Filter out cruft that we don't want
-      if (key.startsWith(definedPeersPrefix) && !key.startsWith(Property.REPLICATION_PEER_USER.getKey())
&& !key.startsWith(Property.REPLICATION_PEER_PASSWORD.getKey())) {
-        String peerName = property.getKey().substring(definedPeersPrefix.length());
-        ReplicaSystem replica;
-        try {
-         replica = ReplicaSystemFactory.get(property.getValue());
-        } catch (Exception e) {
-          log.warn("Could not instantiate ReplicaSystem for {} with configuration {}", property.getKey(),
property.getValue(), e);
-          continue;
-        }
-
-        peers.put(peerName, replica.getClass().getName());
-      }
-    }
-
-    final String targetPrefix = Property.TABLE_REPLICATION_TARGET.getKey();
+    Map<String,String> peers = replicationUtil.getPeers(conn.instanceOperations().getSystemConfiguration());
 
     // The total set of configured targets
-    Set<ReplicationTarget> allConfiguredTargets = new HashSet<>();
+    Set<ReplicationTarget> allConfiguredTargets = replicationUtil.getReplicationTargets(tops);
 
     // Number of files per target we have to replicate
-    Map<ReplicationTarget,Long> targetCounts = new HashMap<>();
+    Map<ReplicationTarget,Long> targetCounts = replicationUtil.getPendingReplications(conn);
 
     Map<String,String> tableNameToId = tops.tableIdMap();
-    Map<String,String> tableIdToName = invert(tableNameToId);
-
-    for (String table : tops.list()) {
-      if (MetadataTable.NAME.equals(table) || RootTable.NAME.equals(table)) {
-        continue;
-      }
-      String localId = tableNameToId.get(table);
-      if (null == localId) {
-        log.trace("Could not determine ID for {}", table);
-        continue;
-      }
-
-      Iterable<Entry<String,String>> propertiesForTable = tops.getProperties(table);
-      for (Entry<String,String> prop : propertiesForTable) {
-        if (prop.getKey().startsWith(targetPrefix)) {
-          String peerName = prop.getKey().substring(targetPrefix.length());
-          String remoteIdentifier = prop.getValue();
-          ReplicationTarget target = new ReplicationTarget(peerName, remoteIdentifier, localId);
-
-          allConfiguredTargets.add(target);
-        }
-      }
-    }
-
-    // Read over the queued work
-    BatchScanner bs = conn.createBatchScanner(ReplicationConstants.TABLE_NAME, Authorizations.EMPTY,
4);
-    bs.setRanges(Collections.singleton(new Range()));
-    WorkSection.limit(bs);
-    try {
-      Text buffer = new Text();
-      for (Entry<Key,Value> entry : bs) {
-        Key k = entry.getKey();
-        k.getColumnQualifier(buffer);
-        ReplicationTarget target = ReplicationTarget.from(buffer);
-
-        // TODO ACCUMULO-2835 once explicit lengths are tracked, we can give size-based estimates
instead of just file-based
-        Long count = targetCounts.get(target);
-        if (null == count) {
-          targetCounts.put(target, Long.valueOf(1l));
-        } else {
-          targetCounts.put(target, count + 1);
-        }
-      }
-    } finally {
-      bs.close();
-    }
+    Map<String,String> tableIdToName = replicationUtil.invert(tableNameToId);
 
     for (ReplicationTarget configuredTarget : allConfiguredTargets) {
       String tableName = tableIdToName.get(configuredTarget.getSourceTableId());
@@ -217,51 +131,11 @@ public class ReplicationServlet extends BasicServlet {
         String filename = queueKeyPair.getKey();
         ReplicationTarget target = queueKeyPair.getValue();
   
-        byte[] data = zooCache.get(workQueuePath + "/" + queueKey);
-  
-        // We could try to grep over the table, but without knowing the full file path, we
-        // can't find the status quickly
-        String status = "Unknown";
-        String path = null;
-        if (null != data) {
-          path = new String(data);
-          Scanner s = conn.createScanner(ReplicationConstants.TABLE_NAME, Authorizations.EMPTY);
-          s.setRange(Range.exact(path));
-          s.fetchColumn(WorkSection.NAME, target.toText());
-    
-          // Fetch the work entry for this item
-          Entry<Key,Value> kv = null;
-          try {
-            kv = Iterables.getOnlyElement(s);
-          } catch (NoSuchElementException e) {
-           log.trace("Could not find status of {} replicating to {}", filename, target);
-           status = "Unknown";
-          } finally {
-            s.close();
-          }
-    
-          // If we found the work entry for it, try to compute some progress
-          if (null != kv) {
-            try {
-              Status stat = Status.parseFrom(kv.getValue().get());
-              if (StatusUtil.isFullyReplicated(stat)) {
-                status = "Finished";
-              } else {
-                if (stat.getInfiniteEnd()) {
-                  status = stat.getBegin() + "/&infin; records";
-                } else {
-                  status = stat.getBegin() + "/" + stat.getEnd() + " records";
-                }
-              }
-            } catch (InvalidProtocolBufferException e) {
-              log.warn("Could not deserialize protobuf for {}", kv.getKey(), e);
-              status = "Unknown";
-            }
-          }
-        }
-  
+        String path = replicationUtil.getAbsolutePath(conn, workQueuePath, queueKey);
+        String progress = replicationUtil.getProgress(conn, path, target);
+        
         // Add a row in the table
-        replicationInProgress.addRow(null == path ? ".../" + filename : path, target.getPeerName(),
target.getSourceTableId(), target.getRemoteIdentifier(), status);
+        replicationInProgress.addRow(null == path ? ".../" + filename : path, target.getPeerName(),
target.getSourceTableId(), target.getRemoteIdentifier(), progress);
       }
     } catch (KeeperException | InterruptedException e) {
       log.warn("Could not calculate replication in progress", e);
@@ -269,12 +143,4 @@ public class ReplicationServlet extends BasicServlet {
 
     replicationInProgress.generate(req, sb);
   }
-
-  protected Map<String,String> invert(Map<String,String> map) {
-    Map<String,String> newMap = Maps.newHashMapWithExpectedSize(map.size());
-    for(Entry<String,String> entry : map.entrySet()) {
-      newMap.put(entry.getValue(), entry.getKey());
-    }
-    return newMap;
-  }
 }


Mime
View raw message