zookeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [zookeeper] Diff for: [GitHub] asfgit closed pull request #684: ZOOKEEPER-3180: Add response cache to improve the throughput of read …
Date Mon, 14 Jan 2019 18:39:18 GMT
diff --git a/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md b/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
index 2b68e38268..d808b612a9 100644
--- a/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
+++ b/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
@@ -685,6 +685,17 @@ property, when available, is noted below.
     defaults to 1000. This value can only be set as a
     system property.
 
+* *maxResponseCacheSize* :
+    (Java system property: **zookeeper.maxResponseCacheSize**)
+    When set to a positive integer, it determines the size
+    of the cache that stores the serialized form of recently
+    read records. Helps save the serialization cost on
+    popular znodes. The metrics **response_packet_cache_hits**
+    and **response_packet_cache_misses** can be used to tune
+    this value to a given workload. The feature is turned on
+    by default with a value of 400, set to 0 or a negative
+    integer to turn the feature off.
+
 * *autopurge.snapRetainCount* :
     (No Java system property)
     **New in 3.4.0:**
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/DumbWatcher.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/DumbWatcher.java
index f384d7c5c7..1f64dd09dc 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/DumbWatcher.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/DumbWatcher.java
@@ -26,8 +26,7 @@
 import org.apache.jute.Record;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.proto.ReplyHeader;
