cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From slebre...@apache.org
Subject [2/4] git commit: Update native server to Netty 4
Date Thu, 20 Mar 2014 17:25:54 GMT
Update native server to Netty 4

patch by benedict; reviewed by slebresne for CASSANDRA-6236


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/cbf304eb
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/cbf304eb
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/cbf304eb

Branch: refs/heads/trunk
Commit: cbf304ebd0436a321753e81231545b705aa8dd23
Parents: 48c9db6
Author: belliottsmith <github@sub.laerad.com>
Authored: Fri Mar 14 09:36:17 2014 +0000
Committer: Sylvain Lebresne <sylvain@datastax.com>
Committed: Thu Mar 20 18:23:53 2014 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 lib/netty-3.6.6.Final.jar                       | Bin 1206119 -> 0 bytes
 lib/netty-all-4.0.17.Final.jar                  | Bin 0 -> 1613159 bytes
 .../org/apache/cassandra/cql3/QueryOptions.java |   6 +-
 .../org/apache/cassandra/cql3/ResultSet.java    |  10 +-
 .../org/apache/cassandra/transport/CBCodec.java |   6 +-
 .../org/apache/cassandra/transport/CBUtil.java  |  75 +++++++------
 .../apache/cassandra/transport/Connection.java  |   5 +-
 .../apache/cassandra/transport/DataType.java    |   6 +-
 .../org/apache/cassandra/transport/Event.java   |  20 ++--
 .../org/apache/cassandra/transport/Frame.java   | 104 ++++++++++---------
 .../cassandra/transport/FrameCompressor.java    |  10 +-
 .../org/apache/cassandra/transport/Message.java |  71 ++++++-------
 .../apache/cassandra/transport/OptionCodec.java |  18 ++--
 .../transport/RequestThreadPoolExecutor.java    | 101 +++++++++---------
 .../org/apache/cassandra/transport/Server.java  |  90 ++++++++--------
 .../cassandra/transport/ServerConnection.java   |   4 +-
 .../cassandra/transport/SimpleClient.java       |  93 ++++++++---------
 .../transport/messages/AuthChallenge.java       |   6 +-
 .../transport/messages/AuthResponse.java        |   6 +-
 .../transport/messages/AuthSuccess.java         |   6 +-
 .../transport/messages/AuthenticateMessage.java |   6 +-
 .../transport/messages/BatchMessage.java        |   6 +-
 .../transport/messages/CredentialsMessage.java  |   6 +-
 .../transport/messages/ErrorMessage.java        |   6 +-
 .../transport/messages/EventMessage.java        |   6 +-
 .../transport/messages/ExecuteMessage.java      |   6 +-
 .../transport/messages/OptionsMessage.java      |   6 +-
 .../transport/messages/PrepareMessage.java      |   6 +-
 .../transport/messages/QueryMessage.java        |   6 +-
 .../transport/messages/ReadyMessage.java        |   6 +-
 .../transport/messages/RegisterMessage.java     |   6 +-
 .../transport/messages/ResultMessage.java       |  26 ++---
 .../transport/messages/StartupMessage.java      |   6 +-
 .../transport/messages/SupportedMessage.java    |   6 +-
 .../cassandra/stress/util/JavaDriverClient.java |   6 +-
 36 files changed, 374 insertions(+), 374 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/cbf304eb/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 69c8cc2..7d3c5d2 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -25,6 +25,7 @@
  * Improve cleanup/scrub/upgradesstables failure handling (CASSANDRA-6774)
  * ByteBuffer write() methods for serializing sstables (CASSANDRA-6781)
  * Proper compare function for CollectionType (CASSANDRA-6783)
+ * Update native server to Netty 4 (CASSANDRA-6236)
 Merged from 2.0:
  * Add uuid() function (CASSANDRA-6473)
  * Omit tombstones from schema digests (CASSANDRA-6862)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cbf304eb/lib/netty-3.6.6.Final.jar
----------------------------------------------------------------------
diff --git a/lib/netty-3.6.6.Final.jar b/lib/netty-3.6.6.Final.jar
deleted file mode 100644
index 35cb073..0000000
Binary files a/lib/netty-3.6.6.Final.jar and /dev/null differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cbf304eb/lib/netty-all-4.0.17.Final.jar
----------------------------------------------------------------------
diff --git a/lib/netty-all-4.0.17.Final.jar b/lib/netty-all-4.0.17.Final.jar
new file mode 100644
index 0000000..baaa5b8
Binary files /dev/null and b/lib/netty-all-4.0.17.Final.jar differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cbf304eb/src/java/org/apache/cassandra/cql3/QueryOptions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/QueryOptions.java b/src/java/org/apache/cassandra/cql3/QueryOptions.java
index 630c6a0..76b1eeb 100644
--- a/src/java/org/apache/cassandra/cql3/QueryOptions.java
+++ b/src/java/org/apache/cassandra/cql3/QueryOptions.java
@@ -22,7 +22,7 @@ import java.util.Collections;
 import java.util.EnumSet;
 import java.util.List;
 
-import org.jboss.netty.buffer.ChannelBuffer;
+import io.netty.buffer.ByteBuf;
 
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.service.pager.PagingState;
@@ -174,7 +174,7 @@ public class QueryOptions
             }
         }
 
-        public QueryOptions decode(ChannelBuffer body, int version)
+        public QueryOptions decode(ByteBuf body, int version)
         {
             assert version >= 2;
 
@@ -200,7 +200,7 @@ public class QueryOptions
             return new QueryOptions(consistency, values, skipMetadata, options, version);
         }
 
