cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From xe...@apache.org
Subject [2/2] git commit: (stress) support for CQL prepared statements patch by David Alves; reviewed by Pavel Yaskevich for CASSANDRA-3633
Date Sat, 30 Jun 2012 12:16:59 GMT
(stress) support for CQL prepared statements
patch by David Alves; reviewed by Pavel Yaskevich for CASSANDRA-3633


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b1c60d2b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b1c60d2b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b1c60d2b

Branch: refs/heads/trunk
Commit: b1c60d2b33815d7ba2136b5c3318f7dbae3ee062
Parents: b94d8d4
Author: Pavel Yaskevich <xedin@apache.org>
Authored: Sat Jun 30 14:52:55 2012 +0300
Committer: Pavel Yaskevich <xedin@apache.org>
Committed: Sat Jun 30 15:09:46 2012 +0300

----------------------------------------------------------------------
 CHANGES.txt                                        |    4 +
 .../src/org/apache/cassandra/stress/Session.java   |   26 ++++-
 .../org/apache/cassandra/stress/StressAction.java  |    4 +-
 .../cassandra/stress/operations/CounterAdder.java  |    3 +-
 .../cassandra/stress/operations/CounterGetter.java |    3 +-
 .../stress/operations/CqlCounterAdder.java         |   47 +++++++---
 .../stress/operations/CqlCounterGetter.java        |   38 ++++++--
 .../stress/operations/CqlIndexedRangeSlicer.java   |   44 +++++---
 .../cassandra/stress/operations/CqlInserter.java   |   50 +++++++---
 .../stress/operations/CqlMultiGetter.java          |    4 +-
 .../stress/operations/CqlRangeSlicer.java          |   38 ++++++--
 .../cassandra/stress/operations/CqlReader.java     |   61 ++++++++---
 .../stress/operations/IndexedRangeSlicer.java      |    3 +-
 .../cassandra/stress/operations/Inserter.java      |    3 +-
 .../cassandra/stress/operations/MultiGetter.java   |    3 +-
 .../cassandra/stress/operations/RangeSlicer.java   |    3 +-
 .../apache/cassandra/stress/operations/Reader.java |    3 +-
 .../cassandra/stress/util/CassandraClient.java     |   34 +++++++
 .../apache/cassandra/stress/util/Operation.java    |   77 +++++++++++++--
 19 files changed, 352 insertions(+), 96 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1c60d2b/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index eda806c..25d9784 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,3 +1,7 @@
+1.1.3
+ * (stress) support for CQL prepared statements (CASSANDRA-3633)
+
+
 1.1.2
  * Fix cleanup not deleting index entries (CASSANDRA-4379)
  * Use correct partitioner when saving + loading caches (CASSANDRA-4331)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1c60d2b/tools/stress/src/org/apache/cassandra/stress/Session.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/Session.java b/tools/stress/src/org/apache/cassandra/stress/Session.java
index 5455e67..dbe1951 100644
--- a/tools/stress/src/org/apache/cassandra/stress/Session.java
+++ b/tools/stress/src/org/apache/cassandra/stress/Session.java
@@ -31,6 +31,7 @@ import org.apache.cassandra.db.marshal.*;
 import org.apache.commons.cli.*;
 
 import org.apache.cassandra.db.ColumnFamilyType;
+import org.apache.cassandra.stress.util.CassandraClient;
 import org.apache.cassandra.thrift.*;
 import org.apache.commons.lang.StringUtils;
 
@@ -80,6 +81,7 @@ public class Session implements Serializable
         availableOptions.addOption("g",  "keys-per-call",        true,   "Number of keys to get_range_slices or multiget per call, default:1000");
         availableOptions.addOption("l",  "replication-factor",   true,   "Replication Factor to use when creating needed column families, default:1");
         availableOptions.addOption("L",  "enable-cql",           false,  "Perform queries using CQL (Cassandra Query Language).");
+        availableOptions.addOption("P",  "use-prepared-statements", false, "Perform queries using prepared statements (only applicable to CQL).");
         availableOptions.addOption("e",  "consistency-level",    true,   "Consistency Level to use (ONE, QUORUM, LOCAL_QUORUM, EACH_QUORUM, ALL, ANY), default:ONE");
         availableOptions.addOption("x",  "create-index",         true,   "Type of index to create on needed column families (KEYS)");
         availableOptions.addOption("R",  "replication-strategy", true,   "Replication strategy to use (only on insert if keyspace does not exist), default:org.apache.cassandra.locator.SimpleStrategy");
@@ -114,6 +116,7 @@ public class Session implements Serializable
     private boolean replicateOnWrite = true;
     private boolean ignoreErrors  = false;
     private boolean enable_cql    = false;
+    private boolean use_prepared  = false;
 
     private final String outFileName;
 
@@ -265,6 +268,16 @@ public class Session implements Serializable
             if (cmd.hasOption("L"))
                 enable_cql = true;
 
+            if (cmd.hasOption("P"))
+            {
+                if (!enable_cql)
+                {
+                    System.err.println("-P/--use-prepared-statements is only applicable with CQL (-L/--enable-cql)");
+                    System.exit(-1);
+                }
+                use_prepared = true;
+            }
+
             if (cmd.hasOption("O"))
             {
                 String[] pairs = StringUtils.split(cmd.getOptionValue("O"), ',');
@@ -500,6 +513,11 @@ public class Session implements Serializable
         return enable_cql;
     }
 
