Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id D4C86200C80 for ; Thu, 25 May 2017 14:23:10 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id D3579160BCA; Thu, 25 May 2017 12:23:10 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id A465C160BC7 for ; Thu, 25 May 2017 14:23:09 +0200 (CEST) Received: (qmail 91539 invoked by uid 500); 25 May 2017 12:23:08 -0000 Mailing-List: contact commits-help@bookkeeper.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: bookkeeper-dev@bookkeeper.apache.org Delivered-To: mailing list commits@bookkeeper.apache.org Received: (qmail 91530 invoked by uid 99); 25 May 2017 12:23:08 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 25 May 2017 12:23:08 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 9E3CDDFAEB; Thu, 25 May 2017 12:23:08 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: eolivelli@apache.org To: commits@bookkeeper.apache.org Message-Id: <2ea08f88863c41e79c9878ea4b700cdf@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: bookkeeper git commit: BOOKKEEPER-1068: Expose ByteBuf in LedgerEntry to avoid data copy Date: Thu, 25 May 2017 12:23:08 +0000 (UTC) archived-at: Thu, 25 May 2017 12:23:11 -0000 Repository: bookkeeper Updated Branches: refs/heads/master 8ba8ab940 -> 7b1eec470 BOOKKEEPER-1068: Expose ByteBuf in LedgerEntry to avoid data copy To avoid copying the entries payloads when writing/reading on a ledger and having to allocate a lot 
of `byte[]` on the JVM heap, we need to accept Netty `ByteBuf` buffers. By passing a `ByteBuf` to `LedgerHandle.addEntry()`, an application can use a pooled buffer pointing to direct memory and have the same buffer forwarded on the connection sockets to the bookies. The same applies on the read side: `LedgerEntry` exposes an additional `getEntryBuffer()` method that can be used to get the underlying buffer and possibly forward it to some other connection, with zero-copy behavior (excluding moving data in and out of the kernel). Author: Matteo Merli Reviewers: Jia Zhai, Sijie Guo, Enrico Olivelli Closes #155 from merlimat/byte-buf-ledger-entry Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/7b1eec47 Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/7b1eec47 Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/7b1eec47 Branch: refs/heads/master Commit: 7b1eec47092d0de6776c5a89575dbfc678165ee7 Parents: 8ba8ab9 Author: Matteo Merli Authored: Thu May 25 14:23:03 2017 +0200 Committer: eolivelli Committed: Thu May 25 14:23:03 2017 +0200 ---------------------------------------------------------------------- .../apache/bookkeeper/client/DigestManager.java | 15 +++----- .../apache/bookkeeper/client/LedgerEntry.java | 39 +++++++++----------- .../client/LedgerFragmentReplicator.java | 3 +- .../apache/bookkeeper/client/LedgerHandle.java | 35 ++++++++++-------- .../bookkeeper/client/LedgerHandleAdv.java | 25 ++++++------- .../apache/bookkeeper/client/PendingReadOp.java | 10 ++--- .../apache/bookkeeper/client/ClientUtil.java | 3 +- 7 files changed, 62 insertions(+), 68 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/7b1eec47/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DigestManager.java ---------------------------------------------------------------------- diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DigestManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DigestManager.java index 396e6d9..ac28a15 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DigestManager.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DigestManager.java @@ -19,7 +19,6 @@ package org.apache.bookkeeper.client; */ import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufInputStream; import io.netty.buffer.PooledByteBufAllocator; import io.netty.buffer.Unpooled; @@ -83,22 +82,18 @@ abstract class DigestManager { * @param data * @return */ - - public ByteBuf computeDigestAndPackageForSending(long entryId, long lastAddConfirmed, long length, byte[] data, - int doffset, int dlength) { + public ByteBuf computeDigestAndPackageForSending(long entryId, long lastAddConfirmed, long length, ByteBuf data) { ByteBuf headersBuffer = PooledByteBufAllocator.DEFAULT.buffer(METADATA_LENGTH + macCodeLength); headersBuffer.writeLong(ledgerId); headersBuffer.writeLong(entryId); headersBuffer.writeLong(lastAddConfirmed); headersBuffer.writeLong(length); - ByteBuf dataBuffer = Unpooled.wrappedBuffer(data, doffset, dlength); - update(headersBuffer); - update(dataBuffer); + update(data); populateValueAndReset(headersBuffer); - return DoubleByteBuf.get(headersBuffer, dataBuffer); + return DoubleByteBuf.get(headersBuffer, data); } /** @@ -212,11 +207,11 @@ abstract class DigestManager { * @return * @throws BKDigestMatchException */ - ByteBufInputStream verifyDigestAndReturnData(long entryId, ByteBuf dataReceived) + ByteBuf verifyDigestAndReturnData(long entryId, ByteBuf dataReceived) throws BKDigestMatchException { verifyDigest(entryId, dataReceived); dataReceived.readerIndex(METADATA_LENGTH + macCodeLength); - return new ByteBufInputStream(dataReceived); + return dataReceived; } static class RecoveryData { 
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/7b1eec47/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerEntry.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerEntry.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerEntry.java index 6502e05..c01ec54 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerEntry.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerEntry.java @@ -21,14 +21,11 @@ package org.apache.bookkeeper.client; * */ +import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufInputStream; -import java.io.IOException; import java.io.InputStream; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - /** * Ledger entry. Its a simple tuple containing the ledger id, the entry-id, and * the entry content. @@ -36,12 +33,10 @@ import org.slf4j.LoggerFactory; */ public class LedgerEntry { - private final static Logger LOG = LoggerFactory.getLogger(LedgerEntry.class); - long ledgerId; long entryId; long length; - ByteBufInputStream entryDataStream; + ByteBuf data; LedgerEntry(long lId, long eId) { this.ledgerId = lId; @@ -61,23 +56,23 @@ public class LedgerEntry { } public byte[] getEntry() { - try { - // In general, you can't rely on the available() method of an input - // stream, but ChannelBufferInputStream is backed by a byte[] so it - // accurately knows the # bytes available - byte[] ret = new byte[entryDataStream.available()]; - entryDataStream.readFully(ret); - return ret; - } catch (IOException e) { - // The channelbufferinput stream doesnt really throw the - // ioexceptions, it just has to be in the signature because - // InputStream says so. Hence this code, should never be reached. 
- LOG.error("Unexpected IOException while reading from channel buffer", e); - return new byte[0]; - } + byte[] entry = new byte[data.readableBytes()]; + data.readBytes(entry); + data.release(); + return entry; } public InputStream getEntryInputStream() { - return entryDataStream; + return new ByteBufInputStream(data); + } + + /** + * Return the internal buffer that contains the entry payload. + *

+ * + * Note: It is responsibility of the caller to ensure to release the buffer after usage. + */ + public ByteBuf getEntryBuffer() { + return data; } } http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/7b1eec47/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java index 0522d50..fe1104a 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java @@ -20,6 +20,7 @@ package org.apache.bookkeeper.client; import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; import java.util.ArrayList; import java.util.Enumeration; @@ -284,7 +285,7 @@ public class LedgerFragmentReplicator { ByteBuf toSend = lh.getDigestManager() .computeDigestAndPackageForSending(entryId, lh.getLastAddConfirmed(), entry.getLength(), - data, 0, data.length); + Unpooled.wrappedBuffer(data, 0, data.length)); bkc.getBookieClient().addEntry(newBookie, lh.getId(), lh.getLedgerKey(), entryId, toSend, new WriteCallback() { http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/7b1eec47/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java index 06f1d8c..f875923 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java @@ -22,6 +22,7 @@ package org.apache.bookkeeper.client; 
import static com.google.common.base.Charsets.UTF_8; import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; import java.security.GeneralSecurityException; import java.security.NoSuchAlgorithmException; @@ -722,8 +723,21 @@ public class LedgerHandle implements AutoCloseable { */ public void asyncAddEntry(final byte[] data, final int offset, final int length, final AddCallback cb, final Object ctx) { + if (offset < 0 || length < 0 + || (offset + length) > data.length) { + throw new ArrayIndexOutOfBoundsException( + "Invalid values for offset("+offset + +") or length("+length+")"); + } + + PendingAddOp op = new PendingAddOp(LedgerHandle.this, cb, ctx); + doAsyncAddEntry(op, Unpooled.wrappedBuffer(data, offset, length), cb, ctx); + } + + public void asyncAddEntry(ByteBuf data, final AddCallback cb, final Object ctx) { + data.retain(); PendingAddOp op = new PendingAddOp(LedgerHandle.this, cb, ctx); - doAsyncAddEntry(op, data, offset, length, cb, ctx); + doAsyncAddEntry(op, data, cb, ctx); } /** @@ -765,19 +779,10 @@ public class LedgerHandle implements AutoCloseable { void asyncRecoveryAddEntry(final byte[] data, final int offset, final int length, final AddCallback cb, final Object ctx) { PendingAddOp op = new PendingAddOp(LedgerHandle.this, cb, ctx).enableRecoveryAdd(); - doAsyncAddEntry(op, data, offset, length, cb, ctx); + doAsyncAddEntry(op, Unpooled.wrappedBuffer(data, offset, length), cb, ctx); } - void doAsyncAddEntry(final PendingAddOp op, final byte[] data, final int offset, final int length, - final AddCallback cb, final Object ctx) { - - if (offset < 0 || length < 0 - || (offset + length) > data.length) { - throw new ArrayIndexOutOfBoundsException( - "Invalid values for offset(" +offset - +") or length("+length+")"); - } - + protected void doAsyncAddEntry(final PendingAddOp op, final ByteBuf data, final AddCallback cb, final Object ctx) { if (throttler != null) { throttler.acquire(); } @@ -795,7 +800,7 @@ public class LedgerHandle implements 
AutoCloseable { currentLength = 0; } else { entryId = ++lastAddPushed; - currentLength = addToLength(length); + currentLength = addToLength(data.readableBytes()); op.setEntryId(entryId); pendingAddOps.add(op); } @@ -829,9 +834,9 @@ public class LedgerHandle implements AutoCloseable { @Override public void safeRun() { ByteBuf toSend = macManager.computeDigestAndPackageForSending(entryId, lastAddConfirmed, - currentLength, data, offset, length); + currentLength, data); try { - op.initiate(toSend, length); + op.initiate(toSend, data.readableBytes()); } finally { toSend.release(); } http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/7b1eec47/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java index ffc469e..f971b75 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java @@ -36,6 +36,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; /** * Ledger Advanced handle extends {@link LedgerHandle} to provide API to add entries with @@ -151,8 +152,8 @@ public class LedgerHandleAdv extends LedgerHandle { cb.addComplete(BKException.Code.DuplicateEntryIdException, LedgerHandleAdv.this, entryId, ctx); return; - } - doAsyncAddEntry(op, data, offset, length, cb, ctx); + } + doAsyncAddEntry(op, Unpooled.wrappedBuffer(data, offset, length), cb, ctx); } /** @@ -161,14 +162,7 @@ public class LedgerHandleAdv extends LedgerHandle { * unaltered in the base class. 
*/ @Override - void doAsyncAddEntry(final PendingAddOp op, final byte[] data, final int offset, final int length, - final AddCallback cb, final Object ctx) { - if (offset < 0 || length < 0 - || (offset + length) > data.length) { - throw new ArrayIndexOutOfBoundsException( - "Invalid values for offset("+offset - +") or length("+length+")"); - } + protected void doAsyncAddEntry(final PendingAddOp op, final ByteBuf data, final AddCallback cb, final Object ctx) { if (throttler != null) { throttler.acquire(); } @@ -214,10 +208,13 @@ public class LedgerHandleAdv extends LedgerHandle { bk.mainWorkerPool.submit(new SafeRunnable() { @Override public void safeRun() { - ByteBuf toSend = macManager.computeDigestAndPackageForSending( - op.getEntryId(), lastAddConfirmed, currentLength, data, offset, length); - op.initiate(toSend, length); - toSend.release(); + ByteBuf toSend = macManager.computeDigestAndPackageForSending(op.getEntryId(), lastAddConfirmed, + currentLength, data); + try { + op.initiate(toSend, toSend.readableBytes()); + } finally { + toSend.release(); + } } }); } catch (RejectedExecutionException e) { http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/7b1eec47/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java index 57a84f8..da15aab 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java @@ -21,7 +21,6 @@ package org.apache.bookkeeper.client; import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufInputStream; import java.util.ArrayList; import java.util.BitSet; @@ -223,24 +222,25 @@ class PendingReadOp implements Enumeration, ReadEntryCallback { // return true 
if we managed to complete the entry // return false if the read entry is not complete or it is already completed before boolean complete(BookieSocketAddress host, final ByteBuf buffer) { - ByteBufInputStream is; + ByteBuf content; try { - is = lh.macManager.verifyDigestAndReturnData(entryId, buffer); + content = lh.macManager.verifyDigestAndReturnData(entryId, buffer); } catch (BKDigestMatchException e) { logErrorAndReattemptRead(host, "Mac mismatch", BKException.Code.DigestMatchException); + buffer.release(); return false; } if (!complete.getAndSet(true)) { - entryDataStream = is; - /* * The length is a long and it is the last field of the metadata of an entry. * Consequently, we have to subtract 8 from METADATA_LENGTH to get the length. */ length = buffer.getLong(DigestManager.METADATA_LENGTH - 8); + data = content; return true; } else { + buffer.release(); return false; } } http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/7b1eec47/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ClientUtil.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ClientUtil.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ClientUtil.java index 25b0d6b..d1ebef2 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ClientUtil.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ClientUtil.java @@ -18,6 +18,7 @@ package org.apache.bookkeeper.client; import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; public class ClientUtil { public static ByteBuf generatePacket(long ledgerId, long entryId, long lastAddConfirmed, @@ -29,7 +30,7 @@ public class ClientUtil { long length, byte[] data, int offset, int len) { CRC32DigestManager dm = new CRC32DigestManager(ledgerId); return dm.computeDigestAndPackageForSending(entryId, lastAddConfirmed, length, - data, offset, len); + Unpooled.wrappedBuffer(data, offset, 
len)); } /** Returns that whether ledger is in open state */