cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From slebre...@apache.org
Subject [1/2] git commit: Add binary protocol to stress
Date Wed, 06 Mar 2013 08:09:57 GMT
Add binary protocol to stress

patch by slebresne; reviewed by yukim for CASSANDRA-4993


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/06699d47
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/06699d47
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/06699d47

Branch: refs/heads/trunk
Commit: 06699d47b62d8af8dbb60481ab36e5ed1805e0a0
Parents: c315745
Author: Sylvain Lebresne <sylvain@datastax.com>
Authored: Wed Mar 6 09:07:04 2013 +0100
Committer: Sylvain Lebresne <sylvain@datastax.com>
Committed: Wed Mar 6 09:08:18 2013 +0100

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 bin/cassandra                                      |    2 +-
 .../apache/cassandra/transport/SimpleClient.java   |    5 +
 .../src/org/apache/cassandra/stress/Session.java   |   26 ++++
 .../org/apache/cassandra/stress/StressAction.java  |   62 +++++++---
 .../cassandra/stress/operations/CQLOperation.java  |   96 +++++++++++++++
 .../stress/operations/CqlCounterAdder.java         |   39 +++----
 .../stress/operations/CqlCounterGetter.java        |   43 +++----
 .../stress/operations/CqlIndexedRangeSlicer.java   |   75 +++++++----
 .../cassandra/stress/operations/CqlInserter.java   |   39 +++----
 .../stress/operations/CqlMultiGetter.java          |    6 +
 .../stress/operations/CqlRangeSlicer.java          |   49 ++++----
 .../cassandra/stress/operations/CqlReader.java     |   42 +++----
 .../apache/cassandra/stress/util/Operation.java    |   29 ++++-
 14 files changed, 344 insertions(+), 170 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/06699d47/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index f4e854b..d53a1e3 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -14,6 +14,7 @@
  * cli: Warn about missing CQL3 tables in schema descriptions (CASSANDRA-5309)
  * Re-enable unknown option in replication/compaction strategies option for
    backward compatibility (CASSANDRA-4795)
+ * Add binary protocol support to stress (CASSANDRA-4993)
 Merged from 1.1:
  * nodetool: ability to repair specific range (CASSANDRA-5280)
  * Fix possible assertion triggered in SliceFromReadCommand (CASSANDRA-5284)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/06699d47/bin/cassandra
----------------------------------------------------------------------
diff --git a/bin/cassandra b/bin/cassandra
index 5403257..25498aa 100755
--- a/bin/cassandra
+++ b/bin/cassandra
@@ -129,7 +129,7 @@ launch_service()
     if [ "x$pidpath" != "x" ]; then
         cassandra_parms="$cassandra_parms -Dcassandra-pidfile=$pidpath"
     fi
-    
+
     # The cassandra-foreground option will tell CassandraDaemon not
     # to close stdout/stderr, but it's up to us not to background.
     if [ "x$foreground" != "x" ]; then

http://git-wip-us.apache.org/repos/asf/cassandra/blob/06699d47/src/java/org/apache/cassandra/transport/SimpleClient.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/SimpleClient.java b/src/java/org/apache/cassandra/transport/SimpleClient.java
index f2963bd..7979570 100644
--- a/src/java/org/apache/cassandra/transport/SimpleClient.java
+++ b/src/java/org/apache/cassandra/transport/SimpleClient.java
@@ -93,6 +93,11 @@ public class SimpleClient
         this.encryptionOptions = encryptionOptions;
     }
 
+    public SimpleClient(String host, int port)
+    {
+        this(host, port, new ClientEncryptionOptions());
+    }
+
     public void connect(boolean useCompression) throws IOException
     {
         establishConnection();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/06699d47/tools/stress/src/org/apache/cassandra/stress/Session.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/Session.java b/tools/stress/src/org/apache/cassandra/stress/Session.java
index 804e4e8..d16ee78 100644
--- a/tools/stress/src/org/apache/cassandra/stress/Session.java
+++ b/tools/stress/src/org/apache/cassandra/stress/Session.java
@@ -38,6 +38,7 @@ import org.apache.commons.cli.*;
 
 import org.apache.cassandra.db.ColumnFamilyType;
 import org.apache.cassandra.stress.util.CassandraClient;
+import org.apache.cassandra.transport.SimpleClient;
 import org.apache.cassandra.thrift.*;
 import org.apache.commons.lang.StringUtils;
 
@@ -95,6 +96,7 @@ public class Session implements Serializable
         availableOptions.addOption("l",  "replication-factor",   true,   "Replication Factor to use when creating needed column families, default:1");
         availableOptions.addOption("L",  "enable-cql",           false,  "Perform queries using CQL2 (Cassandra Query Language v 2.0.0)");
         availableOptions.addOption("L3", "enable-cql3",          false,  "Perform queries using CQL3 (Cassandra Query Language v 3.0.0)");
+        availableOptions.addOption("b",  "enable-native-protocol",  false,  "Use the binary native protocol (only work along with -L3)");
         availableOptions.addOption("P",  "use-prepared-statements", false, "Perform queries using prepared statements (only applicable to CQL).");
         availableOptions.addOption("e",  "consistency-level",    true,   "Consistency Level to use (ONE, QUORUM, LOCAL_QUORUM, EACH_QUORUM, ALL, ANY), default:ONE");
         availableOptions.addOption("x",  "create-index",         true,   "Type of index to create on needed column families (KEYS)");
@@ -138,6 +140,7 @@ public class Session implements Serializable
     private boolean enable_cql    = false;
     private boolean use_prepared  = false;
     private boolean trace         = false;
+    public boolean use_native_protocol = false;
 
     private final String outFileName;
 
@@ -300,6 +303,12 @@ public class Session implements Serializable
                 cqlVersion = "3.0.0";
             }
 
