Return-Path: X-Original-To: apmail-zookeeper-commits-archive@www.apache.org Delivered-To: apmail-zookeeper-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 8A9A31159E for ; Thu, 24 Jul 2014 22:34:51 +0000 (UTC) Received: (qmail 16686 invoked by uid 500); 24 Jul 2014 22:34:51 -0000 Delivered-To: apmail-zookeeper-commits-archive@zookeeper.apache.org Received: (qmail 16656 invoked by uid 500); 24 Jul 2014 22:34:51 -0000 Mailing-List: contact commits-help@zookeeper.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ Delivered-To: mailing list commits@zookeeper.apache.org Received: (qmail 16645 invoked by uid 99); 24 Jul 2014 22:34:51 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 24 Jul 2014 22:34:51 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 24 Jul 2014 22:34:48 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id C138E2388906; Thu, 24 Jul 2014 22:34:21 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1613315 [1/4] - in /zookeeper/bookkeeper/trunk: ./ bookkeeper-server/ bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ bookkeeper-server/src/main/java/org/apache/bookkeeper/processor/ bookkeeper-server/src/main/java/org/apache... Date: Thu, 24 Jul 2014 22:34:20 -0000 To: commits@zookeeper.apache.org From: fpj@apache.org X-Mailer: svnmailer-1.0.9 Message-Id: <20140724223421.C138E2388906@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: fpj Date: Thu Jul 24 22:34:19 2014 New Revision: 1613315 URL: http://svn.apache.org/r1613315 Log: BOOKKEEPER-582: Make bookie and client use protobuf for requests (non-wire part) (sijie via fpj) Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperProtocol.java zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/proto/BookkeeperProtocol.proto Removed: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestProtoVersions.java Modified: zookeeper/bookkeeper/trunk/CHANGES.txt zookeeper/bookkeeper/trunk/bookkeeper-server/pom.xml zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerChecker.java zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/processor/RequestProcessor.java zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/DefaultPerChannelBookieClientPool.java zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClientPool.java zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/resources/findbugsExclude.xml zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/TestBackwardCompat.java zookeeper/bookkeeper/trunk/compat-deps/pom.xml zookeeper/bookkeeper/trunk/hedwig-server/pom.xml zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/TestBackwardCompat.java Modified: zookeeper/bookkeeper/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1613315&r1=1613314&r2=1613315&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/CHANGES.txt (original) +++ zookeeper/bookkeeper/trunk/CHANGES.txt Thu Jul 24 22:34:19 2014 @@ -303,6 +303,9 @@ Trunk (unreleased changes) BOOKKEEPER-562: Ability to tell if a ledger is closed or not (fpj) BOOKKEEPER-257: Ability to list all ledgers (fpj via ivank) + + BOOKKEEPER-582: Make bookie and client use protobuf for requests (non-wire part) + (sijie via fpj) Release 4.2.0 - 2013-01-14 Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/pom.xml URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/pom.xml?rev=1613315&r1=1613314&r2=1613315&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/bookkeeper-server/pom.xml (original) +++ zookeeper/bookkeeper/trunk/bookkeeper-server/pom.xml Thu Jul 24 22:34:19 2014 @@ -161,6 +161,18 @@ + + org.apache.bookkeeper + bookkeeper-server-compat420 + 4.2.0 + test + + + org.apache.bookkeeper + bookkeeper-server + + + @@ -249,6 +261,7 @@ **/DataFormats.java + **/BookkeeperProtocol.java Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerChecker.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerChecker.java?rev=1613315&r1=1613314&r2=1613315&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerChecker.java (original) +++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerChecker.java Thu Jul 24 22:34:19 2014 @@ -127,7 +127,8 @@ public class LedgerChecker { public void readEntryComplete(int rc, long ledgerId, long entryId, ChannelBuffer buffer, Object ctx) { - if (rc != BKException.Code.NoSuchEntryException) { + if (BKException.Code.NoSuchEntryException != rc && + BKException.Code.NoSuchLedgerExistsException != rc) { entryMayExist.set(true); } Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java?rev=1613315&r1=1613314&r2=1613315&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java (original) +++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java Thu Jul 24 22:34:19 2014 @@ -186,15 +186,19 @@ class PendingReadOp implements Enumerati synchronized void logErrorAndReattemptRead(BookieSocketAddress host, String errMsg, int rc) { if (BKException.Code.OK == firstError || - BKException.Code.NoSuchEntryException == firstError) { + BKException.Code.NoSuchEntryException == firstError || + BKException.Code.NoSuchLedgerExistsException == firstError) { firstError = rc; } else if (BKException.Code.BookieHandleNotAvailableException == firstError && - BKException.Code.NoSuchEntryException != rc) { - // if other exception rather than NoSuchEntryException is returned - // we need to update firstError to indicate that it might be a valid read but just failed. + BKException.Code.NoSuchEntryException != rc && + BKException.Code.NoSuchLedgerExistsException != rc) { + // if other exception rather than NoSuchEntryException or NoSuchLedgerExistsException is + // returned we need to update firstError to indicate that it might be a valid read but just + // failed. firstError = rc; } - if (BKException.Code.NoSuchEntryException == rc) { + if (BKException.Code.NoSuchEntryException == rc || + BKException.Code.NoSuchLedgerExistsException == rc) { ++numMissedEntryReads; LOG.debug("No such entry found on bookie. L{} E{} bookie: {}", new Object[] { lh.ledgerId, entryId, host }); Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/processor/RequestProcessor.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/processor/RequestProcessor.java?rev=1613315&r1=1613314&r2=1613315&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/processor/RequestProcessor.java (original) +++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/processor/RequestProcessor.java Thu Jul 24 22:34:19 2014 @@ -20,7 +20,6 @@ */ package org.apache.bookkeeper.processor; -import org.apache.bookkeeper.proto.BookieProtocol; import org.jboss.netty.channel.Channel; public interface RequestProcessor { @@ -38,6 +37,6 @@ public interface RequestProcessor { * @param channel * channel received the given request r */ - public void processRequest(BookieProtocol.Request r, Channel channel); + public void processRequest(Object r, Channel channel); } Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java?rev=1613315&r1=1613314&r2=1613315&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java (original) +++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java Thu Jul 24 22:34:19 2014 @@ -118,9 +118,11 @@ public class BookieClient implements Per PerChannelBookieClientPool oldClientPool = channels.putIfAbsent(addr, newClientPool); if (null == oldClientPool) { clientPool = newClientPool; + // initialize the pool only after we put the pool into the map + clientPool.intialize(); } else { clientPool = oldClientPool; - newClientPool.close(); + newClientPool.close(false); } } finally { closeLock.readLock().unlock(); @@ -130,10 +132,10 @@ public class BookieClient implements Per } public void closeClients(Set addrs) { + final HashSet clients = + new HashSet(); closeLock.readLock().lock(); try { - final HashSet clients = - new HashSet(); for (BookieSocketAddress a : addrs) { PerChannelBookieClientPool c = channels.get(a); if (c != null) { @@ -144,17 +146,12 @@ public class BookieClient implements Per if (clients.size() == 0) { return; } - executor.submit(new SafeRunnable() { - @Override - public void safeRun() { - for (PerChannelBookieClientPool c : clients) { - c.disconnect(); - } - } - }); } finally { closeLock.readLock().unlock(); } + for (PerChannelBookieClientPool c : clients) { + c.disconnect(false); + } } public void addEntry(final BookieSocketAddress addr, final long ledgerId, final byte[] masterKey, @@ -279,7 +276,7 @@ public class BookieClient implements Per try { closed = true; for (PerChannelBookieClientPool pool : channels.values()) { - pool.close(); + pool.close(true); } channels.clear(); } finally { Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java?rev=1613315&r1=1613314&r2=1613315&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java (original) +++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java Thu Jul 24 22:34:19 2014 @@ -20,6 +20,9 @@ */ package org.apache.bookkeeper.proto; +import com.google.protobuf.InvalidProtocolBufferException; +import org.jboss.netty.buffer.ChannelBufferFactory; +import org.jboss.netty.buffer.ChannelBufferInputStream; import org.jboss.netty.buffer.ChannelBuffers; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.channel.Channel; @@ -35,9 +38,38 @@ import org.slf4j.LoggerFactory; public class BookieProtoEncoding { private final static Logger LOG = LoggerFactory.getLogger(BookieProtoEncoding.class); - public static class RequestEncoder extends OneToOneEncoder { + static final EnDecoder REQ_PREV3 = new RequestEnDeCoderPreV3(); + static final EnDecoder REP_PREV3 = new ResponseEnDeCoderPreV3(); + static final EnDecoder REQ_V3 = new RequestEnDecoderV3(); + static final EnDecoder REP_V3 = new ResponseEnDecoderV3(); + + static interface EnDecoder { + + /** + * Encode a object into channel buffer. + * + * @param object + * object. + * @return encode buffer. + * @throws Exception + */ + public Object encode(Object object, ChannelBufferFactory factory) throws Exception; + + /** + * Decode a packet into an object. + * + * @param packet + * received packet. + * @return parsed object. + * @throws Exception + */ + public Object decode(ChannelBuffer packet) throws Exception; + + } + + static class RequestEnDeCoderPreV3 implements EnDecoder { @Override - public Object encode(ChannelHandlerContext ctx, Channel channel, Object msg) + public Object encode(Object msg, ChannelBufferFactory bufferFactory) throws Exception { if (!(msg instanceof BookieProtocol.Request)) { return msg; @@ -47,7 +79,7 @@ public class BookieProtoEncoding { BookieProtocol.AddRequest ar = (BookieProtocol.AddRequest)r; int totalHeaderSize = 4 // for the header + BookieProtocol.MASTER_KEY_LENGTH; // for the master key - ChannelBuffer buf = channel.getConfig().getBufferFactory().getBuffer(totalHeaderSize); + ChannelBuffer buf = bufferFactory.getBuffer(totalHeaderSize); buf.writeInt(new PacketHeader(r.getProtocolVersion(), r.getOpCode(), r.getFlags()).toInt()); buf.writeBytes(r.getMasterKey(), 0, BookieProtocol.MASTER_KEY_LENGTH); return ChannelBuffers.wrappedBuffer(buf, ar.getData()); @@ -60,7 +92,7 @@ public class BookieProtoEncoding { totalHeaderSize += BookieProtocol.MASTER_KEY_LENGTH; } - ChannelBuffer buf = channel.getConfig().getBufferFactory().getBuffer(totalHeaderSize); + ChannelBuffer buf = bufferFactory.getBuffer(totalHeaderSize); buf.writeInt(new PacketHeader(r.getProtocolVersion(), r.getOpCode(), r.getFlags()).toInt()); buf.writeLong(r.getLedgerId()); buf.writeLong(r.getEntryId()); @@ -71,17 +103,10 @@ public class BookieProtoEncoding { return buf; } } - } - public static class RequestDecoder extends OneToOneDecoder { @Override - public Object decode(ChannelHandlerContext ctx, Channel channel, Object msg) + public Object decode(ChannelBuffer packet) throws Exception { - if (!(msg instanceof ChannelBuffer)) { - return msg; - } - ChannelBuffer packet = (ChannelBuffer)msg; - PacketHeader h = PacketHeader.fromInt(packet.readInt()); // packet format is different between ADDENTRY and READENTRY @@ -117,20 +142,19 @@ public class BookieProtoEncoding { return new BookieProtocol.ReadRequest(h.getVersion(), ledgerId, entryId, flags); } } - return msg; + return packet; } } - public static class ResponseEncoder extends OneToOneEncoder { + static class ResponseEnDeCoderPreV3 implements EnDecoder { @Override - public Object encode(ChannelHandlerContext ctx, Channel channel, Object msg) + public Object encode(Object msg, ChannelBufferFactory bufferFactory) throws Exception { if (!(msg instanceof BookieProtocol.Response)) { return msg; } BookieProtocol.Response r = (BookieProtocol.Response)msg; - ChannelBuffer buf = ctx.getChannel().getConfig().getBufferFactory() - .getBuffer(24); + ChannelBuffer buf = bufferFactory.getBuffer(24); buf.writeInt(new PacketHeader(r.getProtocolVersion(), r.getOpCode(), (short)0).toInt()); buf.writeInt(r.getErrorCode()); @@ -153,17 +177,9 @@ public class BookieProtoEncoding { return msg; } } - } - - public static class ResponseDecoder extends OneToOneDecoder { @Override - public Object decode(ChannelHandlerContext ctx, Channel channel, Object msg) + public Object decode(ChannelBuffer buffer) throws Exception { - if (!(msg instanceof ChannelBuffer)) { - return msg; - } - - final ChannelBuffer buffer = (ChannelBuffer)msg; final int rc; final long ledgerId, entryId; final PacketHeader header; @@ -185,10 +201,130 @@ public class BookieProtoEncoding { ledgerId, entryId); } default: - LOG.error("Unexpected response of type {} received from {}", - header.getOpCode(), channel.getRemoteAddress()); + return buffer; + } + } + } + + static class RequestEnDecoderV3 implements EnDecoder { + + @Override + public Object decode(ChannelBuffer packet) throws Exception { + return BookkeeperProtocol.Request.parseFrom(new ChannelBufferInputStream(packet)); + } + + @Override + public Object encode(Object msg, ChannelBufferFactory factory) throws Exception { + BookkeeperProtocol.Request request = (BookkeeperProtocol.Request) msg; + return ChannelBuffers.wrappedBuffer(request.toByteArray()); + } + + } + + static class ResponseEnDecoderV3 implements EnDecoder { + + @Override + public Object decode(ChannelBuffer packet) throws Exception { + return BookkeeperProtocol.Response.parseFrom(new ChannelBufferInputStream(packet)); + } + + @Override + public Object encode(Object msg, ChannelBufferFactory factory) throws Exception { + BookkeeperProtocol.Response response = (BookkeeperProtocol.Response) msg; + return ChannelBuffers.wrappedBuffer(response.toByteArray()); + } + + } + + public static class RequestEncoder extends OneToOneEncoder { + + @Override + protected Object encode(ChannelHandlerContext ctx, Channel channel, Object msg) + throws Exception { + if (LOG.isDebugEnabled()) { + LOG.debug("Encode request {} to channel {}.", msg, channel); + } + if (msg instanceof BookkeeperProtocol.Request) { + return REQ_V3.encode(msg, ctx.getChannel().getConfig().getBufferFactory()); + } else if (msg instanceof BookieProtocol.Request) { + return REQ_PREV3.encode(msg, ctx.getChannel().getConfig().getBufferFactory()); + } else { + LOG.error("Invalid request to encode to {}: {}", channel, msg.getClass().getName()); return msg; } } } + + public static class RequestDecoder extends OneToOneDecoder { + + @Override + protected Object decode(ChannelHandlerContext ctx, Channel channel, Object msg) + throws Exception { + if (LOG.isDebugEnabled()) { + LOG.debug("Received request {} from channel {} to decode.", msg, channel); + } + if (!(msg instanceof ChannelBuffer)) { + return msg; + } + ChannelBuffer buffer = (ChannelBuffer) msg; + try { + buffer.markReaderIndex(); + try { + return REQ_V3.decode(buffer); + } catch (InvalidProtocolBufferException e) { + buffer.resetReaderIndex(); + return REQ_PREV3.decode(buffer); + } + } catch (Exception e) { + LOG.error("Failed to decode a request from {} : ", channel, e); + throw e; + } + } + } + + public static class ResponseEncoder extends OneToOneEncoder { + + @Override + protected Object encode(ChannelHandlerContext ctx, Channel channel, Object msg) + throws Exception { + if (LOG.isDebugEnabled()) { + LOG.debug("Encode response {} to channel {}.", msg, channel); + } + if (msg instanceof BookkeeperProtocol.Response) { + return REP_V3.encode(msg, ctx.getChannel().getConfig().getBufferFactory()); + } else if (msg instanceof BookieProtocol.Response) { + return REP_PREV3.encode(msg, ctx.getChannel().getConfig().getBufferFactory()); + } else { + LOG.error("Invalid response to encode to {}: {}", channel, msg.getClass().getName()); + return msg; + } + } + } + + public static class ResponseDecoder extends OneToOneDecoder { + + @Override + protected Object decode(ChannelHandlerContext ctx, Channel channel, Object msg) + throws Exception { + if (LOG.isDebugEnabled()) { + LOG.debug("Received response {} from channel {} to decode.", msg, channel); + } + if (!(msg instanceof ChannelBuffer)) { + return msg; + } + ChannelBuffer buffer = (ChannelBuffer) msg; + try { + buffer.markReaderIndex(); + try { + return REP_V3.decode(buffer); + } catch (InvalidProtocolBufferException e) { + buffer.resetReaderIndex(); + return REP_PREV3.decode(buffer); + } + } catch (Exception e) { + LOG.error("Failed to decode a response from channel {} : ", channel, e); + throw e; + } + } + } } Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java?rev=1613315&r1=1613314&r2=1613315&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java (original) +++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java Thu Jul 24 22:34:19 2014 @@ -22,7 +22,6 @@ package org.apache.bookkeeper.proto; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.processor.RequestProcessor; -import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.ChannelStateEvent; import org.jboss.netty.channel.ExceptionEvent; @@ -71,6 +70,7 @@ class BookieRequestHandler extends Simpl LOG.debug("Channel connected {}", e); } + @Override public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { LOG.debug("Channel disconnected {}", e); @@ -78,14 +78,12 @@ class BookieRequestHandler extends Simpl @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { - if (!(e.getMessage() instanceof BookieProtocol.Request)) { + Object event = e.getMessage(); + if (!(event instanceof BookkeeperProtocol.Request || event instanceof BookieProtocol.Request)) { ctx.sendUpstream(e); return; } - BookieProtocol.Request r = (BookieProtocol.Request)e.getMessage(); - Channel c = ctx.getChannel(); - requestProcessor.processRequest(r, c); + requestProcessor.processRequest(event, ctx.getChannel()); } - } Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java?rev=1613315&r1=1613314&r2=1613315&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java (original) +++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java Thu Jul 24 22:34:19 2014 @@ -21,26 +21,18 @@ package org.apache.bookkeeper.proto; import com.google.common.util.concurrent.ThreadFactoryBuilder; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import org.apache.bookkeeper.bookie.Bookie; -import org.apache.bookkeeper.bookie.BookieException; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.net.BookieSocketAddress; import org.apache.bookkeeper.processor.RequestProcessor; -import org.apache.bookkeeper.util.MathUtils; import org.jboss.netty.channel.Channel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class BookieRequestProcessor implements RequestProcessor, BookkeeperInternalCallbacks.WriteCallback { +public class BookieRequestProcessor implements RequestProcessor { private final static Logger LOG = LoggerFactory.getLogger(BookieRequestProcessor.class); /** @@ -101,234 +93,83 @@ public class BookieRequestProcessor impl } @Override - public void processRequest(BookieProtocol.Request r, Channel c) { - if (r.getProtocolVersion() < BookieProtocol.LOWEST_COMPAT_PROTOCOL_VERSION - || r.getProtocolVersion() > BookieProtocol.CURRENT_PROTOCOL_VERSION) { - LOG.error("Invalid protocol version, expected something between " - + BookieProtocol.LOWEST_COMPAT_PROTOCOL_VERSION - + " & " + BookieProtocol.CURRENT_PROTOCOL_VERSION - + ". got " + r.getProtocolVersion()); - c.write(ResponseBuilder.buildErrorResponse(BookieProtocol.EBADVERSION, r)); - return; - } - - switch (r.getOpCode()) { - case BookieProtocol.ADDENTRY: - processAddRequest(r, c); - break; - case BookieProtocol.READENTRY: - processReadRequest(r, c); - break; - default: - LOG.error("Unknown op type {}, sending error", r.getOpCode()); - c.write(ResponseBuilder.buildErrorResponse(BookieProtocol.EBADREQ, r)); - if (statsEnabled) { - bkStats.getOpStats(BKStats.STATS_UNKNOWN).incrementFailedOps(); + public void processRequest(Object msg, Channel c) { + // If we can decode this packet as a Request protobuf packet, process + // it as a version 3 packet. Else, just use the old protocol. + if (msg instanceof BookkeeperProtocol.Request) { + BookkeeperProtocol.Request r = (BookkeeperProtocol.Request) msg; + BookkeeperProtocol.BKPacketHeader header = r.getHeader(); + switch (header.getOperation()) { + case ADD_ENTRY: + processAddRequestV3(r, c); + break; + case READ_ENTRY: + processReadRequestV3(r, c); + break; + default: + BookkeeperProtocol.Response.Builder response = + BookkeeperProtocol.Response.newBuilder().setHeader(r.getHeader()) + .setStatus(BookkeeperProtocol.StatusCode.EBADREQ); + c.write(response.build()); + if (statsEnabled) { + bkStats.getOpStats(BKStats.STATS_UNKNOWN).incrementFailedOps(); + } + break; } - break; - } - } - - class AddCtx { - final Channel c; - final BookieProtocol.AddRequest r; - final long startTime; - - AddCtx(Channel c, BookieProtocol.AddRequest r) { - this.c = c; - this.r = r; - - if (statsEnabled) { - startTime = MathUtils.now(); - } else { - startTime = 0; + } else { + BookieProtocol.Request r = (BookieProtocol.Request) msg; + // process packet + switch (r.getOpCode()) { + case BookieProtocol.ADDENTRY: + processAddRequest(r, c); + break; + case BookieProtocol.READENTRY: + processReadRequest(r, c); + break; + default: + LOG.error("Unknown op type {}, sending error", r.getOpCode()); + c.write(ResponseBuilder.buildErrorResponse(BookieProtocol.EBADREQ, r)); + if (statsEnabled) { + bkStats.getOpStats(BKStats.STATS_UNKNOWN).incrementFailedOps(); + } + break; } } } - private void processAddRequest(final BookieProtocol.Request r, final Channel c) { + private void processAddRequestV3(final BookkeeperProtocol.Request r, final Channel c) { + WriteEntryProcessorV3 write = new WriteEntryProcessorV3(r, c, bookie); if (null == writeThreadPool) { - handleAdd(r, c); + write.run(); } else { - writeThreadPool.submit(new Runnable() { - @Override - public void run() { - handleAdd(r, c); - } - }); + writeThreadPool.submit(write); } } - private void handleAdd(BookieProtocol.Request r, Channel c) { - assert (r instanceof BookieProtocol.AddRequest); - BookieProtocol.AddRequest add = (BookieProtocol.AddRequest) r; - - if (bookie.isReadOnly()) { - LOG.warn("BookieServer is running as readonly mode," - + " so rejecting the request from the client!"); - c.write(ResponseBuilder.buildErrorResponse(BookieProtocol.EREADONLY, add)); - if (statsEnabled) { - bkStats.getOpStats(BKStats.STATS_ADD).incrementFailedOps(); - } - return; - } - - int rc = BookieProtocol.EOK; - try { - if (add.isRecoveryAdd()) { - bookie.recoveryAddEntry(add.getDataAsByteBuffer(), this, new AddCtx(c, add), - add.getMasterKey()); - } else { - bookie.addEntry(add.getDataAsByteBuffer(), - this, new AddCtx(c, add), add.getMasterKey()); - } - } catch (IOException e) { - LOG.error("Error writing " + add, e); - rc = BookieProtocol.EIO; - } catch (BookieException.LedgerFencedException lfe) { - LOG.error("Attempt to write to fenced ledger", lfe); - rc = BookieProtocol.EFENCED; - } catch (BookieException e) { - LOG.error("Unauthorized access to ledger " + add.getLedgerId(), e); - rc = BookieProtocol.EUA; - } - if (rc != BookieProtocol.EOK) { - c.write(ResponseBuilder.buildErrorResponse(rc, add)); - if (statsEnabled) { - bkStats.getOpStats(BKStats.STATS_ADD).incrementFailedOps(); - } + private void processReadRequestV3(final BookkeeperProtocol.Request r, final Channel c) { + ReadEntryProcessorV3 read = new ReadEntryProcessorV3(r, c, bookie); + if (null == readThreadPool) { + read.run(); + } else { + readThreadPool.submit(read); } } - @Override - public void writeComplete(int rc, long ledgerId, long entryId, - BookieSocketAddress addr, Object ctx) { - assert (ctx instanceof AddCtx); - AddCtx addctx = (AddCtx) ctx; - addctx.c.write(ResponseBuilder.buildAddResponse(addctx.r)); - - if (statsEnabled) { - // compute the latency - if (0 == rc) { - // for add operations, we compute latency in writeComplete callbacks. - long elapsedTime = MathUtils.now() - addctx.startTime; - bkStats.getOpStats(BKStats.STATS_ADD).updateLatency(elapsedTime); - } else { - bkStats.getOpStats(BKStats.STATS_ADD).incrementFailedOps(); - } + private void processAddRequest(final BookieProtocol.Request r, final Channel c) { + WriteEntryProcessor write = new WriteEntryProcessor(r, c, bookie); + if (null == writeThreadPool) { + write.run(); + } else { + writeThreadPool.submit(write); } } private void processReadRequest(final BookieProtocol.Request r, final Channel c) { + ReadEntryProcessor read = new ReadEntryProcessor(r, c, bookie); if (null == readThreadPool) { - handleRead(r, c); - } else { - readThreadPool.submit(new Runnable() { - @Override - public void run() { - handleRead(r, c); - } - }); - } - } - - private void handleRead(BookieProtocol.Request r, Channel c) { - assert (r instanceof BookieProtocol.ReadRequest); - BookieProtocol.ReadRequest read = (BookieProtocol.ReadRequest) r; - - LOG.debug("Received new read request: {}", r); - int errorCode = BookieProtocol.EIO; - long startTime = 0; - if (statsEnabled) { - startTime = MathUtils.now(); - } - ByteBuffer data = null; - try { - Future fenceResult = null; - if (read.isFencingRequest()) { - LOG.warn("Ledger " + r.getLedgerId() + " fenced by " + c.getRemoteAddress()); - - if (read.hasMasterKey()) { - fenceResult = bookie.fenceLedger(read.getLedgerId(), read.getMasterKey()); - } else { - LOG.error("Password not provided, Not safe to fence {}", read.getLedgerId()); - if (statsEnabled) { - bkStats.getOpStats(BKStats.STATS_READ).incrementFailedOps(); - } - throw BookieException.create(BookieException.Code.UnauthorizedAccessException); - } - } - data = bookie.readEntry(r.getLedgerId(), r.getEntryId()); - LOG.debug("##### Read entry ##### {}", data.remaining()); - if (null != fenceResult) { - // TODO: - // currently we don't have readCallback to run in separated read - // threads. after BOOKKEEPER-429 is complete, we could improve - // following code to make it not wait here - // - // For now, since we only try to wait after read entry. so writing - // to journal and read entry are executed in different thread - // it would be fine. - try { - Boolean fenced = fenceResult.get(1000, TimeUnit.MILLISECONDS); - if (null == fenced || !fenced) { - // if failed to fence, fail the read request to make it retry. - errorCode = BookieProtocol.EIO; - data = null; - } else { - errorCode = BookieProtocol.EOK; - } - } catch (InterruptedException ie) { - LOG.error("Interrupting fence read entry " + read, ie); - errorCode = BookieProtocol.EIO; - data = null; - } catch (ExecutionException ee) { - LOG.error("Failed to fence read entry " + read, ee); - errorCode = BookieProtocol.EIO; - data = null; - } catch (TimeoutException te) { - LOG.error("Timeout to fence read entry " + read, te); - errorCode = BookieProtocol.EIO; - data = null; - } - } else { - errorCode = BookieProtocol.EOK; - } - } catch (Bookie.NoLedgerException e) { - if (LOG.isTraceEnabled()) { - LOG.error("Error reading " + read, e); - } - errorCode = BookieProtocol.ENOLEDGER; - } catch (Bookie.NoEntryException e) { - if (LOG.isTraceEnabled()) { - LOG.error("Error reading " + read, e); - } - errorCode = BookieProtocol.ENOENTRY; - } catch (IOException e) { - if (LOG.isTraceEnabled()) { - LOG.error("Error reading " + read, e); - } - errorCode = BookieProtocol.EIO; - } catch (BookieException e) { - LOG.error("Unauthorized access to ledger " + read.getLedgerId(), e); - errorCode = BookieProtocol.EUA; - } - - LOG.trace("Read entry rc = {} for {}", - new Object[] { errorCode, read }); - if (errorCode == BookieProtocol.EOK) { - assert data != null; - - c.write(ResponseBuilder.buildReadResponse(data, read)); - if (statsEnabled) { - long elapsedTime = MathUtils.now() - startTime; - bkStats.getOpStats(BKStats.STATS_READ).updateLatency(elapsedTime); - } + read.run(); } else { - c.write(ResponseBuilder.buildErrorResponse(errorCode, read)); - if (statsEnabled) { - bkStats.getOpStats(BKStats.STATS_READ).incrementFailedOps(); - } + readThreadPool.submit(read); } }