cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ifesdj...@apache.org
Subject cassandra git commit: Add result set metadata to prepared statement MD5 hash calculation
Date Wed, 11 Oct 2017 14:16:24 GMT
Repository: cassandra
Updated Branches:
  refs/heads/trunk df147cc09 -> 922dbdb65


Add result set metadata to prepared statement MD5 hash calculation

Patch by Alex Petrov; reviewed by Robert Stupp for CASSANDRA-10786

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/922dbdb6
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/922dbdb6
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/922dbdb6

Branch: refs/heads/trunk
Commit: 922dbdb658b1693973926026b213153d05b4077c
Parents: df147cc
Author: Alex Petrov <oleksandr.petrov@gmail.com>
Authored: Fri May 13 14:34:03 2016 +0200
Committer: Alex Petrov <oleksandr.petrov@gmail.com>
Committed: Wed Oct 11 16:15:29 2017 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 build.xml                                       |  12 +-
 doc/native_protocol_v5.spec                     |  27 +-
 lib/cassandra-driver-core-3.0.1-shaded.jar      | Bin 2445093 -> 0 bytes
 ...e-3.4.0-3d1e4f3-java1196-SNAPSHOT-shaded.jar | Bin 0 -> 2613656 bytes
 lib/cassandra-driver-internal-only-3.10.zip     | Bin 256997 -> 0 bytes
 lib/cassandra-driver-internal-only-3.11.zip     | Bin 0 -> 264882 bytes
 .../apache/cassandra/cql3/QueryProcessor.java   |  13 +-
 .../org/apache/cassandra/cql3/ResultSet.java    | 122 ++++++-
 .../cql3/selection/SelectionColumnMapping.java  |   4 +-
 .../statements/ListPermissionsStatement.java    |   3 +-
 .../cql3/statements/ListRolesStatement.java     |   3 +-
 .../cql3/statements/ListUsersStatement.java     |   4 +-
 .../cql3/statements/ModificationStatement.java  |  16 +-
 .../cql3/statements/ParsedStatement.java        |   8 +-
 .../org/apache/cassandra/transport/Client.java  |   6 +-
 .../cassandra/transport/SimpleClient.java       |   4 +-
 .../transport/messages/ExecuteMessage.java      |  48 ++-
 .../transport/messages/ResultMessage.java       |  39 ++-
 .../org/apache/cassandra/cql3/CQLTester.java    |  20 +-
 .../cassandra/cql3/PreparedStatementsTest.java  | 317 ++++++++++++++++---
 .../cassandra/cql3/PstmtPersistenceTest.java    |   4 +-
 .../cql3/validation/entities/JsonTest.java      |  10 +-
 .../validation/operations/SelectLimitTest.java  |   5 +-
 .../cassandra/transport/MessagePayloadTest.java |   6 +-
 .../operations/predefined/CqlOperation.java     |   8 +-
 26 files changed, 540 insertions(+), 140 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/922dbdb6/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 2454c4f..3f2fe15 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0
+ * Add result set metadata to prepared statement MD5 hash calculation (CASSANDRA-10786)
  * Refactor GcCompactionTest to avoid boxing (CASSANDRA-13941)
  * Checksum sstable metadata (CASSANDRA-13321)
  * Expose recent histograms in JmxHistograms (CASSANDRA-13642)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/922dbdb6/build.xml
----------------------------------------------------------------------
diff --git a/build.xml b/build.xml
index 4e7f9b1..c657211 100644
--- a/build.xml
+++ b/build.xml
@@ -298,7 +298,7 @@
       <!-- define the remote repositories we use -->
       <artifact:remoteRepository id="central"   url="${artifact.remoteRepository.central}"/>
       <artifact:remoteRepository id="apache"    url="${artifact.remoteRepository.apache}"/>
-
+      
       <macrodef name="install">
         <attribute name="pomFile"/>
         <attribute name="file"/>
@@ -423,6 +423,7 @@
           <dependency groupId="io.netty" artifactId="netty-all" version="4.1.14.Final" />
           <dependency groupId="com.google.code.findbugs" artifactId="jsr305" version="2.0.2" />
           <dependency groupId="com.clearspring.analytics" artifactId="stream" version="2.5.2" />
+	  <!-- UPDATE AND UNCOMMENT ON THE DRIVER RELEASE, BEFORE 4.0 RELEASE
           <dependency groupId="com.datastax.cassandra" artifactId="cassandra-driver-core" version="3.0.1" classifier="shaded">
             <exclusion groupId="io.netty" artifactId="netty-buffer"/>
             <exclusion groupId="io.netty" artifactId="netty-codec"/>
@@ -430,6 +431,7 @@
             <exclusion groupId="io.netty" artifactId="netty-transport"/>
             <exclusion groupId="org.slf4j" artifactId="slf4j-api"/>
           </dependency>
+	  -->
           <dependency groupId="org.eclipse.jdt.core.compiler" artifactId="ecj" version="4.4.2" />
           <dependency groupId="org.caffinitas.ohc" artifactId="ohc-core" version="0.4.4">
             <exclusion groupId="org.slf4j" artifactId="slf4j-api"/>
@@ -506,7 +508,9 @@
       	<dependency groupId="org.apache.hadoop" artifactId="hadoop-minicluster"/>
       	<dependency groupId="com.google.code.findbugs" artifactId="jsr305"/>
         <dependency groupId="org.antlr" artifactId="antlr"/>
+	<!-- UPDATE AND UNCOMMENT ON THE DRIVER RELEASE, BEFORE 4.0 RELEASE	
         <dependency groupId="com.datastax.cassandra" artifactId="cassandra-driver-core" classifier="shaded"/>
+	-->
         <dependency groupId="org.eclipse.jdt.core.compiler" artifactId="ecj"/>
         <dependency groupId="org.caffinitas.ohc" artifactId="ohc-core"/>
         <dependency groupId="org.caffinitas.ohc" artifactId="ohc-core-j8"/>
@@ -523,7 +527,9 @@
                 artifactId="cassandra-parent"
                 version="${version}"/>
         <dependency groupId="junit" artifactId="junit"/>
+	<!-- UPDATE AND UNCOMMENT ON THE DRIVER RELEASE, BEFORE 4.0 RELEASE
         <dependency groupId="com.datastax.cassandra" artifactId="cassandra-driver-core" classifier="shaded"/>
+	-->     
         <dependency groupId="io.netty" artifactId="netty-all"/>
         <dependency groupId="org.eclipse.jdt.core.compiler" artifactId="ecj"/>
         <dependency groupId="org.caffinitas.ohc" artifactId="ohc-core"/>
@@ -596,13 +602,15 @@
         <dependency groupId="org.apache.hadoop" artifactId="hadoop-minicluster" optional="true"/>
 
         <!-- don't need the Java Driver to run, but if you use the hadoop stuff or UDFs -->
+	<!-- UPDATE AND UNCOMMENT ON THE DRIVER RELEASE, BEFORE 4.0 RELEASE
         <dependency groupId="com.datastax.cassandra" artifactId="cassandra-driver-core" classifier="shaded" optional="true">
           <exclusion groupId="io.netty" artifactId="netty-buffer"/>
           <exclusion groupId="io.netty" artifactId="netty-codec"/>
           <exclusion groupId="io.netty" artifactId="netty-handler"/>
           <exclusion groupId="io.netty" artifactId="netty-transport"/>
         </dependency>
-
+	-->
+	
         <!-- don't need jna to run, but nice to have -->
         <dependency groupId="net.java.dev.jna" artifactId="jna"/>
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/922dbdb6/doc/native_protocol_v5.spec
----------------------------------------------------------------------
diff --git a/doc/native_protocol_v5.spec b/doc/native_protocol_v5.spec
index 13ac208..0addbc4 100644
--- a/doc/native_protocol_v5.spec
+++ b/doc/native_protocol_v5.spec
@@ -407,12 +407,15 @@ Table of Contents
 4.1.6. EXECUTE
 
   Executes a prepared query. The body of the message must be:
-    <id><query_parameters>
-  where <id> is the prepared query ID. It's the [short bytes] returned as a
-  response to a PREPARE message. As for <query_parameters>, it has the exact
-  same definition as in QUERY (see Section 4.1.4).
-
-  The response from the server will be a RESULT message.
+  <id><result_metadata_id><query_parameters>
+  where
+  - <id> is the prepared query ID. It's the [short bytes] returned as a
+      response to a PREPARE message. As for <query_parameters>, it has the exact
+      same definition as in QUERY (see Section 4.1.4).
+    - <result_metadata_id> is the ID of the resultset metadata that was sent
+      along with response to PREPARE message. If a RESULT/Rows message reports
+      changed resultset metadata with the Metadata_changed flag, the reported new
+      resultset metadata must be used in subsequent executions.
 
 
 4.1.7. BATCH
