cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From slebre...@apache.org
Subject [3/6] Custom CQL protocol
Date Thu, 05 Jul 2012 16:22:39 GMT
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d3a3eed/src/java/org/apache/cassandra/transport/CBUtil.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/CBUtil.java b/src/java/org/apache/cassandra/transport/CBUtil.java
new file mode 100644
index 0000000..791f097
--- /dev/null
+++ b/src/java/org/apache/cassandra/transport/CBUtil.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.transport;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.CharacterCodingException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.util.CharsetUtil;
+
+/**
+ * ChannelBuffer utility methods.
+ * Note that contrarily to ByteBufferUtil, these method do "read" the
+ * ChannelBuffer advancing it's (read) position. They also write by
+ * advancing the write position. Functions are also provided to create
+ * ChannelBuffer while avoiding copies.
+ */
+public abstract class CBUtil
+{
+    private CBUtil() {}
+
+    public static String readString(ChannelBuffer cb)
+    {
+        try
+        {
+            int length = cb.readUnsignedShort();
+            return readString(cb, length);
+        }
+        catch (IndexOutOfBoundsException e)
+        {
+            throw new ProtocolException("Not enough bytes to read an UTF8 serialized string preceded by it's 2 bytes length");
+        }
+    }
+
+    public static String readLongString(ChannelBuffer cb)
+    {
+        try
+        {
+            int length = cb.readInt();
+            return readString(cb, length);
+        }
+        catch (IndexOutOfBoundsException e)
+        {
+            throw new ProtocolException("Not enough bytes to read an UTF8 serialized string preceded by it's 4 bytes length");
+        }
+    }
+
+    private static String readString(ChannelBuffer cb, int length)
+    {
+        try
+        {
+            String str = cb.toString(cb.readerIndex(), length, CharsetUtil.UTF_8);
+            cb.readerIndex(cb.readerIndex() + length);
+            return str;
+        }
+        catch (IllegalStateException e)
+        {
+            // That's the way netty encapsulate a CCE
+            if (e.getCause() instanceof CharacterCodingException)
+                throw new ProtocolException("Cannot decode string as UTF8");
+            else
+                throw e;
+        }
+    }
+
+    private static ChannelBuffer bytes(String str)
+    {
+        return ChannelBuffers.wrappedBuffer(str.getBytes(CharsetUtil.UTF_8));
+    }
+
+    public static ChannelBuffer shortToCB(int s)
+    {
+        ChannelBuffer cb = ChannelBuffers.buffer(2);
+        cb.writeShort(s);
+        return cb;
+    }
+
+    public static ChannelBuffer intToCB(int i)
+    {
+        ChannelBuffer cb = ChannelBuffers.buffer(4);
+        cb.writeInt(i);
+        return cb;
+    }
+
+    public static ChannelBuffer stringToCB(String str)
+    {
+        ChannelBuffer bytes = bytes(str);
+        return ChannelBuffers.wrappedBuffer(shortToCB(bytes.readableBytes()), bytes);
+    }
+
+    public static ChannelBuffer longStringToCB(String str)
+    {
+        ChannelBuffer bytes = bytes(str);
+        return ChannelBuffers.wrappedBuffer(intToCB(bytes.readableBytes()), bytes);
+    }
+
+    public static List<String> readStringList(ChannelBuffer cb)
+    {
+        int length = cb.readUnsignedShort();
+        List<String> l = new ArrayList<String>();
+        for (int i = 0; i < length; i++)
+            l.add(readString(cb));
+        return l;
+    }
+
+    public static void writeStringList(ChannelBuffer cb, List<String> l)
+    {
+        cb.writeShort(l.size());
+        for (String str : l)
+            cb.writeBytes(stringToCB(str));
+    }
+
+    public static ByteBuffer readValue(ChannelBuffer cb)
+    {
+        int length = cb.readInt();
+        return length < 0 ? null : cb.readSlice(length).toByteBuffer();
+    }
+
+    public static class BufferBuilder
+    {
+        private final int size;
+        private final ChannelBuffer[] buffers;
+        private int i;
+
+        public BufferBuilder(int simpleBuffers, int stringBuffers, int valueBuffers)
+        {
+            this.size = simpleBuffers + 2 * stringBuffers + 2 * valueBuffers;
+            this.buffers = new ChannelBuffer[size];
+        }
+
+        public BufferBuilder add(ChannelBuffer cb)
+        {
+            buffers[i++] = cb;
+            return this;
+        }
+
+        public BufferBuilder addString(String str)
+        {
+            ChannelBuffer bytes = bytes(str);
+            add(shortToCB(bytes.readableBytes()));
+            return add(bytes);
+        }
+
+        public BufferBuilder addValue(ByteBuffer bb)
+        {
+            add(intToCB(bb == null ? -1 : bb.remaining()));
+            return add(bb == null ? ChannelBuffers.EMPTY_BUFFER : ChannelBuffers.wrappedBuffer(bb));
+        }
+
+        public ChannelBuffer build()
+        {
+            return ChannelBuffers.wrappedBuffer(buffers);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d3a3eed/src/java/org/apache/cassandra/transport/Client.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Client.java b/src/java/org/apache/cassandra/transport/Client.java
new file mode 100644
index 0000000..b2042b9
--- /dev/null
+++ b/src/java/org/apache/cassandra/transport/Client.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.transport;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.concurrent.Executors;
+import java.util.*;
+
+import com.google.common.base.Splitter;
+
+import org.jboss.netty.bootstrap.ClientBootstrap;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+
+import org.apache.cassandra.transport.messages.*;
+import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.utils.SimpleCondition;
+
+public class Client extends SimpleClient
+{
+    public Client(String host, int port)
+    {
+        super(host, port);
+    }
+
+    public void run() throws IOException
+    {
+        // Start the connection attempt.
+        System.out.print("Connecting...");
+        establishConnection();
+        System.out.println();
+
+        // Read commands from the stdin.
+        BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
+        for (;;)
+        {
+            System.out.print(">> ");
+            System.out.flush();
+            String line = in.readLine();
+            if (line == null) {
+                break;
+            }
+            Message.Request req = parseLine(line.trim());
+            if (req == null)
+            {
+                System.out.println("! Error parsing line.");
+                continue;
+            }
+
+            try
+            {
+                Message.Response resp = execute(req);
+                System.out.println("-> " + resp);
+            }
+            catch (Exception e)
+            {
+                System.err.println("ERROR: " + e.getMessage());
+            }
+        }
+
+        close();
+    }
+
+    private Message.Request parseLine(String line)
+    {
+        Splitter splitter = Splitter.on(' ').trimResults().omitEmptyStrings();
+        Iterator<String> iter = splitter.split(line).iterator();
+        if (!iter.hasNext())
+            return null;
+        String msgType = iter.next().toUpperCase();
+        if (msgType.equals("STARTUP"))
+        {
+            EnumMap<StartupMessage.Option, Object> options = new EnumMap<StartupMessage.Option, Object>(StartupMessage.Option.class);
+            while (iter.hasNext())
+            {
+               String next = iter.next();
+               if (next.toLowerCase().equals("snappy"))
+               {
+                   options.put(StartupMessage.Option.COMPRESSION, "snappy");
+                   connection.setCompressor(FrameCompressor.SnappyCompressor.instance);
+               }
+            }
+            return new StartupMessage("3.0.0", options);
+        }
+        else if (msgType.equals("QUERY"))
+        {
+            String query = line.substring(6);
+            return new QueryMessage(query);
+        }
+        else if (msgType.equals("PREPARE"))
+        {
+            String query = line.substring(8);
+            return new PrepareMessage(query);
+        }
+        else if (msgType.equals("EXECUTE"))
+        {
+            try
+            {
+                int id = Integer.parseInt(iter.next());
+                List<ByteBuffer> values = new ArrayList<ByteBuffer>();
+                while(iter.hasNext())
+                {
+                    String next = iter.next();
+                    ByteBuffer bb;
+                    try
+                    {
+                        int v = Integer.parseInt(next);
+                        bb = Int32Type.instance.decompose(v);
+                    }
+                    catch (NumberFormatException e)
+                    {
+                        bb = UTF8Type.instance.decompose(next);
+                    }
+                    values.add(bb);
+                }
+                return new ExecuteMessage(id, values);
+            }
+            catch (Exception e)
+            {
+                return null;
+            }
+        }
+        else if (msgType.equals("OPTIONS"))
+        {
+            return new OptionsMessage();
+        }
+        else if (msgType.equals("CREDENTIALS"))
+        {
+            CredentialsMessage msg = new CredentialsMessage();
+            while (iter.hasNext())
+            {
+                String next = iter.next();
+                String[] kv = next.split("=");
+                if (kv.length != 2)
+                    return null;
+                msg.credentials.put(kv[0], kv[1]);
+            }
+            return msg;
+        }
+        return null;
+    }
+
+    public static void main(String[] args) throws Exception
+    {
+        // Print usage if no argument is specified.
+        if (args.length != 2)
+        {
+            System.err.println("Usage: " + Client.class.getSimpleName() + " <host> <port>");
+            return;
+        }
+
+        // Parse options.
+        String host = args[0];
+        int port = Integer.parseInt(args[1]);
+
+        System.out.println("CQL binary protocol console " + host + "@" + port);
+
+        new Client(host, port).run();
+        System.exit(0);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d3a3eed/src/java/org/apache/cassandra/transport/Connection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Connection.java b/src/java/org/apache/cassandra/transport/Connection.java
new file mode 100644
index 0000000..9f874a3
--- /dev/null
+++ b/src/java/org/apache/cassandra/transport/Connection.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.transport;
+
+import org.jboss.netty.channel.Channel;
+
+import org.apache.cassandra.service.ClientState;
+
+public abstract class Connection
+{
+    public static final Factory SERVER_FACTORY = new Factory()
+    {
+        public Connection newConnection()
+        {
+            return new ServerConnection();
+        }
+    };
+
+    private FrameCompressor frameCompressor;
+
+    public void setCompressor(FrameCompressor compressor)
+    {
+        this.frameCompressor = compressor;
+    }
+
+    public FrameCompressor getCompressor()
+    {
+        return frameCompressor;
+    }
+
+    public abstract void validateNewMessage(Message.Type type);
+    public abstract void applyStateTransition(Message.Type requestType, Message.Type responseType);
+    public abstract ClientState clientState();
+
+    public interface Factory
+    {
+        public Connection newConnection();
+    }
+
+    private static class ServerConnection extends Connection
+    {
+        private enum State { UNINITIALIZED, AUTHENTICATION, READY; }
+
+        private final ClientState clientState;
+        private State state;
+
+        public ServerConnection()
+        {
+            this.clientState = new ClientState();
+            this.state = State.UNINITIALIZED;
+        }
+
+        public ClientState clientState()
+        {
+            return clientState;
+        }
+
+        public void validateNewMessage(Message.Type type)
+        {
+            switch (state)
+            {
+                case UNINITIALIZED:
+                    if (type != Message.Type.STARTUP && type != Message.Type.OPTIONS)
+                        throw new ProtocolException(String.format("Unexpected message %s, expecting STARTUP or OPTIONS", type));
+                    break;
+                case AUTHENTICATION:
+                    if (type != Message.Type.CREDENTIALS)
+                        throw new ProtocolException(String.format("Unexpected message %s, needs authentication through CREDENTIALS message", type));
+                    break;
+                case READY:
+                    if (type == Message.Type.STARTUP)
+                        throw new ProtocolException("Unexpected message STARTUP, the connection is already initialized");
+                    break;
+                default:
+                    throw new AssertionError();
+            }
+        }
+
+        public void applyStateTransition(Message.Type requestType, Message.Type responseType)
+        {
+            switch (state)
+            {
+                case UNINITIALIZED:
+                    if (requestType == Message.Type.STARTUP)
+                    {
+                        if (responseType == Message.Type.AUTHENTICATE)
+                            state = State.AUTHENTICATION;
+                        else if (responseType == Message.Type.READY)
+                            state = State.READY;
+                    }
+                    break;
+                case AUTHENTICATION:
+                    assert requestType == Message.Type.CREDENTIALS;
+                    if (responseType == Message.Type.READY)
+                        state = State.READY;
+                case READY:
+                    break;
+                default:
+                    throw new AssertionError();
+            }
+        }
+    }
+
+    public interface Tracker
+    {
+        public void addConnection(Channel ch, Connection connection);
+        public void closeAll();
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d3a3eed/src/java/org/apache/cassandra/transport/DataType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/DataType.java b/src/java/org/apache/cassandra/transport/DataType.java
new file mode 100644
index 0000000..5254945
--- /dev/null
+++ b/src/java/org/apache/cassandra/transport/DataType.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.transport;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import com.google.common.base.Charsets;
+import org.jboss.netty.buffer.ChannelBuffer;
+
+import org.apache.cassandra.config.ConfigurationException;
+import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.utils.Pair;
+
+public enum DataType implements OptionCodec.Codecable<DataType>
+{
+    CUSTOM   (0,  null),
+    ASCII    (1,  AsciiType.instance),
+    BIGINT   (2,  LongType.instance),
+    BLOB     (3,  BytesType.instance),
+    BOOLEAN  (4,  BooleanType.instance),
+    COUNTER  (5,  CounterColumnType.instance),
+    DECIMAL  (6,  DecimalType.instance),
+    DOUBLE   (7,  DoubleType.instance),
+    FLOAT    (8,  FloatType.instance),
+    INT      (9,  Int32Type.instance),
+    TEXT     (10, UTF8Type.instance),
+    TIMESTAMP(11, DateType.instance),
+    UUID     (12, UUIDType.instance),
+    VARCHAR  (13, UTF8Type.instance),
+    VARINT   (14, IntegerType.instance),
+    TIMEUUID (15, TimeUUIDType.instance);
+
+    private final int id;
+    private final AbstractType type;
+    private static final Map<AbstractType, DataType> dataTypeMap = new HashMap<AbstractType, DataType>();
+    static
+    {
+        for (DataType type : DataType.values())
+        {
+            if (type.type != null)
+                dataTypeMap.put(type.type, type);
+        }
+    }
+
+    private DataType(int id, AbstractType type)
+    {
+        this.id = id;
+        this.type = type;
+    }
+
+    public int getId()
+    {
+        return id;
+    }
+
+    public Object readValue(ChannelBuffer cb)
+    {
+        switch (this)
+        {
+            case CUSTOM:
+                return CBUtil.readString(cb);
+            default:
+                return null;
+        }
+    }
+
+    public void writeValue(Object value, ChannelBuffer cb)
+    {
+        switch (this)
+        {
+            case CUSTOM:
+                assert value instanceof String;
+                cb.writeBytes(CBUtil.stringToCB((String)value));
+                break;
+        }
+    }
+
+    public int serializedValueSize(Object value)
+    {
+        switch (this)
+        {
+            case CUSTOM:
+                return 2 + ((String)value).getBytes(Charsets.UTF_8).length;
+            default:
+                return 0;
+        }
+    }
+
+    public static Pair<DataType, Object> fromType(AbstractType type)
+    {
+        DataType dt = dataTypeMap.get(type);
+        if (dt == null)
+            return Pair.<DataType, Object>create(CUSTOM, type.toString());
+        else
+            return Pair.create(dt, null);
+    }
+
+    public static AbstractType toType(Pair<DataType, Object> entry)
+    {
+        try
+        {
+            if (entry.left == CUSTOM)
+                return TypeParser.parse((String)entry.right);
+            else
+                return entry.left.type;
+        }
+        catch (ConfigurationException e)
+        {
+            throw new ProtocolException(e.getMessage());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d3a3eed/src/java/org/apache/cassandra/transport/Frame.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Frame.java b/src/java/org/apache/cassandra/transport/Frame.java
new file mode 100644
index 0000000..9d4885d
--- /dev/null
+++ b/src/java/org/apache/cassandra/transport/Frame.java
@@ -0,0 +1,239 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.transport;
+
+import java.io.IOException;
+import java.util.EnumSet;
+
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.*;
+import org.jboss.netty.handler.codec.oneone.OneToOneDecoder;
+import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;
+import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
+
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+public class Frame
+{
+    public final Header header;
+    public final ChannelBuffer body;
+    public final Connection connection;
+
+    /**
+     * On-wire frame.
+     * Frames are defined as:
+     *
+     *   0         8        16        24        32
+     *   +---------+---------+---------+---------+
+     *   | version |  flags  |      opcode       |
+     *   +---------+---------+---------+---------+
+     *   |                length                 |
+     *   +---------+---------+---------+---------+
+     */
+    private Frame(Header header, ChannelBuffer body, Connection connection)
+    {
+        this.header = header;
+        this.body = body;
+        this.connection = connection;
+    }
+
+    public static Frame create(ChannelBuffer fullFrame, Connection connection)
+    {
+        assert fullFrame.readableBytes() >= Header.LENGTH : String.format("Frame too short (%d bytes = %s)",
+                                                                          fullFrame.readableBytes(),
+                                                                          ByteBufferUtil.bytesToHex(fullFrame.toByteBuffer()));
+
+        int version = fullFrame.readByte();
+        int flags = fullFrame.readByte();
+        int opcode = fullFrame.readUnsignedShort();
+        int length = fullFrame.readInt();
+        assert length == fullFrame.readableBytes();
+
+        // version first byte is the "direction" of the frame (request or response)
+        Message.Direction direction = Message.Direction.extractFromVersion(version);
+        version = version & 0x7F;
+
+        Header header = new Header(version, flags, Message.Type.fromOpcode(opcode, direction));
+        return new Frame(header, fullFrame, connection);
+    }
+
+    public static Frame create(Message.Type type, ChannelBuffer body, Connection connection)
+    {
+        EnumSet<Header.Flag> flags = EnumSet.noneOf(Header.Flag.class);
+        Header header = new Header(Header.CURRENT_VERSION, flags, type);
+        return new Frame(header, body, connection);
+    }
+
+    public static class Header
+    {
+        public static final int LENGTH = 8;
+        public static final int CURRENT_VERSION = 1;
+
+        public final int version;
+        public final EnumSet<Flag> flags;
+        public final Message.Type type;
+
+        private Header(int version, int flags, Message.Type type)
+        {
+            this(version, Flag.deserialize(flags), type);
+        }
+
+        private Header(int version, EnumSet<Flag> flags, Message.Type type)
+        {
+            this.version = version;
+            this.flags = flags;
+            this.type = type;
+        }
+
+        public static enum Flag
+        {
+            // The order of that enum matters!!
+            COMPRESSED;
+
+            public static EnumSet<Flag> deserialize(int flags)
+            {
+                EnumSet<Flag> set = EnumSet.noneOf(Flag.class);
+                Flag[] values = Flag.values();
+                for (int n = 0; n < 8; n++)
+                {
+                    if ((flags & (1 << n)) != 0)
+                        set.add(values[n]);
+                }
+                return set;
+            }
+
+            public static int serialize(EnumSet<Flag> flags)
+            {
+                int i = 0;
+                for (Flag flag : flags)
+                    i |= 1 << flag.ordinal();
+                return i;
+            }
+        }
+    }
+
+    public Frame with(ChannelBuffer newBody)
+    {
+        return new Frame(header, newBody, connection);
+    }
+
+    public static class Decoder extends LengthFieldBasedFrameDecoder
+    {
+        private static final int MAX_FRAME_LENTH = 256 * 1024 * 1024; // 256 MB
+        private final Connection.Tracker tracker;
+        private final Connection connection;
+
+        public Decoder(Connection.Tracker tracker, Connection.Factory factory)
+        {
+            super(MAX_FRAME_LENTH, 4, 4);
+            this.tracker = tracker;
+            this.connection = factory.newConnection();
+        }
+
+        @Override
+        public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e)
+        throws Exception
+        {
+            tracker.addConnection(e.getChannel(), connection);
+        }
+
+        @Override
+        protected Object decode (ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer)
+        throws Exception
+        {
+            ChannelBuffer frame = (ChannelBuffer) super.decode(ctx, channel, buffer);
+            if (frame == null)
+            {
+                return null;
+            }
+            return Frame.create(frame, connection);
+        }
+
+        @Override
+        protected ChannelBuffer extractFrame(ChannelBuffer buffer, int index, int length)
+        {
+            // Don't copy
+            return buffer.slice(index, length);
+        }
+    }
+
+    public static class Encoder extends OneToOneEncoder
+    {
+        public Object encode(ChannelHandlerContext ctx, Channel channel, Object msg)
+        throws IOException
+        {
+            assert msg instanceof Frame : "Expecting frame, got " + msg;
+
+            Frame frame = (Frame)msg;
+
+            ChannelBuffer header = ChannelBuffers.buffer(Frame.Header.LENGTH);
+            Message.Type type = frame.header.type;
+            header.writeByte(type.direction.addToVersion(frame.header.version));
+            header.writeByte(Header.Flag.serialize(frame.header.flags));
+            header.writeShort(type.opcode);
+            header.writeInt(frame.body.readableBytes());
+
+            return ChannelBuffers.wrappedBuffer(header, frame.body);
+        }
+    }
+
+    public static class Decompressor extends OneToOneDecoder
+    {
+        public Object decode(ChannelHandlerContext ctx, Channel channel, Object msg)
+        throws IOException
+        {
+            assert msg instanceof Frame : "Expecting frame, got " + msg;
+
+            Frame frame = (Frame)msg;
+
+            if (!frame.header.flags.contains(Header.Flag.COMPRESSED))
+                return frame;
+
+            FrameCompressor compressor = frame.connection.getCompressor();
+            if (compressor == null)
+                return frame;
+
+            return compressor.decompress(frame);
+        }
+    }
+
+    public static class Compressor extends OneToOneEncoder
+    {
+        public Object encode(ChannelHandlerContext ctx, Channel channel, Object msg)
+        throws IOException
+        {
+            assert msg instanceof Frame : "Expecting frame, got " + msg;
+
+            Frame frame = (Frame)msg;
+
+            // Never compress STARTUP messages
+            if (frame.header.type == Message.Type.STARTUP || frame.connection == null)
+                return frame;
+
+            FrameCompressor compressor = frame.connection.getCompressor();
+            if (compressor == null)
+                return frame;
+
+            frame.header.flags.add(Header.Flag.COMPRESSED);
+            return compressor.compress(frame);
+
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d3a3eed/src/java/org/apache/cassandra/transport/FrameCompressor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/FrameCompressor.java b/src/java/org/apache/cassandra/transport/FrameCompressor.java
new file mode 100644
index 0000000..a22766f
--- /dev/null
+++ b/src/java/org/apache/cassandra/transport/FrameCompressor.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.transport;
+
+import java.io.IOException;
+
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.xerial.snappy.Snappy;
+
+public interface FrameCompressor
+{
+    public Frame compress(Frame frame) throws IOException;
+    public Frame decompress(Frame frame) throws IOException;
+
+    /*
+     * TODO: We can probably do more efficient, like by avoiding copy.
+     * Also, we don't reuse ICompressor because the API doesn't expose enough.
+     */
+    public static class SnappyCompressor implements FrameCompressor
+    {
+        public static final SnappyCompressor instance;
+        static
+        {
+            SnappyCompressor i;
+            try
+            {
+                i = new SnappyCompressor();
+            }
+            catch (NoClassDefFoundError e)
+            {
+                i = null;
+            }
+            instance = i;
+        }
+
+        private SnappyCompressor()
+        {
+            // this would throw java.lang.NoClassDefFoundError if Snappy class
+            // wasn't found at runtime which should be processed by the calling method
+            Snappy.getNativeLibraryVersion();
+        }
+
+        public Frame compress(Frame frame) throws IOException
+        {
+            byte[] input = new byte[frame.body.readableBytes()];
+            byte[] output = new byte[Snappy.maxCompressedLength(input.length)];
+
+            frame.body.readBytes(input);
+            int written = Snappy.compress(input, 0, input.length, output, 0);
+            return frame.with(ChannelBuffers.wrappedBuffer(output, 0, written));
+        }
+
+        public Frame decompress(Frame frame) throws IOException
+        {
+            byte[] input = new byte[frame.body.readableBytes()];
+            frame.body.readBytes(input);
+
+            if (!Snappy.isValidCompressedBuffer(input, 0, input.length))
+                throw new ProtocolException("Provided frame does not appear to be Snappy compressed");
+
+            byte[] output = new byte[Snappy.uncompressedLength(input)];
+            int size = Snappy.uncompress(input, 0, input.length, output, 0);
+            return frame.with(ChannelBuffers.wrappedBuffer(output, 0, size));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d3a3eed/src/java/org/apache/cassandra/transport/Message.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Message.java b/src/java/org/apache/cassandra/transport/Message.java
new file mode 100644
index 0000000..24bdaf4
--- /dev/null
+++ b/src/java/org/apache/cassandra/transport/Message.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.transport;
+
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.*;
+import org.jboss.netty.handler.codec.oneone.OneToOneDecoder;
+import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.transport.messages.*;
+
+/**
+ * A message from the CQL binary protocol.
+ */
+public abstract class Message
+{
+    protected static final Logger logger = LoggerFactory.getLogger(Message.class);
+
+    public interface Codec<M extends Message> extends CBCodec<M> {}
+
+    public enum Direction
+    {
+        REQUEST, RESPONSE;
+
+        public static Direction extractFromVersion(int versionWithDirection)
+        {
+            return (versionWithDirection & 0x80) == 0 ? REQUEST : RESPONSE;
+        }
+
+        public int addToVersion(int rawVersion)
+        {
+            return this == REQUEST ? (rawVersion & 0x7F) : (rawVersion | 0x80);
+        }
+    }
+
+    public enum Type
+    {
+        ERROR        (0,  Direction.RESPONSE, ErrorMessage.codec),
+        STARTUP      (1,  Direction.REQUEST,  StartupMessage.codec),
+        READY        (2,  Direction.RESPONSE, ReadyMessage.codec),
+        AUTHENTICATE (3,  Direction.RESPONSE, AuthenticateMessage.codec),
+        CREDENTIALS  (4,  Direction.REQUEST,  CredentialsMessage.codec),
+        OPTIONS      (5,  Direction.REQUEST,  OptionsMessage.codec),
+        SUPPORTED    (6,  Direction.RESPONSE, SupportedMessage.codec),
+        QUERY        (7,  Direction.REQUEST,  QueryMessage.codec),
+        RESULT       (8,  Direction.RESPONSE, ResultMessage.codec),
+        PREPARE      (9,  Direction.REQUEST,  PrepareMessage.codec),
+        EXECUTE      (10, Direction.REQUEST,  ExecuteMessage.codec);
+
+        public final int opcode;
+        public final Direction direction;
+        public final Codec<?> codec;
+
+        private static final Type[] opcodeIdx;
+        static
+        {
+            int maxOpcode = -1;
+            for (Type type : Type.values())
+                maxOpcode = Math.max(maxOpcode, type.opcode);
+            opcodeIdx = new Type[maxOpcode + 1];
+            for (Type type : Type.values())
+            {
+                if (opcodeIdx[type.opcode] != null)
+                    throw new IllegalStateException("Duplicate opcode");
+                opcodeIdx[type.opcode] = type;
+            }
+        }
+
+        private Type(int opcode, Direction direction, Codec<?> codec)
+        {
+            this.opcode = opcode;
+            this.direction = direction;
+            this.codec = codec;
+        }
+
+        public static Type fromOpcode(int opcode, Direction direction)
+        {
+            Type t = opcodeIdx[opcode];
+            if (t == null)
+                throw new ProtocolException(String.format("Unknown opcode %d", opcode));
+            if (t.direction != direction)
+                throw new ProtocolException(String.format("Wrong protocol direction (expected %s, got %s) for opcode %d (%s)",
+                                                          t.direction,
+                                                          direction,
+                                                          opcode,
+                                                          t));
+            return t;
+        }
+    }
+
+    public final Type type;
+    protected Connection connection;
+
+    protected Message(Type type)
+    {
+        this.type = type;
+    }
+
+    public void attach(Connection connection)
+    {
+        this.connection = connection;
+    }
+
+    public Connection connection()
+    {
+        return connection;
+    }
+
+    public abstract ChannelBuffer encode();
+
+    public static abstract class Request extends Message
+    {
+        protected Request(Type type)
+        {
+            super(type);
+
+            if (type.direction != Direction.REQUEST)
+                throw new IllegalArgumentException();
+        }
+
+        public abstract Response execute();
+    }
+
+    public static abstract class Response extends Message
+    {
+        protected Response(Type type)
+        {
+            super(type);
+
+            if (type.direction != Direction.RESPONSE)
+                throw new IllegalArgumentException();
+        }
+    }
+
+    public static class ProtocolDecoder extends OneToOneDecoder
+    {
+        public Object decode(ChannelHandlerContext ctx, Channel channel, Object msg)
+        {
+            assert msg instanceof Frame : "Expecting frame, got " + msg;
+
+            Frame frame = (Frame)msg;
+            Message message = frame.header.type.codec.decode(frame.body);
+            if (message instanceof Request)
+                ((Request)message).attach(frame.connection);
+            return message;
+        }
+    }
+
+    public static class ProtocolEncoder extends OneToOneEncoder
+    {
+        public Object encode(ChannelHandlerContext ctx, Channel channel, Object msg)
+        {
+            assert msg instanceof Message : "Expecting message, got " + msg;
+
+            Message message = (Message)msg;
+            return Frame.create(message.type, message.encode(), message.connection());
+        }
+    }
+
+    public static class Dispatcher extends SimpleChannelUpstreamHandler
+    {
+        @Override
+        public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
+        {
+            assert e.getMessage() instanceof Message : "Expecting message, got " + e.getMessage();
+
+            if (e.getMessage() instanceof Response)
+                throw new ProtocolException("Invalid response message received, expecting requests");
+
+            Request request = (Request)e.getMessage();
+            Connection connection = request.connection();
+            connection.validateNewMessage(request.type);
+
+            logger.debug("Received: " + request);
+
+            Response response = request.execute();
+            response.attach(connection);
+            response.connection().applyStateTransition(request.type, response.type);
+
+            logger.debug("Responding: " + response);
+
+            e.getChannel().write(response);
+        }
+
+        @Override
+        public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
+        throws Exception
+        {
+            ctx.getChannel().write(ErrorMessage.fromException(e.getCause()));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d3a3eed/src/java/org/apache/cassandra/transport/OptionCodec.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/OptionCodec.java b/src/java/org/apache/cassandra/transport/OptionCodec.java
new file mode 100644
index 0000000..2898300
--- /dev/null
+++ b/src/java/org/apache/cassandra/transport/OptionCodec.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.transport;
+
+import java.lang.reflect.Array;
+import java.util.ArrayList;
+import java.util.EnumMap;
+import java.util.List;
+import java.util.Map;
+
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+
+import org.apache.cassandra.utils.Pair;
+
+public class OptionCodec<T extends Enum<T> & OptionCodec.Codecable<T>>
+{
+    public interface Codecable<T extends Enum<T>>
+    {
+        public int getId();
+
+        public Object readValue(ChannelBuffer cb);
+        public void writeValue(Object value, ChannelBuffer cb);
+        public int serializedValueSize(Object obj);
+    }
+
+    private final Class<T> klass;
+    private final T[] ids;
+
+    @SuppressWarnings({"unchecked"})
+    public OptionCodec(Class<T> klass)
+    {
+        this.klass = klass;
+
+        T[] values = klass.getEnumConstants();
+        int maxId = -1;
+        for (T opt : values)
+            maxId = Math.max(maxId, opt.getId());
+        ids = (T[])Array.newInstance(klass, maxId + 1);
+        for (T opt : values)
+        {
+            if (ids[opt.getId()] != null)
+                throw new IllegalStateException(String.format("Duplicate option id %d", opt.getId()));
+            ids[opt.getId()] = opt;
+        }
+    }
+
+    private T fromId(int id)
+    {
+        T opt = ids[id];
+        if (opt == null)
+            throw new ProtocolException(String.format("Unknown option id %d", id));
+        return opt;
+    }
+
+    public Map<T, Object> decode(ChannelBuffer body)
+    {
+        EnumMap<T, Object> options = new EnumMap<T, Object>(klass);
+        int n = body.readUnsignedShort();
+        for (int i = 0; i < n; i++)
+        {
+            T opt = fromId(body.readUnsignedShort());
+            Object value = opt.readValue(body);
+            if (options.containsKey(opt))
+                throw new ProtocolException(String.format("Duplicate option %d in message", opt));
+            options.put(opt, value);
+        }
+        return options;
+    }
+
+    public ChannelBuffer encode(Map<T, Object> options)
+    {
+        int optLength = 2;
+        for (Map.Entry<T, Object> entry : options.entrySet())
+            optLength += 2 + entry.getKey().serializedValueSize(entry.getValue());
+        ChannelBuffer cb = ChannelBuffers.buffer(optLength);
+        cb.writeShort(options.size());
+        for (Map.Entry<T, Object> entry : options.entrySet())
+        {
+            T opt = entry.getKey();
+            cb.writeShort(opt.getId());
+            opt.writeValue(entry.getValue(), cb);
+        }
+        return cb;
+    }
+
+    public Pair<T, Object> decodeOne(ChannelBuffer body)
+    {
+        T opt = fromId(body.readUnsignedShort());
+        Object value = opt.readValue(body);
+        return Pair.create(opt, value);
+    }
+
+    public ChannelBuffer encodeOne(Pair<T, Object> option)
+    {
+        T opt = option.left;
+        Object obj = option.right;
+
+        int l = 2 + opt.serializedValueSize(obj);
+        ChannelBuffer cb = ChannelBuffers.buffer(l);
+
+        cb.writeShort(opt.getId());
+        opt.writeValue(obj, cb);
+        return cb;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d3a3eed/src/java/org/apache/cassandra/transport/ProtocolException.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/ProtocolException.java b/src/java/org/apache/cassandra/transport/ProtocolException.java
new file mode 100644
index 0000000..15112e1
--- /dev/null
+++ b/src/java/org/apache/cassandra/transport/ProtocolException.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.transport;
+
+/**
+ * Exceptions thrown when a client didn't not respect the protocol.
+ */
+public class ProtocolException extends RuntimeException
+{
+    public ProtocolException(String msg)
+    {
+        super(msg);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d3a3eed/src/java/org/apache/cassandra/transport/RequestThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/RequestThreadPoolExecutor.java b/src/java/org/apache/cassandra/transport/RequestThreadPoolExecutor.java
new file mode 100644
index 0000000..c7d19d7
--- /dev/null
+++ b/src/java/org/apache/cassandra/transport/RequestThreadPoolExecutor.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.transport;
+
+import java.util.concurrent.TimeUnit;
+
+import org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor;
+import org.jboss.netty.util.DefaultObjectSizeEstimator;
+import org.jboss.netty.util.ObjectSizeEstimator;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
+
+public class RequestThreadPoolExecutor extends OrderedMemoryAwareThreadPoolExecutor
+{
+    private final static int CORE_THREAD_TIMEOUT_SEC = 30;
+
+    public RequestThreadPoolExecutor()
+    {
+        super(DatabaseDescriptor.getNativeTransportMaxThreads(),
+              0, 0,
+              CORE_THREAD_TIMEOUT_SEC, TimeUnit.SECONDS,
+              new NamedThreadFactory("Native-Transport-Requests"));
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d3a3eed/src/java/org/apache/cassandra/transport/Server.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Server.java b/src/java/org/apache/cassandra/transport/Server.java
new file mode 100644
index 0000000..8387179
--- /dev/null
+++ b/src/java/org/apache/cassandra/transport/Server.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.transport;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.jboss.netty.bootstrap.ServerBootstrap;
+import org.jboss.netty.channel.*;
+import org.jboss.netty.channel.group.ChannelGroup;
+import org.jboss.netty.channel.group.DefaultChannelGroup;
+import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
+import org.jboss.netty.handler.execution.ExecutionHandler;
+import org.jboss.netty.handler.logging.LoggingHandler;
+import org.jboss.netty.logging.InternalLoggerFactory;
+import org.jboss.netty.logging.Slf4JLoggerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.service.CassandraDaemon;
+
+public class Server implements CassandraDaemon.Server
+{
+    static
+    {
+        InternalLoggerFactory.setDefaultFactory(new Slf4JLoggerFactory());
+    }
+
+    private static final Logger logger = LoggerFactory.getLogger(Server.class);
+
+    private final ConnectionTracker connectionTracker = new ConnectionTracker();
+
+    public final InetSocketAddress socket;
+    private final AtomicBoolean isRunning = new AtomicBoolean(false);
+
+    private ChannelFactory factory;
+    private ExecutionHandler executionHandler;
+
+    public Server(InetSocketAddress socket)
+    {
+        this.socket = socket;
+    }
+
+    public Server(String hostname, int port)
+    {
+        this(new InetSocketAddress(hostname, port));
+    }
+
+    public Server(InetAddress host, int port)
+    {
+        this(new InetSocketAddress(host, port));
+    }
+
+    public Server(int port)
+    {
+        this(new InetSocketAddress(port));
+    }
+
+    public void start()
+    {
+        if (isRunning.compareAndSet(false, true))
+            run();
+    }
+
+    public void stop()
+    {
+        if (isRunning.compareAndSet(true, false))
+            close();
+    }
+
+    public boolean isRunning()
+    {
+        return isRunning.get();
+    }
+
+    public void run()
+    {
+        // Configure the server.
+        executionHandler = new ExecutionHandler(new RequestThreadPoolExecutor());
+        factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
+        ServerBootstrap bootstrap = new ServerBootstrap(factory);
+
+        // Set up the event pipeline factory.
+        bootstrap.setPipelineFactory(new PipelineFactory(this));
+
+        // Bind and start to accept incoming connections.
+        logger.info("Starting listening for CQL clients on " + socket + "...");
+        Channel channel = bootstrap.bind(socket);
+        connectionTracker.allChannels.add(channel);
+    }
+
+    public void close()
+    {
+        // Close opened connections
+        connectionTracker.closeAll();
+        factory.releaseExternalResources();
+        factory = null;
+        executionHandler.releaseExternalResources();
+        executionHandler = null;
+    }
+
+    private static class ConnectionTracker implements Connection.Tracker
+    {
+        public final ChannelLocal<Connection> openedConnections = new ChannelLocal<Connection>(true);
+        public final ChannelGroup allChannels = new DefaultChannelGroup();
+
+        public void addConnection(Channel ch, Connection connection)
+        {
+            allChannels.add(ch);
+            openedConnections.set(ch, connection);
+        }
+
+        public void closeAll()
+        {
+            allChannels.close().awaitUninterruptibly();
+        }
+    }
+
+    private static class PipelineFactory implements ChannelPipelineFactory
+    {
+        // Stateless handlers
+        private static final Message.ProtocolDecoder messageDecoder = new Message.ProtocolDecoder();
+        private static final Message.ProtocolEncoder messageEncoder = new Message.ProtocolEncoder();
+        private static final Frame.Decompressor frameDecompressor = new Frame.Decompressor();
+        private static final Frame.Compressor frameCompressor = new Frame.Compressor();
+        private static final Frame.Encoder frameEncoder = new Frame.Encoder();
+        private static final Message.Dispatcher dispatcher = new Message.Dispatcher();
+
+        private final Server server;
+
+        public PipelineFactory(Server server)
+        {
+            this.server = server;
+        }
+
+        public ChannelPipeline getPipeline() throws Exception
+        {
+            ChannelPipeline pipeline = Channels.pipeline();
+
+            //pipeline.addLast("debug", new LoggingHandler());
+
+            pipeline.addLast("frameDecoder", new Frame.Decoder(server.connectionTracker, Connection.SERVER_FACTORY));
+            pipeline.addLast("frameEncoder", frameEncoder);
+
+            pipeline.addLast("frameDecompressor", frameDecompressor);
+            pipeline.addLast("frameCompressor", frameCompressor);
+
+            pipeline.addLast("messageDecoder", messageDecoder);
+            pipeline.addLast("messageEncoder", messageEncoder);
+
+            pipeline.addLast("executor", server.executionHandler);
+
+            pipeline.addLast("dispatcher", dispatcher);
+
+            return pipeline;
+      }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d3a3eed/src/java/org/apache/cassandra/transport/SimpleClient.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/SimpleClient.java b/src/java/org/apache/cassandra/transport/SimpleClient.java
new file mode 100644
index 0000000..4c62d9b
--- /dev/null
+++ b/src/java/org/apache/cassandra/transport/SimpleClient.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.transport;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.concurrent.*;
+import java.util.*;
+
+import com.google.common.base.Splitter;
+
+import org.jboss.netty.bootstrap.ClientBootstrap;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+import org.jboss.netty.logging.InternalLoggerFactory;
+import org.jboss.netty.logging.Slf4JLoggerFactory;
+
+import org.apache.cassandra.transport.messages.*;
+import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.utils.SimpleCondition;
+
+public class SimpleClient
+{
+    static
+    {
+        InternalLoggerFactory.setDefaultFactory(new Slf4JLoggerFactory());
+    }
+
+    public final String host;
+    public final int port;
+
+    protected final ResponseHandler responseHandler = new ResponseHandler();
+    protected final ClientConnection connection = new ClientConnection();
+    protected final Connection.Tracker tracker = new ConnectionTracker();
+    protected ClientBootstrap bootstrap;
+    protected Channel channel;
+    protected ChannelFuture lastWriteFuture;
+
+    private final Connection.Factory connectionFactory = new Connection.Factory()
+    {
+        public Connection newConnection()
+        {
+            return connection;
+        }
+    };
+
+    public SimpleClient(String host, int port)
+    {
+        this.host = host;
+        this.port = port;
+    }
+
+    public void connect(boolean useCompression) throws IOException
+    {
+        establishConnection();
+
+        EnumMap<StartupMessage.Option, Object> options = new EnumMap<StartupMessage.Option, Object>(StartupMessage.Option.class);
+        if (useCompression)
+        {
+            options.put(StartupMessage.Option.COMPRESSION, "snappy");
+            connection.setCompressor(FrameCompressor.SnappyCompressor.instance);
+        }
+        execute(new StartupMessage("3.0.0", options));
+    }
+
+    protected void establishConnection() throws IOException
+    {
+        // Configure the client.
+        bootstrap = new ClientBootstrap(
+                        new NioClientSocketChannelFactory(
+                            Executors.newCachedThreadPool(),
+                            Executors.newCachedThreadPool()));
+
+        // Configure the pipeline factory.
+        bootstrap.setPipelineFactory(new PipelineFactory());
+        ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port));
+
+        // Wait until the connection attempt succeeds or fails.
+        channel = future.awaitUninterruptibly().getChannel();
+        if (!future.isSuccess())
+        {
+            bootstrap.releaseExternalResources();
+            throw new IOException("Connection Error", future.getCause());
+        }
+    }
+
+    public void login(Map<String, String> credentials)
+    {
+        CredentialsMessage msg = new CredentialsMessage();
+        msg.credentials.putAll(credentials);
+        execute(msg);
+    }
+
+    public ResultMessage execute(String query)
+    {
+        Message.Response msg = execute(new QueryMessage(query));
+        assert msg instanceof ResultMessage;
+        return (ResultMessage)msg;
+    }
+
+    public ResultMessage.Prepared prepare(String query)
+    {
+        Message.Response msg = execute(new PrepareMessage(query));
+        assert msg instanceof ResultMessage.Prepared;
+        return (ResultMessage.Prepared)msg;
+    }
+
+    public ResultMessage executePrepared(int statementId, List<ByteBuffer> values)
+    {
+        Message.Response msg = execute(new ExecuteMessage(statementId, values));
+        assert msg instanceof ResultMessage;
+        return (ResultMessage)msg;
+    }
+
+    public void close()
+    {
+        // Wait until all messages are flushed before closing the channel.
+        if (lastWriteFuture != null)
+            lastWriteFuture.awaitUninterruptibly();
+
+        // Close the connection.  Make sure the close operation ends because
+        // all I/O operations are asynchronous in Netty.
+        channel.close().awaitUninterruptibly();
+
+        // Shut down all thread pools to exit.
+        bootstrap.releaseExternalResources();
+    }
+
+    protected Message.Response execute(Message.Request request)
+    {
+        try
+        {
+            request.attach(connection);
+            lastWriteFuture = channel.write(request);
+            Message.Response msg = responseHandler.responses.take();
+            if (msg instanceof ErrorMessage)
+                throw new RuntimeException(((ErrorMessage)msg).errorMsg);
+            return msg;
+        }
+        catch (InterruptedException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    protected static class ClientConnection extends Connection
+    {
+        public ClientState clientState()
+        {
+            return null;
+        }
+
+        public void validateNewMessage(Message.Type type) {}
+
+        public void applyStateTransition(Message.Type requestType, Message.Type responseType) {}
+    }
+
+    // Stateless handlers
+    private static final Message.ProtocolDecoder messageDecoder = new Message.ProtocolDecoder();
+    private static final Message.ProtocolEncoder messageEncoder = new Message.ProtocolEncoder();
+    private static final Frame.Decompressor frameDecompressor = new Frame.Decompressor();
+    private static final Frame.Compressor frameCompressor = new Frame.Compressor();
+    private static final Frame.Encoder frameEncoder = new Frame.Encoder();
+
+    private static class ConnectionTracker implements Connection.Tracker
+    {
+        public void addConnection(Channel ch, Connection connection) {}
+        public void closeAll() {}
+    }
+
+    private class PipelineFactory implements ChannelPipelineFactory
+    {
+        public ChannelPipeline getPipeline() throws Exception
+        {
+            ChannelPipeline pipeline = Channels.pipeline();
+
+            //pipeline.addLast("debug", new LoggingHandler());
+
+            pipeline.addLast("frameDecoder", new Frame.Decoder(tracker, connectionFactory));
+            pipeline.addLast("frameEncoder", frameEncoder);
+
+            pipeline.addLast("frameDecompressor", frameDecompressor);
+            pipeline.addLast("frameCompressor", frameCompressor);
+
+            pipeline.addLast("messageDecoder", messageDecoder);
+            pipeline.addLast("messageEncoder", messageEncoder);
+
+            pipeline.addLast("handler", responseHandler);
+
+            return pipeline;
+        }
+    }
+
+    private static class ResponseHandler extends SimpleChannelUpstreamHandler
+    {
+        public final BlockingQueue<Message.Response> responses = new SynchronousQueue<Message.Response>(true);
+
+        @Override
+        public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
+        {
+            assert e.getMessage() instanceof Message.Response;
+            try
+            {
+                responses.put((Message.Response)e.getMessage());
+            }
+            catch (InterruptedException ie)
+            {
+                throw new RuntimeException(ie);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d3a3eed/src/java/org/apache/cassandra/transport/messages/AuthenticateMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/AuthenticateMessage.java b/src/java/org/apache/cassandra/transport/messages/AuthenticateMessage.java
new file mode 100644
index 0000000..24c8cbc
--- /dev/null
+++ b/src/java/org/apache/cassandra/transport/messages/AuthenticateMessage.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.transport.messages;
+
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.transport.CBUtil;
+import org.apache.cassandra.transport.FrameCompressor;
+import org.apache.cassandra.transport.Message;
+
+/**
+ * Message to indicate that the server is ready to receive requests.
+ */
+public class AuthenticateMessage extends Message.Response
+{
+    public static final Message.Codec<AuthenticateMessage> codec = new Message.Codec<AuthenticateMessage>()
+    {
+        public AuthenticateMessage decode(ChannelBuffer body)
+        {
+            String authenticator = CBUtil.readString(body);
+            return new AuthenticateMessage(authenticator);
+        }
+
+        public ChannelBuffer encode(AuthenticateMessage msg)
+        {
+            return CBUtil.stringToCB(msg.authenticator);
+        }
+    };
+
+    public final String authenticator;
+
+    public AuthenticateMessage(String authenticator)
+    {
+        super(Message.Type.AUTHENTICATE);
+        this.authenticator = authenticator;
+    }
+
+    public ChannelBuffer encode()
+    {
+        return codec.encode(this);
+    }
+
+    @Override
+    public String toString()
+    {
+        return "AUTHENTICATE " + authenticator;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d3a3eed/src/java/org/apache/cassandra/transport/messages/CredentialsMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/CredentialsMessage.java b/src/java/org/apache/cassandra/transport/messages/CredentialsMessage.java
new file mode 100644
index 0000000..43ed9dd
--- /dev/null
+++ b/src/java/org/apache/cassandra/transport/messages/CredentialsMessage.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.transport.messages;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+
+import org.apache.cassandra.transport.CBUtil;
+import org.apache.cassandra.transport.Message;
+import org.apache.cassandra.utils.SemanticVersion;
+import org.apache.cassandra.thrift.AuthenticationException;
+
+/**
+ * Message to indicate that the server is ready to receive requests.
+ */
+public class CredentialsMessage extends Message.Request
+{
+    public static final Message.Codec<CredentialsMessage> codec = new Message.Codec<CredentialsMessage>()
+    {
+        public CredentialsMessage decode(ChannelBuffer body)
+        {
+            CredentialsMessage msg = new CredentialsMessage();
+            int count = body.readUnsignedShort();
+            for (int i = 0; i < count; i++)
+            {
+                String key = CBUtil.readString(body);
+                String value = CBUtil.readString(body);
+                msg.credentials.put(key, value);
+            }
+            return msg;
+        }
+
+        public ChannelBuffer encode(CredentialsMessage msg)
+        {
+            ChannelBuffer cb = ChannelBuffers.dynamicBuffer();
+
+            cb.writeShort(msg.credentials.size());
+            for (Map.Entry<String, String> entry : msg.credentials.entrySet())
+            {
+                cb.writeBytes(CBUtil.stringToCB(entry.getKey()));
+                cb.writeBytes(CBUtil.stringToCB(entry.getValue()));
+            }
+            return cb;
+        }
+    };
+
+    public final Map<String, String> credentials = new HashMap<String, String>();
+
+    public CredentialsMessage()
+    {
+        super(Message.Type.CREDENTIALS);
+    }
+
+    public ChannelBuffer encode()
+    {
+        return codec.encode(this);
+    }
+
+    public Message.Response execute()
+    {
+        try
+        {
+            connection.clientState().login(credentials);
+            return new ReadyMessage();
+        }
+        catch (AuthenticationException e)
+        {
+            return ErrorMessage.fromException(e);
+        }
+    }
+
+    @Override
+    public String toString()
+    {
+        return "CREDENTIALS " + credentials;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d3a3eed/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java b/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java
new file mode 100644
index 0000000..1cec3fc
--- /dev/null
+++ b/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.transport.messages;
+
+import java.util.concurrent.TimeoutException;
+
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+
+import org.apache.cassandra.transport.CBUtil;
+import org.apache.cassandra.transport.Message;
+import org.apache.cassandra.transport.ProtocolException;
+import org.apache.cassandra.thrift.AuthenticationException;
+import org.apache.cassandra.thrift.InvalidRequestException;
+import org.apache.cassandra.thrift.SchemaDisagreementException;
+import org.apache.cassandra.thrift.TimedOutException;
+import org.apache.cassandra.thrift.UnavailableException;
+
+/**
+ * Message to indicate an error to the client.
+ *
+ * Error codes are:
+ *   0x0000: Server error
+ *   0x0001: Protocol error
+ *   0x0002: Authentication error
+ *   0x0100: Unavailable exception
+ *   0x0101: Timeout exception
+ *   0x0102: Schema disagreement exception
+ *   0x0200: Request exception
+ */
+public class ErrorMessage extends Message.Response
+{
+    public static final Message.Codec<ErrorMessage> codec = new Message.Codec<ErrorMessage>()
+    {
+        public ErrorMessage decode(ChannelBuffer body)
+        {
+            int code = body.readInt();
+            String msg = CBUtil.readString(body);
+            return new ErrorMessage(code, msg);
+        }
+
+        public ChannelBuffer encode(ErrorMessage msg)
+        {
+            ChannelBuffer ccb = CBUtil.intToCB(msg.code);
+            ChannelBuffer mcb = CBUtil.stringToCB(msg.errorMsg);
+            return ChannelBuffers.wrappedBuffer(ccb, mcb);
+        }
+    };
+
+    // We need to figure error codes out (#3979)
+    public final int code;
+    public final String errorMsg;
+
+    public ErrorMessage(int code, String errorMsg)
+    {
+        super(Message.Type.ERROR);
+        this.code = code;
+        this.errorMsg = errorMsg;
+    }
+
+    public static ErrorMessage fromException(Throwable t)
+    {
+        String msg = t.getMessage() == null ? t.toString() : t.getMessage();
+
+        if (t instanceof TimeoutException || t instanceof TimedOutException)
+            return new ErrorMessage(0x0101, msg);
+        else if (t instanceof UnavailableException)
+            return new ErrorMessage(0x0100, msg);
+        else if (t instanceof SchemaDisagreementException)
+            return new ErrorMessage(0x0102, msg);
+        else if (t instanceof InvalidRequestException)
+            return new ErrorMessage(0x0200, msg);
+        else if (t instanceof ProtocolException)
+            return new ErrorMessage(0x0001, msg);
+        else if (t instanceof AuthenticationException)
+            return new ErrorMessage(0x0002, msg);
+
+        logger.error("Unknown exception during request", t);
+        return new ErrorMessage(0x0000, msg);
+    }
+
+    public ChannelBuffer encode()
+    {
+        return codec.encode(this);
+    }
+
+    @Override
+    public String toString()
+    {
+        return "ERROR " + code + ": " + errorMsg;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d3a3eed/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java b/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
new file mode 100644
index 0000000..c5b6c38
--- /dev/null
+++ b/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.transport.messages;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+
+import org.apache.cassandra.cql3.CQLStatement;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.transport.*;
+import org.apache.cassandra.thrift.InvalidRequestException;
+
+public class ExecuteMessage extends Message.Request
+{
+    public static final Message.Codec<ExecuteMessage> codec = new Message.Codec<ExecuteMessage>()
+    {
+        public ExecuteMessage decode(ChannelBuffer body)
+        {
+            int id = body.readInt();
+
+            int count = body.readUnsignedShort();
+            List<ByteBuffer> values = new ArrayList<ByteBuffer>(count);
+            for (int i = 0; i < count; i++)
+                values.add(CBUtil.readValue(body));
+
+            return new ExecuteMessage(id, values);
+        }
+
+        public ChannelBuffer encode(ExecuteMessage msg)
+        {
+            // We have:
+            //   - statementId
+            //   - Number of values
+            //   - The values
+            //   - options
+            int vs = msg.values.size();
+            CBUtil.BufferBuilder builder = new CBUtil.BufferBuilder(2, 0, vs);
+            builder.add(CBUtil.intToCB(msg.statementId));
+            builder.add(CBUtil.shortToCB(vs));
+
+            // Values
+            for (ByteBuffer value : msg.values)
+                builder.addValue(value);
+
+            return builder.build();
+        }
+    };
+
+    public final int statementId;
+    public final List<ByteBuffer> values;
+
+    public ExecuteMessage(int statementId, List<ByteBuffer> values)
+    {
+        super(Message.Type.EXECUTE);
+        this.statementId = statementId;
+        this.values = values;
+    }
+
+    public ChannelBuffer encode()
+    {
+        return codec.encode(this);
+    }
+
+    public Message.Response execute()
+    {
+        try
+        {
+            CQLStatement statement = connection.clientState().getCQL3Prepared().get(statementId);
+
+            if (statement == null)
+                throw new InvalidRequestException(String.format("Prepared query with ID %d not found", statementId));
+
+            return QueryProcessor.processPrepared(statement, connection.clientState(), values);
+        }
+        catch (Exception e)
+        {
+            return ErrorMessage.fromException(e);
+        }
+    }
+
+    @Override
+    public String toString()
+    {
+        return "EXECUTE " + statementId + " with " + values.size() + " values";
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d3a3eed/src/java/org/apache/cassandra/transport/messages/OptionsMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/OptionsMessage.java b/src/java/org/apache/cassandra/transport/messages/OptionsMessage.java
new file mode 100644
index 0000000..1a028de
--- /dev/null
+++ b/src/java/org/apache/cassandra/transport/messages/OptionsMessage.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.transport.messages;
+
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.transport.FrameCompressor;
+import org.apache.cassandra.transport.Message;
+
+/**
+ * Message to indicate that the server is ready to receive requests.
+ */
+public class OptionsMessage extends Message.Request
+{
+    public static final Message.Codec<OptionsMessage> codec = new Message.Codec<OptionsMessage>()
+    {
+        public OptionsMessage decode(ChannelBuffer body)
+        {
+            return new OptionsMessage();
+        }
+
+        public ChannelBuffer encode(OptionsMessage msg)
+        {
+            return ChannelBuffers.EMPTY_BUFFER;
+        }
+    };
+
+    public OptionsMessage()
+    {
+        super(Message.Type.OPTIONS);
+    }
+
+    public ChannelBuffer encode()
+    {
+        return codec.encode(this);
+    }
+
+    public Message.Response execute()
+    {
+        SupportedMessage supported = new SupportedMessage();
+        supported.cqlVersions.add(QueryProcessor.CQL_VERSION.toString());
+        if (FrameCompressor.SnappyCompressor.instance != null)
+            supported.compressions.add("snappy");
+        return supported;
+    }
+
+    @Override
+    public String toString()
+    {
+        return "OPTIONS";
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d3a3eed/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java b/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java
new file mode 100644
index 0000000..a430faf
--- /dev/null
+++ b/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.transport.messages;
+
+import java.util.EnumMap;
+import java.util.Map;
+
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.transport.*;
+import org.apache.cassandra.thrift.InvalidRequestException;
+
+public class PrepareMessage extends Message.Request
+{
+    public static final Message.Codec<PrepareMessage> codec = new Message.Codec<PrepareMessage>()
+    {
+        public PrepareMessage decode(ChannelBuffer body)
+        {
+            String query = CBUtil.readLongString(body);
+            return new PrepareMessage(query);
+        }
+
+        public ChannelBuffer encode(PrepareMessage msg)
+        {
+            return CBUtil.longStringToCB(msg.query);
+        }
+    };
+
+    private final String query;
+
+    public PrepareMessage(String query)
+    {
+        super(Message.Type.PREPARE);
+        this.query = query;
+    }
+
+    public ChannelBuffer encode()
+    {
+        return codec.encode(this);
+    }
+
+    public Message.Response execute()
+    {
+        try
+        {
+            return QueryProcessor.prepare(query, connection.clientState());
+        }
+        catch (Exception e)
+        {
+            return ErrorMessage.fromException(e);
+        }
+    }
+
+    @Override
+    public String toString()
+    {
+        return "PREPARE " + query;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d3a3eed/src/java/org/apache/cassandra/transport/messages/QueryMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/QueryMessage.java b/src/java/org/apache/cassandra/transport/messages/QueryMessage.java
new file mode 100644
index 0000000..d94d8d4
--- /dev/null
+++ b/src/java/org/apache/cassandra/transport/messages/QueryMessage.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.transport.messages;
+
+import java.util.EnumMap;
+import java.util.Map;
+
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.transport.*;
+import org.apache.cassandra.thrift.InvalidRequestException;
+import org.apache.cassandra.thrift.SchemaDisagreementException;
+import org.apache.cassandra.thrift.TimedOutException;
+import org.apache.cassandra.thrift.UnavailableException;
+
+/**
+ * A CQL query
+ */
+public class QueryMessage extends Message.Request
+{
+    public static final Message.Codec<QueryMessage> codec = new Message.Codec<QueryMessage>()
+    {
+        public QueryMessage decode(ChannelBuffer body)
+        {
+            String query = CBUtil.readLongString(body);
+            return new QueryMessage(query);
+        }
+
+        public ChannelBuffer encode(QueryMessage msg)
+        {
+            return CBUtil.longStringToCB(msg.query);
+        }
+    };
+
+    public final String query;
+
+    public QueryMessage(String query)
+    {
+        super(Message.Type.QUERY);
+        this.query = query;
+    }
+
+    public ChannelBuffer encode()
+    {
+        return codec.encode(this);
+    }
+
+    public Message.Response execute()
+    {
+        try
+        {
+            return QueryProcessor.process(query, connection.clientState());
+        }
+        catch (Exception e)
+        {
+            if (!((e instanceof UnavailableException)
+               || (e instanceof InvalidRequestException)
+               || (e instanceof TimedOutException)
+               || (e instanceof SchemaDisagreementException)))
+            {
+                logger.error("Unexpected error during query", e);
+            }
+            return ErrorMessage.fromException(e);
+        }
+    }
+
+    @Override
+    public String toString()
+    {
+        return "QUERY " + query;
+    }
+}


Mime
View raw message