-        public void encode(QueryOptions options, ChannelBuffer dest, int version)
+        public void encode(QueryOptions options, ByteBuf dest, int version)
         {
             assert version >= 2;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cbf304eb/src/java/org/apache/cassandra/cql3/ResultSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/ResultSet.java b/src/java/org/apache/cassandra/cql3/ResultSet.java
index d49a3be..53ba380 100644
--- a/src/java/org/apache/cassandra/cql3/ResultSet.java
+++ b/src/java/org/apache/cassandra/cql3/ResultSet.java
@@ -20,7 +20,7 @@ package org.apache.cassandra.cql3;
 import java.nio.ByteBuffer;
 import java.util.*;
 
-import org.jboss.netty.buffer.ChannelBuffer;
+import io.netty.buffer.ByteBuf;
 
 import org.apache.cassandra.transport.*;
 import org.apache.cassandra.db.marshal.AbstractType;
@@ -195,7 +195,7 @@ public class ResultSet
          *   - rows count (4 bytes)
          *   - rows
          */
-        public ResultSet decode(ChannelBuffer body, int version)
+        public ResultSet decode(ByteBuf body, int version)
         {
             Metadata m = Metadata.codec.decode(body, version);
             int rowCount = body.readInt();
@@ -209,7 +209,7 @@ public class ResultSet
             return rs;
         }
 
-        public void encode(ResultSet rs, ChannelBuffer dest, int version)
+        public void encode(ResultSet rs, ByteBuf dest, int version)
         {
             Metadata.codec.encode(rs.metadata, dest, version);
             dest.writeInt(rs.rows.size());
@@ -341,7 +341,7 @@ public class ResultSet
 
         private static class Codec implements CBCodec<Metadata>
         {
-            public Metadata decode(ChannelBuffer body, int version)
+            public Metadata decode(ByteBuf body, int version)
             {
                 // flags & column count
                 int iflags = body.readInt();
@@ -379,7 +379,7 @@ public class ResultSet
                 return new Metadata(flags, names).setHasMorePages(state);
             }
 
-            public void encode(Metadata m, ChannelBuffer dest, int version)
+            public void encode(Metadata m, ByteBuf dest, int version)
             {
                 boolean noMetadata = m.flags.contains(Flag.NO_METADATA);
                 boolean globalTablesSpec = m.flags.contains(Flag.GLOBAL_TABLES_SPEC);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cbf304eb/src/java/org/apache/cassandra/transport/CBCodec.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/CBCodec.java b/src/java/org/apache/cassandra/transport/CBCodec.java
index 67b0cce..0ef619e 100644
--- a/src/java/org/apache/cassandra/transport/CBCodec.java
+++ b/src/java/org/apache/cassandra/transport/CBCodec.java
@@ -17,11 +17,11 @@
  */
 package org.apache.cassandra.transport;
 
-import org.jboss.netty.buffer.ChannelBuffer;
+import io.netty.buffer.ByteBuf;
 
 public interface CBCodec<T>
 {
-    public T decode(ChannelBuffer body, int version);
-    public void encode(T t, ChannelBuffer dest, int version);
+    public T decode(ByteBuf body, int version);
+    public void encode(T t, ByteBuf dest, int version);
     public int encodedSize(T t, int version);
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cbf304eb/src/java/org/apache/cassandra/transport/CBUtil.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/CBUtil.java b/src/java/org/apache/cassandra/transport/CBUtil.java
index 2816f47..e5222a1 100644
--- a/src/java/org/apache/cassandra/transport/CBUtil.java
+++ b/src/java/org/apache/cassandra/transport/CBUtil.java
@@ -29,25 +29,27 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.util.CharsetUtil;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.util.AttributeKey;
+import io.netty.util.CharsetUtil;
 
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.utils.UUIDGen;
 
 /**
- * ChannelBuffer utility methods.
+ * ByteBuf utility methods.
  * Note that contrarily to ByteBufferUtil, these method do "read" the
- * ChannelBuffer advancing it's (read) position. They also write by
+ * ByteBuf advancing it's (read) position. They also write by
  * advancing the write position. Functions are also provided to create
- * ChannelBuffer while avoiding copies.
+ * ByteBuf while avoiding copies.
  */
 public abstract class CBUtil
 {
     private CBUtil() {}
 
-    private static String readString(ChannelBuffer cb, int length)
+    private static String readString(ByteBuf cb, int length)
     {
         try
         {
@@ -65,7 +67,7 @@ public abstract class CBUtil
         }
     }
 
-    public static String readString(ChannelBuffer cb)
+    public static String readString(ByteBuf cb)
     {
         try
         {
@@ -78,7 +80,7 @@ public abstract class CBUtil
         }
     }
 
-    public static void writeString(String str, ChannelBuffer cb)
+    public static void writeString(String str, ByteBuf cb)
     {
         byte[] bytes = str.getBytes(CharsetUtil.UTF_8);
         cb.writeShort(bytes.length);
@@ -90,7 +92,7 @@ public abstract class CBUtil
         return 2 + TypeSizes.encodedUTF8Length(str);
     }
 
-    public static String readLongString(ChannelBuffer cb)
+    public static String readLongString(ByteBuf cb)
     {
         try
         {
@@ -103,7 +105,7 @@ public abstract class CBUtil
         }
     }
 
-    public static void writeLongString(String str, ChannelBuffer cb)
+    public static void writeLongString(String str, ByteBuf cb)
     {
         byte[] bytes = str.getBytes(CharsetUtil.UTF_8);
         cb.writeInt(bytes.length);
@@ -115,7 +117,7 @@ public abstract class CBUtil
         return 4 + str.getBytes(CharsetUtil.UTF_8).length;
     }
 
-    public static byte[] readBytes(ChannelBuffer cb)
+    public static byte[] readBytes(ByteBuf cb)
     {
         try
         {
@@ -130,7 +132,7 @@ public abstract class CBUtil
         }
     }
 
-    public static void writeBytes(byte[] bytes, ChannelBuffer cb)
+    public static void writeBytes(byte[] bytes, ByteBuf cb)
     {
         cb.writeShort(bytes.length);
         cb.writeBytes(bytes);
@@ -141,12 +143,12 @@ public abstract class CBUtil
         return 2 + bytes.length;
     }
 
-    public static ConsistencyLevel readConsistencyLevel(ChannelBuffer cb)
+    public static ConsistencyLevel readConsistencyLevel(ByteBuf cb)
     {
         return ConsistencyLevel.fromCode(cb.readUnsignedShort());
     }
 
-    public static void writeConsistencyLevel(ConsistencyLevel consistency, ChannelBuffer cb)
+    public static void writeConsistencyLevel(ConsistencyLevel consistency, ByteBuf cb)
     {
         cb.writeShort(consistency.code);
     }
@@ -156,7 +158,7 @@ public abstract class CBUtil
         return 2;
     }
 
-    public static <T extends Enum<T>> T readEnumValue(Class<T> enumType, ChannelBuffer cb)
+    public static <T extends Enum<T>> T readEnumValue(Class<T> enumType, ByteBuf cb)
     {
         String value = CBUtil.readString(cb);
         try
@@ -169,7 +171,7 @@ public abstract class CBUtil
         }
     }
 
-    public static <T extends Enum<T>> void writeEnumValue(T enumValue, ChannelBuffer cb)
+    public static <T extends Enum<T>> void writeEnumValue(T enumValue, ByteBuf cb)
     {
         writeString(enumValue.toString(), cb);
     }
@@ -179,14 +181,14 @@ public abstract class CBUtil
         return sizeOfString(enumValue.toString());
     }
 
-    public static UUID readUUID(ChannelBuffer cb)
+    public static UUID readUUID(ByteBuf cb)
     {
         byte[] bytes = new byte[16];
         cb.readBytes(bytes);
         return UUIDGen.getUUID(ByteBuffer.wrap(bytes));
     }
 
-    public static void writeUUID(UUID uuid, ChannelBuffer cb)
+    public static void writeUUID(UUID uuid, ByteBuf cb)
     {
         cb.writeBytes(UUIDGen.decompose(uuid));
     }
@@ -196,7 +198,7 @@ public abstract class CBUtil
         return 16;
     }
 
-    public static List<String> readStringList(ChannelBuffer cb)
+    public static List<String> readStringList(ByteBuf cb)
     {
         int length = cb.readUnsignedShort();
         List<String> l = new ArrayList<String>(length);
@@ -205,7 +207,7 @@ public abstract class CBUtil
         return l;
     }
 
-    public static void writeStringList(List<String> l, ChannelBuffer cb)
+    public static void writeStringList(List<String> l, ByteBuf cb)
     {
         cb.writeShort(l.size());
         for (String str : l)
@@ -220,7 +222,7 @@ public abstract class CBUtil
         return size;
     }
 
-    public static Map<String, String> readStringMap(ChannelBuffer cb)
+    public static Map<String, String> readStringMap(ByteBuf cb)
     {
         int length = cb.readUnsignedShort();
         Map<String, String> m = new HashMap<String, String>(length);
@@ -233,7 +235,7 @@ public abstract class CBUtil
         return m;
     }
 
-    public static void writeStringMap(Map<String, String> m, ChannelBuffer cb)
+    public static void writeStringMap(Map<String, String> m, ByteBuf cb)
     {
         cb.writeShort(m.size());
         for (Map.Entry<String, String> entry : m.entrySet())
@@ -254,7 +256,7 @@ public abstract class CBUtil
         return size;
     }
 
-    public static Map<String, List<String>> readStringToStringListMap(ChannelBuffer cb)
+    public static Map<String, List<String>> readStringToStringListMap(ByteBuf cb)
     {
         int length = cb.readUnsignedShort();
         Map<String, List<String>> m = new HashMap<String, List<String>>(length);
@@ -267,7 +269,7 @@ public abstract class CBUtil
         return m;
     }
 
-    public static void writeStringToStringListMap(Map<String, List<String>> m, ChannelBuffer cb)
+    public static void writeStringToStringListMap(Map<String, List<String>> m, ByteBuf cb)
     {
         cb.writeShort(m.size());
         for (Map.Entry<String, List<String>> entry : m.entrySet())
@@ -288,13 +290,19 @@ public abstract class CBUtil
         return size;
     }
 
-    public static ByteBuffer readValue(ChannelBuffer cb)
+    public static ByteBuffer readValue(ByteBuf cb)
     {
         int length = cb.readInt();
-        return length < 0 ? null : cb.readSlice(length).toByteBuffer();
+        if (length < 0)
+            return null;
+        ByteBuf slice = cb.readSlice(length);
+        if (slice.nioBufferCount() > 0)
+            return slice.nioBuffer();
+        else
+            return Unpooled.copiedBuffer(slice).nioBuffer();
     }
 
-    public static void writeValue(byte[] bytes, ChannelBuffer cb)
+    public static void writeValue(byte[] bytes, ByteBuf cb)
     {
         if (bytes == null)
         {
@@ -306,7 +314,7 @@ public abstract class CBUtil
         cb.writeBytes(bytes);
     }
 
-    public static void writeValue(ByteBuffer bytes, ChannelBuffer cb)
+    public static void writeValue(ByteBuffer bytes, ByteBuf cb)
     {
         if (bytes == null)
         {
@@ -328,7 +336,7 @@ public abstract class CBUtil
         return 4 + (bytes == null ? 0 : bytes.remaining());
     }
 
-    public static List<ByteBuffer> readValueList(ChannelBuffer cb)
+    public static List<ByteBuffer> readValueList(ByteBuf cb)
     {
         int size = cb.readUnsignedShort();
         if (size == 0)
@@ -340,7 +348,7 @@ public abstract class CBUtil
         return l;
     }
 
-    public static void writeValueList(List<ByteBuffer> values, ChannelBuffer cb)
+    public static void writeValueList(List<ByteBuffer> values, ByteBuf cb)
     {
         cb.writeShort(values.size());
         for (ByteBuffer value : values)
@@ -355,7 +363,7 @@ public abstract class CBUtil
         return size;
     }
 
-    public static InetSocketAddress readInet(ChannelBuffer cb)
+    public static InetSocketAddress readInet(ByteBuf cb)
     {
         int addrSize = cb.readByte();
         byte[] address = new byte[addrSize];
@@ -371,7 +379,7 @@ public abstract class CBUtil
         }
     }
 
-    public static void writeInet(InetSocketAddress inet, ChannelBuffer cb)
+    public static void writeInet(InetSocketAddress inet, ByteBuf cb)
     {
         byte[] address = inet.getAddress().getAddress();
 
@@ -390,7 +398,7 @@ public abstract class CBUtil
      * Reads *all* readable bytes from {@code cb} and return them.
      * If {@code cb} is backed by an array, this will return the underlying array directly, without copy.
      */
-    public static byte[] readRawBytes(ChannelBuffer cb)
+    public static byte[] readRawBytes(ByteBuf cb)
     {
         if (cb.hasArray() && cb.readableBytes() == cb.array().length)
         {
@@ -404,4 +412,5 @@ public abstract class CBUtil
         cb.readBytes(bytes);
         return bytes;
     }
+
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cbf304eb/src/java/org/apache/cassandra/transport/Connection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Connection.java b/src/java/org/apache/cassandra/transport/Connection.java
index a72402f..aa571a7 100644
--- a/src/java/org/apache/cassandra/transport/Connection.java
+++ b/src/java/org/apache/cassandra/transport/Connection.java
@@ -17,10 +17,13 @@
  */
 package org.apache.cassandra.transport;
 
-import org.jboss.netty.channel.Channel;
+import io.netty.channel.Channel;
+import io.netty.util.AttributeKey;
 
 public class Connection
 {
+    static final AttributeKey<Connection> attributeKey = AttributeKey.valueOf("CONN");
+
     private final Channel channel;
     private final int version;
     private final Tracker tracker;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cbf304eb/src/java/org/apache/cassandra/transport/DataType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/DataType.java b/src/java/org/apache/cassandra/transport/DataType.java
index 1656d24..f0b5d95 100644
--- a/src/java/org/apache/cassandra/transport/DataType.java
+++ b/src/java/org/apache/cassandra/transport/DataType.java
@@ -24,7 +24,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.List;
 
-import org.jboss.netty.buffer.ChannelBuffer;
+import io.netty.buffer.ByteBuf;
 
 import org.apache.cassandra.exceptions.RequestValidationException;
 import org.apache.cassandra.db.marshal.*;
@@ -78,7 +78,7 @@ public enum DataType implements OptionCodec.Codecable<DataType>
         return id;
     }
 
-    public Object readValue(ChannelBuffer cb)
+    public Object readValue(ByteBuf cb)
     {
         switch (this)
         {
@@ -98,7 +98,7 @@ public enum DataType implements OptionCodec.Codecable<DataType>
         }
     }
 
-    public void writeValue(Object value, ChannelBuffer cb)
+    public void writeValue(Object value, ByteBuf cb)
     {
         switch (this)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cbf304eb/src/java/org/apache/cassandra/transport/Event.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Event.java b/src/java/org/apache/cassandra/transport/Event.java
index d0b673b..242ad64 100644
--- a/src/java/org/apache/cassandra/transport/Event.java
+++ b/src/java/org/apache/cassandra/transport/Event.java
@@ -20,7 +20,7 @@ package org.apache.cassandra.transport;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 
-import org.jboss.netty.buffer.ChannelBuffer;
+import io.netty.buffer.ByteBuf;
 
 public abstract class Event
 {
@@ -33,7 +33,7 @@ public abstract class Event
         this.type = type;
     }
 
-    public static Event deserialize(ChannelBuffer cb)
+    public static Event deserialize(ByteBuf cb)
     {
         switch (CBUtil.readEnumValue(Type.class, cb))
         {
@@ -47,7 +47,7 @@ public abstract class Event
         throw new AssertionError();
     }
 
-    public void serialize(ChannelBuffer dest)
+    public void serialize(ByteBuf dest)
     {
         CBUtil.writeEnumValue(type, dest);
         serializeEvent(dest);
@@ -58,7 +58,7 @@ public abstract class Event
         return CBUtil.sizeOfEnumValue(type) + eventSerializedSize();
     }
 
-    protected abstract void serializeEvent(ChannelBuffer dest);
+    protected abstract void serializeEvent(ByteBuf dest);
     protected abstract int eventSerializedSize();
 
     public static class TopologyChange extends Event
@@ -91,14 +91,14 @@ public abstract class Event
         }
 
         // Assumes the type has already been deserialized
-        private static TopologyChange deserializeEvent(ChannelBuffer cb)
+        private static TopologyChange deserializeEvent(ByteBuf cb)
         {
             Change change = CBUtil.readEnumValue(Change.class, cb);
             InetSocketAddress node = CBUtil.readInet(cb);
             return new TopologyChange(change, node);
         }
 
-        protected void serializeEvent(ChannelBuffer dest)
+        protected void serializeEvent(ByteBuf dest)
         {
             CBUtil.writeEnumValue(change, dest);
             CBUtil.writeInet(node, dest);
@@ -141,14 +141,14 @@ public abstract class Event
         }
 
         // Assumes the type has already been deserialized
-        private static StatusChange deserializeEvent(ChannelBuffer cb)
+        private static StatusChange deserializeEvent(ByteBuf cb)
         {
             Status status = CBUtil.readEnumValue(Status.class, cb);
             InetSocketAddress node = CBUtil.readInet(cb);
             return new StatusChange(status, node);
         }
 
-        protected void serializeEvent(ChannelBuffer dest)
+        protected void serializeEvent(ByteBuf dest)
         {
             CBUtil.writeEnumValue(status, dest);
             CBUtil.writeInet(node, dest);
@@ -188,7 +188,7 @@ public abstract class Event
         }
 
         // Assumes the type has already been deserialized
-        private static SchemaChange deserializeEvent(ChannelBuffer cb)
+        private static SchemaChange deserializeEvent(ByteBuf cb)
         {
             Change change = CBUtil.readEnumValue(Change.class, cb);
             String keyspace = CBUtil.readString(cb);
@@ -196,7 +196,7 @@ public abstract class Event
             return new SchemaChange(change, keyspace, table);
         }
 
-        protected void serializeEvent(ChannelBuffer dest)
+        protected void serializeEvent(ByteBuf dest)
         {
             CBUtil.writeEnumValue(change, dest);
             CBUtil.writeString(keyspace, dest);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cbf304eb/src/java/org/apache/cassandra/transport/Frame.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Frame.java b/src/java/org/apache/cassandra/transport/Frame.java
index e0f49e1..70fe150 100644
--- a/src/java/org/apache/cassandra/transport/Frame.java
+++ b/src/java/org/apache/cassandra/transport/Frame.java
@@ -20,13 +20,14 @@ package org.apache.cassandra.transport;
 
 import java.io.IOException;
 import java.util.EnumSet;
+import java.util.List;
 
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.channel.*;
-import org.jboss.netty.handler.codec.oneone.OneToOneDecoder;
-import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;
-import org.jboss.netty.handler.codec.frame.*;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.*;
+import io.netty.handler.codec.ByteToMessageDecoder;
+import io.netty.handler.codec.MessageToMessageDecoder;
+import io.netty.handler.codec.MessageToMessageEncoder;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.exceptions.InvalidRequestException;
@@ -35,7 +36,7 @@ import org.apache.cassandra.transport.messages.ErrorMessage;
 public class Frame
 {
     public final Header header;
-    public final ChannelBuffer body;
+    public final ByteBuf body;
 
     /**
      * On-wire frame.
@@ -48,13 +49,13 @@ public class Frame
      *   |                length                 |
      *   +---------+---------+---------+---------+
      */
-    private Frame(Header header, ChannelBuffer body)
+    private Frame(Header header, ByteBuf body)
     {
         this.header = header;
         this.body = body;
     }
 
-    public static Frame create(Message.Type type, int streamId, int version, EnumSet<Header.Flag> flags, ChannelBuffer body)
+    public static Frame create(Message.Type type, int streamId, int version, EnumSet<Header.Flag> flags, ByteBuf body)
     {
         Header header = new Header(version, flags, streamId, type);
         return new Frame(header, body);
@@ -113,12 +114,12 @@ public class Frame
         }
     }
 
-    public Frame with(ChannelBuffer newBody)
+    public Frame with(ByteBuf newBody)
     {
         return new Frame(header, newBody);
     }
 
-    public static class Decoder extends FrameDecoder
+    public static class Decoder extends ByteToMessageDecoder
     {
         private static final int MAX_FRAME_LENGTH = DatabaseDescriptor.getNativeTransportMaxFrameSize();
 
@@ -135,7 +136,7 @@ public class Frame
         }
 
         @Override
-        protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer)
+        protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> results)
         throws Exception
         {
             if (discardingTooLongFrame)
@@ -144,12 +145,12 @@ public class Frame
                 // If we have discarded everything, throw the exception
                 if (bytesToDiscard <= 0)
                     fail();
-                return null;
+                return;
             }
 
             // Wait until we have read at least the header
             if (buffer.readableBytes() < Header.LENGTH)
-                return null;
+                return;
 
             int idx = buffer.readerIndex();
 
@@ -184,31 +185,32 @@ public class Frame
                 bytesToDiscard = discard(buffer, frameLength);
                 if (bytesToDiscard <= 0)
                     fail();
-                return null;
+                return;
             }
 
             // never overflows because it's less than the max frame length
             int frameLengthInt = (int) frameLength;
             if (buffer.readableBytes() < frameLengthInt)
-                return null;
+                return;
 
             // extract body
-            ChannelBuffer body = extractFrame(buffer, idx + Header.LENGTH, (int)bodyLength);
+            // TODO: do we need unpooled?
+            ByteBuf body = Unpooled.copiedBuffer(buffer.duplicate().slice(idx + Header.LENGTH, (int) bodyLength));
             buffer.readerIndex(idx + frameLengthInt);
 
-            Connection connection = (Connection)channel.getAttachment();
+            Connection connection = ctx.channel().attr(Connection.attributeKey).get();
             if (connection == null)
             {
                 // First message seen on this channel, attach the connection object
-                connection = factory.newConnection(channel, version);
-                channel.setAttachment(connection);
+                connection = factory.newConnection(ctx.channel(), version);
+                ctx.channel().attr(Connection.attributeKey).set(connection);
             }
             else if (connection.getVersion() != version)
             {
                 throw new ProtocolException(String.format("Invalid message version. Got %d but previous messages on this connection had version %d", version, connection.getVersion()));
             }
 
-            return new Frame(new Header(version, flags, streamId, type), body);
+            results.add(new Frame(new Header(version, flags, streamId, type), body));
         }
 
         private void fail()
@@ -223,23 +225,20 @@ public class Frame
     }
 
     // How much remains to be discarded
-    private static long discard(ChannelBuffer buffer, long remainingToDiscard)
+    private static long discard(ByteBuf buffer, long remainingToDiscard)
     {
         int availableToDiscard = (int) Math.min(remainingToDiscard, buffer.readableBytes());
         buffer.skipBytes(availableToDiscard);
         return remainingToDiscard - availableToDiscard;
     }
 
-    public static class Encoder extends OneToOneEncoder
+    @ChannelHandler.Sharable
+    public static class Encoder extends MessageToMessageEncoder<Frame>
     {
-        public Object encode(ChannelHandlerContext ctx, Channel channel, Object msg)
+        public void encode(ChannelHandlerContext ctx, Frame frame, List results)
         throws IOException
         {
-            assert msg instanceof Frame : "Expecting frame, got " + msg;
-
-            Frame frame = (Frame)msg;
-
-            ChannelBuffer header = ChannelBuffers.buffer(Frame.Header.LENGTH);
+            ByteBuf header = Unpooled.buffer(Frame.Header.LENGTH);
             Message.Type type = frame.header.type;
             header.writeByte(type.direction.addToVersion(frame.header.version));
             header.writeByte(Header.Flag.serialize(frame.header.flags));
@@ -247,52 +246,59 @@ public class Frame
             header.writeByte(type.opcode);
             header.writeInt(frame.body.readableBytes());
 
-            return ChannelBuffers.wrappedBuffer(header, frame.body);
+            results.add(Unpooled.wrappedBuffer(header, frame.body));
         }
     }
 
-    public static class Decompressor extends OneToOneDecoder
+    @ChannelHandler.Sharable
+    public static class Decompressor extends MessageToMessageDecoder<Frame>
     {
-        public Object decode(ChannelHandlerContext ctx, Channel channel, Object msg)
+        public void decode(ChannelHandlerContext ctx, Frame frame, List results)
         throws IOException
         {
-            assert msg instanceof Frame : "Expecting frame, got " + msg;
-
-            Frame frame = (Frame)msg;
-            Connection connection = (Connection)channel.getAttachment();
+            Connection connection = ctx.channel().attr(Connection.attributeKey).get();
 
             if (!frame.header.flags.contains(Header.Flag.COMPRESSED) || connection == null)
-                return frame;
+            {
+                results.add(frame);
+                return;
+            }
 
             FrameCompressor compressor = connection.getCompressor();
             if (compressor == null)
-                return frame;
+            {
+                results.add(frame);
+                return;
+            }
 
-            return compressor.decompress(frame);
+            results.add(compressor.decompress(frame));
         }
     }
 
-    public static class Compressor extends OneToOneEncoder
+    @ChannelHandler.Sharable
+    public static class Compressor extends MessageToMessageEncoder<Frame>
     {
-        public Object encode(ChannelHandlerContext ctx, Channel channel, Object msg)
+        public void encode(ChannelHandlerContext ctx, Frame frame, List results)
         throws IOException
         {
-            assert msg instanceof Frame : "Expecting frame, got " + msg;
-
-            Frame frame = (Frame)msg;
-            Connection connection = (Connection)channel.getAttachment();
+            Connection connection = ctx.channel().attr(Connection.attributeKey).get();
 
             // Never compress STARTUP messages
             if (frame.header.type == Message.Type.STARTUP || connection == null)
-                return frame;
+            {
+                results.add(frame);
+                return;
+            }
 
             FrameCompressor compressor = connection.getCompressor();
             if (compressor == null)
-                return frame;
+            {
+                results.add(frame);
+                return;
+            }
 
             frame.header.flags.add(Header.Flag.COMPRESSED);
-            return compressor.compress(frame);
-
+            results.add(compressor.compress(frame));
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cbf304eb/src/java/org/apache/cassandra/transport/FrameCompressor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/FrameCompressor.java b/src/java/org/apache/cassandra/transport/FrameCompressor.java
index a3c3848..3e8c555 100644
--- a/src/java/org/apache/cassandra/transport/FrameCompressor.java
+++ b/src/java/org/apache/cassandra/transport/FrameCompressor.java
@@ -19,7 +19,7 @@ package org.apache.cassandra.transport;
 
 import java.io.IOException;
 
-import org.jboss.netty.buffer.ChannelBuffers;
+import io.netty.buffer.Unpooled;
 import org.xerial.snappy.Snappy;
 import org.xerial.snappy.SnappyError;
 
@@ -77,7 +77,7 @@ public interface FrameCompressor
             byte[] output = new byte[Snappy.maxCompressedLength(input.length)];
 
             int written = Snappy.compress(input, 0, input.length, output, 0);
-            return frame.with(ChannelBuffers.wrappedBuffer(output, 0, written));
+            return frame.with(Unpooled.wrappedBuffer(output, 0, written));
         }
 
         public Frame decompress(Frame frame) throws IOException
@@ -89,7 +89,7 @@ public interface FrameCompressor
 
             byte[] output = new byte[Snappy.uncompressedLength(input)];
             int size = Snappy.uncompress(input, 0, input.length, output, 0);
-            return frame.with(ChannelBuffers.wrappedBuffer(output, 0, size));
+            return frame.with(Unpooled.wrappedBuffer(output, 0, size));
         }
     }
 
@@ -131,7 +131,7 @@ public interface FrameCompressor
             try
             {
                 int written = compressor.compress(input, 0, input.length, output, INTEGER_BYTES, maxCompressedLength);
-                return frame.with(ChannelBuffers.wrappedBuffer(output, 0, INTEGER_BYTES + written));
+                return frame.with(Unpooled.wrappedBuffer(output, 0, INTEGER_BYTES + written));
             }
             catch (LZ4Exception e)
             {
@@ -156,7 +156,7 @@ public interface FrameCompressor
                 if (read != input.length - INTEGER_BYTES)
                     throw new IOException("Compressed lengths mismatch");
 
-                return frame.with(ChannelBuffers.wrappedBuffer(output));
+                return frame.with(Unpooled.wrappedBuffer(output));
             }
             catch (LZ4Exception e)
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cbf304eb/src/java/org/apache/cassandra/transport/Message.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Message.java b/src/java/org/apache/cassandra/transport/Message.java
index 0731081..32ae181 100644
--- a/src/java/org/apache/cassandra/transport/Message.java
+++ b/src/java/org/apache/cassandra/transport/Message.java
@@ -18,13 +18,14 @@
 package org.apache.cassandra.transport;
 
 import java.util.EnumSet;
+import java.util.List;
 import java.util.UUID;
 
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.channel.*;
-import org.jboss.netty.handler.codec.oneone.OneToOneDecoder;
-import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.*;
+import io.netty.handler.codec.MessageToMessageDecoder;
+import io.netty.handler.codec.MessageToMessageEncoder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -195,13 +196,11 @@ public abstract class Message
         }
     }
 
