From commits-return-7923-archive-asf-public=cust-asf.ponee.io@zookeeper.apache.org Fri Aug 23 17:43:11 2019 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [207.244.88.153]) by mx-eu-01.ponee.io (Postfix) with SMTP id 88F21180637 for ; Fri, 23 Aug 2019 19:43:10 +0200 (CEST) Received: (qmail 73525 invoked by uid 500); 23 Aug 2019 17:43:09 -0000 Mailing-List: contact commits-help@zookeeper.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@zookeeper.apache.org Delivered-To: mailing list commits@zookeeper.apache.org Received: (qmail 73514 invoked by uid 99); 23 Aug 2019 17:43:09 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 23 Aug 2019 17:43:09 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id AB72C85FB3; Fri, 23 Aug 2019 17:43:09 +0000 (UTC) Date: Fri, 23 Aug 2019 17:43:09 +0000 To: "commits@zookeeper.apache.org" Subject: [zookeeper] branch master updated: ZOOKEEPER-3448: Introduce MessageTracker to assist debug leader and leaner connectivity issues. MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit Message-ID: <156658218956.14395.3684041605840563107@gitbox.apache.org> From: eolivelli@apache.org X-Git-Host: gitbox.apache.org X-Git-Repo: zookeeper X-Git-Refname: refs/heads/master X-Git-Reftype: branch X-Git-Oldrev: 942213dfe28e464f068f8a195d1424c4b29af585 X-Git-Newrev: b5817fbb12b88bc28be72f109c695ebf51bebf3b X-Git-Rev: b5817fbb12b88bc28be72f109c695ebf51bebf3b X-Git-NotificationType: ref_changed_plus_diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated This is an automated email from the ASF dual-hosted git repository. 
eolivelli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/zookeeper.git The following commit(s) were added to refs/heads/master by this push: new b5817fb ZOOKEEPER-3448: Introduce MessageTracker to assist debug leader and leaner connectivity issues. b5817fb is described below commit b5817fbb12b88bc28be72f109c695ebf51bebf3b Author: Michael Han AuthorDate: Fri Aug 23 19:43:02 2019 +0200 ZOOKEEPER-3448: Introduce MessageTracker to assist debug leader and leaner connectivity issues. We want to have better insight on the state of the world when learners lost connection with leader, so we need capture more information when that happens. We capture more information through MessageTracker which will record the last few sent and received messages at various protocol stage, and these information will be dumped to log files for further analysis. Author: Michael Han Author: Michael Han Reviewers: Enrico Olivelli , Fangmin Lyu Closes #1007 from hanm/twitter/2765eb0629d2f63f07d112270b582e8e931f734f --- .../src/main/resources/markdown/zookeeperAdmin.md | 20 +++ .../apache/zookeeper/server/quorum/Follower.java | 14 ++ .../apache/zookeeper/server/quorum/Learner.java | 8 + .../zookeeper/server/quorum/LearnerHandler.java | 19 +- .../apache/zookeeper/server/quorum/Observer.java | 13 +- .../zookeeper/server/util/CircularBuffer.java | 103 +++++++++++ .../zookeeper/server/util/MessageTracker.java | 165 +++++++++++++++++ .../zookeeper/server/util/CircularBufferTest.java | 198 +++++++++++++++++++++ .../zookeeper/server/util/MessageTrackerTest.java | 129 ++++++++++++++ 9 files changed, 667 insertions(+), 2 deletions(-) diff --git a/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md b/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md index dbf9a08..d3bcaff 100644 --- a/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md +++ b/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md @@ -1494,6 +1494,26 @@ Both subsystems 
need to have sufficient amount of threads to achieve peak read t minute. This prevents herding during container deletion. Default is "10000". + + +#### Debug Observability Configurations + +**New in 3.6.0:** The following options are introduced to make zookeeper easier to debug. + +* *zookeeper.messageTracker.BufferSize* : + (Java system property only) + Controls the maximum number of messages stored in **MessageTracker**. Value should be a positive + integer. The default value is 10. **MessageTracker** is introduced in **3.6.0** to record the + last set of messages between a server (follower or observer) and a leader, when a server + disconnects from the leader. This set of messages will then be dumped to zookeeper's log file, + and will help reconstruct the state of the servers at the time of the disconnection and + will be useful for debugging purposes. + +* *zookeeper.messageTracker.Enabled* : + (Java system property only) + When set to "true", will enable **MessageTracker** to track and record messages. Default value + is "false". 
+ #### AdminServer configuration diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Follower.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Follower.java index 57b53e6..532dfc8 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Follower.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Follower.java @@ -74,11 +74,16 @@ public class Follower extends Learner { self.start_fle = 0; self.end_fle = 0; fzk.registerJMX(new FollowerBean(this, zk), self.jmxLocalPeerBean); + + long connectionTime = 0; + boolean completedSync = false; + try { self.setZabState(QuorumPeer.ZabState.DISCOVERY); QuorumServer leaderServer = findLeader(); try { connectToLeader(leaderServer.addr, leaderServer.hostname); + connectionTime = System.currentTimeMillis(); long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO); if (self.isReconfigStateChange()) { throw new Exception("learned about role change"); @@ -99,6 +104,7 @@ public class Follower extends Learner { self.setZabState(QuorumPeer.ZabState.SYNCHRONIZATION); syncWithLeader(newEpochZxid); self.setZabState(QuorumPeer.ZabState.BROADCAST); + completedSync = true; } finally { long syncTime = Time.currentElapsedTime() - startTime; ServerMetrics.getMetrics().FOLLOWER_SYNC_TIME.add(syncTime); @@ -129,6 +135,14 @@ public class Follower extends Learner { om.stop(); } zk.unregisterJMX(this); + + if (connectionTime != 0) { + long connectionDuration = System.currentTimeMillis() - connectionTime; + LOG.info("Disconnected from leader (with address: {}). " + + "Was connected for {}ms. 
Sync state: {}", + leaderAddr, connectionDuration, completedSync); + messageTracker.dumpToLog(leaderAddr.toString()); + } } } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java index d33b609..6f55483 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java @@ -47,6 +47,7 @@ import org.apache.zookeeper.server.ServerCnxn; import org.apache.zookeeper.server.ZooTrace; import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer; import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier; +import org.apache.zookeeper.server.util.MessageTracker; import org.apache.zookeeper.server.util.SerializeUtils; import org.apache.zookeeper.server.util.ZxidUtils; import org.apache.zookeeper.txn.SetDataTxn; @@ -74,6 +75,7 @@ public class Learner { protected BufferedOutputStream bufferedOutput; protected Socket sock; + protected InetSocketAddress leaderAddr; /** * Socket getter @@ -88,6 +90,9 @@ public class Learner { /** the protocol version of the leader */ protected int leaderProtocolVersion = 0x01; + private static final int BUFFERED_MESSAGE_SIZE = 10; + protected final MessageTracker messageTracker = new MessageTracker(BUFFERED_MESSAGE_SIZE); + protected static final Logger LOG = LoggerFactory.getLogger(Learner.class); /** @@ -146,6 +151,7 @@ public class Learner { void writePacket(QuorumPacket pp, boolean flush) throws IOException { synchronized (leaderOs) { if (pp != null) { + messageTracker.trackSent(pp.getType()); leaderOs.writeRecord(pp, "packet"); } if (flush) { @@ -164,6 +170,7 @@ public class Learner { void readPacket(QuorumPacket pp) throws IOException { synchronized (leaderIs) { leaderIs.readRecord(pp, "packet"); + messageTracker.trackReceived(pp.getType()); } long traceMask = ZooTrace.SERVER_PACKET_TRACE_MASK; if 
(pp.getType() == Leader.PING) { @@ -250,6 +257,7 @@ public class Learner { */ protected void connectToLeader(InetSocketAddress addr, String hostname) throws IOException, InterruptedException, X509Exception { this.sock = createSocket(); + this.leaderAddr = addr; // leader connection timeout defaults to tickTime * initLimit int connectTimeout = self.tickTime * self.initLimit; diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java index 7802a0e..148951a 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java @@ -49,6 +49,7 @@ import org.apache.zookeeper.server.ZooTrace; import org.apache.zookeeper.server.quorum.Leader.Proposal; import org.apache.zookeeper.server.quorum.QuorumPeer.LearnerType; import org.apache.zookeeper.server.quorum.auth.QuorumAuthServer; +import org.apache.zookeeper.server.util.MessageTracker; import org.apache.zookeeper.server.util.SerializeUtils; import org.apache.zookeeper.server.util.ZxidUtils; import org.apache.zookeeper.txn.TxnHeader; @@ -220,6 +221,8 @@ public class LearnerHandler extends ZooKeeperThread { private final BufferedInputStream bufferedInput; private BufferedOutputStream bufferedOutput; + protected final MessageTracker messageTracker; + // for test only protected void setOutputArchive(BinaryOutputArchive oa) { this.oa = oa; @@ -280,6 +283,8 @@ public class LearnerHandler extends ZooKeeperThread { } throw new SaslException("Authentication failure: " + e.getMessage()); } + + this.messageTracker = new MessageTracker(MessageTracker.BUFFERED_MESSAGE_SIZE); } @Override @@ -349,6 +354,7 @@ public class LearnerHandler extends ZooKeeperThread { } oa.writeRecord(p, "packet"); packetsSent.incrementAndGet(); + messageTracker.trackSent(p.getType()); } catch (IOException e) { if 
(!sock.isClosed()) { LOG.warn("Unexpected exception at " + this, e); @@ -464,8 +470,11 @@ public class LearnerHandler extends ZooKeeperThread { QuorumPacket qp = new QuorumPacket(); ia.readRecord(qp, "packet"); + + messageTracker.trackReceived(qp.getType()); if (qp.getType() != Leader.FOLLOWERINFO && qp.getType() != Leader.OBSERVERINFO) { LOG.error("First packet " + qp.toString() + " is not FOLLOWERINFO or OBSERVERINFO!"); + return; } @@ -526,9 +535,11 @@ public class LearnerHandler extends ZooKeeperThread { ByteBuffer.wrap(ver).putInt(0x10000); QuorumPacket newEpochPacket = new QuorumPacket(Leader.LEADERINFO, newLeaderZxid, ver, null); oa.writeRecord(newEpochPacket, "packet"); + messageTracker.trackSent(Leader.LEADERINFO); bufferedOutput.flush(); QuorumPacket ackEpochPacket = new QuorumPacket(); ia.readRecord(ackEpochPacket, "packet"); + messageTracker.trackReceived(ackEpochPacket.getType()); if (ackEpochPacket.getType() != Leader.ACKEPOCH) { LOG.error(ackEpochPacket.toString() + " is not ACKEPOCH"); return; @@ -554,6 +565,7 @@ public class LearnerHandler extends ZooKeeperThread { try { long zxidToSend = learnerMaster.getZKDatabase().getDataTreeLastProcessedZxid(); oa.writeRecord(new QuorumPacket(Leader.SNAP, zxidToSend, null, null), "packet"); + messageTracker.trackSent(Leader.SNAP); bufferedOutput.flush(); LOG.info("Sending snapshot last zxid of peer is 0x{}, zxid of leader is 0x{}, " @@ -600,6 +612,8 @@ public class LearnerHandler extends ZooKeeperThread { */ qp = new QuorumPacket(); ia.readRecord(qp, "packet"); + + messageTracker.trackReceived(qp.getType()); if (qp.getType() != Leader.ACK) { LOG.error("Next packet was supposed to be an ACK," + " but received packet: {}", packetToString(qp)); return; @@ -632,6 +646,7 @@ public class LearnerHandler extends ZooKeeperThread { while (true) { qp = new QuorumPacket(); ia.readRecord(qp, "packet"); + messageTracker.trackReceived(qp.getType()); long traceMask = ZooTrace.SERVER_PACKET_TRACE_MASK; if (qp.getType() == 
Leader.PING) { @@ -716,7 +731,9 @@ public class LearnerHandler extends ZooKeeperThread { syncThrottler.endSync(); syncThrottler = null; } - LOG.warn("******* GOODBYE {} ********", getRemoteAddress()); + String remoteAddr = getRemoteAddress(); + LOG.warn("******* GOODBYE {} ********", remoteAddr); + messageTracker.dumpToLog(remoteAddr); shutdown(); } } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Observer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Observer.java index 4081c27..4a7f386 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Observer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Observer.java @@ -97,12 +97,14 @@ public class Observer extends Learner { */ void observeLeader() throws Exception { zk.registerJMX(new ObserverBean(this, zk), self.jmxLocalPeerBean); - + long connectTime = 0; + boolean completedSync = false; try { self.setZabState(QuorumPeer.ZabState.DISCOVERY); QuorumServer master = findLearnerMaster(); try { connectToLeader(master.addr, master.hostname); + connectTime = System.currentTimeMillis(); long newLeaderZxid = registerWithLeader(Leader.OBSERVERINFO); if (self.isReconfigStateChange()) { throw new Exception("learned about role change"); @@ -112,6 +114,7 @@ public class Observer extends Learner { self.setZabState(QuorumPeer.ZabState.SYNCHRONIZATION); syncWithLeader(newLeaderZxid); self.setZabState(QuorumPeer.ZabState.BROADCAST); + completedSync = true; QuorumPacket qp = new QuorumPacket(); while (this.isRunning() && nextLearnerMaster.get() == null) { readPacket(qp); @@ -127,6 +130,14 @@ public class Observer extends Learner { } finally { currentLearnerMaster = null; zk.unregisterJMX(this); + if (connectTime != 0) { + long connectionDuration = System.currentTimeMillis() - connectTime; + + LOG.info("Disconnected from leader (with address: {}). " + + "Was connected for {}ms. 
Sync state: {}", + leaderAddr, connectionDuration, completedSync); + messageTracker.dumpToLog(leaderAddr.toString()); + } } } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/util/CircularBuffer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/util/CircularBuffer.java new file mode 100644 index 0000000..d338120 --- /dev/null +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/util/CircularBuffer.java @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server.util; + +import java.lang.reflect.Array; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Thread safe FIFO CircularBuffer implementation. + * When the buffer is full write operation overwrites the oldest element. + * + * Fun thing @todo, make this lock free as this is called on every quorum message + */ +public class CircularBuffer { + + private final T[] buffer; + private final int capacity; + private int oldest; + private AtomicInteger numberOfElements = new AtomicInteger(); + + @SuppressWarnings("unchecked") + public CircularBuffer(Class clazz, int capacity) { + if (capacity <= 0) { + throw new IllegalArgumentException("CircularBuffer capacity should be greater than 0"); + } + this.buffer = (T[]) Array.newInstance(clazz, capacity); + this.capacity = capacity; + } + + /** + * Puts elements in the next available index in the array. + * If the array is full the oldest element is replaced with + * the new value. + * @param element + */ + public synchronized void write(T element) { + int newSize = numberOfElements.incrementAndGet(); + if (newSize > capacity) { + buffer[oldest] = element; + oldest = ++oldest % capacity; + numberOfElements.decrementAndGet(); + } else { + int index = (oldest + numberOfElements.get() - 1) % capacity; + buffer[index] = element; + } + } + + /** + * Reads from the buffer in a FIFO manner. 
+ * Returns the oldest element in the buffer if the buffer ie not empty + * Returns null if the buffer is empty + * @return + */ + public synchronized T take() { + int newSize = numberOfElements.decrementAndGet(); + if (newSize < 0) { + numberOfElements.incrementAndGet(); + return null; + } + T polled = buffer[oldest]; + oldest = ++oldest % capacity; + return polled; + } + + public synchronized T peek() { + if (numberOfElements.get() <= 0) { + return null; + } + return buffer[oldest]; + } + + public int size() { + return numberOfElements.get(); + } + + public boolean isEmpty() { + return numberOfElements.get() <= 0; + } + + public boolean isFull() { + return numberOfElements.get() >= capacity; + } + + public synchronized void reset() { + numberOfElements.set(0); + } +} diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/util/MessageTracker.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/util/MessageTracker.java new file mode 100644 index 0000000..086bdcc --- /dev/null +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/util/MessageTracker.java @@ -0,0 +1,165 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server.util; + +import java.text.SimpleDateFormat; +import java.util.Date; +import org.apache.zookeeper.server.quorum.Leader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * This class provides a way of buffering sentBuffer and receivedBuffer messages in order. + * It uses EvictingQueue of size BUFFERED_MESSAGE_SIZE to store the messages. + * When the queue is full it overrides the oldest in a circular manner. + * This class does doe not provide thread safety. + */ +public class MessageTracker { + + private static final Logger LOG = LoggerFactory.getLogger(MessageTracker.class); + + private final CircularBuffer sentBuffer; + private final CircularBuffer receivedBuffer; + + public static final String MESSAGE_TRACKER_BUFFER_SIZE = "zookeeper.messageTracker.BufferSize"; + public static final String MESSAGE_TRACKER_ENABLED = "zookeeper.messageTracker.Enabled"; + public static final int BUFFERED_MESSAGE_SIZE; + private static final boolean enabled; + static { + BUFFERED_MESSAGE_SIZE = Integer.getInteger(MESSAGE_TRACKER_BUFFER_SIZE, 10); + enabled = Boolean.getBoolean(MESSAGE_TRACKER_ENABLED); + } + + public MessageTracker(int buffer_size) { + this.sentBuffer = new CircularBuffer<>(BufferedMessage.class, buffer_size); + this.receivedBuffer = new CircularBuffer<>(BufferedMessage.class, buffer_size); + } + + public void trackSent(long timestamp) { + if (enabled) { + sentBuffer.write(new BufferedMessage(timestamp)); + } + } + + public void trackSent(int packetType) { + if (enabled) { + sentBuffer.write(new BufferedMessage(packetType)); + } + } + + public void trackReceived(long timestamp) { + 
if (enabled) { + receivedBuffer.write(new BufferedMessage(timestamp)); + } + } + + public void trackReceived(int packetType) { + if (enabled) { + receivedBuffer.write(new BufferedMessage(packetType)); + } + } + + public final BufferedMessage peekSent() { + return sentBuffer.peek(); + } + + public final BufferedMessage peekReceived() { + return receivedBuffer.peek(); + } + + public final long peekSentTimestamp() { + return enabled ? sentBuffer.peek().getTimestamp() : 0; + } + + public final long peekReceivedTimestamp() { + return enabled ? receivedBuffer.peek().getTimestamp() : 0; + } + + public void dumpToLog(String serverAddress) { + if (!enabled) { + return; + } + logMessages(serverAddress, receivedBuffer, Direction.RECEIVED); + logMessages(serverAddress, sentBuffer, Direction.SENT); + } + + private static void logMessages( + String serverAddr, + CircularBuffer messages, + Direction direction) { + String sentOrReceivedText = direction == Direction.SENT ? "sentBuffer to" : "receivedBuffer from"; + + if (messages.isEmpty()) { + LOG.info("No buffered timestamps for messages {} {}", sentOrReceivedText, serverAddr); + } else { + LOG.warn("Last {} timestamps for messages {} {}:", + messages.size(), sentOrReceivedText, serverAddr); + while (!messages.isEmpty()) { + LOG.warn("{} {} {}", + sentOrReceivedText, + serverAddr, + messages.take().toString()); + } + } + } + + /** + * Direction for message track. 
+ */ + private enum Direction { + SENT, RECEIVED + } + + private static class BufferedMessage { + + private long timestamp; + private int messageType; + + private long getTimestamp() { + return timestamp; + } + + BufferedMessage(int messageType) { + this.messageType = messageType; + this.timestamp = System.currentTimeMillis(); + } + + BufferedMessage(long timestamp) { + this.messageType = -1; + this.timestamp = timestamp; + } + + @Override + /** + * ToString examples are as follows: + * TimeStamp: 2016-06-06 11:07:58,594 Type: PROPOSAL + * TimeStamp: 2016-06-06 11:07:58,187 + */ + public String toString() { + if (messageType == -1) { + return "TimeStamp: " + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS") + .format(new Date(timestamp)); + } else { + return "TimeStamp: " + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS") + .format(new Date(timestamp)) + " Type: " + Leader.getPacketType(messageType); + } + } + } +} diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/util/CircularBufferTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/util/CircularBufferTest.java new file mode 100644 index 0000000..ff3c83c --- /dev/null +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/util/CircularBufferTest.java @@ -0,0 +1,198 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server.util; + +import org.junit.Assert; +import org.junit.Test; + +public class CircularBufferTest { + + @Test + public void testCircularBuffer() { + final int capacity = 3; + CircularBuffer buffer = new CircularBuffer<>(String.class, capacity); + + Assert.assertTrue(buffer.isEmpty()); + Assert.assertFalse(buffer.isFull()); + + // write to the buffer + buffer.write("A"); + Assert.assertFalse(buffer.isEmpty()); + Assert.assertFalse(buffer.isFull()); + + buffer.write("B"); + Assert.assertFalse(buffer.isEmpty()); + Assert.assertFalse(buffer.isFull()); + + buffer.write("C"); + Assert.assertFalse(buffer.isEmpty()); + Assert.assertTrue(buffer.isFull()); + + // Buffer is full. 
+ // Read from buffer + Assert.assertEquals("A", buffer.take()); + Assert.assertFalse(buffer.isEmpty()); + Assert.assertFalse(buffer.isFull()); + + Assert.assertEquals("B", buffer.take()); + Assert.assertFalse(buffer.isEmpty()); + Assert.assertFalse(buffer.isFull()); + + Assert.assertEquals("C", buffer.take()); + Assert.assertTrue(buffer.isEmpty()); + Assert.assertFalse(buffer.isFull()); + + // write to the buffer + buffer.write("1"); + Assert.assertFalse(buffer.isEmpty()); + Assert.assertFalse(buffer.isFull()); + + buffer.write("2"); + Assert.assertFalse(buffer.isEmpty()); + Assert.assertFalse(buffer.isFull()); + + buffer.write("3"); + Assert.assertFalse(buffer.isEmpty()); + Assert.assertTrue(buffer.isFull()); + + buffer.write("4"); // 4 overwrites 1 + Assert.assertFalse(buffer.isEmpty()); + Assert.assertTrue(buffer.isFull()); + + // Buffer if full + // Read from buffer + Assert.assertEquals("2", buffer.take()); + Assert.assertFalse(buffer.isEmpty()); + Assert.assertFalse(buffer.isFull()); + + Assert.assertEquals("3", buffer.take()); + Assert.assertFalse(buffer.isEmpty()); + Assert.assertFalse(buffer.isFull()); + + Assert.assertEquals("4", buffer.take()); + Assert.assertTrue(buffer.isEmpty()); + Assert.assertFalse(buffer.isFull()); + + // write to the buffer + buffer.write("a"); + Assert.assertFalse(buffer.isEmpty()); + Assert.assertFalse(buffer.isFull()); + + buffer.write("b"); + Assert.assertFalse(buffer.isEmpty()); + Assert.assertFalse(buffer.isFull()); + + buffer.write("c"); + Assert.assertFalse(buffer.isEmpty()); + Assert.assertTrue(buffer.isFull()); + + buffer.write("d"); // d overwrites a + Assert.assertFalse(buffer.isEmpty()); + Assert.assertTrue(buffer.isFull()); + + buffer.write("e"); // e overwrites b + Assert.assertFalse(buffer.isEmpty()); + Assert.assertTrue(buffer.isFull()); + + buffer.write("f"); // f overwrites c + Assert.assertFalse(buffer.isEmpty()); + Assert.assertTrue(buffer.isFull()); + + buffer.write("g"); // g overwrites d + 
Assert.assertFalse(buffer.isEmpty()); + Assert.assertTrue(buffer.isFull()); + + // Buffer is full. + // Read from buffer + Assert.assertEquals("e", buffer.take()); + Assert.assertFalse(buffer.isEmpty()); + Assert.assertFalse(buffer.isFull()); + + Assert.assertEquals("f", buffer.take()); + Assert.assertFalse(buffer.isEmpty()); + Assert.assertFalse(buffer.isFull()); + + Assert.assertEquals("g", buffer.take()); + Assert.assertTrue(buffer.isEmpty()); + Assert.assertFalse(buffer.isFull()); + } + + @Test + public void testCircularBufferWithCapacity1() { + final int capacity = 1; + CircularBuffer buffer = new CircularBuffer<>(String.class, capacity); + + Assert.assertTrue(buffer.isEmpty()); + Assert.assertFalse(buffer.isFull()); + + // write to the buffer + buffer.write("A"); + Assert.assertFalse(buffer.isEmpty()); + Assert.assertTrue(buffer.isFull()); + + buffer.write("B"); // B overwrite A + Assert.assertFalse(buffer.isEmpty()); + Assert.assertTrue(buffer.isFull()); + + // Buffer is full. + // Read from buffer + Assert.assertEquals("B", buffer.take()); + Assert.assertTrue(buffer.isEmpty()); + Assert.assertFalse(buffer.isFull()); + } + + @Test + public void testCircularBufferReset() { + final int capacity = 3; + CircularBuffer buffer = new CircularBuffer<>(String.class, capacity); + + Assert.assertTrue(buffer.isEmpty()); + Assert.assertFalse(buffer.isFull()); + + // write to the buffer + buffer.write("A"); + Assert.assertFalse(buffer.isEmpty()); + Assert.assertFalse(buffer.isFull()); + Assert.assertEquals(1, buffer.size()); + Assert.assertEquals("A", buffer.peek()); + + buffer.write("B"); + Assert.assertFalse(buffer.isEmpty()); + Assert.assertFalse(buffer.isFull()); + Assert.assertEquals(2, buffer.size()); + Assert.assertEquals("A", buffer.peek()); + + // reset + buffer.reset(); + Assert.assertNull(buffer.peek()); + Assert.assertTrue(buffer.isEmpty()); + Assert.assertFalse(buffer.isFull()); + Assert.assertEquals(0, buffer.size()); + } + + @Test + public void 
testCircularBufferIllegalCapacity() { + try { + CircularBuffer buffer = new CircularBuffer<>(String.class, 0); + Assert.fail(); + } catch (IllegalArgumentException e) { + Assert.assertEquals("CircularBuffer capacity should be greater than 0", e.getMessage()); + } + } +} diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/util/MessageTrackerTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/util/MessageTrackerTest.java new file mode 100644 index 0000000..f1cef10 --- /dev/null +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/util/MessageTrackerTest.java @@ -0,0 +1,129 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server.util; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MessageTrackerTest { + private static final int BUFFERED_MESSAGE_SIZE = 5; + private static final Logger LOG = LoggerFactory.getLogger(MessageTrackerTest.class); + + @Before + public void setup() { + System.setProperty(MessageTracker.MESSAGE_TRACKER_ENABLED, "true"); + } + + @After + public void tearDown() throws Exception { + System.clearProperty(MessageTracker.MESSAGE_TRACKER_ENABLED); + } + + @Test + public void testTrackSend() throws InterruptedException { + long timestamp1 = System.currentTimeMillis(); + MessageTracker messageTracker = new MessageTracker(BUFFERED_MESSAGE_SIZE); + + // First timestamp is added + messageTracker.trackSent(timestamp1); + Assert.assertEquals(messageTracker.peekSentTimestamp(), timestamp1); + + Thread.sleep(2); + + // Second timestamp is added + long timestamp2 = System.currentTimeMillis(); + messageTracker.trackSent(timestamp2); + Assert.assertEquals(messageTracker.peekSentTimestamp(), timestamp1); + } + + @Test + public void testTrackReceived() throws InterruptedException { + long timestamp1 = System.currentTimeMillis(); + MessageTracker messageTracker = new MessageTracker(BUFFERED_MESSAGE_SIZE); + + // First timestamp is added + messageTracker.trackReceived(timestamp1); + Assert.assertEquals(messageTracker.peekReceivedTimestamp(), timestamp1); + + Thread.sleep(2); + + // Second timestamp is added + long timestamp2 = System.currentTimeMillis(); + messageTracker.trackReceived(timestamp2); 
+ Assert.assertEquals(messageTracker.peekReceivedTimestamp(), timestamp1); + } + + @Test + public void testMessageTrackerFull() throws InterruptedException { + MessageTracker messageTracker = new MessageTracker(BUFFERED_MESSAGE_SIZE); + + // Add up to capacity + 1 + long timestampSent = 0; + long timestampReceived = 0; + for (int i = 0; i <= BUFFERED_MESSAGE_SIZE; i++) { + if (i == 1) { + timestampSent = System.currentTimeMillis(); + messageTracker.trackSent(timestampSent); + Thread.sleep(2); + timestampReceived = System.currentTimeMillis(); + messageTracker.trackReceived(timestampReceived); + } else { + messageTracker.trackSent(System.currentTimeMillis()); + messageTracker.trackReceived(System.currentTimeMillis()); + } + Thread.sleep(1); + } + + Assert.assertEquals(messageTracker.peekSentTimestamp(), timestampSent); + Assert.assertEquals(messageTracker.peekReceivedTimestamp(), timestampReceived); + } + + @Test + public void testDumpToLog() { + long timestamp1 = System.currentTimeMillis(); + MessageTracker messageTracker = new MessageTracker(BUFFERED_MESSAGE_SIZE); + String sid = "127.0.0.1"; + + // MessageTracker is empty + messageTracker.dumpToLog(sid); + Assert.assertNull(messageTracker.peekSent()); + Assert.assertNull(messageTracker.peekReceived()); + + // There is 1 sent and 0 received + messageTracker.trackSent(timestamp1); + Assert.assertEquals(messageTracker.peekSentTimestamp(), timestamp1); + Assert.assertNull(messageTracker.peekReceived()); + messageTracker.dumpToLog(sid); + Assert.assertNull(messageTracker.peekSent()); + Assert.assertNull(messageTracker.peekReceived()); + + // There is 1 sent and 1 received + messageTracker.trackSent(timestamp1); + messageTracker.trackReceived(timestamp1); + Assert.assertEquals(messageTracker.peekSentTimestamp(), timestamp1); + Assert.assertEquals(messageTracker.peekReceivedTimestamp(), timestamp1); + messageTracker.dumpToLog(sid); + Assert.assertNull(messageTracker.peekSent()); + 
Assert.assertNull(messageTracker.peekReceived()); + } +}