zookeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From iv...@apache.org
Subject svn commit: r1189753 - in /zookeeper/bookkeeper/trunk: ./ bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ bookkeeper...
Date Thu, 27 Oct 2011 13:41:19 GMT
Author: ivank
Date: Thu Oct 27 13:41:19 2011
New Revision: 1189753

URL: http://svn.apache.org/viewvc?rev=1189753&view=rev
Log:
BOOKKEEPER-83: Added versioning and flags to the bookie protocol (ivank)

Added:
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestProtoVersions.java
Modified:
    zookeeper/bookkeeper/trunk/CHANGES.txt
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/NIOServerFactory.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java

Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1189753&r1=1189752&r2=1189753&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Thu Oct 27 13:41:19 2011
@@ -52,6 +52,8 @@ BUGFIXES:
  
   BOOKKEEPER-94: Double callbacks in readLastConfirmedOp which fails readLastConfirmed operation even received enough valid responses. (Sijie Guo via ivank)
 
+  BOOKKEEPER-83: Added versioning and flags to the bookie protocol (ivank)
+
  hedwig-server/
 
   BOOKKEEPER-43: NullPointException when releasing topic (Sijie Guo via breed)

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java?rev=1189753&r1=1189752&r2=1189753&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java
(original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java
Thu Oct 27 13:41:19 2011
@@ -74,6 +74,8 @@ public abstract class BKException extend
             return new BKIncorrectParameterException();
         case Code.InterruptedException:
             return new BKInterruptedException();
+        case Code.ProtocolVersionException:
+            return new BKProtocolVersionException();
         default:
             return new BKIllegalOpException();
         }
@@ -100,7 +102,7 @@ public abstract class BKException extend
         int NoSuchEntryException = -13;
         int IncorrectParameterException = -14;
         int InterruptedException = -15;
-
+        int ProtocolVersionException = -16;
         int IllegalOpException = -100;
     }
 
@@ -146,6 +148,8 @@ public abstract class BKException extend
             return "Incorrect parameter input";
         case Code.InterruptedException:
             return "Interrupted while waiting for permit";
+        case Code.ProtocolVersionException:
+            return "Bookie protocol version on server is incompatible with client";
         default:
             return "Invalid operation";
         }
@@ -205,6 +209,12 @@ public abstract class BKException extend
         }
     }
 
