cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From slebre...@apache.org
Subject [1/3] cassandra git commit: Fix JSON update with prepared statements
Date Thu, 26 Nov 2015 09:23:07 GMT
Repository: cassandra
Updated Branches:
  refs/heads/cassandra-3.0 71bca7800 -> 2cbd77625


Fix JSON update with prepared statements

patch by slebresne; reviewed by thobbs for CASSANDRA-10631


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/31be903a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/31be903a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/31be903a

Branch: refs/heads/cassandra-3.0
Commit: 31be903a74864e58980e0e22fd993a280c9138b3
Parents: 96b7603
Author: Sylvain Lebresne <sylvain@datastax.com>
Authored: Mon Nov 2 13:18:59 2015 +0100
Committer: Sylvain Lebresne <sylvain@datastax.com>
Committed: Thu Nov 26 10:11:55 2015 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 src/java/org/apache/cassandra/cql3/Json.java    | 25 +--------
 .../org/apache/cassandra/cql3/QueryOptions.java | 52 +++++++++++++++++--
 .../apache/cassandra/cql3/QueryProcessor.java   | 54 ++++++++++++++++----
 .../org/apache/cassandra/cql3/CQLTester.java    | 22 +++++++-
 5 files changed, 115 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/31be903a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a548c9f..7b32c2b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.2.4
+ * Fix JSON update with prepared statements (CASSANDRA-10631)
  * Don't do anticompaction after subrange repair (CASSANDRA-10422)
  * Fix SimpleDateType type compatibility (CASSANDRA-10027)
  * (Hadoop) fix splits calculation (CASSANDRA-10640)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/31be903a/src/java/org/apache/cassandra/cql3/Json.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Json.java b/src/java/org/apache/cassandra/cql3/Json.java
index e4bce29..5284793 100644
--- a/src/java/org/apache/cassandra/cql3/Json.java
+++ b/src/java/org/apache/cassandra/cql3/Json.java
@@ -156,8 +156,6 @@ public class Json
         private final int bindIndex;
         private final Collection<ColumnDefinition> columns;
 
-        private Map<ColumnIdentifier, Term> columnMap;
-
         public PreparedMarker(String keyspace, int bindIndex, Collection<ColumnDefinition>
columns)
         {
             super(keyspace);
@@ -169,24 +167,6 @@ public class Json
         {
             return new DelayedColumnValue(this, def);
         }
-
-        public void bind(QueryOptions options) throws InvalidRequestException
-        {
-            // this will be called once per column, so avoid duplicating work
-            if (columnMap != null)
-                return;
-
-            ByteBuffer value = options.getValues().get(bindIndex);
-            if (value == null)
-                throw new InvalidRequestException("Got null for INSERT JSON values");
-
-            columnMap = parseJson(UTF8Type.instance.getSerializer().deserialize(value), columns);
-        }
-
-        public Term getValue(ColumnDefinition def)
-        {
-            return columnMap.get(def.name);
-        }
     }
 
     /**
@@ -260,8 +240,7 @@ public class Json
         @Override
         public Terminal bind(QueryOptions options) throws InvalidRequestException
         {
-            marker.bind(options);
-            Term term = marker.getValue(column);
+            Term term = options.getJsonColumnValue(marker.bindIndex, column.name, marker.columns);
             return term == null ? null : term.bind(options);
         }
 
@@ -275,7 +254,7 @@ public class Json
     /**
      * Given a JSON string, return a map of columns to their values for the insert.
      */
