zookeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From eolive...@apache.org
Subject [zookeeper] branch master updated: ZOOKEEPER-3448: Introduce MessageTracker to assist debug leader and leaner connectivity issues.
Date Fri, 23 Aug 2019 17:43:09 GMT
This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zookeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new b5817fb  ZOOKEEPER-3448: Introduce MessageTracker to assist debug leader and leaner
connectivity issues.
b5817fb is described below

commit b5817fbb12b88bc28be72f109c695ebf51bebf3b
Author: Michael Han <lhan@twitter.com>
AuthorDate: Fri Aug 23 19:43:02 2019 +0200

    ZOOKEEPER-3448: Introduce MessageTracker to assist debug leader and leaner connectivity
issues.
    
    We want to have better insight on the state of the world when learners lost connection
with leader, so we need capture more information when that happens. We capture more information
through MessageTracker which will record the last few sent and received messages at various
protocol stage, and these information will be dumped to log files for further analysis.
    
    Author: Michael Han <lhan@twitter.com>
    Author: Michael Han <hanm@apache.org>
    
    Reviewers: Enrico Olivelli <eolivelli@apache.org>, Fangmin Lyu <fangmin@apache.org>
    
    Closes #1007 from hanm/twitter/2765eb0629d2f63f07d112270b582e8e931f734f
---
 .../src/main/resources/markdown/zookeeperAdmin.md  |  20 +++
 .../apache/zookeeper/server/quorum/Follower.java   |  14 ++
 .../apache/zookeeper/server/quorum/Learner.java    |   8 +
 .../zookeeper/server/quorum/LearnerHandler.java    |  19 +-
 .../apache/zookeeper/server/quorum/Observer.java   |  13 +-
 .../zookeeper/server/util/CircularBuffer.java      | 103 +++++++++++
 .../zookeeper/server/util/MessageTracker.java      | 165 +++++++++++++++++
 .../zookeeper/server/util/CircularBufferTest.java  | 198 +++++++++++++++++++++
 .../zookeeper/server/util/MessageTrackerTest.java  | 129 ++++++++++++++
 9 files changed, 667 insertions(+), 2 deletions(-)

diff --git a/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md b/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
index dbf9a08..d3bcaff 100644
--- a/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
+++ b/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
@@ -1494,6 +1494,26 @@ Both subsystems need to have sufficient amount of threads to achieve
peak read t
     minute. This prevents herding during container deletion.
     Default is "10000".
 