-    public static class ProtocolDecoder extends OneToOneDecoder
+    @ChannelHandler.Sharable
+    public static class ProtocolDecoder extends MessageToMessageDecoder<Frame>
     {
-        public Object decode(ChannelHandlerContext ctx, Channel channel, Object msg)
+        public void decode(ChannelHandlerContext ctx, Frame frame, List results)
         {
-            assert msg instanceof Frame : "Expecting frame, got " + msg;
-
-            Frame frame = (Frame)msg;
             boolean isRequest = frame.header.type.direction == Direction.REQUEST;
             boolean isTracing = frame.header.flags.contains(Frame.Header.Flag.TRACING);
 
@@ -216,7 +215,8 @@ public abstract class Message
                 {
                     assert message instanceof Request;
                     Request req = (Request)message;
-                    req.attach((Connection)channel.getAttachment());
+                    Connection connection = ctx.channel().attr(Connection.attributeKey).get();
+                    req.attach(connection);
                     if (isTracing)
                         req.setTracingRequested();
                 }
@@ -227,7 +227,7 @@ public abstract class Message
                         ((Response)message).setTracingId(tracingId);
                 }
 
-                return message;
+                results.add(message);
             }
             catch (Exception ex)
             {
@@ -237,15 +237,12 @@ public abstract class Message
         }
     }
 