+    public static class BKProtocolVersionException extends BKException {
+        public BKProtocolVersionException() {
+            super(Code.ProtocolVersionException);
+        }
+    }
+
     public static class BKNoSuchLedgerExistsException extends BKException {
         public BKNoSuchLedgerExistsException() {
             super(Code.NoSuchLedgerExistsException);

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java?rev=1189753&r1=1189752&r2=1189753&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
(original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
Thu Oct 27 13:41:19 2011
@@ -28,13 +28,78 @@ package org.apache.bookkeeper.proto;
  *
  */
 public interface BookieProtocol {
+
+    /**
+     * Lowest protocol version which will work with the bookie.
+     */
+    public static final byte LOWEST_COMPAT_PROTOCOL_VERSION = 0;
+
+    /**
+     * Current version of the protocol, which client will use. 
+     */
+    public static final byte CURRENT_PROTOCOL_VERSION = 1;
+
+    /** 
+     * The first int of a packet is the header.
+     * It contains the version, opCode and flags.
+     * The initial versions of BK didn't have this structure
+     * and just had an int representing the opCode as the 
+     * first int. This handles that case also. 
+     */
+    static class PacketHeader {
+        final byte version;
+        final byte opCode;
+        final short flags;
+
+        public PacketHeader(byte version, byte opCode, short flags) {
+            this.version = version;
+            this.opCode = opCode;
+            this.flags = flags;
+        }
+        
+        int toInt() {
+            if (version == 0) {
+                return (int)opCode;
+            } else {
+                return ((version & 0xFF) << 24) 
+                    | ((opCode & 0xFF) << 16)
+                    | (flags & 0xFFFF);
+            }
+        }
+
+        static PacketHeader fromInt(int i) {
+            byte version = (byte)(i >> 24); 
+            byte opCode = 0;
+            short flags = 0;
+            if (version == 0) {
+                opCode = (byte)i;
+            } else {
+                opCode = (byte)((i >> 16) & 0xFF);
+                flags = (short)(i & 0xFFFF);
+            }
+            return new PacketHeader(version, opCode, flags);
+        }
+
+        byte getVersion() {
+            return version;
+        }
+
+        byte getOpCode() {
+            return opCode;
+        }
+
+        short getFlags() {
+            return flags;
+        }
+    }
+
     /**
      * The Add entry request payload will be a ledger entry exactly as it should
      * be logged. The response payload will be a 4-byte integer that has the
      * error code followed by the 8-byte ledger number and 8-byte entry number
      * of the entry written.
      */
-    public static final int ADDENTRY = 1;
+    public static final byte ADDENTRY = 1;
     /**
      * The Read entry request payload will be the ledger number and entry number
      * to read. (The ledger number is an 8-byte integer and the entry number is
@@ -44,7 +109,7 @@ public interface BookieProtocol {
      * requested. (Note that the first sixteen bytes of the entry happen to be
      * the ledger number and entry number as well.)
      */
-    public static final int READENTRY = 2;
+    public static final byte READENTRY = 2;
 
     /**
      * The error code that indicates success
@@ -71,5 +136,8 @@ public interface BookieProtocol {
      * Unauthorized access to ledger
      */
     public static final int EUA = 102;
-
+    /**
+     * The server version is incompatible with the client
+     */
+    public static final int EBADVERSION = 103;
 }

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java?rev=1189753&r1=1189752&r2=1189753&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java
(original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java
Thu Oct 27 13:41:19 2011
@@ -29,6 +29,7 @@ import java.nio.ByteBuffer;
 import org.apache.bookkeeper.bookie.Bookie;
 import org.apache.bookkeeper.bookie.BookieException;
 import org.apache.bookkeeper.proto.NIOServerFactory.Cnxn;
+import static org.apache.bookkeeper.proto.BookieProtocol.PacketHeader;
 import org.apache.log4j.Logger;
 
 /**
@@ -147,8 +148,24 @@ public class BookieServer implements NIO
     }
 
     public void processPacket(ByteBuffer packet, Cnxn src) {
-        int type = packet.getInt();
-        switch (type) {
+        PacketHeader h = PacketHeader.fromInt(packet.getInt());
+
+        ByteBuffer bb = packet.duplicate();
+        long ledgerId = bb.getLong();
+        long entryId = bb.getLong();
+        
+        if (h.getVersion() < BookieProtocol.LOWEST_COMPAT_PROTOCOL_VERSION
+            || h.getVersion() > BookieProtocol.CURRENT_PROTOCOL_VERSION) {
+            LOG.error("Invalid protocol version, expected something between "
+                      + BookieProtocol.LOWEST_COMPAT_PROTOCOL_VERSION 
+                      + " & " + BookieProtocol.CURRENT_PROTOCOL_VERSION
+                    + ". got " + h.getVersion());
+            src.sendResponse(buildResponse(BookieProtocol.EBADVERSION, 
+                                           h.getVersion(), h.getOpCode(), ledgerId, entryId));
+            return;
+        }
+        
+        switch (h.getOpCode()) {
         case BookieProtocol.ADDENTRY:
             try {
                 byte[] masterKey = new byte[20];
@@ -156,70 +173,41 @@ public class BookieServer implements NIO
                 // LOG.debug("Master key: " + new String(masterKey));
                 bookie.addEntry(packet.slice(), this, src, masterKey);
             } catch (IOException e) {
-                ByteBuffer bb = packet.duplicate();
-
-                long ledgerId = bb.getLong();
-                long entryId = bb.getLong();
                 LOG.error("Error writing " + entryId + "@" + ledgerId, e);
-                ByteBuffer eio = ByteBuffer.allocate(8 + 16);
-                eio.putInt(type);
-                eio.putInt(BookieProtocol.EIO);
-                eio.putLong(ledgerId);
-                eio.putLong(entryId);
-                eio.flip();
-                src.sendResponse(new ByteBuffer[] { eio });
+                src.sendResponse(buildResponse(BookieProtocol.EIO, h.getVersion(), h.getOpCode(), ledgerId, entryId));
             } catch (BookieException e) {
-                ByteBuffer bb = packet.duplicate();
-                long ledgerId = bb.getLong();
-                long entryId = bb.getLong();
-
                 LOG.error("Unauthorized access to ledger " + ledgerId);
-
-                ByteBuffer eio = ByteBuffer.allocate(8 + 16);
-                eio.putInt(type);
-                eio.putInt(BookieProtocol.EUA);
-                eio.putLong(ledgerId);
-                eio.putLong(entryId);
-                eio.flip();
-                src.sendResponse(new ByteBuffer[] { eio });
+                src.sendResponse(buildResponse(BookieProtocol.EUA, h.getVersion(), h.getOpCode(), ledgerId, entryId));
             }
             break;
         case BookieProtocol.READENTRY:
             ByteBuffer[] rsp = new ByteBuffer[2];
-            ByteBuffer rc = ByteBuffer.allocate(8 + 8 + 8);
-            rsp[0] = rc;
-            rc.putInt(type);
-
-            long ledgerId = packet.getLong();
-            long entryId = packet.getLong();
             LOG.debug("Received new read request: " + ledgerId + ", " + entryId);
+            int errorCode = BookieProtocol.EIO;
             try {
                 rsp[1] = bookie.readEntry(ledgerId, entryId);
                 LOG.debug("##### Read entry ##### " + rsp[1].remaining());
-                rc.putInt(BookieProtocol.EOK);
+                errorCode = BookieProtocol.EOK;
             } catch (Bookie.NoLedgerException e) {
                 if (LOG.isTraceEnabled()) {
                     LOG.error("Error reading " + entryId + "@" + ledgerId, e);
                 }
-                rc.putInt(BookieProtocol.ENOLEDGER);
+                errorCode = BookieProtocol.ENOLEDGER;
             } catch (Bookie.NoEntryException e) {
                 if (LOG.isTraceEnabled()) {
                     LOG.error("Error reading " + entryId + "@" + ledgerId, e);
                 }
-                rc.putInt(BookieProtocol.ENOENTRY);
+                errorCode = BookieProtocol.ENOENTRY;
             } catch (IOException e) {
                 if (LOG.isTraceEnabled()) {
                     LOG.error("Error reading " + entryId + "@" + ledgerId, e);
                 }
-                rc.putInt(BookieProtocol.EIO);
+                errorCode = BookieProtocol.EIO;
             }
-            rc.putLong(ledgerId);
-            rc.putLong(entryId);
-            rc.flip();
+            rsp[0] = buildResponse(errorCode, h.getVersion(), h.getOpCode(), ledgerId, entryId);
+
             if (LOG.isTraceEnabled()) {
-                int rcCode = rc.getInt();
-                rc.rewind();
-                LOG.trace("Read entry rc = " + rcCode + " for " + entryId + "@" + ledgerId);
+                LOG.trace("Read entry rc = " + errorCode + " for " + entryId + "@" + ledgerId);
             }
             if (rsp[1] == null) {
                 // We haven't filled in entry data, so we have to send back
@@ -232,19 +220,28 @@ public class BookieServer implements NIO
             LOG.debug("Sending response for: " + entryId + ", " + new String(rsp[1].array()));
             src.sendResponse(rsp);
             break;
-        default:
-            ByteBuffer badType = ByteBuffer.allocate(8);
-            badType.putInt(type);
-            badType.putInt(BookieProtocol.EBADREQ);
-            badType.flip();
-            src.sendResponse(new ByteBuffer[] { packet });
+        default: 
+            src.sendResponse(buildResponse(BookieProtocol.EBADREQ, h.getVersion(), h.getOpCode(), ledgerId, entryId));
         }
     }
+    
+    private ByteBuffer buildResponse(int errorCode, byte version, byte opCode, long ledgerId, long entryId) {
+        ByteBuffer rsp = ByteBuffer.allocate(24);
+        rsp.putInt(new PacketHeader(version, 
+                                    opCode, (short)0).toInt());
+        rsp.putInt(errorCode);
+        rsp.putLong(ledgerId);
+        rsp.putLong(entryId);
+
+        rsp.flip();
+        return rsp;
+    }
 
     public void writeComplete(int rc, long ledgerId, long entryId, InetSocketAddress addr, Object ctx) {
         Cnxn src = (Cnxn) ctx;
         ByteBuffer bb = ByteBuffer.allocate(24);
-        bb.putInt(BookieProtocol.ADDENTRY);
+        bb.putInt(new PacketHeader(BookieProtocol.CURRENT_PROTOCOL_VERSION, 
+                                   BookieProtocol.ADDENTRY, (short)0).toInt());
         bb.putInt(rc);
         bb.putLong(ledgerId);
         bb.putLong(entryId);

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/NIOServerFactory.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/NIOServerFactory.java?rev=1189753&r1=1189752&r2=1189753&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/NIOServerFactory.java
(original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/NIOServerFactory.java
Thu Oct 27 13:41:19 2011
@@ -461,7 +461,7 @@ public class NIOServerFactory extends Th
             makeWritable(sk);
         }
 
-        synchronized public void sendResponse(ByteBuffer bb[]) {
+        synchronized public void sendResponse(ByteBuffer... bb) {
             if (closed) {
                 return;
             }

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java?rev=1189753&r1=1189752&r2=1189753&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
(original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
Thu Oct 27 13:41:19 2011
@@ -30,6 +30,7 @@ import org.apache.bookkeeper.client.BKEx
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
+import static org.apache.bookkeeper.proto.BookieProtocol.PacketHeader;
 import org.apache.bookkeeper.util.OrderedSafeExecutor;
 import org.apache.bookkeeper.util.SafeRunnable;
 import org.apache.log4j.Logger;
@@ -229,7 +230,8 @@ public class PerChannelBookieClient exte
 
         ChannelBuffer header = channel.getConfig().getBufferFactory().getBuffer(totalHeaderSize);
         header.writeInt(totalHeaderSize - 4 + entrySize);
-        header.writeInt(BookieProtocol.ADDENTRY);
+        header.writeInt(new PacketHeader(BookieProtocol.CURRENT_PROTOCOL_VERSION, 
+                                         BookieProtocol.ADDENTRY, (short)0).toInt());
         header.writeBytes(masterKey);
 
         ChannelBuffer wrappedBuffer = ChannelBuffers.wrappedBuffer(header, toSend);
@@ -264,7 +266,8 @@ public class PerChannelBookieClient exte
 
         ChannelBuffer tmpEntry = channel.getConfig().getBufferFactory().getBuffer(totalHeaderSize);
         tmpEntry.writeInt(totalHeaderSize - 4);
-        tmpEntry.writeInt(BookieProtocol.READENTRY);
+        tmpEntry.writeInt(new PacketHeader(BookieProtocol.CURRENT_PROTOCOL_VERSION, 
+                                           BookieProtocol.READENTRY, (short)0).toInt());
         tmpEntry.writeLong(ledgerId);
         tmpEntry.writeLong(entryId);
 
@@ -396,7 +399,8 @@ public class PerChannelBookieClient exte
     public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
         Throwable t = e.getCause();
         if (t instanceof CorruptedFrameException || t instanceof TooLongFrameException) {
-            LOG.error("Corrupted fram recieved from bookie: " + e.getChannel().getRemoteAddress());
+            LOG.error("Corrupted fram received from bookie: "
+                      + e.getChannel().getRemoteAddress());
             return;
         }
         if (t instanceof IOException) {
@@ -423,9 +427,10 @@ public class PerChannelBookieClient exte
         final ChannelBuffer buffer = (ChannelBuffer) e.getMessage();
         final int type, rc;
         final long ledgerId, entryId;
+        final PacketHeader header;
 
         try {
-            type = buffer.readInt();
+            header = PacketHeader.fromInt(buffer.readInt());
             rc = buffer.readInt();
             ledgerId = buffer.readLong();
             entryId = buffer.readLong();
@@ -437,7 +442,7 @@ public class PerChannelBookieClient exte
         executor.submitOrdered(ledgerId, new SafeRunnable() {
             @Override
             public void safeRun() {
-                switch (type) {
+                switch (header.getOpCode()) {
                 case BookieProtocol.ADDENTRY:
                     handleAddResponse(ledgerId, entryId, rc);
                     break;
@@ -445,10 +450,10 @@ public class PerChannelBookieClient exte
                     handleReadResponse(ledgerId, entryId, rc, buffer);
                     break;
                 default:
-                    LOG.error("Unexpected response, type: " + type + " recieved from bookie: " + addr + " , ignoring");
+                    LOG.error("Unexpected response, type: " + header.getOpCode() 
+                              + " received from bookie: " + addr + " , ignoring");
                 }
             }
-
         });
     }
 
@@ -465,6 +470,8 @@ public class PerChannelBookieClient exte
             LOG.error("Add for ledger: " + ledgerId + ", entry: " + entryId + " failed on bookie: " + addr
                       + " with code: " + rc);
             rc = BKException.Code.WriteException;
+        } else if (rc == BookieProtocol.EBADVERSION) {
+            rc = BKException.Code.ProtocolVersionException;
         } else {
             rc = BKException.Code.OK;
         }
@@ -496,6 +503,8 @@ public class PerChannelBookieClient exte
             rc = BKException.Code.OK;
         } else if (rc == BookieProtocol.ENOENTRY || rc == BookieProtocol.ENOLEDGER) {
             rc = BKException.Code.NoSuchEntryException;
+        } else if (rc == BookieProtocol.EBADVERSION) {
+            rc = BKException.Code.ProtocolVersionException;
         } else {
             LOG.error("Read for ledger: " + ledgerId + ", entry: " + entryId + " failed on bookie: " + addr
                       + " with code: " + rc);
@@ -515,7 +524,7 @@ public class PerChannelBookieClient exte
         }
 
         if (readCompletion == null) {
-            LOG.error("Unexpected read response recieved from bookie: " + addr + " for ledger: " + ledgerId
+            LOG.error("Unexpected read response received from bookie: " + addr + " for ledger: " + ledgerId
                       + ", entry: " + entryId + " , ignoring");
             return;
         }
@@ -527,8 +536,8 @@ public class PerChannelBookieClient exte
      * Boiler-plate wrapper classes follow
      *
      */
-
-    private static class ReadCompletion {
+    // visible for testing
+    static class ReadCompletion {
         final ReadEntryCallback cb;
         final Object ctx;
 
@@ -538,7 +547,8 @@ public class PerChannelBookieClient exte
         }
     }
 
-    private static class AddCompletion {
+    // visible for testing
+    static class AddCompletion {
         final WriteCallback cb;
         //final long size;
         final Object ctx;
@@ -549,8 +559,9 @@ public class PerChannelBookieClient exte
             this.ctx = ctx;
         }
     }
-
-    private static class CompletionKey {
+    
+    // visible for testing
+    static class CompletionKey {
         long ledgerId;
         long entryId;
 

Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestProtoVersions.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestProtoVersions.java?rev=1189753&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestProtoVersions.java
(added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestProtoVersions.java
Thu Oct 27 13:41:19 2011
@@ -0,0 +1,113 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.proto;
+
+import org.apache.bookkeeper.util.OrderedSafeExecutor;
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
+import org.jboss.netty.buffer.ChannelBuffer;
+
+import org.apache.bookkeeper.client.BKException;
+
+import org.apache.bookkeeper.test.BaseTestCase;
+import org.apache.bookkeeper.test.BookieClientTest;
+import static org.junit.Assert.*;
+import org.junit.Test;
+import org.junit.Before;
+import org.junit.After;
+
+import java.util.concurrent.TimeUnit;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.net.InetSocketAddress;
+import java.net.InetAddress;
+
+public class TestProtoVersions {
+    private BookieClientTest base;
+
+    @Before
+    public void setup() throws Exception {
+        base = new BookieClientTest();
+        base.setUp();
+    }
+
+    @After
+    public void teardown() throws Exception {
+        base.tearDown();
+    }
+
+    private void testVersion(int version, int expectedresult) throws Exception {
+        PerChannelBookieClient bc = new PerChannelBookieClient(base.executor, base.channelFactory,

+                new InetSocketAddress(InetAddress.getLocalHost(), base.port), new AtomicLong(0));
+        final AtomicInteger outerrc = new AtomicInteger(-1);
+        final CountDownLatch connectLatch = new CountDownLatch(1);
+        bc.connectIfNeededAndDoOp(new GenericCallback<Void>() {
+                public void operationComplete(int rc, Void result) {
+                    outerrc.set(rc);
+                    connectLatch.countDown();
+                }
+            });
+        connectLatch.await(5, TimeUnit.SECONDS);
+        
+        assertEquals("client not connected", BKException.Code.OK, outerrc.get());
+        outerrc.set(-1);
+        final CountDownLatch readLatch = new CountDownLatch(1);
+        ReadEntryCallback cb = new ReadEntryCallback() {
+                public void readEntryComplete(int rc, long ledgerId, long entryId, ChannelBuffer buffer, Object ctx) {
+                    outerrc.set(rc);
+                    readLatch.countDown();
+                }
+            };
+        bc.readCompletions.put(new PerChannelBookieClient.CompletionKey(1, 1), 
+                               new PerChannelBookieClient.ReadCompletion(cb, this));
+        
+        int totalHeaderSize = 4 // for the length of the packet
+            + 4 // for request type
+            + 8 // for ledgerId
+            + 8; // for entryId
+
+        // This will need to be updated if the protocol for read changes
+        ChannelBuffer tmpEntry = bc.channel.getConfig().getBufferFactory().getBuffer(totalHeaderSize);
+        tmpEntry.writeInt(totalHeaderSize - 4);
+        tmpEntry.writeInt(new BookieProtocol.PacketHeader((byte)version, BookieProtocol.READENTRY, (short)0).toInt());
+        tmpEntry.writeLong(1);
+        tmpEntry.writeLong(1);
+        
+        
+        bc.channel.write(tmpEntry).awaitUninterruptibly();
+        readLatch.await(5, TimeUnit.SECONDS);
+        assertEquals("Expected result differs", expectedresult, outerrc.get());
+        
+        bc.close();
+    }
+
+    @Test
+    public void testVersions() throws Exception {
+        testVersion(BookieProtocol.LOWEST_COMPAT_PROTOCOL_VERSION-1, BKException.Code.ProtocolVersionException);
+        testVersion(BookieProtocol.LOWEST_COMPAT_PROTOCOL_VERSION, BKException.Code.NoSuchEntryException);
+        testVersion(BookieProtocol.CURRENT_PROTOCOL_VERSION, BKException.Code.NoSuchEntryException);
+        testVersion(BookieProtocol.CURRENT_PROTOCOL_VERSION+1, BKException.Code.ProtocolVersionException);
+    }
+}
\ No newline at end of file

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java?rev=1189753&r1=1189752&r2=1189753&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
(original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
Thu Oct 27 13:41:19 2011
@@ -46,12 +46,12 @@ public class BookieClientTest extends Te
     static Logger LOG = Logger.getLogger(BookieClientTest.class);
     BookieServer bs;
     File tmpDir;
-    int port = 13645;
-    ClientSocketChannelFactory channelFactory;
-    OrderedSafeExecutor executor;
+    public int port = 13645;
+    public ClientSocketChannelFactory channelFactory;
+    public OrderedSafeExecutor executor;
 
     @Override
-    protected void setUp() throws Exception {
+    public void setUp() throws Exception {
         tmpDir = File.createTempFile("bookie", "test");
         tmpDir.delete();
         tmpDir.mkdir();
@@ -66,7 +66,7 @@ public class BookieClientTest extends Te
     }
 
     @Override
-    protected void tearDown() throws Exception {
+    public void tearDown() throws Exception {
         bs.shutdown();
         recursiveDelete(tmpDir);
         channelFactory.releaseExternalResources();



Mime
View raw message