@@ -583,7 +586,7 @@ Table of Contents
     <metadata><rows_count><rows_content>
   where:
     - <metadata> is composed of:
-        <flags><columns_count>[<paging_state>][<global_table_spec>?<col_spec_1>...<col_spec_n>]
+        <flags><columns_count>[<new_metadata_id>][<paging_state>][<global_table_spec>?<col_spec_1>...<col_spec_n>]
       where:
         - <flags> is an [int]. The bits of <flags> provides information on the
           formatting of the remaining information. A flag is set if the bit
@@ -604,9 +607,16 @@ Table of Contents
                       no other information (so no <global_table_spec> nor <col_spec_i>).
                       This will only ever be the case if this was requested
                       during the query (see QUERY and RESULT messages).
+            0x0008    Metadata_changed: if set, the No_metadata flag has to be unset
+                      and <new_metadata_id> has to be supplied. This flag is to be
+                      used to avoid a roundtrip in case of metadata changes for queries
+                      that requested metadata to be skipped.
         - <columns_count> is an [int] representing the number of columns selected
           by the query that produced this result. It defines the number of <col_spec_i>
           elements in and the number of elements for each row in <rows_content>.
+        - <new_metadata_id> is [short bytes] representing the new, changed resultset
+           metadata. The new metadata ID must also be used in subsequent executions of
+           the corresponding prepared statement, if any.
         - <global_table_spec> is present if the Global_tables_spec is set in
           <flags>. It is composed of two [string] representing the
           (unique) keyspace name and table name the columns belong to.
@@ -688,9 +698,10 @@ Table of Contents
 4.2.5.4. Prepared
 
   The result to a PREPARE message. The body of a Prepared result is:
-    <id><metadata><result_metadata>
+    <id><result_metadata_id><metadata><result_metadata>
   where:
     - <id> is [short bytes] representing the prepared query ID.
+    - <result_metadata_id> is [short bytes] representing the resultset metadata ID.
     - <metadata> is composed of:
         <flags><columns_count><pk_count>[<pk_index_1>...<pk_index_n>][<global_table_spec>?<col_spec_1>...<col_spec_n>]
       where:

http://git-wip-us.apache.org/repos/asf/cassandra/blob/922dbdb6/lib/cassandra-driver-core-3.0.1-shaded.jar
----------------------------------------------------------------------
diff --git a/lib/cassandra-driver-core-3.0.1-shaded.jar b/lib/cassandra-driver-core-3.0.1-shaded.jar
deleted file mode 100644
index bc269a0..0000000
Binary files a/lib/cassandra-driver-core-3.0.1-shaded.jar and /dev/null differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/922dbdb6/lib/cassandra-driver-core-3.4.0-3d1e4f3-java1196-SNAPSHOT-shaded.jar
----------------------------------------------------------------------
diff --git a/lib/cassandra-driver-core-3.4.0-3d1e4f3-java1196-SNAPSHOT-shaded.jar b/lib/cassandra-driver-core-3.4.0-3d1e4f3-java1196-SNAPSHOT-shaded.jar
new file mode 100644
index 0000000..d95a811
Binary files /dev/null and b/lib/cassandra-driver-core-3.4.0-3d1e4f3-java1196-SNAPSHOT-shaded.jar differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/922dbdb6/lib/cassandra-driver-internal-only-3.10.zip
----------------------------------------------------------------------
diff --git a/lib/cassandra-driver-internal-only-3.10.zip b/lib/cassandra-driver-internal-only-3.10.zip
deleted file mode 100644
index 22b877c..0000000
Binary files a/lib/cassandra-driver-internal-only-3.10.zip and /dev/null differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/922dbdb6/lib/cassandra-driver-internal-only-3.11.zip
----------------------------------------------------------------------
diff --git a/lib/cassandra-driver-internal-only-3.11.zip b/lib/cassandra-driver-internal-only-3.11.zip
new file mode 100644
index 0000000..f7760af
Binary files /dev/null and b/lib/cassandra-driver-internal-only-3.11.zip differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/922dbdb6/src/java/org/apache/cassandra/cql3/QueryProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
index ade98e7..3f0b196 100644
--- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
@@ -28,8 +28,7 @@ import com.github.benmanes.caffeine.cache.Cache;
 import com.github.benmanes.caffeine.cache.Caffeine;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Predicate;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Iterators;
+import com.google.common.collect.*;
 import com.google.common.primitives.Ints;
 import com.google.common.util.concurrent.MoreExecutors;
 import org.slf4j.Logger;
@@ -420,7 +419,10 @@ public class QueryProcessor implements QueryHandler
 
         checkTrue(queryString.equals(existing.rawCQLStatement),
                 String.format("MD5 hash collision: query with the same MD5 hash was already prepared. \n Existing: '%s'", existing.rawCQLStatement));
-        return new ResultMessage.Prepared(statementId, existing);
+
+        ResultSet.PreparedMetadata preparedMetadata = ResultSet.PreparedMetadata.fromPrepared(existing);
+        ResultSet.ResultMetadata resultMetadata = ResultSet.ResultMetadata.fromPrepared(existing);
+        return new ResultMessage.Prepared(statementId, resultMetadata.getResultMetadataId(), preparedMetadata, resultMetadata);
     }
 
     private static ResultMessage.Prepared storePreparedStatement(String queryString, String keyspace, ParsedStatement.Prepared prepared)
@@ -438,7 +440,9 @@ public class QueryProcessor implements QueryHandler
         MD5Digest statementId = computeId(queryString, keyspace);
         preparedStatements.put(statementId, prepared);
         SystemKeyspace.writePreparedStatement(keyspace, statementId, queryString);
-        return new ResultMessage.Prepared(statementId, prepared);
+        ResultSet.PreparedMetadata preparedMetadata = ResultSet.PreparedMetadata.fromPrepared(prepared);
+        ResultSet.ResultMetadata resultMetadata = ResultSet.ResultMetadata.fromPrepared(prepared);
+        return new ResultMessage.Prepared(statementId, resultMetadata.getResultMetadataId(), preparedMetadata, resultMetadata);
     }
 
     public ResultMessage processPrepared(CQLStatement statement,
@@ -464,7 +468,6 @@ public class QueryProcessor implements QueryHandler
                                                                 variables.size()));
 
             // at this point there is a match in count between markers and variables that is non-zero
-
             if (logger.isTraceEnabled())
                 for (int i = 0; i < variables.size(); i++)
                     logger.trace("[{}] '{}'", i+1, variables.get(i));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/922dbdb6/src/java/org/apache/cassandra/cql3/ResultSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/ResultSet.java b/src/java/org/apache/cassandra/cql3/ResultSet.java
index 2bb9997..e4b03ca 100644
--- a/src/java/org/apache/cassandra/cql3/ResultSet.java
+++ b/src/java/org/apache/cassandra/cql3/ResultSet.java
@@ -18,14 +18,29 @@
 package org.apache.cassandra.cql3;
 
 import java.nio.ByteBuffer;
-import java.util.*;
+import java.nio.charset.StandardCharsets;
+import java.security.MessageDigest;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Objects;
 
 import io.netty.buffer.ByteBuf;
-
-import org.apache.cassandra.transport.*;
+import org.apache.cassandra.cql3.statements.ModificationStatement;
+import org.apache.cassandra.cql3.statements.ParsedStatement;
+import org.apache.cassandra.cql3.statements.SelectStatement;
 import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.service.pager.PagingState;