-    private static Map<ColumnIdentifier, Term> parseJson(String jsonString, Collection<ColumnDefinition>
expectedReceivers)
+    public static Map<ColumnIdentifier, Term> parseJson(String jsonString, Collection<ColumnDefinition>
expectedReceivers)
     {
         try
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/31be903a/src/java/org/apache/cassandra/cql3/QueryOptions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/QueryOptions.java b/src/java/org/apache/cassandra/cql3/QueryOptions.java
index 7fc0997..be773e1 100644
--- a/src/java/org/apache/cassandra/cql3/QueryOptions.java
+++ b/src/java/org/apache/cassandra/cql3/QueryOptions.java
@@ -18,15 +18,15 @@
 package org.apache.cassandra.cql3;
 
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.EnumSet;
-import java.util.List;
+import java.util.*;
 
 import com.google.common.collect.ImmutableList;
-
 import io.netty.buffer.ByteBuf;
+
+import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.service.QueryState;
 import org.apache.cassandra.service.pager.PagingState;
 import org.apache.cassandra.transport.CBCodec;
@@ -48,6 +48,9 @@ public abstract class QueryOptions
 
     public static final CBCodec<QueryOptions> codec = new Codec();
 
+    // A cache of bind values parsed as JSON, see getJsonColumnValue for details.
+    private List<Map<ColumnIdentifier, Term>> jsonValuesCache;
+
     public static QueryOptions fromProtocolV1(ConsistencyLevel consistency, List<ByteBuffer>
values)
     {
         return new DefaultQueryOptions(consistency, values, false, SpecificOptions.DEFAULT,
Server.VERSION_1);
@@ -93,6 +96,45 @@ public abstract class QueryOptions
     public abstract boolean skipMetadata();
 
     /**
+     * Returns the term corresponding to column {@code columnName} in the JSON value of bind
index {@code bindIndex}.
+     *
+     * This is functionally equivalent to:
+     *   {@code Json.parseJson(UTF8Type.instance.getSerializer().deserialize(getValues().get(bindIndex)),
expectedReceivers).get(columnName)}
+     * but this caches the result of parsing the JSON so that while this might be called for
multiple columns on the same {@code bindIndex}
+     * value, the underlying JSON value is only parsed/processed once.
+     *
+     * Note: this is a bit more involved in CQL specifics than this class generally is, but
as we need to cache this per-query and in an object
+     * that is available when we bind values, this is the easiest place to have this.
+     *
+     * @param bindIndex the index of the bind value that should be interpreted as a JSON
value.
+     * @param columnName the name of the column we want the value of.
+     * @param expectedReceivers the columns expected in the JSON value at index {@code bindIndex}.
This is only used when parsing the
+     * json initially and no check is done afterwards. So in practice, any call of this method
on the same QueryOptions object and with the same
+     * {@code bindIndex} value should use the same value for this parameter, but this isn't
validated in any way.
+     *
+     * @return the value corresponding to column {@code columnName} in the (JSON) bind value
at index {@code bindIndex}. This may return null if the
+     * JSON value has no value for this column.
+     */
+    public Term getJsonColumnValue(int bindIndex, ColumnIdentifier columnName, Collection<ColumnDefinition>
expectedReceivers) throws InvalidRequestException
+    {
+        if (jsonValuesCache == null)
+            jsonValuesCache = new ArrayList<>(Collections.<Map<ColumnIdentifier,
Term>>nCopies(getValues().size(), null));
+
+        Map<ColumnIdentifier, Term> jsonValue = jsonValuesCache.get(bindIndex);
+        if (jsonValue == null)
+        {
+            ByteBuffer value = getValues().get(bindIndex);
+            if (value == null)
+                throw new InvalidRequestException("Got null for INSERT JSON values");
+
+            jsonValue = Json.parseJson(UTF8Type.instance.getSerializer().deserialize(value),
expectedReceivers);
+            jsonValuesCache.set(bindIndex, jsonValue);
+        }
+
+        return jsonValue.get(columnName);
+    }
+
+    /**
      * Tells whether or not this <code>QueryOptions</code> contains the column
specifications for the bound variables.
      * <p>The column specifications will be present only for prepared statements.</p>
      * @return <code>true</code> this <code>QueryOptions</code> contains
the column specifications for the bound

http://git-wip-us.apache.org/repos/asf/cassandra/blob/31be903a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
index 30a111d..fa82fa7 100644
--- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
@@ -24,6 +24,7 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Predicate;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Iterators;
@@ -544,6 +545,15 @@ public class QueryProcessor implements QueryHandler
         return meter.measureDeep(key);
     }
 
+    /**
+     * Clear our internal statement cache for test purposes.
+     */
+    @VisibleForTesting
+    public static void clearInternalStatementsCache()
+    {
+        internalStatements.clear();
+    }
+
     private static class MigrationSubscriber extends MigrationListener
     {
         private void removeInvalidPreparedStatements(String ksName, String cfName)
@@ -597,22 +607,23 @@ public class QueryProcessor implements QueryHandler
             return ksName.equals(statementKsName) && (cfName == null || cfName.equals(statementCfName));
         }
 
-        public void onCreateFunction(String ksName, String functionName, List<AbstractType<?>>
argTypes) {
+        public void onCreateFunction(String ksName, String functionName, List<AbstractType<?>>
argTypes)
+        {
             if (Functions.getOverloadCount(new FunctionName(ksName, functionName)) > 1)
             {
                 // in case there are other overloads, we have to remove all overloads since
argument type
                 // matching may change (due to type casting)
-                removeInvalidPreparedStatementsForFunction(preparedStatements.values().iterator(),
ksName, functionName);
-                removeInvalidPreparedStatementsForFunction(thriftPreparedStatements.values().iterator(),
ksName, functionName);
+                removeAllInvalidPreparedStatementsForFunction(ksName, functionName);
             }
         }
-        public void onCreateAggregate(String ksName, String aggregateName, List<AbstractType<?>>
argTypes) {
+
+        public void onCreateAggregate(String ksName, String aggregateName, List<AbstractType<?>>
argTypes)
+        {
             if (Functions.getOverloadCount(new FunctionName(ksName, aggregateName)) >
1)
             {
                 // in case there are other overloads, we have to remove all overloads since
argument type
                 // matching may change (due to type casting)
-                removeInvalidPreparedStatementsForFunction(preparedStatements.values().iterator(),
ksName, aggregateName);
-                removeInvalidPreparedStatementsForFunction(thriftPreparedStatements.values().iterator(),
ksName, aggregateName);
+                removeAllInvalidPreparedStatementsForFunction(ksName, aggregateName);
             }
         }
 
@@ -623,6 +634,24 @@ public class QueryProcessor implements QueryHandler
                 removeInvalidPreparedStatements(ksName, cfName);
         }
 
+        public void onUpdateFunction(String ksName, String functionName, List<AbstractType<?>>
argTypes)
+        {
+            // Updating a function may imply we've changed the body of the function, so we
need to invalidate statements so that
+            // the new definition is picked (the function is resolved at preparation time).
+            // TODO: if the function has multiple overloads, we could invalidate only the
statements referring to the overload
+            // that was updated. This requires a few changes however and probably doesn't
matter much in practice.
+            removeAllInvalidPreparedStatementsForFunction(ksName, functionName);
+        }
+
+        public void onUpdateAggregate(String ksName, String aggregateName, List<AbstractType<?>>
argTypes)
+        {
+            // Updating a function may imply we've changed the body of the function, so we
need to invalidate statements so that
+            // the new definition is picked (the function is resolved at preparation time).
+            // TODO: if the function has multiple overloads, we could invalidate only the
statements referring to the overload
+            // that was updated. This requires a few changes however and probably doesn't
matter much in practice.
+            removeAllInvalidPreparedStatementsForFunction(ksName, aggregateName);
+        }
+
         public void onDropKeyspace(String ksName)
         {
             logger.trace("Keyspace {} was dropped, invalidating related prepared statements",
ksName);
@@ -637,14 +666,19 @@ public class QueryProcessor implements QueryHandler
 
         public void onDropFunction(String ksName, String functionName, List<AbstractType<?>>
argTypes)
         {
-            removeInvalidPreparedStatementsForFunction(preparedStatements.values().iterator(),
ksName, functionName);
-            removeInvalidPreparedStatementsForFunction(thriftPreparedStatements.values().iterator(),
ksName, functionName);
+            removeAllInvalidPreparedStatementsForFunction(ksName, functionName);
         }
 
         public void onDropAggregate(String ksName, String aggregateName, List<AbstractType<?>>
argTypes)
         {
-            removeInvalidPreparedStatementsForFunction(preparedStatements.values().iterator(),
ksName, aggregateName);
-            removeInvalidPreparedStatementsForFunction(thriftPreparedStatements.values().iterator(),
ksName, aggregateName);
+            removeAllInvalidPreparedStatementsForFunction(ksName, aggregateName);
+        }
+
+        private void removeAllInvalidPreparedStatementsForFunction(String ksName, String
functionName)
+        {
+            removeInvalidPreparedStatementsForFunction(internalStatements.values().iterator(),
ksName, functionName);
+            removeInvalidPreparedStatementsForFunction(preparedStatements.values().iterator(),
ksName, functionName);
+            removeInvalidPreparedStatementsForFunction(thriftPreparedStatements.values().iterator(),
ksName, functionName);
         }
 
         private static void removeInvalidPreparedStatementsForFunction(Iterator<ParsedStatement.Prepared>
statements,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/31be903a/test/unit/org/apache/cassandra/cql3/CQLTester.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/CQLTester.java b/test/unit/org/apache/cassandra/cql3/CQLTester.java
index 4b4631e..5e17d1b 100644
--- a/test/unit/org/apache/cassandra/cql3/CQLTester.java
+++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java
@@ -136,6 +136,7 @@ public abstract class CQLTester
     // We don't use USE_PREPARED_VALUES in the code below so some test can foce value preparation
(if the result
     // is not expected to be the same without preparation)
     private boolean usePrepared = USE_PREPARED_VALUES;
+    private static final boolean reusePrepared = Boolean.valueOf(System.getProperty("cassandra.test.reuse_prepared",
"true"));
 
     @BeforeClass
     public static void setUpClass()
@@ -158,6 +159,11 @@ public abstract class CQLTester
 
         if (server != null)
             server.stop();
+
+        // We use queryInternal for CQLTester so prepared statement will populate our internal
cache (if reusePrepared is used; otherwise prepared
+        // statements are not cached but re-prepared every time). So we clear the cache between
test files to avoid accumulating too much.
+        if (reusePrepared)
+            QueryProcessor.clearInternalStatementsCache();
     }
 
     @Before
@@ -571,7 +577,21 @@ public abstract class CQLTester
         if (usePrepared)
         {
             logger.info("Executing: {} with values {}", query, formatAllValues(values));
-            rs = QueryProcessor.executeOnceInternal(query, transformValues(values));
+            if (reusePrepared)
+            {
+                rs = QueryProcessor.executeInternal(query, transformValues(values));
+
+                // If a test uses a "USE ...", then presumably its statements use relative
table names. In that case, a USE
+                // changes the meaning of the current keyspace, so we don't want a following
statement to reuse a previously
+                // prepared statement as this wouldn't use the right keyspace. To avoid that,
we drop the previously
+                // prepared statements.
+                if (query.startsWith("USE"))
+                    QueryProcessor.clearInternalStatementsCache();
+            }
+            else
+            {
+                rs = QueryProcessor.executeOnceInternal(query, transformValues(values));
+            }
         }
         else
         {


Mime
View raw message