bookkeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From si...@apache.org
Subject bookkeeper git commit: BOOKKEEPER-945: Add counters to track the activity of auditor and repl…
Date Sat, 10 Sep 2016 19:01:34 GMT
Repository: bookkeeper
Updated Branches:
  refs/heads/master 9db51b8d5 -> 9dc05fc08


BOOKKEEPER-945: Add counters to track the activity of auditor and replication workers

Once auto recovery is enabled, the auditor and replication workers start their activity.
Today there is no way to monitor that activity using counters. This change introduces
counters to track the following activities of the auditor and replication workers:

- Time taken by auditor to build the bookie->ledger list
- No. of under replicated ledgers detected
- Time taken by auditor to publish the under replicated ledger list
- Time taken by auditor to check all the ledgers in the cluster
- No. of ledgers replicated by each replication worker
- No. of entries and bytes of data read and written by each replication worker
- Auditor can also report the distribution of ledgers within the cluster: how many bookies
own a piece of ledger, etc.

Author: Rithin <rithin.shetty@salesforce.com>

Reviewers: sijie@apache.org <sijie@apache.org>

Closes #57 from rithin-shetty/auto_recovery_counters


Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/9dc05fc0
Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/9dc05fc0
Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/9dc05fc0

Branch: refs/heads/master
Commit: 9dc05fc080ddf01e69eb89ce1b0865c552d3de53
Parents: 9db51b8
Author: Rithin <rithin.shetty@salesforce.com>
Authored: Sat Sep 10 12:01:27 2016 -0700
Committer: Sijie Guo <sijie@apache.org>
Committed: Sat Sep 10 12:01:27 2016 -0700

----------------------------------------------------------------------
 .../bookkeeper/client/BookKeeperAdmin.java      | 17 +++++--
 .../client/LedgerFragmentReplicator.java        | 26 +++++++++-
 .../apache/bookkeeper/client/LedgerHandle.java  | 26 ++++++++++
 .../apache/bookkeeper/replication/Auditor.java  | 51 +++++++++++++++++---
 .../bookkeeper/replication/AuditorElector.java  |  5 +-
 .../replication/ReplicationStats.java           | 12 +++++
 .../replication/ReplicationWorker.java          | 11 ++++-
 .../bookkeeper/client/UpdateLedgerOpTest.java   |  2 +-
 .../replication/AuditorPeriodicCheckTest.java   |  3 +-
 9 files changed, 137 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9dc05fc0/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
index 1ad17fd..022d4da 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
@@ -52,6 +52,8 @@ import org.apache.zookeeper.AsyncCallback;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.Code;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.zookeeper.ZKUtil;
 import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.ZooKeeper;
@@ -137,24 +139,29 @@ public class BookKeeperAdmin {
         // Create the BookKeeper client instance
         bkc = new BookKeeper(conf, zk);
         ownsBK = true;
-
-        this.lfr = new LedgerFragmentReplicator(bkc);
+        this.lfr = new LedgerFragmentReplicator(bkc, NullStatsLogger.INSTANCE);
     }
 
     /**
      * Constructor that takes in a BookKeeper instance . This will be useful,
-     * when users already has bk instance ready.
+     * when user already has bk instance ready.
      *
      * @param bkc
      *            - bookkeeper instance
+     * @param statsLogger
+     *            - stats logger
      */
