cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From slebre...@apache.org
Subject [3/3] git commit: Make prepared statement global instead of per-connection
Date Wed, 26 Sep 2012 12:10:25 GMT
Make prepared statement global instead of per-connection

patch by slebresne; reviewed by jbellis for CASSANDRA-4449


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ccca5f1e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ccca5f1e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ccca5f1e

Branch: refs/heads/trunk
Commit: ccca5f1e39c220ddc7ce68883622667229e28113
Parents: f6bb970
Author: Sylvain Lebresne <sylvain@datastax.com>
Authored: Wed Sep 26 14:05:09 2012 +0200
Committer: Sylvain Lebresne <sylvain@datastax.com>
Committed: Wed Sep 26 14:05:09 2012 +0200

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 doc/native_protocol.spec                           |    4 +
 .../org/apache/cassandra/cql3/QueryProcessor.java  |   60 +++++++++---
 .../apache/cassandra/exceptions/ExceptionCode.java |    3 +-
 .../exceptions/PreparedQueryNotFoundException.java |   40 ++++++++
 .../org/apache/cassandra/service/ClientState.java  |   12 ---
 .../apache/cassandra/thrift/CassandraServer.java   |    9 +-
 .../org/apache/cassandra/transport/CBUtil.java     |   20 ++++
 .../org/apache/cassandra/transport/Client.java     |    3 +-
 .../apache/cassandra/transport/SimpleClient.java   |    2 +-
 .../cassandra/transport/messages/ErrorMessage.java |   11 ++
 .../transport/messages/ExecuteMessage.java         |   20 +++--
 .../transport/messages/PrepareMessage.java         |    2 +-
 .../transport/messages/ResultMessage.java          |   27 ++++--
 src/java/org/apache/cassandra/utils/MD5Digest.java |   75 +++++++++++++++
 15 files changed, 242 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/ccca5f1e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 7314f99..b70c412 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -3,6 +3,7 @@
  * adjust blockFor calculation to account for pending ranges due to node 
    movement (CASSANDRA-833)
  * Change CQL version to 3.0.0 and stop accepting 3.0.0-beta1 (CASSANDRA-4649)
+ * Make prepared statement global instead of per connection (CASSANDRA-4449)
 
 1.2-beta1
  * add atomic_batch_mutate (CASSANDRA-4542, -4635)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ccca5f1e/doc/native_protocol.spec
----------------------------------------------------------------------
diff --git a/doc/native_protocol.spec b/doc/native_protocol.spec
index 0dd5c14..9a44697 100644
--- a/doc/native_protocol.spec
+++ b/doc/native_protocol.spec
@@ -520,3 +520,7 @@ Table of Contents
                         already exists. If the query was attempting to create a
                         keyspace, <table> will be present but will be the empty
                         string.
+    0x2500    Unprepared: Can be thrown when a prepared statement is
+              executed if the provided prepared statement ID is not known by
+              this host. The rest of the ERROR message body will be [bytes]
+              representing the unknown ID.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ccca5f1e/src/java/org/apache/cassandra/cql3/QueryProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
index 146f775..856f6fd 100644
--- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.cql3;
 import java.nio.ByteBuffer;
 import java.util.*;
 
+import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap;
 import org.antlr.runtime.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -34,6 +35,7 @@ import org.apache.cassandra.exceptions.*;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.thrift.SchemaDisagreementException;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.MD5Digest;
 import org.apache.cassandra.utils.SemanticVersion;
 
 public class QueryProcessor
@@ -42,6 +44,26 @@ public class QueryProcessor
 
     private static final Logger logger = LoggerFactory.getLogger(QueryProcessor.class);
 
+    public static final int MAX_CACHE_PREPARED = 100000; // Enough to keep buggy clients
from OOM'ing us
+    private static final Map<MD5Digest, CQLStatement> preparedStatements = new ConcurrentLinkedHashMap.Builder<MD5Digest,
CQLStatement>()
+                                                                               .maximumWeightedCapacity(MAX_CACHE_PREPARED)
+                                                                               .build();
+
+    private static final Map<Integer, CQLStatement> thriftPreparedStatements = new
ConcurrentLinkedHashMap.Builder<Integer, CQLStatement>()
+                                                                                   .maximumWeightedCapacity(MAX_CACHE_PREPARED)
+                                                                                   .build();
+
+
+    public static CQLStatement getPrepared(MD5Digest id)
+    {
+        return preparedStatements.get(id);
+    }
+
+    public static CQLStatement getPrepared(Integer id)
+    {
+        return thriftPreparedStatements.get(id);
+    }
+
     public static void validateKey(ByteBuffer key) throws InvalidRequestException
     {
         if (key == null || key.remaining() == 0)
@@ -151,20 +173,38 @@ public class QueryProcessor
         }
     }
 