-    public static class ProtocolEncoder extends OneToOneEncoder
+    @ChannelHandler.Sharable
+    public static class ProtocolEncoder extends MessageToMessageEncoder<Message>
     {
-        public Object encode(ChannelHandlerContext ctx, Channel channel, Object msg)
+        public void encode(ChannelHandlerContext ctx, Message message, List results)
         {
-            assert msg instanceof Message : "Expecting message, got " + msg;
-
-            Message message = (Message)msg;
-
-            Connection connection = (Connection)channel.getAttachment();
+            Connection connection = ctx.channel().attr(Connection.attributeKey).get();
             // The only case the connection can be null is when we send the initial STARTUP message (client side thus)
             int version = connection == null ? Server.CURRENT_VERSION : connection.getVersion();
 
@@ -253,46 +250,40 @@ public abstract class Message
 
             Codec<Message> codec = (Codec<Message>)message.type.codec;
             int messageSize = codec.encodedSize(message, version);
-            ChannelBuffer body;
+            ByteBuf body;
             if (message instanceof Response)
             {
                 UUID tracingId = ((Response)message).getTracingId();
                 if (tracingId != null)
                 {
-                    body = ChannelBuffers.buffer(CBUtil.sizeOfUUID(tracingId) + messageSize);
+                    body = Unpooled.buffer(CBUtil.sizeOfUUID(tracingId) + messageSize);
                     CBUtil.writeUUID(tracingId, body);
                     flags.add(Frame.Header.Flag.TRACING);
                 }
                 else
                 {
-                    body = ChannelBuffers.buffer(messageSize);
+                    body = Unpooled.buffer(messageSize);
                 }
             }
             else
             {
                 assert message instanceof Request;
-                body = ChannelBuffers.buffer(messageSize);
+                body = Unpooled.buffer(messageSize);
                 if (((Request)message).isTracingRequested())
                     flags.add(Frame.Header.Flag.TRACING);
             }
 
             codec.encode(message, body, version);
-            return Frame.create(message.type, message.getStreamId(), version, flags, body);
+            results.add(Frame.create(message.type, message.getStreamId(), version, flags, body));
         }
     }
 