+import org.apache.cassandra.transport.CBCodec;
+import org.apache.cassandra.transport.CBUtil;
+import org.apache.cassandra.transport.DataType;
+import org.apache.cassandra.transport.ProtocolVersion;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.MD5Digest;
 
 public class ResultSet
 {
@@ -34,14 +49,14 @@ public class ResultSet
     public final ResultMetadata metadata;
     public final List<List<ByteBuffer>> rows;
 
-    public ResultSet(List<ColumnSpecification> metadata)
+    public ResultSet(ResultMetadata resultMetadata)
     {
-        this(new ResultMetadata(metadata), new ArrayList<List<ByteBuffer>>());
+        this(resultMetadata, new ArrayList<List<ByteBuffer>>());
     }
 
-    public ResultSet(ResultMetadata metadata, List<List<ByteBuffer>> rows)
+    public ResultSet(ResultMetadata resultMetadata, List<List<ByteBuffer>> rows)
     {
-        this.metadata = metadata;
+        this.metadata = resultMetadata;
         this.rows = rows;
     }
 
@@ -179,7 +194,7 @@ public class ResultSet
     {
         public static final CBCodec<ResultMetadata> codec = new Codec();
 
-        public static final ResultMetadata EMPTY = new ResultMetadata(EnumSet.of(Flag.NO_METADATA), null, 0, null);
+        public static final ResultMetadata EMPTY = new ResultMetadata(MD5Digest.compute(new byte[0]), EnumSet.of(Flag.NO_METADATA), null, 0, null);
 
         private final EnumSet<Flag> flags;
         // Please note that columnCount can actually be smaller than names, even if names is not null. This is
@@ -189,16 +204,27 @@ public class ResultSet
         public final List<ColumnSpecification> names;
         private final int columnCount;
         private PagingState pagingState;
+        private final MD5Digest resultMetadataId;
+
+        public ResultMetadata(MD5Digest digest, List<ColumnSpecification> names)
+        {
+            this(digest, EnumSet.noneOf(Flag.class), names, names.size(), null);
+            if (!names.isEmpty() && ColumnSpecification.allInSameTable(names))
+                flags.add(Flag.GLOBAL_TABLES_SPEC);
+        }
 
+        // Problem is that we compute the metadata from the columns on creation;
+        // when re-preparing we create the intermediate object
         public ResultMetadata(List<ColumnSpecification> names)
         {
-            this(EnumSet.noneOf(Flag.class), names, names.size(), null);
+            this(computeResultMetadataId(names), EnumSet.noneOf(Flag.class), names, names.size(), null);
             if (!names.isEmpty() && ColumnSpecification.allInSameTable(names))
                 flags.add(Flag.GLOBAL_TABLES_SPEC);
         }
 
-        private ResultMetadata(EnumSet<Flag> flags, List<ColumnSpecification> names, int columnCount, PagingState pagingState)
+        private ResultMetadata(MD5Digest resultMetadataId, EnumSet<Flag> flags, List<ColumnSpecification> names, int columnCount, PagingState pagingState)
         {
+            this.resultMetadataId = resultMetadataId;
             this.flags = flags;
             this.names = names;
             this.columnCount = columnCount;
@@ -207,7 +233,7 @@ public class ResultSet
 
         public ResultMetadata copy()
         {
-            return new ResultMetadata(EnumSet.copyOf(flags), names, columnCount, pagingState);
+            return new ResultMetadata(resultMetadataId, EnumSet.copyOf(flags), names, columnCount, pagingState);
         }
 
         /**
@@ -252,6 +278,26 @@ public class ResultSet
             flags.add(Flag.NO_METADATA);
         }
 
+        public void setMetadataChanged()
+        {
+            flags.add(Flag.METADATA_CHANGED);
+        }
+
+        public MD5Digest getResultMetadataId()
+        {
+            return resultMetadataId;
+        }
+
+        public static ResultMetadata fromPrepared(ParsedStatement.Prepared prepared)
+        {
+            CQLStatement statement = prepared.statement;
+
+            if (statement instanceof SelectStatement)
+                return ((SelectStatement)statement).getResultMetadata();
+
+            return ResultSet.ResultMetadata.EMPTY;
+        }
+
         @Override
         public boolean equals(Object other)
         {
@@ -308,12 +354,21 @@ public class ResultSet
 
                 EnumSet<Flag> flags = Flag.deserialize(iflags);
 
+                MD5Digest resultMetadataId = null;
+                if (flags.contains(Flag.METADATA_CHANGED))
+                {
+                    assert version.isGreaterOrEqualTo(ProtocolVersion.V5) : "MetadataChanged flag is not supported before native protocol v5";
+                    assert !flags.contains(Flag.NO_METADATA) : "MetadataChanged and NoMetadata are mutually exclusive flags";
+
+                    resultMetadataId = MD5Digest.wrap(CBUtil.readBytes(body));
+                }
+
                 PagingState state = null;
                 if (flags.contains(Flag.HAS_MORE_PAGES))
                     state = PagingState.deserialize(CBUtil.readValueNoCopy(body), version);
 
                 if (flags.contains(Flag.NO_METADATA))
-                    return new ResultMetadata(flags, null, columnCount, state);
+                    return new ResultMetadata(null, flags, null, columnCount, state);
 
                 boolean globalTablesSpec = flags.contains(Flag.GLOBAL_TABLES_SPEC);
 
@@ -335,7 +390,7 @@ public class ResultSet
                     AbstractType type = DataType.toType(DataType.codec.decodeOne(body, version));
                     names.add(new ColumnSpecification(ksName, cfName, colName, type));
                 }
-                return new ResultMetadata(flags, names, names.size(), state);
+                return new ResultMetadata(resultMetadataId, flags, names, names.size(), state);
             }
 
             public void encode(ResultMetadata m, ByteBuf dest, ProtocolVersion version)
@@ -343,7 +398,7 @@ public class ResultSet
                 boolean noMetadata = m.flags.contains(Flag.NO_METADATA);
                 boolean globalTablesSpec = m.flags.contains(Flag.GLOBAL_TABLES_SPEC);
                 boolean hasMorePages = m.flags.contains(Flag.HAS_MORE_PAGES);
-
+                boolean metadataChanged = m.flags.contains(Flag.METADATA_CHANGED);
                 assert version.isGreaterThan(ProtocolVersion.V1) || (!hasMorePages && !noMetadata)
                     : "version = " + version + ", flags = " + m.flags;
 
@@ -353,6 +408,12 @@ public class ResultSet
                 if (hasMorePages)
                     CBUtil.writeValue(m.pagingState.serialize(version), dest);
 
+                if (version.isGreaterOrEqualTo(ProtocolVersion.V5)  && metadataChanged)
+                {
+                    assert !noMetadata : "MetadataChanged and NoMetadata are mutually exclusive flags";
+                    CBUtil.writeBytes(m.getResultMetadataId().bytes, dest);
+                }
+
                 if (!noMetadata)
                 {
                     if (globalTablesSpec)
@@ -380,11 +441,15 @@ public class ResultSet
                 boolean noMetadata = m.flags.contains(Flag.NO_METADATA);
                 boolean globalTablesSpec = m.flags.contains(Flag.GLOBAL_TABLES_SPEC);
                 boolean hasMorePages = m.flags.contains(Flag.HAS_MORE_PAGES);
+                boolean metadataChanged = m.flags.contains(Flag.METADATA_CHANGED);
 
                 int size = 8;
                 if (hasMorePages)
                     size += CBUtil.sizeOfValue(m.pagingState.serializedSize(version));
 
+                if (version.isGreaterOrEqualTo(ProtocolVersion.V5) && metadataChanged)
+                    size += CBUtil.sizeOfBytes(m.getResultMetadataId().bytes);
+
                 if (!noMetadata)
                 {
                     if (globalTablesSpec)
@@ -486,6 +551,11 @@ public class ResultSet
             return sb.toString();
         }
 
+        public static PreparedMetadata fromPrepared(ParsedStatement.Prepared prepared)
+        {
+            return new PreparedMetadata(prepared.boundNames, prepared.partitionKeyBindIndexes);
+        }
+
         private static class Codec implements CBCodec<PreparedMetadata>
         {
             public PreparedMetadata decode(ByteBuf body, ProtocolVersion version)
@@ -603,7 +673,8 @@ public class ResultSet
         // The order of that enum matters!!
         GLOBAL_TABLES_SPEC,
         HAS_MORE_PAGES,
-        NO_METADATA;
+        NO_METADATA,
+        METADATA_CHANGED;
 
         public static EnumSet<Flag> deserialize(int flags)
         {
@@ -625,4 +696,23 @@ public class ResultSet
             return i;
         }
     }
+
+    public static MD5Digest computeResultMetadataId(List<ColumnSpecification> columnSpecifications)
+    {
+        MessageDigest md = FBUtilities.threadLocalMD5Digest();
+
+        if (columnSpecifications != null)
+        {
+            for (ColumnSpecification cs : columnSpecifications)
+            {
+                md.update(cs.name.bytes.duplicate());
+                md.update((byte) 0);
+                md.update(cs.type.toString().getBytes(StandardCharsets.UTF_8));
+                md.update((byte) 0);
+                md.update((byte) 0);
+            }
+        }
+
+        return MD5Digest.wrap(md.digest());
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/922dbdb6/src/java/org/apache/cassandra/cql3/selection/SelectionColumnMapping.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/selection/SelectionColumnMapping.java b/src/java/org/apache/cassandra/cql3/selection/SelectionColumnMapping.java
index cd04d94..5d3727f 100644
--- a/src/java/org/apache/cassandra/cql3/selection/SelectionColumnMapping.java
+++ b/src/java/org/apache/cassandra/cql3/selection/SelectionColumnMapping.java
@@ -37,8 +37,8 @@ import org.apache.cassandra.cql3.ColumnSpecification;
  */
 public class SelectionColumnMapping implements SelectionColumns
 {
-    private final ArrayList<ColumnSpecification> columnSpecifications;
-    private final HashMultimap<ColumnSpecification, ColumnMetadata> columnMappings;
+    private final List<ColumnSpecification> columnSpecifications;
+    private final Multimap<ColumnSpecification, ColumnMetadata> columnMappings;
 
     private SelectionColumnMapping()
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/922dbdb6/src/java/org/apache/cassandra/cql3/statements/ListPermissionsStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ListPermissionsStatement.java b/src/java/org/apache/cassandra/cql3/statements/ListPermissionsStatement.java
index be7fb5d..aa2157a 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ListPermissionsStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ListPermissionsStatement.java
@@ -118,7 +118,8 @@ public class ListPermissionsStatement extends AuthorizationStatement
         if (details.isEmpty())
             return new ResultMessage.Void();
 
-        ResultSet result = new ResultSet(metadata);
+        ResultSet.ResultMetadata resultMetadata = new ResultSet.ResultMetadata(metadata);
+        ResultSet result = new ResultSet(resultMetadata);
         for (PermissionDetails pd : details)
         {
             result.addColumnValue(UTF8Type.instance.decompose(pd.grantee));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/922dbdb6/src/java/org/apache/cassandra/cql3/statements/ListRolesStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ListRolesStatement.java b/src/java/org/apache/cassandra/cql3/statements/ListRolesStatement.java
index 0c0822c..7ed460c 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ListRolesStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ListRolesStatement.java
@@ -112,7 +112,8 @@ public class ListRolesStatement extends AuthorizationStatement
     // overridden in ListUsersStatement to include legacy metadata
     protected ResultMessage formatResults(List<RoleResource> sortedRoles)
     {
-        ResultSet result = new ResultSet(metadata);
+        ResultSet.ResultMetadata resultMetadata = new ResultSet.ResultMetadata(metadata);
+        ResultSet result = new ResultSet(resultMetadata);
 
         IRoleManager roleManager = DatabaseDescriptor.getRoleManager();
         for (RoleResource role : sortedRoles)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/922dbdb6/src/java/org/apache/cassandra/cql3/statements/ListUsersStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ListUsersStatement.java b/src/java/org/apache/cassandra/cql3/statements/ListUsersStatement.java
index 9641333..1347fba 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ListUsersStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ListUsersStatement.java
@@ -44,7 +44,8 @@ public class ListUsersStatement extends ListRolesStatement
     @Override
     protected ResultMessage formatResults(List<RoleResource> sortedRoles)
     {
-        ResultSet result = new ResultSet(metadata);
+        ResultSet.ResultMetadata resultMetadata = new ResultSet.ResultMetadata(metadata);
+        ResultSet result = new ResultSet(resultMetadata);
 
         IRoleManager roleManager = DatabaseDescriptor.getRoleManager();
         for (RoleResource role : sortedRoles)
@@ -54,6 +55,7 @@ public class ListUsersStatement extends ListRolesStatement
             result.addColumnValue(UTF8Type.instance.decompose(role.getRoleName()));
             result.addColumnValue(BooleanType.instance.decompose(Roles.hasSuperuserStatus(role)));
         }
+
         return new ResultMessage.Rows(result);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/922dbdb6/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index caa24b2..4191285 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -497,6 +497,19 @@ public abstract class ModificationStatement implements CQLStatement
         conditions.addConditionsTo(request, clustering, options);
     }
 
+    private static ResultSet.ResultMetadata buildCASSuccessMetadata(String ksName, String cfName)
+    {
+        List<ColumnSpecification> specs = new ArrayList<>();
+        specs.add(casResultColumnSpecification(ksName, cfName));
+
+        return new ResultSet.ResultMetadata(specs);
+    }
+
+    private static ColumnSpecification casResultColumnSpecification(String ksName, String cfName)
+    {
+        return new ColumnSpecification(ksName, cfName, CAS_RESULT_COLUMN, BooleanType.instance);
+    }
+
     private ResultSet buildCasResultSet(RowIterator partition, QueryOptions options) throws InvalidRequestException
     {
         return buildCasResultSet(keyspace(), columnFamily(), partition, getColumnsWithConditions(), false, options);
@@ -507,8 +520,7 @@ public abstract class ModificationStatement implements CQLStatement
     {
         boolean success = partition == null;
 
-        ColumnSpecification spec = new ColumnSpecification(ksName, tableName, CAS_RESULT_COLUMN, BooleanType.instance);
-        ResultSet.ResultMetadata metadata = new ResultSet.ResultMetadata(Collections.singletonList(spec));
+        ResultSet.ResultMetadata metadata = buildCASSuccessMetadata(ksName, tableName);
         List<List<ByteBuffer>> rows = Collections.singletonList(Collections.singletonList(BooleanType.instance.decompose(success)));
 
         ResultSet rs = new ResultSet(metadata, rows);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/922dbdb6/src/java/org/apache/cassandra/cql3/statements/ParsedStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ParsedStatement.java b/src/java/org/apache/cassandra/cql3/statements/ParsedStatement.java
index e617ba7..34bfc3d 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ParsedStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ParsedStatement.java
@@ -17,12 +17,12 @@
  */
 package org.apache.cassandra.cql3.statements;
 
-import java.util.Collections;
-import java.util.List;
+import java.util.*;
 
 import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.cql3.functions.Function;
 import org.apache.cassandra.exceptions.RequestValidationException;
+import org.apache.cassandra.utils.*;
 
 public abstract class ParsedStatement
 {
@@ -56,8 +56,9 @@ public abstract class ParsedStatement
          */
         public String rawCQLStatement;
 
-        public final CQLStatement statement;
+        public final MD5Digest resultMetadataId;
         public final List<ColumnSpecification> boundNames;
+        public final CQLStatement statement;
         public final short[] partitionKeyBindIndexes;
 
         protected Prepared(CQLStatement statement, List<ColumnSpecification> boundNames, short[] partitionKeyBindIndexes)
@@ -65,6 +66,7 @@ public abstract class ParsedStatement
             this.statement = statement;
             this.boundNames = boundNames;
             this.partitionKeyBindIndexes = partitionKeyBindIndexes;
+            this.resultMetadataId = ResultSet.ResultMetadata.fromPrepared(this).getResultMetadataId();
             this.rawCQLStatement = "";
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/922dbdb6/src/java/org/apache/cassandra/transport/Client.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Client.java b/src/java/org/apache/cassandra/transport/Client.java
index 7fec473..4793d17 100644
--- a/src/java/org/apache/cassandra/transport/Client.java
+++ b/src/java/org/apache/cassandra/transport/Client.java
@@ -147,7 +147,9 @@ public class Client extends SimpleClient
         {
             try
             {
-                byte[] id = Hex.hexToBytes(iter.next());
+                byte[] preparedStatementId = Hex.hexToBytes(iter.next());
+                byte[] resultMetadataId = Hex.hexToBytes(iter.next());
+
                 List<ByteBuffer> values = new ArrayList<ByteBuffer>();
                 while(iter.hasNext())
                 {
@@ -164,7 +166,7 @@ public class Client extends SimpleClient
                     }
                     values.add(bb);
                 }
-                return new ExecuteMessage(MD5Digest.wrap(id), QueryOptions.forInternalCalls(ConsistencyLevel.ONE, values));
+                return new ExecuteMessage(MD5Digest.wrap(preparedStatementId), MD5Digest.wrap(resultMetadataId), QueryOptions.forInternalCalls(ConsistencyLevel.ONE, values));
             }
             catch (Exception e)
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/922dbdb6/src/java/org/apache/cassandra/transport/SimpleClient.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/SimpleClient.java b/src/java/org/apache/cassandra/transport/SimpleClient.java
index d5148ab..ddd3484 100644
--- a/src/java/org/apache/cassandra/transport/SimpleClient.java
+++ b/src/java/org/apache/cassandra/transport/SimpleClient.java
@@ -184,9 +184,9 @@ public class SimpleClient implements Closeable
         return (ResultMessage.Prepared)msg;
     }
 
-    public ResultMessage executePrepared(byte[] statementId, List<ByteBuffer> values, ConsistencyLevel consistency)
+    public ResultMessage executePrepared(ResultMessage.Prepared prepared, List<ByteBuffer> values, ConsistencyLevel consistency)
     {
-        Message.Response msg = execute(new ExecuteMessage(MD5Digest.wrap(statementId), QueryOptions.forInternalCalls(consistency, values)));
+        Message.Response msg = execute(new ExecuteMessage(prepared.statementId, prepared.resultMetadataId, QueryOptions.forInternalCalls(consistency, values)));
         assert msg instanceof ResultMessage;
         return (ResultMessage)msg;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/922dbdb6/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java b/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
index d881e63..a8fd2a0 100644
--- a/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
@@ -26,7 +26,9 @@ import org.apache.cassandra.cql3.CQLStatement;
 import org.apache.cassandra.cql3.ColumnSpecification;
 import org.apache.cassandra.cql3.QueryHandler;
 import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.cql3.ResultSet;
 import org.apache.cassandra.cql3.statements.ParsedStatement;
+import org.apache.cassandra.cql3.statements.UpdateStatement;
 import org.apache.cassandra.exceptions.PreparedQueryNotFoundException;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.QueryState;
@@ -42,13 +44,22 @@ public class ExecuteMessage extends Message.Request
     {
         public ExecuteMessage decode(ByteBuf body, ProtocolVersion version)
         {
-            byte[] id = CBUtil.readBytes(body);
-            return new ExecuteMessage(MD5Digest.wrap(id), QueryOptions.codec.decode(body, version));
+            MD5Digest statementId = MD5Digest.wrap(CBUtil.readBytes(body));
+
+            MD5Digest resultMetadataId = null;
+            if (version.isGreaterOrEqualTo(ProtocolVersion.V5))
+                resultMetadataId = MD5Digest.wrap(CBUtil.readBytes(body));
+
+            return new ExecuteMessage(statementId, resultMetadataId, QueryOptions.codec.decode(body, version));
         }
 
         public void encode(ExecuteMessage msg, ByteBuf dest, ProtocolVersion version)
         {
             CBUtil.writeBytes(msg.statementId.bytes, dest);
+
+            if (version.isGreaterOrEqualTo(ProtocolVersion.V5))
+                CBUtil.writeBytes(msg.resultMetadataId.bytes, dest);
+
             if (version == ProtocolVersion.V1)
             {
                 CBUtil.writeValueList(msg.options.getValues(), dest);
@@ -64,6 +75,10 @@ public class ExecuteMessage extends Message.Request
         {
             int size = 0;
             size += CBUtil.sizeOfBytes(msg.statementId.bytes);
+
+            if (version.isGreaterOrEqualTo(ProtocolVersion.V5))
+                size += CBUtil.sizeOfBytes(msg.resultMetadataId.bytes);
+
             if (version == ProtocolVersion.V1)
             {
                 size += CBUtil.sizeOfValueList(msg.options.getValues());
@@ -78,13 +93,15 @@ public class ExecuteMessage extends Message.Request
     };
 
     public final MD5Digest statementId;
+    public final MD5Digest resultMetadataId;
     public final QueryOptions options;
 
-    public ExecuteMessage(MD5Digest statementId, QueryOptions options)
+    public ExecuteMessage(MD5Digest statementId, MD5Digest resultMetadataId, QueryOptions options)
     {
         super(Message.Type.EXECUTE);
         this.statementId = statementId;
         this.options = options;
+        this.resultMetadataId = resultMetadataId;
     }
 
     public Message.Response execute(QueryState state, long queryStartNanoTime)
@@ -144,8 +161,29 @@ public class ExecuteMessage extends Message.Request
             // by wrapping the QueryOptions.
             QueryOptions queryOptions = QueryOptions.addColumnSpecifications(options, prepared.boundNames);
             Message.Response response = handler.processPrepared(statement, state, queryOptions, getCustomPayload(), queryStartNanoTime);
-            if (options.skipMetadata() && response instanceof ResultMessage.Rows)
-                ((ResultMessage.Rows)response).result.metadata.setSkipMetadata();
+
+            if (response instanceof ResultMessage.Rows)
+            {
+                ResultMessage.Rows rows = (ResultMessage.Rows) response;
+
+                ResultSet.ResultMetadata resultMetadata = rows.result.metadata;
+                if (options.getProtocolVersion().isGreaterOrEqualTo(ProtocolVersion.V5))
+                {
+                    // Starting with V5 we can rely on the result metadata id coming with execute message in order to
+                    // check if there was a change, comparing it with metadata that's about to be returned to client.
+                    if (!resultMetadata.getResultMetadataId().equals(resultMetadataId))
+                        resultMetadata.setMetadataChanged();
+                    else if (options.skipMetadata())
+                        resultMetadata.setSkipMetadata();
+                }
+                else
+                {
+                    // Pre-V5 code has to rely on the difference between the metadata in the prepared message cache
+                    // and compare it with the metadata to be returned to client.
+                    if (options.skipMetadata() && prepared.resultMetadataId.equals(resultMetadata.getResultMetadataId()))
+                        resultMetadata.setSkipMetadata();
+                }
+            }
 
             if (tracingId != null)
                 response.setTracingId(tracingId);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/922dbdb6/src/java/org/apache/cassandra/transport/messages/ResultMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/ResultMessage.java b/src/java/org/apache/cassandra/transport/messages/ResultMessage.java
index e1ea948..d8aefbe 100644
--- a/src/java/org/apache/cassandra/transport/messages/ResultMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/ResultMessage.java
@@ -20,9 +20,9 @@ package org.apache.cassandra.transport.messages;
 
 import io.netty.buffer.ByteBuf;
 
+import org.apache.cassandra.cql3.ColumnSpecification;
 import org.apache.cassandra.cql3.CQLStatement;
 import org.apache.cassandra.cql3.ResultSet;
-import org.apache.cassandra.cql3.statements.SelectStatement;
 import org.apache.cassandra.cql3.statements.ParsedStatement;
 import org.apache.cassandra.transport.*;
 import org.apache.cassandra.utils.MD5Digest;
@@ -51,12 +51,12 @@ public abstract class ResultMessage extends Message.Response
 
     public enum Kind
     {
-        VOID         (1, Void.subcodec),
-        ROWS         (2, Rows.subcodec),
-        SET_KEYSPACE (3, SetKeyspace.subcodec),
-        PREPARED     (4, Prepared.subcodec),
-        SCHEMA_CHANGE(5, SchemaChange.subcodec);
 
+        VOID               (1, Void.subcodec),
+        ROWS               (2, Rows.subcodec),
+        SET_KEYSPACE       (3, SetKeyspace.subcodec),
+        PREPARED           (4, Prepared.subcodec),
+        SCHEMA_CHANGE      (5, SchemaChange.subcodec);
         public final int id;
         public final Message.Codec<ResultMessage> subcodec;
 
@@ -216,13 +216,16 @@ public abstract class ResultMessage extends Message.Response
             public ResultMessage decode(ByteBuf body, ProtocolVersion version)
             {
                 MD5Digest id = MD5Digest.wrap(CBUtil.readBytes(body));
+                MD5Digest resultMetadataId = null;
+                if (version.isGreaterOrEqualTo(ProtocolVersion.V5))
+                    resultMetadataId = MD5Digest.wrap(CBUtil.readBytes(body));
                 ResultSet.PreparedMetadata metadata = ResultSet.PreparedMetadata.codec.decode(body, version);
 
                 ResultSet.ResultMetadata resultMetadata = ResultSet.ResultMetadata.EMPTY;
                 if (version.isGreaterThan(ProtocolVersion.V1))
                     resultMetadata = ResultSet.ResultMetadata.codec.decode(body, version);
 
-                return new Prepared(id, metadata, resultMetadata);
+                return new Prepared(id, resultMetadataId, metadata, resultMetadata);
             }
 
             public void encode(ResultMessage msg, ByteBuf dest, ProtocolVersion version)
@@ -232,6 +235,9 @@ public abstract class ResultMessage extends Message.Response
                 assert prepared.statementId != null;
 
                 CBUtil.writeBytes(prepared.statementId.bytes, dest);
+                if (version.isGreaterOrEqualTo(ProtocolVersion.V5))
+                    CBUtil.writeBytes(prepared.resultMetadataId.bytes, dest);
+
                 ResultSet.PreparedMetadata.codec.encode(prepared.metadata, dest, version);
                 if (version.isGreaterThan(ProtocolVersion.V1))
                     ResultSet.ResultMetadata.codec.encode(prepared.resultMetadata, dest, version);
@@ -245,6 +251,8 @@ public abstract class ResultMessage extends Message.Response
 
                 int size = 0;
                 size += CBUtil.sizeOfBytes(prepared.statementId.bytes);
+                if (version.isGreaterOrEqualTo(ProtocolVersion.V5))
+                    size += CBUtil.sizeOfBytes(prepared.resultMetadataId.bytes);
                 size += ResultSet.PreparedMetadata.codec.encodedSize(prepared.metadata, version);
                 if (version.isGreaterThan(ProtocolVersion.V1))
                     size += ResultSet.ResultMetadata.codec.encodedSize(prepared.resultMetadata, version);
@@ -253,6 +261,7 @@ public abstract class ResultMessage extends Message.Response
         };
 
         public final MD5Digest statementId;
+        public final MD5Digest resultMetadataId;
 
         /** Describes the variables to be bound in the prepared statement */
         public final ResultSet.PreparedMetadata metadata;
@@ -260,27 +269,15 @@ public abstract class ResultMessage extends Message.Response
         /** Describes the results of executing this prepared statement */
         public final ResultSet.ResultMetadata resultMetadata;
 
-        public Prepared(MD5Digest statementId, ParsedStatement.Prepared prepared)
-        {
-            this(statementId, new ResultSet.PreparedMetadata(prepared.boundNames, prepared.partitionKeyBindIndexes), extractResultMetadata(prepared.statement));
-        }
-
-        private Prepared(MD5Digest statementId, ResultSet.PreparedMetadata metadata, ResultSet.ResultMetadata resultMetadata)
+        public Prepared(MD5Digest statementId, MD5Digest resultMetadataId, ResultSet.PreparedMetadata metadata, ResultSet.ResultMetadata resultMetadata)
         {
             super(Kind.PREPARED);
             this.statementId = statementId;
+            this.resultMetadataId = resultMetadataId;
             this.metadata = metadata;
             this.resultMetadata = resultMetadata;
         }
 
-        private static ResultSet.ResultMetadata extractResultMetadata(CQLStatement statement)
-        {
-            if (!(statement instanceof SelectStatement))
-                return ResultSet.ResultMetadata.EMPTY;
-
-            return ((SelectStatement)statement).getResultMetadata();
-        }
-
         @Override
         public String toString()
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/922dbdb6/test/unit/org/apache/cassandra/cql3/CQLTester.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/CQLTester.java b/test/unit/org/apache/cassandra/cql3/CQLTester.java
index 0a0d757..062a4bc 100644
--- a/test/unit/org/apache/cassandra/cql3/CQLTester.java
+++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java
@@ -94,7 +94,7 @@ public abstract class CQLTester
     protected static final int nativePort;
     protected static final InetAddress nativeAddr;
     private static final Map<ProtocolVersion, Cluster> clusters = new HashMap<>();
-    private static final Map<ProtocolVersion, Session> sessions = new HashMap<>();
+    protected static final Map<ProtocolVersion, Session> sessions = new HashMap<>();
 
     private static boolean isServerPrepared = false;
 
@@ -386,12 +386,18 @@ public abstract class CQLTester
             if (clusters.containsKey(version))
                 continue;
 
-            Cluster cluster = Cluster.builder()
-                                     .addContactPoints(nativeAddr)
-                                     .withClusterName("Test Cluster")
-                                     .withPort(nativePort)
-                                     .withProtocolVersion(com.datastax.driver.core.ProtocolVersion.fromInt(version.asInt()))
-                                     .build();
+            Cluster.Builder builder = Cluster.builder()
+                                             .withoutJMXReporting()
+                                             .addContactPoints(nativeAddr)
+                                             .withClusterName("Test Cluster")
+                                             .withPort(nativePort);
+
+            if (version.isBeta())
+                builder = builder.allowBetaProtocolVersion();
+            else
+                builder = builder.withProtocolVersion(com.datastax.driver.core.ProtocolVersion.fromInt(version.asInt()));
+
+            Cluster cluster = builder.build();
             clusters.put(version, cluster);
             sessions.put(version, cluster.connect());
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/922dbdb6/test/unit/org/apache/cassandra/cql3/PreparedStatementsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/PreparedStatementsTest.java b/test/unit/org/apache/cassandra/cql3/PreparedStatementsTest.java
index 385ebb7..f843965 100644
--- a/test/unit/org/apache/cassandra/cql3/PreparedStatementsTest.java
+++ b/test/unit/org/apache/cassandra/cql3/PreparedStatementsTest.java
@@ -17,63 +17,38 @@
  */
 package org.apache.cassandra.cql3;
 
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
+import org.junit.Before;
 import org.junit.Test;
 
-import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.PreparedStatement;
-import com.datastax.driver.core.Session;
+import com.datastax.driver.core.*;
+import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.core.exceptions.SyntaxError;
-import org.apache.cassandra.SchemaLoader;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.index.StubIndex;
-import org.apache.cassandra.service.EmbeddedCassandraService;
+import org.apache.cassandra.transport.ProtocolVersion;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
-public class PreparedStatementsTest extends SchemaLoader
+public class PreparedStatementsTest extends CQLTester
 {
-    private static Cluster cluster;
-    private static Session session;
-
     private static final String KEYSPACE = "prepared_stmt_cleanup";
     private static final String createKsStatement = "CREATE KEYSPACE " + KEYSPACE +
                                                     " WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };";
     private static final String dropKsStatement = "DROP KEYSPACE IF EXISTS " + KEYSPACE;
 
-    @BeforeClass
-    public static void setup() throws Exception
+    @Before
+    public void setup()
     {
-        Schema.instance.clear();
-
-        EmbeddedCassandraService cassandra = new EmbeddedCassandraService();
-        cassandra.start();
-
-        // Currently the native server start method return before the server is fully binded to the socket, so we need
-        // to wait slightly before trying to connect to it. We should fix this but in the meantime using a sleep.
-        Thread.sleep(1500);
-
-        cluster = Cluster.builder().addContactPoint("127.0.0.1")
-                                   .withPort(DatabaseDescriptor.getNativeTransportPort())
-                                   .build();
-        session = cluster.connect();
-
-        session.execute(dropKsStatement);
-        session.execute(createKsStatement);
-    }
-
-    @AfterClass
-    public static void tearDown() throws Exception
-    {
-        cluster.close();
+        requireNetwork();
     }
 
     @Test
     public void testInvalidatePreparedStatementsOnDrop()
     {
+        Session session = sessions.get(ProtocolVersion.V5);
+        session.execute(dropKsStatement);
+        session.execute(createKsStatement);
+
         String createTableStatement = "CREATE TABLE IF NOT EXISTS " + KEYSPACE + ".qp_cleanup (id int PRIMARY KEY, cid int, val text);";
         String dropTableStatement = "DROP TABLE IF EXISTS " + KEYSPACE + ".qp_cleanup;";
 
@@ -101,15 +76,128 @@ public class PreparedStatementsTest extends SchemaLoader
     }
 
     @Test
+    public void testInvalidatePreparedStatementOnAlterV5()
+    {
+        testInvalidatePreparedStatementOnAlter(ProtocolVersion.V5, true);
+    }
+
+    @Test
+    public void testInvalidatePreparedStatementOnAlterV4()
+    {
+        testInvalidatePreparedStatementOnAlter(ProtocolVersion.V4, false);
+    }
+
+    private void testInvalidatePreparedStatementOnAlter(ProtocolVersion version, boolean supportsMetadataChange)
+    {
+        Session session = sessions.get(version);
+        String createTableStatement = "CREATE TABLE IF NOT EXISTS " + KEYSPACE + ".qp_cleanup (a int PRIMARY KEY, b int, c int);";
+        String alterTableStatement = "ALTER TABLE " + KEYSPACE + ".qp_cleanup ADD d int;";
+
+        session.execute(dropKsStatement);
+        session.execute(createKsStatement);
+        session.execute(createTableStatement);
+
+        PreparedStatement preparedSelect = session.prepare("SELECT * FROM " + KEYSPACE + ".qp_cleanup");
+        session.execute("INSERT INTO " + KEYSPACE + ".qp_cleanup (a, b, c) VALUES (?, ?, ?);",
+                        1, 2, 3);
+        session.execute("INSERT INTO " + KEYSPACE + ".qp_cleanup (a, b, c) VALUES (?, ?, ?);",
+                        2, 3, 4);
+
+        assertRowsNet(session.execute(preparedSelect.bind()),
+                      row(1, 2, 3),
+                      row(2, 3, 4));
+
+        session.execute(alterTableStatement);
+        session.execute("INSERT INTO " + KEYSPACE + ".qp_cleanup (a, b, c, d) VALUES (?, ?, ?, ?);",
+                        3, 4, 5, 6);
+
+        ResultSet rs;
+        if (supportsMetadataChange)
+        {
+            rs = session.execute(preparedSelect.bind());
+            assertRowsNet(version,
+                          rs,
+                          row(1, 2, 3, null),
+                          row(2, 3, 4, null),
+                          row(3, 4, 5, 6));
+            assertEquals(rs.getColumnDefinitions().size(), 4);
+        }
+        else
+        {
+            rs = session.execute(preparedSelect.bind());
+            assertRowsNet(rs,
+                          row(1, 2, 3),
+                          row(2, 3, 4),
+                          row(3, 4, 5));
+            assertEquals(rs.getColumnDefinitions().size(), 3);
+        }
+
+        session.execute(dropKsStatement);
+    }
+
+    @Test
+    public void testInvalidatePreparedStatementOnAlterUnchangedMetadataV4()
+    {
+        testInvalidatePreparedStatementOnAlterUnchangedMetadata(ProtocolVersion.V4);
+    }
+
+    @Test
+    public void testInvalidatePreparedStatementOnAlterUnchangedMetadataV5()
+    {
+        testInvalidatePreparedStatementOnAlterUnchangedMetadata(ProtocolVersion.V5);
+    }
+
+    private void testInvalidatePreparedStatementOnAlterUnchangedMetadata(ProtocolVersion version)
+    {
+        Session session = sessions.get(version);
+        String createTableStatement = "CREATE TABLE IF NOT EXISTS " + KEYSPACE + ".qp_cleanup (a int PRIMARY KEY, b int, c int);";
+        String alterTableStatement = "ALTER TABLE " + KEYSPACE + ".qp_cleanup ADD d int;";
+
+        session.execute(dropKsStatement);
+        session.execute(createKsStatement);
+        session.execute(createTableStatement);
+
+        PreparedStatement preparedSelect = session.prepare("SELECT a, b, c FROM " + KEYSPACE + ".qp_cleanup");
+        session.execute("INSERT INTO " + KEYSPACE + ".qp_cleanup (a, b, c) VALUES (?, ?, ?);",
+                        1, 2, 3);
+        session.execute("INSERT INTO " + KEYSPACE + ".qp_cleanup (a, b, c) VALUES (?, ?, ?);",
+                        2, 3, 4);
+
+        ResultSet rs = session.execute(preparedSelect.bind());
+
+        assertRowsNet(rs,
+                      row(1, 2, 3),
+                      row(2, 3, 4));
+        assertEquals(rs.getColumnDefinitions().size(), 3);
+
+        session.execute(alterTableStatement);
+        session.execute("INSERT INTO " + KEYSPACE + ".qp_cleanup (a, b, c, d) VALUES (?, ?, ?, ?);",
+                        3, 4, 5, 6);
+
+        rs = session.execute(preparedSelect.bind());
+        assertRowsNet(rs,
+                      row(1, 2, 3),
+                      row(2, 3, 4),
+                      row(3, 4, 5));
+        assertEquals(rs.getColumnDefinitions().size(), 3);
+
+        session.execute(dropKsStatement);
+    }
+
+    @Test
     public void testStatementRePreparationOnReconnect()
     {
+        Session session = sessions.get(ProtocolVersion.V5);
+        session.execute("USE " + keyspace());
+
         session.execute(dropKsStatement);
         session.execute(createKsStatement);
 
-        session.execute("CREATE TABLE IF NOT EXISTS " + KEYSPACE + ".qp_test (id int PRIMARY KEY, cid int, val text);");
+        createTable("CREATE TABLE %s (id int PRIMARY KEY, cid int, val text);");
+
 
-        String insertCQL = "INSERT INTO " + KEYSPACE + ".qp_test (id, cid, val) VALUES (?, ?, ?)";
-        String selectCQL = "Select * from " + KEYSPACE + ".qp_test where id = ?";
+        String insertCQL = "INSERT INTO " + currentTable() + " (id, cid, val) VALUES (?, ?, ?)";
+        String selectCQL = "Select * from " + currentTable() + " where id = ?";
 
         PreparedStatement preparedInsert = session.prepare(insertCQL);
         PreparedStatement preparedSelect = session.prepare(selectCQL);
@@ -117,23 +205,31 @@ public class PreparedStatementsTest extends SchemaLoader
         session.execute(preparedInsert.bind(1, 1, "value"));
         assertEquals(1, session.execute(preparedSelect.bind(1)).all().size());
 
-        cluster.close();
-
-        cluster = Cluster.builder().addContactPoint("127.0.0.1")
-                                   .withPort(DatabaseDescriptor.getNativeTransportPort())
-                                   .build();
-        session = cluster.connect();
-
-        preparedInsert = session.prepare(insertCQL);
-        preparedSelect = session.prepare(selectCQL);
-        session.execute(preparedInsert.bind(1, 1, "value"));
+        try (Cluster newCluster = Cluster.builder()
+                                 .addContactPoints(nativeAddr)
+                                 .withClusterName("Test Cluster")
+                                 .withPort(nativePort)
+                                 .withoutJMXReporting()
+                                 .allowBetaProtocolVersion()
+                                 .build())
+        {
+            try (Session newSession = newCluster.connect())
+            {
+                newSession.execute("USE " + keyspace());
+                preparedInsert = newSession.prepare(insertCQL);
+                preparedSelect = newSession.prepare(selectCQL);
+                session.execute(preparedInsert.bind(1, 1, "value"));
 
-        assertEquals(1, session.execute(preparedSelect.bind(1)).all().size());
+                assertEquals(1, session.execute(preparedSelect.bind(1)).all().size());
+            }
+        }
     }
 
     @Test
     public void prepareAndExecuteWithCustomExpressions() throws Throwable
     {
+        Session session = sessions.get(ProtocolVersion.V5);
+
         session.execute(dropKsStatement);
         session.execute(createKsStatement);
         String table = "custom_expr_test";
@@ -163,4 +259,123 @@ public class PreparedStatementsTest extends SchemaLoader
             assertEquals("Bind variables cannot be used for index names", e.getMessage());
         }
     }
+
+    @Test
+    public void testPrepareWithLWT() throws Throwable
+    {
+        testPrepareWithLWT(ProtocolVersion.V4);
+        testPrepareWithLWT(ProtocolVersion.V5);
+    }
+
+
+    private void testPrepareWithLWT(ProtocolVersion version) throws Throwable
+    {
+        Session session = sessionNet(version);
+        session.execute("USE " + keyspace());
+        createTable("CREATE TABLE %s (pk int, v1 int, v2 int, PRIMARY KEY (pk))");
+
+        PreparedStatement prepared1 = session.prepare(String.format("UPDATE %s SET v1 = ?, v2 = ?  WHERE pk = 1 IF v1 = ?", currentTable()));
+        PreparedStatement prepared2 = session.prepare(String.format("INSERT INTO %s (pk, v1, v2) VALUES (?, 200, 300) IF NOT EXISTS", currentTable()));
+        execute("INSERT INTO %s (pk, v1, v2) VALUES (1,1,1)");
+        execute("INSERT INTO %s (pk, v1, v2) VALUES (2,2,2)");
+
+        ResultSet rs;
+
+        rs = session.execute(prepared1.bind(10, 20, 1));
+        assertRowsNet(rs,
+                      row(true));
+        assertEquals(rs.getColumnDefinitions().size(), 1);
+
+        rs = session.execute(prepared1.bind(100, 200, 1));
+        assertRowsNet(rs,
+                      row(false, 10));
+        assertEquals(rs.getColumnDefinitions().size(), 2);
+
+        rs = session.execute(prepared1.bind(30, 40, 10));
+        assertRowsNet(rs,
+                      row(true));
+        assertEquals(rs.getColumnDefinitions().size(), 1);
+
+        // Try executing the same message once again
+        rs = session.execute(prepared1.bind(100, 200, 1));
+        assertRowsNet(rs,
+                      row(false, 30));
+        assertEquals(rs.getColumnDefinitions().size(), 2);
+
+        rs = session.execute(prepared2.bind(1));
+        assertRowsNet(rs,
+                      row(false, 1, 30, 40));
+        assertEquals(rs.getColumnDefinitions().size(), 4);
+
+        alterTable("ALTER TABLE %s ADD v3 int;");
+
+        rs = session.execute(prepared2.bind(1));
+        assertRowsNet(rs,
+                      row(false, 1, 30, 40, null));
+        assertEquals(rs.getColumnDefinitions().size(), 5);
+
+        rs = session.execute(prepared2.bind(20));
+        assertRowsNet(rs,
+                      row(true));
+        assertEquals(rs.getColumnDefinitions().size(), 1);
+
+        rs = session.execute(prepared2.bind(20));
+        assertRowsNet(rs,
+                      row(false, 20, 200, 300, null));
+        assertEquals(rs.getColumnDefinitions().size(), 5);
+    }
+
+    @Test
+    public void testPrepareWithBatchLWT() throws Throwable
+    {
+        testPrepareWithBatchLWT(ProtocolVersion.V4);
+        testPrepareWithBatchLWT(ProtocolVersion.V5);
+    }
+
+    private void testPrepareWithBatchLWT(ProtocolVersion version) throws Throwable
+    {
+        Session session = sessionNet(version);
+        session.execute("USE " + keyspace());
+        createTable("CREATE TABLE %s (pk int, v1 int, v2 int, PRIMARY KEY (pk))");
+
+        PreparedStatement prepared1 = session.prepare("BEGIN BATCH " +
+                                                      "UPDATE " + currentTable() + " SET v1 = ? WHERE pk = 1 IF v1 = ?;" +
+                                                      "UPDATE " + currentTable() + " SET v2 = ? WHERE pk = 1 IF v2 = ?;" +
+                                                      "APPLY BATCH;");
+        PreparedStatement prepared2 = session.prepare("BEGIN BATCH " +
+                                                      "INSERT INTO " + currentTable() + " (pk, v1, v2) VALUES (1, 200, 300) IF NOT EXISTS;" +
+                                                      "APPLY BATCH");
+        execute("INSERT INTO %s (pk, v1, v2) VALUES (1,1,1)");
+        execute("INSERT INTO %s (pk, v1, v2) VALUES (2,2,2)");
+
+        com.datastax.driver.core.ResultSet rs;
+
+        rs = session.execute(prepared1.bind(10, 1, 20, 1));
+        assertRowsNet(rs,
+                      row(true));
+        assertEquals(rs.getColumnDefinitions().size(), 1);
+
+        rs = session.execute(prepared1.bind(100, 1, 200, 1));
+        assertRowsNet(rs,
+                      row(false, 1, 10, 20));
+        assertEquals(rs.getColumnDefinitions().size(), 4);
+
+        // Try executing the same message once again
+        rs = session.execute(prepared1.bind(100, 1, 200, 1));
+        assertRowsNet(rs,
+                      row(false, 1, 10, 20));
+        assertEquals(rs.getColumnDefinitions().size(), 4);
+
+        rs = session.execute(prepared2.bind());
+        assertRowsNet(rs,
+                      row(false, 1, 10, 20));
+        assertEquals(rs.getColumnDefinitions().size(), 4);
+
+        alterTable("ALTER TABLE %s ADD v3 int;");
+
+        rs = session.execute(prepared2.bind());
+        assertRowsNet(rs,
+                      row(false, 1, 10, 20, null));
+        assertEquals(rs.getColumnDefinitions().size(), 5);
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/922dbdb6/test/unit/org/apache/cassandra/cql3/PstmtPersistenceTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/PstmtPersistenceTest.java b/test/unit/org/apache/cassandra/cql3/PstmtPersistenceTest.java
index 228352c..7ca6ab9 100644
--- a/test/unit/org/apache/cassandra/cql3/PstmtPersistenceTest.java
+++ b/test/unit/org/apache/cassandra/cql3/PstmtPersistenceTest.java
@@ -78,14 +78,14 @@ public class PstmtPersistenceTest extends CQLTester
         assertEquals(5, stmtIds.size());
         assertEquals(5, QueryProcessor.preparedStatementsCount());
 
-        Assert.assertEquals(5, numberOfStatementsOnDisk());
+        assertEquals(5, numberOfStatementsOnDisk());
 
         QueryHandler handler = ClientState.getCQLQueryHandler();
         validatePstmts(stmtIds, handler);
 
         // clear prepared statements cache
         QueryProcessor.clearPreparedStatements(true);
-        Assert.assertEquals(0, QueryProcessor.preparedStatementsCount());
+        assertEquals(0, QueryProcessor.preparedStatementsCount());
         for (MD5Digest stmtId : stmtIds)
             Assert.assertNull(handler.getPrepared(stmtId));
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/922dbdb6/test/unit/org/apache/cassandra/cql3/validation/entities/JsonTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/entities/JsonTest.java b/test/unit/org/apache/cassandra/cql3/validation/entities/JsonTest.java
index 2bd95be..c7a41f3 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/entities/JsonTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/entities/JsonTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.cassandra.cql3.validation.entities;
 
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.Json;
 import org.apache.cassandra.cql3.CQLTester;
 import org.apache.cassandra.cql3.Duration;
@@ -41,10 +42,17 @@ import static org.junit.Assert.fail;
 
 public class JsonTest extends CQLTester
 {
+    // This method will be ran instead of the CQLTester#setUpClass
     @BeforeClass
-    public static void setUp()
+    public static void setUpClass()
     {
+        if (ROW_CACHE_SIZE_IN_MB > 0)
+            DatabaseDescriptor.setRowCacheSizeInMB(ROW_CACHE_SIZE_IN_MB);
+
         StorageService.instance.setPartitionerUnsafe(ByteOrderedPartitioner.instance);
+
+        // Once per-JVM is enough
+        prepareServer();
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/cassandra/blob/922dbdb6/test/unit/org/apache/cassandra/cql3/validation/operations/SelectLimitTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/operations/SelectLimitTest.java b/test/unit/org/apache/cassandra/cql3/validation/operations/SelectLimitTest.java
index 68b2e93..5d6ffb1 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/operations/SelectLimitTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/operations/SelectLimitTest.java
@@ -30,11 +30,14 @@ import org.apache.cassandra.service.StorageService;
 
 public class SelectLimitTest extends CQLTester
 {
+    // This method will be ran instead of the CQLTester#setUpClass
     @BeforeClass
-    public static void setUp()
+    public static void setUpClass()
     {
         StorageService.instance.setPartitionerUnsafe(ByteOrderedPartitioner.instance);
         DatabaseDescriptor.setPartitionerUnsafe(ByteOrderedPartitioner.instance);
+
+        prepareServer();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/922dbdb6/test/unit/org/apache/cassandra/transport/MessagePayloadTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/transport/MessagePayloadTest.java b/test/unit/org/apache/cassandra/transport/MessagePayloadTest.java
index c27593b..817cb06 100644
--- a/test/unit/org/apache/cassandra/transport/MessagePayloadTest.java
+++ b/test/unit/org/apache/cassandra/transport/MessagePayloadTest.java
@@ -162,7 +162,7 @@ public class MessagePayloadTest extends CQLTester
                 payloadEquals(reqMap, requestPayload);
                 payloadEquals(respMap, prepareResponse.getCustomPayload());
 
-                ExecuteMessage executeMessage = new ExecuteMessage(prepareResponse.statementId, QueryOptions.DEFAULT);
+                ExecuteMessage executeMessage = new ExecuteMessage(prepareResponse.statementId, prepareResponse.resultMetadataId, QueryOptions.DEFAULT);
                 reqMap = Collections.singletonMap("foo", bytes(44));
                 responsePayload = respMap = Collections.singletonMap("bar", bytes(44));
                 executeMessage.setCustomPayload(reqMap);
@@ -231,7 +231,7 @@ public class MessagePayloadTest extends CQLTester
                 payloadEquals(reqMap, requestPayload);
                 payloadEquals(respMap, prepareResponse.getCustomPayload());
 
-                ExecuteMessage executeMessage = new ExecuteMessage(prepareResponse.statementId, QueryOptions.DEFAULT);
+                ExecuteMessage executeMessage = new ExecuteMessage(prepareResponse.statementId, prepareResponse.resultMetadataId, QueryOptions.DEFAULT);
                 reqMap = Collections.singletonMap("foo", bytes(44));
                 responsePayload = respMap = Collections.singletonMap("bar", bytes(44));
                 executeMessage.setCustomPayload(reqMap);
@@ -315,7 +315,7 @@ public class MessagePayloadTest extends CQLTester
                 prepareMessage.setCustomPayload(null);
                 ResultMessage.Prepared prepareResponse = (ResultMessage.Prepared) client.execute(prepareMessage);
 
-                ExecuteMessage executeMessage = new ExecuteMessage(prepareResponse.statementId, QueryOptions.DEFAULT);
+                ExecuteMessage executeMessage = new ExecuteMessage(prepareResponse.statementId, prepareResponse.resultMetadataId, QueryOptions.DEFAULT);
                 reqMap = Collections.singletonMap("foo", bytes(44));
                 responsePayload = Collections.singletonMap("bar", bytes(44));
                 executeMessage.setCustomPayload(reqMap);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/922dbdb6/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlOperation.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlOperation.java b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlOperation.java
index c524107..c89a1d1 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlOperation.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlOperation.java
@@ -281,11 +281,11 @@ public abstract class CqlOperation<V> extends PredefinedOperation
         }
 
         @Override
-        public <V> V execute(Object preparedStatementId, ByteBuffer key, List<Object> queryParams, ResultHandler<V> handler)
+        public <V> V execute(Object preparedStatement, ByteBuffer key, List<Object> queryParams, ResultHandler<V> handler)
         {
             return handler.javaDriverHandler().apply(
                     client.executePrepared(
-                            (PreparedStatement) preparedStatementId,
+                            (PreparedStatement) preparedStatement,
                             queryParams,
                             settings.command.consistencyLevel));
         }
@@ -313,11 +313,11 @@ public abstract class CqlOperation<V> extends PredefinedOperation
         }
 
         @Override
-        public <V> V execute(Object preparedStatementId, ByteBuffer key, List<Object> queryParams, ResultHandler<V> handler)
+        public <V> V execute(Object preparedStatement, ByteBuffer key, List<Object> queryParams, ResultHandler<V> handler)
         {
             return handler.simpleClientHandler().apply(
                     client.executePrepared(
-                            (byte[]) preparedStatementId,
+                            (ResultMessage.Prepared) preparedStatement,
                             toByteBufferParams(queryParams),
                             settings.command.consistencyLevel));
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


Mime
View raw message