-import org.apache.zookeeper.server.ServerCnxn;
-import org.apache.zookeeper.server.ServerStats;
+import org.apache.zookeeper.data.Stat;
 
 /**
  * A empty watcher implementation used in bench and unit test.
@@ -58,7 +57,7 @@ public void process(WatchedEvent event) { }
     void close() { }
 
     @Override
-    public void sendResponse(ReplyHeader h, Record r, String tag) throws IOException { }
+    public void sendResponse(ReplyHeader h, Record r, String tag, String cacheKey, Stat stat)
throws IOException { }
 
     @Override
     public void sendCloseSession() { }
@@ -70,7 +69,7 @@ public void sendCloseSession() { }
     void setSessionId(long sessionId) { }
 
     @Override
-    void sendBuffer(ByteBuffer closeConn) { }
+    void sendBuffer(ByteBuffer... closeConn) { }
 
     @Override
     void enableRecv() { }
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java
index b9427e8f40..d022193e73 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java
@@ -168,6 +168,7 @@ public void processRequest(Request request) {
         zks.decInProcess();
         Code err = Code.OK;
         Record rsp = null;
+        String path = null;
         try {
             if (request.getHdr() != null && request.getHdr().getType() == OpCode.error)
{
                 /*
@@ -316,7 +317,7 @@ public void processRequest(Request request) {
                 ExistsRequest existsRequest = new ExistsRequest();
                 ByteBufferInputStream.byteBuffer2Record(request.request,
                         existsRequest);
-                String path = existsRequest.getPath();
+                path = existsRequest.getPath();
                 if (path.indexOf('\0') != -1) {
                     throw new KeeperException.BadArgumentsException();
                 }
@@ -330,15 +331,16 @@ public void processRequest(Request request) {
                 GetDataRequest getDataRequest = new GetDataRequest();
                 ByteBufferInputStream.byteBuffer2Record(request.request,
                         getDataRequest);
-                DataNode n = zks.getZKDatabase().getNode(getDataRequest.getPath());
+                path = getDataRequest.getPath();
+                DataNode n = zks.getZKDatabase().getNode(path);
                 if (n == null) {
                     throw new KeeperException.NoNodeException();
                 }
                 PrepRequestProcessor.checkACL(zks, request.cnxn, zks.getZKDatabase().aclForNode(n),
                         ZooDefs.Perms.READ,
-                        request.authInfo, getDataRequest.getPath(), null);
+                        request.authInfo, path, null);
                 Stat stat = new Stat();
-                byte b[] = zks.getZKDatabase().getData(getDataRequest.getPath(), stat,
+                byte b[] = zks.getZKDatabase().getData(path, stat,
                         getDataRequest.getWatch() ? cnxn : null);
                 rsp = new GetDataResponse(b, stat);
                 break;
@@ -362,8 +364,9 @@ public void processRequest(Request request) {
                 ByteBufferInputStream.byteBuffer2Record(request.request,
                         getACLRequest);
                 Stat stat = new Stat();
+                path = getACLRequest.getPath();
                 List<ACL> acl =
-                    zks.getZKDatabase().getACL(getACLRequest.getPath(), stat);
+                    zks.getZKDatabase().getACL(path, stat);
                 rsp = new GetACLResponse(acl, stat);
                 break;
             }
@@ -372,15 +375,16 @@ public void processRequest(Request request) {
                 GetChildrenRequest getChildrenRequest = new GetChildrenRequest();
                 ByteBufferInputStream.byteBuffer2Record(request.request,
                         getChildrenRequest);
-                DataNode n = zks.getZKDatabase().getNode(getChildrenRequest.getPath());
+                path = getChildrenRequest.getPath();
+                DataNode n = zks.getZKDatabase().getNode(path);
                 if (n == null) {
                     throw new KeeperException.NoNodeException();
                 }
                 PrepRequestProcessor.checkACL(zks, request.cnxn, zks.getZKDatabase().aclForNode(n),
                         ZooDefs.Perms.READ,
-                        request.authInfo, getChildrenRequest.getPath(), null);
+                        request.authInfo, path, null);
                 List<String> children = zks.getZKDatabase().getChildren(
-                        getChildrenRequest.getPath(), null, getChildrenRequest
+                        path, null, getChildrenRequest
                                 .getWatch() ? cnxn : null);
                 rsp = new GetChildrenResponse(children);
                 break;
@@ -391,15 +395,16 @@ public void processRequest(Request request) {
                 ByteBufferInputStream.byteBuffer2Record(request.request,
                         getChildren2Request);
                 Stat stat = new Stat();
-                DataNode n = zks.getZKDatabase().getNode(getChildren2Request.getPath());
+                path = getChildren2Request.getPath();
+                DataNode n = zks.getZKDatabase().getNode(path);
                 if (n == null) {
                     throw new KeeperException.NoNodeException();
                 }
                 PrepRequestProcessor.checkACL(zks, request.cnxn, zks.getZKDatabase().aclForNode(n),
                         ZooDefs.Perms.READ,
-                        request.authInfo, getChildren2Request.getPath(), null);
+                        request.authInfo, path, null);
                 List<String> children = zks.getZKDatabase().getChildren(
-                        getChildren2Request.getPath(), stat, getChildren2Request
+                        path, stat, getChildren2Request
                                 .getWatch() ? cnxn : null);
                 rsp = new GetChildren2Response(children, stat);
                 break;
@@ -410,11 +415,12 @@ public void processRequest(Request request) {
                 ByteBufferInputStream.byteBuffer2Record(request.request,
                         checkWatches);
                 WatcherType type = WatcherType.fromInt(checkWatches.getType());
+                path = checkWatches.getPath();
                 boolean containsWatcher = zks.getZKDatabase().containsWatcher(
-                        checkWatches.getPath(), type, cnxn);
+                        path, type, cnxn);
                 if (!containsWatcher) {
                     String msg = String.format(Locale.ENGLISH, "%s (type: %s)",
-                            checkWatches.getPath(), type);
+                            path, type);
                     throw new KeeperException.NoWatcherException(msg);
                 }
                 break;
@@ -425,11 +431,12 @@ public void processRequest(Request request) {
                 ByteBufferInputStream.byteBuffer2Record(request.request,
                         removeWatches);
                 WatcherType type = WatcherType.fromInt(removeWatches.getType());
+                path = removeWatches.getPath();
                 boolean removed = zks.getZKDatabase().removeWatch(
-                        removeWatches.getPath(), type, cnxn);
+                        path, type, cnxn);
                 if (!removed) {
                     String msg = String.format(Locale.ENGLISH, "%s (type: %s)",
-                            removeWatches.getPath(), type);
+                            path, type);
                     throw new KeeperException.NoWatcherException(msg);
                 }
                 break;
@@ -468,7 +475,19 @@ public void processRequest(Request request) {
         updateStats(request, lastOp, lastZxid);
 
         try {
-            cnxn.sendResponse(hdr, rsp, "response");
+            if (request.type == OpCode.getData && path != null && rsp !=
null) {
+                // Serialized read responses could be cached by the connection object.
+                // Cache entries are identified by their path and last modified zxid,
+                // so these values are passed along with the response.
+                GetDataResponse getDataResponse = (GetDataResponse)rsp;
+                Stat stat = null;
+                if (getDataResponse != null && getDataResponse.getStat() != null)
{
+                    stat = getDataResponse.getStat();
+                }
+                cnxn.sendResponse(hdr, rsp, "response", path, stat);
+            } else {
+                cnxn.sendResponse(hdr, rsp, "response");
+            }
             if (request.type == OpCode.closeSession) {
                 cnxn.sendCloseSession();
             }
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java
index b48eb3dc3b..c2ab78487a 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java
@@ -19,7 +19,6 @@
 package org.apache.zookeeper.server;
 
 import java.io.BufferedWriter;
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.io.Writer;
@@ -36,10 +35,10 @@
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.jute.BinaryInputArchive;
-import org.apache.jute.BinaryOutputArchive;
 import org.apache.jute.Record;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.data.Id;
+import org.apache.zookeeper.data.Stat;
 import org.apache.zookeeper.proto.ReplyHeader;
 import org.apache.zookeeper.proto.RequestHeader;
 import org.apache.zookeeper.proto.WatcherEvent;
@@ -137,12 +136,17 @@ void sendBufferSync(ByteBuffer bb) {
      * sendBuffer pushes a byte buffer onto the outgoing buffer queue for
      * asynchronous writes.
      */