-    public BookKeeperAdmin(final BookKeeper bkc) {
+    public BookKeeperAdmin(final BookKeeper bkc, StatsLogger statsLogger) {
         this.bkc = bkc;
         ownsBK = false;
         this.zk = bkc.zk;
         ownsZK = false;
         this.bookiesPath = bkc.getConf().getZkAvailableBookiesPath();
-        this.lfr = new LedgerFragmentReplicator(bkc);
+        this.lfr = new LedgerFragmentReplicator(bkc, statsLogger);
+    }
+
+    public BookKeeperAdmin(final BookKeeper bkc) {
+        this(bkc, NullStatsLogger.INSTANCE);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9dc05fc0/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
index b4c8cc8..22241e6 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
@@ -33,6 +33,11 @@ import org.apache.bookkeeper.proto.BookieProtocol;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.MultiCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
+import org.apache.bookkeeper.replication.ReplicationStats;
+import org.apache.bookkeeper.stats.Counter;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.util.OrderedSafeExecutor.OrderedSafeGenericCallback;
 import org.apache.zookeeper.AsyncCallback;
 import org.apache.zookeeper.KeeperException.Code;
@@ -48,9 +53,23 @@ public class LedgerFragmentReplicator {
 
     // BookKeeper instance
     private BookKeeper bkc;
+    private StatsLogger statsLogger;
+    private final Counter numEntriesRead;
+    private final OpStatsLogger numBytesRead;
+    private final Counter numEntriesWritten;
+    private final OpStatsLogger numBytesWritten;
 
-    public LedgerFragmentReplicator(BookKeeper bkc) {
+    public LedgerFragmentReplicator(BookKeeper bkc, StatsLogger statsLogger) {
         this.bkc = bkc;
+        this.statsLogger = statsLogger;
+        numEntriesRead = this.statsLogger.getCounter(ReplicationStats.NUM_ENTRIES_READ);
+        numBytesRead = this.statsLogger.getOpStatsLogger(ReplicationStats.NUM_BYTES_READ);
+        numEntriesWritten = this.statsLogger.getCounter(ReplicationStats.NUM_ENTRIES_WRITTEN);
+        numBytesWritten = this.statsLogger.getOpStatsLogger(ReplicationStats.NUM_BYTES_WRITTEN);
+    }
+
+    public LedgerFragmentReplicator(BookKeeper bkc) {
+        this(bkc, NullStatsLogger.INSTANCE);
     }
 
     private final static Logger LOG = LoggerFactory
@@ -259,6 +278,9 @@ public class LedgerFragmentReplicator {
                  */
                 LedgerEntry entry = seq.nextElement();
                 byte[] data = entry.getEntry();
+                final long dataLength = data.length;
+                numEntriesRead.inc();
+                numBytesRead.registerSuccessfulValue(dataLength);
                 ChannelBuffer toSend = lh.getDigestManager()
                         .computeDigestAndPackageForSending(entryId,
                                 lh.getLastAddConfirmed(), entry.getLength(),
@@ -278,6 +300,8 @@ public class LedgerFragmentReplicator {
                                                     + addr, BKException
                                                     .create(rc));
                                 } else {
+                                    numEntriesWritten.inc();
+                                    numBytesWritten.registerSuccessfulValue(dataLength);
                                     if (LOG.isDebugEnabled()) {
                                         LOG.debug("Success writing ledger id "
                                                 + ledgerId + ", entry id "

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9dc05fc0/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index edb5873..3c8d475 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -29,6 +29,7 @@ import java.util.Enumeration;
 import java.util.List;
 import java.util.Map;
 import java.util.Queue;
+import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -51,6 +52,7 @@ import org.jboss.netty.buffer.ChannelBuffer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.RateLimiter;
 
 /**
@@ -190,6 +192,30 @@ public class LedgerHandle implements AutoCloseable {
     }
 
     /**
+     * Get the number of fragments that makeup this ledger
+     *
+     * @return the count of fragments
+     */
+    synchronized public long getNumFragments() {
+        return metadata.getEnsembles().size();
+    }
+
+    /**
+     * Get the count of unique bookies that own part of this ledger
+     * by going over all the fragments of the ledger.
+     *
+     * @return count of unique bookies
+     */
+    synchronized public long getNumBookies() {
+        Map<Long, ArrayList<BookieSocketAddress>> m = metadata.getEnsembles();
+        Set<BookieSocketAddress> s = Sets.newHashSet();
+        for (ArrayList<BookieSocketAddress> aList : m.values()) {
+            s.addAll(aList);
+        }
+        return s.size();
+    }
+
+    /**
      * Get the DigestManager
      *
      * @return DigestManager for the LedgerHandle

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9dc05fc0/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
index 2e9e048..835b34f 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
@@ -20,7 +20,9 @@
  */
 package org.apache.bookkeeper.replication;
 
+import com.google.common.base.Stopwatch;
 import com.google.common.collect.Sets;
+
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.BookKeeperAdmin;
@@ -39,13 +41,18 @@ import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
 import org.apache.bookkeeper.replication.ReplicationException.BKAuditException;
 import org.apache.bookkeeper.replication.ReplicationException.CompatibilityException;
 import org.apache.bookkeeper.replication.ReplicationException.UnavailableException;
-import org.apache.bookkeeper.util.ZkUtils;
+import org.apache.bookkeeper.replication.ReplicationStats;
+import org.apache.bookkeeper.stats.Counter;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
-import org.apache.bookkeeper.zookeeper.ZooKeeperWatcherBase;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.zookeeper.AsyncCallback;
+
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.SettableFuture;
+
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooKeeper;
 import org.slf4j.Logger;
@@ -83,11 +90,28 @@ public class Auditor implements BookiesListener {
     private final ScheduledExecutorService executor;
     private List<String> knownBookies = new ArrayList<String>();
     private final String bookieIdentifier;
+    private final StatsLogger statsLogger;
+    private final OpStatsLogger numUnderReplicatedLedger;
+    private final OpStatsLogger uRLPublishTimeForLostBookies;
+    private final OpStatsLogger bookieToLedgersMapCreationTime;
+    private final OpStatsLogger checkAllLedgersTime;
+    private final Counter numLedgersChecked;
+    private final OpStatsLogger numFragmentsPerLedger;
+    private final OpStatsLogger numBookiesPerLedger;
 
     public Auditor(final String bookieIdentifier, ServerConfiguration conf,
-                   ZooKeeper zkc) throws UnavailableException {
+                   ZooKeeper zkc, StatsLogger statsLogger) throws UnavailableException {
         this.conf = conf;
         this.bookieIdentifier = bookieIdentifier;
+        this.statsLogger = statsLogger;
+
+        numUnderReplicatedLedger = this.statsLogger.getOpStatsLogger(ReplicationStats.NUM_UNDER_REPLICATED_LEDGERS);
+        uRLPublishTimeForLostBookies = this.statsLogger.getOpStatsLogger(ReplicationStats.URL_PUBLISH_TIME_FOR_LOST_BOOKIE);
+        bookieToLedgersMapCreationTime = this.statsLogger.getOpStatsLogger(ReplicationStats.BOOKIE_TO_LEDGERS_MAP_CREATION_TIME);
+        checkAllLedgersTime = this.statsLogger.getOpStatsLogger(ReplicationStats.CHECK_ALL_LEDGERS_TIME);
+        numLedgersChecked = this.statsLogger.getCounter(ReplicationStats.NUM_LEDGERS_CHECKED);
+        numFragmentsPerLedger = statsLogger.getOpStatsLogger(ReplicationStats.NUM_FRAGMENTS_PER_LEDGER);
+        numBookiesPerLedger = statsLogger.getOpStatsLogger(ReplicationStats.NUM_BOOKIES_PER_LEDGER);
 
         initialize(conf, zkc);
 
@@ -113,7 +137,7 @@ public class Auditor implements BookiesListener {
                     .newLedgerUnderreplicationManager();
 
             this.bkc = new BookKeeper(new ClientConfiguration(conf), zkc);
-            this.admin = new BookKeeperAdmin(bkc);
+            this.admin = new BookKeeperAdmin(bkc, statsLogger);
         } catch (CompatibilityException ce) {
             throw new UnavailableException(
                     "CompatibilityException while initializing Auditor", ce);
@@ -215,7 +239,10 @@ public class Auditor implements BookiesListener {
                                     return;
                                 }
 
+                                Stopwatch stopwatch = new Stopwatch().start();
                                 checkAllLedgers();
+                                checkAllLedgersTime.registerSuccessfulEvent(stopwatch.stop().elapsedMillis(),
+                                                                            TimeUnit.MILLISECONDS);
                             } catch (KeeperException ke) {
                                 LOG.error("Exception while running periodic check", ke);
                             } catch (InterruptedException ie) {
@@ -295,6 +322,7 @@ public class Auditor implements BookiesListener {
             return;
         }
 
+        Stopwatch stopwatch = new Stopwatch().start();
         // put exit cases here
         Map<String, Set<Long>> ledgerDetails = generateBookie2LedgersIndex();
         try {
@@ -316,8 +344,12 @@ public class Auditor implements BookiesListener {
         Collection<String> lostBookies = CollectionUtils.subtract(knownBookies,
                 availableBookies);
 
-        if (lostBookies.size() > 0)
+        bookieToLedgersMapCreationTime.registerSuccessfulEvent(stopwatch.elapsedMillis(), TimeUnit.MILLISECONDS);
+        if (lostBookies.size() > 0) {
             handleLostBookies(lostBookies, ledgerDetails);
+            uRLPublishTimeForLostBookies.registerSuccessfulEvent(stopwatch.stop().elapsedMillis(), TimeUnit.MILLISECONDS);
+        }
+
     }
 
     private Map<String, Set<Long>> generateBookie2LedgersIndex()
@@ -347,6 +379,7 @@ public class Auditor implements BookiesListener {
         }
         LOG.info("Following ledgers: " + ledgers + " of bookie: " + bookieIP
                 + " are identified as underreplicated");
+        numUnderReplicatedLedger.registerSuccessfulValue(ledgers.size());
         for (Long ledgerId : ledgers) {
             try {
                 ledgerUnderreplicationManager.markLedgerUnderreplicated(
@@ -418,7 +451,7 @@ public class Auditor implements BookiesListener {
 
         final BookKeeper client = new BookKeeper(new ClientConfiguration(conf),
                                                  newzk);
-        final BookKeeperAdmin admin = new BookKeeperAdmin(client);
+        final BookKeeperAdmin admin = new BookKeeperAdmin(client, statsLogger);
 
         try {
             final LedgerChecker checker = new LedgerChecker(client);
@@ -447,6 +480,12 @@ public class Auditor implements BookiesListener {
                     try {
                         lh = admin.openLedgerNoRecovery(ledgerId);
                         checker.checkLedger(lh, new ProcessLostFragmentsCb(lh, callback));
+                        // we collect the following stats to get a measure of the
+                        // distribution of a single ledger within the bk cluster
+                        // the higher the number of fragments/bookies, the more distributed it is
+                        numFragmentsPerLedger.registerSuccessfulValue(lh.getNumFragments());
+                        numBookiesPerLedger.registerSuccessfulValue(lh.getNumBookies());
+                        numLedgersChecked.inc();
                     } catch (BKException.BKNoSuchLedgerExistsException bknsle) {
                         LOG.debug("Ledger was deleted before we could check it", bknsle);
                         callback.processResult(BKException.Code.OK,

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9dc05fc0/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorElector.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorElector.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorElector.java
index 6d8f8f1..e8dfb02 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorElector.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorElector.java
@@ -92,6 +92,8 @@ public class AuditorElector {
 
     // Expose Stats
     private final Counter electionAttempts;
+    private final StatsLogger statsLogger;
+
 
     /**
      * AuditorElector for performing the auditor election
@@ -129,6 +131,7 @@ public class AuditorElector {
         this.bookieId = bookieId;
         this.conf = conf;
         this.zkc = zkc;
+        this.statsLogger = statsLogger;
         this.electionAttempts = statsLogger.getCounter(ELECTION_ATTEMPTS);
         basePath = conf.getZkLedgersRootPath() + '/'
                 + BookKeeperConstants.UNDER_REPLICATION_NODE;
@@ -266,7 +269,7 @@ public class AuditorElector {
 
                             zkc.setData(getVotePath(""),
                                         TextFormat.printToString(builder.build()).getBytes(UTF_8), -1);
-                            auditor = new Auditor(bookieId, conf, zkc);
+                            auditor = new Auditor(bookieId, conf, zkc, statsLogger);
                             auditor.start();
                         } else {
                             // If not an auditor, will be watching to my predecessor and

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9dc05fc0/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationStats.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationStats.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationStats.java
index 9e280a7..231ec01 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationStats.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationStats.java
@@ -26,9 +26,21 @@ public interface ReplicationStats {
 
     public final static String AUDITOR_SCOPE = "auditor";
     public final static String ELECTION_ATTEMPTS = "election_attempts";
+    public final static String NUM_UNDER_REPLICATED_LEDGERS = "NUM_UNDER_REPLICATED_LEDGERS";
+    public final static String URL_PUBLISH_TIME_FOR_LOST_BOOKIE = "URL_PUBLISH_TIME_FOR_LOST_BOOKIE";
+    public final static String BOOKIE_TO_LEDGERS_MAP_CREATION_TIME = "BOOKIE_TO_LEDGERS_MAP_CREATION_TIME";
+    public final static String CHECK_ALL_LEDGERS_TIME = "CHECK_ALL_LEDGERS_TIME";
+    public final static String NUM_FRAGMENTS_PER_LEDGER = "NUM_FRAGMENTS_PER_LEDGER";
+    public final static String NUM_BOOKIES_PER_LEDGER = "NUM_BOOKIES_PER_LEDGER";
+    public final static String NUM_LEDGERS_CHECKED = "NUM_LEDGERS_CHECKED";
 
     public final static String REPLICATION_WORKER_SCOPE = "replication_worker";
     public final static String REREPLICATE_OP = "rereplicate";
+    public final static String NUM_FULL_OR_PARTIAL_LEDGERS_REPLICATED = "NUM_FULL_OR_PARTIAL_LEDGERS_REPLICATED";
+    public final static String NUM_ENTRIES_READ = "NUM_ENTRIES_READ";
+    public final static String NUM_BYTES_READ = "NUM_BYTES_READ";
+    public final static String NUM_ENTRIES_WRITTEN = "NUM_ENTRIES_WRITTEN";
+    public final static String NUM_BYTES_WRITTEN = "NUM_BYTES_WRITTEN";
 
     public final static String BK_CLIENT_SCOPE = "bk_client";
 

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9dc05fc0/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java
index e686e11..3f2261f 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java
@@ -51,6 +51,7 @@ import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import org.apache.bookkeeper.replication.ReplicationException.CompatibilityException;
 import org.apache.bookkeeper.replication.ReplicationException.UnavailableException;
+import org.apache.bookkeeper.stats.Counter;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
@@ -84,6 +85,7 @@ public class ReplicationWorker implements Runnable {
 
     // Expose Stats
     private final OpStatsLogger rereplicateOpStats;
+    private final Counter numLedgersReplicated;
 
     /**
      * Replication worker for replicating the ledger fragments from
@@ -134,7 +136,7 @@ public class ReplicationWorker implements Runnable {
                 .setZookeeper(zkc)
                 .setStatsLogger(statsLogger.scope(BK_CLIENT_SCOPE))
                 .build();
-        this.admin = new BookKeeperAdmin(bkc);
+        this.admin = new BookKeeperAdmin(bkc, statsLogger);
         this.ledgerChecker = new LedgerChecker(bkc);
         this.workerThread = new BookieThread(this, "ReplicationWorker");
         this.openLedgerRereplicationGracePeriod = conf
@@ -143,6 +145,7 @@ public class ReplicationWorker implements Runnable {
 
         // Expose Stats
         this.rereplicateOpStats = statsLogger.getOpStatsLogger(REREPLICATE_OP);
+        this.numLedgersReplicated = statsLogger.getCounter(ReplicationStats.NUM_FULL_OR_PARTIAL_LEDGERS_REPLICATED);
     }
 
     /** Start the replication worker */
@@ -254,6 +257,7 @@ public class ReplicationWorker implements Runnable {
         LOG.debug("Founds fragments {} for replication from ledger: {}", fragments, ledgerIdToReplicate);
 
         boolean foundOpenFragments = false;
+        long numFragsReplicated = 0;
         for (LedgerFragment ledgerFragment : fragments) {
             if (!ledgerFragment.isClosed()) {
                 foundOpenFragments = true;
@@ -266,6 +270,7 @@ public class ReplicationWorker implements Runnable {
             }
             try {
                 admin.replicateLedgerFragment(lh, ledgerFragment, targetBookie);
+                numFragsReplicated++;
             } catch (BKException.BKBookieHandleNotAvailableException e) {
                 LOG.warn("BKBookieHandleNotAvailableException "
                         + "while replicating the fragment", e);
@@ -279,6 +284,10 @@ public class ReplicationWorker implements Runnable {
             }
         }
 
+        if (numFragsReplicated > 0) {
+            numLedgersReplicated.inc();
+        }
+
         if (foundOpenFragments || isLastSegmentOpenAndMissingBookies(lh)) {
             deferLedgerLockRelease(ledgerIdToReplicate);
             return false;

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9dc05fc0/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/UpdateLedgerOpTest.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/UpdateLedgerOpTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/UpdateLedgerOpTest.java
index 2ea094f..1a9abea 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/UpdateLedgerOpTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/UpdateLedgerOpTest.java
@@ -302,4 +302,4 @@ public class UpdateLedgerOpTest extends BookKeeperClusterTestCase {
         }
         return lh;
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9dc05fc0/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java
index 353edfa..428a597 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java
@@ -30,6 +30,7 @@ import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.meta.LedgerManagerFactory;
 import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
+import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
 import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
 import org.apache.zookeeper.ZooKeeper;
@@ -313,7 +314,7 @@ public class AuditorPeriodicCheckTest extends BookKeeperClusterTestCase {
         }
         final Auditor auditor = new Auditor(
                 Bookie.getBookieAddress(bsConfs.get(0)).toString(),
-                bsConfs.get(0), zkc);
+                bsConfs.get(0), zkc, NullStatsLogger.INSTANCE);
         final AtomicBoolean exceptionCaught = new AtomicBoolean(false);
         final CountDownLatch latch = new CountDownLatch(1);
         Thread t = new Thread() {


Mime
View raw message