-    public static ResultMessage.Prepared prepare(String queryString, ClientState clientState)
+    public static ResultMessage.Prepared prepare(String queryString, ClientState clientState,
boolean forThrift)
     throws RequestValidationException
     {
         logger.trace("CQL QUERY: {}", queryString);
 
         ParsedStatement.Prepared prepared = getStatement(queryString, clientState);
-        int statementId = makeStatementId(queryString);
-        clientState.getCQL3Prepared().put(statementId, prepared.statement);
-        logger.trace(String.format("Stored prepared statement #%d with %d bind markers",
-                                   statementId,
-                                   prepared.statement.getBoundsTerms()));
+        ResultMessage.Prepared msg = storePreparedStatement(queryString, clientState, prepared,
forThrift);
 
         assert prepared.statement.getBoundsTerms() == prepared.boundNames.size();
-        return new ResultMessage.Prepared(statementId, prepared.boundNames);
+        return msg;
+    }
+
+    private static ResultMessage.Prepared storePreparedStatement(String queryString, ClientState
clientState, ParsedStatement.Prepared prepared, boolean forThrift)
+    {
+        if (forThrift)
+        {
+            int statementId = queryString.hashCode();
+            thriftPreparedStatements.put(statementId, prepared.statement);
+            logger.trace(String.format("Stored prepared statement #%d with %d bind markers",
+                                       statementId,
+                                       prepared.statement.getBoundsTerms()));
+            return ResultMessage.Prepared.forThrift(statementId, prepared.boundNames);
+        }
+        else
+        {
+            MD5Digest statementId = MD5Digest.compute(queryString);
+            logger.trace(String.format("Stored prepared statement %s with %d bind markers",
+                                       statementId,
+                                       prepared.statement.getBoundsTerms()));
+            preparedStatements.put(statementId, prepared.statement);
+            return new ResultMessage.Prepared(statementId, prepared.boundNames);
+        }
     }
 
     public static ResultMessage processPrepared(CQLStatement statement, ClientState clientState,
List<ByteBuffer> variables)
@@ -188,12 +228,6 @@ public class QueryProcessor
         return processStatement(statement, clientState, variables);
     }
 
-    private static final int makeStatementId(String cql)
-    {
-        // use the hash of the string till something better is provided
-        return cql.hashCode();
-    }
-
     private static ParsedStatement.Prepared getStatement(String queryStr, ClientState clientState)
     throws RequestValidationException
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ccca5f1e/src/java/org/apache/cassandra/exceptions/ExceptionCode.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/exceptions/ExceptionCode.java b/src/java/org/apache/cassandra/exceptions/ExceptionCode.java
index 13fcc6a..e8dfb4e 100644
--- a/src/java/org/apache/cassandra/exceptions/ExceptionCode.java
+++ b/src/java/org/apache/cassandra/exceptions/ExceptionCode.java
@@ -43,7 +43,8 @@ public enum ExceptionCode
     UNAUTHORIZED    (0x2100),
     INVALID         (0x2200),
     CONFIG_ERROR    (0x2300),
-    ALREADY_EXISTS  (0x2400);
+    ALREADY_EXISTS  (0x2400),
+    UNPREPARED      (0x2500);
 
     public final int value;
     private static final Map<Integer, ExceptionCode> valueToCode = new HashMap<Integer,