+    public boolean usePreparedStatements()
+    {
+        return use_prepared;
+    }
+
     /**
      * Create Keyspace1 with Standard1 and Super1 column families
      */
@@ -556,7 +574,7 @@ public class Session implements Serializable
 
         keyspace.setCf_defs(new ArrayList<CfDef>(Arrays.asList(standardCfDef, superCfDef, counterCfDef, counterSuperCfDef)));
 
-        Cassandra.Client client = getClient(false);
+        CassandraClient client = getClient(false);
 
         try
         {
@@ -578,7 +596,7 @@ public class Session implements Serializable
      * Thrift client connection with Keyspace1 set.
      * @return cassandra client connection
      */
-    public Cassandra.Client getClient()
+    public CassandraClient getClient()
     {
         return getClient(true);
     }
@@ -587,14 +605,14 @@ public class Session implements Serializable
      * @param setKeyspace - should we set keyspace for client or not
      * @return cassandra client connection
      */
-    public Cassandra.Client getClient(boolean setKeyspace)
+    public CassandraClient getClient(boolean setKeyspace)
     {
         // random node selection for fake load balancing
         String currentNode = nodes[Stress.randomizer.nextInt(nodes.length)];
 
         TSocket socket = new TSocket(currentNode, port);
         TTransport transport = (isUnframed()) ? socket : new TFramedTransport(socket);
-        Cassandra.Client client = new Cassandra.Client(new TBinaryProtocol(transport));
+        CassandraClient client = new CassandraClient(new TBinaryProtocol(transport));
 
         try
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1c60d2b/tools/stress/src/org/apache/cassandra/stress/StressAction.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/StressAction.java b/tools/stress/src/org/apache/cassandra/stress/StressAction.java
index ac774b7..1227fe8 100644
--- a/tools/stress/src/org/apache/cassandra/stress/StressAction.java
+++ b/tools/stress/src/org/apache/cassandra/stress/StressAction.java
@@ -22,8 +22,8 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.SynchronousQueue;
 
 import org.apache.cassandra.stress.operations.*;
+import org.apache.cassandra.stress.util.CassandraClient;
 import org.apache.cassandra.stress.util.Operation;
-import org.apache.cassandra.thrift.Cassandra;
 
 public class StressAction extends Thread
 {
@@ -215,7 +215,7 @@ public class StressAction extends Thread
 
         public void run()
         {
-            Cassandra.Client connection = client.getClient();
+            CassandraClient connection = client.getClient();
 
             for (int i = 0; i < items; i++)
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1c60d2b/tools/stress/src/org/apache/cassandra/stress/operations/CounterAdder.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/CounterAdder.java b/tools/stress/src/org/apache/cassandra/stress/operations/CounterAdder.java
index 0c80f0a..0420154 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/CounterAdder.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/CounterAdder.java
@@ -18,6 +18,7 @@
 package org.apache.cassandra.stress.operations;
 
 import org.apache.cassandra.stress.Session;
+import org.apache.cassandra.stress.util.CassandraClient;
 import org.apache.cassandra.stress.util.Operation;
 import org.apache.cassandra.db.ColumnFamilyType;
 import org.apache.cassandra.thrift.*;
@@ -37,7 +38,7 @@ public class CounterAdder extends Operation
         super(client, index);
     }
 
-    public void run(Cassandra.Client client) throws IOException
+    public void run(CassandraClient client) throws IOException
     {
         List<CounterColumn> columns = new ArrayList<CounterColumn>();
         List<CounterSuperColumn> superColumns = new ArrayList<CounterSuperColumn>();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1c60d2b/tools/stress/src/org/apache/cassandra/stress/operations/CounterGetter.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/CounterGetter.java b/tools/stress/src/org/apache/cassandra/stress/operations/CounterGetter.java
index 3d8b1fd..a06298d 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/CounterGetter.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/CounterGetter.java
@@ -18,6 +18,7 @@
 package org.apache.cassandra.stress.operations;
 
 import org.apache.cassandra.stress.Session;
+import org.apache.cassandra.stress.util.CassandraClient;
 import org.apache.cassandra.stress.util.Operation;
 import org.apache.cassandra.db.ColumnFamilyType;
 import org.apache.cassandra.thrift.*;
@@ -33,7 +34,7 @@ public class CounterGetter extends Operation
         super(client, index);
     }
 
-    public void run(Cassandra.Client client) throws IOException
+    public void run(CassandraClient client) throws IOException
     {
         SliceRange sliceRange = new SliceRange();
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1c60d2b/tools/stress/src/org/apache/cassandra/stress/operations/CqlCounterAdder.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/CqlCounterAdder.java b/tools/stress/src/org/apache/cassandra/stress/operations/CqlCounterAdder.java
index fa82d57..7197eaa 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/CqlCounterAdder.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/CqlCounterAdder.java
@@ -23,39 +23,48 @@ package org.apache.cassandra.stress.operations;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.Collections;
 
 import org.apache.cassandra.db.ColumnFamilyType;
 import org.apache.cassandra.stress.Session;
+import org.apache.cassandra.stress.util.CassandraClient;
 import org.apache.cassandra.stress.util.Operation;
-import org.apache.cassandra.thrift.Cassandra;
 import org.apache.cassandra.thrift.Compression;
-
-import static com.google.common.base.Charsets.UTF_8;
+import org.apache.cassandra.utils.ByteBufferUtil;
 
 public class CqlCounterAdder extends Operation
 {
+    private static String cqlQuery = null;
+
     public CqlCounterAdder(Session client, int idx)
     {
         super(client, idx);
     }
 
-    public void run(Cassandra.Client client) throws IOException
+    public void run(CassandraClient client) throws IOException
     {
         if (session.getColumnFamilyType() == ColumnFamilyType.Super)
             throw new RuntimeException("Super columns are not implemented for CQL");
 
-        StringBuilder query = new StringBuilder(
-                "UPDATE Counter1 USING CONSISTENCY " + session.getConsistencyLevel().toString() + " SET ");
-
-        for (int i = 0; i < session.getColumnsPerKey(); i++)
+        if (cqlQuery == null)
         {
-            if (i > 0)
-                query.append(",");
-            query.append('C').append(i).append("=C").append(i).append("+1");
+            StringBuilder query = new StringBuilder(
+                    "UPDATE Counter1 USING CONSISTENCY " + session.getConsistencyLevel().toString() + " SET ");
+
+            for (int i = 0; i < session.getColumnsPerKey(); i++)
+            {
+                if (i > 0)
+                    query.append(",");
+
+                query.append('C').append(i).append("=C").append(i).append("+1");
+
+            }
+            query.append(" WHERE KEY=?");
+            cqlQuery = query.toString();
         }
 
         String key = String.format("%0" + session.getTotalKeysLength() + "d", index);
-        query.append( " WHERE KEY=").append(getQuotedCqlBlob(key.getBytes(UTF_8)));
+        String formattedQuery = null;
 
         long start = System.currentTimeMillis();
 
@@ -69,7 +78,19 @@ public class CqlCounterAdder extends Operation
 
             try
             {
-                client.execute_cql_query(ByteBuffer.wrap(query.toString().getBytes()), Compression.NONE);
+                if (session.usePreparedStatements())
+                {
+                    Integer stmntId = getPreparedStatement(client, cqlQuery);
+                    client.execute_prepared_cql_query(stmntId,
+                            Collections.singletonList(ByteBufferUtil.bytes(getUnQuotedCqlBlob(key))));
+                }
+                else
+                {
+                    if (formattedQuery == null)
+                        formattedQuery = formatCqlQuery(cqlQuery, Collections.singletonList(getUnQuotedCqlBlob(key)));
+                    client.execute_cql_query(ByteBuffer.wrap(formattedQuery.getBytes()), Compression.NONE);
+                }
+
                 success = true;
             }
             catch (Exception e)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1c60d2b/tools/stress/src/org/apache/cassandra/stress/operations/CqlCounterGetter.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/CqlCounterGetter.java b/tools/stress/src/org/apache/cassandra/stress/operations/CqlCounterGetter.java
index 1044c6d..1133747 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/CqlCounterGetter.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/CqlCounterGetter.java
@@ -23,33 +23,41 @@ package org.apache.cassandra.stress.operations;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.Collections;
 
 import org.apache.cassandra.db.ColumnFamilyType;
 import org.apache.cassandra.stress.Session;
+import org.apache.cassandra.stress.util.CassandraClient;
 import org.apache.cassandra.stress.util.Operation;
-import org.apache.cassandra.thrift.Cassandra;
 import org.apache.cassandra.thrift.Compression;
 import org.apache.cassandra.thrift.CqlResult;
 import org.apache.cassandra.thrift.CqlResultType;
+import org.apache.cassandra.utils.ByteBufferUtil;
 
 public class CqlCounterGetter extends Operation
 {
+    private static String cqlQuery = null;
 
     public CqlCounterGetter(Session client, int idx)
     {
         super(client, idx);
     }
 
-    public void run(Cassandra.Client client) throws IOException
+    public void run(CassandraClient client) throws IOException
     {
         if (session.getColumnFamilyType() == ColumnFamilyType.Super)
             throw new RuntimeException("Super columns are not implemented for CQL");
 
+        if (cqlQuery == null)
+        {
+            StringBuilder query = new StringBuilder("SELECT FIRST ").append(session.getColumnsPerKey())
+                    .append(" ''..'' FROM Counter1 USING CONSISTENCY ").append(session.getConsistencyLevel().toString())
+                    .append(" WHERE KEY=?");
+            cqlQuery = query.toString();
+        }
+
         byte[] key = generateKey();
-        String hexKey = getQuotedCqlBlob(key);
-        StringBuilder query = new StringBuilder("SELECT FIRST ").append(session.getColumnsPerKey())
-                .append(" ''..'' FROM Counter1 USING CONSISTENCY ").append(session.getConsistencyLevel().toString())
-                .append(" WHERE KEY=").append(hexKey);
+        String formattedQuery = null;
 
         long start = System.currentTimeMillis();
 
@@ -63,8 +71,22 @@ public class CqlCounterGetter extends Operation
 
             try
             {
-                CqlResult result = client.execute_cql_query(ByteBuffer.wrap(query.toString().getBytes()),
-                                                            Compression.NONE);
+                CqlResult result = null;
+
+                if (session.usePreparedStatements())
+                {
+                    Integer stmntId = getPreparedStatement(client, cqlQuery);
+                    result = client.execute_prepared_cql_query(stmntId,
+                            Collections.singletonList(ByteBufferUtil.bytes(getUnQuotedCqlBlob(key))));
+                }
+                else
+                {
+                    if (formattedQuery == null)
+                        formattedQuery = formatCqlQuery(cqlQuery, Collections.singletonList(getUnQuotedCqlBlob(key)));
+                    result = client.execute_cql_query(ByteBuffer.wrap(formattedQuery.getBytes()),
+                                                      Compression.NONE);
+                }
+
                 assert result.type.equals(CqlResultType.ROWS) : "expected ROWS result type";
                 assert result.rows.size() == 0 : "expected exactly one row";
                 success = (result.rows.get(0).columns.size() != 0);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1c60d2b/tools/stress/src/org/apache/cassandra/stress/operations/CqlIndexedRangeSlicer.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/CqlIndexedRangeSlicer.java b/tools/stress/src/org/apache/cassandra/stress/operations/CqlIndexedRangeSlicer.java
index 978c1c4..383ad67 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/CqlIndexedRangeSlicer.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/CqlIndexedRangeSlicer.java
@@ -23,30 +23,29 @@ package org.apache.cassandra.stress.operations;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.Collections;
 import java.util.List;
 
 import org.apache.cassandra.db.ColumnFamilyType;
 import org.apache.cassandra.stress.Session;
+import org.apache.cassandra.stress.util.CassandraClient;
 import org.apache.cassandra.stress.util.Operation;
-import org.apache.cassandra.thrift.Cassandra;
 import org.apache.cassandra.thrift.Compression;
 import org.apache.cassandra.thrift.CqlResult;
 import org.apache.cassandra.thrift.CqlRow;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
-import static org.apache.cassandra.utils.Hex.bytesToHex;;
-
 public class CqlIndexedRangeSlicer extends Operation
 {
     private static List<ByteBuffer> values = null;
-    private static String clauseFragment = "KEY > '%s' LIMIT %d";
+    private static String cqlQuery = null;
 
     public CqlIndexedRangeSlicer(Session client, int idx)
     {
         super(client, idx);
     }
 
-    public void run(Cassandra.Client client) throws IOException
+    public void run(CassandraClient client) throws IOException
     {
         if (session.getColumnFamilyType() == ColumnFamilyType.Super)
             throw new RuntimeException("Super columns are not implemented for CQL");
@@ -54,12 +53,18 @@ public class CqlIndexedRangeSlicer extends Operation
         if (values == null)
             values = generateValues();
 
-        String format = "%0" + session.getTotalKeysLength() + "d";
+        if (cqlQuery == null)
+        {
+            StringBuilder query = new StringBuilder("SELECT FIRST ").append(session.getColumnsPerKey())
+                 .append(" ''..'' FROM Standard1 USING CONSISTENCY ").append(session.getConsistencyLevel())
+                 .append(" WHERE C1=").append(getUnQuotedCqlBlob(values.get(1).array()))
+                 .append(" AND KEY > ? LIMIT ").append(session.getKeysPerCall());
+
+            cqlQuery = query.toString();
+        }
 
+        String format = "%0" + session.getTotalKeysLength() + "d";
         String startOffset = String.format(format, 0);
-        StringBuilder query = new StringBuilder("SELECT FIRST ").append(session.getColumnsPerKey())
-                .append(" ''..'' FROM Standard1 USING CONSISTENCY ").append(session.getConsistencyLevel())
-                .append(" WHERE C1 = ").append(getQuotedCqlBlob(values.get(1).array())).append(" AND ");
 
         int expectedPerValue = session.getNumKeys() / values.size(), received = 0;
 
@@ -70,6 +75,8 @@ public class CqlIndexedRangeSlicer extends Operation
             boolean success = false;
             String exceptionMessage = null;
             CqlResult results = null;
+            String formattedQuery = null;
+            List<String> queryParms = Collections.singletonList(getUnQuotedCqlBlob(startOffset));
 
             for (int t = 0; t < session.getRetryTimes(); t++)
             {
@@ -78,8 +85,18 @@ public class CqlIndexedRangeSlicer extends Operation
 
                 try
                 {
-                    ByteBuffer queryBytes = ByteBuffer.wrap(makeQuery(query, startOffset).getBytes());
-                    results = client.execute_cql_query(queryBytes, Compression.NONE);
+                    if (session.usePreparedStatements())
+                    {
+                        Integer stmntId = getPreparedStatement(client, cqlQuery);
+                        results = client.execute_prepared_cql_query(stmntId, queryParamsAsByteBuffer(queryParms));
+                    }
+                    else
+                    {
+                        if (formattedQuery ==  null)
+                            formattedQuery = formatCqlQuery(cqlQuery, queryParms);
+                        results = client.execute_cql_query(ByteBuffer.wrap(formattedQuery.getBytes()), Compression.NONE);
+                    }
+
                     success = (results.rows.size() != 0);
                 }
                 catch (Exception e)
@@ -109,11 +126,6 @@ public class CqlIndexedRangeSlicer extends Operation
         }
     }
 
-    private String makeQuery(StringBuilder base, String startOffset)
-    {
-        return base.toString() + String.format(clauseFragment, bytesToHex(startOffset.getBytes()), session.getKeysPerCall());
-    }
-
     /**
      * Get maximum key from CqlRow list
      * @param rows list of the CqlRow objects

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1c60d2b/tools/stress/src/org/apache/cassandra/stress/operations/CqlInserter.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/CqlInserter.java b/tools/stress/src/org/apache/cassandra/stress/operations/CqlInserter.java
index d7a7641..c729f2f 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/CqlInserter.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/CqlInserter.java
@@ -23,25 +23,27 @@ package org.apache.cassandra.stress.operations;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.cassandra.db.ColumnFamilyType;
 import org.apache.cassandra.stress.Session;
+import org.apache.cassandra.stress.util.CassandraClient;
 import org.apache.cassandra.stress.util.Operation;
-import org.apache.cassandra.thrift.Cassandra;
 import org.apache.cassandra.thrift.Compression;
 import org.apache.cassandra.utils.UUIDGen;
 
 public class CqlInserter extends Operation
 {
     private static List<ByteBuffer> values;
+    private static String cqlQuery = null;
 
     public CqlInserter(Session client, int idx)
     {
         super(client, idx);
     }
 
-    public void run(Cassandra.Client client) throws IOException
+    public void run(CassandraClient client) throws IOException
     {
         if (session.getColumnFamilyType() == ColumnFamilyType.Super)
             throw new RuntimeException("Super columns are not implemented for CQL");
@@ -49,26 +51,39 @@ public class CqlInserter extends Operation
         if (values == null)
             values = generateValues();
 
-        StringBuilder query = new StringBuilder("UPDATE Standard1 USING CONSISTENCY ")
-                .append(session.getConsistencyLevel().toString()).append(" SET ");
+        // Construct a query string once.
+        if (cqlQuery == null)
+        {
+            StringBuilder query = new StringBuilder("UPDATE Standard1 USING CONSISTENCY ")
+                    .append(session.getConsistencyLevel().toString()).append(" SET ");
 
+            for (int i = 0; i < session.getColumnsPerKey(); i++)
+            {
+                if (i > 0) query.append(',');
+                query.append("?=?");
+            }
+
+            query.append(" WHERE KEY=?");
+            cqlQuery = query.toString();
+        }
+
+        List<String> queryParms = new ArrayList<String>();
         for (int i = 0; i < session.getColumnsPerKey(); i++)
         {
-            if (i > 0)
-                query.append(',');
-
             // Column name
             if (session.timeUUIDComparator)
-                query.append(UUIDGen.makeType1UUIDFromHost(Session.getLocalAddress()).toString());
+                queryParms.add(UUIDGen.makeType1UUIDFromHost(Session.getLocalAddress()).toString());
             else
-                query.append('C').append(i);
+                queryParms.add(new String("C" + i));
 
             // Column value
-            query.append('=').append(getQuotedCqlBlob(values.get(i % values.size()).array()));
+            queryParms.add(new String(getUnQuotedCqlBlob(values.get(i % values.size()).array())));
         }
 
         String key = String.format("%0" + session.getTotalKeysLength() + "d", index);
-        query.append(" WHERE KEY=").append(getQuotedCqlBlob(key));
+        queryParms.add(new String(getUnQuotedCqlBlob(key)));
+
+        String formattedQuery = null;
 
         long start = System.currentTimeMillis();
 
@@ -82,7 +97,18 @@ public class CqlInserter extends Operation
 
             try
             {
-                client.execute_cql_query(ByteBuffer.wrap(query.toString().getBytes()), Compression.NONE);
+                if (session.usePreparedStatements())
+                {
+                    Integer stmntId = getPreparedStatement(client, cqlQuery);
+                    client.execute_prepared_cql_query(stmntId, queryParamsAsByteBuffer(queryParms));
+                }
+                else
+                {
+                    if (formattedQuery == null)
+                        formattedQuery = formatCqlQuery(cqlQuery, queryParms);
+                    client.execute_cql_query(ByteBuffer.wrap(formattedQuery.getBytes()), Compression.NONE);
+                }
+
                 success = true;
             }
             catch (Exception e)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1c60d2b/tools/stress/src/org/apache/cassandra/stress/operations/CqlMultiGetter.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/CqlMultiGetter.java b/tools/stress/src/org/apache/cassandra/stress/operations/CqlMultiGetter.java
index 3125cff..e9b1f47 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/CqlMultiGetter.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/CqlMultiGetter.java
@@ -24,8 +24,8 @@ package org.apache.cassandra.stress.operations;
 import java.io.IOException;
 
 import org.apache.cassandra.stress.Session;
+import org.apache.cassandra.stress.util.CassandraClient;
 import org.apache.cassandra.stress.util.Operation;
-import org.apache.cassandra.thrift.Cassandra;
 
 public class CqlMultiGetter extends Operation
 {
@@ -34,7 +34,7 @@ public class CqlMultiGetter extends Operation
         super(client, idx);
     }
 
-    public void run(Cassandra.Client client) throws IOException
+    public void run(CassandraClient client) throws IOException
     {
         throw new RuntimeException("Multiget is not implemented for CQL");
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1c60d2b/tools/stress/src/org/apache/cassandra/stress/operations/CqlRangeSlicer.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/CqlRangeSlicer.java b/tools/stress/src/org/apache/cassandra/stress/operations/CqlRangeSlicer.java
index e57a9ac..8b20867 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/CqlRangeSlicer.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/CqlRangeSlicer.java
@@ -23,30 +23,41 @@ package org.apache.cassandra.stress.operations;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.Collections;
+
+import org.apache.cassandra.utils.ByteBufferUtil;
 
 import org.apache.cassandra.db.ColumnFamilyType;
 import org.apache.cassandra.stress.Session;
+import org.apache.cassandra.stress.util.CassandraClient;
 import org.apache.cassandra.stress.util.Operation;
-import org.apache.cassandra.thrift.Cassandra;
 import org.apache.cassandra.thrift.Compression;
 import org.apache.cassandra.thrift.CqlResult;
 
 public class CqlRangeSlicer extends Operation
 {
+    private static String cqlQuery = null;
+
     public CqlRangeSlicer(Session client, int idx)
     {
         super(client, idx);
     }
 
-    public void run(Cassandra.Client client) throws IOException
+    public void run(CassandraClient client) throws IOException
     {
         if (session.getColumnFamilyType() == ColumnFamilyType.Super)
             throw new RuntimeException("Super columns are not implemented for CQL");
 
+        if (cqlQuery == null)
+        {
+            StringBuilder query = new StringBuilder("SELECT FIRST ").append(session.getColumnsPerKey())
+                    .append(" ''..'' FROM Standard1 USING CONSISTENCY ").append(session.getConsistencyLevel().toString())
+                    .append(" WHERE KEY > ?");
+            cqlQuery = query.toString();
+        }
+
         String key = String.format("%0" +  session.getTotalKeysLength() + "d", index);
-        StringBuilder query = new StringBuilder("SELECT FIRST ").append(session.getColumnsPerKey())
-                .append(" ''..'' FROM Standard1 USING CONSISTENCY ").append(session.getConsistencyLevel().toString())
-                .append(" WHERE KEY > ").append(getQuotedCqlBlob(key));
+        String formattedQuery = null;
 
         long startTime = System.currentTimeMillis();
 
@@ -61,8 +72,21 @@ public class CqlRangeSlicer extends Operation
 
             try
             {
-                CqlResult result = client.execute_cql_query(ByteBuffer.wrap(query.toString().getBytes()),
-                                                            Compression.NONE);
+                CqlResult result = null;
+
+                if (session.usePreparedStatements())
+                {
+                    Integer stmntId = getPreparedStatement(client, cqlQuery);
+                    result = client.execute_prepared_cql_query(stmntId,
+                            Collections.singletonList(ByteBufferUtil.bytes(getUnQuotedCqlBlob(key))));
+                }
+                else
+                {
+                    if (formattedQuery == null)
+                        formattedQuery = formatCqlQuery(cqlQuery, Collections.singletonList(getUnQuotedCqlBlob(key)));
+                    result = client.execute_cql_query(ByteBuffer.wrap(formattedQuery.getBytes()), Compression.NONE);
+                }
+
                 rowCount = result.rows.size();
                 success = (rowCount != 0);
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1c60d2b/tools/stress/src/org/apache/cassandra/stress/operations/CqlReader.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/CqlReader.java b/tools/stress/src/org/apache/cassandra/stress/operations/CqlReader.java
index 93a5c79..cfac2d6 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/CqlReader.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/CqlReader.java
@@ -23,46 +23,60 @@ package org.apache.cassandra.stress.operations;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.cassandra.db.ColumnFamilyType;
 import org.apache.cassandra.stress.Session;
+import org.apache.cassandra.stress.util.CassandraClient;
 import org.apache.cassandra.stress.util.Operation;
-import org.apache.cassandra.thrift.Cassandra;
 import org.apache.cassandra.thrift.Compression;
 import org.apache.cassandra.thrift.CqlResult;
 
 public class CqlReader extends Operation
 {
+    private static String cqlQuery = null;
+
     public CqlReader(Session client, int idx)
     {
         super(client, idx);
     }
 
-    public void run(Cassandra.Client client) throws IOException
+    public void run(CassandraClient client) throws IOException
     {
         if (session.getColumnFamilyType() == ColumnFamilyType.Super)
             throw new RuntimeException("Super columns are not implemented for CQL");
 
-        StringBuilder query = new StringBuilder("SELECT ");
-
-        if (session.columnNames == null)
-        {
-            query.append("FIRST ").append(session.getColumnsPerKey()).append(" ''..''");
-        }
-        else
+        if (cqlQuery == null)
         {
-            for (int i = 0; i < session.columnNames.size(); i++)
+            StringBuilder query = new StringBuilder("SELECT ");
+
+            if (session.columnNames == null)
+                query.append("FIRST ").append(session.getColumnsPerKey()).append(" ''..''");
+            else
             {
-                if (i > 0)
-                    query.append(",");
-                query.append('\'').append(new String(session.columnNames.get(i).array())).append('\'');
+                for (int i = 0; i < session.columnNames.size(); i++)
+                {
+                    if (i > 0) query.append(",");
+                    query.append('?');
+                }
             }
+
+            query.append(" FROM Standard1 USING CONSISTENCY ").append(session.getConsistencyLevel().toString());
+            query.append(" WHERE KEY=?");
+
+            cqlQuery = query.toString();
         }
 
+        List<String> queryParams = new ArrayList<String>();
+        if (session.columnNames != null)
+            for (int i = 0; i < session.columnNames.size(); i++)
+                queryParams.add(getUnQuotedCqlBlob(session.columnNames.get(i).array()));
+
         byte[] key = generateKey();
+        queryParams.add(getUnQuotedCqlBlob(key));
 
-        query.append(" FROM Standard1 USING CONSISTENCY ").append(session.getConsistencyLevel().toString());
-        query.append(" WHERE KEY=").append(getQuotedCqlBlob(key));
+        String formattedQuery = null;
 
         long start = System.currentTimeMillis();
 
@@ -76,8 +90,21 @@ public class CqlReader extends Operation
 
             try
             {
-                CqlResult result = client.execute_cql_query(ByteBuffer.wrap(query.toString().getBytes()),
-                                                            Compression.NONE);
+                CqlResult result = null;
+
+                if (session.usePreparedStatements())
+                {
+                    Integer stmntId = getPreparedStatement(client, cqlQuery);
+                    result = client.execute_prepared_cql_query(stmntId, queryParamsAsByteBuffer(queryParams));
+                }
+                else
+                {
+                    if (formattedQuery == null)
+                        formattedQuery = formatCqlQuery(cqlQuery, queryParams);
+                    result = client.execute_cql_query(ByteBuffer.wrap(formattedQuery.getBytes()),
+                                                      Compression.NONE);
+                }
+
                 success = (result.rows.get(0).columns.size() != 0);
             }
             catch (Exception e)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1c60d2b/tools/stress/src/org/apache/cassandra/stress/operations/IndexedRangeSlicer.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/IndexedRangeSlicer.java b/tools/stress/src/org/apache/cassandra/stress/operations/IndexedRangeSlicer.java
index c117862..8768de8 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/IndexedRangeSlicer.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/IndexedRangeSlicer.java
@@ -18,6 +18,7 @@
 package org.apache.cassandra.stress.operations;
 
 import org.apache.cassandra.stress.Session;
+import org.apache.cassandra.stress.util.CassandraClient;
 import org.apache.cassandra.stress.util.Operation;
 import org.apache.cassandra.thrift.*;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -36,7 +37,7 @@ public class IndexedRangeSlicer extends Operation
         super(client, index);
     }
 
-    public void run(Cassandra.Client client) throws IOException
+    public void run(CassandraClient client) throws IOException
     {
         if (values == null)
             values = generateValues();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1c60d2b/tools/stress/src/org/apache/cassandra/stress/operations/Inserter.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/Inserter.java b/tools/stress/src/org/apache/cassandra/stress/operations/Inserter.java
index a887724..0623e4c 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/Inserter.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/Inserter.java
@@ -18,6 +18,7 @@
 package org.apache.cassandra.stress.operations;
 
 import org.apache.cassandra.stress.Session;
+import org.apache.cassandra.stress.util.CassandraClient;
 import org.apache.cassandra.stress.util.Operation;
 import org.apache.cassandra.db.ColumnFamilyType;
 import org.apache.cassandra.thrift.*;
@@ -40,7 +41,7 @@ public class Inserter extends Operation
         super(client, index);
     }
 
-    public void run(Cassandra.Client client) throws IOException
+    public void run(CassandraClient client) throws IOException
     {
         if (values == null)
             values = generateValues();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1c60d2b/tools/stress/src/org/apache/cassandra/stress/operations/MultiGetter.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/MultiGetter.java b/tools/stress/src/org/apache/cassandra/stress/operations/MultiGetter.java
index c50dd1b..f569f66 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/MultiGetter.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/MultiGetter.java
@@ -18,6 +18,7 @@
 package org.apache.cassandra.stress.operations;
 
 import org.apache.cassandra.stress.Session;
+import org.apache.cassandra.stress.util.CassandraClient;
 import org.apache.cassandra.stress.util.Operation;
 import org.apache.cassandra.db.ColumnFamilyType;
 import org.apache.cassandra.thrift.*;
@@ -37,7 +38,7 @@ public class MultiGetter extends Operation
         super(client, index);
     }
 
-    public void run(Cassandra.Client client) throws IOException
+    public void run(CassandraClient client) throws IOException
     {
         SlicePredicate predicate = new SlicePredicate().setSlice_range(new SliceRange(ByteBufferUtil.EMPTY_BYTE_BUFFER,
                                                                                       ByteBufferUtil.EMPTY_BYTE_BUFFER,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1c60d2b/tools/stress/src/org/apache/cassandra/stress/operations/RangeSlicer.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/RangeSlicer.java b/tools/stress/src/org/apache/cassandra/stress/operations/RangeSlicer.java
index 308eefe..e462e30 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/RangeSlicer.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/RangeSlicer.java
@@ -18,6 +18,7 @@
 package org.apache.cassandra.stress.operations;
 
 import org.apache.cassandra.stress.Session;
+import org.apache.cassandra.stress.util.CassandraClient;
 import org.apache.cassandra.stress.util.Operation;
 import org.apache.cassandra.db.ColumnFamilyType;
 import org.apache.cassandra.thrift.*;
@@ -36,7 +37,7 @@ public class RangeSlicer extends Operation
         super(client, index);
     }
 
-    public void run(Cassandra.Client client) throws IOException
+    public void run(CassandraClient client) throws IOException
     {
         String format = "%0" + session.getTotalKeysLength() + "d";
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1c60d2b/tools/stress/src/org/apache/cassandra/stress/operations/Reader.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/Reader.java b/tools/stress/src/org/apache/cassandra/stress/operations/Reader.java
index b5a8781..412ebdf 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/Reader.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/Reader.java
@@ -18,6 +18,7 @@
 package org.apache.cassandra.stress.operations;
 
 import org.apache.cassandra.stress.Session;
+import org.apache.cassandra.stress.util.CassandraClient;
 import org.apache.cassandra.stress.util.Operation;
 import org.apache.cassandra.db.ColumnFamilyType;
 import org.apache.cassandra.thrift.*;
@@ -35,7 +36,7 @@ public class Reader extends Operation
         super(client, index);
     }
 
-    public void run(Cassandra.Client client) throws IOException
+    public void run(CassandraClient client) throws IOException
     {
         // initialize SlicePredicate with existing SliceRange
         SlicePredicate predicate = new SlicePredicate();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1c60d2b/tools/stress/src/org/apache/cassandra/stress/util/CassandraClient.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/util/CassandraClient.java b/tools/stress/src/org/apache/cassandra/stress/util/CassandraClient.java
new file mode 100644
index 0000000..5136a55
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/util/CassandraClient.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.stress.util;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.cassandra.thrift.Cassandra.Client;
+import org.apache.thrift.protocol.TProtocol;
+
+public class CassandraClient extends Client
+{
+    public Map<Integer, Integer> preparedStatements = new HashMap<Integer, Integer>();
+
+    public CassandraClient(TProtocol protocol)
+    {
+        super(protocol);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1c60d2b/tools/stress/src/org/apache/cassandra/stress/util/Operation.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/util/Operation.java b/tools/stress/src/org/apache/cassandra/stress/util/Operation.java
index 4e08909..cbeaad7 100644
--- a/tools/stress/src/org/apache/cassandra/stress/util/Operation.java
+++ b/tools/stress/src/org/apache/cassandra/stress/util/Operation.java
@@ -17,6 +17,8 @@
  */
 package org.apache.cassandra.stress.util;
 
+import static com.google.common.base.Charsets.UTF_8;
+
 import java.io.IOException;
 import java.math.BigInteger;
 import java.nio.ByteBuffer;
@@ -25,17 +27,19 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
 
-import static com.google.common.base.Charsets.UTF_8;
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
 
 import org.apache.cassandra.db.marshal.TimeUUIDType;
 import org.apache.cassandra.stress.Session;
 import org.apache.cassandra.stress.Stress;
-import org.apache.cassandra.thrift.Cassandra;
+import org.apache.cassandra.thrift.Compression;
+import org.apache.cassandra.thrift.CqlPreparedResult;
 import org.apache.cassandra.thrift.InvalidRequestException;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.UUIDGen;
 import org.apache.cassandra.utils.Hex;
+import org.apache.cassandra.utils.UUIDGen;
 
 public abstract class Operation
 {
@@ -61,7 +65,7 @@ public abstract class Operation
      * @param client Cassandra Thrift client connection
      * @throws IOException on any I/O error.
      */
-    public abstract void run(Cassandra.Client client) throws IOException;
+    public abstract void run(CassandraClient client) throws IOException;
 
     // Utility methods
 
@@ -226,13 +230,70 @@ public abstract class Operation
             System.err.println(message);
     }
 
-    protected String getQuotedCqlBlob(String term)
+    protected String getUnQuotedCqlBlob(String term)
+    {
+        return getUnQuotedCqlBlob(term.getBytes());
+    }
+
+    protected String getUnQuotedCqlBlob(byte[] term)
+    {
+        return Hex.bytesToHex(term);
+    }
+
+    protected List<ByteBuffer> queryParamsAsByteBuffer(List<String> queryParams)
+    {
+        return Lists.transform(queryParams, new Function<String, ByteBuffer>()
+        {
+            @Override
+            public ByteBuffer apply(String param)
+            {
+                return ByteBufferUtil.bytes(param);
+            }
+        });
+    }
+
+    /**
+     * Constructs a CQL query string by replacing instances of the character
+     * '?', with the corresponding parameter.
+     *
+     * @param query base query string to format
+     * @param parms sequence of string query parameters
+     * @return formatted CQL query string
+     */
+    protected static String formatCqlQuery(String query, List<String> parms)
     {
-        return getQuotedCqlBlob(term.getBytes());
+        int marker = 0, position = 0;
+        StringBuilder result = new StringBuilder();
+
+        if (-1 == (marker = query.indexOf('?')) || parms.size() == 0)
+            return query;
+
+        for (String parm : parms)
+        {
+            result.append(query.substring(position, marker));
+            result.append('\'').append(parm).append('\'');
+
+            position = marker + 1;
+            if (-1 == (marker = query.indexOf('?', position + 1)))
+                break;
+        }
+
+        if (position < query.length())
+            result.append(query.substring(position));
+
+        return result.toString();
     }
 
-    protected String getQuotedCqlBlob(byte[] term)
+    protected static Integer getPreparedStatement(CassandraClient client, String cqlQuery) throws Exception
     {
-        return String.format("'%s'", Hex.bytesToHex(term));
+        Integer statementId = client.preparedStatements.get(cqlQuery.hashCode());
+        if (statementId == null)
+        {
+            CqlPreparedResult response = client.prepare_cql_query(ByteBufferUtil.bytes(cqlQuery), Compression.NONE);
+            statementId = response.itemId;
+            client.preparedStatements.put(cqlQuery.hashCode(), statementId);
+        }
+
+        return statementId;
     }
 }


Mime
View raw message