Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 736EF200C5A for ; Tue, 4 Apr 2017 03:22:13 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 71C6E160B9C; Tue, 4 Apr 2017 01:22:13 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id E9C73160B8F for ; Tue, 4 Apr 2017 03:22:11 +0200 (CEST) Received: (qmail 46300 invoked by uid 500); 4 Apr 2017 01:22:11 -0000 Mailing-List: contact commits-help@bookkeeper.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: bookkeeper-dev@bookkeeper.apache.org Delivered-To: mailing list commits@bookkeeper.apache.org Received: (qmail 46290 invoked by uid 99); 4 Apr 2017 01:22:11 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 04 Apr 2017 01:22:11 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 0C25ADFBAB; Tue, 4 Apr 2017 01:22:11 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 8bit From: sijie@apache.org To: commits@bookkeeper.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: =?utf-8?q?bookkeeper_git_commit=3A_BOOKKEEPER-1018=3A_Revert_=22?= =?utf-8?q?=3A_Allow_client_to_select_older_V2_protocol_=28no=E2=80=A6?= Date: Tue, 4 Apr 2017 01:22:11 +0000 (UTC) archived-at: Tue, 04 Apr 2017 01:22:13 -0000 Repository: bookkeeper Updated Branches: refs/heads/master 9001e300c -> f30f60889 BOOKKEEPER-1018: Revert ": Allow client to select older V2 protocol (no… … protobuf)" This reverts commit 9001e300ce0d5d2655d437e3eaa52f91487caed6. I broke trunk - not exactly sure how - I will fix it and put up the PR again. For now I'm reverting the commit. Author: Govind Menon Reviewers: Sijie Guo Closes #124 from govind-menon/BOOKKEEPER-1018-Revert Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/f30f6088 Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/f30f6088 Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/f30f6088 Branch: refs/heads/master Commit: f30f60889e0810a47797daf4107c0d9bc2ee998c Parents: 9001e30 Author: Govind Menon Authored: Mon Apr 3 18:22:07 2017 -0700 Committer: Sijie Guo Committed: Mon Apr 3 18:22:07 2017 -0700 ---------------------------------------------------------------------- .../bookkeeper/conf/ClientConfiguration.java | 23 +- .../apache/bookkeeper/proto/AuthHandler.java | 8 - .../apache/bookkeeper/proto/BookieProtocol.java | 3 - .../proto/PerChannelBookieClient.java | 436 ++++++------------- 4 files changed, 134 insertions(+), 336 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/f30f6088/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java index 2b75e9e..ee137c0 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java @@ -57,7 +57,7 @@ public class ClientConfiguration extends AbstractConfiguration { protected final static String CLIENT_WRITEBUFFER_HIGH_WATER_MARK = "clientWriteBufferHighWaterMark"; protected final static String CLIENT_CONNECT_TIMEOUT_MILLIS = "clientConnectTimeoutMillis"; protected final static String NUM_CHANNELS_PER_BOOKIE = "numChannelsPerBookie"; - protected final static String USE_V2_WIRE_PROTOCOL = "useV2WireProtocol"; + // Read Parameters protected final static String READ_TIMEOUT = "readTimeout"; protected final static String SPECULATIVE_READ_TIMEOUT = "speculativeReadTimeout"; @@ -436,27 +436,6 @@ public class ClientConfiguration extends AbstractConfiguration { } /** - * Use older Bookkeeper wire protocol (no protobuf) - * - * @return whether or not to use older Bookkeeper wire protocol (no protobuf) - */ - public boolean getUseV2WireProtocol() { - return getBoolean(USE_V2_WIRE_PROTOCOL, false); - } - - /** - * Set whether or not to use older Bookkeeper wire protocol (no protobuf) - * - * @param useV2WireProtocol - * whether or not to use older Bookkeeper wire protocol (no protobuf) - * @return client configuration. - */ - public ClientConfiguration setUseV2WireProtocol(boolean useV2WireProtocol) { - setProperty(USE_V2_WIRE_PROTOCOL, useV2WireProtocol); - return this; - } - - /** * Get zookeeper servers to connect * * @return zookeeper servers http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/f30f6088/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java index d2608e7..75dced5 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java @@ -322,14 +322,6 @@ class AuthHandler { } else { waitingForAuth.add(e); } - } else if (e.getMessage() instanceof BookieProtocol.Request) { - // let auth messages through, queue the rest - BookieProtocol.Request req = (BookieProtocol.Request)e.getMessage(); - if (BookkeeperProtocol.OperationType.AUTH.getNumber() == req.getOpCode()) { - super.writeRequested(ctx, e); - } else { - waitingForAuth.add(e); - } } // else just drop } } http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/f30f6088/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java index 1191d3c..2ce5ed8 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java @@ -140,9 +140,6 @@ public interface BookieProtocol { * by the auth providers themselves. */ public static final byte AUTH = 3; - public static final byte READ_LAC = 4; - public static final byte WRITE_LAC = 5; - public static final byte GET_BOOKIE_INFO = 6; /** * The error code that indicates success http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/f30f6088/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java index a24bb1e..f6e9e8f 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java @@ -21,10 +21,8 @@ import java.io.IOException; import java.nio.channels.ClosedChannelException; import java.util.ArrayDeque; import java.util.Collections; -import java.util.Collection; import java.util.Queue; import java.util.Set; -import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -100,7 +98,7 @@ import com.google.common.collect.Sets; import com.google.protobuf.ByteString; import com.google.protobuf.ExtensionRegistry; import java.net.SocketAddress; - +import java.util.Collection; import org.apache.bookkeeper.auth.BookKeeperPrincipal; import org.jboss.netty.channel.ChannelFactory; import org.apache.bookkeeper.client.ClientConnectionPeer; @@ -149,8 +147,6 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan private final OpStatsLogger getBookieInfoOpLogger; private final OpStatsLogger getBookieInfoTimeoutOpLogger; - private final boolean useV2WireProtocol; - /** * The following member variables do not need to be concurrent, or volatile * because they are always updated under a lock @@ -206,7 +202,6 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan this.addEntryTimeout = conf.getAddEntryTimeout(); this.readEntryTimeout = conf.getReadEntryTimeout(); this.getBookieInfoTimeout = conf.getBookieInfoTimeout(); - this.useV2WireProtocol = conf.getUseV2WireProtocol(); this.authProviderFactory = authProviderFactory; this.extRegistry = extRegistry; @@ -474,45 +469,33 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan */ void addEntry(final long ledgerId, byte[] masterKey, final long entryId, ChannelBuffer toSend, WriteCallback cb, Object ctx, final int options) { - Object request = null; - CompletionKey completion = null; - if (useV2WireProtocol) { - completion = new V2CompletionKey(ledgerId, entryId, OperationType.ADD_ENTRY); - request = new BookieProtocol.AddRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId, entryId, - (short) options, masterKey, toSend); - + final long txnId = getTxnId(); + final int entrySize = toSend.readableBytes(); + final CompletionKey completionKey = new CompletionKey(txnId, OperationType.ADD_ENTRY); + completionObjects.put(completionKey, + new AddCompletion(this, addEntryOpLogger, cb, ctx, ledgerId, entryId, + scheduleTimeout(completionKey, addEntryTimeout))); - } else { - final long txnId = getTxnId(); - completion = new CompletionKey(txnId, OperationType.ADD_ENTRY); - // Build the request and calculate the total size to be included in the packet. - BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder() - .setVersion(ProtocolVersion.VERSION_THREE) - .setOperation(OperationType.ADD_ENTRY) - .setTxnId(txnId); - - AddRequest.Builder addBuilder = AddRequest.newBuilder() - .setLedgerId(ledgerId) - .setEntryId(entryId) - .setMasterKey(ByteString.copyFrom(masterKey)) - .setBody(ByteString.copyFrom(toSend.toByteBuffer())); - - if (((short) options & BookieProtocol.FLAG_RECOVERY_ADD) == BookieProtocol.FLAG_RECOVERY_ADD) { - addBuilder.setFlag(AddRequest.Flag.RECOVERY_ADD); - } - request = Request.newBuilder() - .setHeader(headerBuilder) - .setAddRequest(addBuilder) - .build(); - } + // Build the request and calculate the total size to be included in the packet. + BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder() + .setVersion(ProtocolVersion.VERSION_THREE) + .setOperation(OperationType.ADD_ENTRY) + .setTxnId(txnId); - final Object addRequest = request; - final CompletionKey completionKey = completion; + AddRequest.Builder addBuilder = AddRequest.newBuilder() + .setLedgerId(ledgerId) + .setEntryId(entryId) + .setMasterKey(ByteString.copyFrom(masterKey)) + .setBody(ByteString.copyFrom(toSend.toByteBuffer())); - completionObjects.put(completionKey, new AddCompletion(this, - addEntryOpLogger, cb, ctx, ledgerId, entryId, scheduleTimeout(completion, addEntryTimeout))); + if (((short)options & BookieProtocol.FLAG_RECOVERY_ADD) == BookieProtocol.FLAG_RECOVERY_ADD) { + addBuilder.setFlag(AddRequest.Flag.RECOVERY_ADD); + } - final int entrySize = toSend.readableBytes(); + final Request addRequest = Request.newBuilder() + .setHeader(headerBuilder) + .setAddRequest(addBuilder) + .build(); final Channel c = channel; if (c == null) { @@ -548,44 +531,28 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan public void readEntryAndFenceLedger(final long ledgerId, byte[] masterKey, final long entryId, ReadEntryCallback cb, Object ctx) { - Object request = null; - CompletionKey completion = null; - if (useV2WireProtocol) { - completion = new V2CompletionKey(ledgerId, entryId, OperationType.READ_ENTRY); - request = new BookieProtocol.ReadRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId, entryId, - BookieProtocol.FLAG_DO_FENCING, masterKey); - } else { - final long txnId = getTxnId(); - final CompletionKey completionKey = new CompletionKey(txnId, OperationType.READ_ENTRY); - completionObjects.put(completionKey, - new ReadCompletion(this, readEntryOpLogger, cb, ctx, ledgerId, entryId, - scheduleTimeout(completionKey, readEntryTimeout))); - - // Build the request and calculate the total size to be included in the packet. - BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder() - .setVersion(ProtocolVersion.VERSION_THREE) - .setOperation(OperationType.READ_ENTRY) - .setTxnId(txnId); - - ReadRequest.Builder readBuilder = ReadRequest.newBuilder() - .setLedgerId(ledgerId) - .setEntryId(entryId) - .setMasterKey(ByteString.copyFrom(masterKey)) - .setFlag(ReadRequest.Flag.FENCE_LEDGER); - - final Request readRequest = Request.newBuilder() - .setHeader(headerBuilder) - .setReadRequest(readBuilder) - .build(); - } - - final CompletionKey completionKey = completion; - if (completionObjects.putIfAbsent(completionKey, new ReadCompletion(this, readEntryOpLogger, cb, - ctx, ledgerId, entryId, scheduleTimeout(completionKey, readEntryTimeout))) != null) { - // We cannot have more than 1 pending read on the same ledger/entry in the v2 protocol - cb.readEntryComplete(BKException.Code.BookieHandleNotAvailableException, ledgerId, entryId, null, ctx); - return; - } + final long txnId = getTxnId(); + final CompletionKey completionKey = new CompletionKey(txnId, OperationType.READ_ENTRY); + completionObjects.put(completionKey, + new ReadCompletion(this, readEntryOpLogger, cb, ctx, ledgerId, entryId, + scheduleTimeout(completionKey, readEntryTimeout))); + + // Build the request and calculate the total size to be included in the packet. + BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder() + .setVersion(ProtocolVersion.VERSION_THREE) + .setOperation(OperationType.READ_ENTRY) + .setTxnId(txnId); + + ReadRequest.Builder readBuilder = ReadRequest.newBuilder() + .setLedgerId(ledgerId) + .setEntryId(entryId) + .setMasterKey(ByteString.copyFrom(masterKey)) + .setFlag(ReadRequest.Flag.FENCE_LEDGER); + + final Request readRequest = Request.newBuilder() + .setHeader(headerBuilder) + .setReadRequest(readBuilder) + .build(); final Channel c = channel; if (c == null) { @@ -593,7 +560,6 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan return; } - final Object readRequest = request; try { ChannelFuture future = c.write(readRequest); future.addListener(new ChannelFutureListener() { @@ -620,34 +586,22 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan } public void readLac(final long ledgerId, ReadLacCallback cb, Object ctx) { - Object request = null; - CompletionKey completion = null; - if (useV2WireProtocol) { - request = new BookieProtocol.ReadRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION, - ledgerId, (long) 0, (short) 0); - completion = new V2CompletionKey(ledgerId, (long) 0, OperationType.READ_LAC); - } else { - final long txnId = getTxnId(); - final CompletionKey completionKey = new CompletionKey(txnId, OperationType.READ_LAC); - - // Build the request and calculate the total size to be included in the packet. - BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder() - .setVersion(ProtocolVersion.VERSION_THREE) - .setOperation(OperationType.READ_LAC) - .setTxnId(txnId); - ReadLacRequest.Builder readLacBuilder = ReadLacRequest.newBuilder() - .setLedgerId(ledgerId); - request = Request.newBuilder() - .setHeader(headerBuilder) - .setReadLacRequest(readLacBuilder) - .build(); - } - final Object readLacRequest = request; - final CompletionKey completionKey = completion; - + final long txnId = getTxnId(); + final CompletionKey completionKey = new CompletionKey(txnId, OperationType.READ_LAC); completionObjects.put(completionKey, new ReadLacCompletion(readLacOpLogger, cb, ctx, ledgerId, scheduleTimeout(completionKey, readEntryTimeout))); + // Build the request and calculate the total size to be included in the packet. + BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder() + .setVersion(ProtocolVersion.VERSION_THREE) + .setOperation(OperationType.READ_LAC) + .setTxnId(txnId); + ReadLacRequest.Builder readLacBuilder = ReadLacRequest.newBuilder() + .setLedgerId(ledgerId); + final Request readLacRequest = Request.newBuilder() + .setHeader(headerBuilder) + .setReadLacRequest(readLacBuilder) + .build(); final Channel c = channel; if (c == null) { errorOutReadLacKey(completionKey); @@ -678,37 +632,27 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan } public void readEntry(final long ledgerId, final long entryId, ReadEntryCallback cb, Object ctx) { - Object request = null; - CompletionKey completion = null; - if (useV2WireProtocol) { - request = new BookieProtocol.ReadRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION, - ledgerId, entryId, (short) 0); - completion = new V2CompletionKey(ledgerId, entryId, OperationType.READ_ENTRY); - } else { - final long txnId = getTxnId(); - completion = new CompletionKey(txnId, OperationType.READ_ENTRY); + final long txnId = getTxnId(); + final CompletionKey completionKey = new CompletionKey(txnId, OperationType.READ_ENTRY); + completionObjects.put(completionKey, + new ReadCompletion(this, readEntryOpLogger, cb, ctx, ledgerId, entryId, + scheduleTimeout(completionKey, readEntryTimeout))); - // Build the request and calculate the total size to be included in the packet. - BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder() - .setVersion(ProtocolVersion.VERSION_THREE) - .setOperation(OperationType.READ_ENTRY) - .setTxnId(txnId); + // Build the request and calculate the total size to be included in the packet. + BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder() + .setVersion(ProtocolVersion.VERSION_THREE) + .setOperation(OperationType.READ_ENTRY) + .setTxnId(txnId); - ReadRequest.Builder readBuilder = ReadRequest.newBuilder() - .setLedgerId(ledgerId) - .setEntryId(entryId); + ReadRequest.Builder readBuilder = ReadRequest.newBuilder() + .setLedgerId(ledgerId) + .setEntryId(entryId); - final Request readRequest = Request.newBuilder() - .setHeader(headerBuilder) - .setReadRequest(readBuilder) - .build(); - } - final Object readRequest = request; - final CompletionKey completionKey = completion; + final Request readRequest = Request.newBuilder() + .setHeader(headerBuilder) + .setReadRequest(readBuilder) + .build(); - completionObjects.put(completionKey, - new ReadCompletion(this, readEntryOpLogger, cb, ctx, ledgerId, entryId, - scheduleTimeout(completionKey, readEntryTimeout))); final Channel c = channel; if (c == null) { errorOutReadKey(completionKey); @@ -859,7 +803,6 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan } void errorOutReadKey(final CompletionKey key, final int rc) { - LOG.info("Removing completion key: {}", key); final ReadCompletion readCompletion = (ReadCompletion)completionObjects.remove(key); if (null == readCompletion) { return; @@ -892,7 +835,6 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan } void errorOutWriteLacKey(final CompletionKey key, final int rc) { - LOG.info("Removing completion key: {}", key); final WriteLacCompletion writeLacCompletion = (WriteLacCompletion)completionObjects.remove(key); if (null == writeLacCompletion) { return; @@ -917,7 +859,6 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan } void errorOutReadLacKey(final CompletionKey key, final int rc) { - LOG.info("Removing completion key: {}", key); final ReadLacCompletion readLacCompletion = (ReadLacCompletion)completionObjects.remove(key); if (null == readLacCompletion) { return; @@ -1117,101 +1058,12 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan */ @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { - - if (e.getMessage() instanceof BookieProtocol.Response) { - BookieProtocol.Response response = (BookieProtocol.Response) e.getMessage(); - readV2Response(response); - } else if (e.getMessage() instanceof Response) { - Response response = (Response) e.getMessage(); - readV3Response(response); - } else { + if (!(e.getMessage() instanceof Response)) { ctx.sendUpstream(e); + return; } - } - - private void readV2Response(final BookieProtocol.Response response) { - final long ledgerId = response.ledgerId; - final long entryId = response.entryId; - - final OperationType operationType = getOperationType(response.getOpCode()); - final StatusCode status = getStatusCodeFromErrorCode(response.errorCode); - - final CompletionValue completionValue = completionObjects.remove(new V2CompletionKey(ledgerId, entryId, operationType)); - - if (null == completionValue) { - // Unexpected response, so log it. The txnId should have been present. - if (LOG.isDebugEnabled()) { - LOG.debug("Unexpected response received from bookie : " + addr + " for type : " + operationType - + " and ledger:entry : " + ledgerId + ":" + entryId); - } - } else { - long orderingKey = completionValue.ledgerId; - - executor.submitOrdered(orderingKey, new SafeRunnable() { - @Override - public void safeRun() { - switch (operationType) { - case ADD_ENTRY: { - handleAddResponse(ledgerId, entryId, status, completionValue); - } - case READ_ENTRY: { - BookieProtocol.ReadResponse readResponse = (BookieProtocol.ReadResponse) response; - handleReadResponse(ledgerId, entryId, status, readResponse.getData(), completionValue); - } - default: - LOG.error("Unexpected response, type:{} received from bookie:{}, ignoring", operationType, addr); - break; - } - } - }); - } - } - private StatusCode getStatusCodeFromErrorCode(int errorCode) { - switch (errorCode) { - case BookieProtocol.EOK: - return StatusCode.EOK; - case BookieProtocol.ENOLEDGER: - return StatusCode.ENOLEDGER; - case BookieProtocol.ENOENTRY: - return StatusCode.ENOENTRY; - case BookieProtocol.EBADREQ: - return StatusCode.EBADREQ; - case BookieProtocol.EIO: - return StatusCode.EIO; - case BookieProtocol.EUA: - return StatusCode.EUA; - case BookieProtocol.EBADVERSION: - return StatusCode.EBADVERSION; - case BookieProtocol.EFENCED: - return StatusCode.EFENCED; - case BookieProtocol.EREADONLY: - return StatusCode.EREADONLY; - default: - throw new IllegalArgumentException("Invalid error code: " + errorCode); - } - } - - private OperationType getOperationType(byte opCode) { - switch (opCode) { - case BookieProtocol.ADDENTRY: - return OperationType.ADD_ENTRY; - case BookieProtocol.READENTRY: - return OperationType.READ_ENTRY; - case BookieProtocol.AUTH: - return OperationType.AUTH; - case BookieProtocol.READ_LAC: - return OperationType.READ_LAC; - case BookieProtocol.WRITE_LAC: - return OperationType.WRITE_LAC; - case BookieProtocol.GET_BOOKIE_INFO: - return OperationType.GET_BOOKIE_INFO; - default: - throw new IllegalArgumentException("Invalid operation type"); - } - } - - private void readV3Response(final Response response) { + final Response response = (Response) e.getMessage(); final BKPacketHeader header = response.getHeader(); final CompletionValue completionValue = completionObjects.remove(newCompletionKey(header.getTxnId(), @@ -1230,51 +1082,21 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan public void safeRun() { OperationType type = header.getOperation(); switch (type) { - case ADD_ENTRY: { - AddResponse addResponse = response.getAddResponse(); - StatusCode status = response.getStatus() == StatusCode.EOK ? addResponse.getStatus() : response.getStatus(); - handleAddResponse(addResponse.getLedgerId(), addResponse.getEntryId(), status, completionValue); + case ADD_ENTRY: + handleAddResponse(response, completionValue); break; - } - case READ_ENTRY: { - ReadResponse readResponse = response.getReadResponse(); - StatusCode status = response.getStatus() == StatusCode.EOK ? readResponse.getStatus() : response.getStatus(); - ChannelBuffer buffer = ChannelBuffers.buffer(0); - if (readResponse.hasBody()) { - buffer = ChannelBuffers.copiedBuffer(readResponse.getBody().asReadOnlyByteBuffer()); - } - handleReadResponse(readResponse.getLedgerId(), readResponse.getEntryId(), status, buffer, completionValue); + case READ_ENTRY: + handleReadResponse(response, completionValue); break; - } - case WRITE_LAC: { - WriteLacResponse writeLacResponse = response.getWriteLacResponse(); - StatusCode status = response.getStatus() == StatusCode.EOK ? writeLacResponse.getStatus() : response.getStatus(); - handleWriteLacResponse(writeLacResponse.getLedgerId(), status, completionValue); + case WRITE_LAC: + handleWriteLacResponse(response.getWriteLacResponse(), completionValue); break; - } - case READ_LAC: { - ReadLacResponse readLacResponse = response.getReadLacResponse(); - ChannelBuffer lacBuffer = ChannelBuffers.buffer(0); - ChannelBuffer lastEntryBuffer = ChannelBuffers.buffer(0); - StatusCode status = response.getStatus() == StatusCode.EOK ? readLacResponse.getStatus() : response.getStatus(); - // Thread.dumpStack(); - - if (readLacResponse.hasLacBody()) { - lacBuffer = ChannelBuffers.copiedBuffer(readLacResponse.getLacBody().asReadOnlyByteBuffer()); - } - - if (readLacResponse.hasLastEntryBody()) { - lastEntryBuffer = ChannelBuffers.copiedBuffer(readLacResponse.getLastEntryBody().asReadOnlyByteBuffer()); - } - handleReadLacResponse(readLacResponse.getLedgerId(), status, lacBuffer, lastEntryBuffer, completionValue); + case READ_LAC: + handleReadLacResponse(response.getReadLacResponse(), completionValue); break; - } - case GET_BOOKIE_INFO: { - GetBookieInfoResponse getBookieInfoResponse = response.getGetBookieInfoResponse(); - StatusCode status = response.getStatus() == StatusCode.EOK ? getBookieInfoResponse.getStatus() : response.getStatus(); - handleGetBookieInfoResponse(getBookieInfoResponse.getFreeDiskSpace(), getBookieInfoResponse.getTotalDiskCapacity(), status, completionValue); + case GET_BOOKIE_INFO: + handleGetBookieInfoResponse(response, completionValue); break; - } default: LOG.error("Unexpected response, type:{} received from bookie:{}, ignoring", type, addr); @@ -1292,10 +1114,13 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan } } - void handleWriteLacResponse(long ledgerId, StatusCode status, CompletionValue completionValue) { + void handleWriteLacResponse(WriteLacResponse writeLacResponse, CompletionValue completionValue) { // The completion value should always be an instance of an WriteLacCompletion object when we reach here. WriteLacCompletion plc = (WriteLacCompletion)completionValue; + long ledgerId = writeLacResponse.getLedgerId(); + StatusCode status = writeLacResponse.getStatus(); + LOG.debug("Got response for writeLac request from bookie: " + addr + " for ledger: " + ledgerId + " rc: " + status); // convert to BKException code @@ -1308,9 +1133,14 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan plc.cb.writeLacComplete(rcToRet, ledgerId, addr, plc.ctx); } - void handleAddResponse(long ledgerId, long entryId, StatusCode status, CompletionValue completionValue) { + void handleAddResponse(Response response, CompletionValue completionValue) { // The completion value should always be an instance of an AddCompletion object when we reach here. AddCompletion ac = (AddCompletion)completionValue; + AddResponse addResponse = response.getAddResponse(); + + long ledgerId = addResponse.getLedgerId(); + long entryId = addResponse.getEntryId(); + StatusCode status = response.getStatus() == StatusCode.EOK ? addResponse.getStatus() : response.getStatus(); if (LOG.isDebugEnabled()) { LOG.debug("Got response for add request from bookie: " + addr + " for ledger: " + ledgerId + " entry: " @@ -1330,10 +1160,25 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan ac.cb.writeComplete(rcToRet, ledgerId, entryId, addr, ac.ctx); } - void handleReadLacResponse(long ledgerId, StatusCode status, ChannelBuffer lacBuffer, ChannelBuffer lastEntryBuffer, CompletionValue completionValue) { + void handleReadLacResponse(ReadLacResponse readLacResponse, CompletionValue completionValue) { // The completion value should always be an instance of an WriteLacCompletion object when we reach here. ReadLacCompletion glac = (ReadLacCompletion)completionValue; + long ledgerId = readLacResponse.getLedgerId(); + StatusCode status = readLacResponse.getStatus(); + ChannelBuffer lacBuffer = ChannelBuffers.buffer(0); + ChannelBuffer lastEntryBuffer = ChannelBuffers.buffer(0); + + // Thread.dumpStack(); + + if (readLacResponse.hasLacBody()) { + lacBuffer = ChannelBuffers.copiedBuffer(readLacResponse.getLacBody().asReadOnlyByteBuffer()); + } + + if (readLacResponse.hasLastEntryBody()) { + lastEntryBuffer = ChannelBuffers.copiedBuffer(readLacResponse.getLastEntryBody().asReadOnlyByteBuffer()); + } + LOG.debug("Got response for readLac request from bookie: " + addr + " for ledger: " + ledgerId + " rc: " + status); // convert to BKException code Integer rcToRet = statusCodeToExceptionCode(status); @@ -1345,10 +1190,20 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan glac.cb.readLacComplete(rcToRet, ledgerId, lacBuffer.slice(), lastEntryBuffer.slice(), glac.ctx); } - void handleReadResponse(long ledgerId, long entryId, StatusCode status, ChannelBuffer buffer, CompletionValue completionValue) { + void handleReadResponse(Response response, CompletionValue completionValue) { // The completion value should always be an instance of a ReadCompletion object when we reach here. ReadCompletion rc = (ReadCompletion)completionValue; + ReadResponse readResponse = response.getReadResponse(); + + long ledgerId = readResponse.getLedgerId(); + long entryId = readResponse.getEntryId(); + StatusCode status = response.getStatus() == StatusCode.EOK ? readResponse.getStatus() : response.getStatus(); + ChannelBuffer buffer = ChannelBuffers.buffer(0); + + if (readResponse.hasBody()) { + buffer = ChannelBuffers.copiedBuffer(readResponse.getBody().asReadOnlyByteBuffer()); + } if (LOG.isDebugEnabled()) { LOG.debug("Got response for read request from bookie: " + addr + " for ledger: " + ledgerId + " entry: " @@ -1367,9 +1222,15 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan rc.cb.readEntryComplete(rcToRet, ledgerId, entryId, buffer.slice(), rc.ctx); } - void handleGetBookieInfoResponse(long freeDiskSpace, long totalDiskCapacity, StatusCode status, CompletionValue completionValue) { + void handleGetBookieInfoResponse(Response response, CompletionValue completionValue) { // The completion value should always be an instance of a GetBookieInfoCompletion object when we reach here. GetBookieInfoCompletion rc = (GetBookieInfoCompletion)completionValue; + GetBookieInfoResponse getBookieInfoResponse = response.getGetBookieInfoResponse(); + + long freeDiskSpace = getBookieInfoResponse.hasFreeDiskSpace() ? getBookieInfoResponse.getFreeDiskSpace() : 0L; + long totalDiskCapacity = getBookieInfoResponse.hasTotalDiskCapacity() ? getBookieInfoResponse.getTotalDiskCapacity() : 0L; + + StatusCode status = response.getStatus() == StatusCode.EOK ? getBookieInfoResponse.getStatus() : response.getStatus(); if (LOG.isDebugEnabled()) { LOG.debug("Got response for read metadata request from bookie: {} rc {}", addr, rc); @@ -1695,35 +1556,4 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan return txnIdGenerator.incrementAndGet(); } - private class V2CompletionKey extends CompletionKey { - final long ledgerId; - final long entryId; - - - public V2CompletionKey(long ledgerId, long entryId, OperationType operationType) { - super(0, operationType); - this.ledgerId = ledgerId; - this.entryId = entryId; - - } - - @Override - public boolean equals(Object object) { - if (!(object instanceof V2CompletionKey)) { - return false; - } - V2CompletionKey that = (V2CompletionKey) object; - return this.entryId == that.entryId && this.ledgerId == that.ledgerId; - } - - @Override - public int hashCode() { - return Objects.hash(ledgerId, entryId); - } - - @Override - public String toString() { - return String.format("%d:%d %s", ledgerId, entryId, operationType); - } - } }