cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From slebre...@apache.org
Subject [3/3] git commit: Use prepared statement internally
Date Thu, 22 May 2014 12:46:11 GMT
Use prepared statement internally

patch by slebresne; reviewed by mishail for CASSANDRA-6975


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1147ee3a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1147ee3a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1147ee3a

Branch: refs/heads/cassandra-2.1
Commit: 1147ee3a81e483b26b4b8c5d7cc7e55fcc2baeec
Parents: c3ec8fa
Author: Sylvain Lebresne <sylvain@datastax.com>
Authored: Wed May 14 14:25:29 2014 +0200
Committer: Sylvain Lebresne <sylvain@datastax.com>
Committed: Thu May 22 14:46:01 2014 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../org/apache/cassandra/cql3/CQLStatement.java |   2 +-
 .../org/apache/cassandra/cql3/QueryOptions.java |   5 +
 .../apache/cassandra/cql3/QueryProcessor.java   | 103 +++++++-
 .../apache/cassandra/cql3/UntypedResultSet.java |  58 ++++-
 .../statements/AuthenticationStatement.java     |   2 +-
 .../cql3/statements/AuthorizationStatement.java |   2 +-
 .../cql3/statements/BatchStatement.java         |   4 +-
 .../cql3/statements/ModificationStatement.java  |   4 +-
 .../statements/SchemaAlteringStatement.java     |   2 +-
 .../cql3/statements/SelectStatement.java        |  48 ++--
 .../cql3/statements/TruncateStatement.java      |   2 +-
 .../cassandra/cql3/statements/UseStatement.java |   2 +-
 .../apache/cassandra/db/BatchlogManager.java    |  27 +-
 .../org/apache/cassandra/db/SystemKeyspace.java | 245 ++++++++-----------
 .../ScheduledRangeTransferExecutorService.java  |   8 +-
 .../cassandra/service/StorageService.java       |  43 ++--
 .../cassandra/db/BatchlogManagerTest.java       |   8 +-
 .../apache/cassandra/db/HintedHandOffTest.java  |   6 +-
 .../unit/org/apache/cassandra/db/ScrubTest.java |  10 +-
 .../db/compaction/CompactionsPurgeTest.java     |  20 +-
 .../io/sstable/CQLSSTableWriterTest.java        |   2 +-
 .../service/LeaveAndBootstrapTest.java          |   4 +-
 .../cassandra/service/QueryPagerTest.java       |   4 +-
 .../apache/cassandra/triggers/TriggersTest.java |   4 +-
 25 files changed, 368 insertions(+), 248 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/1147ee3a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6b08fad..55fc400 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -21,6 +21,7 @@
  * New serialization format for UDT values (CASSANDRA-7209, CASSANDRA-7261)
  * Fix nodetool netstats (CASSANDRA-7270)
  * Fix potential ClassCastException in HintedHandoffManager (CASSANDRA-7284)
+ * Use prepared statements internally (CASSANDRA-6975)
 Merged from 2.0:
  * Always reallocate buffers in HSHA (CASSANDRA-6285)
  * (Hadoop) support authentication in CqlRecordReader (CASSANDRA-7221)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1147ee3a/src/java/org/apache/cassandra/cql3/CQLStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/CQLStatement.java b/src/java/org/apache/cassandra/cql3/CQLStatement.java
index 81cd2b2..a1642ef 100644
--- a/src/java/org/apache/cassandra/cql3/CQLStatement.java
+++ b/src/java/org/apache/cassandra/cql3/CQLStatement.java
@@ -57,5 +57,5 @@ public interface CQLStatement
      *
      * @param state the current query state
      */
-    public ResultMessage executeInternal(QueryState state) throws RequestValidationException, RequestExecutionException;
+    public ResultMessage executeInternal(QueryState state, QueryOptions options) throws RequestValidationException, RequestExecutionException;
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1147ee3a/src/java/org/apache/cassandra/cql3/QueryOptions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/QueryOptions.java b/src/java/org/apache/cassandra/cql3/QueryOptions.java
index 9c28762..369dce4 100644
--- a/src/java/org/apache/cassandra/cql3/QueryOptions.java
+++ b/src/java/org/apache/cassandra/cql3/QueryOptions.java
@@ -61,6 +61,11 @@ public abstract class QueryOptions
         return new DefaultQueryOptions(consistency, values, false, SpecificOptions.DEFAULT, 3);
     }
 
+    public static QueryOptions forInternalCalls(List<ByteBuffer> values)
+    {
+        return new DefaultQueryOptions(ConsistencyLevel.ONE, values, false, SpecificOptions.DEFAULT, 3);
+    }
+
     public static QueryOptions fromPreV3Batch(ConsistencyLevel consistency)
     {
         return new DefaultQueryOptions(consistency, Collections.<ByteBuffer>emptyList(), false, SpecificOptions.DEFAULT, 2);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1147ee3a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
index 40c45af..fca9c42 100644
--- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
@@ -19,6 +19,8 @@ package org.apache.cassandra.cql3;
 
 import java.nio.ByteBuffer;
 import java.util.*;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentHashMap;
 
 import com.google.common.primitives.Ints;
 
@@ -32,9 +34,12 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.cql3.statements.*;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.composites.*;
+import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.exceptions.*;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.QueryState;
+import org.apache.cassandra.service.pager.QueryPager;
+import org.apache.cassandra.service.pager.QueryPagers;
 import org.apache.cassandra.thrift.ThriftClientState;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.transport.messages.ResultMessage;
@@ -74,6 +79,11 @@ public class QueryProcessor implements QueryHandler
     private static final ConcurrentLinkedHashMap<MD5Digest, ParsedStatement.Prepared> preparedStatements;
     private static final ConcurrentLinkedHashMap<Integer, CQLStatement> thriftPreparedStatements;
 
+    // A map for prepared statements used internally (which we don't want to mix with user statements; in particular, we don't
+    // bother with expiration on those).
+    private static final ConcurrentMap<String, ParsedStatement.Prepared> internalStatements = new ConcurrentHashMap<>();
+    private static final QueryState internalQueryState;
+
     static
     {
         preparedStatements = new ConcurrentLinkedHashMap.Builder<MD5Digest, ParsedStatement.Prepared>()
@@ -84,6 +94,17 @@ public class QueryProcessor implements QueryHandler
                                    .maximumWeightedCapacity(MAX_CACHE_PREPARED_MEMORY)
                                    .weigher(thriftMemoryUsageWeigher)
                                    .build();
+
+        ClientState state = ClientState.forInternalCalls();
+        try
+        {
+            state.setKeyspace(Keyspace.SYSTEM_KS);
+        }
+        catch (InvalidRequestException e)
+        {
+            throw new RuntimeException();
+        }
+        internalQueryState = new QueryState(state);
     }
 
     private QueryProcessor()
@@ -190,16 +211,40 @@ public class QueryProcessor implements QueryHandler
         }
     }
 
-    public static UntypedResultSet processInternal(String query)
+    private static QueryOptions makeInternalOptions(ParsedStatement.Prepared prepared, Object[] values)
+    {
+        if (prepared.boundNames.size() != values.length)
+            throw new IllegalArgumentException(String.format("Invalid number of values. Expecting %d but got %d", prepared.boundNames.size(), values.length));
+
+        List<ByteBuffer> boundValues = new ArrayList<ByteBuffer>(values.length);
+        for (int i = 0; i < values.length; i++)
+        {
+            Object value = values[i];
+            AbstractType type = prepared.boundNames.get(i).type;
+            boundValues.add(value instanceof ByteBuffer || value == null ? (ByteBuffer)value : type.decompose(value));
+        }
+        return QueryOptions.forInternalCalls(boundValues);
+    }
+
+    private static ParsedStatement.Prepared prepareInternal(String query) throws RequestValidationException
+    {
+        ParsedStatement.Prepared prepared = internalStatements.get(query);
+        if (prepared != null)
+            return prepared;
+
+        // Note: if 2 threads prepare the same query concurrently, the duplicated work is harmless, so don't bother synchronizing
+        prepared = parseStatement(query, internalQueryState);
+        prepared.statement.validate(internalQueryState.getClientState());
+        internalStatements.putIfAbsent(query, prepared);
+        return prepared;
+    }
+
+    public static UntypedResultSet executeInternal(String query, Object... values)
     {
         try
         {
-            ClientState state = ClientState.forInternalCalls();
-            QueryState qState = new QueryState(state);
-            state.setKeyspace(Keyspace.SYSTEM_KS);
-            CQLStatement statement = getStatement(query, state).statement;
-            statement.validate(state);
-            ResultMessage result = statement.executeInternal(qState);
+            ParsedStatement.Prepared prepared = prepareInternal(query);
+            ResultMessage result = prepared.statement.executeInternal(internalQueryState, makeInternalOptions(prepared, values));
             if (result instanceof ResultMessage.Rows)
                 return UntypedResultSet.create(((ResultMessage.Rows)result).result);
             else
@@ -215,6 +260,50 @@ public class QueryProcessor implements QueryHandler
         }
     }
 