+            if (cmd.hasOption("b"))
+            {
+                if (!(enable_cql && cqlVersion.startsWith("3")))
+                    throw new IllegalArgumentException("Cannot use binary protocol without -L3");
+                use_native_protocol = true;
+            }
 
             if (cmd.hasOption("P"))
             {
@@ -691,6 +700,7 @@ public class Session implements Serializable
     {
         return getClient(true);
     }
+
     /**
      * Thrift client connection
      * @param setKeyspace - should we set keyspace for client or not
@@ -730,6 +740,22 @@ public class Session implements Serializable
         return client;
     }
 
+    public SimpleClient getNativeClient()
+    {
+        try
+        {
+            String currentNode = nodes[Stress.randomizer.nextInt(nodes.length)];
+            SimpleClient client = new SimpleClient(currentNode, 9042);
+            client.connect(false);
+            client.execute("USE \"Keyspace1\";", org.apache.cassandra.db.ConsistencyLevel.ONE);
+            return client;
+        }
+        catch (Exception e)
+        {
+            throw new RuntimeException(e.getMessage());
+        }
+    }
+
     public static InetAddress getLocalAddress()
     {
         if (localInetAddress == null)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/06699d47/tools/stress/src/org/apache/cassandra/stress/StressAction.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/StressAction.java b/tools/stress/src/org/apache/cassandra/stress/StressAction.java
index 27675d1..60e8cbd 100644
--- a/tools/stress/src/org/apache/cassandra/stress/StressAction.java
+++ b/tools/stress/src/org/apache/cassandra/stress/StressAction.java
@@ -25,6 +25,7 @@ import com.yammer.metrics.stats.Snapshot;
 import org.apache.cassandra.stress.operations.*;
 import org.apache.cassandra.stress.util.CassandraClient;
 import org.apache.cassandra.stress.util.Operation;
+import org.apache.cassandra.transport.SimpleClient;
 
 public class StressAction extends Thread
 {
@@ -218,29 +219,60 @@ public class StressAction extends Thread
 
         public void run()
         {
-            CassandraClient connection = client.getClient();
-
-            for (int i = 0; i < items; i++)
+            if (client.use_native_protocol)
             {
-                if (stop)
-                    break;
+                SimpleClient connection = client.getNativeClient();
 
-                try
+                for (int i = 0; i < items; i++)
                 {
-                    operations.take().run(connection); // running job
+                    if (stop)
+                        break;
+
+                    try
+                    {
+                        operations.take().run(connection); // running job
+                    }
+                    catch (Exception e)
+                    {
+                        if (output == null)
+                        {
+                            System.err.println(e.getMessage());
+                            returnCode = StressAction.FAILURE;
+                            System.exit(-1);
+                        }
+
+                        output.println(e.getMessage());
+                        returnCode = StressAction.FAILURE;
+                        break;
+                    }
                 }
-                catch (Exception e)
+            }
+            else
+            {
+                CassandraClient connection = client.getClient();
+
+                for (int i = 0; i < items; i++)
                 {
-                    if (output == null)
+                    if (stop)
+                        break;
+
+                    try
                     {
-                        System.err.println(e.getMessage());
+                        operations.take().run(connection); // running job
+                    }
+                    catch (Exception e)
+                    {
+                        if (output == null)
+                        {
+                            System.err.println(e.getMessage());
+                            returnCode = StressAction.FAILURE;
+                            System.exit(-1);
+                        }
+
+                        output.println(e.getMessage());
                         returnCode = StressAction.FAILURE;
-                        System.exit(-1);
+                        break;
                     }
-
-                    output.println(e.getMessage());
-                    returnCode = StressAction.FAILURE;
-                    break;
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/06699d47/tools/stress/src/org/apache/cassandra/stress/operations/CQLOperation.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/CQLOperation.java b/tools/stress/src/org/apache/cassandra/stress/operations/CQLOperation.java
new file mode 100644
index 0000000..54737a4
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/CQLOperation.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cassandra.stress.operations;
+
+import java.nio.ByteBuffer;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.cassandra.stress.Session;
+import org.apache.cassandra.stress.util.CassandraClient;
+import org.apache.cassandra.stress.util.Operation;
+import org.apache.cassandra.transport.SimpleClient;
+import org.apache.cassandra.transport.messages.ResultMessage;
+import org.apache.cassandra.thrift.Compression;
+import org.apache.cassandra.thrift.CqlResult;
+import org.apache.cassandra.thrift.ThriftConversion;
+
+public abstract class CQLOperation extends Operation
+{
+    public CQLOperation(Session client, int idx)
+    {
+        super(client, idx);
+    }
+
+    protected abstract void run(CQLQueryExecutor executor) throws IOException;
+
+    protected abstract boolean validateThriftResult(CqlResult result);
+
+    protected abstract boolean validateNativeResult(ResultMessage result);
+
+    public void run(final CassandraClient client) throws IOException
+    {
+        run(new CQLQueryExecutor()
+        {
+            public boolean execute(String cqlQuery, List<String> queryParams) throws Exception
+            {
+                CqlResult result = null;
+                if (session.usePreparedStatements())
+                {
+                    Integer stmntId = getPreparedStatement(client, cqlQuery);
+                    if (session.cqlVersion.startsWith("3"))
+                        result = client.execute_prepared_cql3_query(stmntId, queryParamsAsByteBuffer(queryParams), session.getConsistencyLevel());
+                    else
+                        result = client.execute_prepared_cql_query(stmntId, queryParamsAsByteBuffer(queryParams));
+                }
+                else
+                {
+                    String formattedQuery = formatCqlQuery(cqlQuery, queryParams);
+                    if (session.cqlVersion.startsWith("3"))
+                        result = client.execute_cql3_query(ByteBuffer.wrap(formattedQuery.getBytes()), Compression.NONE, session.getConsistencyLevel());
+                    else
+                        result = client.execute_cql_query(ByteBuffer.wrap(formattedQuery.getBytes()), Compression.NONE);
+                }
+                return validateThriftResult(result);
+            }
+        });
+    }
+
+    public void run(final SimpleClient client) throws IOException
+    {
+        run(new CQLQueryExecutor()
+        {
+            public boolean execute(String cqlQuery, List<String> queryParams) throws Exception
+            {
+                ResultMessage result = null;
+                if (session.usePreparedStatements())
+                {
+                    byte[] stmntId = getPreparedStatement(client, cqlQuery);
+                    result = client.executePrepared(stmntId, queryParamsAsByteBuffer(queryParams), ThriftConversion.fromThrift(session.getConsistencyLevel()));
+                }
+                else
+                {
+                    String formattedQuery = formatCqlQuery(cqlQuery, queryParams);
+                    result = client.execute(formattedQuery, ThriftConversion.fromThrift(session.getConsistencyLevel()));
+                }
+                return validateNativeResult(result);
+            }
+        });
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/06699d47/tools/stress/src/org/apache/cassandra/stress/operations/CqlCounterAdder.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/CqlCounterAdder.java b/tools/stress/src/org/apache/cassandra/stress/operations/CqlCounterAdder.java
index b0633ea..31e8371 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/CqlCounterAdder.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/CqlCounterAdder.java
@@ -24,16 +24,19 @@ package org.apache.cassandra.stress.operations;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Collections;
+import java.util.List;
 
 import com.yammer.metrics.core.TimerContext;
 import org.apache.cassandra.db.ColumnFamilyType;
 import org.apache.cassandra.stress.Session;
 import org.apache.cassandra.stress.util.CassandraClient;
 import org.apache.cassandra.stress.util.Operation;
+import org.apache.cassandra.transport.messages.ResultMessage;
 import org.apache.cassandra.thrift.Compression;
+import org.apache.cassandra.thrift.CqlResult;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
-public class CqlCounterAdder extends Operation
+public class CqlCounterAdder extends CQLOperation
 {
     private static String cqlQuery = null;
 
@@ -42,7 +45,7 @@ public class CqlCounterAdder extends Operation
         super(client, idx);
     }
 
-    public void run(CassandraClient client) throws IOException
+    protected void run(CQLQueryExecutor executor) throws IOException
     {
         if (session.getColumnFamilyType() == ColumnFamilyType.Super)
             throw new RuntimeException("Super columns are not implemented for CQL");
@@ -70,7 +73,7 @@ public class CqlCounterAdder extends Operation
         }
 
         String key = String.format("%0" + session.getTotalKeysLength() + "d", index);
-        String formattedQuery = null;
+        List<String> queryParams = Collections.singletonList(getUnQuotedCqlBlob(key, session.cqlVersion.startsWith("3")));
 
         TimerContext context = session.latency.time();
 
@@ -84,25 +87,7 @@ public class CqlCounterAdder extends Operation
 
             try
             {
-                if (session.usePreparedStatements())
-                {
-                    Integer stmntId = getPreparedStatement(client, cqlQuery);
-                    if (session.cqlVersion.startsWith("3"))
-                        client.execute_prepared_cql3_query(stmntId, Collections.singletonList(ByteBuffer.wrap(key.getBytes())), session.getConsistencyLevel());
-                    else
-                        client.execute_prepared_cql_query(stmntId, Collections.singletonList(ByteBuffer.wrap(key.getBytes())));
-                }
-                else
-                {
-                    if (formattedQuery == null)
-                        formattedQuery = formatCqlQuery(cqlQuery, Collections.singletonList(getUnQuotedCqlBlob(key, session.cqlVersion.startsWith("3"))));
-                    if (session.cqlVersion.startsWith("3"))
-                        client.execute_cql3_query(ByteBuffer.wrap(formattedQuery.getBytes()), Compression.NONE, session.getConsistencyLevel());
-                    else
-                        client.execute_cql_query(ByteBuffer.wrap(formattedQuery.getBytes()), Compression.NONE);
-                }
-
-                success = true;
+                success = executor.execute(cqlQuery, queryParams);
             }
             catch (Exception e)
             {
@@ -124,4 +109,14 @@ public class CqlCounterAdder extends Operation
         session.keys.getAndIncrement();
         context.stop();
     }
+
+    protected boolean validateThriftResult(CqlResult result)
+    {
+        return true;
+    }
+
+    protected boolean validateNativeResult(ResultMessage result)
+    {
+        return true;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/06699d47/tools/stress/src/org/apache/cassandra/stress/operations/CqlCounterGetter.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/CqlCounterGetter.java b/tools/stress/src/org/apache/cassandra/stress/operations/CqlCounterGetter.java
index 7feee5b..a4d037a 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/CqlCounterGetter.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/CqlCounterGetter.java
@@ -24,18 +24,20 @@ package org.apache.cassandra.stress.operations;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Collections;
+import java.util.List;
 
 import com.yammer.metrics.core.TimerContext;
 import org.apache.cassandra.db.ColumnFamilyType;
 import org.apache.cassandra.stress.Session;
 import org.apache.cassandra.stress.util.CassandraClient;
 import org.apache.cassandra.stress.util.Operation;
+import org.apache.cassandra.transport.messages.ResultMessage;
 import org.apache.cassandra.thrift.Compression;
 import org.apache.cassandra.thrift.CqlResult;
 import org.apache.cassandra.thrift.CqlResultType;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
-public class CqlCounterGetter extends Operation
+public class CqlCounterGetter extends CQLOperation
 {
     private static String cqlQuery = null;
 
@@ -44,7 +46,7 @@ public class CqlCounterGetter extends Operation
         super(client, idx);
     }
 
-    public void run(CassandraClient client) throws IOException
+    protected void run(CQLQueryExecutor executor) throws IOException
     {
         if (session.getColumnFamilyType() == ColumnFamilyType.Super)
             throw new RuntimeException("Super columns are not implemented for CQL");
@@ -69,7 +71,7 @@ public class CqlCounterGetter extends Operation
         }
 
         byte[] key = generateKey();
-        String formattedQuery = null;
+        List<String> queryParams = Collections.singletonList(getUnQuotedCqlBlob(key, session.cqlVersion.startsWith("3")));
 
         TimerContext context = session.latency.time();
 
@@ -83,30 +85,7 @@ public class CqlCounterGetter extends Operation
 
             try
             {
-                CqlResult result = null;
-
-                if (session.usePreparedStatements())
-                {
-                    Integer stmntId = getPreparedStatement(client, cqlQuery);
-                    if (session.cqlVersion.startsWith("3"))
-                        result = client.execute_prepared_cql3_query(stmntId, Collections.singletonList(ByteBuffer.wrap(key)), session.getConsistencyLevel());
-                    else
-                        result = client.execute_prepared_cql_query(stmntId, Collections.singletonList(ByteBuffer.wrap(key)));
-                }
-                else
-                {
-                    if (formattedQuery == null)
-                        formattedQuery = formatCqlQuery(cqlQuery, Collections.singletonList(getUnQuotedCqlBlob(key, session.cqlVersion.startsWith("3"))));
-
-                    if (session.cqlVersion.startsWith("3"))
-                        result = client.execute_cql3_query(ByteBuffer.wrap(formattedQuery.getBytes()), Compression.NONE, session.getConsistencyLevel());
-                    else
-                        result = client.execute_cql_query(ByteBuffer.wrap(formattedQuery.getBytes()), Compression.NONE);
-                }
-
-                assert result.type.equals(CqlResultType.ROWS) : "expected ROWS result type";
-                assert result.rows.size() == 0 : "expected exactly one row";
-                success = (result.rows.get(0).columns.size() != 0);
+                success = executor.execute(cqlQuery, queryParams);
             }
             catch (Exception e)
             {
@@ -128,4 +107,14 @@ public class CqlCounterGetter extends Operation
         session.keys.getAndIncrement();
         context.stop();
     }
+
+    protected boolean validateThriftResult(CqlResult result)
+    {
+        return result.rows.get(0).columns.size() != 0;
+    }
+
+    protected boolean validateNativeResult(ResultMessage result)
+    {
+        return result instanceof ResultMessage.Rows && ((ResultMessage.Rows)result).result.size() != 0;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/06699d47/tools/stress/src/org/apache/cassandra/stress/operations/CqlIndexedRangeSlicer.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/CqlIndexedRangeSlicer.java b/tools/stress/src/org/apache/cassandra/stress/operations/CqlIndexedRangeSlicer.java
index b1fa85e..bf416cc 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/CqlIndexedRangeSlicer.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/CqlIndexedRangeSlicer.java
@@ -27,26 +27,31 @@ import java.util.Collections;
 import java.util.List;
 
 import com.yammer.metrics.core.TimerContext;
+import org.apache.cassandra.cql3.ResultSet;
 import org.apache.cassandra.db.ColumnFamilyType;
 import org.apache.cassandra.stress.Session;
 import org.apache.cassandra.stress.util.CassandraClient;
 import org.apache.cassandra.stress.util.Operation;
+import org.apache.cassandra.transport.messages.ResultMessage;
 import org.apache.cassandra.thrift.Compression;
 import org.apache.cassandra.thrift.CqlResult;
 import org.apache.cassandra.thrift.CqlRow;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
-public class CqlIndexedRangeSlicer extends Operation
+public class CqlIndexedRangeSlicer extends CQLOperation
 {
     private static List<ByteBuffer> values = null;
     private static String cqlQuery = null;
 
+    private int lastQueryResultSize;
+    private int lastMaxKey;
+
     public CqlIndexedRangeSlicer(Session client, int idx)
     {
         super(client, idx);
     }
 
-    public void run(CassandraClient client) throws IOException
+    protected void run(CQLQueryExecutor executor) throws IOException
     {
         if (session.getColumnFamilyType() == ColumnFamilyType.Super)
             throw new RuntimeException("Super columns are not implemented for CQL");
@@ -56,8 +61,14 @@ public class CqlIndexedRangeSlicer extends Operation
 
         if (cqlQuery == null)
         {
-            StringBuilder query = new StringBuilder("SELECT FIRST ").append(session.getColumnsPerKey())
-                 .append(" ''..'' FROM Standard1");
+            StringBuilder query = new StringBuilder("SELECT ");
+
+            if (session.cqlVersion.startsWith("2"))
+                query.append(session.getColumnsPerKey()).append(" ''..''");
+            else
+                query.append("*");
+
+            query.append(" FROM Standard1");
 
             if (session.cqlVersion.startsWith("2"))
                 query.append(" USING CONSISTENCY ").append(session.getConsistencyLevel());
@@ -79,7 +90,6 @@ public class CqlIndexedRangeSlicer extends Operation
 
             boolean success = false;
             String exceptionMessage = null;
-            CqlResult results = null;
             String formattedQuery = null;
             List<String> queryParms = Collections.singletonList(getUnQuotedCqlBlob(startOffset, session.cqlVersion.startsWith("3")));
 
@@ -90,25 +100,7 @@ public class CqlIndexedRangeSlicer extends Operation
 
                 try
                 {
-                    if (session.usePreparedStatements())
-                    {
-                        Integer stmntId = getPreparedStatement(client, cqlQuery);
-                        if (session.cqlVersion.startsWith("3"))
-                            results = client.execute_prepared_cql3_query(stmntId, queryParamsAsByteBuffer(queryParms), session.getConsistencyLevel());
-                        else
-                            results = client.execute_prepared_cql_query(stmntId, queryParamsAsByteBuffer(queryParms));
-                    }
-                    else
-                    {
-                        if (formattedQuery ==  null)
-                            formattedQuery = formatCqlQuery(cqlQuery, queryParms);
-                        if (session.cqlVersion.startsWith("3"))
-                            results = client.execute_cql3_query(ByteBuffer.wrap(formattedQuery.getBytes()), Compression.NONE, session.getConsistencyLevel());
-                        else
-                            results = client.execute_cql_query(ByteBuffer.wrap(formattedQuery.getBytes()), Compression.NONE);
-                    }
-
-                    success = (results.rows.size() != 0);
+                    success = executor.execute(cqlQuery, queryParms);
                 }
                 catch (Exception e)
                 {
@@ -126,13 +118,13 @@ public class CqlIndexedRangeSlicer extends Operation
                                     (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")"));
             }
 
-            received += results.rows.size();
+            received += lastQueryResultSize;
 
             // convert max key found back to an integer, and increment it
-            startOffset = String.format(format, (1 + getMaxKey(results.rows)));
+            startOffset = String.format(format, (1 + lastMaxKey));
 
             session.operations.getAndIncrement();
-            session.keys.getAndAdd(results.rows.size());
+            session.keys.getAndAdd(lastQueryResultSize);
             context.stop();
         }
     }
@@ -155,4 +147,33 @@ public class CqlIndexedRangeSlicer extends Operation
 
         return maxKey;
     }
+
+    private int getMaxKey(ResultSet rs)
+    {
+        int maxKey = ByteBufferUtil.toInt(rs.rows.get(0).get(0));
+
+        for (List<ByteBuffer> row : rs.rows)
+        {
+            int currentKey = ByteBufferUtil.toInt(row.get(0));
+            if (currentKey > maxKey)
+                maxKey = currentKey;
+        }
+
+        return maxKey;
+    }
+
+    protected boolean validateThriftResult(CqlResult result)
+    {
+        lastQueryResultSize = result.rows.size();
+        lastMaxKey = getMaxKey(result.rows);
+        return lastQueryResultSize != 0;
+    }
+
+    protected boolean validateNativeResult(ResultMessage result)
+    {
+        assert result instanceof ResultMessage.Rows;
+        lastQueryResultSize = ((ResultMessage.Rows)result).result.size();
+        lastMaxKey = getMaxKey(((ResultMessage.Rows)result).result);
+        return lastQueryResultSize != 0;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/06699d47/tools/stress/src/org/apache/cassandra/stress/operations/CqlInserter.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/CqlInserter.java b/tools/stress/src/org/apache/cassandra/stress/operations/CqlInserter.java
index ed03f1f..3572c36 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/CqlInserter.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/CqlInserter.java
@@ -31,10 +31,13 @@ import org.apache.cassandra.db.ColumnFamilyType;
 import org.apache.cassandra.stress.Session;
 import org.apache.cassandra.stress.util.CassandraClient;
 import org.apache.cassandra.stress.util.Operation;
+import org.apache.cassandra.transport.SimpleClient;
+import org.apache.cassandra.transport.messages.ResultMessage;
 import org.apache.cassandra.thrift.Compression;
+import org.apache.cassandra.thrift.CqlResult;
 import org.apache.cassandra.utils.UUIDGen;
 
-public class CqlInserter extends Operation
+public class CqlInserter extends CQLOperation
 {
     private static List<ByteBuffer> values;
     private static String cqlQuery = null;
@@ -44,7 +47,7 @@ public class CqlInserter extends Operation
         super(client, idx);
     }
 
-    public void run(CassandraClient client) throws IOException
+    protected void run(CQLQueryExecutor executor) throws IOException
     {
         if (session.getColumnFamilyType() == ColumnFamilyType.Super)
             throw new RuntimeException("Super columns are not implemented for CQL");
@@ -95,8 +98,6 @@ public class CqlInserter extends Operation
         String key = String.format("%0" + session.getTotalKeysLength() + "d", index);
         queryParms.add(getUnQuotedCqlBlob(key, session.cqlVersion.startsWith("3")));
 
-        String formattedQuery = null;
-
         TimerContext context = session.latency.time();
 
         boolean success = false;
@@ -109,25 +110,7 @@ public class CqlInserter extends Operation
 
             try
             {
-                if (session.usePreparedStatements())
-                {
-                    Integer stmntId = getPreparedStatement(client, cqlQuery);
-                    if (session.cqlVersion.startsWith("3"))
-                        client.execute_prepared_cql3_query(stmntId, queryParamsAsByteBuffer(queryParms), session.getConsistencyLevel());
-                    else
-                        client.execute_prepared_cql_query(stmntId, queryParamsAsByteBuffer(queryParms));
-                }
-                else
-                {
-                    if (formattedQuery == null)
-                        formattedQuery = formatCqlQuery(cqlQuery, queryParms);
-                    if (session.cqlVersion.startsWith("3"))
-                        client.execute_cql3_query(ByteBuffer.wrap(formattedQuery.getBytes()), Compression.NONE, session.getConsistencyLevel());
-                    else
-                        client.execute_cql_query(ByteBuffer.wrap(formattedQuery.getBytes()), Compression.NONE);
-                }
-
-                success = true;
+                success = executor.execute(cqlQuery, queryParms);
             }
             catch (Exception e)
             {
@@ -150,4 +133,14 @@ public class CqlInserter extends Operation
         session.keys.getAndIncrement();
         context.stop();
     }
+
+    protected boolean validateThriftResult(CqlResult result)
+    {
+        return true;
+    }
+
+    protected boolean validateNativeResult(ResultMessage result)
+    {
+        return true;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/06699d47/tools/stress/src/org/apache/cassandra/stress/operations/CqlMultiGetter.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/CqlMultiGetter.java b/tools/stress/src/org/apache/cassandra/stress/operations/CqlMultiGetter.java
index e9b1f47..ec645d4 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/CqlMultiGetter.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/CqlMultiGetter.java
@@ -26,6 +26,7 @@ import java.io.IOException;
 import org.apache.cassandra.stress.Session;
 import org.apache.cassandra.stress.util.CassandraClient;
 import org.apache.cassandra.stress.util.Operation;
+import org.apache.cassandra.transport.SimpleClient;
 
 public class CqlMultiGetter extends Operation
 {
@@ -38,4 +39,9 @@ public class CqlMultiGetter extends Operation
     {
         throw new RuntimeException("Multiget is not implemented for CQL");
     }
+
+    public void run(SimpleClient client) throws IOException
+    {
+        throw new RuntimeException("Multiget is not implemented for CQL");
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/06699d47/tools/stress/src/org/apache/cassandra/stress/operations/CqlRangeSlicer.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/CqlRangeSlicer.java b/tools/stress/src/org/apache/cassandra/stress/operations/CqlRangeSlicer.java
index acf0602..c01767b 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/CqlRangeSlicer.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/CqlRangeSlicer.java
@@ -24,6 +24,7 @@ package org.apache.cassandra.stress.operations;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Collections;
+import java.util.List;
 
 import com.yammer.metrics.core.TimerContext;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -32,19 +33,22 @@ import org.apache.cassandra.db.ColumnFamilyType;
 import org.apache.cassandra.stress.Session;
 import org.apache.cassandra.stress.util.CassandraClient;
 import org.apache.cassandra.stress.util.Operation;
+import org.apache.cassandra.transport.messages.ResultMessage;
 import org.apache.cassandra.thrift.Compression;
 import org.apache.cassandra.thrift.CqlResult;
+import org.apache.cassandra.transport.SimpleClient;
 
-public class CqlRangeSlicer extends Operation
+public class CqlRangeSlicer extends CQLOperation
 {
     private static String cqlQuery = null;
+    private int lastRowCount;
 
     public CqlRangeSlicer(Session client, int idx)
     {
         super(client, idx);
     }
 
-    public void run(CassandraClient client) throws IOException
+    protected void run(CQLQueryExecutor executor) throws IOException
     {
         if (session.getColumnFamilyType() == ColumnFamilyType.Super)
             throw new RuntimeException("Super columns are not implemented for CQL");
@@ -61,13 +65,12 @@ public class CqlRangeSlicer extends Operation
         }
 
         String key = String.format("%0" +  session.getTotalKeysLength() + "d", index);
-        String formattedQuery = null;
+        List<String> queryParams = Collections.singletonList(getUnQuotedCqlBlob(key, session.cqlVersion.startsWith("3")));
 
         TimerContext context = session.latency.time();
 
         boolean success = false;
         String exceptionMessage = null;
-        int rowCount = 0;
 
         for (int t = 0; t < session.getRetryTimes(); t++)
         {
@@ -76,28 +79,7 @@ public class CqlRangeSlicer extends Operation
 
             try
             {
-                CqlResult result = null;
-
-                if (session.usePreparedStatements())
-                {
-                    Integer stmntId = getPreparedStatement(client, cqlQuery);
-                    if (session.cqlVersion.startsWith("3"))
-                        result = client.execute_prepared_cql3_query(stmntId, Collections.singletonList(ByteBuffer.wrap(key.getBytes())), session.getConsistencyLevel());
-                    else
-                        result = client.execute_prepared_cql_query(stmntId, Collections.singletonList(ByteBuffer.wrap(key.getBytes())));
-                }
-                else
-                {
-                    if (formattedQuery == null)
-                        formattedQuery = formatCqlQuery(cqlQuery, Collections.singletonList(getUnQuotedCqlBlob(key, session.cqlVersion.startsWith("3"))));
-                    if (session.cqlVersion.startsWith("3"))
-                        result = client.execute_cql3_query(ByteBuffer.wrap(formattedQuery.getBytes()), Compression.NONE, session.getConsistencyLevel());
-                    else
-                        result = client.execute_cql_query(ByteBuffer.wrap(formattedQuery.getBytes()), Compression.NONE);
-                }
-
-                rowCount = result.rows.size();
-                success = (rowCount != 0);
+                success = executor.execute(cqlQuery, queryParams);
             }
             catch (Exception e)
             {
@@ -117,7 +99,20 @@ public class CqlRangeSlicer extends Operation
         }
 
         session.operations.getAndIncrement();
-        session.keys.getAndAdd(rowCount);
+        session.keys.getAndAdd(lastRowCount);
         context.stop();
     }
+
+    protected boolean validateThriftResult(CqlResult result)
+    {
+        lastRowCount = result.rows.size();
+        return  lastRowCount != 0;
+    }
+
+    protected boolean validateNativeResult(ResultMessage result)
+    {
+        assert result instanceof ResultMessage.Rows;
+        lastRowCount = ((ResultMessage.Rows)result).result.size();
+        return lastRowCount != 0;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/06699d47/tools/stress/src/org/apache/cassandra/stress/operations/CqlReader.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/CqlReader.java b/tools/stress/src/org/apache/cassandra/stress/operations/CqlReader.java
index 58d77dd..70273c1 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/CqlReader.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/CqlReader.java
@@ -31,10 +31,13 @@ import org.apache.cassandra.db.ColumnFamilyType;
 import org.apache.cassandra.stress.Session;
 import org.apache.cassandra.stress.util.CassandraClient;
 import org.apache.cassandra.stress.util.Operation;
+import org.apache.cassandra.transport.SimpleClient;
+import org.apache.cassandra.transport.messages.ResultMessage;
 import org.apache.cassandra.thrift.Compression;
 import org.apache.cassandra.thrift.CqlResult;
+import org.apache.cassandra.thrift.ThriftConversion;
 
-public class CqlReader extends Operation
+public class CqlReader extends CQLOperation
 {
     private static String cqlQuery = null;
 
@@ -43,7 +46,7 @@ public class CqlReader extends Operation
         super(client, idx);
     }
 
-    public void run(CassandraClient client) throws IOException
+    protected void run(CQLQueryExecutor executor) throws IOException
     {
         if (session.getColumnFamilyType() == ColumnFamilyType.Super)
             throw new RuntimeException("Super columns are not implemented for CQL");
@@ -85,8 +88,6 @@ public class CqlReader extends Operation
         byte[] key = generateKey();
         queryParams.add(getUnQuotedCqlBlob(key, session.cqlVersion.startsWith("3")));
 
-        String formattedQuery = null;
-
         TimerContext context = session.latency.time();
 
         boolean success = false;
@@ -99,31 +100,10 @@ public class CqlReader extends Operation
 
             try
             {
-                CqlResult result = null;
-
-                if (session.usePreparedStatements())
-                {
-                    Integer stmntId = getPreparedStatement(client, cqlQuery);
-                    if (session.cqlVersion.startsWith("3"))
-                        result = client.execute_prepared_cql3_query(stmntId, queryParamsAsByteBuffer(queryParams), session.getConsistencyLevel());
-                    else
-                        result = client.execute_prepared_cql_query(stmntId, queryParamsAsByteBuffer(queryParams));
-                }
-                else
-                {
-                    if (formattedQuery == null)
-                        formattedQuery = formatCqlQuery(cqlQuery, queryParams);
-                    if (session.cqlVersion.startsWith("3"))
-                        result = client.execute_cql3_query(ByteBuffer.wrap(formattedQuery.getBytes()), Compression.NONE, session.getConsistencyLevel());
-                    else
-                        result = client.execute_cql_query(ByteBuffer.wrap(formattedQuery.getBytes()), Compression.NONE);
-                }
-
-                success = (result.rows.get(0).columns.size() != 0);
+                success = executor.execute(cqlQuery, queryParams);
             }
             catch (Exception e)
             {
-
                 exceptionMessage = getExceptionMessage(e);
                 success = false;
             }
@@ -143,4 +123,14 @@ public class CqlReader extends Operation
         session.keys.getAndIncrement();
         context.stop();
     }
+
+    protected boolean validateThriftResult(CqlResult result)
+    {
+        return result.rows.get(0).columns.size() != 0;
+    }
+
+    protected boolean validateNativeResult(ResultMessage result)
+    {
+        return result instanceof ResultMessage.Rows && ((ResultMessage.Rows)result).result.size() != 0;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/06699d47/tools/stress/src/org/apache/cassandra/stress/util/Operation.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/util/Operation.java b/tools/stress/src/org/apache/cassandra/stress/util/Operation.java
index d1cfc22..f7924da 100644
--- a/tools/stress/src/org/apache/cassandra/stress/util/Operation.java
+++ b/tools/stress/src/org/apache/cassandra/stress/util/Operation.java
@@ -26,6 +26,8 @@ import java.security.MessageDigest;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
+import java.util.Map;
+import java.util.HashMap;
 
 import com.google.common.base.Function;
 import com.google.common.collect.Lists;
@@ -33,6 +35,7 @@ import com.google.common.collect.Lists;
 import org.apache.cassandra.db.marshal.TimeUUIDType;
 import org.apache.cassandra.stress.Session;
 import org.apache.cassandra.stress.Stress;
+import org.apache.cassandra.transport.SimpleClient;
 import org.apache.cassandra.thrift.Compression;
 import org.apache.cassandra.thrift.CqlPreparedResult;
 import org.apache.cassandra.thrift.InvalidRequestException;
@@ -67,6 +70,8 @@ public abstract class Operation
      */
     public abstract void run(CassandraClient client) throws IOException;
 
+    public void run(SimpleClient client) throws IOException {}
+
     // Utility methods
 
     protected List<ByteBuffer> generateValues()
@@ -287,12 +292,14 @@ public abstract class Operation
         return result.toString();
     }
 