+<a name="sc_debug_observability_config"></a>
+
+#### Debug Observability Configurations
+
+**New in 3.6.0:** The following options are introduced to make zookeeper easier to debug.
+
+* *zookeeper.messageTracker.BufferSize* :
+    (Java system property only)
+    Controls the maximum number of messages stored in **MessageTracker**. Value should be
positive
+    integers. The default value is 10. **MessageTracker** is introduced in **3.6.0** to record
the
+    last set of messages between a server (follower or observer) and a leader, when a server
+    disconnects with leader. These set of messages will then be dumped to zookeeper's log
file,
+    and will help reconstruct the state of the servers at the time of the disconnection and
+    will be useful for debugging purpose.
+
+* *zookeeper.messageTracker.Enabled* :
+    (Java system property only)
+    When set to "true", will enable **MessageTracker** to track and record messages. Default
value
+    is "false".
+
 <a name="sc_adminserver_config"></a>
 
 #### AdminServer configuration
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Follower.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Follower.java
index 57b53e6..532dfc8 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Follower.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Follower.java
@@ -74,11 +74,16 @@ public class Follower extends Learner {
         self.start_fle = 0;
         self.end_fle = 0;
         fzk.registerJMX(new FollowerBean(this, zk), self.jmxLocalPeerBean);
+
+        long connectionTime = 0;
+        boolean completedSync = false;
+
         try {
             self.setZabState(QuorumPeer.ZabState.DISCOVERY);
             QuorumServer leaderServer = findLeader();
             try {
                 connectToLeader(leaderServer.addr, leaderServer.hostname);
+                connectionTime = System.currentTimeMillis();
                 long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);
                 if (self.isReconfigStateChange()) {
                     throw new Exception("learned about role change");
@@ -99,6 +104,7 @@ public class Follower extends Learner {
                     self.setZabState(QuorumPeer.ZabState.SYNCHRONIZATION);
                     syncWithLeader(newEpochZxid);
                     self.setZabState(QuorumPeer.ZabState.BROADCAST);
+                    completedSync = true;
                 } finally {
                     long syncTime = Time.currentElapsedTime() - startTime;
                     ServerMetrics.getMetrics().FOLLOWER_SYNC_TIME.add(syncTime);
@@ -129,6 +135,14 @@ public class Follower extends Learner {
                 om.stop();
             }
             zk.unregisterJMX(this);
+
+            if (connectionTime != 0) {
+                long connectionDuration = System.currentTimeMillis() - connectionTime;
+                LOG.info("Disconnected from leader (with address: {}). "
+                        + "Was connected for {}ms. Sync state: {}",
+                    leaderAddr, connectionDuration, completedSync);
+                messageTracker.dumpToLog(leaderAddr.toString());
+            }
         }
     }
 
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
index d33b609..6f55483 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
@@ -47,6 +47,7 @@ import org.apache.zookeeper.server.ServerCnxn;
 import org.apache.zookeeper.server.ZooTrace;
 import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
 import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
+import org.apache.zookeeper.server.util.MessageTracker;
 import org.apache.zookeeper.server.util.SerializeUtils;
 import org.apache.zookeeper.server.util.ZxidUtils;
 import org.apache.zookeeper.txn.SetDataTxn;
@@ -74,6 +75,7 @@ public class Learner {
     protected BufferedOutputStream bufferedOutput;
 
     protected Socket sock;
+    protected InetSocketAddress leaderAddr;
 
     /**
      * Socket getter
@@ -88,6 +90,9 @@ public class Learner {
     /** the protocol version of the leader */
     protected int leaderProtocolVersion = 0x01;
 
+    private static final int BUFFERED_MESSAGE_SIZE = 10;
+    protected final MessageTracker messageTracker = new MessageTracker(BUFFERED_MESSAGE_SIZE);
+
     protected static final Logger LOG = LoggerFactory.getLogger(Learner.class);
 
     /**
@@ -146,6 +151,7 @@ public class Learner {
     void writePacket(QuorumPacket pp, boolean flush) throws IOException {
         synchronized (leaderOs) {
             if (pp != null) {
+                messageTracker.trackSent(pp.getType());
                 leaderOs.writeRecord(pp, "packet");
             }
             if (flush) {
@@ -164,6 +170,7 @@ public class Learner {
     void readPacket(QuorumPacket pp) throws IOException {
         synchronized (leaderIs) {
             leaderIs.readRecord(pp, "packet");
+            messageTracker.trackReceived(pp.getType());
         }
         long traceMask = ZooTrace.SERVER_PACKET_TRACE_MASK;
         if (pp.getType() == Leader.PING) {
@@ -250,6 +257,7 @@ public class Learner {
      */
     protected void connectToLeader(InetSocketAddress addr, String hostname) throws IOException,
InterruptedException, X509Exception {
         this.sock = createSocket();
+        this.leaderAddr = addr;
 
         // leader connection timeout defaults to tickTime * initLimit
         int connectTimeout = self.tickTime * self.initLimit;
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java
index 7802a0e..148951a 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java
@@ -49,6 +49,7 @@ import org.apache.zookeeper.server.ZooTrace;
 import org.apache.zookeeper.server.quorum.Leader.Proposal;
 import org.apache.zookeeper.server.quorum.QuorumPeer.LearnerType;
 import org.apache.zookeeper.server.quorum.auth.QuorumAuthServer;
+import org.apache.zookeeper.server.util.MessageTracker;
 import org.apache.zookeeper.server.util.SerializeUtils;
 import org.apache.zookeeper.server.util.ZxidUtils;
 import org.apache.zookeeper.txn.TxnHeader;
@@ -220,6 +221,8 @@ public class LearnerHandler extends ZooKeeperThread {
     private final BufferedInputStream bufferedInput;
     private BufferedOutputStream bufferedOutput;
 
+    protected final MessageTracker messageTracker;
+
     // for test only
     protected void setOutputArchive(BinaryOutputArchive oa) {
         this.oa = oa;
@@ -280,6 +283,8 @@ public class LearnerHandler extends ZooKeeperThread {
             }
             throw new SaslException("Authentication failure: " + e.getMessage());
         }
+
+        this.messageTracker = new MessageTracker(MessageTracker.BUFFERED_MESSAGE_SIZE);
     }
 
     @Override
@@ -349,6 +354,7 @@ public class LearnerHandler extends ZooKeeperThread {
                 }
                 oa.writeRecord(p, "packet");
                 packetsSent.incrementAndGet();
+                messageTracker.trackSent(p.getType());
             } catch (IOException e) {
                 if (!sock.isClosed()) {
                     LOG.warn("Unexpected exception at " + this, e);
@@ -464,8 +470,11 @@ public class LearnerHandler extends ZooKeeperThread {
 
             QuorumPacket qp = new QuorumPacket();
             ia.readRecord(qp, "packet");
+
+            messageTracker.trackReceived(qp.getType());
             if (qp.getType() != Leader.FOLLOWERINFO && qp.getType() != Leader.OBSERVERINFO)
{
                 LOG.error("First packet " + qp.toString() + " is not FOLLOWERINFO or OBSERVERINFO!");
+
                 return;
             }
 
@@ -526,9 +535,11 @@ public class LearnerHandler extends ZooKeeperThread {
                 ByteBuffer.wrap(ver).putInt(0x10000);
                 QuorumPacket newEpochPacket = new QuorumPacket(Leader.LEADERINFO, newLeaderZxid,
ver, null);
                 oa.writeRecord(newEpochPacket, "packet");
+                messageTracker.trackSent(Leader.LEADERINFO);
                 bufferedOutput.flush();
                 QuorumPacket ackEpochPacket = new QuorumPacket();
                 ia.readRecord(ackEpochPacket, "packet");
+                messageTracker.trackReceived(ackEpochPacket.getType());
                 if (ackEpochPacket.getType() != Leader.ACKEPOCH) {
                     LOG.error(ackEpochPacket.toString() + " is not ACKEPOCH");
                     return;
@@ -554,6 +565,7 @@ public class LearnerHandler extends ZooKeeperThread {
                 try {
                     long zxidToSend = learnerMaster.getZKDatabase().getDataTreeLastProcessedZxid();
                     oa.writeRecord(new QuorumPacket(Leader.SNAP, zxidToSend, null, null),
"packet");
+                    messageTracker.trackSent(Leader.SNAP);
                     bufferedOutput.flush();
 
                     LOG.info("Sending snapshot last zxid of peer is 0x{}, zxid of leader
is 0x{}, "
@@ -600,6 +612,8 @@ public class LearnerHandler extends ZooKeeperThread {
              */
             qp = new QuorumPacket();
             ia.readRecord(qp, "packet");
+
+            messageTracker.trackReceived(qp.getType());
             if (qp.getType() != Leader.ACK) {
                 LOG.error("Next packet was supposed to be an ACK," + " but received packet:
{}", packetToString(qp));
                 return;
@@ -632,6 +646,7 @@ public class LearnerHandler extends ZooKeeperThread {
             while (true) {
                 qp = new QuorumPacket();
                 ia.readRecord(qp, "packet");
+                messageTracker.trackReceived(qp.getType());
 
                 long traceMask = ZooTrace.SERVER_PACKET_TRACE_MASK;
                 if (qp.getType() == Leader.PING) {
@@ -716,7 +731,9 @@ public class LearnerHandler extends ZooKeeperThread {
                 syncThrottler.endSync();
                 syncThrottler = null;
             }
-            LOG.warn("******* GOODBYE {} ********", getRemoteAddress());
+            String remoteAddr = getRemoteAddress();
+            LOG.warn("******* GOODBYE {} ********", remoteAddr);
+            messageTracker.dumpToLog(remoteAddr);
             shutdown();
         }
     }
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Observer.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Observer.java
index 4081c27..4a7f386 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Observer.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Observer.java
@@ -97,12 +97,14 @@ public class Observer extends Learner {
      */
     void observeLeader() throws Exception {
         zk.registerJMX(new ObserverBean(this, zk), self.jmxLocalPeerBean);
-
+        long connectTime = 0;
+        boolean completedSync = false;
         try {
             self.setZabState(QuorumPeer.ZabState.DISCOVERY);
             QuorumServer master = findLearnerMaster();
             try {
                 connectToLeader(master.addr, master.hostname);
+                connectTime = System.currentTimeMillis();
                 long newLeaderZxid = registerWithLeader(Leader.OBSERVERINFO);
                 if (self.isReconfigStateChange()) {
                     throw new Exception("learned about role change");
@@ -112,6 +114,7 @@ public class Observer extends Learner {
                 self.setZabState(QuorumPeer.ZabState.SYNCHRONIZATION);
                 syncWithLeader(newLeaderZxid);
                 self.setZabState(QuorumPeer.ZabState.BROADCAST);
+                completedSync = true;
                 QuorumPacket qp = new QuorumPacket();
                 while (this.isRunning() && nextLearnerMaster.get() == null) {
                     readPacket(qp);
@@ -127,6 +130,14 @@ public class Observer extends Learner {
         } finally {
             currentLearnerMaster = null;
             zk.unregisterJMX(this);
+            if (connectTime != 0) {
+                long connectionDuration = System.currentTimeMillis() - connectTime;
+
+                LOG.info("Disconnected from leader (with address: {}). "
+                        + "Was connected for {}ms. Sync state: {}",
+                    leaderAddr, connectionDuration, completedSync);
+                messageTracker.dumpToLog(leaderAddr.toString());
+            }
         }
     }
 
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/util/CircularBuffer.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/util/CircularBuffer.java
new file mode 100644
index 0000000..d338120
--- /dev/null
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/util/CircularBuffer.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.util;
+
+import java.lang.reflect.Array;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Thread safe FIFO CircularBuffer implementation.
+ * When the buffer is full write operation overwrites the oldest element.
+ *
+ * Fun thing @todo, make this lock free as this is called on every quorum message
+ */
+public class CircularBuffer<T> {
+
+    private final T[] buffer;
+    private final int capacity;
+    private int oldest;
+    private AtomicInteger numberOfElements = new AtomicInteger();
+
+    @SuppressWarnings("unchecked")
+    public CircularBuffer(Class<T> clazz, int capacity) {
+        if (capacity <= 0) {
+            throw new IllegalArgumentException("CircularBuffer capacity should be greater
than 0");
+        }
+        this.buffer = (T[]) Array.newInstance(clazz, capacity);
+        this.capacity = capacity;
+    }
+
+    /**
+     * Puts elements in the next available index in the array.
+     * If the array is full the oldest element is replaced with
+     * the new value.
+     * @param element
+     */
+    public synchronized void write(T element) {
+        int newSize = numberOfElements.incrementAndGet();
+        if (newSize > capacity) {
+            buffer[oldest] = element;
+            oldest = ++oldest % capacity;
+            numberOfElements.decrementAndGet();
+        } else {
+            int index = (oldest + numberOfElements.get() - 1) % capacity;
+            buffer[index] = element;
+        }
+    }
+
+    /**
+     * Reads from the buffer in a FIFO manner.
+     * Returns the oldest element in the buffer if the buffer ie not empty
+     * Returns null if the buffer is empty
+     * @return
+     */
+    public synchronized T take() {
+        int newSize = numberOfElements.decrementAndGet();
+        if (newSize < 0) {
+            numberOfElements.incrementAndGet();
+            return null;
+        }
+        T polled = buffer[oldest];
+        oldest = ++oldest % capacity;
+        return polled;
+    }
+
+    public synchronized T peek() {
+        if (numberOfElements.get() <= 0) {
+            return null;
+        }
+        return buffer[oldest];
+    }
+
+    public int size() {
+        return numberOfElements.get();
+    }
+
+    public boolean isEmpty() {
+        return numberOfElements.get() <= 0;
+    }
+
+    public boolean isFull() {
+        return numberOfElements.get() >= capacity;
+    }
+
+    public synchronized void  reset() {
+        numberOfElements.set(0);
+    }
+}
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/util/MessageTracker.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/util/MessageTracker.java
new file mode 100644
index 0000000..086bdcc
--- /dev/null
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/util/MessageTracker.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.util;
+
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import org.apache.zookeeper.server.quorum.Leader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * This class provides a way of buffering sentBuffer and receivedBuffer messages in order.
+ * It uses EvictingQueue of size BUFFERED_MESSAGE_SIZE to store the messages.
+ * When the queue is full it overrides the oldest in a circular manner.
+ * This class does doe not provide thread safety.
+ */
+public class MessageTracker {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MessageTracker.class);
+
+    private final CircularBuffer<BufferedMessage> sentBuffer;
+    private final CircularBuffer<BufferedMessage> receivedBuffer;
+
+    public static final String MESSAGE_TRACKER_BUFFER_SIZE = "zookeeper.messageTracker.BufferSize";
+    public static final String MESSAGE_TRACKER_ENABLED = "zookeeper.messageTracker.Enabled";
+    public static final int BUFFERED_MESSAGE_SIZE;
+    private static final boolean enabled;
+    static {
+        BUFFERED_MESSAGE_SIZE = Integer.getInteger(MESSAGE_TRACKER_BUFFER_SIZE, 10);
+        enabled = Boolean.getBoolean(MESSAGE_TRACKER_ENABLED);
+    }
+
+    public MessageTracker(int buffer_size) {
+        this.sentBuffer = new CircularBuffer<>(BufferedMessage.class, buffer_size);
+        this.receivedBuffer = new CircularBuffer<>(BufferedMessage.class, buffer_size);
+    }
+
+    public void trackSent(long timestamp) {
+        if (enabled) {
+            sentBuffer.write(new BufferedMessage(timestamp));
+        }
+    }
+
+    public void trackSent(int packetType) {
+        if (enabled) {
+            sentBuffer.write(new BufferedMessage(packetType));
+        }
+    }
+
+    public void trackReceived(long timestamp) {
+        if (enabled) {
+            receivedBuffer.write(new BufferedMessage(timestamp));
+        }
+    }
+
+    public void trackReceived(int packetType) {
+        if (enabled) {
+            receivedBuffer.write(new BufferedMessage(packetType));
+        }
+    }
+
+    public final BufferedMessage peekSent() {
+        return sentBuffer.peek();
+    }
+
+    public final BufferedMessage peekReceived() {
+        return receivedBuffer.peek();
+    }
+
+    public final long peekSentTimestamp() {
+        return enabled ? sentBuffer.peek().getTimestamp() : 0;
+    }
+
+    public final long peekReceivedTimestamp() {
+        return enabled ? receivedBuffer.peek().getTimestamp() : 0;
+    }
+
+    public void dumpToLog(String serverAddress) {
+        if (!enabled) {
+            return;
+        }
+        logMessages(serverAddress, receivedBuffer, Direction.RECEIVED);
+        logMessages(serverAddress, sentBuffer, Direction.SENT);
+    }
+
+    private static void logMessages(
+        String serverAddr,
+        CircularBuffer<BufferedMessage> messages,
+        Direction direction) {
+        String sentOrReceivedText = direction == Direction.SENT ? "sentBuffer to" : "receivedBuffer
from";
+
+        if (messages.isEmpty()) {
+            LOG.info("No buffered timestamps for messages {} {}", sentOrReceivedText, serverAddr);
+        } else {
+            LOG.warn("Last {} timestamps for messages {} {}:",
+                messages.size(), sentOrReceivedText, serverAddr);
+            while (!messages.isEmpty()) {
+                LOG.warn("{} {}  {}",
+                    sentOrReceivedText,
+                    serverAddr,
+                    messages.take().toString());
+            }
+        }
+    }
+
+    /**
+     * Direction for message track.
+     */
+    private enum Direction {
+        SENT, RECEIVED
+    }
+
+    private static class BufferedMessage {
+
+        private long timestamp;
+        private int messageType;
+
+        private long getTimestamp() {
+            return timestamp;
+        }
+
+        BufferedMessage(int messageType) {
+            this.messageType = messageType;
+            this.timestamp = System.currentTimeMillis();
+        }
+
+        BufferedMessage(long timestamp) {
+            this.messageType = -1;
+            this.timestamp = timestamp;
+        }
+
+        @Override
+        /**
+         * ToString examples are as follows:
+         * TimeStamp: 2016-06-06 11:07:58,594 Type: PROPOSAL
+         * TimeStamp: 2016-06-06 11:07:58,187
+         */
+        public String toString() {
+            if (messageType == -1) {
+                return "TimeStamp: " + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS")
+                    .format(new Date(timestamp));
+            } else {
+                return "TimeStamp: " + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS")
+                    .format(new Date(timestamp)) + " Type: " + Leader.getPacketType(messageType);
+            }
+        }
+    }
+}
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/util/CircularBufferTest.java
b/zookeeper-server/src/test/java/org/apache/zookeeper/server/util/CircularBufferTest.java
new file mode 100644
index 0000000..ff3c83c
--- /dev/null
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/util/CircularBufferTest.java
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.util;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class CircularBufferTest {
+
+    @Test
+    public void testCircularBuffer() {
+        final int capacity = 3;
+        CircularBuffer<String> buffer = new CircularBuffer<>(String.class, capacity);
+
+        Assert.assertTrue(buffer.isEmpty());
+        Assert.assertFalse(buffer.isFull());
+
+        // write to the buffer
+        buffer.write("A");
+        Assert.assertFalse(buffer.isEmpty());
+        Assert.assertFalse(buffer.isFull());
+
+        buffer.write("B");
+        Assert.assertFalse(buffer.isEmpty());
+        Assert.assertFalse(buffer.isFull());
+
+        buffer.write("C");
+        Assert.assertFalse(buffer.isEmpty());
+        Assert.assertTrue(buffer.isFull());
+
+        // Buffer is full.
+        // Read from buffer
+        Assert.assertEquals("A", buffer.take());
+        Assert.assertFalse(buffer.isEmpty());
+        Assert.assertFalse(buffer.isFull());
+
+        Assert.assertEquals("B", buffer.take());
+        Assert.assertFalse(buffer.isEmpty());
+        Assert.assertFalse(buffer.isFull());
+
+        Assert.assertEquals("C", buffer.take());
+        Assert.assertTrue(buffer.isEmpty());
+        Assert.assertFalse(buffer.isFull());
+
+        // write to the buffer
+        buffer.write("1");
+        Assert.assertFalse(buffer.isEmpty());
+        Assert.assertFalse(buffer.isFull());
+
+        buffer.write("2");
+        Assert.assertFalse(buffer.isEmpty());
+        Assert.assertFalse(buffer.isFull());
+
+        buffer.write("3");
+        Assert.assertFalse(buffer.isEmpty());
+        Assert.assertTrue(buffer.isFull());
+
+        buffer.write("4"); // 4 overwrites 1
+        Assert.assertFalse(buffer.isEmpty());
+        Assert.assertTrue(buffer.isFull());
+
+        // Buffer if full
+        // Read from buffer
+        Assert.assertEquals("2", buffer.take());
+        Assert.assertFalse(buffer.isEmpty());
+        Assert.assertFalse(buffer.isFull());
+
+        Assert.assertEquals("3", buffer.take());
+        Assert.assertFalse(buffer.isEmpty());
+        Assert.assertFalse(buffer.isFull());
+
+        Assert.assertEquals("4", buffer.take());
+        Assert.assertTrue(buffer.isEmpty());
+        Assert.assertFalse(buffer.isFull());
+
+        // write to the buffer
+        buffer.write("a");
+        Assert.assertFalse(buffer.isEmpty());
+        Assert.assertFalse(buffer.isFull());
+
+        buffer.write("b");
+        Assert.assertFalse(buffer.isEmpty());
+        Assert.assertFalse(buffer.isFull());
+
+        buffer.write("c");
+        Assert.assertFalse(buffer.isEmpty());
+        Assert.assertTrue(buffer.isFull());
+
+        buffer.write("d"); // d overwrites a
+        Assert.assertFalse(buffer.isEmpty());
+        Assert.assertTrue(buffer.isFull());
+
+        buffer.write("e"); // e overwrites b
+        Assert.assertFalse(buffer.isEmpty());
+        Assert.assertTrue(buffer.isFull());
+
+        buffer.write("f"); // f overwrites c
+        Assert.assertFalse(buffer.isEmpty());
+        Assert.assertTrue(buffer.isFull());
+
+        buffer.write("g"); // g overwrites d
+        Assert.assertFalse(buffer.isEmpty());
+        Assert.assertTrue(buffer.isFull());
+
+        // Buffer is full.
+        // Read from buffer
+        Assert.assertEquals("e", buffer.take());
+        Assert.assertFalse(buffer.isEmpty());
+        Assert.assertFalse(buffer.isFull());
+
+        Assert.assertEquals("f", buffer.take());
+        Assert.assertFalse(buffer.isEmpty());
+        Assert.assertFalse(buffer.isFull());
+
+        Assert.assertEquals("g", buffer.take());
+        Assert.assertTrue(buffer.isEmpty());
+        Assert.assertFalse(buffer.isFull());
+    }
+
+    @Test
+    public void testCircularBufferWithCapacity1() {
+        final int capacity = 1;
+        CircularBuffer<String> buffer = new CircularBuffer<>(String.class, capacity);
+
+        Assert.assertTrue(buffer.isEmpty());
+        Assert.assertFalse(buffer.isFull());
+
+        // write to the buffer
+        buffer.write("A");
+        Assert.assertFalse(buffer.isEmpty());
+        Assert.assertTrue(buffer.isFull());
+
+        buffer.write("B"); // B overwrite A
+        Assert.assertFalse(buffer.isEmpty());
+        Assert.assertTrue(buffer.isFull());
+
+        // Buffer is full.
+        // Read from buffer
+        Assert.assertEquals("B", buffer.take());
+        Assert.assertTrue(buffer.isEmpty());
+        Assert.assertFalse(buffer.isFull());
+    }
+
+    @Test
+    public void testCircularBufferReset() {
+        final int capacity = 3;
+        CircularBuffer<String> buffer = new CircularBuffer<>(String.class, capacity);
+
+        Assert.assertTrue(buffer.isEmpty());
+        Assert.assertFalse(buffer.isFull());
+
+        // write to the buffer
+        buffer.write("A");
+        Assert.assertFalse(buffer.isEmpty());
+        Assert.assertFalse(buffer.isFull());
+        Assert.assertEquals(1, buffer.size());
+        Assert.assertEquals("A", buffer.peek());
+
+        buffer.write("B");
+        Assert.assertFalse(buffer.isEmpty());
+        Assert.assertFalse(buffer.isFull());
+        Assert.assertEquals(2, buffer.size());
+        Assert.assertEquals("A", buffer.peek());
+
+        // reset
+        buffer.reset();
+        Assert.assertNull(buffer.peek());
+        Assert.assertTrue(buffer.isEmpty());
+        Assert.assertFalse(buffer.isFull());
+        Assert.assertEquals(0, buffer.size());
+    }
+
+    @Test
+    public void testCircularBufferIllegalCapacity() {
+        try {
+            CircularBuffer<String> buffer = new CircularBuffer<>(String.class,
0);
+            Assert.fail();
+        } catch (IllegalArgumentException e) {
+            Assert.assertEquals("CircularBuffer capacity should be greater than 0", e.getMessage());
+        }
+    }
+}
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/util/MessageTrackerTest.java
b/zookeeper-server/src/test/java/org/apache/zookeeper/server/util/MessageTrackerTest.java
new file mode 100644
index 0000000..f1cef10
--- /dev/null
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/util/MessageTrackerTest.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.util;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MessageTrackerTest {
+    private static final int BUFFERED_MESSAGE_SIZE = 5;
+    private static final Logger LOG = LoggerFactory.getLogger(MessageTrackerTest.class);
+
+    @Before
+    public void setup() {
+        System.setProperty(MessageTracker.MESSAGE_TRACKER_ENABLED, "true");
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        System.clearProperty(MessageTracker.MESSAGE_TRACKER_ENABLED);
+    }
+
+    @Test
+    public void testTrackSend() throws InterruptedException {
+        long timestamp1 = System.currentTimeMillis();
+        MessageTracker messageTracker = new MessageTracker(BUFFERED_MESSAGE_SIZE);
+
+        // First timestamp is added
+        messageTracker.trackSent(timestamp1);
+        Assert.assertEquals(messageTracker.peekSentTimestamp(), timestamp1);
+
+        Thread.sleep(2);
+
+        // Second timestamp is added
+        long timestamp2 = System.currentTimeMillis();
+        messageTracker.trackSent(timestamp2);
+        Assert.assertEquals(messageTracker.peekSentTimestamp(), timestamp1);
+    }
+
+    @Test
+    public void testTrackReceived() throws InterruptedException {
+        long timestamp1 = System.currentTimeMillis();
+        MessageTracker messageTracker = new MessageTracker(BUFFERED_MESSAGE_SIZE);
+
+        // First timestamp is added
+        messageTracker.trackReceived(timestamp1);
+        Assert.assertEquals(messageTracker.peekReceivedTimestamp(), timestamp1);
+
+        Thread.sleep(2);
+
+        // Second timestamp is added
+        long timestamp2 = System.currentTimeMillis();
+        messageTracker.trackReceived(timestamp2);
+        Assert.assertEquals(messageTracker.peekReceivedTimestamp(), timestamp1);
+    }
+
+    @Test
+    public void testMessageTrackerFull() throws InterruptedException {
+        MessageTracker messageTracker = new MessageTracker(BUFFERED_MESSAGE_SIZE);
+
+        // Add up to capacity + 1
+        long timestampSent = 0;
+        long timestampReceived = 0;
+        for (int i = 0; i <= BUFFERED_MESSAGE_SIZE; i++) {
+            if (i == 1) {
+                timestampSent = System.currentTimeMillis();
+                messageTracker.trackSent(timestampSent);
+                Thread.sleep(2);
+                timestampReceived = System.currentTimeMillis();
+                messageTracker.trackReceived(timestampReceived);
+            } else {
+                messageTracker.trackSent(System.currentTimeMillis());
+                messageTracker.trackReceived(System.currentTimeMillis());
+            }
+            Thread.sleep(1);
+        }
+
+        Assert.assertEquals(messageTracker.peekSentTimestamp(), timestampSent);
+        Assert.assertEquals(messageTracker.peekReceivedTimestamp(), timestampReceived);
+    }
+
+    @Test
+    public void testDumpToLog() {
+        long timestamp1 = System.currentTimeMillis();
+        MessageTracker messageTracker = new MessageTracker(BUFFERED_MESSAGE_SIZE);
+        String sid = "127.0.0.1";
+
+        // MessageTracker is empty
+        messageTracker.dumpToLog(sid);
+        Assert.assertNull(messageTracker.peekSent());
+        Assert.assertNull(messageTracker.peekReceived());
+
+        // There is 1 sent and 0 received
+        messageTracker.trackSent(timestamp1);
+        Assert.assertEquals(messageTracker.peekSentTimestamp(), timestamp1);
+        Assert.assertNull(messageTracker.peekReceived());
+        messageTracker.dumpToLog(sid);
+        Assert.assertNull(messageTracker.peekSent());
+        Assert.assertNull(messageTracker.peekReceived());
+
+        // There is 1 sent and 1 received
+        messageTracker.trackSent(timestamp1);
+        messageTracker.trackReceived(timestamp1);
+        Assert.assertEquals(messageTracker.peekSentTimestamp(), timestamp1);
+        Assert.assertEquals(messageTracker.peekReceivedTimestamp(), timestamp1);
+        messageTracker.dumpToLog(sid);
+        Assert.assertNull(messageTracker.peekSent());
+        Assert.assertNull(messageTracker.peekReceived());
+    }
+}


Mime
View raw message