+    public static UntypedResultSet executeInternalWithPaging(String query, int pageSize, Object... values)
+    {
+        try
+        {
+            ParsedStatement.Prepared prepared = prepareInternal(query);
+            if (!(prepared.statement instanceof SelectStatement))
+                throw new IllegalArgumentException("Only SELECTs can be paged");
+
+            SelectStatement select = (SelectStatement)prepared.statement;
+            QueryPager pager = QueryPagers.localPager(select.getPageableCommand(makeInternalOptions(prepared, values)));
+            return UntypedResultSet.create(select, pager, pageSize);
+        }
+        catch (RequestValidationException e)
+        {
+            throw new RuntimeException("Error validating query" + e);
+        }
+    }
+
+    /**
+     * Same as executeInternal, but for queries we know are only executed once, so that the
+     * created statement object is not cached.
+     */
+    public static UntypedResultSet executeOnceInternal(String query, Object... values)
+    {
+        try
+        {
+            ParsedStatement.Prepared prepared = parseStatement(query, internalQueryState);
+            prepared.statement.validate(internalQueryState.getClientState());
+            ResultMessage result = prepared.statement.executeInternal(internalQueryState, makeInternalOptions(prepared, values));
+            if (result instanceof ResultMessage.Rows)
+                return UntypedResultSet.create(((ResultMessage.Rows)result).result);
+            else
+                return null;
+        }
+        catch (RequestExecutionException e)
+        {
+            throw new RuntimeException(e);
+        }
+        catch (RequestValidationException e)
+        {
+            throw new RuntimeException("Error validating query " + query, e);
+        }
+    }
+
     public static UntypedResultSet resultify(String query, Row row)
     {
         return resultify(query, Collections.singletonList(row));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1147ee3a/src/java/org/apache/cassandra/cql3/UntypedResultSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/UntypedResultSet.java b/src/java/org/apache/cassandra/cql3/UntypedResultSet.java
index 5519f2e..7e0f15a 100644
--- a/src/java/org/apache/cassandra/cql3/UntypedResultSet.java
+++ b/src/java/org/apache/cassandra/cql3/UntypedResultSet.java
@@ -24,8 +24,10 @@ import java.util.*;
 
 import com.google.common.collect.AbstractIterator;
 
+import org.apache.cassandra.cql3.statements.SelectStatement;
 import org.apache.cassandra.db.marshal.*;
-import org.apache.cassandra.cql3.ResultSet;
+import org.apache.cassandra.exceptions.*;
+import org.apache.cassandra.service.pager.QueryPager;
 
 /** a utility for doing internal cql-based queries */
 public abstract class UntypedResultSet implements Iterable<UntypedResultSet.Row>
@@ -40,6 +42,11 @@ public abstract class UntypedResultSet implements Iterable<UntypedResultSet.Row>
         return new FromResultList(results);
     }
 
+    public static UntypedResultSet create(SelectStatement select, QueryPager pager, int pageSize)
+    {
+        return new FromPager(select, pager, pageSize);
+    }
+
     public boolean isEmpty()
     {
         return size() == 0;
@@ -122,6 +129,55 @@ public abstract class UntypedResultSet implements Iterable<UntypedResultSet.Row>
         }
     }
 