-    protected static Integer getPreparedStatement(CassandraClient client, String cqlQuery) throws Exception
+    protected Integer getPreparedStatement(CassandraClient client, String cqlQuery) throws Exception
     {
         Integer statementId = client.preparedStatements.get(cqlQuery.hashCode());
         if (statementId == null)
         {
-            CqlPreparedResult response = client.prepare_cql_query(ByteBufferUtil.bytes(cqlQuery), Compression.NONE);
+            CqlPreparedResult response = session.cqlVersion.startsWith("3")
+                                       ? client.prepare_cql3_query(ByteBufferUtil.bytes(cqlQuery), Compression.NONE)
+                                       : client.prepare_cql_query(ByteBufferUtil.bytes(cqlQuery), Compression.NONE);
             statementId = response.itemId;
             client.preparedStatements.put(cqlQuery.hashCode(), statementId);
         }
@@ -300,10 +307,28 @@ public abstract class Operation
         return statementId;
     }
 
+    private static final Map<Integer, byte[]> preparedStatementsNative = new HashMap<Integer, byte[]>();
+
+    protected static byte[] getPreparedStatement(SimpleClient client, String cqlQuery) throws Exception
+    {
+        byte[] statementId = preparedStatementsNative.get(cqlQuery.hashCode());
+        if (statementId == null)
+        {
+            statementId = client.prepare(cqlQuery).statementId.bytes;
+            preparedStatementsNative.put(cqlQuery.hashCode(), statementId);
+        }
+        return statementId;
+    }
+
     protected String wrapInQuotesIfRequired(String string)
     {
         return session.cqlVersion.startsWith("3")
                 ? "\"" + string + "\""
                 : string;
     }
+
+    public interface CQLQueryExecutor
+    {
+        public boolean execute(String query, List<String> queryParameters) throws Exception;
+    }
 }


Mime
View raw message