bookkeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From eolive...@apache.org
Subject bookkeeper git commit: BOOKKEEPER-1068: Expose ByteBuf in LedgerEntry to avoid data copy
Date Thu, 25 May 2017 12:23:08 GMT
Repository: bookkeeper
Updated Branches:
  refs/heads/master 8ba8ab940 -> 7b1eec470


BOOKKEEPER-1068: Expose ByteBuf in LedgerEntry to avoid data copy

To avoid copying the entries payloads when writing/reading on a ledger and having to allocate
a lot of `byte[]` on the JVM heap, we need to accept Netty ByteBuf buffer.

By passing a ByteBuf, an application can use a pooled buffer, pointing to direct memory, to
the `LedgerHandle.addEntry()` and have the same buffer forwarded on the connection sockets
to the bookies.

The same thing on the read side, `LedgerEntry` exposes an additional `getEntryBuffer()` method
that can be used to get the underlying buffer and possibly forward that to some other connection,
with zero-copy behavior (excluding getting data in-out of the kernel).

Author: Matteo Merli <mmerli@yahoo-inc.com>

Reviewers: Jia Zhai, Sijie Guo, Enrico Olivelli

Closes #155 from merlimat/byte-buf-ledger-entry


Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/7b1eec47
Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/7b1eec47
Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/7b1eec47

Branch: refs/heads/master
Commit: 7b1eec47092d0de6776c5a89575dbfc678165ee7
Parents: 8ba8ab9
Author: Matteo Merli <mmerli@yahoo-inc.com>
Authored: Thu May 25 14:23:03 2017 +0200
Committer: eolivelli <eolivelli@apache.org>
Committed: Thu May 25 14:23:03 2017 +0200

----------------------------------------------------------------------
 .../apache/bookkeeper/client/DigestManager.java | 15 +++-----
 .../apache/bookkeeper/client/LedgerEntry.java   | 39 +++++++++-----------
 .../client/LedgerFragmentReplicator.java        |  3 +-
 .../apache/bookkeeper/client/LedgerHandle.java  | 35 ++++++++++--------
 .../bookkeeper/client/LedgerHandleAdv.java      | 25 ++++++-------
 .../apache/bookkeeper/client/PendingReadOp.java | 10 ++---
 .../apache/bookkeeper/client/ClientUtil.java    |  3 +-
 7 files changed, 62 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/7b1eec47/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DigestManager.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DigestManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DigestManager.java
index 396e6d9..ac28a15 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DigestManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DigestManager.java
@@ -19,7 +19,6 @@ package org.apache.bookkeeper.client;
  */
 
 import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufInputStream;
 import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.buffer.Unpooled;
 