ExceptionCode>(ExceptionCode.values().length);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ccca5f1e/src/java/org/apache/cassandra/exceptions/PreparedQueryNotFoundException.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/exceptions/PreparedQueryNotFoundException.java
b/src/java/org/apache/cassandra/exceptions/PreparedQueryNotFoundException.java
new file mode 100644
index 0000000..07502c8
--- /dev/null
+++ b/src/java/org/apache/cassandra/exceptions/PreparedQueryNotFoundException.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.exceptions;
+
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.utils.MD5Digest;
+
+public class PreparedQueryNotFoundException extends RequestValidationException
+{
+    public final MD5Digest id;
+
+    public PreparedQueryNotFoundException(MD5Digest id)
+    {
+        super(ExceptionCode.UNPREPARED, makeMsg(id));
+        this.id = id;
+    }
+
+    private static String makeMsg(MD5Digest id)
+    {
+        return String.format("Prepared query with ID %d not found" +
+                             " (either the query was not prepared on this host (maybe the
host has been restarted?)" +
+                             " or you have prepared more than %d queries and queries %d has
been evicted from the internal cache)",
+                             id, QueryProcessor.MAX_CACHE_PREPARED, id);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ccca5f1e/src/java/org/apache/cassandra/service/ClientState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ClientState.java b/src/java/org/apache/cassandra/service/ClientState.java
index ef640af..6266a3b 100644
--- a/src/java/org/apache/cassandra/service/ClientState.java
+++ b/src/java/org/apache/cassandra/service/ClientState.java
@@ -65,12 +65,6 @@ public class ClientState
         }
     };
 
-    private final Map<Integer, org.apache.cassandra.cql3.CQLStatement> cql3Prepared
= new LinkedHashMap<Integer, org.apache.cassandra.cql3.CQLStatement>(16, 0.75f, true)
{
-        protected boolean removeEldestEntry(Map.Entry<Integer, org.apache.cassandra.cql3.CQLStatement>
eldest) {
-            return size() > MAX_CACHE_PREPARED;
-        }
-    };
-
     private long clock;
 
     // internalCall is used to mark ClientState as used by some internal component
@@ -96,11 +90,6 @@ public class ClientState
         return prepared;
     }
 
-    public Map<Integer, org.apache.cassandra.cql3.CQLStatement> getCQL3Prepared()
-    {
-        return cql3Prepared;
-    }
-
     public String getRawKeyspace()
     {
         return keyspace;
@@ -191,7 +180,6 @@ public class ClientState
         preparedTracingSession = null;
         resourceClear();
         prepared.clear();
-        cql3Prepared.clear();
     }
 
     public void hasKeyspaceAccess(String keyspace, Permission perm) throws UnauthorizedException,
InvalidRequestException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ccca5f1e/src/java/org/apache/cassandra/thrift/CassandraServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java
index 73ecd29..4ab19bb 100644
--- a/src/java/org/apache/cassandra/thrift/CassandraServer.java
+++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java
@@ -1705,7 +1705,7 @@ public class CassandraServer implements Cassandra.Iface
             if (cState.getCQLVersion().major == 2)
                 return QueryProcessor.prepare(queryString, cState);
             else
