Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 49BA4200C57 for ; Sat, 15 Apr 2017 13:59:03 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 48401160BA0; Sat, 15 Apr 2017 11:59:03 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id C08AA160B9D for ; Sat, 15 Apr 2017 13:59:01 +0200 (CEST) Received: (qmail 23957 invoked by uid 500); 15 Apr 2017 11:59:00 -0000 Mailing-List: contact commits-help@bookkeeper.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: bookkeeper-dev@bookkeeper.apache.org Delivered-To: mailing list commits@bookkeeper.apache.org Received: (qmail 23948 invoked by uid 99); 15 Apr 2017 11:59:00 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 15 Apr 2017 11:59:00 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 216DADFC8E; Sat, 15 Apr 2017 11:59:00 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: eolivelli@apache.org To: commits@bookkeeper.apache.org Message-Id: <8ca32d7c82e447959cf4c7590cceaac4@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: bookkeeper git commit: BOOKKEEPER-1018: Allow client to select older V2 protocol (no protobuf) Date: Sat, 15 Apr 2017 11:59:00 +0000 (UTC) archived-at: Sat, 15 Apr 2017 11:59:03 -0000 Repository: bookkeeper Updated Branches: refs/heads/master 24fae0322 -> f74d07d6b BOOKKEEPER-1018: Allow client to select older V2 protocol (no protobuf) Tested manually - running all tests locally. Will tag contributors. when ready for review (for now putting up to test via Screwdriver and manual review) Author: Govind Menon Reviewers: Enrico Olivelli , Sijie Guo Closes #126 from govind-menon/BOOKKEEPER-1018 Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/f74d07d6 Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/f74d07d6 Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/f74d07d6 Branch: refs/heads/master Commit: f74d07d6b96ba3e6f1270d053fa36676cf680304 Parents: 24fae03 Author: Govind Menon Authored: Sat Apr 15 13:58:34 2017 +0200 Committer: family Committed: Sat Apr 15 13:58:34 2017 +0200 ---------------------------------------------------------------------- .../bookkeeper/conf/ClientConfiguration.java | 23 +- .../apache/bookkeeper/proto/AuthHandler.java | 8 + .../apache/bookkeeper/proto/BookieProtocol.java | 3 + .../proto/PerChannelBookieClient.java | 446 +++++++++++++------ 4 files changed, 345 insertions(+), 135 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/f74d07d6/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java index ee137c0..2b75e9e 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java @@ -57,7 +57,7 @@ public class ClientConfiguration extends AbstractConfiguration { protected final static String CLIENT_WRITEBUFFER_HIGH_WATER_MARK = "clientWriteBufferHighWaterMark"; protected final static String CLIENT_CONNECT_TIMEOUT_MILLIS = "clientConnectTimeoutMillis"; protected final static String NUM_CHANNELS_PER_BOOKIE = "numChannelsPerBookie"; - + protected final static String USE_V2_WIRE_PROTOCOL = "useV2WireProtocol"; // Read Parameters protected final static String READ_TIMEOUT = "readTimeout"; protected final static String SPECULATIVE_READ_TIMEOUT = "speculativeReadTimeout"; @@ -436,6 +436,27 @@ public class ClientConfiguration extends AbstractConfiguration { } /** + * Use older Bookkeeper wire protocol (no protobuf) + * + * @return whether or not to use older Bookkeeper wire protocol (no protobuf) + */ + public boolean getUseV2WireProtocol() { + return getBoolean(USE_V2_WIRE_PROTOCOL, false); + } + + /** + * Set whether or not to use older Bookkeeper wire protocol (no protobuf) + * + * @param useV2WireProtocol + * whether or not to use older Bookkeeper wire protocol (no protobuf) + * @return client configuration. + */ + public ClientConfiguration setUseV2WireProtocol(boolean useV2WireProtocol) { + setProperty(USE_V2_WIRE_PROTOCOL, useV2WireProtocol); + return this; + } + + /** * Get zookeeper servers to connect * * @return zookeeper servers http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/f74d07d6/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java index 75dced5..d2608e7 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java @@ -322,6 +322,14 @@ class AuthHandler { } else { waitingForAuth.add(e); } + } else if (e.getMessage() instanceof BookieProtocol.Request) { + // let auth messages through, queue the rest + BookieProtocol.Request req = (BookieProtocol.Request)e.getMessage(); + if (BookkeeperProtocol.OperationType.AUTH.getNumber() == req.getOpCode()) { + super.writeRequested(ctx, e); + } else { + waitingForAuth.add(e); + } } // else just drop } } http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/f74d07d6/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java index 2ce5ed8..1191d3c 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java @@ -140,6 +140,9 @@ public interface BookieProtocol { * by the auth providers themselves. */ public static final byte AUTH = 3; + public static final byte READ_LAC = 4; + public static final byte WRITE_LAC = 5; + public static final byte GET_BOOKIE_INFO = 6; /** * The error code that indicates success http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/f74d07d6/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java index f6e9e8f..be169b6 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java @@ -18,11 +18,15 @@ package org.apache.bookkeeper.proto; import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.MappedByteBuffer; import java.nio.channels.ClosedChannelException; import java.util.ArrayDeque; import java.util.Collections; +import java.util.Collection; import java.util.Queue; import java.util.Set; +import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -98,7 +102,7 @@ import com.google.common.collect.Sets; import com.google.protobuf.ByteString; import com.google.protobuf.ExtensionRegistry; import java.net.SocketAddress; -import java.util.Collection; + import org.apache.bookkeeper.auth.BookKeeperPrincipal; import org.jboss.netty.channel.ChannelFactory; import org.apache.bookkeeper.client.ClientConnectionPeer; @@ -147,6 +151,8 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan private final OpStatsLogger getBookieInfoOpLogger; private final OpStatsLogger getBookieInfoTimeoutOpLogger; + private final boolean useV2WireProtocol; + /** * The following member variables do not need to be concurrent, or volatile * because they are always updated under a lock @@ -202,6 +208,7 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan this.addEntryTimeout = conf.getAddEntryTimeout(); this.readEntryTimeout = conf.getReadEntryTimeout(); this.getBookieInfoTimeout = conf.getBookieInfoTimeout(); + this.useV2WireProtocol = conf.getUseV2WireProtocol(); this.authProviderFactory = authProviderFactory; this.extRegistry = extRegistry; @@ -469,33 +476,45 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan */ void addEntry(final long ledgerId, byte[] masterKey, final long entryId, ChannelBuffer toSend, WriteCallback cb, Object ctx, final int options) { - final long txnId = getTxnId(); - final int entrySize = toSend.readableBytes(); - final CompletionKey completionKey = new CompletionKey(txnId, OperationType.ADD_ENTRY); - completionObjects.put(completionKey, - new AddCompletion(this, addEntryOpLogger, cb, ctx, ledgerId, entryId, - scheduleTimeout(completionKey, addEntryTimeout))); - - // Build the request and calculate the total size to be included in the packet. - BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder() - .setVersion(ProtocolVersion.VERSION_THREE) - .setOperation(OperationType.ADD_ENTRY) - .setTxnId(txnId); + Object request = null; + CompletionKey completion = null; + if (useV2WireProtocol) { + completion = new V2CompletionKey(ledgerId, entryId, OperationType.ADD_ENTRY); + request = new BookieProtocol.AddRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId, entryId, + (short) options, masterKey, toSend); - AddRequest.Builder addBuilder = AddRequest.newBuilder() - .setLedgerId(ledgerId) - .setEntryId(entryId) - .setMasterKey(ByteString.copyFrom(masterKey)) - .setBody(ByteString.copyFrom(toSend.toByteBuffer())); - if (((short)options & BookieProtocol.FLAG_RECOVERY_ADD) == BookieProtocol.FLAG_RECOVERY_ADD) { - addBuilder.setFlag(AddRequest.Flag.RECOVERY_ADD); + } else { + final long txnId = getTxnId(); + completion = new CompletionKey(txnId, OperationType.ADD_ENTRY); + // Build the request and calculate the total size to be included in the packet. + BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder() + .setVersion(ProtocolVersion.VERSION_THREE) + .setOperation(OperationType.ADD_ENTRY) + .setTxnId(txnId); + + AddRequest.Builder addBuilder = AddRequest.newBuilder() + .setLedgerId(ledgerId) + .setEntryId(entryId) + .setMasterKey(ByteString.copyFrom(masterKey)) + .setBody(ByteString.copyFrom(toSend.toByteBuffer())); + + if (((short) options & BookieProtocol.FLAG_RECOVERY_ADD) == BookieProtocol.FLAG_RECOVERY_ADD) { + addBuilder.setFlag(AddRequest.Flag.RECOVERY_ADD); + } + request = Request.newBuilder() + .setHeader(headerBuilder) + .setAddRequest(addBuilder) + .build(); } - final Request addRequest = Request.newBuilder() - .setHeader(headerBuilder) - .setAddRequest(addBuilder) - .build(); + final Object addRequest = request; + final CompletionKey completionKey = completion; + + completionObjects.put(completionKey, new AddCompletion(this, + addEntryOpLogger, cb, ctx, ledgerId, entryId, scheduleTimeout(completion, addEntryTimeout))); + + final int entrySize = toSend.readableBytes(); final Channel c = channel; if (c == null) { @@ -531,28 +550,41 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan public void readEntryAndFenceLedger(final long ledgerId, byte[] masterKey, final long entryId, ReadEntryCallback cb, Object ctx) { - final long txnId = getTxnId(); - final CompletionKey completionKey = new CompletionKey(txnId, OperationType.READ_ENTRY); - completionObjects.put(completionKey, - new ReadCompletion(this, readEntryOpLogger, cb, ctx, ledgerId, entryId, - scheduleTimeout(completionKey, readEntryTimeout))); - - // Build the request and calculate the total size to be included in the packet. - BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder() - .setVersion(ProtocolVersion.VERSION_THREE) - .setOperation(OperationType.READ_ENTRY) - .setTxnId(txnId); - - ReadRequest.Builder readBuilder = ReadRequest.newBuilder() - .setLedgerId(ledgerId) - .setEntryId(entryId) - .setMasterKey(ByteString.copyFrom(masterKey)) - .setFlag(ReadRequest.Flag.FENCE_LEDGER); - - final Request readRequest = Request.newBuilder() - .setHeader(headerBuilder) - .setReadRequest(readBuilder) - .build(); + Object request = null; + CompletionKey completion = null; + if (useV2WireProtocol) { + completion = new V2CompletionKey(ledgerId, entryId, OperationType.READ_ENTRY); + request = new BookieProtocol.ReadRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId, entryId, + BookieProtocol.FLAG_DO_FENCING, masterKey); + } else { + final long txnId = getTxnId(); + completion = new CompletionKey(txnId, OperationType.READ_ENTRY); + + // Build the request and calculate the total size to be included in the packet. + BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder() + .setVersion(ProtocolVersion.VERSION_THREE) + .setOperation(OperationType.READ_ENTRY) + .setTxnId(txnId); + + ReadRequest.Builder readBuilder = ReadRequest.newBuilder() + .setLedgerId(ledgerId) + .setEntryId(entryId) + .setMasterKey(ByteString.copyFrom(masterKey)) + .setFlag(ReadRequest.Flag.FENCE_LEDGER); + + request = Request.newBuilder() + .setHeader(headerBuilder) + .setReadRequest(readBuilder) + .build(); + } + + final CompletionKey completionKey = completion; + if (completionObjects.putIfAbsent(completionKey, new ReadCompletion(this, readEntryOpLogger, cb, + ctx, ledgerId, entryId, scheduleTimeout(completionKey, readEntryTimeout))) != null) { + // We cannot have more than 1 pending read on the same ledger/entry in the v2 protocol + cb.readEntryComplete(BKException.Code.BookieHandleNotAvailableException, ledgerId, entryId, null, ctx); + return; + } final Channel c = channel; if (c == null) { @@ -560,6 +592,7 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan return; } + final Object readRequest = request; try { ChannelFuture future = c.write(readRequest); future.addListener(new ChannelFutureListener() { @@ -586,22 +619,34 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan } public void readLac(final long ledgerId, ReadLacCallback cb, Object ctx) { - final long txnId = getTxnId(); - final CompletionKey completionKey = new CompletionKey(txnId, OperationType.READ_LAC); + Object request = null; + CompletionKey completion = null; + if (useV2WireProtocol) { + request = new BookieProtocol.ReadRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION, + ledgerId, (long) 0, (short) 0); + completion = new V2CompletionKey(ledgerId, (long) 0, OperationType.READ_LAC); + } else { + final long txnId = getTxnId(); + completion = new CompletionKey(txnId, OperationType.READ_LAC); + + // Build the request and calculate the total size to be included in the packet. + BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder() + .setVersion(ProtocolVersion.VERSION_THREE) + .setOperation(OperationType.READ_LAC) + .setTxnId(txnId); + ReadLacRequest.Builder readLacBuilder = ReadLacRequest.newBuilder() + .setLedgerId(ledgerId); + request = Request.newBuilder() + .setHeader(headerBuilder) + .setReadLacRequest(readLacBuilder) + .build(); + } + final Object readLacRequest = request; + final CompletionKey completionKey = completion; + completionObjects.put(completionKey, new ReadLacCompletion(readLacOpLogger, cb, ctx, ledgerId, scheduleTimeout(completionKey, readEntryTimeout))); - // Build the request and calculate the total size to be included in the packet. - BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder() - .setVersion(ProtocolVersion.VERSION_THREE) - .setOperation(OperationType.READ_LAC) - .setTxnId(txnId); - ReadLacRequest.Builder readLacBuilder = ReadLacRequest.newBuilder() - .setLedgerId(ledgerId); - final Request readLacRequest = Request.newBuilder() - .setHeader(headerBuilder) - .setReadLacRequest(readLacBuilder) - .build(); final Channel c = channel; if (c == null) { errorOutReadLacKey(completionKey); @@ -632,27 +677,37 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan } public void readEntry(final long ledgerId, final long entryId, ReadEntryCallback cb, Object ctx) { - final long txnId = getTxnId(); - final CompletionKey completionKey = new CompletionKey(txnId, OperationType.READ_ENTRY); - completionObjects.put(completionKey, - new ReadCompletion(this, readEntryOpLogger, cb, ctx, ledgerId, entryId, - scheduleTimeout(completionKey, readEntryTimeout))); + Object request = null; + CompletionKey completion = null; + if (useV2WireProtocol) { + request = new BookieProtocol.ReadRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION, + ledgerId, entryId, (short) 0); + completion = new V2CompletionKey(ledgerId, entryId, OperationType.READ_ENTRY); + } else { + final long txnId = getTxnId(); + completion = new CompletionKey(txnId, OperationType.READ_ENTRY); - // Build the request and calculate the total size to be included in the packet. - BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder() - .setVersion(ProtocolVersion.VERSION_THREE) - .setOperation(OperationType.READ_ENTRY) - .setTxnId(txnId); + // Build the request and calculate the total size to be included in the packet. + BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder() + .setVersion(ProtocolVersion.VERSION_THREE) + .setOperation(OperationType.READ_ENTRY) + .setTxnId(txnId); - ReadRequest.Builder readBuilder = ReadRequest.newBuilder() - .setLedgerId(ledgerId) - .setEntryId(entryId); + ReadRequest.Builder readBuilder = ReadRequest.newBuilder() + .setLedgerId(ledgerId) + .setEntryId(entryId); - final Request readRequest = Request.newBuilder() - .setHeader(headerBuilder) - .setReadRequest(readBuilder) - .build(); + request = Request.newBuilder() + .setHeader(headerBuilder) + .setReadRequest(readBuilder) + .build(); + } + final Object readRequest = request; + final CompletionKey completionKey = completion; + completionObjects.put(completionKey, + new ReadCompletion(this, readEntryOpLogger, cb, ctx, ledgerId, entryId, + scheduleTimeout(completionKey, readEntryTimeout))); final Channel c = channel; if (c == null) { errorOutReadKey(completionKey); @@ -803,6 +858,7 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan } void errorOutReadKey(final CompletionKey key, final int rc) { + LOG.debug("Removing completion key: {}", key); final ReadCompletion readCompletion = (ReadCompletion)completionObjects.remove(key); if (null == readCompletion) { return; @@ -835,6 +891,7 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan } void errorOutWriteLacKey(final CompletionKey key, final int rc) { + LOG.debug("Removing completion key: {}", key); final WriteLacCompletion writeLacCompletion = (WriteLacCompletion)completionObjects.remove(key); if (null == writeLacCompletion) { return; @@ -859,6 +916,7 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan } void errorOutReadLacKey(final CompletionKey key, final int rc) { + LOG.debug("Removing completion key: {}", key); final ReadLacCompletion readLacCompletion = (ReadLacCompletion)completionObjects.remove(key); if (null == readLacCompletion) { return; @@ -1058,12 +1116,107 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan */ @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { - if (!(e.getMessage() instanceof Response)) { + + if (e.getMessage() instanceof BookieProtocol.Response) { + BookieProtocol.Response response = (BookieProtocol.Response) e.getMessage(); + readV2Response(response); + } else if (e.getMessage() instanceof Response) { + Response response = (Response) e.getMessage(); + readV3Response(response); + } else { ctx.sendUpstream(e); - return; } + } + + private void readV2Response(final BookieProtocol.Response response) { + final long ledgerId = response.ledgerId; + final long entryId = response.entryId; - final Response response = (Response) e.getMessage(); + final OperationType operationType = getOperationType(response.getOpCode()); + final StatusCode status = getStatusCodeFromErrorCode(response.errorCode); + + final CompletionValue completionValue = completionObjects.remove(new V2CompletionKey(ledgerId, entryId, operationType)); + + if (null == completionValue) { + // Unexpected response, so log it. The txnId should have been present. + if (LOG.isDebugEnabled()) { + LOG.debug("Unexpected response received from bookie : " + addr + " for type : " + operationType + + " and ledger:entry : " + ledgerId + ":" + entryId); + } + } else { + long orderingKey = completionValue.ledgerId; + + executor.submitOrdered(orderingKey, new SafeRunnable() { + @Override + public void safeRun() { + switch (operationType) { + case ADD_ENTRY: { + handleAddResponse(ledgerId, entryId, status, completionValue); + break; + } + case READ_ENTRY: { + BookieProtocol.ReadResponse readResponse = (BookieProtocol.ReadResponse) response; + ChannelBuffer data = null; + if (readResponse.hasData()) { + data = readResponse.getData(); + } + handleReadResponse(ledgerId, entryId, status, data, completionValue); + break; + } + default: + LOG.error("Unexpected response, type:{} received from bookie:{}, ignoring", operationType, addr); + break; + } + } + }); + } + } + + private StatusCode getStatusCodeFromErrorCode(int errorCode) { + switch (errorCode) { + case BookieProtocol.EOK: + return StatusCode.EOK; + case BookieProtocol.ENOLEDGER: + return StatusCode.ENOLEDGER; + case BookieProtocol.ENOENTRY: + return StatusCode.ENOENTRY; + case BookieProtocol.EBADREQ: + return StatusCode.EBADREQ; + case BookieProtocol.EIO: + return StatusCode.EIO; + case BookieProtocol.EUA: + return StatusCode.EUA; + case BookieProtocol.EBADVERSION: + return StatusCode.EBADVERSION; + case BookieProtocol.EFENCED: + return StatusCode.EFENCED; + case BookieProtocol.EREADONLY: + return StatusCode.EREADONLY; + default: + throw new IllegalArgumentException("Invalid error code: " + errorCode); + } + } + + private OperationType getOperationType(byte opCode) { + switch (opCode) { + case BookieProtocol.ADDENTRY: + return OperationType.ADD_ENTRY; + case BookieProtocol.READENTRY: + return OperationType.READ_ENTRY; + case BookieProtocol.AUTH: + return OperationType.AUTH; + case BookieProtocol.READ_LAC: + return OperationType.READ_LAC; + case BookieProtocol.WRITE_LAC: + return OperationType.WRITE_LAC; + case BookieProtocol.GET_BOOKIE_INFO: + return OperationType.GET_BOOKIE_INFO; + default: + throw new IllegalArgumentException("Invalid operation type"); + } + } + + private void readV3Response(final Response response) { final BKPacketHeader header = response.getHeader(); final CompletionValue completionValue = completionObjects.remove(newCompletionKey(header.getTxnId(), @@ -1082,21 +1235,51 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan public void safeRun() { OperationType type = header.getOperation(); switch (type) { - case ADD_ENTRY: - handleAddResponse(response, completionValue); + case ADD_ENTRY: { + AddResponse addResponse = response.getAddResponse(); + StatusCode status = response.getStatus() == StatusCode.EOK ? addResponse.getStatus() : response.getStatus(); + handleAddResponse(addResponse.getLedgerId(), addResponse.getEntryId(), status, completionValue); break; - case READ_ENTRY: - handleReadResponse(response, completionValue); + } + case READ_ENTRY: { + ReadResponse readResponse = response.getReadResponse(); + StatusCode status = response.getStatus() == StatusCode.EOK ? readResponse.getStatus() : response.getStatus(); + ChannelBuffer buffer = ChannelBuffers.buffer(0); + if (readResponse.hasBody()) { + buffer = ChannelBuffers.copiedBuffer(readResponse.getBody().asReadOnlyByteBuffer()); + } + handleReadResponse(readResponse.getLedgerId(), readResponse.getEntryId(), status, buffer, completionValue); break; - case WRITE_LAC: - handleWriteLacResponse(response.getWriteLacResponse(), completionValue); + } + case WRITE_LAC: { + WriteLacResponse writeLacResponse = response.getWriteLacResponse(); + StatusCode status = response.getStatus() == StatusCode.EOK ? writeLacResponse.getStatus() : response.getStatus(); + handleWriteLacResponse(writeLacResponse.getLedgerId(), status, completionValue); break; - case READ_LAC: - handleReadLacResponse(response.getReadLacResponse(), completionValue); + } + case READ_LAC: { + ReadLacResponse readLacResponse = response.getReadLacResponse(); + ChannelBuffer lacBuffer = ChannelBuffers.buffer(0); + ChannelBuffer lastEntryBuffer = ChannelBuffers.buffer(0); + StatusCode status = response.getStatus() == StatusCode.EOK ? readLacResponse.getStatus() : response.getStatus(); + // Thread.dumpStack(); + + if (readLacResponse.hasLacBody()) { + lacBuffer = ChannelBuffers.copiedBuffer(readLacResponse.getLacBody().asReadOnlyByteBuffer()); + } + + if (readLacResponse.hasLastEntryBody()) { + lastEntryBuffer = ChannelBuffers.copiedBuffer(readLacResponse.getLastEntryBody().asReadOnlyByteBuffer()); + } + handleReadLacResponse(readLacResponse.getLedgerId(), status, lacBuffer, lastEntryBuffer, completionValue); break; - case GET_BOOKIE_INFO: - handleGetBookieInfoResponse(response, completionValue); + } + case GET_BOOKIE_INFO: { + GetBookieInfoResponse getBookieInfoResponse = response.getGetBookieInfoResponse(); + StatusCode status = response.getStatus() == StatusCode.EOK ? getBookieInfoResponse.getStatus() : response.getStatus(); + handleGetBookieInfoResponse(getBookieInfoResponse.getFreeDiskSpace(), getBookieInfoResponse.getTotalDiskCapacity(), status, completionValue); break; + } default: LOG.error("Unexpected response, type:{} received from bookie:{}, ignoring", type, addr); @@ -1114,13 +1297,10 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan } } - void handleWriteLacResponse(WriteLacResponse writeLacResponse, CompletionValue completionValue) { + void handleWriteLacResponse(long ledgerId, StatusCode status, CompletionValue completionValue) { // The completion value should always be an instance of an WriteLacCompletion object when we reach here. WriteLacCompletion plc = (WriteLacCompletion)completionValue; - long ledgerId = writeLacResponse.getLedgerId(); - StatusCode status = writeLacResponse.getStatus(); - LOG.debug("Got response for writeLac request from bookie: " + addr + " for ledger: " + ledgerId + " rc: " + status); // convert to BKException code @@ -1133,14 +1313,9 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan plc.cb.writeLacComplete(rcToRet, ledgerId, addr, plc.ctx); } - void handleAddResponse(Response response, CompletionValue completionValue) { + void handleAddResponse(long ledgerId, long entryId, StatusCode status, CompletionValue completionValue) { // The completion value should always be an instance of an AddCompletion object when we reach here. AddCompletion ac = (AddCompletion)completionValue; - AddResponse addResponse = response.getAddResponse(); - - long ledgerId = addResponse.getLedgerId(); - long entryId = addResponse.getEntryId(); - StatusCode status = response.getStatus() == StatusCode.EOK ? addResponse.getStatus() : response.getStatus(); if (LOG.isDebugEnabled()) { LOG.debug("Got response for add request from bookie: " + addr + " for ledger: " + ledgerId + " entry: " @@ -1160,25 +1335,10 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan ac.cb.writeComplete(rcToRet, ledgerId, entryId, addr, ac.ctx); } - void handleReadLacResponse(ReadLacResponse readLacResponse, CompletionValue completionValue) { + void handleReadLacResponse(long ledgerId, StatusCode status, ChannelBuffer lacBuffer, ChannelBuffer lastEntryBuffer, CompletionValue completionValue) { // The completion value should always be an instance of an WriteLacCompletion object when we reach here. ReadLacCompletion glac = (ReadLacCompletion)completionValue; - long ledgerId = readLacResponse.getLedgerId(); - StatusCode status = readLacResponse.getStatus(); - ChannelBuffer lacBuffer = ChannelBuffers.buffer(0); - ChannelBuffer lastEntryBuffer = ChannelBuffers.buffer(0); - - // Thread.dumpStack(); - - if (readLacResponse.hasLacBody()) { - lacBuffer = ChannelBuffers.copiedBuffer(readLacResponse.getLacBody().asReadOnlyByteBuffer()); - } - - if (readLacResponse.hasLastEntryBody()) { - lastEntryBuffer = ChannelBuffers.copiedBuffer(readLacResponse.getLastEntryBody().asReadOnlyByteBuffer()); - } - LOG.debug("Got response for readLac request from bookie: " + addr + " for ledger: " + ledgerId + " rc: " + status); // convert to BKException code Integer rcToRet = statusCodeToExceptionCode(status); @@ -1190,20 +1350,10 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan glac.cb.readLacComplete(rcToRet, ledgerId, lacBuffer.slice(), lastEntryBuffer.slice(), glac.ctx); } - void handleReadResponse(Response response, CompletionValue completionValue) { + void handleReadResponse(long ledgerId, long entryId, StatusCode status, ChannelBuffer buffer, CompletionValue completionValue) { // The completion value should always be an instance of a ReadCompletion object when we reach here. ReadCompletion rc = (ReadCompletion)completionValue; - ReadResponse readResponse = response.getReadResponse(); - - long ledgerId = readResponse.getLedgerId(); - long entryId = readResponse.getEntryId(); - StatusCode status = response.getStatus() == StatusCode.EOK ? readResponse.getStatus() : response.getStatus(); - ChannelBuffer buffer = ChannelBuffers.buffer(0); - - if (readResponse.hasBody()) { - buffer = ChannelBuffers.copiedBuffer(readResponse.getBody().asReadOnlyByteBuffer()); - } if (LOG.isDebugEnabled()) { LOG.debug("Got response for read request from bookie: " + addr + " for ledger: " + ledgerId + " entry: " @@ -1219,18 +1369,15 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan new Object[] { ledgerId, entryId, addr, status }); rcToRet = BKException.Code.ReadException; } - rc.cb.readEntryComplete(rcToRet, ledgerId, entryId, buffer.slice(), rc.ctx); + if(buffer != null) { + buffer = buffer.slice(); + } + rc.cb.readEntryComplete(rcToRet, ledgerId, entryId, buffer, rc.ctx); } - void handleGetBookieInfoResponse(Response response, CompletionValue completionValue) { + void handleGetBookieInfoResponse(long freeDiskSpace, long totalDiskCapacity, StatusCode status, CompletionValue completionValue) { // The completion value should always be an instance of a GetBookieInfoCompletion object when we reach here. GetBookieInfoCompletion rc = (GetBookieInfoCompletion)completionValue; - GetBookieInfoResponse getBookieInfoResponse = response.getGetBookieInfoResponse(); - - long freeDiskSpace = getBookieInfoResponse.hasFreeDiskSpace() ? getBookieInfoResponse.getFreeDiskSpace() : 0L; - long totalDiskCapacity = getBookieInfoResponse.hasTotalDiskCapacity() ? getBookieInfoResponse.getTotalDiskCapacity() : 0L; - - StatusCode status = response.getStatus() == StatusCode.EOK ? getBookieInfoResponse.getStatus() : response.getStatus(); if (LOG.isDebugEnabled()) { LOG.debug("Got response for read metadata request from bookie: {} rc {}", addr, rc); @@ -1556,4 +1703,35 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan return txnIdGenerator.incrementAndGet(); } + private class V2CompletionKey extends CompletionKey { + final long ledgerId; + final long entryId; + + + public V2CompletionKey(long ledgerId, long entryId, OperationType operationType) { + super(0, operationType); + this.ledgerId = ledgerId; + this.entryId = entryId; + + } + + @Override + public boolean equals(Object object) { + if (!(object instanceof V2CompletionKey)) { + return false; + } + V2CompletionKey that = (V2CompletionKey) object; + return this.entryId == that.entryId && this.ledgerId == that.ledgerId; + } + + @Override + public int hashCode() { + return Objects.hash(ledgerId, entryId); + } + + @Override + public String toString() { + return String.format("%d:%d %s", ledgerId, entryId, operationType); + } + } }