-    public void sendBuffer(ByteBuffer bb) {
+    public void sendBuffer(ByteBuffer... buffers) {
         if (LOG.isTraceEnabled()) {
             LOG.trace("Add a buffer to outgoingBuffers, sk " + sk
                       + " is valid: " + sk.isValid());
         }
-        outgoingBuffers.add(bb);
+        synchronized (outgoingBuffers) {
+            for (ByteBuffer buffer : buffers) {
+                outgoingBuffers.add(buffer);
+            }
+            outgoingBuffers.add(packetSentinel);
+        }
         requestInterestOpsUpdate();
     }
 
@@ -221,10 +225,12 @@ void handleWrite(SelectionKey k) throws IOException, CloseRequestException
{
                 if (bb == ServerCnxnFactory.closeConn) {
                     throw new CloseRequestException("close requested");
                 }
+                if (bb == packetSentinel) {
+                    packetSent();
+                }
                 if (bb.remaining() > 0) {
                     break;
                 }
-                packetSent();
                 outgoingBuffers.remove();
             }
          } else {
@@ -269,6 +275,9 @@ void handleWrite(SelectionKey k) throws IOException, CloseRequestException
{
                 if (bb == ServerCnxnFactory.closeConn) {
                     throw new CloseRequestException("close requested");
                 }
+                if (bb == packetSentinel) {
+                    packetSent();
+                }
                 if (sent < bb.remaining()) {
                     /*
                      * We only partially sent this buffer, so we update
@@ -277,7 +286,6 @@ void handleWrite(SelectionKey k) throws IOException, CloseRequestException
{
                     bb.position(bb.position() + sent);
                     break;
                 }
-                packetSent();
                 /* We've sent the whole buffer, so drop the buffer */
                 sent -= bb.remaining();
                 outgoingBuffers.remove();
@@ -648,16 +656,34 @@ public static void closeSock(SocketChannel sock) {
         }
     }
 
-    /*
-     * (non-Javadoc)
+    private final static ByteBuffer packetSentinel = ByteBuffer.allocate(0);
+
+    /**
+     * Serializes a ZooKeeper response and enqueues it for sending.
+     *
+     * Serializes client response parts and enqueues them into outgoing queue.
+     *
+     * If both cache key and last modified zxid are provided, the serialized
+     * response is caсhed under the provided key, the last modified zxid is
+     * stored along with the value. A cache entry is invalidated if the
+     * provided last modified zxid is more recent than the stored one.
+     *
+     * Attention: this function is not thread safe, due to caching not being
+     * thread safe.
      *
-     * @see org.apache.zookeeper.server.ServerCnxnIface#sendResponse(org.apache.zookeeper.proto.ReplyHeader,
-     *      org.apache.jute.Record, java.lang.String)
+     * @param h reply header
+     * @param r reply payload, can be null
+     * @param tag Jute serialization tag, can be null
+     * @param cacheKey key for caching the serialized payload. a null value
+     *     prvents caching
+     * @param stat stat information for the the reply payload, used
+     *     for cache invalidation. a value of 0 prevents caching.
      */
     @Override
-    public void sendResponse(ReplyHeader h, Record r, String tag) {
+    public void sendResponse(ReplyHeader h, Record r, String tag,
+                             String cacheKey, Stat stat) {
         try {
-            super.sendResponse(h, r, tag);
+            sendBuffer(serialize(h, r, tag, cacheKey, stat));
             decrOutstandingAndCheckThrottle(h);
          } catch(Exception e) {
             LOG.warn("Unexpected exception. Destruction averted.", e);
@@ -682,7 +708,7 @@ public void process(WatchedEvent event) {
         // Convert WatchedEvent to a type that can be sent over the wire
         WatcherEvent e = event.getWrapper();
 
-        sendResponse(h, e, "notification");
+        sendResponse(h, e, "notification", null, null);
     }
 
     /*
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java
index 311d3c1d20..b6bb343f49 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java
@@ -39,6 +39,7 @@
 import io.netty.util.ReferenceCountUtil;
 import org.apache.jute.BinaryInputArchive;
 import org.apache.jute.Record;
+import org.apache.zookeeper.data.Stat;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.proto.ReplyHeader;
 import org.apache.zookeeper.proto.WatcherEvent;
@@ -160,12 +161,14 @@ public void process(WatchedEvent event) {
     }
 
     @Override
-    public void sendResponse(ReplyHeader h, Record r, String tag)
-            throws IOException {
+    public void sendResponse(ReplyHeader h, Record r, String tag,
+                             String cacheKey, Stat stat) throws IOException {
+        // cacheKey and stat are used in caching, which is not
+        // implemented here. Implementation example can be found in NIOServerCnxn.
         if (closingChannel || !channel.isOpen()) {
             return;
         }
-        super.sendResponse(h, r, tag);
+        sendBuffer(serialize(h, r, tag, cacheKey, stat));
         decrOutstandingAndCheckThrottle(h);
     }
 
@@ -176,12 +179,12 @@ public void setSessionId(long sessionId) {
     }
 
     @Override
-    public void sendBuffer(ByteBuffer sendBuffer) {
-        if (sendBuffer == ServerCnxnFactory.closeConn) {
+    public void sendBuffer(ByteBuffer... buffers) {
+        if (buffers.length == 1 && buffers[0] == ServerCnxnFactory.closeConn) {
             close();
             return;
         }
-        channel.writeAndFlush(Unpooled.wrappedBuffer(sendBuffer)).addListener(f -> {
+        channel.writeAndFlush(Unpooled.wrappedBuffer(buffers)).addListener(f -> {
             if (f.isSuccess()) {
                 packetSent();
             }
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ResponseCache.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ResponseCache.java
new file mode 100644
index 0000000000..73db7d5802
--- /dev/null
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ResponseCache.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server;
+
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import org.apache.zookeeper.data.Stat;
+
+@SuppressWarnings("serial")
+public class ResponseCache {
+    // Magic number chosen to be "big enough but not too big"
+    private static final int DEFAULT_RESPONSE_CACHE_SIZE = 400;
+
+    private static class Entry {
+        public Stat stat;
+        public byte[] data;
+    }
+
+    private Map<String, Entry> cache = Collections.synchronizedMap(
+        new LRUCache<String, Entry>(getResponseCacheSize()));
+
+    public ResponseCache() {
+    }
+
+    public void put(String path, byte[] data, Stat stat) {
+        Entry entry = new Entry();
+        entry.data = data;
+        entry.stat = stat;
+        cache.put(path, entry);
+    }
+
+    public byte[] get(String key, Stat stat) {
+        Entry entry = cache.get(key);
+        if (entry == null) {
+            return null;
+        }
+        if (!stat.equals(entry.stat)) {
+            // The node has been modified, invalidate cache.
+            cache.remove(key);
+            return null;
+        } else {
+            return entry.data;
+        }
+    }
+
+    private static int getResponseCacheSize() {
+        return Integer.getInteger("zookeeper.maxResponseCacheSize", DEFAULT_RESPONSE_CACHE_SIZE);
+    }
+
+    public static boolean isEnabled() {
+        return getResponseCacheSize() > 0;
+    }
+
+    private static class LRUCache<K, V> extends LinkedHashMap<K, V> {
+        private int cacheSize;
+
+        LRUCache(int cacheSize) {
+            super(cacheSize/4);
+            this.cacheSize = cacheSize;
+        }
+
+        protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
+            return size() >= cacheSize;
+        }
+    }
+}
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerCnxn.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerCnxn.java
index 8e145cbeb1..b0088d1fba 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerCnxn.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerCnxn.java
@@ -37,9 +37,11 @@
 
 import org.apache.jute.BinaryOutputArchive;
 import org.apache.jute.Record;
+import org.apache.zookeeper.Quotas;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.data.Id;
+import org.apache.zookeeper.data.Stat;
 import org.apache.zookeeper.proto.ReplyHeader;
 import org.apache.zookeeper.proto.RequestHeader;
 import org.slf4j.Logger;
@@ -103,25 +105,64 @@ public void decrOutstandingAndCheckThrottle(ReplyHeader h) {
 
     abstract void close();
 
+    public abstract void sendResponse(ReplyHeader h, Record r,
+            String tag, String cacheKey, Stat stat) throws IOException;
+
     public void sendResponse(ReplyHeader h, Record r, String tag) throws IOException {
-        ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        // Make space for length
+        sendResponse(h, r, tag, null, null);
+    }
+
+    protected byte[] serializeRecord(Record record) throws IOException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream(
+            ZooKeeperServer.intBufferStartingSizeBytes);
         BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos);
-        try {
-            baos.write(fourBytes);
-            bos.writeRecord(h, "header");
-            if (r != null) {
-                bos.writeRecord(r, tag);
+        bos.writeRecord(record, null);
+        return baos.toByteArray();
+    }
+
+    protected ByteBuffer[] serialize(ReplyHeader h, Record r, String tag,
+            String cacheKey, Stat stat) throws IOException {
+        byte[] header = serializeRecord(h);
+        byte[] data = null;
+        if (r != null) {
+            ResponseCache cache = zkServer.getReadResponseCache();
+            if (cache != null && stat != null && cacheKey != null &&
+                    !cacheKey.endsWith(Quotas.statNode)) {
+                // Use cache to get serialized data.
+                //
+                // NB: Tag is ignored both during cache lookup and serialization,
+                // since is is not used in read responses, which are being cached.
+                data = cache.get(cacheKey, stat);
+                if (data == null) {
+                    // Cache miss, serialize the response and put it in cache.
+                    data = serializeRecord(r);
+                    cache.put(cacheKey, data, stat);
+                    ServerMetrics.RESPONSE_PACKET_CACHE_MISSING.add(1);
+                } else {
+                    ServerMetrics.RESPONSE_PACKET_CACHE_HITS.add(1);
+                }
+            } else {
+                data = serializeRecord(r);
             }
-            baos.close();
-        } catch (IOException e) {
-            LOG.error("Error serializing response");
         }
-        byte b[] = baos.toByteArray();
-        serverStats().updateClientResponseSize(b.length - 4);
-        ByteBuffer bb = ByteBuffer.wrap(b);
-        bb.putInt(b.length - 4).rewind();
-        sendBuffer(bb);
+        int dataLength = data == null ? 0 : data.length;
+        int packetLength = header.length + dataLength;
+        ServerStats serverStats = serverStats();
+        if (serverStats != null) {
+            serverStats.updateClientResponseSize(packetLength);
+        }
+        ByteBuffer lengthBuffer = ByteBuffer.allocate(4).putInt(packetLength);
+        lengthBuffer.rewind();
+
+        int bufferLen = data != null ? 3 : 2;
+        ByteBuffer[] buffers = new ByteBuffer[bufferLen];
+
+        buffers[0] = lengthBuffer;
+        buffers[1] = ByteBuffer.wrap(header);
+        if (data != null) {
+            buffers[2] = ByteBuffer.wrap(data);
+        }
+        return buffers;
     }
 
     /* notify the client the session is closing and close/cleanup socket */
@@ -146,7 +187,7 @@ public boolean removeAuthInfo(Id id) {
         return authInfo.remove(id);
     }
 
-    abstract void sendBuffer(ByteBuffer closeConn);
+    abstract void sendBuffer(ByteBuffer... buffers);
 
     abstract void enableRecv();
 
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java
index c5d82deebb..3420b88e8b 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java
@@ -67,7 +67,10 @@
     SNAP_COUNT(new SimpleCounter("snap_count")),
     COMMIT_COUNT(new SimpleCounter("commit_count")),
     CONNECTION_REQUEST_COUNT(new SimpleCounter("connection_request_count")),
-    BYTES_RECEIVED_COUNT(new SimpleCounter("bytes_received_count"));
+    BYTES_RECEIVED_COUNT(new SimpleCounter("bytes_received_count")),
+
+    RESPONSE_PACKET_CACHE_HITS(new SimpleCounter("response_packet_cache_hits")),
+    RESPONSE_PACKET_CACHE_MISSING(new SimpleCounter("response_packet_cache_misses"));
 
     private final Metric metric;
 
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
index 20ab023ec5..833c79bab0 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
@@ -106,10 +106,12 @@
     protected SessionTracker sessionTracker;
     private FileTxnSnapLog txnLogFactory = null;
     private ZKDatabase zkDb;
+    private ResponseCache readResponseCache;
     private final AtomicLong hzxid = new AtomicLong(0);
     public final static Exception ok = new Exception("No prob");
     protected RequestProcessor firstProcessor;
     protected volatile State state = State.INITIAL;
+    private boolean isResponseCachingEnabled = true;
 
     protected enum State {
         INITIAL, RUNNING, SHUTDOWN, ERROR
@@ -138,6 +140,30 @@
     private ZooKeeperServerShutdownHandler zkShutdownHandler;
     private volatile int createSessionTrackerServerId = 1;
 
+    /**
+     * Starting size of read and write ByteArroyOuputBuffers. Default is 32 bytes.
+     * Flag not used for small transfers like connectResponses.
+     */
+    public static final String INT_BUFFER_STARTING_SIZE_BYTES =
+            "zookeeper.intBufferStartingSizeBytes";
+    public static final int DEFAULT_STARTING_BUFFER_SIZE = 1024;
+    public static final int intBufferStartingSizeBytes;
+
+    static {
+        intBufferStartingSizeBytes = Integer.getInteger(
+                INT_BUFFER_STARTING_SIZE_BYTES,
+                DEFAULT_STARTING_BUFFER_SIZE);
+
+        if (intBufferStartingSizeBytes < 32) {
+            String msg = "Buffer starting size must be greater than 0." +
+                    "Configure with \"-Dzookeeper.intBufferStartingSizeBytes=<size>\"
";
+            LOG.error(msg);
+            throw new IllegalArgumentException(msg);
+        }
+
+        LOG.info(INT_BUFFER_STARTING_SIZE_BYTES + " = " + intBufferStartingSizeBytes);
+    }
+
     void removeCnxn(ServerCnxn cnxn) {
         zkDb.removeCnxn(cnxn);
     }
@@ -170,6 +196,7 @@ public ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime,
         setMinSessionTimeout(minSessionTimeout);
         setMaxSessionTimeout(maxSessionTimeout);
         listener = new ZooKeeperServerListenerImpl(this);
+        readResponseCache = new ResponseCache();
         LOG.info("Created server with tickTime " + tickTime
                 + " minSessionTimeout " + getMinSessionTimeout()
                 + " maxSessionTimeout " + getMaxSessionTimeout()
@@ -1282,4 +1309,16 @@ private ProcessTxnResult processTxn(Request request, TxnHeader hdr,
     void registerServerShutdownHandler(ZooKeeperServerShutdownHandler zkShutdownHandler)
{
         this.zkShutdownHandler = zkShutdownHandler;
     }
+
+    public boolean isResponseCachingEnabled() {
+        return isResponseCachingEnabled;
+    }
+
+    public void setResponseCachingEnabled(boolean isEnabled) {
+        isResponseCachingEnabled = isEnabled;
+    }
+
+    public ResponseCache getReadResponseCache() {
+        return isResponseCachingEnabled ? readResponseCache : null;
+    }
 }
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerBean.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerBean.java
index cf84b2f9e5..deae98d9b4 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerBean.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerBean.java
@@ -197,4 +197,14 @@ public int getMinClientResponseSize() {
     public int getMaxClientResponseSize() {
         return zks.serverStats().getClientResponseStats().getMaxBufferSize();
     }
+
+    @Override
+    public boolean getResponseCachingEnabled() {
+        return zks.isResponseCachingEnabled();
+    }
+
+    @Override
+    public void setResponseCachingEnabled(boolean isEnabled) {
+        zks.setResponseCachingEnabled(isEnabled);
+    }
 }
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerMXBean.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerMXBean.java
index feb6875870..bd4d3498d2 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerMXBean.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerMXBean.java
@@ -95,6 +95,9 @@
      */
     public void setMaxSessionTimeout(int max);
 
+    public boolean getResponseCachingEnabled();
+    public void setResponseCachingEnabled(boolean isEnabled);
+
     /**
      * Reset packet and latency statistics 
      */
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/MockServerCnxn.java
b/zookeeper-server/src/test/java/org/apache/zookeeper/server/MockServerCnxn.java
index 20cf36dc88..a8fdeaf7dd 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/MockServerCnxn.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/MockServerCnxn.java
@@ -24,6 +24,7 @@
 import org.apache.jute.Record;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.proto.ReplyHeader;
+import org.apache.zookeeper.data.Stat;
 
 public class MockServerCnxn extends ServerCnxn {
     public Certificate[] clientChain;
@@ -43,7 +44,7 @@ void close() {
     }
 
     @Override
-    public void sendResponse(ReplyHeader h, Record r, String tag)
+    public void sendResponse(ReplyHeader h, Record r, String tag, String cacheKey, Stat stat)
             throws IOException {
     }
 
@@ -80,7 +81,7 @@ public void setClientCertificateChain(Certificate[] chain) {
     }
 
     @Override
-    void sendBuffer(ByteBuffer closeConn) {
+    void sendBuffer(ByteBuffer... closeConn) {
     }
 
     @Override
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/JMXEnv.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/JMXEnv.java
index 4edcc0eb12..d8a923a865 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/JMXEnv.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/JMXEnv.java
@@ -19,9 +19,12 @@
 package org.apache.zookeeper.test;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
+import java.util.regex.Pattern;
 
 import javax.management.MBeanServer;
 import javax.management.MBeanServerConnection;
@@ -319,4 +322,46 @@ private static boolean compare(String bean, String name) {
         }
         return false;
     }
+
+    static Pattern standaloneRegEx = Pattern.compile(
+            "^org.apache.ZooKeeperService:name0=StandaloneServer_port-?\\d+$"
+    );
+    static Pattern instanceRegEx = Pattern.compile(
+            "^org.apache.ZooKeeperService:name0=ReplicatedServer_id(\\d+)" +
+            ",name1=replica.(\\d+),name2=(Follower|Leader)$"
+    );
+    static Pattern observerRegEx = Pattern.compile(
+            "^org.apache.ZooKeeperService:name0=ReplicatedServer_id(-?\\d+)" +
+            ",name1=replica.(-?\\d+),name2=(StandaloneServer_port-?\\d+)$"
+    );
+    static List<Pattern> beanPatterns = Arrays.asList(standaloneRegEx, instanceRegEx,
observerRegEx);
+
+    public static List<ObjectName> getServerBeans() throws IOException {
+        ArrayList<ObjectName> serverBeans = new ArrayList<>();
+        Set<ObjectName> beans;
+        try {
+            beans = conn().queryNames(
+                    new ObjectName(CommonNames.DOMAIN + ":*"), null);
+        } catch (MalformedObjectNameException e) {
+            throw new RuntimeException(e);
+        }
+        for (ObjectName bean : beans) {
+            String name = bean.toString();
+            LOG.info("bean:" + name);
+            for (Pattern pattern : beanPatterns) {
+                if (pattern.matcher(name).find()) {
+                    serverBeans.add(bean);
+                }
+            }
+        }
+        return serverBeans;
+    }
+
+    public static ObjectName getServerBean() throws Exception {
+        List<ObjectName> serverBeans = getServerBeans();
+        if (serverBeans.size() != 1) {
+            throw new RuntimeException("Unable to find one and only one server bean");
+        }
+        return serverBeans.get(0);
+    }
 }
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/ResponseCacheTest.java
b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ResponseCacheTest.java
new file mode 100644
index 0000000000..e220c61828
--- /dev/null
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ResponseCacheTest.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import java.util.Map;
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.server.ServerMetrics;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ResponseCacheTest extends ClientBase {
+    protected static final Logger LOG =
+            LoggerFactory.getLogger(ResponseCacheTest.class);
+
+    @Test
+    public void testResponseCache() throws Exception {
+        ZooKeeper zk = createClient();
+
+        try {
+            performCacheTest(zk, "/cache", true);
+            performCacheTest(zk, "/nocache", false);
+        }
+        finally {
+            zk.close();
+        }
+    }
+
+    private void checkCacheStatus(long expectedHits, long expectedMisses) {
+        Map<String, Object> metrics = ServerMetrics.getAllValues();
+        Assert.assertEquals(expectedHits, metrics.get("response_packet_cache_hits"));
+        Assert.assertEquals(expectedMisses, metrics.get("response_packet_cache_misses"));
+    }
+
+    public void performCacheTest(ZooKeeper zk, String path, boolean useCache) throws Exception
{
+        ServerMetrics.resetAll();
+        Stat writeStat = new Stat();
+        Stat readStat = new Stat();
+        byte[] readData = null;
+        int reads = 10;
+        long expectedHits = 0;
+        long expectedMisses = 0;
+
+        getServer(serverFactory).setResponseCachingEnabled(useCache);
+        LOG.info("caching: {}", useCache);
+
+        byte[] writeData = "test1".getBytes();
+        zk.create(path, writeData, ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                CreateMode.PERSISTENT, writeStat);
+        for (int i = 0; i < reads; ++i) {
+            readData = zk.getData(path, false, readStat);
+            Assert.assertArrayEquals(writeData, readData);
+            Assert.assertEquals(writeStat, readStat);
+        }
+        if (useCache) {
+            expectedMisses += 1;
+            expectedHits += reads - 1;
+        }
+        checkCacheStatus(expectedHits, expectedMisses);
+
+        writeData = "test2".getBytes();
+        writeStat = zk.setData(path, writeData, -1);
+        for (int i = 0; i < 10; ++i) {
+            readData = zk.getData(path, false, readStat);
+            Assert.assertArrayEquals(writeData, readData);
+            Assert.assertEquals(writeStat, readStat);
+        }
+        if (useCache) {
+            expectedMisses += 1;
+            expectedHits += reads - 1;
+        }
+        checkCacheStatus(expectedHits, expectedMisses);
+
+        // Create a child beneath the tested node. This won't change the data of
+        // the tested node, but will change it's pzxid. The next read of the tested
+        // node should miss in the cache. The data should still match what was written
+        // before, but the stat information should not.
+        zk.create(path + "/child", "child".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                CreateMode.PERSISTENT, null);
+        readData = zk.getData(path, false, readStat);
+        if (useCache) {
+            expectedMisses++;
+        }
+        Assert.assertArrayEquals(writeData, readData);
+        Assert.assertNotSame(writeStat, readStat);
+        checkCacheStatus(expectedHits, expectedMisses);
+    }
+}


With regards,
Apache Git Services

Mime
View raw message