-                return org.apache.cassandra.cql3.QueryProcessor.prepare(queryString, cState).toThriftPreparedResult();
+                return org.apache.cassandra.cql3.QueryProcessor.prepare(queryString, cState,
true).toThriftPreparedResult();
         }
         catch (RequestValidationException e)
         {
@@ -1741,10 +1741,13 @@ public class CassandraServer implements Cassandra.Iface
             }
             else
             {
-                org.apache.cassandra.cql3.CQLStatement statement = cState.getCQL3Prepared().get(itemId);
+                org.apache.cassandra.cql3.CQLStatement statement = org.apache.cassandra.cql3.QueryProcessor.getPrepared(itemId);
 
                 if (statement == null)
-                    throw new InvalidRequestException(String.format("Prepared query with
ID %d not found", itemId));
+                    throw new InvalidRequestException(String.format("Prepared query with
ID %d not found" +
+                                                                    " (either the query was
not prepared on this host (maybe the host has been restarted?)" +
+                                                                    " or you have prepared
more than %d queries and queries %d has been evicted from the internal cache)",
+                                                                    itemId, org.apache.cassandra.cql3.QueryProcessor.MAX_CACHE_PREPARED,
itemId));
                 logger.trace("Retrieved prepared statement #{} with {} bind markers", itemId,
                         statement.getBoundsTerms());
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ccca5f1e/src/java/org/apache/cassandra/transport/CBUtil.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/CBUtil.java b/src/java/org/apache/cassandra/transport/CBUtil.java
index b977f35..fe8863a 100644
--- a/src/java/org/apache/cassandra/transport/CBUtil.java
+++ b/src/java/org/apache/cassandra/transport/CBUtil.java
@@ -111,6 +111,26 @@ public abstract class CBUtil
         return ChannelBuffers.wrappedBuffer(shortToCB(bytes.readableBytes()), bytes);
     }
 
+    public static ChannelBuffer bytesToCB(byte[] bytes)
+    {
+        return ChannelBuffers.wrappedBuffer(shortToCB(bytes.length), ChannelBuffers.wrappedBuffer(bytes));
+    }
+
+    public static byte[] readBytes(ChannelBuffer cb)
+    {
+        try
+        {
+            int length = cb.readUnsignedShort();
+            byte[] bytes = new byte[length];
+            cb.readBytes(bytes);
+            return bytes;
+        }
+        catch (IndexOutOfBoundsException e)
+        {
+            throw new ProtocolException("Not enough bytes to read a byte array preceded by
it's 2 bytes length");
+        }
+    }
+
     public static ChannelBuffer longStringToCB(String str)
     {
         ChannelBuffer bytes = bytes(str);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ccca5f1e/src/java/org/apache/cassandra/transport/Client.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Client.java b/src/java/org/apache/cassandra/transport/Client.java
index b9e00fa..3b4ace9 100644
--- a/src/java/org/apache/cassandra/transport/Client.java
+++ b/src/java/org/apache/cassandra/transport/Client.java
@@ -27,6 +27,7 @@ import com.google.common.base.Splitter;
 
 import org.apache.cassandra.transport.messages.*;
 import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.utils.Hex;
 
 public class Client extends SimpleClient
 {
@@ -109,7 +110,7 @@ public class Client extends SimpleClient
         {
             try
             {
-                int id = Integer.parseInt(iter.next());
+                byte[] id = Hex.hexToBytes(iter.next());
                 List<ByteBuffer> values = new ArrayList<ByteBuffer>();
                 while(iter.hasNext())
                 {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ccca5f1e/src/java/org/apache/cassandra/transport/SimpleClient.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/SimpleClient.java b/src/java/org/apache/cassandra/transport/SimpleClient.java
index ea0a3df..8132e65 100644
--- a/src/java/org/apache/cassandra/transport/SimpleClient.java
+++ b/src/java/org/apache/cassandra/transport/SimpleClient.java
@@ -126,7 +126,7 @@ public class SimpleClient
         return (ResultMessage.Prepared)msg;
     }
 
-    public ResultMessage executePrepared(int statementId, List<ByteBuffer> values)
+    public ResultMessage executePrepared(byte[] statementId, List<ByteBuffer> values)
     {
         Message.Response msg = execute(new ExecuteMessage(statementId, values));
         assert msg instanceof ResultMessage;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ccca5f1e/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java b/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java
index ecb387b..8ed8e94 100644
--- a/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java
@@ -33,6 +33,7 @@ import org.apache.cassandra.transport.ProtocolException;
 import org.apache.cassandra.transport.ServerError;
 import org.apache.cassandra.thrift.AuthenticationException;
 import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.MD5Digest;
 
 /**
  * Message to indicate an error to the client.
@@ -91,6 +92,12 @@ public class ErrorMessage extends Message.Response
                         te = new ReadTimeoutException(cl, received, blockFor, dataPresent
!= 0);
                     }
                     break;
+                case UNPREPARED:
+                    {
+                        MD5Digest id = MD5Digest.wrap(CBUtil.readBytes(body));
+                        te = new PreparedQueryNotFoundException(id);
+                    }
+                    break;
                 case SYNTAX_ERROR:
                     te = new SyntaxException(msg);
                     break;
@@ -145,6 +152,10 @@ public class ErrorMessage extends Message.Response
                     if (readEx != null)
                         acb.writeByte((byte)(readEx.dataPresent ? 1 : 0));
                     break;
+                case UNPREPARED:
+                    PreparedQueryNotFoundException pqnfe = (PreparedQueryNotFoundException)msg.error;
+                    acb = CBUtil.bytesToCB(pqnfe.id.bytes);
+                    break;
                 case ALREADY_EXISTS:
                     AlreadyExistsException aee = (AlreadyExistsException)msg.error;
                     acb = ChannelBuffers.wrappedBuffer(CBUtil.stringToCB(aee.ksName),

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ccca5f1e/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java b/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
index 4172862..4400d12 100644
--- a/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
@@ -25,8 +25,9 @@ import org.jboss.netty.buffer.ChannelBuffer;
 
 import org.apache.cassandra.cql3.CQLStatement;
 import org.apache.cassandra.cql3.QueryProcessor;
-import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.exceptions.PreparedQueryNotFoundException;
 import org.apache.cassandra.transport.*;
+import org.apache.cassandra.utils.MD5Digest;
 
 public class ExecuteMessage extends Message.Request
 {
@@ -34,7 +35,7 @@ public class ExecuteMessage extends Message.Request
     {
         public ExecuteMessage decode(ChannelBuffer body)
         {
-            int id = body.readInt();
+            byte[] id = CBUtil.readBytes(body);
 
             int count = body.readUnsignedShort();
             List<ByteBuffer> values = new ArrayList<ByteBuffer>(count);
@@ -53,7 +54,7 @@ public class ExecuteMessage extends Message.Request
             //   - options
             int vs = msg.values.size();
             CBUtil.BufferBuilder builder = new CBUtil.BufferBuilder(2, 0, vs);
-            builder.add(CBUtil.intToCB(msg.statementId));
+            builder.add(CBUtil.bytesToCB(msg.statementId.bytes));
             builder.add(CBUtil.shortToCB(vs));
 
             // Values
@@ -64,10 +65,15 @@ public class ExecuteMessage extends Message.Request
         }
     };
 
-    public final int statementId;
+    public final MD5Digest statementId;
     public final List<ByteBuffer> values;
 
-    public ExecuteMessage(int statementId, List<ByteBuffer> values)
+    public ExecuteMessage(byte[] statementId, List<ByteBuffer> values)
+    {
+        this(MD5Digest.wrap(statementId), values);
+    }
+
+    public ExecuteMessage(MD5Digest statementId, List<ByteBuffer> values)
     {
         super(Message.Type.EXECUTE);
         this.statementId = statementId;
@@ -84,10 +90,10 @@ public class ExecuteMessage extends Message.Request
         try
         {
             ServerConnection c = (ServerConnection)connection;
-            CQLStatement statement = c.clientState().getCQL3Prepared().get(statementId);
+            CQLStatement statement = QueryProcessor.getPrepared(statementId);
 
             if (statement == null)
-                throw new InvalidRequestException(String.format("Prepared query with ID %d
not found", statementId));
+                throw new PreparedQueryNotFoundException(statementId);
 
             return QueryProcessor.processPrepared(statement, c.clientState(), values);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ccca5f1e/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java b/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java
index 5c2636a..382e834 100644
--- a/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java
@@ -55,7 +55,7 @@ public class PrepareMessage extends Message.Request
     {
         try
         {
-            return QueryProcessor.prepare(query, ((ServerConnection)connection).clientState());
+            return QueryProcessor.prepare(query, ((ServerConnection)connection).clientState(),
false);
         }
         catch (Exception e)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ccca5f1e/src/java/org/apache/cassandra/transport/messages/ResultMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/ResultMessage.java b/src/java/org/apache/cassandra/transport/messages/ResultMessage.java
index 6b63948..d5009e9 100644
--- a/src/java/org/apache/cassandra/transport/messages/ResultMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/ResultMessage.java
@@ -29,6 +29,7 @@ import org.apache.cassandra.db.marshal.TypeParser;
 import org.apache.cassandra.thrift.CqlPreparedResult;
 import org.apache.cassandra.thrift.CqlResult;
 import org.apache.cassandra.thrift.CqlResultType;
+import org.apache.cassandra.utils.MD5Digest;
 
 public abstract class ResultMessage extends Message.Response
 {
@@ -248,30 +249,40 @@ public abstract class ResultMessage extends Message.Response
         {
             public ResultMessage decode(ChannelBuffer body)
             {
-                int id = body.readInt();
-                return new Prepared(id, ResultSet.Metadata.codec.decode(body));
+                MD5Digest id = MD5Digest.wrap(CBUtil.readBytes(body));
+                return new Prepared(id, -1, ResultSet.Metadata.codec.decode(body));
             }
 
             public ChannelBuffer encode(ResultMessage msg)
             {
                 assert msg instanceof Prepared;
                 Prepared prepared = (Prepared)msg;
-                return ChannelBuffers.wrappedBuffer(CBUtil.intToCB(prepared.statementId),
ResultSet.Metadata.codec.encode(prepared.metadata));
+                assert prepared.statementId != null;
+                return ChannelBuffers.wrappedBuffer(CBUtil.bytesToCB(prepared.statementId.bytes),
ResultSet.Metadata.codec.encode(prepared.metadata));
             }
         };
 
-        public final int statementId;
+        public final MD5Digest statementId;
         public final ResultSet.Metadata metadata;
 
-        public Prepared(int statementId, List<ColumnSpecification> names)
+        // Statement id for CQL-over-thrift compatibility. The binary protocol ignores it.
+        private final int thriftStatementId;
+
+        public Prepared(MD5Digest statementId, List<ColumnSpecification> names)
+        {
+            this(statementId, -1, new ResultSet.Metadata(names));
+        }
+
+        public static Prepared forThrift(int statementId, List<ColumnSpecification>
names)
         {
-            this(statementId, new ResultSet.Metadata(names));
+            return new Prepared(null, statementId, new ResultSet.Metadata(names));
         }
 
-        private Prepared(int statementId, ResultSet.Metadata metadata)
+        private Prepared(MD5Digest statementId, int thriftStatementId, ResultSet.Metadata
metadata)
         {
             super(Kind.PREPARED);
             this.statementId = statementId;
+            this.thriftStatementId = thriftStatementId;
             this.metadata = metadata;
         }
 
@@ -294,7 +305,7 @@ public abstract class ResultMessage extends Message.Response
                 namesString.add(name.toString());
                 typesString.add(TypeParser.getShortName(name.type));
             }
-            return new CqlPreparedResult(statementId, metadata.names.size()).setVariable_types(typesString).setVariable_names(namesString);
+            return new CqlPreparedResult(thriftStatementId, metadata.names.size()).setVariable_types(typesString).setVariable_names(namesString);
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ccca5f1e/src/java/org/apache/cassandra/utils/MD5Digest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/MD5Digest.java b/src/java/org/apache/cassandra/utils/MD5Digest.java
new file mode 100644
index 0000000..59c1aba
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/MD5Digest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.utils;
+
+import java.util.Arrays;
+
+/**
+ * The result of the computation of an MD5 digest.
+ *
+ * An MD5 is really just a byte[], but arrays are a no-go as map keys. We could
+ * wrap it in a ByteBuffer but:
+ *   1. MD5Digest is a more explicit name than ByteBuffer to represent an MD5.
+ *   2. Using our own class allows us to use our FastByteComparison for equals.
+ */
+public class MD5Digest
+{
+    public final byte[] bytes;
+
+    private MD5Digest(byte[] bytes)
+    {
+        this.bytes = bytes;
+    }
+
+    public static MD5Digest wrap(byte[] digest)
+    {
+        return new MD5Digest(digest);
+    }
+
+    public static MD5Digest compute(byte[] toHash)
+    {
+        return new MD5Digest(FBUtilities.threadLocalMD5Digest().digest(toHash));
+    }
+
+    public static MD5Digest compute(String toHash)
+    {
+        return compute(toHash.getBytes());
+    }
+
+    @Override
+    public final int hashCode()
+    {
+        return Arrays.hashCode(bytes);
+    }
+
+    @Override
+    public final boolean equals(Object o)
+    {
+        if(!(o instanceof MD5Digest))
+            return false;
+        MD5Digest that = (MD5Digest)o;
+        // compares the digests byte by byte; note that both bytes arrays must be non-null since .length is read
+        return FBUtilities.compareUnsigned(this.bytes, that.bytes, 0, 0, this.bytes.length,
that.bytes.length) == 0;
+    }
+
+    @Override
+    public String toString()
+    {
+        return Hex.bytesToHex(bytes);
+    }
+}


Mime
View raw message