-    public static class Dispatcher extends SimpleChannelUpstreamHandler
+    @ChannelHandler.Sharable
+    public static class Dispatcher extends SimpleChannelInboundHandler<Request>
     {
         @Override
-        public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
+        public void channelRead0(ChannelHandlerContext ctx, Request request)
         {
-            assert e.getMessage() instanceof Message : "Expecting message, got " + e.getMessage();
-
-            if (e.getMessage() instanceof Response)
-                throw new ProtocolException("Invalid response message received, expecting requests");
-
-            Request request = (Request)e.getMessage();
-
             try
             {
                 assert request.connection() instanceof ServerConnection;
@@ -308,28 +299,28 @@ public abstract class Message
 
                 logger.debug("Responding: {}, v={}", response, connection.getVersion());
 
-                ctx.getChannel().write(response);
+                ctx.channel().writeAndFlush(response);
             }
             catch (Exception ex)
             {
                 // Don't let the exception propagate to exceptionCaught() if we can help it so that we can assign the right streamID.
-                ctx.getChannel().write(ErrorMessage.fromException(ex).setStreamId(request.getStreamId()));
+                ctx.channel().writeAndFlush(ErrorMessage.fromException(ex).setStreamId(request.getStreamId()));
             }
         }
 
         @Override
-        public void exceptionCaught(final ChannelHandlerContext ctx, ExceptionEvent e)
+        public void exceptionCaught(final ChannelHandlerContext ctx, Throwable cause)
         throws Exception
         {
-            if (ctx.getChannel().isOpen())
+            if (ctx.channel().isOpen())
             {
-                ChannelFuture future = ctx.getChannel().write(ErrorMessage.fromException(e.getCause()));
+                ChannelFuture future = ctx.channel().writeAndFlush(ErrorMessage.fromException(cause));
                 // On protocol exception, close the channel as soon as the message have been sent
-                if (e.getCause() instanceof ProtocolException)
+                if (cause instanceof ProtocolException)
                 {
                     future.addListener(new ChannelFutureListener() {
                         public void operationComplete(ChannelFuture future) {
-                            ctx.getChannel().close();
+                            ctx.channel().close();
                         }
                     });
                 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cbf304eb/src/java/org/apache/cassandra/transport/OptionCodec.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/OptionCodec.java b/src/java/org/apache/cassandra/transport/OptionCodec.java
index c562889..9b82bda 100644
--- a/src/java/org/apache/cassandra/transport/OptionCodec.java
+++ b/src/java/org/apache/cassandra/transport/OptionCodec.java
@@ -21,9 +21,9 @@ import java.lang.reflect.Array;
 import java.util.EnumMap;
 import java.util.Map;
 
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBuffers;
+import io.netty.buffer.ByteBuf;
 
+import io.netty.buffer.Unpooled;
 import org.apache.cassandra.utils.Pair;
 
 public class OptionCodec<T extends Enum<T> & OptionCodec.Codecable<T>>
@@ -32,8 +32,8 @@ public class OptionCodec<T extends Enum<T> & OptionCodec.Codecable<T>>
     {
         public int getId();
 
-        public Object readValue(ChannelBuffer cb);
-        public void writeValue(Object value, ChannelBuffer cb);
+        public Object readValue(ByteBuf cb);
+        public void writeValue(Object value, ByteBuf cb);
         public int serializedValueSize(Object obj);
     }
 
@@ -66,7 +66,7 @@ public class OptionCodec<T extends Enum<T> & OptionCodec.Codecable<T>>
         return opt;
     }
 
-    public Map<T, Object> decode(ChannelBuffer body)
+    public Map<T, Object> decode(ByteBuf body)
     {
         EnumMap<T, Object> options = new EnumMap<T, Object>(klass);
         int n = body.readUnsignedShort();
@@ -81,12 +81,12 @@ public class OptionCodec<T extends Enum<T> & OptionCodec.Codecable<T>>
         return options;
     }
 
-    public ChannelBuffer encode(Map<T, Object> options)
+    public ByteBuf encode(Map<T, Object> options)
     {
         int optLength = 2;
         for (Map.Entry<T, Object> entry : options.entrySet())
             optLength += 2 + entry.getKey().serializedValueSize(entry.getValue());
-        ChannelBuffer cb = ChannelBuffers.buffer(optLength);
+        ByteBuf cb = Unpooled.buffer(optLength);
         cb.writeShort(options.size());
         for (Map.Entry<T, Object> entry : options.entrySet())
         {
@@ -97,14 +97,14 @@ public class OptionCodec<T extends Enum<T> & OptionCodec.Codecable<T>>
         return cb;
     }
 
-    public Pair<T, Object> decodeOne(ChannelBuffer body)
+    public Pair<T, Object> decodeOne(ByteBuf body)
     {
         T opt = fromId(body.readUnsignedShort());
         Object value = opt.readValue(body);
         return Pair.create(opt, value);
     }
 
-    public void writeOne(Pair<T, Object> option, ChannelBuffer dest)
+    public void writeOne(Pair<T, Object> option, ByteBuf dest)
     {
         T opt = option.left;
         Object obj = option.right;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cbf304eb/src/java/org/apache/cassandra/transport/RequestThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/RequestThreadPoolExecutor.java b/src/java/org/apache/cassandra/transport/RequestThreadPoolExecutor.java
index c2e6033..ee7d127 100644
--- a/src/java/org/apache/cassandra/transport/RequestThreadPoolExecutor.java
+++ b/src/java/org/apache/cassandra/transport/RequestThreadPoolExecutor.java
@@ -18,85 +18,84 @@
 package org.apache.cassandra.transport;
 
 import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
-import org.jboss.netty.handler.execution.MemoryAwareThreadPoolExecutor;
-import org.jboss.netty.util.ObjectSizeEstimator;
-import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
+import io.netty.util.concurrent.AbstractEventExecutor;
+import io.netty.util.concurrent.EventExecutorGroup;
+import io.netty.util.concurrent.Future;
+import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
 import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.metrics.ThreadPoolMetrics;
 
-public class RequestThreadPoolExecutor extends MemoryAwareThreadPoolExecutor
+public class RequestThreadPoolExecutor extends AbstractEventExecutor
 {
+
     private final static int CORE_THREAD_TIMEOUT_SEC = 30;
     // Number of request we accept to queue before blocking. We could allow this to be configured...
     private final static int MAX_QUEUED_REQUESTS = 128;
 
     private final static String THREAD_FACTORY_ID = "Native-Transport-Requests";
+    private final JMXEnabledThreadPoolExecutor wrapped = new JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getNativeTransportMaxThreads(),
+                                                                                          CORE_THREAD_TIMEOUT_SEC, TimeUnit.SECONDS,
+                                                                                          new LinkedBlockingQueue<Runnable>(MAX_QUEUED_REQUESTS),
+                                                                                          new NamedThreadFactory(THREAD_FACTORY_ID),
+                                                                                          "transport");
 
-    private final ThreadPoolMetrics metrics;
-
-    public RequestThreadPoolExecutor()
+    public boolean isShuttingDown()
     {
-        super(DatabaseDescriptor.getNativeTransportMaxThreads(),
-              0, // We don't use the per-channel limit, only the global one
-              MAX_QUEUED_REQUESTS,
-              CORE_THREAD_TIMEOUT_SEC, TimeUnit.SECONDS,
-              sizeEstimator(),
-              new NamedThreadFactory(THREAD_FACTORY_ID));
-        metrics = new ThreadPoolMetrics(this, "transport", THREAD_FACTORY_ID);
+        return wrapped.isShutdown();
     }
 
-    /*
-     * In theory, the ObjectSizeEstimator should estimate the actual size of a
-     * request, and MemoryAwareThreadPoolExecutor sets a memory limit on how
-     * much memory we allow for request before blocking.
-     *
-     * However, the memory size used by a CQL query is not very intersting and
-     * by no mean reflect the memory size it's execution will use (the interesting part).
-     * Furthermore, we're mainly interested in limiting the number of unhandled requests that
-     * piles up to implement some back-pressure, and for that, there is no real need to do
-     * fancy esimation of request size. So we use a trivial estimator that just count the
-     * number of request.
-     *
-     * We could get more fancy later ...
-     */
-    private static ObjectSizeEstimator sizeEstimator()
+    public Future<?> shutdownGracefully(long l, long l2, TimeUnit timeUnit)
     {
-        return new ObjectSizeEstimator()
-        {
-            public int estimateSize(Object o)
-            {
-                return 1;
-            }
-        };
+        throw new IllegalStateException();
     }
 
-    @Override
-    protected void afterExecute(Runnable r, Throwable t)
+    public Future<?> terminationFuture()
     {
-        super.afterExecute(r, t);
-        DebuggableThreadPoolExecutor.logExceptionsAfterExecute(r, t);
+        throw new IllegalStateException();
     }
 
     @Override
     public void shutdown()
     {
-        if (!isShutdown())
-        {
-            metrics.release();
-        }
-        super.shutdown();
+        wrapped.shutdown();
     }
 
     @Override
     public List<Runnable> shutdownNow()
     {
-        if (!isShutdown())
-        {
-            metrics.release();
-        }
-        return super.shutdownNow();
+        return wrapped.shutdownNow();
+    }
+
+    public boolean isShutdown()
+    {
+        return wrapped.isShutdown();
+    }
+
+    public boolean isTerminated()
+    {
+        return wrapped.isTerminated();
+    }
+
+    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException
+    {
+        return wrapped.awaitTermination(timeout, unit);
+    }
+
+    public EventExecutorGroup parent()
+    {
+        return null;
+    }
+
+    public boolean inEventLoop(Thread thread)
+    {
+        return false;
+    }
+
+    public void execute(Runnable command)
+    {
+        wrapped.execute(command);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cbf304eb/src/java/org/apache/cassandra/transport/Server.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Server.java b/src/java/org/apache/cassandra/transport/Server.java
index 3e1588a..8d08ffd 100644
--- a/src/java/org/apache/cassandra/transport/Server.java
+++ b/src/java/org/apache/cassandra/transport/Server.java
@@ -23,7 +23,6 @@ import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
 import java.util.EnumMap;
 import java.util.concurrent.Callable;
-import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicBoolean;
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLEngine;
@@ -31,6 +30,12 @@ import javax.net.ssl.SSLEngine;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.util.concurrent.EventExecutor;
+import io.netty.util.concurrent.GlobalEventExecutor;
+import io.netty.util.internal.logging.InternalLoggerFactory;
+import io.netty.util.internal.logging.Slf4JLoggerFactory;
 import org.apache.cassandra.auth.IAuthenticator;
 import org.apache.cassandra.auth.ISaslAwareAuthenticator;
 import org.apache.cassandra.config.DatabaseDescriptor;
@@ -39,15 +44,11 @@ import org.apache.cassandra.metrics.ClientMetrics;
 import org.apache.cassandra.security.SSLFactory;
 import org.apache.cassandra.service.*;
 import org.apache.cassandra.transport.messages.EventMessage;
-import org.jboss.netty.bootstrap.ServerBootstrap;
-import org.jboss.netty.channel.*;
-import org.jboss.netty.channel.group.ChannelGroup;
-import org.jboss.netty.channel.group.DefaultChannelGroup;
-import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
-import org.jboss.netty.handler.execution.ExecutionHandler;
-import org.jboss.netty.handler.ssl.SslHandler;
-import org.jboss.netty.logging.InternalLoggerFactory;
-import org.jboss.netty.logging.Slf4JLoggerFactory;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.*;
+import io.netty.channel.group.ChannelGroup;
+import io.netty.channel.group.DefaultChannelGroup;
+import io.netty.handler.ssl.SslHandler;
 
 public class Server implements CassandraDaemon.Server
 {
@@ -74,8 +75,8 @@ public class Server implements CassandraDaemon.Server
     public final InetSocketAddress socket;
     private final AtomicBoolean isRunning = new AtomicBoolean(false);
 
-    private ChannelFactory factory;
-    private ExecutionHandler executionHandler;
+    private EventLoopGroup bossGroup, workerGroup;
+    private EventExecutor eventExecutorGroup;
 
     public Server(InetSocketAddress socket)
     {
@@ -105,7 +106,7 @@ public class Server implements CassandraDaemon.Server
     {
 	    if(!isRunning())
 	    {
-                run();
+            run();
 	    }
     }
 
@@ -133,27 +134,30 @@ public class Server implements CassandraDaemon.Server
         }
 
         // Configure the server.
-        executionHandler = new ExecutionHandler(new RequestThreadPoolExecutor());
-        factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
-        ServerBootstrap bootstrap = new ServerBootstrap(factory);
+        eventExecutorGroup = new RequestThreadPoolExecutor();
+        bossGroup = new NioEventLoopGroup();
+        workerGroup = new NioEventLoopGroup();
 
-        bootstrap.setOption("child.tcpNoDelay", true);
+        ServerBootstrap bootstrap = new ServerBootstrap()
+                                    .group(bossGroup, workerGroup)
+                                    .channel(NioServerSocketChannel.class)
+                                    .childOption(ChannelOption.TCP_NODELAY, true);
 
         // Set up the event pipeline factory.
         final EncryptionOptions.ClientEncryptionOptions clientEnc = DatabaseDescriptor.getClientEncryptionOptions();
         if (clientEnc.enabled)
         {
             logger.info("Enabling encrypted CQL connections between client and server");
-            bootstrap.setPipelineFactory(new SecurePipelineFactory(this, clientEnc));
+            bootstrap.childHandler(new SecureInitializer(this, clientEnc));
         }
         else
         {
-            bootstrap.setPipelineFactory(new PipelineFactory(this));
+            bootstrap.childHandler(new Initializer(this));
         }
 
         // Bind and start to accept incoming connections.
         logger.info("Starting listening for CQL clients on {}...", socket);
-        Channel channel = bootstrap.bind(socket);
+        Channel channel = bootstrap.bind(socket).channel();
         connectionTracker.allChannels.add(channel);
         isRunning.set(true);
     }
@@ -174,23 +178,27 @@ public class Server implements CassandraDaemon.Server
     {
         // Close opened connections
         connectionTracker.closeAll();
-        factory.releaseExternalResources();
-        factory = null;
-        executionHandler.releaseExternalResources();
-        executionHandler = null;
+        bossGroup.shutdownGracefully();
+        workerGroup.shutdownGracefully();
+        bossGroup = null;
+        workerGroup = null;
+
+        eventExecutorGroup.shutdown();
+        eventExecutorGroup = null;
         logger.info("Stop listening for CQL clients");
     }
 
 
     public static class ConnectionTracker implements Connection.Tracker
     {
-        public final ChannelGroup allChannels = new DefaultChannelGroup();
+        // TODO: should we be using the GlobalEventExecutor or defining our own?
+        public final ChannelGroup allChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
         private final EnumMap<Event.Type, ChannelGroup> groups = new EnumMap<Event.Type, ChannelGroup>(Event.Type.class);
 
         public ConnectionTracker()
         {
             for (Event.Type type : Event.Type.values())
-                groups.put(type, new DefaultChannelGroup(type.toString()));
+                groups.put(type, new DefaultChannelGroup(type.toString(), GlobalEventExecutor.INSTANCE));
         }
 
         public void addConnection(Channel ch, Connection connection)
@@ -211,7 +219,7 @@ public class Server implements CassandraDaemon.Server
 
         public void send(Event event)
         {
-            groups.get(event.type).write(new EventMessage(event));
+            groups.get(event.type).writeAndFlush(new EventMessage(event));
         }
 
         public void closeAll()
@@ -230,7 +238,7 @@ public class Server implements CassandraDaemon.Server
         }
     }
 
-    private static class PipelineFactory implements ChannelPipelineFactory
+    private static class Initializer extends ChannelInitializer
     {
         // Stateless handlers
         private static final Message.ProtocolDecoder messageDecoder = new Message.ProtocolDecoder();
@@ -242,14 +250,14 @@ public class Server implements CassandraDaemon.Server
 
         private final Server server;
 
-        public PipelineFactory(Server server)
+        public Initializer(Server server)
         {
             this.server = server;
         }
 
-        public ChannelPipeline getPipeline() throws Exception
+        protected void initChannel(Channel channel) throws Exception
         {
-            ChannelPipeline pipeline = Channels.pipeline();
+            ChannelPipeline pipeline = channel.pipeline();
 
             //pipeline.addLast("debug", new LoggingHandler());
 
@@ -262,20 +270,16 @@ public class Server implements CassandraDaemon.Server
             pipeline.addLast("messageDecoder", messageDecoder);
             pipeline.addLast("messageEncoder", messageEncoder);
 
-            pipeline.addLast("executor", server.executionHandler);
-
-            pipeline.addLast("dispatcher", dispatcher);
-
-            return pipeline;
+            pipeline.addLast(server.eventExecutorGroup, "executor", dispatcher);
         }
     }
 
-    private static class SecurePipelineFactory extends PipelineFactory
+    private static class SecureInitializer extends Initializer
     {
         private final SSLContext sslContext;
         private final EncryptionOptions encryptionOptions;
 
-        public SecurePipelineFactory(Server server, EncryptionOptions encryptionOptions)
+        public SecureInitializer(Server server, EncryptionOptions encryptionOptions)
         {
             super(server);
             this.encryptionOptions = encryptionOptions;
@@ -289,18 +293,16 @@ public class Server implements CassandraDaemon.Server
             }
         }
 
-        public ChannelPipeline getPipeline() throws Exception
+        protected void initChannel(Channel channel) throws Exception
         {
             SSLEngine sslEngine = sslContext.createSSLEngine();
             sslEngine.setUseClientMode(false);
             sslEngine.setEnabledCipherSuites(encryptionOptions.cipher_suites);
             sslEngine.setNeedClientAuth(encryptionOptions.require_client_auth);
-            
+
             SslHandler sslHandler = new SslHandler(sslEngine);
-            sslHandler.setIssueHandshake(true);
-            ChannelPipeline pipeline = super.getPipeline();
-            pipeline.addFirst("ssl", sslHandler);
-            return pipeline;
+            super.initChannel(channel);
+            channel.pipeline().addFirst("ssl", sslHandler);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cbf304eb/src/java/org/apache/cassandra/transport/ServerConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/ServerConnection.java b/src/java/org/apache/cassandra/transport/ServerConnection.java
index 9bc07cb..b28866f 100644
--- a/src/java/org/apache/cassandra/transport/ServerConnection.java
+++ b/src/java/org/apache/cassandra/transport/ServerConnection.java
@@ -19,7 +19,7 @@ package org.apache.cassandra.transport;
 
 import java.util.concurrent.ConcurrentMap;
 
-import org.jboss.netty.channel.Channel;
+import io.netty.channel.Channel;
 
 import org.apache.cassandra.auth.IAuthenticator;
 import org.apache.cassandra.auth.ISaslAwareAuthenticator;
@@ -43,7 +43,7 @@ public class ServerConnection extends Connection
     public ServerConnection(Channel channel, int version, Connection.Tracker tracker)
     {
         super(channel, version, tracker);
-        this.clientState = ClientState.forExternalCalls(channel.getRemoteAddress());
+        this.clientState = ClientState.forExternalCalls(channel.remoteAddress());
         this.state = State.UNINITIALIZED;
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cbf304eb/src/java/org/apache/cassandra/transport/SimpleClient.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/SimpleClient.java b/src/java/org/apache/cassandra/transport/SimpleClient.java
index 5f2efda..ef56882 100644
--- a/src/java/org/apache/cassandra/transport/SimpleClient.java
+++ b/src/java/org/apache/cassandra/transport/SimpleClient.java
@@ -33,6 +33,15 @@ import javax.net.ssl.SSLEngine;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.util.internal.logging.InternalLoggerFactory;
+import io.netty.util.internal.logging.Slf4JLoggerFactory;
 import org.apache.cassandra.cql3.QueryOptions;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.security.SSLFactory;
@@ -44,20 +53,11 @@ import org.apache.cassandra.transport.messages.QueryMessage;
 import org.apache.cassandra.transport.messages.ResultMessage;
 import org.apache.cassandra.transport.messages.StartupMessage;
 import org.apache.cassandra.utils.MD5Digest;
-import org.jboss.netty.bootstrap.ClientBootstrap;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.channel.Channels;
-import org.jboss.netty.channel.ExceptionEvent;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
-import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
-import org.jboss.netty.handler.ssl.SslHandler;
-import org.jboss.netty.logging.InternalLoggerFactory;
-import org.jboss.netty.logging.Slf4JLoggerFactory;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPipeline;
+import io.netty.handler.ssl.SslHandler;
 import static org.apache.cassandra.config.EncryptionOptions.ClientEncryptionOptions;
 
 public class SimpleClient
@@ -76,7 +76,7 @@ public class SimpleClient
     protected final Connection.Tracker tracker = new ConnectionTracker();
     // We don't track connection really, so we don't need one Connection per channel
     protected final Connection connection = new Connection(null, Server.CURRENT_VERSION, tracker);
-    protected ClientBootstrap bootstrap;
+    protected Bootstrap bootstrap;
     protected Channel channel;
     protected ChannelFuture lastWriteFuture;
 
@@ -118,30 +118,28 @@ public class SimpleClient
     protected void establishConnection() throws IOException
     {
         // Configure the client.
-        bootstrap = new ClientBootstrap(
-                        new NioClientSocketChannelFactory(
-                            Executors.newCachedThreadPool(),
-                            Executors.newCachedThreadPool()));
-
-        bootstrap.setOption("tcpNoDelay", true);
+        bootstrap = new Bootstrap()
+                    .group(new NioEventLoopGroup())
+                    .channel(io.netty.channel.socket.nio.NioSocketChannel.class)
+                    .option(ChannelOption.TCP_NODELAY, true);
 
         // Configure the pipeline factory.
         if(encryptionOptions.enabled)
         {
-            bootstrap.setPipelineFactory(new SecurePipelineFactory());
+            bootstrap.handler(new SecureInitializer());
         }
         else
         {
-            bootstrap.setPipelineFactory(new PipelineFactory());
+            bootstrap.handler(new Initializer());
         }
         ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port));
 
         // Wait until the connection attempt succeeds or fails.
-        channel = future.awaitUninterruptibly().getChannel();
+        channel = future.awaitUninterruptibly().channel();
         if (!future.isSuccess())
         {
-            bootstrap.releaseExternalResources();
-            throw new IOException("Connection Error", future.getCause());
+            bootstrap.group().shutdownGracefully();
+            throw new IOException("Connection Error", future.cause());
         }
     }
 
@@ -189,7 +187,7 @@ public class SimpleClient
         channel.close().awaitUninterruptibly();
 
         // Shut down all thread pools to exit.
-        bootstrap.releaseExternalResources();
+        bootstrap.group().shutdownGracefully();
     }
 
     protected Message.Response execute(Message.Request request)
@@ -197,7 +195,7 @@ public class SimpleClient
         try
         {
             request.attach(connection);
-            lastWriteFuture = channel.write(request);
+            lastWriteFuture = channel.writeAndFlush(request);
             Message.Response msg = responseHandler.responses.take();
             if (msg instanceof ErrorMessage)
                 throw new RuntimeException((Throwable)((ErrorMessage)msg).error);
@@ -222,14 +220,11 @@ public class SimpleClient
         public void closeAll() {}
     }
 
-    private class PipelineFactory implements ChannelPipelineFactory
+    private class Initializer extends ChannelInitializer<Channel>
     {
-        public ChannelPipeline getPipeline() throws Exception
+        protected void initChannel(Channel channel) throws Exception
         {
-            ChannelPipeline pipeline = Channels.pipeline();
-
-            //pipeline.addLast("debug", new LoggingHandler());
-
+            ChannelPipeline pipeline = channel.pipeline();
             pipeline.addLast("frameDecoder", new Frame.Decoder(connectionFactory));
             pipeline.addLast("frameEncoder", frameEncoder);
 
@@ -240,43 +235,39 @@ public class SimpleClient
             pipeline.addLast("messageEncoder", messageEncoder);
 
             pipeline.addLast("handler", responseHandler);
-
-            return pipeline;
         }
     }
 
-    private class SecurePipelineFactory extends PipelineFactory
+    private class SecureInitializer extends Initializer
     {
         private final SSLContext sslContext;
 
-        public SecurePipelineFactory() throws IOException
+        public SecureInitializer() throws IOException
         {
             this.sslContext = SSLFactory.createSSLContext(encryptionOptions, true);
         }
 
-        public ChannelPipeline getPipeline() throws Exception
+        protected void initChannel(Channel channel) throws Exception
         {
+            super.initChannel(channel);
             SSLEngine sslEngine = sslContext.createSSLEngine();
             sslEngine.setUseClientMode(true);
             sslEngine.setEnabledCipherSuites(encryptionOptions.cipher_suites);
-            ChannelPipeline pipeline = super.getPipeline();
-
-            pipeline.addFirst("ssl", new SslHandler(sslEngine));
-            return pipeline;
+            channel.pipeline().addFirst("ssl", new SslHandler(sslEngine));
         }
     }
 
-    private static class ResponseHandler extends SimpleChannelUpstreamHandler
+    @ChannelHandler.Sharable
+    private static class ResponseHandler extends SimpleChannelInboundHandler<Message.Response>
     {
         public final BlockingQueue<Message.Response> responses = new SynchronousQueue<Message.Response>(true);
 
         @Override
-        public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
+        public void channelRead0(ChannelHandlerContext ctx, Message.Response r)
         {
-            assert e.getMessage() instanceof Message.Response;
             try
             {
-                responses.put((Message.Response)e.getMessage());
+                responses.put(r);
             }
             catch (InterruptedException ie)
             {
@@ -284,11 +275,11 @@ public class SimpleClient
             }
         }
 
-        public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception
+        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception
         {
-            if (this == ctx.getPipeline().getLast())
-                logger.error("Exception in response", e.getCause());
-            ctx.sendUpstream(e);
+            if (this == ctx.pipeline().last())
+                logger.error("Exception in response", cause);
+            ctx.fireExceptionCaught(cause);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cbf304eb/src/java/org/apache/cassandra/transport/messages/AuthChallenge.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/AuthChallenge.java b/src/java/org/apache/cassandra/transport/messages/AuthChallenge.java
index b6634ad..15a9a9a 100644
--- a/src/java/org/apache/cassandra/transport/messages/AuthChallenge.java
+++ b/src/java/org/apache/cassandra/transport/messages/AuthChallenge.java
@@ -19,7 +19,7 @@ package org.apache.cassandra.transport.messages;
 
 import org.apache.cassandra.transport.CBUtil;
 import org.apache.cassandra.transport.Message;
-import org.jboss.netty.buffer.ChannelBuffer;
+import io.netty.buffer.ByteBuf;
 
 import java.nio.ByteBuffer;
 
@@ -30,7 +30,7 @@ public class AuthChallenge extends Message.Response
 {
     public static final Message.Codec<AuthChallenge> codec = new Message.Codec<AuthChallenge>()
     {
-        public AuthChallenge decode(ChannelBuffer body, int version)
+        public AuthChallenge decode(ByteBuf body, int version)
         {
             ByteBuffer b = CBUtil.readValue(body);
             byte[] token = new byte[b.remaining()];
@@ -38,7 +38,7 @@ public class AuthChallenge extends Message.Response
             return new AuthChallenge(token);
         }
 
-        public void encode(AuthChallenge challenge, ChannelBuffer dest, int version)
+        public void encode(AuthChallenge challenge, ByteBuf dest, int version)
         {
             CBUtil.writeValue(challenge.token, dest);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cbf304eb/src/java/org/apache/cassandra/transport/messages/AuthResponse.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/AuthResponse.java b/src/java/org/apache/cassandra/transport/messages/AuthResponse.java
index 37245a7..3f3f774 100644
--- a/src/java/org/apache/cassandra/transport/messages/AuthResponse.java
+++ b/src/java/org/apache/cassandra/transport/messages/AuthResponse.java
@@ -26,7 +26,7 @@ import org.apache.cassandra.transport.Message;
 import org.apache.cassandra.transport.ProtocolException;
 import org.apache.cassandra.transport.ServerConnection;
 
-import org.jboss.netty.buffer.ChannelBuffer;
+import io.netty.buffer.ByteBuf;
 
 import java.nio.ByteBuffer;
 
@@ -39,7 +39,7 @@ public class AuthResponse extends Message.Request
 {
     public static final Message.Codec<AuthResponse> codec = new Message.Codec<AuthResponse>()
     {
-        public AuthResponse decode(ChannelBuffer body, int version)
+        public AuthResponse decode(ByteBuf body, int version)
         {
             if (version == 1)
                 throw new ProtocolException("SASL Authentication is not supported in version 1 of the protocol");
@@ -50,7 +50,7 @@ public class AuthResponse extends Message.Request
             return new AuthResponse(token);
         }
 
-        public void encode(AuthResponse response, ChannelBuffer dest, int version)
+        public void encode(AuthResponse response, ByteBuf dest, int version)
         {
             CBUtil.writeValue(response.token, dest);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cbf304eb/src/java/org/apache/cassandra/transport/messages/AuthSuccess.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/AuthSuccess.java b/src/java/org/apache/cassandra/transport/messages/AuthSuccess.java
index 2595f28..98f50db 100644
--- a/src/java/org/apache/cassandra/transport/messages/AuthSuccess.java
+++ b/src/java/org/apache/cassandra/transport/messages/AuthSuccess.java
@@ -19,7 +19,7 @@ package org.apache.cassandra.transport.messages;
 
 import org.apache.cassandra.transport.CBUtil;
 import org.apache.cassandra.transport.Message;
-import org.jboss.netty.buffer.ChannelBuffer;
+import io.netty.buffer.ByteBuf;
 
 import java.nio.ByteBuffer;
 
@@ -33,7 +33,7 @@ public class AuthSuccess extends Message.Response
 {
     public static final Message.Codec<AuthSuccess> codec = new Message.Codec<AuthSuccess>()
     {
-        public AuthSuccess decode(ChannelBuffer body, int version)
+        public AuthSuccess decode(ByteBuf body, int version)
         {
             ByteBuffer b = CBUtil.readValue(body);
             byte[] token = new byte[b.remaining()];
@@ -41,7 +41,7 @@ public class AuthSuccess extends Message.Response
             return new AuthSuccess(token);
         }
 
-        public void encode(AuthSuccess success, ChannelBuffer dest, int version)
+        public void encode(AuthSuccess success, ByteBuf dest, int version)
         {
             CBUtil.writeValue(success.token, dest);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cbf304eb/src/java/org/apache/cassandra/transport/messages/AuthenticateMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/AuthenticateMessage.java b/src/java/org/apache/cassandra/transport/messages/AuthenticateMessage.java
index 22207ab..230f0f2 100644
--- a/src/java/org/apache/cassandra/transport/messages/AuthenticateMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/AuthenticateMessage.java
@@ -17,7 +17,7 @@
  */
 package org.apache.cassandra.transport.messages;
 
-import org.jboss.netty.buffer.ChannelBuffer;
+import io.netty.buffer.ByteBuf;
 
 import org.apache.cassandra.transport.CBUtil;
 import org.apache.cassandra.transport.Message;
@@ -29,13 +29,13 @@ public class AuthenticateMessage extends Message.Response
 {
     public static final Message.Codec<AuthenticateMessage> codec = new Message.Codec<AuthenticateMessage>()
     {
-        public AuthenticateMessage decode(ChannelBuffer body, int version)
+        public AuthenticateMessage decode(ByteBuf body, int version)
         {
             String authenticator = CBUtil.readString(body);
             return new AuthenticateMessage(authenticator);
         }
 
-        public void encode(AuthenticateMessage msg, ChannelBuffer dest, int version)
+        public void encode(AuthenticateMessage msg, ByteBuf dest, int version)
         {
             CBUtil.writeString(msg.authenticator, dest);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cbf304eb/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/BatchMessage.java b/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
index 487e089..c34e35d 100644
--- a/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
@@ -23,7 +23,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
 
-import org.jboss.netty.buffer.ChannelBuffer;
+import io.netty.buffer.ByteBuf;
 
 import org.apache.cassandra.cql3.Attributes;
 import org.apache.cassandra.cql3.CQLStatement;
@@ -43,7 +43,7 @@ public class BatchMessage extends Message.Request
 {
     public static final Message.Codec<BatchMessage> codec = new Message.Codec<BatchMessage>()
     {
-        public BatchMessage decode(ChannelBuffer body, int version)
+        public BatchMessage decode(ByteBuf body, int version)
         {
             if (version == 1)
                 throw new ProtocolException("BATCH messages are not support in version 1 of the protocol");
@@ -67,7 +67,7 @@ public class BatchMessage extends Message.Request
             return new BatchMessage(toType(type), queryOrIds, variables, consistency);
         }
 
-        public void encode(BatchMessage msg, ChannelBuffer dest, int version)
+        public void encode(BatchMessage msg, ByteBuf dest, int version)
         {
             int queries = msg.queryOrIdList.size();
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cbf304eb/src/java/org/apache/cassandra/transport/messages/CredentialsMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/CredentialsMessage.java b/src/java/org/apache/cassandra/transport/messages/CredentialsMessage.java
index dca8bf8..eb39e30 100644
--- a/src/java/org/apache/cassandra/transport/messages/CredentialsMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/CredentialsMessage.java
@@ -23,7 +23,7 @@ import java.util.Map;
 import org.apache.cassandra.auth.AuthenticatedUser;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.transport.ProtocolException;
-import org.jboss.netty.buffer.ChannelBuffer;
+import io.netty.buffer.ByteBuf;
 
 import org.apache.cassandra.exceptions.AuthenticationException;
 import org.apache.cassandra.service.QueryState;
@@ -37,7 +37,7 @@ public class CredentialsMessage extends Message.Request
 {
     public static final Message.Codec<CredentialsMessage> codec = new Message.Codec<CredentialsMessage>()
     {
-        public CredentialsMessage decode(ChannelBuffer body, int version)
+        public CredentialsMessage decode(ByteBuf body, int version)
         {
             if (version > 1)
                 throw new ProtocolException("Legacy credentials authentication is not supported in " +
@@ -47,7 +47,7 @@ public class CredentialsMessage extends Message.Request
             return new CredentialsMessage(credentials);
         }
 
-        public void encode(CredentialsMessage msg, ChannelBuffer dest, int version)
+        public void encode(CredentialsMessage msg, ByteBuf dest, int version)
         {
             CBUtil.writeStringMap(msg.credentials, dest);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cbf304eb/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java b/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java
index 8f9ba6a..6109d1d 100644
--- a/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java
@@ -17,7 +17,7 @@
  */
 package org.apache.cassandra.transport.messages;
 
-import org.jboss.netty.buffer.ChannelBuffer;
+import io.netty.buffer.ByteBuf;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,7 +39,7 @@ public class ErrorMessage extends Message.Response
 
     public static final Message.Codec<ErrorMessage> codec = new Message.Codec<ErrorMessage>()
     {
-        public ErrorMessage decode(ChannelBuffer body, int version)
+        public ErrorMessage decode(ByteBuf body, int version)
         {
             ExceptionCode code = ExceptionCode.fromValue(body.readInt());
             String msg = CBUtil.readString(body);
@@ -119,7 +119,7 @@ public class ErrorMessage extends Message.Response
             return new ErrorMessage(te);
         }
 
-        public void encode(ErrorMessage msg, ChannelBuffer dest, int version)
+        public void encode(ErrorMessage msg, ByteBuf dest, int version)
         {
             dest.writeInt(msg.error.code().value);
             CBUtil.writeString(msg.error.getMessage(), dest);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cbf304eb/src/java/org/apache/cassandra/transport/messages/EventMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/EventMessage.java b/src/java/org/apache/cassandra/transport/messages/EventMessage.java
index 9acb401..890b9d1 100644
--- a/src/java/org/apache/cassandra/transport/messages/EventMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/EventMessage.java
@@ -17,7 +17,7 @@
  */
 package org.apache.cassandra.transport.messages;
 
-import org.jboss.netty.buffer.ChannelBuffer;
+import io.netty.buffer.ByteBuf;
 
 import org.apache.cassandra.transport.Event;
 import org.apache.cassandra.transport.Message;
@@ -26,12 +26,12 @@ public class EventMessage extends Message.Response
 {
     public static final Message.Codec<EventMessage> codec = new Message.Codec<EventMessage>()
     {
-        public EventMessage decode(ChannelBuffer body, int version)
+        public EventMessage decode(ByteBuf body, int version)
         {
             return new EventMessage(Event.deserialize(body));
         }
 
-        public void encode(EventMessage msg, ChannelBuffer dest, int version)
+        public void encode(EventMessage msg, ByteBuf dest, int version)
         {
             msg.event.serialize(dest);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cbf304eb/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java b/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
index 64e4785..4ee340c 100644
--- a/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
@@ -22,7 +22,7 @@ import java.util.List;
 import java.util.UUID;
 
 import com.google.common.collect.ImmutableMap;
-import org.jboss.netty.buffer.ChannelBuffer;
+import io.netty.buffer.ByteBuf;
 
 import org.apache.cassandra.cql3.CQLStatement;
 import org.apache.cassandra.cql3.QueryProcessor;
@@ -39,7 +39,7 @@ public class ExecuteMessage extends Message.Request
 {
     public static final Message.Codec<ExecuteMessage> codec = new Message.Codec<ExecuteMessage>()
     {
-        public ExecuteMessage decode(ChannelBuffer body, int version)
+        public ExecuteMessage decode(ByteBuf body, int version)
         {
             byte[] id = CBUtil.readBytes(body);
             if (version == 1)
@@ -54,7 +54,7 @@ public class ExecuteMessage extends Message.Request
             }
         }
 
-        public void encode(ExecuteMessage msg, ChannelBuffer dest, int version)
+        public void encode(ExecuteMessage msg, ByteBuf dest, int version)
         {
             CBUtil.writeBytes(msg.statementId.bytes, dest);
             if (version == 1)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cbf304eb/src/java/org/apache/cassandra/transport/messages/OptionsMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/OptionsMessage.java b/src/java/org/apache/cassandra/transport/messages/OptionsMessage.java
index f74d252..2f6e3da 100644
--- a/src/java/org/apache/cassandra/transport/messages/OptionsMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/OptionsMessage.java
@@ -22,7 +22,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import org.jboss.netty.buffer.ChannelBuffer;
+import io.netty.buffer.ByteBuf;
 
 import org.apache.cassandra.cql3.QueryProcessor;
 import org.apache.cassandra.service.QueryState;
@@ -36,12 +36,12 @@ public class OptionsMessage extends Message.Request
 {
     public static final Message.Codec<OptionsMessage> codec = new Message.Codec<OptionsMessage>()
     {
-        public OptionsMessage decode(ChannelBuffer body, int version)
+        public OptionsMessage decode(ByteBuf body, int version)
         {
             return new OptionsMessage();
         }
 
-        public void encode(OptionsMessage msg, ChannelBuffer dest, int version)
+        public void encode(OptionsMessage msg, ByteBuf dest, int version)
         {
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cbf304eb/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java b/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java
index 002c33c..992d59c 100644
--- a/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java
@@ -20,7 +20,7 @@ package org.apache.cassandra.transport.messages;
 import java.util.UUID;
 
 import com.google.common.collect.ImmutableMap;
-import org.jboss.netty.buffer.ChannelBuffer;
+import io.netty.buffer.ByteBuf;
 
 import org.apache.cassandra.cql3.QueryProcessor;
 import org.apache.cassandra.service.QueryState;
@@ -32,13 +32,13 @@ public class PrepareMessage extends Message.Request
 {
     public static final Message.Codec<PrepareMessage> codec = new Message.Codec<PrepareMessage>()
     {
-        public PrepareMessage decode(ChannelBuffer body, int version)
+        public PrepareMessage decode(ByteBuf body, int version)
         {
             String query = CBUtil.readLongString(body);
             return new PrepareMessage(query);
         }
 
-        public void encode(PrepareMessage msg, ChannelBuffer dest, int version)
+        public void encode(PrepareMessage msg, ByteBuf dest, int version)
         {
             CBUtil.writeLongString(msg.query, dest);
         }


Mime
View raw message