+    private static class FromPager extends UntypedResultSet
+    {
+        private final SelectStatement select;
+        private final QueryPager pager;
+        private final int pageSize;
+        private final List<ColumnSpecification> metadata;
+
+        private FromPager(SelectStatement select, QueryPager pager, int pageSize)
+        {
+            this.select = select;
+            this.pager = pager;
+            this.pageSize = pageSize;
+            this.metadata = select.getResultMetadata().names;
+        }
+
+        public int size()
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        public Row one()
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        public Iterator<Row> iterator()
+        {
+            return new AbstractIterator<Row>()
+            {
+                private Iterator<List<ByteBuffer>> currentPage;
+
+                protected Row computeNext()
+                {
+                    try {
+                        while (currentPage == null || !currentPage.hasNext())
+                        {
+                            if (pager.isExhausted())
+                                return endOfData();
+                            currentPage = select.process(pager.fetchPage(pageSize)).rows.iterator();
+                        }
+                        return new Row(metadata, currentPage.next());
+                    } catch (RequestValidationException | RequestExecutionException e) {
+                        throw new RuntimeException(e);
+                    }
+                }
+            };
+        }
+    }
+
     public static class Row
     {
         private final Map<String, ByteBuffer> data = new HashMap<>();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1147ee3a/src/java/org/apache/cassandra/cql3/statements/AuthenticationStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/AuthenticationStatement.java b/src/java/org/apache/cassandra/cql3/statements/AuthenticationStatement.java
index 5fcf085..b47dd92 100644
--- a/src/java/org/apache/cassandra/cql3/statements/AuthenticationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/AuthenticationStatement.java
@@ -45,7 +45,7 @@ public abstract class AuthenticationStatement extends ParsedStatement implements
 
     public abstract ResultMessage execute(ClientState state) throws RequestExecutionException, RequestValidationException;
 
-    public ResultMessage executeInternal(QueryState state)
+    public ResultMessage executeInternal(QueryState state, QueryOptions options)
     {
         // executeInternal is for local query only, thus altering users doesn't make sense and is not supported
         throw new UnsupportedOperationException();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1147ee3a/src/java/org/apache/cassandra/cql3/statements/AuthorizationStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/AuthorizationStatement.java b/src/java/org/apache/cassandra/cql3/statements/AuthorizationStatement.java
index db4581e..2c7f2cb 100644
--- a/src/java/org/apache/cassandra/cql3/statements/AuthorizationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/AuthorizationStatement.java
@@ -47,7 +47,7 @@ public abstract class AuthorizationStatement extends ParsedStatement implements
 
     public abstract ResultMessage execute(ClientState state) throws RequestValidationException, RequestExecutionException;
 
-    public ResultMessage executeInternal(QueryState state)
+    public ResultMessage executeInternal(QueryState state, QueryOptions options)
     {
         // executeInternal is for local query only, thus altering permission doesn't make sense and is not supported
         throw new UnsupportedOperationException();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1147ee3a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
index 05d37da..5b058f3 100644
--- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
@@ -311,10 +311,10 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
         return new ResultMessage.Rows(ModificationStatement.buildCasResultSet(ksName, key, cfName, result, columnsWithConditions, true, options.forStatement(0)));
     }
 
-    public ResultMessage executeInternal(QueryState queryState) throws RequestValidationException, RequestExecutionException
+    public ResultMessage executeInternal(QueryState queryState, QueryOptions options) throws RequestValidationException, RequestExecutionException
     {
         assert !hasConditions;
-        for (IMutation mutation : getMutations(BatchQueryOptions.DEFAULT, true, queryState.getTimestamp()))
+        for (IMutation mutation : getMutations(BatchQueryOptions.withoutPerStatementVariables(options), true, queryState.getTimestamp()))
         {
             // We don't use counters internally.
             assert mutation instanceof Mutation;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1147ee3a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index e1468fb..ad88eaf 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -621,12 +621,12 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
         return builder.build();
     }
 
-    public ResultMessage executeInternal(QueryState queryState) throws RequestValidationException, RequestExecutionException
+    public ResultMessage executeInternal(QueryState queryState, QueryOptions options) throws RequestValidationException, RequestExecutionException
     {
         if (hasConditions())
             throw new UnsupportedOperationException();
 
-        for (IMutation mutation : getMutations(QueryOptions.DEFAULT, true, queryState.getTimestamp()))
+        for (IMutation mutation : getMutations(options, true, queryState.getTimestamp()))
         {
             // We don't use counters internally.
             assert mutation instanceof Mutation;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1147ee3a/src/java/org/apache/cassandra/cql3/statements/SchemaAlteringStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SchemaAlteringStatement.java b/src/java/org/apache/cassandra/cql3/statements/SchemaAlteringStatement.java
index 337e8dc..94df854 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SchemaAlteringStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SchemaAlteringStatement.java
@@ -73,7 +73,7 @@ public abstract class SchemaAlteringStatement extends CFStatement implements CQL
         return new ResultMessage.SchemaChange(changeType(), keyspace(), tableName);
     }
 
-    public ResultMessage executeInternal(QueryState state)
+    public ResultMessage executeInternal(QueryState state, QueryOptions options)
     {
         // executeInternal is for local query only, thus altering schema is not supported
         throw new UnsupportedOperationException();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1147ee3a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index b212147..420f475 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@ -191,18 +191,8 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
         cl.validateForRead(keyspace());
 
         int limit = getLimit(options);
-        int limitForQuery = updateLimitForQuery(limit);
         long now = System.currentTimeMillis();
-        Pageable command;
-        if (isKeyRange || usesSecondaryIndexing)
-        {
-            command = getRangeCommand(options, limitForQuery, now);
-        }
-        else
-        {
-            List<ReadCommand> commands = getSliceCommands(options, limitForQuery, now);
-            command = commands == null ? null : new Pageable.ReadCommands(commands);
-        }
+        Pageable command = getPageableCommand(options, limit, now);
 
         int pageSize = options.getPageSize();
         // A count query will never be paged for the user, but we always page it internally to avoid OOM.
@@ -234,6 +224,21 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
         }
     }
 
+    private Pageable getPageableCommand(QueryOptions options, int limit, long now) throws RequestValidationException
+    {
+        int limitForQuery = updateLimitForQuery(limit);
+        if (isKeyRange || usesSecondaryIndexing)
+            return getRangeCommand(options, limitForQuery, now);
+
+        List<ReadCommand> commands = getSliceCommands(options, limitForQuery, now);
+        return commands == null ? null : new Pageable.ReadCommands(commands);
+    }
+
+    public Pageable getPageableCommand(QueryOptions options) throws RequestValidationException
+    {
+        return getPageableCommand(options, getLimit(options), System.currentTimeMillis());
+    }
+
     private ResultMessage.Rows execute(Pageable command, QueryOptions options, int limit, long now) throws RequestValidationException, RequestExecutionException
     {
         List<Row> rows;
@@ -285,23 +290,16 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
         return rows;
     }
 
-    public ResultMessage.Rows executeInternal(QueryState state) throws RequestExecutionException, RequestValidationException
+    public ResultMessage.Rows executeInternal(QueryState state, QueryOptions options) throws RequestExecutionException, RequestValidationException
     {
-        QueryOptions options = QueryOptions.DEFAULT;
         int limit = getLimit(options);
-        int limitForQuery = updateLimitForQuery(limit);
         long now = System.currentTimeMillis();
-        List<Row> rows;
-        if (isKeyRange || usesSecondaryIndexing)
-        {
-            RangeSliceCommand command = getRangeCommand(options, limitForQuery, now);
-            rows = command == null ? Collections.<Row>emptyList() : command.executeLocally();
-        }
-        else
-        {
-            List<ReadCommand> commands = getSliceCommands(options, limitForQuery, now);
-            rows = commands == null ? Collections.<Row>emptyList() : readLocally(keyspace(), commands);
-        }
+        Pageable command = getPageableCommand(options, limit, now);
+        List<Row> rows = command == null
+                       ? Collections.<Row>emptyList()
+                       : (command instanceof Pageable.ReadCommands
+                          ? readLocally(keyspace(), ((Pageable.ReadCommands)command).commands)
+                          : ((RangeSliceCommand)command).executeLocally());
 
         return processResults(rows, options, limit, now);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1147ee3a/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java b/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java
index 30e57d5..ef1c4a4 100644
--- a/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java
@@ -77,7 +77,7 @@ public class TruncateStatement extends CFStatement implements CQLStatement
         return null;
     }
 
-    public ResultMessage executeInternal(QueryState state)
+    public ResultMessage executeInternal(QueryState state, QueryOptions options)
     {
         throw new UnsupportedOperationException();
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1147ee3a/src/java/org/apache/cassandra/cql3/statements/UseStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/UseStatement.java b/src/java/org/apache/cassandra/cql3/statements/UseStatement.java
index ee70f9d..efda72d 100644
--- a/src/java/org/apache/cassandra/cql3/statements/UseStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/UseStatement.java
@@ -59,7 +59,7 @@ public class UseStatement extends ParsedStatement implements CQLStatement
         return new ResultMessage.SetKeyspace(keyspace);
     }
 
-    public ResultMessage executeInternal(QueryState state)
+    public ResultMessage executeInternal(QueryState state, QueryOptions options)
     {
         // Internal queries are exclusively on the system keyspace and 'use' is thus useless
         throw new UnsupportedOperationException();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1147ee3a/src/java/org/apache/cassandra/db/BatchlogManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/BatchlogManager.java b/src/java/org/apache/cassandra/db/BatchlogManager.java
index 1a441f6..4e7e412 100644
--- a/src/java/org/apache/cassandra/db/BatchlogManager.java
+++ b/src/java/org/apache/cassandra/db/BatchlogManager.java
@@ -58,6 +58,8 @@ import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.WrappedRunnable;
 
+import static org.apache.cassandra.cql3.QueryProcessor.executeInternal;
+
 public class BatchlogManager implements BatchlogManagerMBean
 {
     private static final String MBEAN_NAME = "org.apache.cassandra.db:type=BatchlogManager";
@@ -97,7 +99,7 @@ public class BatchlogManager implements BatchlogManagerMBean
 
     public int countAllBatches()
     {
-        return (int) process("SELECT count(*) FROM %s.%s", Keyspace.SYSTEM_KS, SystemKeyspace.BATCHLOG_CF).one().getLong("count");
+        return (int) executeInternal("SELECT count(*) FROM %s.%s", Keyspace.SYSTEM_KS, SystemKeyspace.BATCHLOG_CF).one().getLong("count");
     }
 
     public long getTotalBatchesReplayed()
@@ -166,10 +168,10 @@ public class BatchlogManager implements BatchlogManagerMBean
 
         try
         {
-            UntypedResultSet page = process("SELECT id, data, written_at, version FROM %s.%s LIMIT %d",
-                                            Keyspace.SYSTEM_KS,
-                                            SystemKeyspace.BATCHLOG_CF,
-                                            PAGE_SIZE);
+            UntypedResultSet page = executeInternal(String.format("SELECT id, data, written_at, version FROM %s.%s LIMIT %d",
+                                                                  Keyspace.SYSTEM_KS,
+                                                                  SystemKeyspace.BATCHLOG_CF,
+                                                                  PAGE_SIZE));
 
             while (!page.isEmpty())
             {
@@ -178,11 +180,11 @@ public class BatchlogManager implements BatchlogManagerMBean
                 if (page.size() < PAGE_SIZE)
                     break; // we've exhausted the batchlog, next query would be empty.
 
-                page = process("SELECT id, data, written_at, version FROM %s.%s WHERE token(id) > token(%s) LIMIT %d",
-                               Keyspace.SYSTEM_KS,
-                               SystemKeyspace.BATCHLOG_CF,
-                               id,
-                               PAGE_SIZE);
+                page = executeInternal(String.format("SELECT id, data, written_at, version FROM %s.%s WHERE token(id) > token(?) LIMIT %d",
+                                                     Keyspace.SYSTEM_KS,
+                                                     SystemKeyspace.BATCHLOG_CF,
+                                                     PAGE_SIZE), 
+                                       id);
             }
 
             cleanup();
@@ -450,11 +452,6 @@ public class BatchlogManager implements BatchlogManagerMBean
             CompactionManager.instance.submitUserDefined(cfs, descriptors, Integer.MAX_VALUE).get();
     }
 
-    private static UntypedResultSet process(String format, Object... args)
-    {
-        return QueryProcessor.processInternal(String.format(format, args));
-    }
-
     public static class EndpointFilter
     {
         private final String localRack;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1147ee3a/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index 2025d5e..9cb6e94 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -61,7 +61,8 @@ import org.apache.cassandra.thrift.cassandraConstants;
 import org.apache.cassandra.transport.Server;
 import org.apache.cassandra.utils.*;
 
-import static org.apache.cassandra.cql3.QueryProcessor.processInternal;
+import static org.apache.cassandra.cql3.QueryProcessor.executeInternal;
+import static org.apache.cassandra.cql3.QueryProcessor.executeOnceInternal;
 
 public class SystemKeyspace
 {
@@ -123,10 +124,7 @@ public class SystemKeyspace
 
             // delete old, possibly obsolete entries in schema columnfamilies
             for (String cfname : Arrays.asList(SystemKeyspace.SCHEMA_KEYSPACES_CF, SystemKeyspace.SCHEMA_COLUMNFAMILIES_CF, SystemKeyspace.SCHEMA_COLUMNS_CF))
-            {
-                String req = String.format("DELETE FROM system.%s WHERE keyspace_name = '%s'", cfname, ksmd.name);
-                processInternal(req);
-            }
+                executeOnceInternal(String.format("DELETE FROM system.%s WHERE keyspace_name = ?", cfname), ksmd.name);
 
             // (+1 to timestamp to make sure we don't get shadowed by the tombstones we just added)
             ksmd.toSchema(FBUtilities.timestampMicros() + 1).apply();
@@ -135,24 +133,24 @@ public class SystemKeyspace
 
     private static void setupVersion()
     {
-        String req = "INSERT INTO system.%s (key, release_version, cql_version, thrift_version, native_protocol_version, data_center, rack, partitioner) VALUES ('%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s')";
+        String req = "INSERT INTO system.%s (key, release_version, cql_version, thrift_version, native_protocol_version, data_center, rack, partitioner) VALUES (?, ?, ?, ?, ?, ?, ?, ?)";
         IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
-        processInternal(String.format(req, LOCAL_CF,
-                                         LOCAL_KEY,
-                                         FBUtilities.getReleaseVersionString(),
-                                         QueryProcessor.CQL_VERSION.toString(),
-                                         cassandraConstants.VERSION,
-                                         Server.CURRENT_VERSION,
-                                         snitch.getDatacenter(FBUtilities.getBroadcastAddress()),
-                                         snitch.getRack(FBUtilities.getBroadcastAddress()),
-                                         DatabaseDescriptor.getPartitioner().getClass().getName()));
+        executeOnceInternal(String.format(req, LOCAL_CF),
+                            LOCAL_KEY,
+                            FBUtilities.getReleaseVersionString(),
+                            QueryProcessor.CQL_VERSION.toString(),
+                            cassandraConstants.VERSION,
+                            String.valueOf(Server.CURRENT_VERSION),
+                            snitch.getDatacenter(FBUtilities.getBroadcastAddress()),
+                            snitch.getRack(FBUtilities.getBroadcastAddress()),
+                            DatabaseDescriptor.getPartitioner().getClass().getName());
     }
 
     // TODO: In 3.0, remove this and the index_interval column from system.schema_columnfamilies
     /** Migrates index_interval values to min_index_interval and sets index_interval to null */
     private static void migrateIndexInterval()
     {
-        for (UntypedResultSet.Row row : processInternal(String.format("SELECT * FROM system.%s", SCHEMA_COLUMNFAMILIES_CF)))
+        for (UntypedResultSet.Row row : executeOnceInternal(String.format("SELECT * FROM system.%s", SCHEMA_COLUMNFAMILIES_CF)))
         {
             if (!row.has("index_interval"))
                 continue;
@@ -160,13 +158,8 @@ public class SystemKeyspace
             logger.debug("Migrating index_interval to min_index_interval");
 
             CFMetaData table = CFMetaData.fromSchema(row);
-            String query = String.format("SELECT writetime(type) "
-                    + "FROM system.%s "
-                    + "WHERE keyspace_name = '%s' AND columnfamily_name = '%s'",
-                    SCHEMA_COLUMNFAMILIES_CF,
-                    table.ksName,
-                    table.cfName);
-            long timestamp = processInternal(query).one().getLong("writetime(type)");
+            String query = String.format("SELECT writetime(type) FROM system.%s WHERE keyspace_name = ? AND columnfamily_name = ?", SCHEMA_COLUMNFAMILIES_CF);
+            long timestamp = executeOnceInternal(query, table.ksName, table.cfName).one().getLong("writetime(type)");
             try
             {
                 table.toSchema(timestamp).apply();
@@ -180,7 +173,7 @@ public class SystemKeyspace
 
     private static void migrateCachingOption()
     {
-        for (UntypedResultSet.Row row : processInternal(String.format("SELECT * FROM system.%s", SCHEMA_COLUMNFAMILIES_CF)))
+        for (UntypedResultSet.Row row : executeOnceInternal(String.format("SELECT * FROM system.%s", SCHEMA_COLUMNFAMILIES_CF)))
         {
             if (!row.has("caching"))
                 continue;
@@ -192,13 +185,8 @@ public class SystemKeyspace
                 CachingOptions caching = CachingOptions.fromString(row.getString("caching"));
                 CFMetaData table = CFMetaData.fromSchema(row);
                 logger.info("Migrating caching option {} to {} for {}.{}", row.getString("caching"), caching.toString(), table.ksName, table.cfName);
-                String query = String.format("SELECT writetime(type) "
-                        + "FROM system.%s "
-                        + "WHERE keyspace_name = '%s' AND columnfamily_name = '%s'",
-                        SCHEMA_COLUMNFAMILIES_CF,
-                        table.ksName,
-                        table.cfName);
-                long timestamp = processInternal(query).one().getLong("writetime(type)");
+                String query = String.format("SELECT writetime(type) FROM system.%s WHERE keyspace_name = ? AND columnfamily_name = ?", SCHEMA_COLUMNFAMILIES_CF);
+                long timestamp = executeOnceInternal(query, table.ksName, table.cfName).one().getLong("writetime(type)");
                 table.toSchema(timestamp).apply();
             }
             catch (ConfigurationException e)
@@ -221,7 +209,6 @@ public class SystemKeyspace
             return null;
 
         UUID compactionId = UUIDGen.getTimeUUID();
-        String req = "INSERT INTO system.%s (id, keyspace_name, columnfamily_name, inputs) VALUES (%s, '%s', '%s', {%s})";
         Iterable<Integer> generations = Iterables.transform(toCompact, new Function<SSTableReader, Integer>()
         {
             public Integer apply(SSTableReader sstable)
@@ -229,7 +216,8 @@ public class SystemKeyspace
                 return sstable.descriptor.generation;
             }
         });
-        processInternal(String.format(req, COMPACTION_LOG, compactionId, cfs.keyspace.getName(), cfs.name, StringUtils.join(Sets.newHashSet(generations), ',')));
+        String req = "INSERT INTO system.%s (id, keyspace_name, columnfamily_name, inputs) VALUES (?, ?, ?, ?)";
+        executeInternal(String.format(req, COMPACTION_LOG), compactionId, cfs.keyspace.getName(), cfs.name, Sets.newHashSet(generations));
         forceBlockingFlush(COMPACTION_LOG);
         return compactionId;
     }
@@ -243,8 +231,7 @@ public class SystemKeyspace
     {
         assert taskId != null;
 
-        String req = "DELETE FROM system.%s WHERE id = %s";
-        processInternal(String.format(req, COMPACTION_LOG, taskId));
+        executeInternal(String.format("DELETE FROM system.%s WHERE id = ?", COMPACTION_LOG), taskId);
         forceBlockingFlush(COMPACTION_LOG);
     }
 
@@ -255,7 +242,7 @@ public class SystemKeyspace
     public static Map<Pair<String, String>, Map<Integer, UUID>> getUnfinishedCompactions()
     {
         String req = "SELECT * FROM system.%s";
-        UntypedResultSet resultSet = processInternal(String.format(req, COMPACTION_LOG));
+        UntypedResultSet resultSet = executeInternal(String.format(req, COMPACTION_LOG));
 
         Map<Pair<String, String>, Map<Integer, UUID>> unfinishedCompactions = new HashMap<>();
         for (UntypedResultSet.Row row : resultSet)
@@ -294,21 +281,20 @@ public class SystemKeyspace
         // don't write anything when the history table itself is compacted, since that would in turn cause new compactions
         if (ksname.equals("system") && cfname.equals(COMPACTION_HISTORY_CF))
             return;
-        String req = "INSERT INTO system.%s (id, keyspace_name, columnfamily_name, compacted_at, bytes_in, bytes_out, rows_merged) "
-                     + "VALUES (%s, '%s', '%s', %d, %d, %d, {%s})";
-        processInternal(String.format(req, COMPACTION_HISTORY_CF, UUIDGen.getTimeUUID().toString(), ksname, cfname, compactedAt, bytesIn, bytesOut, FBUtilities.toString(rowsMerged)));
+        String req = "INSERT INTO system.%s (id, keyspace_name, columnfamily_name, compacted_at, bytes_in, bytes_out, rows_merged) VALUES (?, ?, ?, ?, ?, ?, ?)";
+        executeInternal(String.format(req, COMPACTION_HISTORY_CF), UUIDGen.getTimeUUID(), ksname, cfname, ByteBufferUtil.bytes(compactedAt), bytesIn, bytesOut, rowsMerged);
     }
 
     public static TabularData getCompactionHistory() throws OpenDataException
     {
-        UntypedResultSet queryResultSet = processInternal("SELECT * from system.compaction_history");
+        UntypedResultSet queryResultSet = executeInternal(String.format("SELECT * from system.%s", COMPACTION_HISTORY_CF));
         return CompactionHistoryTabularData.from(queryResultSet);
     }
 
     public static synchronized void saveTruncationRecord(ColumnFamilyStore cfs, long truncatedAt, ReplayPosition position)
     {
-        String req = "UPDATE system.%s SET truncated_at = truncated_at + %s WHERE key = '%s'";
-        processInternal(String.format(req, LOCAL_CF, truncationAsMapEntry(cfs, truncatedAt, position), LOCAL_KEY));
+        String req = "UPDATE system.%s SET truncated_at = truncated_at + ? WHERE key = '%s'";
+        executeInternal(String.format(req, LOCAL_CF, LOCAL_KEY), truncationAsMapEntry(cfs, truncatedAt, position));
         truncationRecords = null;
         forceBlockingFlush(LOCAL_CF);
     }
@@ -318,13 +304,13 @@ public class SystemKeyspace
      */
     public static synchronized void removeTruncationRecord(UUID cfId)
     {
-        String req = "DELETE truncated_at[%s] from system.%s WHERE key = '%s'";
-        processInternal(String.format(req, cfId, LOCAL_CF, LOCAL_KEY));
+        String req = "DELETE truncated_at[?] from system.%s WHERE key = '%s'";
+        executeInternal(String.format(req, LOCAL_CF, LOCAL_KEY), cfId);
         truncationRecords = null;
         forceBlockingFlush(LOCAL_CF);
     }
 
-    private static String truncationAsMapEntry(ColumnFamilyStore cfs, long truncatedAt, ReplayPosition position)
+    private static Map<UUID, ByteBuffer> truncationAsMapEntry(ColumnFamilyStore cfs, long truncatedAt, ReplayPosition position)
     {
         DataOutputBuffer out = new DataOutputBuffer();
         try
@@ -336,9 +322,7 @@ public class SystemKeyspace
         {
             throw new RuntimeException(e);
         }
-        return String.format("{%s: 0x%s}",
-                             cfs.metadata.cfId,
-                             ByteBufferUtil.bytesToHex(ByteBuffer.wrap(out.getData(), 0, out.getLength())));
+        return Collections.<UUID, ByteBuffer>singletonMap(cfs.metadata.cfId, ByteBuffer.wrap(out.getData(), 0, out.getLength()));
     }
 
     public static ReplayPosition getTruncatedPosition(UUID cfId)
@@ -362,9 +346,7 @@ public class SystemKeyspace
 
     private static Map<UUID, Pair<ReplayPosition, Long>> readTruncationRecords()
     {
-        UntypedResultSet rows = processInternal(String.format("SELECT truncated_at FROM system.%s WHERE key = '%s'",
-                                                              LOCAL_CF,
-                                                              LOCAL_KEY));
+        UntypedResultSet rows = executeInternal(String.format("SELECT truncated_at FROM system.%s WHERE key = '%s'", LOCAL_CF, LOCAL_KEY));
 
         Map<UUID, Pair<ReplayPosition, Long>> records = new HashMap<>();
 
@@ -402,53 +384,46 @@ public class SystemKeyspace
             return;
         }
 
-        String req = "INSERT INTO system.%s (peer, tokens) VALUES ('%s', %s)";
-        processInternal(String.format(req, PEERS_CF, ep.getHostAddress(), tokensAsSet(tokens)));
+        String req = "INSERT INTO system.%s (peer, tokens) VALUES (?, ?)";
+        executeInternal(String.format(req, PEERS_CF), ep, tokensAsSet(tokens));
     }
 
     public static synchronized void updatePreferredIP(InetAddress ep, InetAddress preferred_ip)
     {
-        String req = "INSERT INTO system.%s (peer, preferred_ip) VALUES ('%s', '%s')";
-        processInternal(String.format(req, PEERS_CF, ep.getHostAddress(), preferred_ip.getHostAddress()));
+        String req = "INSERT INTO system.%s (peer, preferred_ip) VALUES (?, ?)";
+        executeInternal(String.format(req, PEERS_CF), ep, preferred_ip);
         forceBlockingFlush(PEERS_CF);
     }
 
-    public static synchronized void updatePeerInfo(InetAddress ep, String columnName, String value)
+    public static synchronized void updatePeerInfo(InetAddress ep, String columnName, Object value)
     {
         if (ep.equals(FBUtilities.getBroadcastAddress()))
             return;
 
-        String req = "INSERT INTO system.%s (peer, %s) VALUES ('%s', %s)";
-        processInternal(String.format(req, PEERS_CF, columnName, ep.getHostAddress(), value));
+        String req = "INSERT INTO system.%s (peer, %s) VALUES (?, ?)";
+        executeInternal(String.format(req, PEERS_CF, columnName), ep, value);
     }
 
     public static synchronized void updateHintsDropped(InetAddress ep, UUID timePeriod, int value)
     {
         // with 30 day TTL
-        String req = "UPDATE system.%s USING TTL 2592000 SET hints_dropped[ %s ] = %s WHERE peer = '%s'";
-        processInternal(String.format(req, PEER_EVENTS_CF, timePeriod.toString(), value, ep.getHostAddress()));
+        String req = "UPDATE system.%s USING TTL 2592000 SET hints_dropped[ ? ] = ? WHERE peer = ?";
+        executeInternal(String.format(req, PEER_EVENTS_CF), timePeriod, value, ep);
     }
 
     public static synchronized void updateSchemaVersion(UUID version)
     {
-        String req = "INSERT INTO system.%s (key, schema_version) VALUES ('%s', %s)";
-        processInternal(String.format(req, LOCAL_CF, LOCAL_KEY, version.toString()));
+        String req = "INSERT INTO system.%s (key, schema_version) VALUES ('%s', ?)";
+        executeInternal(String.format(req, LOCAL_CF, LOCAL_KEY), version);
     }
 
-    private static String tokensAsSet(Collection<Token> tokens)
+    private static Set<String> tokensAsSet(Collection<Token> tokens)
     {
         Token.TokenFactory factory = StorageService.getPartitioner().getTokenFactory();
-        StringBuilder sb = new StringBuilder();
-        sb.append("{");
-        Iterator<Token> iter = tokens.iterator();
-        while (iter.hasNext())
-        {
-            sb.append("'").append(factory.toString(iter.next())).append("'");
-            if (iter.hasNext())
-                sb.append(",");
-        }
-        sb.append("}");
-        return sb.toString();
+        Set<String> s = new HashSet<>(tokens.size());
+        for (Token tk : tokens)
+            s.add(factory.toString(tk));
+        return s;
     }
 
     private static Collection<Token> deserializeTokens(Collection<String> tokensStrings)
@@ -465,8 +440,8 @@ public class SystemKeyspace
      */
     public static synchronized void removeEndpoint(InetAddress ep)
     {
-        String req = "DELETE FROM system.%s WHERE peer = '%s'";
-        processInternal(String.format(req, PEERS_CF, ep.getHostAddress()));
+        String req = "DELETE FROM system.%s WHERE peer = ?";
+        executeInternal(String.format(req, PEERS_CF), ep);
     }
 
     /**
@@ -475,8 +450,8 @@ public class SystemKeyspace
     public static synchronized void updateTokens(Collection<Token> tokens)
     {
         assert !tokens.isEmpty() : "removeEndpoint should be used instead";
-        String req = "INSERT INTO system.%s (key, tokens) VALUES ('%s', %s)";
-        processInternal(String.format(req, LOCAL_CF, LOCAL_KEY, tokensAsSet(tokens)));
+        String req = "INSERT INTO system.%s (key, tokens) VALUES ('%s', ?)";
+        executeInternal(String.format(req, LOCAL_CF, LOCAL_KEY), tokensAsSet(tokens));
         forceBlockingFlush(LOCAL_CF);
     }
 
@@ -509,7 +484,7 @@ public class SystemKeyspace
     public static SetMultimap<InetAddress, Token> loadTokens()
     {
         SetMultimap<InetAddress, Token> tokenMap = HashMultimap.create();
-        for (UntypedResultSet.Row row : processInternal("SELECT peer, tokens FROM system." + PEERS_CF))
+        for (UntypedResultSet.Row row : executeInternal("SELECT peer, tokens FROM system." + PEERS_CF))
         {
             InetAddress peer = row.getInetAddress("peer");
             if (row.has("tokens"))
@@ -526,7 +501,7 @@ public class SystemKeyspace
     public static Map<InetAddress, UUID> loadHostIds()
     {
         Map<InetAddress, UUID> hostIdMap = new HashMap<InetAddress, UUID>();
-        for (UntypedResultSet.Row row : processInternal("SELECT peer, host_id FROM system." + PEERS_CF))
+        for (UntypedResultSet.Row row : executeInternal("SELECT peer, host_id FROM system." + PEERS_CF))
         {
             InetAddress peer = row.getInetAddress("peer");
             if (row.has("host_id"))
@@ -539,8 +514,8 @@ public class SystemKeyspace
 
     public static InetAddress getPreferredIP(InetAddress ep)
     {
-        String req = "SELECT preferred_ip FROM system.%s WHERE peer='%s'";
-        UntypedResultSet result = processInternal(String.format(req, PEERS_CF, ep.getHostAddress()));
+        String req = "SELECT preferred_ip FROM system.%s WHERE peer=?";
+        UntypedResultSet result = executeInternal(String.format(req, PEERS_CF), ep);
         if (!result.isEmpty() && result.one().has("preferred_ip"))
             return result.one().getInetAddress("preferred_ip");
         return null;
@@ -552,7 +527,7 @@ public class SystemKeyspace
     public static Map<InetAddress, Map<String,String>> loadDcRackInfo()
     {
         Map<InetAddress, Map<String, String>> result = new HashMap<InetAddress, Map<String, String>>();
-        for (UntypedResultSet.Row row : processInternal("SELECT peer, data_center, rack from system." + PEERS_CF))
+        for (UntypedResultSet.Row row : executeInternal("SELECT peer, data_center, rack from system." + PEERS_CF))
         {
             InetAddress peer = row.getInetAddress("peer");
             if (row.has("data_center") && row.has("rack"))
@@ -590,7 +565,7 @@ public class SystemKeyspace
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(LOCAL_CF);
 
         String req = "SELECT cluster_name FROM system.%s WHERE key='%s'";
-        UntypedResultSet result = processInternal(String.format(req, LOCAL_CF, LOCAL_KEY));
+        UntypedResultSet result = executeInternal(String.format(req, LOCAL_CF, LOCAL_KEY));
 
         if (result.isEmpty() || !result.one().has("cluster_name"))
         {
@@ -599,8 +574,8 @@ public class SystemKeyspace
                 throw new ConfigurationException("Found system keyspace files, but they couldn't be loaded!");
 
             // no system files.  this is a new node.
-            req = "INSERT INTO system.%s (key, cluster_name) VALUES ('%s', '%s')";
-            processInternal(String.format(req, LOCAL_CF, LOCAL_KEY, DatabaseDescriptor.getClusterName()));
+            req = "INSERT INTO system.%s (key, cluster_name) VALUES ('%s', ?)";
+            executeInternal(String.format(req, LOCAL_CF, LOCAL_KEY), DatabaseDescriptor.getClusterName());
             return;
         }
 
@@ -612,7 +587,7 @@ public class SystemKeyspace
     public static Collection<Token> getSavedTokens()
     {
         String req = "SELECT tokens FROM system.%s WHERE key='%s'";
-        UntypedResultSet result = processInternal(String.format(req, LOCAL_CF, LOCAL_KEY));
+        UntypedResultSet result = executeInternal(String.format(req, LOCAL_CF, LOCAL_KEY));
         return result.isEmpty() || !result.one().has("tokens")
              ? Collections.<Token>emptyList()
              : deserializeTokens(result.one().<String>getSet("tokens", UTF8Type.instance));
@@ -621,7 +596,7 @@ public class SystemKeyspace
     public static int incrementAndGetGeneration()
     {
         String req = "SELECT gossip_generation FROM system.%s WHERE key='%s'";
-        UntypedResultSet result = processInternal(String.format(req, LOCAL_CF, LOCAL_KEY));
+        UntypedResultSet result = executeInternal(String.format(req, LOCAL_CF, LOCAL_KEY));
 
         int generation;
         if (result.isEmpty() || !result.one().has("gossip_generation"))
@@ -648,8 +623,8 @@ public class SystemKeyspace
             }
         }
 
-        req = "INSERT INTO system.%s (key, gossip_generation) VALUES ('%s', %d)";
-        processInternal(String.format(req, LOCAL_CF, LOCAL_KEY, generation));
+        req = "INSERT INTO system.%s (key, gossip_generation) VALUES ('%s', ?)";
+        executeInternal(String.format(req, LOCAL_CF, LOCAL_KEY), generation);
         forceBlockingFlush(LOCAL_CF);
 
         return generation;
@@ -658,7 +633,7 @@ public class SystemKeyspace
     public static BootstrapState getBootstrapState()
     {
         String req = "SELECT bootstrapped FROM system.%s WHERE key='%s'";
-        UntypedResultSet result = processInternal(String.format(req, LOCAL_CF, LOCAL_KEY));
+        UntypedResultSet result = executeInternal(String.format(req, LOCAL_CF, LOCAL_KEY));
 
         if (result.isEmpty() || !result.one().has("bootstrapped"))
             return BootstrapState.NEEDS_BOOTSTRAP;
@@ -678,8 +653,8 @@ public class SystemKeyspace
 
     public static void setBootstrapState(BootstrapState state)
     {
-        String req = "INSERT INTO system.%s (key, bootstrapped) VALUES ('%s', '%s')";
-        processInternal(String.format(req, LOCAL_CF, LOCAL_KEY, state.name()));
+        String req = "INSERT INTO system.%s (key, bootstrapped) VALUES ('%s', ?)";
+        executeInternal(String.format(req, LOCAL_CF, LOCAL_KEY), state.name());
         forceBlockingFlush(LOCAL_CF);
     }
 
@@ -716,7 +691,7 @@ public class SystemKeyspace
         UUID hostId = null;
 
         String req = "SELECT host_id FROM system.%s WHERE key='%s'";
-        UntypedResultSet result = processInternal(String.format(req, LOCAL_CF, LOCAL_KEY));
+        UntypedResultSet result = executeInternal(String.format(req, LOCAL_CF, LOCAL_KEY));
 
         // Look up the Host UUID (return it if found)
         if (!result.isEmpty() && result.one().has("host_id"))
@@ -735,8 +710,8 @@ public class SystemKeyspace
      */
     public static UUID setLocalHostId(UUID hostId)
     {
-        String req = "INSERT INTO system.%s (key, host_id) VALUES ('%s', %s)";
-        processInternal(String.format(req, LOCAL_CF, LOCAL_KEY, hostId));
+        String req = "INSERT INTO system.%s (key, host_id) VALUES ('%s', ?)";
+        executeInternal(String.format(req, LOCAL_CF, LOCAL_KEY), hostId);
         return hostId;
     }
 
@@ -890,8 +865,8 @@ public class SystemKeyspace
 
     public static PaxosState loadPaxosState(ByteBuffer key, CFMetaData metadata)
     {
-        String req = "SELECT * FROM system.%s WHERE row_key = 0x%s AND cf_id = %s";
-        UntypedResultSet results = processInternal(String.format(req, PAXOS_CF, ByteBufferUtil.bytesToHex(key), metadata.cfId));
+        String req = "SELECT * FROM system.%s WHERE row_key = ? AND cf_id = ?";
+        UntypedResultSet results = executeInternal(String.format(req, PAXOS_CF), key, metadata.cfId);
         if (results.isEmpty())
             return new PaxosState(key, metadata);
         UntypedResultSet.Row row = results.one();
@@ -911,26 +886,24 @@ public class SystemKeyspace
 
     public static void savePaxosPromise(Commit promise)
     {
-        String req = "UPDATE %s USING TIMESTAMP %d AND TTL %d SET in_progress_ballot = %s WHERE row_key = 0x%s AND cf_id = %s";
-        processInternal(String.format(req,
-                                      PAXOS_CF,
-                                      UUIDGen.microsTimestamp(promise.ballot),
-                                      paxosTtl(promise.update.metadata),
-                                      promise.ballot,
-                                      ByteBufferUtil.bytesToHex(promise.key),
-                                      promise.update.id()));
+        String req = "UPDATE system.%s USING TIMESTAMP ? AND TTL ? SET in_progress_ballot = ? WHERE row_key = ? AND cf_id = ?";
+        executeInternal(String.format(req, PAXOS_CF),
+                        UUIDGen.microsTimestamp(promise.ballot),
+                        paxosTtl(promise.update.metadata),
+                        promise.ballot,
+                        promise.key,
+                        promise.update.id());
     }
 
     public static void savePaxosProposal(Commit proposal)
     {
-        processInternal(String.format("UPDATE %s USING TIMESTAMP %d AND TTL %d SET proposal_ballot = %s, proposal = 0x%s WHERE row_key = 0x%s AND cf_id = %s",
-                                      PAXOS_CF,
-                                      UUIDGen.microsTimestamp(proposal.ballot),
-                                      paxosTtl(proposal.update.metadata),
-                                      proposal.ballot,
-                                      ByteBufferUtil.bytesToHex(proposal.update.toBytes()),
-                                      ByteBufferUtil.bytesToHex(proposal.key),
-                                      proposal.update.id()));
+        executeInternal(String.format("UPDATE system.%s USING TIMESTAMP ? AND TTL ? SET proposal_ballot = ?, proposal = ? WHERE row_key = ? AND cf_id = ?", PAXOS_CF),
+                        UUIDGen.microsTimestamp(proposal.ballot),
+                        paxosTtl(proposal.update.metadata),
+                        proposal.ballot,
+                        proposal.update.toBytes(),
+                        proposal.key,
+                        proposal.update.id());
     }
 
     private static int paxosTtl(CFMetaData metadata)
@@ -943,15 +916,14 @@ public class SystemKeyspace
     {
         // We always erase the last proposal (with the commit timestamp to no erase more recent proposal in case the commit is old)
         // even though that's really just an optimization  since SP.beginAndRepairPaxos will exclude accepted proposal older than the mrc.
-        String cql = "UPDATE %s USING TIMESTAMP %d AND TTL %d SET proposal_ballot = null, proposal = null, most_recent_commit_at = %s, most_recent_commit = 0x%s WHERE row_key = 0x%s AND cf_id = %s";
-        processInternal(String.format(cql,
-                                      PAXOS_CF,
-                                      UUIDGen.microsTimestamp(commit.ballot),
-                                      paxosTtl(commit.update.metadata),
-                                      commit.ballot,
-                                      ByteBufferUtil.bytesToHex(commit.update.toBytes()),
-                                      ByteBufferUtil.bytesToHex(commit.key),
-                                      commit.update.id()));
+        String cql = "UPDATE system.%s USING TIMESTAMP ? AND TTL ? SET proposal_ballot = null, proposal = null, most_recent_commit_at = ?, most_recent_commit = ? WHERE row_key = ? AND cf_id = ?";
+        executeInternal(String.format(cql, PAXOS_CF),
+                        UUIDGen.microsTimestamp(commit.ballot),
+                        paxosTtl(commit.update.metadata),
+                        commit.ballot,
+                        commit.update.toBytes(),
+                        commit.key,
+                        commit.update.id());
     }
 
     /**
@@ -963,12 +935,8 @@ public class SystemKeyspace
      */
     public static RestorableMeter getSSTableReadMeter(String keyspace, String table, int generation)
     {
-        String cql = "SELECT * FROM %s WHERE keyspace_name='%s' and columnfamily_name='%s' and generation=%d";
-        UntypedResultSet results = processInternal(String.format(cql,
-                                                                 SSTABLE_ACTIVITY_CF,
-                                                                 keyspace,
-                                                                 table,
-                                                                 generation));
+        String cql = "SELECT * FROM system.%s WHERE keyspace_name=? and columnfamily_name=? and generation=?";
+        UntypedResultSet results = executeInternal(String.format(cql, SSTABLE_ACTIVITY_CF), keyspace, table, generation);
 
         if (results.isEmpty())
             return new RestorableMeter();
@@ -985,14 +953,13 @@ public class SystemKeyspace
     public static void persistSSTableReadMeter(String keyspace, String table, int generation, RestorableMeter meter)
     {
         // Store values with a one-day TTL to handle corner cases where cleanup might not occur
-        String cql = "INSERT INTO %s (keyspace_name, columnfamily_name, generation, rate_15m, rate_120m) VALUES ('%s', '%s', %d, %f, %f) USING TTL 864000";
-        processInternal(String.format(cql,
-                                      SSTABLE_ACTIVITY_CF,
-                                      keyspace,
-                                      table,
-                                      generation,
-                                      meter.fifteenMinuteRate(),
-                                      meter.twoHourRate()));
+        String cql = "INSERT INTO system.%s (keyspace_name, columnfamily_name, generation, rate_15m, rate_120m) VALUES (?, ?, ?, ?, ?) USING TTL 864000";
+        executeInternal(String.format(cql, SSTABLE_ACTIVITY_CF),
+                        keyspace,
+                        table,
+                        generation,
+                        meter.fifteenMinuteRate(),
+                        meter.twoHourRate());
     }
 
     /**
@@ -1000,7 +967,7 @@ public class SystemKeyspace
      */
     public static void clearSSTableReadMeter(String keyspace, String table, int generation)
     {
-        String cql = "DELETE FROM %s WHERE keyspace_name='%s' AND columnfamily_name='%s' and generation=%d";
-        processInternal(String.format(cql, SSTABLE_ACTIVITY_CF, keyspace, table, generation));
+        String cql = "DELETE FROM system.%s WHERE keyspace_name=? AND columnfamily_name=? and generation=?";
+        executeInternal(String.format(cql, SSTABLE_ACTIVITY_CF), keyspace, table, generation);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1147ee3a/src/java/org/apache/cassandra/service/ScheduledRangeTransferExecutorService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ScheduledRangeTransferExecutorService.java b/src/java/org/apache/cassandra/service/ScheduledRangeTransferExecutorService.java
index 5591ea4..860619a 100644
--- a/src/java/org/apache/cassandra/service/ScheduledRangeTransferExecutorService.java
+++ b/src/java/org/apache/cassandra/service/ScheduledRangeTransferExecutorService.java
@@ -17,7 +17,7 @@
  */
 package org.apache.cassandra.service;
 
-import static org.apache.cassandra.cql3.QueryProcessor.processInternal;
+import static org.apache.cassandra.cql3.QueryProcessor.executeInternal;
 
 import java.nio.ByteBuffer;
 import java.util.Collections;
@@ -74,7 +74,7 @@ class RangeTransfer implements Runnable
 
     public void run()
     {
-        UntypedResultSet res = processInternal("SELECT * FROM system." + SystemKeyspace.RANGE_XFERS_CF);
+        UntypedResultSet res = executeInternal("SELECT * FROM system." + SystemKeyspace.RANGE_XFERS_CF);
 
         if (res.size() < 1)
         {
@@ -103,9 +103,7 @@ class RangeTransfer implements Runnable
         finally
         {
             LOG.debug("Removing queued entry for transfer of {}", token);
-            processInternal(String.format("DELETE FROM system.%s WHERE token_bytes = 0x%s",
-                                          SystemKeyspace.RANGE_XFERS_CF,
-                                          ByteBufferUtil.bytesToHex(tokenBytes)));
+            executeInternal(String.format("DELETE FROM system.%s WHERE token_bytes = ?", SystemKeyspace.RANGE_XFERS_CF), tokenBytes);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1147ee3a/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index e29530a..2425baf 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -1431,23 +1431,30 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
             switch (state)
             {
                 case RELEASE_VERSION:
-                    SystemKeyspace.updatePeerInfo(endpoint, "release_version", quote(value.value));
+                    SystemKeyspace.updatePeerInfo(endpoint, "release_version", value.value);
                     break;
                 case DC:
-                    SystemKeyspace.updatePeerInfo(endpoint, "data_center", quote(value.value));
+                    SystemKeyspace.updatePeerInfo(endpoint, "data_center", value.value);
                     break;
                 case RACK:
-                    SystemKeyspace.updatePeerInfo(endpoint, "rack", quote(value.value));
+                    SystemKeyspace.updatePeerInfo(endpoint, "rack", value.value);
                     break;
                 case RPC_ADDRESS:
-                    SystemKeyspace.updatePeerInfo(endpoint, "rpc_address", quote(value.value));
+                    try
+                    {
+                        SystemKeyspace.updatePeerInfo(endpoint, "rpc_address", InetAddress.getByName(value.value));
+                    }
+                    catch (UnknownHostException e)
+                    {
+                        throw new RuntimeException(e);
+                    }
                     break;
                 case SCHEMA:
-                    SystemKeyspace.updatePeerInfo(endpoint, "schema_version", value.value);
+                    SystemKeyspace.updatePeerInfo(endpoint, "schema_version", UUID.fromString(value.value));
                     MigrationManager.instance.scheduleSchemaPull(endpoint, epState);
                     break;
                 case HOST_ID:
-                    SystemKeyspace.updatePeerInfo(endpoint, "host_id", value.value);
+                    SystemKeyspace.updatePeerInfo(endpoint, "host_id", UUID.fromString(value.value));
                     break;
             }
         }
@@ -1461,32 +1468,34 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
             switch (entry.getKey())
             {
                 case RELEASE_VERSION:
-                    SystemKeyspace.updatePeerInfo(endpoint, "release_version", quote(entry.getValue().value));
+                    SystemKeyspace.updatePeerInfo(endpoint, "release_version", entry.getValue().value);
                     break;
                 case DC:
-                    SystemKeyspace.updatePeerInfo(endpoint, "data_center", quote(entry.getValue().value));
+                    SystemKeyspace.updatePeerInfo(endpoint, "data_center", entry.getValue().value);
                     break;
                 case RACK:
-                    SystemKeyspace.updatePeerInfo(endpoint, "rack", quote(entry.getValue().value));
+                    SystemKeyspace.updatePeerInfo(endpoint, "rack", entry.getValue().value);
                     break;
                 case RPC_ADDRESS:
-                    SystemKeyspace.updatePeerInfo(endpoint, "rpc_address", quote(entry.getValue().value));
+                    try
+                    {
+                        SystemKeyspace.updatePeerInfo(endpoint, "rpc_address", InetAddress.getByName(entry.getValue().value));
+                    }
+                    catch (UnknownHostException e)
+                    {
+                        throw new RuntimeException(e);
+                    }
                     break;
                 case SCHEMA:
-                    SystemKeyspace.updatePeerInfo(endpoint, "schema_version", entry.getValue().value);
+                    SystemKeyspace.updatePeerInfo(endpoint, "schema_version", UUID.fromString(entry.getValue().value));
                     break;
                 case HOST_ID:
-                    SystemKeyspace.updatePeerInfo(endpoint, "host_id", entry.getValue().value);
+                    SystemKeyspace.updatePeerInfo(endpoint, "host_id", UUID.fromString(entry.getValue().value));
                     break;
             }
         }
     }
 
-    private String quote(String value)
-    {
-        return "'" + value + "'";
-    }
-
     private byte[] getApplicationStateValue(InetAddress endpoint, ApplicationState appstate)
     {
         String vvalue = Gossiper.instance.getEndpointStateForEndpoint(endpoint).getApplicationState(appstate).value;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1147ee3a/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java b/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java
index 9982be9..e0eae6b 100644
--- a/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java
+++ b/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java
@@ -93,7 +93,7 @@ public class BatchlogManagerTest extends SchemaLoader
 
         for (int i = 0; i < 1000; i++)
         {
-            UntypedResultSet result = QueryProcessor.processInternal(String.format("SELECT * FROM \"Keyspace1\".\"Standard1\" WHERE key = intAsBlob(%d)", i));
+            UntypedResultSet result = QueryProcessor.executeInternal(String.format("SELECT * FROM \"Keyspace1\".\"Standard1\" WHERE key = intAsBlob(%d)", i));
             if (i < 500)
             {
                 assertEquals(bytes(i), result.one().getBytes("key"));
@@ -107,7 +107,7 @@ public class BatchlogManagerTest extends SchemaLoader
         }
 
         // Ensure that no stray mutations got somehow applied.
-        UntypedResultSet result = QueryProcessor.processInternal(String.format("SELECT count(*) FROM \"Keyspace1\".\"Standard1\""));
+        UntypedResultSet result = QueryProcessor.executeInternal(String.format("SELECT count(*) FROM \"Keyspace1\".\"Standard1\""));
         assertEquals(500, result.one().getLong("count"));
     }
 
@@ -157,7 +157,7 @@ public class BatchlogManagerTest extends SchemaLoader
         // We should see half of Standard2-targeted mutations written after the replay and all of Standard3 mutations applied.
         for (int i = 0; i < 1000; i++)
         {
-            UntypedResultSet result = QueryProcessor.processInternal(String.format("SELECT * FROM \"Keyspace1\".\"Standard2\" WHERE key = intAsBlob(%d)", i));
+            UntypedResultSet result = QueryProcessor.executeInternal(String.format("SELECT * FROM \"Keyspace1\".\"Standard2\" WHERE key = intAsBlob(%d)", i));
             if (i >= 500)
             {
                 assertEquals(bytes(i), result.one().getBytes("key"));
@@ -172,7 +172,7 @@ public class BatchlogManagerTest extends SchemaLoader
 
         for (int i = 0; i < 1000; i++)
         {
-            UntypedResultSet result = QueryProcessor.processInternal(String.format("SELECT * FROM \"Keyspace1\".\"Standard3\" WHERE key = intAsBlob(%d)", i));
+            UntypedResultSet result = QueryProcessor.executeInternal(String.format("SELECT * FROM \"Keyspace1\".\"Standard3\" WHERE key = intAsBlob(%d)", i));
             assertEquals(bytes(i), result.one().getBytes("key"));
             assertEquals(bytes(i), result.one().getBytes("column1"));
             assertEquals(bytes(i), result.one().getBytes("value"));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1147ee3a/test/unit/org/apache/cassandra/db/HintedHandOffTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/HintedHandOffTest.java b/test/unit/org/apache/cassandra/db/HintedHandOffTest.java
index 622c816..9bc0724 100644
--- a/test/unit/org/apache/cassandra/db/HintedHandOffTest.java
+++ b/test/unit/org/apache/cassandra/db/HintedHandOffTest.java
@@ -40,7 +40,7 @@ import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 
 import static org.junit.Assert.assertEquals;
-import static org.apache.cassandra.cql3.QueryProcessor.processInternal;
+import static org.apache.cassandra.cql3.QueryProcessor.executeInternal;
 
 public class HintedHandOffTest extends SchemaLoader
 {
@@ -92,7 +92,7 @@ public class HintedHandOffTest extends SchemaLoader
             HintedHandOffManager.instance.metrics.incrPastWindow(InetAddress.getLocalHost());
         HintedHandOffManager.instance.metrics.log();
 
-        UntypedResultSet rows = processInternal("SELECT hints_dropped FROM system." + SystemKeyspace.PEER_EVENTS_CF);
+        UntypedResultSet rows = executeInternal("SELECT hints_dropped FROM system." + SystemKeyspace.PEER_EVENTS_CF);
         Map<UUID, Integer> returned = rows.one().getMap("hints_dropped", UUIDType.instance, Int32Type.instance);
         assertEquals(Iterators.getLast(returned.values().iterator()).intValue(), 99);
     }
@@ -129,7 +129,7 @@ public class HintedHandOffTest extends SchemaLoader
     private int getNoOfHints()
     {
         String req = "SELECT * FROM system.%s";
-        UntypedResultSet resultSet = processInternal(String.format(req, SystemKeyspace.HINTS_CF));
+        UntypedResultSet resultSet = executeInternal(String.format(req, SystemKeyspace.HINTS_CF));
         return resultSet.size();
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1147ee3a/test/unit/org/apache/cassandra/db/ScrubTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ScrubTest.java b/test/unit/org/apache/cassandra/db/ScrubTest.java
index 220e2a4..571fe0e 100644
--- a/test/unit/org/apache/cassandra/db/ScrubTest.java
+++ b/test/unit/org/apache/cassandra/db/ScrubTest.java
@@ -288,7 +288,7 @@ public class ScrubTest extends SchemaLoader
         Keyspace keyspace = Keyspace.open("Keyspace1");
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore("test_compact_static_columns");
 
-        QueryProcessor.processInternal("INSERT INTO \"Keyspace1\".test_compact_static_columns (a, b, c, d) VALUES (123, c3db07e8-b602-11e3-bc6b-e0b9a54a6d93, true, 'foobar')");
+        QueryProcessor.executeInternal("INSERT INTO \"Keyspace1\".test_compact_static_columns (a, b, c, d) VALUES (123, c3db07e8-b602-11e3-bc6b-e0b9a54a6d93, true, 'foobar')");
         cfs.forceBlockingFlush();
         CompactionManager.instance.performScrub(cfs, false);
     }
@@ -324,14 +324,14 @@ public class ScrubTest extends SchemaLoader
         Keyspace keyspace = Keyspace.open("Keyspace1");
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore("test_compact_dynamic_columns");
 
-        QueryProcessor.processInternal("INSERT INTO \"Keyspace1\".test_compact_dynamic_columns (a, b, c) VALUES (0, 'a', 'foo')");
-        QueryProcessor.processInternal("INSERT INTO \"Keyspace1\".test_compact_dynamic_columns (a, b, c) VALUES (0, 'b', 'bar')");
-        QueryProcessor.processInternal("INSERT INTO \"Keyspace1\".test_compact_dynamic_columns (a, b, c) VALUES (0, 'c', 'boo')");
+        QueryProcessor.executeInternal("INSERT INTO \"Keyspace1\".test_compact_dynamic_columns (a, b, c) VALUES (0, 'a', 'foo')");
+        QueryProcessor.executeInternal("INSERT INTO \"Keyspace1\".test_compact_dynamic_columns (a, b, c) VALUES (0, 'b', 'bar')");
+        QueryProcessor.executeInternal("INSERT INTO \"Keyspace1\".test_compact_dynamic_columns (a, b, c) VALUES (0, 'c', 'boo')");
         cfs.forceBlockingFlush();
         CompactionManager.instance.performScrub(cfs, true);
 
         // Scrub is silent, but it will remove broken records. So reading everything back to make sure nothing to "scrubbed away"
-        UntypedResultSet rs = QueryProcessor.processInternal("SELECT * FROM \"Keyspace1\".test_compact_dynamic_columns");
+        UntypedResultSet rs = QueryProcessor.executeInternal("SELECT * FROM \"Keyspace1\".test_compact_dynamic_columns");
         assertEquals(3, rs.size());
 
         Iterator<UntypedResultSet.Row> iter = rs.iterator();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1147ee3a/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
index 751e7ae..912c7f1 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
@@ -37,7 +37,7 @@ import static org.apache.cassandra.db.KeyspaceTest.assertColumns;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
-import static org.apache.cassandra.cql3.QueryProcessor.processInternal;
+import static org.apache.cassandra.cql3.QueryProcessor.executeInternal;
 
 import static org.apache.cassandra.Util.cellname;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -329,20 +329,20 @@ public class CompactionsPurgeTest extends SchemaLoader
         cfs.disableAutoCompaction();
 
         // write a row out to one sstable
-        processInternal(String.format("INSERT INTO %s.%s (k, v1, v2) VALUES (%d, '%s', %d)",
+        executeInternal(String.format("INSERT INTO %s.%s (k, v1, v2) VALUES (%d, '%s', %d)",
                                       keyspace, table, 1, "foo", 1));
         cfs.forceBlockingFlush();
 
-        UntypedResultSet result = processInternal(String.format("SELECT * FROM %s.%s WHERE k = %d", keyspace, table, 1));
+        UntypedResultSet result = executeInternal(String.format("SELECT * FROM %s.%s WHERE k = %d", keyspace, table, 1));
         assertEquals(1, result.size());
 
         // write a row tombstone out to a second sstable
-        processInternal(String.format("DELETE FROM %s.%s WHERE k = %d", keyspace, table, 1));
+        executeInternal(String.format("DELETE FROM %s.%s WHERE k = %d", keyspace, table, 1));
         cfs.forceBlockingFlush();
 
         // basic check that the row is considered deleted
         assertEquals(2, cfs.getSSTables().size());
-        result = processInternal(String.format("SELECT * FROM %s.%s WHERE k = %d", keyspace, table, 1));
+        result = executeInternal(String.format("SELECT * FROM %s.%s WHERE k = %d", keyspace, table, 1));
         assertEquals(0, result.size());
 
         // compact the two sstables with a gcBefore that does *not* allow the row tombstone to be purged
@@ -351,19 +351,19 @@ public class CompactionsPurgeTest extends SchemaLoader
 
         // the data should be gone, but the tombstone should still exist
         assertEquals(1, cfs.getSSTables().size());
-        result = processInternal(String.format("SELECT * FROM %s.%s WHERE k = %d", keyspace, table, 1));
+        result = executeInternal(String.format("SELECT * FROM %s.%s WHERE k = %d", keyspace, table, 1));
         assertEquals(0, result.size());
 
         // write a row out to one sstable
-        processInternal(String.format("INSERT INTO %s.%s (k, v1, v2) VALUES (%d, '%s', %d)",
+        executeInternal(String.format("INSERT INTO %s.%s (k, v1, v2) VALUES (%d, '%s', %d)",
                                       keyspace, table, 1, "foo", 1));
         cfs.forceBlockingFlush();
         assertEquals(2, cfs.getSSTables().size());
-        result = processInternal(String.format("SELECT * FROM %s.%s WHERE k = %d", keyspace, table, 1));
+        result = executeInternal(String.format("SELECT * FROM %s.%s WHERE k = %d", keyspace, table, 1));
         assertEquals(1, result.size());
 
         // write a row tombstone out to a different sstable
-        processInternal(String.format("DELETE FROM %s.%s WHERE k = %d", keyspace, table, 1));
+        executeInternal(String.format("DELETE FROM %s.%s WHERE k = %d", keyspace, table, 1));
         cfs.forceBlockingFlush();
 
         // compact the two sstables with a gcBefore that *does* allow the row tombstone to be purged
@@ -372,7 +372,7 @@ public class CompactionsPurgeTest extends SchemaLoader
 
         // both the data and the tombstone should be gone this time
         assertEquals(0, cfs.getSSTables().size());
-        result = processInternal(String.format("SELECT * FROM %s.%s WHERE k = %d", keyspace, table, 1));
+        result = executeInternal(String.format("SELECT * FROM %s.%s WHERE k = %d", keyspace, table, 1));
         assertEquals(0, result.size());
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1147ee3a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java
index c35a1df..f60e173 100644
--- a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java
@@ -93,7 +93,7 @@ public class CQLSSTableWriterTest
 
         loader.stream().get();
 
-        UntypedResultSet rs = QueryProcessor.processInternal("SELECT * FROM cql_keyspace.table1;");
+        UntypedResultSet rs = QueryProcessor.executeInternal("SELECT * FROM cql_keyspace.table1;");
         assertEquals(4, rs.size());
 
         Iterator<UntypedResultSet.Row> iter = rs.iterator();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1147ee3a/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java b/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java
index ff5a394..eef8c86 100644
--- a/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java
+++ b/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java
@@ -666,8 +666,8 @@ public class LeaveAndBootstrapTest
         Util.createInitialRing(ss, partitioner, endpointTokens, new ArrayList<Token>(), hosts, new ArrayList<UUID>(), 2);
 
         InetAddress toRemove = hosts.get(1);
-        SystemKeyspace.updatePeerInfo(toRemove, "data_center", "'dc42'");
-        SystemKeyspace.updatePeerInfo(toRemove, "rack", "'rack42'");
+        SystemKeyspace.updatePeerInfo(toRemove, "data_center", "dc42");
+        SystemKeyspace.updatePeerInfo(toRemove, "rack", "rack42");
         assertEquals("rack42", SystemKeyspace.loadDcRackInfo().get(toRemove).get("rack"));
 
         // mark the node as removed

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1147ee3a/test/unit/org/apache/cassandra/service/QueryPagerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/QueryPagerTest.java b/test/unit/org/apache/cassandra/service/QueryPagerTest.java
index abd030d..e71e97a 100644
--- a/test/unit/org/apache/cassandra/service/QueryPagerTest.java
+++ b/test/unit/org/apache/cassandra/service/QueryPagerTest.java
@@ -38,7 +38,7 @@ import org.apache.cassandra.service.pager.*;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
 import static org.junit.Assert.*;
-import static org.apache.cassandra.cql3.QueryProcessor.processInternal;
+import static org.apache.cassandra.cql3.QueryProcessor.executeInternal;
 import static org.apache.cassandra.Util.range;
 import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
 
@@ -341,7 +341,7 @@ public class QueryPagerTest extends SchemaLoader
 
         // Insert rows but with a tombstone as last cell
         for (int i = 0; i < 5; i++)
-            processInternal(String.format("INSERT INTO %s.%s (k, c, v) VALUES ('k%d', 'c%d', null)", keyspace, table, 0, i));
+            executeInternal(String.format("INSERT INTO %s.%s (k, c, v) VALUES ('k%d', 'c%d', null)", keyspace, table, 0, i));
 
         SliceQueryFilter filter = new SliceQueryFilter(ColumnSlice.ALL_COLUMNS_ARRAY, false, 100);
         QueryPager pager = QueryPagers.localPager(new SliceFromReadCommand(keyspace, bytes("k0"), table, 0, filter));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1147ee3a/test/unit/org/apache/cassandra/triggers/TriggersTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/triggers/TriggersTest.java b/test/unit/org/apache/cassandra/triggers/TriggersTest.java
index ee97d8a..74fde69 100644
--- a/test/unit/org/apache/cassandra/triggers/TriggersTest.java
+++ b/test/unit/org/apache/cassandra/triggers/TriggersTest.java
@@ -289,7 +289,7 @@ public class TriggersTest extends SchemaLoader
 
     private void assertUpdateIsAugmented(int key)
     {
-        UntypedResultSet rs = QueryProcessor.processInternal(
+        UntypedResultSet rs = QueryProcessor.executeInternal(
                                 String.format("SELECT * FROM %s.%s WHERE k=%s", ksName, cfName, key));
         assertTrue(String.format("Expected value (%s) for augmented cell v2 was not found", key), rs.one().has("v2"));
         assertEquals(999, rs.one().getInt("v2"));
@@ -297,7 +297,7 @@ public class TriggersTest extends SchemaLoader
 
     private void assertUpdateNotExecuted(String cf, int key)
     {
-        UntypedResultSet rs = QueryProcessor.processInternal(
+        UntypedResultSet rs = QueryProcessor.executeInternal(
                 String.format("SELECT * FROM %s.%s WHERE k=%s", ksName, cf, key));
         assertTrue(rs.isEmpty());
     }


Mime
View raw message