zookeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From si...@apache.org
Subject svn commit: r1559918 - in /zookeeper/bookkeeper/trunk: ./ bookkeeper-server/ bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/
Date Tue, 21 Jan 2014 04:19:09 GMT
Author: sijie
Date: Tue Jan 21 04:19:09 2014
New Revision: 1559918

URL: http://svn.apache.org/r1559918
Log:
BOOKKEEPER-696: stats collection on bookkeeper client (Aniruddha, ivank via sijie)

Added:
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java
Modified:
    zookeeper/bookkeeper/trunk/CHANGES.txt
    zookeeper/bookkeeper/trunk/bookkeeper-server/pom.xml
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerDeleteOp.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
    zookeeper/bookkeeper/trunk/pom.xml

Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1559918&r1=1559917&r2=1559918&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Tue Jan 21 04:19:09 2014
@@ -140,6 +140,8 @@ Trunk (unreleased changes)
 
         BOOKKEEPER-429: Provide separate read and write threads in the bookkeeper server (Aniruddha via sijie)
 
+        BOOKKEEPER-696: stats collection on bookkeeper client (Aniruddha, ivank via sijie)
+
       hedwig-server:
 
         BOOKKEEPER-601: readahead cache size isn't updated correctly (sijie via fpj)

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/pom.xml
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/pom.xml?rev=1559918&r1=1559917&r2=1559918&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/pom.xml (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/pom.xml Tue Jan 21 04:19:09 2014
@@ -33,6 +33,11 @@
   </properties>
   <dependencies>
     <dependency>
+      <groupId>org.apache.bookkeeper.stats</groupId>
+      <artifactId>bookkeeper-stats-api</artifactId>
+      <version>${project.parent.version}</version>
+    </dependency>
+    <dependency>
       <groupId>com.google.protobuf</groupId>
       <artifactId>protobuf-java</artifactId>
       <version>2.4.1</version>
@@ -255,5 +260,25 @@
         </plugins>
       </build>
     </profile>
+    <profile>
+      <id>twitter-science-provider</id>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.bookkeeper.stats</groupId>
+          <artifactId>twitter-science-provider</artifactId>
+          <version>${project.parent.version}</version>
+        </dependency>
+      </dependencies>
+    </profile>
+    <profile>
+      <id>codahale-metrics-provider</id>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.bookkeeper.stats</groupId>
+          <artifactId>codahale-metrics-provider</artifactId>
+          <version>${project.parent.version}</version>
+        </dependency>
+      </dependencies>
+    </profile>
   </profiles>
 </project>

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java?rev=1559918&r1=1559917&r2=1559918&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java Tue Jan 21 04:19:09 2014
@@ -36,6 +36,9 @@ import org.apache.bookkeeper.meta.Ledger
 import org.apache.bookkeeper.meta.LedgerManagerFactory;
 import org.apache.bookkeeper.proto.BookieClient;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.util.OrderedSafeExecutor;
 import org.apache.bookkeeper.util.ReflectionUtils;
 import org.apache.bookkeeper.util.ZkUtils;
@@ -72,6 +75,14 @@ public class BookKeeper {
     final static int zkConnectTimeoutMs = 5000;
     final ClientSocketChannelFactory channelFactory;
 
+    // The stats logger for this client.
+    private final StatsLogger statsLogger;
+    private OpStatsLogger createOpLogger;
+    private OpStatsLogger openOpLogger;
+    private OpStatsLogger deleteOpLogger;
+    private OpStatsLogger readOpLogger;
+    private OpStatsLogger addOpLogger;
+
     // whether the socket factory is one we created, or is owned by whoever
     // instantiated us
     boolean ownChannelFactory = false;
@@ -99,6 +110,63 @@ public class BookKeeper {
         public void connectionFailed(int code);
     }
 
+    static class Builder {
+        final ClientConfiguration conf;
+
+        ZooKeeper zk = null;
+        ClientSocketChannelFactory channelFactory = null;
+        StatsLogger statsLogger = NullStatsLogger.INSTANCE;
+
+        Builder(ClientConfiguration conf) {
+            this.conf = conf;
+        }
+
+        Builder setChannelFactory(ClientSocketChannelFactory f) {
+            channelFactory = f;
+            return this;
+        }
+
+        Builder setZookeeper(ZooKeeper zk) {
+            this.zk = zk;
+            return this;
+        }
+
+        Builder setStatsLogger(StatsLogger statsLogger) {
+            this.statsLogger = statsLogger;
+            return this;
+        }
+
+        BookKeeper build() throws IOException, InterruptedException, KeeperException {
+            boolean ownZK = false;
+            boolean ownChannelFactory = false;
+            if (zk == null) {
+                ownZK = true;
+                ZooKeeperWatcherBase w = new ZooKeeperWatcherBase(conf.getZkTimeout());
+                zk = ZkUtils.createConnectedZookeeperClient(conf.getZkServers(), w);
+                w.waitForConnection();
+            }
+            if (channelFactory == null) {
+                ownChannelFactory = true;
+                ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
+                channelFactory = new NioClientSocketChannelFactory(
+                        Executors.newCachedThreadPool(tfb.setNameFormat(
+                                                              "BookKeeper-NIOBoss-%d").build()),
+                        Executors.newCachedThreadPool(tfb.setNameFormat(
+                                                              "BookKeeper-NIOWorker-%d").build()));
+            }
+
+            BookKeeper bk = new BookKeeper(conf, zk, channelFactory, statsLogger);
+            bk.ownZKHandle = ownZK;
+            bk.ownChannelFactory = ownChannelFactory;
+
+            return bk;
+        }
+    }
+
+    public static Builder forConfig(final ClientConfiguration conf) {
+        return new Builder(conf);
+    }
+
     /**
      * Create a bookkeeper client. A zookeeper client and a client socket factory
      * will be instantiated as part of this constructor.
@@ -142,6 +210,8 @@ public class BookKeeper {
                         "BookKeeper-NIOWorker-%d").build()));
         this.scheduler = Executors.newSingleThreadScheduledExecutor(tfb
                 .setNameFormat("BookKeeperClientScheduler-%d").build());
+        this.statsLogger = NullStatsLogger.INSTANCE;
+        initOpLoggers(this.statsLogger);
         // initialize the ensemble placement
         this.placementPolicy = initializeEnsemblePlacementPolicy(conf);
 
@@ -156,7 +226,7 @@ public class BookKeeper {
 
         ownChannelFactory = true;
         ownZKHandle = true;
-     }
+    }
 
     /**
      * Create a bookkeeper client but use the passed in zookeeper client instead
@@ -202,6 +272,15 @@ public class BookKeeper {
      */
     public BookKeeper(ClientConfiguration conf, ZooKeeper zk, ClientSocketChannelFactory channelFactory)
             throws IOException, InterruptedException, KeeperException {
+        this(conf, zk, channelFactory, NullStatsLogger.INSTANCE);
+    }
+
+    /**
+     * Contructor for use with the builder. Other constructors also use it.
+     */
+    private BookKeeper(ClientConfiguration conf, ZooKeeper zk,
+                       ClientSocketChannelFactory channelFactory, StatsLogger statsLogger)
+            throws IOException, InterruptedException, KeeperException {
         if (zk == null || channelFactory == null) {
             throw new NullPointerException();
         }
@@ -216,12 +295,15 @@ public class BookKeeper {
                 "BookKeeperClientScheduler-%d");
         this.scheduler = Executors
                 .newSingleThreadScheduledExecutor(tfb.build());
+        this.statsLogger = statsLogger.scope(BookKeeperClientStats.CLIENT_SCOPE);
+        initOpLoggers(this.statsLogger);
+
         // initialize the ensemble placement
         this.placementPolicy = initializeEnsemblePlacementPolicy(conf);
 
         mainWorkerPool = new OrderedSafeExecutor(conf.getNumWorkerThreads(),
                 "BookKeeperClientWorker");
-        bookieClient = new BookieClient(conf, channelFactory, mainWorkerPool);
+        bookieClient = new BookieClient(conf, channelFactory, mainWorkerPool, statsLogger);
         bookieWatcher = new BookieWatcher(conf, scheduler, placementPolicy, this);
         bookieWatcher.readBookiesBlocking();
 
@@ -262,6 +344,10 @@ public class BookKeeper {
         return conf;
     }
 
+    StatsLogger getStatsLogger() {
+        return statsLogger;
+    }
+
     /**
      * Get the BookieClient, currently used for doing bookie recovery.
      *
@@ -747,6 +833,17 @@ public class BookKeeper {
         }
     }
 
+    private final void initOpLoggers(StatsLogger stats) {
+        createOpLogger = stats.getOpStatsLogger(BookKeeperClientStats.CREATE_OP);
+        deleteOpLogger = stats.getOpStatsLogger(BookKeeperClientStats.DELETE_OP);
+        openOpLogger = stats.getOpStatsLogger(BookKeeperClientStats.OPEN_OP);
+        readOpLogger = stats.getOpStatsLogger(BookKeeperClientStats.READ_OP);
+        addOpLogger = stats.getOpStatsLogger(BookKeeperClientStats.ADD_OP);
+    }
 
-
+    OpStatsLogger getCreateOpLogger() { return createOpLogger; }
+    OpStatsLogger getOpenOpLogger() { return openOpLogger; }
+    OpStatsLogger getDeleteOpLogger() { return deleteOpLogger; }
+    OpStatsLogger getReadOpLogger() { return readOpLogger; }
+    OpStatsLogger getAddOpLogger() { return addOpLogger; }
 }

Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java?rev=1559918&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java Tue Jan 21 04:19:09 2014
@@ -0,0 +1,41 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.client;
+
+public interface BookKeeperClientStats {
+    public final static String CLIENT_SCOPE = "bookkeeper_client";
+    public final static String CREATE_OP = "LEDGER_CREATE";
+    public final static String DELETE_OP = "LEDGER_DELETE";
+    public final static String OPEN_OP = "LEDGER_OPEN";
+    public final static String ADD_OP = "ADD_ENTRY";
+    public final static String READ_OP = "READ_ENTRY";
+    public final static String PENDING_ADDS = "NUM_PENDING_ADD";
+    public final static String ENSEMBLE_CHANGES = "NUM_ENSEMBLE_CHANGE";
+
+    // per channel stats
+    public final static String CHANNEL_SCOPE = "per_channel_bookie_client";
+
+    public final static String CHANNEL_READ_OP = "READ_ENTRY";
+    public final static String CHANNEL_TIMEOUT_READ = "TIMEOUT_READ_ENTRY";
+    public final static String CHANNEL_ADD_OP = "ADD_ENTRY";
+    public final static String CHANNEL_TIMEOUT_ADD = "TIMEOUT_ADD_ENTRY";
+}

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java?rev=1559918&r1=1559917&r2=1559918&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java Tue Jan 21 04:19:09 2014
@@ -29,7 +29,8 @@ import org.apache.bookkeeper.client.Asyn
 import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
-
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.util.MathUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -48,6 +49,8 @@ class LedgerCreateOp implements GenericC
     byte[] passwd;
     BookKeeper bk;
     DigestType digestType;
+    long startTime;
+    OpStatsLogger createOpLogger;
 
     /**
      * Constructor
@@ -78,6 +81,8 @@ class LedgerCreateOp implements GenericC
         this.passwd = passwd;
         this.cb = cb;
         this.ctx = ctx;
+        this.startTime = MathUtils.nowInNano();
+        this.createOpLogger = bk.getCreateOpLogger();
     }
 
     /**
@@ -96,7 +101,7 @@ class LedgerCreateOp implements GenericC
                     .newEnsemble(metadata.getEnsembleSize(), metadata.getWriteQuorumSize());
         } catch (BKNotEnoughBookiesException e) {
             LOG.error("Not enough bookies to create ledger");
-            cb.createComplete(e.getCode(), null, this.ctx);
+            createComplete(e.getCode(), null);
             return;
         }
 
@@ -115,7 +120,7 @@ class LedgerCreateOp implements GenericC
     @Override
     public void operationComplete(int rc, Long ledgerId) {
         if (BKException.Code.OK != rc) {
-            cb.createComplete(rc, null, this.ctx);
+            createComplete(rc, null);
             return;
         }
 
@@ -123,16 +128,25 @@ class LedgerCreateOp implements GenericC
             lh = new LedgerHandle(bk, ledgerId, metadata, digestType, passwd);
         } catch (GeneralSecurityException e) {
             LOG.error("Security exception while creating ledger: " + ledgerId, e);
-            cb.createComplete(BKException.Code.DigestNotInitializedException, null, this.ctx);
+            createComplete(BKException.Code.DigestNotInitializedException, null);
             return;
         } catch (NumberFormatException e) {
             LOG.error("Incorrectly entered parameter throttle: " + bk.getConf().getThrottleValue(), e);
-            cb.createComplete(BKException.Code.IncorrectParameterException, null, this.ctx);
+            createComplete(BKException.Code.IncorrectParameterException, null);
             return;
         }
-
         // return the ledger handle back
-        cb.createComplete(BKException.Code.OK, lh, this.ctx);
+        createComplete(BKException.Code.OK, lh);
+    }
+
+    private void createComplete(int rc, LedgerHandle lh) {
+        // Opened a new ledger
+        if (BKException.Code.OK != rc) {
+            createOpLogger.registerFailedEvent(MathUtils.elapsedMSec(startTime));
+        } else {
+            createOpLogger.registerSuccessfulEvent(MathUtils.elapsedMSec(startTime));
+        }
+        cb.createComplete(rc, lh, ctx);
     }
 
 }

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerDeleteOp.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerDeleteOp.java?rev=1559918&r1=1559917&r2=1559918&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerDeleteOp.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerDeleteOp.java Tue Jan 21 04:19:09 2014
@@ -22,6 +22,8 @@
 package org.apache.bookkeeper.client;
 
 import org.apache.bookkeeper.client.AsyncCallback.DeleteCallback;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.util.MathUtils;
 import org.apache.bookkeeper.util.OrderedSafeExecutor.OrderedSafeGenericCallback;
 import org.apache.bookkeeper.versioning.Version;
 import org.slf4j.Logger;
@@ -39,6 +41,8 @@ class LedgerDeleteOp extends OrderedSafe
     long ledgerId;
     DeleteCallback cb;
     Object ctx;
+    long startTime;
+    OpStatsLogger deleteOpLogger;
 
     /**
      * Constructor
@@ -58,6 +62,8 @@ class LedgerDeleteOp extends OrderedSafe
         this.ledgerId = ledgerId;
         this.cb = cb;
         this.ctx = ctx;
+        this.startTime = MathUtils.nowInNano();
+        this.deleteOpLogger = bk.getDeleteOpLogger();
     }
 
     /**
@@ -74,6 +80,11 @@ class LedgerDeleteOp extends OrderedSafe
      */
     @Override
     public void safeOperationComplete(int rc, Void result) {
+        if (BKException.Code.OK != rc) {
+            deleteOpLogger.registerFailedEvent(startTime);
+        } else {
+            deleteOpLogger.registerSuccessfulEvent(startTime);
+        }
         cb.deleteComplete(rc, this.ctx);
     }
 }

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java?rev=1559918&r1=1559917&r2=1559918&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java Tue Jan 21 04:19:09 2014
@@ -39,6 +39,8 @@ import org.apache.bookkeeper.client.Book
 import org.apache.bookkeeper.proto.BookieProtocol;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.State;
+import org.apache.bookkeeper.stats.Counter;
+import org.apache.bookkeeper.stats.Gauge;
 import org.apache.bookkeeper.util.OrderedSafeExecutor.OrderedSafeGenericCallback;
 import org.apache.bookkeeper.util.SafeRunnable;
 import org.jboss.netty.buffer.ChannelBuffer;
@@ -75,6 +77,8 @@ public class LedgerHandle {
     final AtomicInteger blockAddCompletions = new AtomicInteger(0);
     final Queue<PendingAddOp> pendingAddOps = new ConcurrentLinkedQueue<PendingAddOp>();
 
+    final Counter ensembleChangeCounter;
+
     LedgerHandle(BookKeeper bk, long ledgerId, LedgerMetadata metadata,
                  DigestType digestType, byte[] password)
             throws GeneralSecurityException, NumberFormatException {
@@ -97,6 +101,17 @@ public class LedgerHandle {
         this.ledgerKey = MacDigestManager.genDigest("ledger", password);
         distributionSchedule = new RoundRobinDistributionSchedule(
                 metadata.getWriteQuorumSize(), metadata.getAckQuorumSize(), metadata.getEnsembleSize());
+
+        ensembleChangeCounter = bk.getStatsLogger().getCounter(BookKeeperClientStats.ENSEMBLE_CHANGES);
+        bk.getStatsLogger().registerGauge(BookKeeperClientStats.PENDING_ADDS,
+                                          new Gauge<Integer>() {
+                                              public Integer getDefaultValue() {
+                                                  return 0;
+                                              }
+                                              public Integer getSample() {
+                                                  return pendingAddOps.size();
+                                              }
+                                          });
     }
 
     /**
@@ -200,7 +215,7 @@ public class LedgerHandle {
      * Close this ledger synchronously.
      * @see #asyncClose
      */
-    public void close() 
+    public void close()
             throws InterruptedException, BKException {
         SyncCounter counter = new SyncCounter();
         counter.inc();
@@ -215,11 +230,11 @@ public class LedgerHandle {
 
     /**
      * Asynchronous close, any adds in flight will return errors.
-     * 
-     * Closing a ledger will ensure that all clients agree on what the last entry 
-     * of the ledger is. This ensures that, once the ledger has been closed, all 
-     * reads from the ledger will return the same set of entries. 
-     * 
+     *
+     * Closing a ledger will ensure that all clients agree on what the last entry
+     * of the ledger is. This ensures that, once the ledger has been closed, all
+     * reads from the ledger will return the same set of entries.
+     *
      * @param cb
      *          callback implementation
      * @param ctx
@@ -416,7 +431,7 @@ public class LedgerHandle {
         SyncAddCallback callback = new SyncAddCallback();
         asyncAddEntry(data, offset, length, callback, counter);
         counter.block(0);
-        
+
         if (counter.getrc() != BKException.Code.OK) {
             throw BKException.create(counter.getrc());
         }
@@ -521,14 +536,14 @@ public class LedgerHandle {
     }
 
     /**
-     * Obtains asynchronously the last confirmed write from a quorum of bookies. This 
+     * Obtains asynchronously the last confirmed write from a quorum of bookies. This
      * call obtains the the last add confirmed each bookie has received for this ledger
      * and returns the maximum. If the ledger has been closed, the value returned by this
      * call may not correspond to the id of the last entry of the ledger, since it reads
-     * the hint of bookies. Consequently, in the case the ledger has been closed, it may 
-     * return a different value than getLastAddConfirmed, which returns the local value 
+     * the hint of bookies. Consequently, in the case the ledger has been closed, it may
+     * return a different value than getLastAddConfirmed, which returns the local value
      * of the ledger handle.
-     * 
+     *
      * @see #getLastAddConfirmed()
      *
      * @param cb
@@ -601,18 +616,17 @@ public class LedgerHandle {
      * obtains the the last add confirmed each bookie has received for this ledger
      * and returns the maximum. If the ledger has been closed, the value returned by this
      * call may not correspond to the id of the last entry of the ledger, since it reads
-     * the hint of bookies. Consequently, in the case the ledger has been closed, it may 
-     * return a different value than getLastAddConfirmed, which returns the local value 
+     * the hint of bookies. Consequently, in the case the ledger has been closed, it may
+     * return a different value than getLastAddConfirmed, which returns the local value
      * of the ledger handle.
-     * 
+     *
      * @see #getLastAddConfirmed()
-     * 
+     *
      * @return The entry id of the last confirmed write or {@link #INVALID_ENTRY_ID INVALID_ENTRY_ID}
      *         if no entry has been confirmed
      * @throws InterruptedException
      * @throws BKException
      */
-    
     public long readLastConfirmed()
             throws InterruptedException, BKException {
         LastConfirmedCtx ctx = new LastConfirmedCtx();
@@ -745,6 +759,9 @@ public class LedgerHandle {
         @Override
         public void safeOperationComplete(final int rc, Void result) {
             if (rc == BKException.Code.MetadataVersionException) {
+                // We changed the ensemble, but got a version exception. We
+                // should still consider this as an ensemble change
+                ensembleChangeCounter.inc();
                 rereadMetadata(new ReReadLedgerMetadataCb(rc,
                                        ensembleInfo));
                 return;
@@ -758,6 +775,8 @@ public class LedgerHandle {
             }
             blockAddCompletions.decrementAndGet();
 
+            // We've successfully changed an ensemble
+            ensembleChangeCounter.inc();
             // the failed bookie has been replaced
             unsetSuccessAndSendWriteRequest(ensembleInfo.bookieIndex);
         }
@@ -850,6 +869,8 @@ public class LedgerHandle {
                     return updateMetadataIfPossible(newMeta);
                 }
             } else {
+                ensembleChangeCounter.inc();
+                // We've successfully changed an ensemble
                 // the failed bookie has been replaced
                 blockAddCompletions.decrementAndGet();
                 unsetSuccessAndSendWriteRequest(ensembleInfo.bookieIndex);
@@ -948,7 +969,7 @@ public class LedgerHandle {
             // noop
         }
     }
-    
+
     private static class SyncReadCallback implements ReadCallback {
         /**
          * Implementation of callback interface for synchronous read method.
@@ -965,7 +986,7 @@ public class LedgerHandle {
         @Override
         public void readComplete(int rc, LedgerHandle lh,
                                  Enumeration<LedgerEntry> seq, Object ctx) {
-            
+
             SyncCounter counter = (SyncCounter) ctx;
             synchronized (counter) {
                 counter.setSequence(seq);
@@ -1008,7 +1029,7 @@ public class LedgerHandle {
         @Override
         public void readLastConfirmedComplete(int rc, long lastConfirmed, Object ctx) {
             LastConfirmedCtx lcCtx = (LastConfirmedCtx) ctx;
-            
+
             synchronized(lcCtx) {
                 lcCtx.setRC(rc);
                 lcCtx.setLastConfirmed(lastConfirmed);

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java?rev=1559918&r1=1559917&r2=1559918&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java Tue Jan 21 04:19:09 2014
@@ -28,6 +28,8 @@ import org.apache.bookkeeper.client.Asyn
 import org.apache.bookkeeper.client.AsyncCallback.ReadLastConfirmedCallback;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.util.MathUtils;
 import org.apache.bookkeeper.util.OrderedSafeExecutor.OrderedSafeGenericCallback;
 
 import org.slf4j.Logger;
@@ -49,6 +51,8 @@ class LedgerOpenOp implements GenericCal
     final DigestType digestType;
     boolean doRecovery = true;
     boolean administrativeOpen = false;
+    long startTime;
+    OpStatsLogger openOpLogger;
 
     /**
      * Constructor.
@@ -85,6 +89,10 @@ class LedgerOpenOp implements GenericCal
      * Inititates the ledger open operation
      */
     public void initiate() {
+        startTime = MathUtils.nowInNano();
+
+        openOpLogger = bk.getOpenOpLogger();
+
         /**
          * Asynchronously read the ledger metadata node.
          */
@@ -102,10 +110,11 @@ class LedgerOpenOp implements GenericCal
     /**
      * Implements Open Ledger Callback.
      */
+    @Override
     public void operationComplete(int rc, LedgerMetadata metadata) {
         if (BKException.Code.OK != rc) {
             // open ledger failed.
-            cb.openComplete(rc, null, this.ctx);
+            openComplete(rc, null);
             return;
         }
 
@@ -125,12 +134,12 @@ class LedgerOpenOp implements GenericCal
             if (metadata.hasPassword()) {
                 if (!Arrays.equals(passwd, metadata.getPassword())) {
                     LOG.error("Provided passwd does not match that in metadata");
-                    cb.openComplete(BKException.Code.UnauthorizedAccessException, null, this.ctx);
+                    openComplete(BKException.Code.UnauthorizedAccessException, null);
                     return;
                 }
                 if (digestType != metadata.getDigestType()) {
                     LOG.error("Provided digest does not match that in metadata");
-                    cb.openComplete(BKException.Code.DigestMatchException, null, this.ctx);
+                    openComplete(BKException.Code.DigestMatchException, null);
                     return;
                 }
             }
@@ -141,49 +150,56 @@ class LedgerOpenOp implements GenericCal
             lh = new ReadOnlyLedgerHandle(bk, ledgerId, metadata, digestType, passwd, !doRecovery);
         } catch (GeneralSecurityException e) {
             LOG.error("Security exception while opening ledger: " + ledgerId, e);
-            cb.openComplete(BKException.Code.DigestNotInitializedException, null, this.ctx);
+            openComplete(BKException.Code.DigestNotInitializedException, null);
             return;
         } catch (NumberFormatException e) {
             LOG.error("Incorrectly entered parameter throttle: " + bk.getConf().getThrottleValue(), e);
-            cb.openComplete(BKException.Code.IncorrectParameterException, null, this.ctx);
+            openComplete(BKException.Code.IncorrectParameterException, null);
             return;
         }
 
         if (metadata.isClosed()) {
             // Ledger was closed properly
-            cb.openComplete(BKException.Code.OK, lh, this.ctx);
+            openComplete(BKException.Code.OK, lh);
             return;
         }
 
         if (doRecovery) {
             lh.recover(new OrderedSafeGenericCallback<Void>(bk.mainWorkerPool, ledgerId) {
-                    @Override
-                    public void safeOperationComplete(int rc, Void result) {
-                        if (rc == BKException.Code.OK) {
-                            cb.openComplete(BKException.Code.OK, lh, LedgerOpenOp.this.ctx);
-                        } else if (rc == BKException.Code.UnauthorizedAccessException) {
-                            cb.openComplete(BKException.Code.UnauthorizedAccessException, null, LedgerOpenOp.this.ctx);
-                        } else {
-                            cb.openComplete(BKException.Code.LedgerRecoveryException, null, LedgerOpenOp.this.ctx);
-                        }
+                @Override
+                public void safeOperationComplete(int rc, Void result) {
+                    if (rc == BKException.Code.OK) {
+                        openComplete(BKException.Code.OK, lh);
+                    } else if (rc == BKException.Code.UnauthorizedAccessException) {
+                        openComplete(BKException.Code.UnauthorizedAccessException, null);
+                    } else {
+                        openComplete(BKException.Code.LedgerRecoveryException, null);
                     }
-                });
+                }
+            });
         } else {
             lh.asyncReadLastConfirmed(new ReadLastConfirmedCallback() {
-
                 @Override
                 public void readLastConfirmedComplete(int rc,
                         long lastConfirmed, Object ctx) {
                     if (rc != BKException.Code.OK) {
-                        cb.openComplete(BKException.Code.ReadException, null, LedgerOpenOp.this.ctx);
+                        openComplete(BKException.Code.ReadException, null);
                     } else {
                         lh.lastAddConfirmed = lh.lastAddPushed = lastConfirmed;
-                        cb.openComplete(BKException.Code.OK, lh, LedgerOpenOp.this.ctx);
+                        openComplete(BKException.Code.OK, lh);
                     }
                 }
-                
             }, null);
-            
+
+        }
+    }
+
+    void openComplete(int rc, LedgerHandle lh) {
+        if (BKException.Code.OK != rc) {
+            openOpLogger.registerFailedEvent(startTime);
+        } else {
+            openOpLogger.registerSuccessfulEvent(startTime);
         }
+        cb.openComplete(rc, lh, ctx);
     }
 }

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java?rev=1559918&r1=1559917&r2=1559918&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java Tue Jan 21 04:19:09 2014
@@ -23,6 +23,8 @@ import java.net.InetSocketAddress;
 import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
 import org.apache.bookkeeper.proto.BookieProtocol;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.util.MathUtils;
 import org.jboss.netty.buffer.ChannelBuffer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -51,14 +53,18 @@ class PendingAddOp implements WriteCallb
 
     LedgerHandle lh;
     boolean isRecoveryAdd = false;
+    long requestTimeMillis;
+    OpStatsLogger addOpLogger;
 
     PendingAddOp(LedgerHandle lh, AddCallback cb, Object ctx) {
         this.lh = lh;
         this.cb = cb;
         this.ctx = ctx;
         this.entryId = LedgerHandle.INVALID_ENTRY_ID;
-        
+
         ackSet = lh.distributionSchedule.getAckSet();
+
+        addOpLogger = lh.bk.getAddOpLogger();
     }
 
     /**
@@ -123,6 +129,7 @@ class PendingAddOp implements WriteCallb
     }
 
     void initiate(ChannelBuffer toSend, int entryLength) {
+        requestTimeMillis = MathUtils.now();
         this.toSend = toSend;
         this.entryLength = entryLength;
         for (int bookieIndex : writeSet) {
@@ -177,11 +184,14 @@ class PendingAddOp implements WriteCallb
     }
 
     void submitCallback(final int rc) {
+        long latencyMillis = MathUtils.now() - requestTimeMillis;
         if (rc != BKException.Code.OK) {
+            addOpLogger.registerFailedEvent(latencyMillis);
             LOG.error("Write of ledger entry to quorum failed: L{} E{}",
                       lh.getId(), entryId);
+        } else {
+            addOpLogger.registerSuccessfulEvent(latencyMillis);
         }
-
         cb.addComplete(rc, lh, entryId, ctx);
     }
 

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java?rev=1559918&r1=1559917&r2=1559918&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java Tue Jan 21 04:19:09 2014
@@ -1,5 +1,3 @@
-package org.apache.bookkeeper.client;
-
 /*
  *
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -20,6 +18,8 @@ package org.apache.bookkeeper.client;
  * under the License.
  *
  */
+package org.apache.bookkeeper.client;
+
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.BitSet;
@@ -38,6 +38,8 @@ import java.util.concurrent.atomic.Atomi
 import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
 import org.apache.bookkeeper.client.BKException.BKDigestMatchException;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.util.MathUtils;
 import org.jboss.netty.buffer.ChannelBuffer;
 import org.jboss.netty.buffer.ChannelBufferInputStream;
 import org.slf4j.Logger;
@@ -64,6 +66,9 @@ class PendingReadOp implements Enumerati
     long numPendingEntries;
     long startEntryId;
     long endEntryId;
+    long requestTimeMillis;
+    OpStatsLogger readOpLogger;
+
     final int maxMissedReadsAllowed;
 
     class LedgerEntryRequest extends LedgerEntry {
@@ -238,6 +243,7 @@ class PendingReadOp implements Enumerati
             return complete.get();
         }
 
+        @Override
         public String toString() {
             return String.format("L%d-E%d", ledgerId, entryId);
         }
@@ -257,24 +263,37 @@ class PendingReadOp implements Enumerati
                 - getLedgerMetadata().getAckQuorumSize();
         speculativeReadTimeout = lh.bk.getConf().getSpeculativeReadTimeout();
         heardFromHosts = new HashSet<InetSocketAddress>();
+
+        readOpLogger = lh.bk.getReadOpLogger();
     }
 
     protected LedgerMetadata getLedgerMetadata() {
         return lh.metadata;
     }
 
+    private void cancelSpeculativeTask(boolean mayInterruptIfRunning) {
+        if (speculativeTask != null) {
+            speculativeTask.cancel(mayInterruptIfRunning);
+            speculativeTask = null;
+        }
+    }
+
     public void initiate() throws InterruptedException {
         long nextEnsembleChange = startEntryId, i = startEntryId;
-
+        this.requestTimeMillis = MathUtils.now();
         ArrayList<InetSocketAddress> ensemble = null;
 
         if (speculativeReadTimeout > 0) {
             speculativeTask = scheduler.scheduleWithFixedDelay(new Runnable() {
+                    @Override
                     public void run() {
                         int x = 0;
                         for (LedgerEntryRequest r : seq) {
                             if (!r.isComplete()) {
-                                if (null != r.maybeSendSpeculativeRead(heardFromHosts)) {
+                                if (null == r.maybeSendSpeculativeRead(heardFromHosts)) {
+                                    // Subsequent speculative read will not materialize anyway
+                                    cancelSpeculativeTask(false);
+                                } else {
                                     LOG.debug("Send speculative read for {}. Hosts heard are {}.",
                                               r, heardFromHosts);
                                     ++x;
@@ -343,10 +362,7 @@ class PendingReadOp implements Enumerati
     }
 
     private void submitCallback(int code) {
-        if (speculativeTask != null) {
-            speculativeTask.cancel(true);
-            speculativeTask = null;
-        }
+        long latencyMillis = MathUtils.now() - requestTimeMillis;
         if (code != BKException.Code.OK) {
             long firstUnread = LedgerHandle.INVALID_ENTRY_ID;
             for (LedgerEntryRequest req : seq) {
@@ -357,13 +373,20 @@ class PendingReadOp implements Enumerati
             }
             LOG.error("Read of ledger entry failed: L{} E{}-E{}, Heard from {}. First unread entry is {}",
                     new Object[] { lh.getId(), startEntryId, endEntryId, heardFromHosts, firstUnread });
+            readOpLogger.registerFailedEvent(latencyMillis);
+        } else {
+            readOpLogger.registerSuccessfulEvent(latencyMillis);
         }
+        cancelSpeculativeTask(true);
         cb.readComplete(code, lh, PendingReadOp.this, PendingReadOp.this.ctx);
     }
+
+    @Override
     public boolean hasMoreElements() {
         return !seq.isEmpty();
     }
 
+    @Override
     public LedgerEntry nextElement() throws NoSuchElementException {
         return seq.remove();
     }

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java?rev=1559918&r1=1559917&r2=1559918&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java Tue Jan 21 04:19:09 2014
@@ -39,6 +39,8 @@ import org.apache.bookkeeper.conf.Client
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.util.OrderedSafeExecutor;
 import org.apache.bookkeeper.util.SafeRunnable;
 import org.jboss.netty.buffer.ChannelBuffer;
@@ -70,13 +72,20 @@ public class BookieClient {
     private final ClientConfiguration conf;
     private volatile boolean closed;
     private final ReentrantReadWriteLock closeLock;
+    private final StatsLogger statsLogger;
 
     public BookieClient(ClientConfiguration conf, ClientSocketChannelFactory channelFactory, OrderedSafeExecutor executor) {
+        this(conf, channelFactory, executor, NullStatsLogger.INSTANCE);
+    }
+
+    public BookieClient(ClientConfiguration conf, ClientSocketChannelFactory channelFactory, OrderedSafeExecutor executor,
+                        StatsLogger statsLogger) {
         this.conf = conf;
         this.channelFactory = channelFactory;
         this.executor = executor;
         this.closed = false;
         this.closeLock = new ReentrantReadWriteLock();
+        this.statsLogger = statsLogger;
     }
 
     public PerChannelBookieClient lookupClient(InetSocketAddress addr) {
@@ -89,7 +98,7 @@ public class BookieClient {
                     return null;
                 }
                 channel = new PerChannelBookieClient(conf, executor, channelFactory, addr, totalBytesOutstanding,
-                        timeoutExecutor);
+                        timeoutExecutor, statsLogger);
                 PerChannelBookieClient prevChannel = channels.putIfAbsent(addr, channel);
                 if (prevChannel != null) {
                     channel = prevChannel;

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java?rev=1559918&r1=1559917&r2=1559918&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java Tue Jan 21 04:19:09 2014
@@ -28,10 +28,14 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeperClientStats;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.util.MathUtils;
 import org.apache.bookkeeper.util.OrderedSafeExecutor;
 import org.apache.bookkeeper.util.SafeRunnable;
@@ -69,6 +73,7 @@ public class PerChannelBookieClient exte
     static final long maxMemory = Runtime.getRuntime().maxMemory() / 5;
     public static final int MAX_FRAME_LENGTH = 2 * 1024 * 1024; // 2M
 
+
     InetSocketAddress addr;
     AtomicLong totalBytesOutstanding;
     ClientSocketChannelFactory channelFactory;
@@ -78,6 +83,12 @@ public class PerChannelBookieClient exte
     ConcurrentHashMap<CompletionKey, AddCompletion> addCompletions = new ConcurrentHashMap<CompletionKey, AddCompletion>();
     ConcurrentHashMap<CompletionKey, ReadCompletion> readCompletions = new ConcurrentHashMap<CompletionKey, ReadCompletion>();
 
+    private final StatsLogger statsLogger;
+    private final OpStatsLogger readEntryOpLogger;
+    private final OpStatsLogger readTimeoutOpLogger;
+    private final OpStatsLogger addEntryOpLogger;
+    private final OpStatsLogger addTimeoutOpLogger;
+
     /**
      * The following member variables do not need to be concurrent, or volatile
      * because they are always updated under a lock
@@ -108,17 +119,23 @@ public class PerChannelBookieClient exte
         try {
             for (CompletionKey key : addCompletions.keySet()) {
                 total++;
-                if (key.shouldTimeout(conf.getAddEntryTimeout() * 1000)) {
-                    errorOutAddKey(key);
-                    numAdd++;
-                }
+                long elapsedTime = key.elapsedTime();
+                if (elapsedTime < conf.getAddEntryTimeout() * 1000) {
+                    continue;
+                }
+                errorOutAddKey(key);
+                numAdd++;
+                addTimeoutOpLogger.registerSuccessfulEvent(elapsedTime);
             }
             for (CompletionKey key : readCompletions.keySet()) {
                 total++;
-                if (key.shouldTimeout(conf.getReadEntryTimeout() * 1000)) {
-                    errorOutReadKey(key);
-                    numRead++;
-                }
+                long elapsedTime = key.elapsedTime();
+                if (elapsedTime < conf.getReadEntryTimeout() * 1000) {
+                    continue;
+                }
+                errorOutReadKey(key);
+                numRead++;
+                readTimeoutOpLogger.registerSuccessfulEvent(elapsedTime);
             }
         } catch (Throwable t) {
             LOG.error("Caught RuntimeException while erroring out timed out entries : ", t);
@@ -133,23 +150,39 @@ public class PerChannelBookieClient exte
     public PerChannelBookieClient(OrderedSafeExecutor executor, ClientSocketChannelFactory channelFactory,
                                   InetSocketAddress addr, AtomicLong totalBytesOutstanding,
                                   ScheduledExecutorService timeoutExecutor) {
-        this(new ClientConfiguration(), executor, channelFactory, addr, totalBytesOutstanding, timeoutExecutor);
+        this(new ClientConfiguration(), executor, channelFactory, addr, totalBytesOutstanding, timeoutExecutor,
+                NullStatsLogger.INSTANCE);
     }
 
     public PerChannelBookieClient(OrderedSafeExecutor executor, ClientSocketChannelFactory channelFactory,
                                   InetSocketAddress addr, AtomicLong totalBytesOutstanding) {
-        this(new ClientConfiguration(), executor, channelFactory, addr, totalBytesOutstanding, null);
+        this(new ClientConfiguration(), executor, channelFactory, addr, totalBytesOutstanding, null,
+                NullStatsLogger.INSTANCE);
     }
 
     public PerChannelBookieClient(ClientConfiguration conf, OrderedSafeExecutor executor,
                                   ClientSocketChannelFactory channelFactory, InetSocketAddress addr,
-                                  AtomicLong totalBytesOutstanding, ScheduledExecutorService timeoutExecutor) {
+                                  AtomicLong totalBytesOutstanding, ScheduledExecutorService timeoutExecutor,
+                                  StatsLogger parentStatsLogger) {
         this.conf = conf;
         this.addr = addr;
         this.executor = executor;
         this.totalBytesOutstanding = totalBytesOutstanding;
         this.channelFactory = channelFactory;
         this.state = ConnectionState.DISCONNECTED;
+
+        StringBuilder nameBuilder = new StringBuilder();
+        nameBuilder.append(addr.getHostName().replace('.', '_').replace('-', '_'))
+            .append("_").append(addr.getPort());
+
+        this.statsLogger = parentStatsLogger.scope(BookKeeperClientStats.CHANNEL_SCOPE)
+            .scope(nameBuilder.toString());
+
+        readEntryOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_READ_OP);
+        addEntryOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_ADD_OP);
+        readTimeoutOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_TIMEOUT_READ);
+        addTimeoutOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_TIMEOUT_ADD);
+
         this.timeoutExecutor = timeoutExecutor;
         // scheudle the timeout task
         if (null != this.timeoutExecutor) {
@@ -282,7 +315,7 @@ public class PerChannelBookieClient exte
                 ledgerId, entryId, (short)options, masterKey, toSend);
         final int entrySize = toSend.readableBytes();
         final CompletionKey completionKey = new CompletionKey(ledgerId, entryId);
-        addCompletions.put(completionKey, new AddCompletion(cb, entrySize, ctx));
+        addCompletions.put(completionKey, new AddCompletion(addEntryOpLogger, cb, ctx));
         final Channel c = channel;
         if (c == null) {
             errorOutAddKey(completionKey);
@@ -318,7 +351,7 @@ public class PerChannelBookieClient exte
                                         final long entryId,
                                         ReadEntryCallback cb, Object ctx) {
         final CompletionKey key = new CompletionKey(ledgerId, entryId);
-        readCompletions.put(key, new ReadCompletion(cb, ctx));
+        readCompletions.put(key, new ReadCompletion(readEntryOpLogger, cb, ctx));
 
         final BookieProtocol.ReadRequest r = new BookieProtocol.ReadRequest(
                 BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId, entryId,
@@ -357,7 +390,7 @@ public class PerChannelBookieClient exte
 
     public void readEntry(final long ledgerId, final long entryId, ReadEntryCallback cb, Object ctx) {
         final CompletionKey key = new CompletionKey(ledgerId, entryId);
-        readCompletions.put(key, new ReadCompletion(cb, ctx));
+        readCompletions.put(key, new ReadCompletion(readEntryOpLogger, cb, ctx));
 
         final BookieProtocol.ReadRequest r = new BookieProtocol.ReadRequest(
                 BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId, entryId,
@@ -713,27 +746,69 @@ public class PerChannelBookieClient exte
      * Boiler-plate wrapper classes follow
      *
      */
+
     // visible for testing
-    static class ReadCompletion {
-        final ReadEntryCallback cb;
+    static abstract class CompletionValue {
         final Object ctx;
 
-        public ReadCompletion(ReadEntryCallback cb, Object ctx) {
-            this.cb = cb;
+        public CompletionValue(Object ctx) {
             this.ctx = ctx;
         }
     }
 
     // visible for testing
-    static class AddCompletion {
+    static class ReadCompletion extends CompletionValue {
+        final ReadEntryCallback cb;
+
+        public ReadCompletion(ReadEntryCallback cb, Object ctx) {
+            this(null, cb, ctx);
+        }
+
+        public ReadCompletion(final OpStatsLogger readEntryOpLogger,
+                              final ReadEntryCallback originalCallback,
+                              final Object originalCtx) {
+            super(originalCtx);
+            final long requestTimeMillis = MathUtils.now();
+            this.cb = null == readEntryOpLogger ? originalCallback : new ReadEntryCallback() {
+                @Override
+                public void readEntryComplete(int rc, long ledgerId, long entryId, ChannelBuffer buffer, Object ctx) {
+                    long latencyMillis = MathUtils.now() - requestTimeMillis;
+                    if (rc != BKException.Code.OK) {
+                        readEntryOpLogger.registerFailedEvent(latencyMillis);
+                    } else {
+                        readEntryOpLogger.registerSuccessfulEvent(latencyMillis);
+                    }
+                    originalCallback.readEntryComplete(rc, ledgerId, entryId, buffer, originalCtx);
+                }
+            };
+        }
+    }
+
+    // visible for testing
+    static class AddCompletion extends CompletionValue {
         final WriteCallback cb;
-        //final long size;
-        final Object ctx;
 
-        public AddCompletion(WriteCallback cb, long size, Object ctx) {
-            this.cb = cb;
-            //this.size = size;
-            this.ctx = ctx;
+        public AddCompletion(WriteCallback cb, Object ctx) {
+            this(null, cb, ctx);
+        }
+
+        public AddCompletion(final OpStatsLogger addEntryOpLogger,
+                             final WriteCallback originalCallback,
+                             final Object originalCtx) {
+            super(originalCtx);
+            final long requestTimeMillis = MathUtils.now();
+            this.cb = null == addEntryOpLogger ? originalCallback : new WriteCallback() {
+                @Override
+                public void writeComplete(int rc, long ledgerId, long entryId, InetSocketAddress addr, Object ctx) {
+                    long latencyMillis = MathUtils.now() - requestTimeMillis;
+                    if (rc != BKException.Code.OK) {
+                        addEntryOpLogger.registerFailedEvent(latencyMillis);
+                    } else {
+                        addEntryOpLogger.registerSuccessfulEvent(latencyMillis);
+                    }
+                    originalCallback.writeComplete(rc, ledgerId, entryId, addr, originalCtx);
+                }
+            };
         }
     }
 

Modified: zookeeper/bookkeeper/trunk/pom.xml
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/pom.xml?rev=1559918&r1=1559917&r2=1559918&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/pom.xml (original)
+++ zookeeper/bookkeeper/trunk/pom.xml Tue Jan 21 04:19:09 2014
@@ -35,9 +35,11 @@
     <module>hedwig-client</module>
     <module>hedwig-server</module>
     <module>hedwig-protocol</module>
+    <module>hedwig-client-jms</module>
+    <module>bookkeeper-stats</module>
     <module>bookkeeper-server</module>
     <module>bookkeeper-benchmark</module>
-    <module>hedwig-client-jms</module>
+    <module>bookkeeper-stats-providers</module>
   </modules>
   <properties>
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>



Mime
View raw message