@@ -83,22 +82,18 @@ abstract class DigestManager {
      * @param data
      * @return
      */
-
-    public ByteBuf computeDigestAndPackageForSending(long entryId, long lastAddConfirmed, long length, byte[] data,
-            int doffset, int dlength) {
+    public ByteBuf computeDigestAndPackageForSending(long entryId, long lastAddConfirmed, long length, ByteBuf data) {
         ByteBuf headersBuffer = PooledByteBufAllocator.DEFAULT.buffer(METADATA_LENGTH + macCodeLength);
         headersBuffer.writeLong(ledgerId);
         headersBuffer.writeLong(entryId);
         headersBuffer.writeLong(lastAddConfirmed);
         headersBuffer.writeLong(length);
 
-        ByteBuf dataBuffer = Unpooled.wrappedBuffer(data, doffset, dlength);
-
         update(headersBuffer);
-        update(dataBuffer);
+        update(data);
         populateValueAndReset(headersBuffer);
 
-        return DoubleByteBuf.get(headersBuffer, dataBuffer);
+        return DoubleByteBuf.get(headersBuffer, data);
     }
 
     /**
@@ -212,11 +207,11 @@ abstract class DigestManager {
      * @return
      * @throws BKDigestMatchException
      */
-    ByteBufInputStream verifyDigestAndReturnData(long entryId, ByteBuf dataReceived)
+    ByteBuf verifyDigestAndReturnData(long entryId, ByteBuf dataReceived)
             throws BKDigestMatchException {
         verifyDigest(entryId, dataReceived);
         dataReceived.readerIndex(METADATA_LENGTH + macCodeLength);
-        return new ByteBufInputStream(dataReceived);
+        return dataReceived;
     }
 
     static class RecoveryData {

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/7b1eec47/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerEntry.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerEntry.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerEntry.java
index 6502e05..c01ec54 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerEntry.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerEntry.java
@@ -21,14 +21,11 @@ package org.apache.bookkeeper.client;
  *
  */
 
+import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufInputStream;
 
-import java.io.IOException;
 import java.io.InputStream;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 /**
  * Ledger entry. Its a simple tuple containing the ledger id, the entry-id, and
  * the entry content.
@@ -36,12 +33,10 @@ import org.slf4j.LoggerFactory;
  */
 
 public class LedgerEntry {
-    private final static Logger LOG = LoggerFactory.getLogger(LedgerEntry.class);
-
     long ledgerId;
     long entryId;
     long length;
-    ByteBufInputStream entryDataStream;
+    ByteBuf data;
 
     LedgerEntry(long lId, long eId) {
         this.ledgerId = lId;
@@ -61,23 +56,23 @@ public class LedgerEntry {
     }
 
     public byte[] getEntry() {
-        try {
-            // In general, you can't rely on the available() method of an input
-            // stream, but ChannelBufferInputStream is backed by a byte[] so it
-            // accurately knows the # bytes available
-            byte[] ret = new byte[entryDataStream.available()];
-            entryDataStream.readFully(ret);
-            return ret;
-        } catch (IOException e) {
-            // The channelbufferinput stream doesnt really throw the
-            // ioexceptions, it just has to be in the signature because
-            // InputStream says so. Hence this code, should never be reached.
-            LOG.error("Unexpected IOException while reading from channel buffer", e);
-            return new byte[0];
-        }
+        byte[] entry = new byte[data.readableBytes()];
+        data.readBytes(entry);
+        data.release();
+        return entry;
     }
 
     public InputStream getEntryInputStream() {
-        return entryDataStream;
+        return new ByteBufInputStream(data);
+    }
+
+    /**
+     * Return the internal buffer that contains the entry payload.
+     * <p>
+     *
+     * Note: It is responsibility of the caller to ensure to release the buffer after usage.
+     */
+    public ByteBuf getEntryBuffer() {
+        return data;
     }
 }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/7b1eec47/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
index 0522d50..fe1104a 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
@@ -20,6 +20,7 @@
 package org.apache.bookkeeper.client;
 
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
 
 import java.util.ArrayList;
 import java.util.Enumeration;
@@ -284,7 +285,7 @@ public class LedgerFragmentReplicator {
                 ByteBuf toSend = lh.getDigestManager()
                         .computeDigestAndPackageForSending(entryId,
                                 lh.getLastAddConfirmed(), entry.getLength(),
-                                data, 0, data.length);
+                                Unpooled.wrappedBuffer(data, 0, data.length));
                 bkc.getBookieClient().addEntry(newBookie, lh.getId(),
                         lh.getLedgerKey(), entryId, toSend,
                         new WriteCallback() {

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/7b1eec47/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index 06f1d8c..f875923 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -22,6 +22,7 @@ package org.apache.bookkeeper.client;
 
 import static com.google.common.base.Charsets.UTF_8;
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
 
 import java.security.GeneralSecurityException;
 import java.security.NoSuchAlgorithmException;
@@ -722,8 +723,21 @@ public class LedgerHandle implements AutoCloseable {
      */
     public void asyncAddEntry(final byte[] data, final int offset, final int length,
                               final AddCallback cb, final Object ctx) {
+        if (offset < 0 || length < 0
+                || (offset + length) > data.length) {
+            throw new ArrayIndexOutOfBoundsException(
+                    "Invalid values for offset("+offset
+                    +") or length("+length+")");
+        }
+
+        PendingAddOp op = new PendingAddOp(LedgerHandle.this, cb, ctx);
+        doAsyncAddEntry(op, Unpooled.wrappedBuffer(data, offset, length), cb, ctx);
+    }
+
+    public void asyncAddEntry(ByteBuf data, final AddCallback cb, final Object ctx) {
+        data.retain();
         PendingAddOp op = new PendingAddOp(LedgerHandle.this, cb, ctx);
-        doAsyncAddEntry(op, data, offset, length, cb, ctx);
+        doAsyncAddEntry(op, data, cb, ctx);
     }
 
     /**
@@ -765,19 +779,10 @@ public class LedgerHandle implements AutoCloseable {
     void asyncRecoveryAddEntry(final byte[] data, final int offset, final int length,
                                final AddCallback cb, final Object ctx) {
         PendingAddOp op = new PendingAddOp(LedgerHandle.this, cb, ctx).enableRecoveryAdd();
-        doAsyncAddEntry(op, data, offset, length, cb, ctx);
+        doAsyncAddEntry(op, Unpooled.wrappedBuffer(data, offset, length), cb, ctx);
     }
 
-    void doAsyncAddEntry(final PendingAddOp op, final byte[] data, final int offset, final int length,
-                         final AddCallback cb, final Object ctx) {
-
-        if (offset < 0 || length < 0
-                || (offset + length) > data.length) {
-            throw new ArrayIndexOutOfBoundsException(
-                    "Invalid values for offset(" +offset
-                    +") or length("+length+")");
-        }
-
+    protected void doAsyncAddEntry(final PendingAddOp op, final ByteBuf data, final AddCallback cb, final Object ctx) {
         if (throttler != null) {
             throttler.acquire();
         }
@@ -795,7 +800,7 @@ public class LedgerHandle implements AutoCloseable {
                 currentLength = 0;
             } else {
                 entryId = ++lastAddPushed;
-                currentLength = addToLength(length);
+                currentLength = addToLength(data.readableBytes());
                 op.setEntryId(entryId);
                 pendingAddOps.add(op);
             }
@@ -829,9 +834,9 @@ public class LedgerHandle implements AutoCloseable {
                 @Override
                 public void safeRun() {
                     ByteBuf toSend = macManager.computeDigestAndPackageForSending(entryId, lastAddConfirmed,
-                            currentLength, data, offset, length);
+                            currentLength, data);
                     try {
-                        op.initiate(toSend, length);
+                        op.initiate(toSend, data.readableBytes());
                     } finally {
                         toSend.release();
                     }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/7b1eec47/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
index ffc469e..f971b75 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
@@ -36,6 +36,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
 
 /**
  * Ledger Advanced handle extends {@link LedgerHandle} to provide API to add entries with
@@ -151,8 +152,8 @@ public class LedgerHandleAdv extends LedgerHandle {
             cb.addComplete(BKException.Code.DuplicateEntryIdException,
                     LedgerHandleAdv.this, entryId, ctx);
             return;
-        }       
-        doAsyncAddEntry(op, data, offset, length, cb, ctx);
+        }
+        doAsyncAddEntry(op, Unpooled.wrappedBuffer(data, offset, length), cb, ctx);
     }
 
     /**
@@ -161,14 +162,7 @@ public class LedgerHandleAdv extends LedgerHandle {
      * unaltered in the base class.
      */
     @Override
-    void doAsyncAddEntry(final PendingAddOp op, final byte[] data, final int offset, final int length,
-            final AddCallback cb, final Object ctx) {
-        if (offset < 0 || length < 0
-                || (offset + length) > data.length) {
-            throw new ArrayIndexOutOfBoundsException(
-                "Invalid values for offset("+offset
-                +") or length("+length+")");
-        }
+    protected void doAsyncAddEntry(final PendingAddOp op, final ByteBuf data, final AddCallback cb, final Object ctx) {
         if (throttler != null) {
             throttler.acquire();
         }
@@ -214,10 +208,13 @@ public class LedgerHandleAdv extends LedgerHandle {
             bk.mainWorkerPool.submit(new SafeRunnable() {
                 @Override
                 public void safeRun() {
-                    ByteBuf toSend = macManager.computeDigestAndPackageForSending(
-                                               op.getEntryId(), lastAddConfirmed, currentLength, data, offset, length);
-                    op.initiate(toSend, length);
-                    toSend.release();
+                    ByteBuf toSend = macManager.computeDigestAndPackageForSending(op.getEntryId(), lastAddConfirmed,
+                            currentLength, data);
+                    try {
+                        op.initiate(toSend, toSend.readableBytes());
+                    } finally {
+                        toSend.release();
+                    }
                 }
             });
         } catch (RejectedExecutionException e) {

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/7b1eec47/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
index 57a84f8..da15aab 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
@@ -21,7 +21,6 @@
 package org.apache.bookkeeper.client;
 
 import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufInputStream;
 
 import java.util.ArrayList;
 import java.util.BitSet;
@@ -223,24 +222,25 @@ class PendingReadOp implements Enumeration<LedgerEntry>, ReadEntryCallback {
         // return true if we managed to complete the entry
         // return false if the read entry is not complete or it is already completed before
         boolean complete(BookieSocketAddress host, final ByteBuf buffer) {
-            ByteBufInputStream is;
+            ByteBuf content;
             try {
-                is = lh.macManager.verifyDigestAndReturnData(entryId, buffer);
+                content = lh.macManager.verifyDigestAndReturnData(entryId, buffer);
             } catch (BKDigestMatchException e) {
                 logErrorAndReattemptRead(host, "Mac mismatch", BKException.Code.DigestMatchException);
+                buffer.release();
                 return false;
             }
 
             if (!complete.getAndSet(true)) {
-                entryDataStream = is;
-
                 /*
                  * The length is a long and it is the last field of the metadata of an entry.
                  * Consequently, we have to subtract 8 from METADATA_LENGTH to get the length.
                  */
                 length = buffer.getLong(DigestManager.METADATA_LENGTH - 8);
+                data = content;
                 return true;
             } else {
+                buffer.release();
                 return false;
             }
         }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/7b1eec47/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ClientUtil.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ClientUtil.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ClientUtil.java
index 25b0d6b..d1ebef2 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ClientUtil.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ClientUtil.java
@@ -18,6 +18,7 @@
 package org.apache.bookkeeper.client;
 
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
 
 public class ClientUtil {
     public static ByteBuf generatePacket(long ledgerId, long entryId, long lastAddConfirmed,
@@ -29,7 +30,7 @@ public class ClientUtil {
                                               long length, byte[] data, int offset, int len) {
         CRC32DigestManager dm = new CRC32DigestManager(ledgerId);
         return dm.computeDigestAndPackageForSending(entryId, lastAddConfirmed, length,
-                                                    data, offset, len);
+                                                    Unpooled.wrappedBuffer(data, offset, len));
     }
 
     /** Returns that whether ledger is in open state */


Mime
View raw message