cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From slebre...@apache.org
Subject [1/2] git commit: Optimize single partition batch statements
Date Fri, 21 Feb 2014 10:04:01 GMT
Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.1 13b753b01 -> 3b4084b6c


Optimize single partition batch statements

patch by slebresne; reviewed by benedict for CASSANDRA-6737


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/54a7e003
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/54a7e003
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/54a7e003

Branch: refs/heads/cassandra-2.1
Commit: 54a7e0034148f451ff493f9f5363c26f10a21f20
Parents: edf16c9
Author: Sylvain Lebresne <sylvain@datastax.com>
Authored: Wed Feb 19 19:10:09 2014 +0100
Committer: Sylvain Lebresne <sylvain@datastax.com>
Committed: Fri Feb 21 10:18:02 2014 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cql3/statements/BatchStatement.java         | 57 ++++++++++++++------
 .../cql3/statements/DeleteStatement.java        |  8 ---
 .../cql3/statements/ModificationStatement.java  | 40 +++++++-------
 .../cql3/statements/UpdateStatement.java        |  8 ---
 .../org/apache/cassandra/db/RowMutation.java    |  7 ++-
 6 files changed, 67 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/54a7e003/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index bbacc4d..a5e1016 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -19,6 +19,7 @@
  * Improve repair tasks(snapshot, differencing) concurrency (CASSANDRA-6566)
  * Fix replaying pre-2.0 commit logs (CASSANDRA-6714)
  * Add static columns to CQL3 (CASSANDRA-6561)
+ * Optimize single partition batch statements (CASSANDRA-6737)
 Merged from 1.2:
  * Catch memtable flush exceptions during shutdown (CASSANDRA-6735)
  * Fix broken streams when replacing with same IP (CASSANDRA-6622)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/54a7e003/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
index d4acbae..b1dbb31 100644
--- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
@@ -30,7 +30,6 @@ import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.QueryState;
 import org.apache.cassandra.service.StorageProxy;
 import org.apache.cassandra.transport.messages.ResultMessage;
-import org.apache.cassandra.utils.Pair;
 
 /**
  * A <code>BATCH</code> statement parsed from a CQL query.
@@ -113,14 +112,26 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
     private Collection<? extends IMutation> getMutations(BatchVariables variables,
boolean local, ConsistencyLevel cl, long now)
     throws RequestExecutionException, RequestValidationException
     {
-        Map<Pair<String, ByteBuffer>, IMutation> mutations = new HashMap<Pair<String,
ByteBuffer>, IMutation>();
+        Map<String, Map<ByteBuffer, IMutation>> mutations = new HashMap<>();
         for (int i = 0; i < statements.size(); i++)
         {
             ModificationStatement statement = statements.get(i);
             List<ByteBuffer> statementVariables = variables.getVariablesForStatement(i);
             addStatementMutations(statement, statementVariables, local, cl, now, mutations);
         }
-        return mutations.values();
+        return unzipMutations(mutations);
+    }
+
+    private Collection<? extends IMutation> unzipMutations(Map<String, Map<ByteBuffer,
IMutation>> mutations)
+    {
+        // The case where all statements were on the same keyspace is pretty common
+        if (mutations.size() == 1)
+            return mutations.values().iterator().next().values();
+
+        List<IMutation> ms = new ArrayList<>();
+        for (Map<ByteBuffer, IMutation> ksMap : mutations.values())
+            ms.addAll(ksMap.values());
+        return ms;
     }
 
     private void addStatementMutations(ModificationStatement statement,
@@ -128,23 +139,40 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
                                        boolean local,
                                        ConsistencyLevel cl,
                                        long now,
-                                       Map<Pair<String, ByteBuffer>, IMutation>
mutations)
+                                       Map<String, Map<ByteBuffer, IMutation>>
mutations)
     throws RequestExecutionException, RequestValidationException
     {
-        // Group mutation together, otherwise they won't get applied atomically
-        for (IMutation m : statement.getMutations(variables, local, cl, attrs.getTimestamp(now,
variables), true))
+        String ksName = statement.keyspace();
+        Map<ByteBuffer, IMutation> ksMap = mutations.get(ksName);
+        if (ksMap == null)
         {
-            Pair<String, ByteBuffer> key = Pair.create(m.getKeyspaceName(), m.key());
-            IMutation existing = mutations.get(key);
+            ksMap = new HashMap<>();
+            mutations.put(ksName, ksMap);
+        }
 
-            if (existing == null)
+        // The following does the same as statement.getMutations(), but we inline it here because
+        // we don't want to recreate mutations every time as this is particularly inefficient when applying
+        // multiple statements of a batch to the same partition (see #6737).
+        List<ByteBuffer> keys = statement.buildPartitionKeyNames(variables);
+        ColumnNameBuilder clusteringPrefix = statement.createClusteringPrefixBuilder(variables);
+        UpdateParameters params = statement.makeUpdateParameters(keys, clusteringPrefix,
variables, local, cl, now);
+
+        for (ByteBuffer key : keys)
+        {
+            IMutation mutation = ksMap.get(key);
+            RowMutation rm;
+            if (mutation == null)
             {
-                mutations.put(key, m);
+                rm = new RowMutation(ksName, key);
+                mutation = type == Type.COUNTER ? new CounterMutation(rm, cl) : rm;
+                ksMap.put(key, mutation);
             }
             else
             {
-                existing.addAll(m);
+                rm = type == Type.COUNTER ? ((CounterMutation)mutation).rowMutation() : (RowMutation)mutation;
             }
+
+            statement.addUpdateForKey(rm.addOrGet(statement.cfm, UnsortedColumns.factory),
key, clusteringPrefix, params);
         }
     }
 
@@ -213,9 +241,9 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
                 throw new InvalidRequestException("Batch with conditions cannot span multiple
partitions");
             }
 
+            ColumnNameBuilder clusteringPrefix = statement.createClusteringPrefixBuilder(statementVariables);
             if (statement.hasConditions())
             {
-                ColumnNameBuilder clusteringPrefix = statement.createClusteringPrefixBuilder(statementVariables);
                 statement.addUpdatesAndConditions(key, clusteringPrefix, updates, conditions,
statementVariables, timestamp);
                 // As soon as we have a ifNotExists, we set columnsWithConditions to null
so that everything is in the resultSet
                 if (statement.hasIfNotExistCondition())
@@ -225,9 +253,8 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
             }
             else
             {
-                // getPartitionKey will already have thrown if there is more than one key
involved
-                IMutation mut = statement.getMutations(statementVariables, false, cl, timestamp,
true).iterator().next();
-                updates.resolve(mut.getColumnFamilies().iterator().next());
+                UpdateParameters params = statement.makeUpdateParameters(Collections.singleton(key),
clusteringPrefix, statementVariables, false, cl, now);
+                statement.addUpdateForKey(updates, key, clusteringPrefix, params);
             }
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/54a7e003/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java b/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
index cd5f2a2..6efe100 100644
--- a/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
@@ -41,14 +41,6 @@ public class DeleteStatement extends ModificationStatement
         return false;
     }
 
-    public ColumnFamily updateForKey(ByteBuffer key, ColumnNameBuilder builder, UpdateParameters
params)
-    throws InvalidRequestException
-    {
-        ColumnFamily cf = TreeMapBackedSortedColumns.factory.create(cfm);
-        addUpdateForKey(cf, key, builder, params);
-        return cf;
-    }
-
     public void addUpdateForKey(ColumnFamily cf, ByteBuffer key, ColumnNameBuilder builder,
UpdateParameters params)
     throws InvalidRequestException
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/54a7e003/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index ac8d2e1..ecefcb9 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -415,7 +415,7 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
         return null;
     }
 
-    protected Map<ByteBuffer, ColumnGroupMap> readRequiredRows(List<ByteBuffer>
partitionKeys, ColumnNameBuilder clusteringPrefix, boolean local, ConsistencyLevel cl)
+    protected Map<ByteBuffer, ColumnGroupMap> readRequiredRows(Collection<ByteBuffer>
partitionKeys, ColumnNameBuilder clusteringPrefix, boolean local, ConsistencyLevel cl)
     throws RequestExecutionException, RequestValidationException
     {
         // Lists SET operation incurs a read.
@@ -433,7 +433,7 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
         return toRead == null ? null : readRows(partitionKeys, clusteringPrefix, toRead,
(CompositeType)cfm.comparator, local, cl);
     }
 
-    private Map<ByteBuffer, ColumnGroupMap> readRows(List<ByteBuffer> partitionKeys,
ColumnNameBuilder clusteringPrefix, Set<ByteBuffer> toRead, CompositeType composite,
boolean local, ConsistencyLevel cl)
+    private Map<ByteBuffer, ColumnGroupMap> readRows(Collection<ByteBuffer> partitionKeys,
ColumnNameBuilder clusteringPrefix, Set<ByteBuffer> toRead, CompositeType composite,
boolean local, ConsistencyLevel cl)
     throws RequestExecutionException, RequestValidationException
     {
         try
@@ -516,7 +516,7 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
         else
             cl.validateForWrite(cfm.ksName);
 
-        Collection<? extends IMutation> mutations = getMutations(options.getValues(),
false, cl, queryState.getTimestamp(), false);
+        Collection<? extends IMutation> mutations = getMutations(options.getValues(),
false, cl, queryState.getTimestamp());
         if (!mutations.isEmpty())
             StorageProxy.mutateWithTriggers(mutations, cl, false);
 
@@ -651,7 +651,7 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
         if (hasConditions())
             throw new UnsupportedOperationException();
 
-        for (IMutation mutation : getMutations(Collections.<ByteBuffer>emptyList(),
true, null, queryState.getTimestamp(), false))
+        for (IMutation mutation : getMutations(Collections.<ByteBuffer>emptyList(),
true, null, queryState.getTimestamp()))
             mutation.apply();
         return null;
     }
@@ -667,15 +667,13 @@ public abstract class ModificationStatement implements CQLStatement,
MeasurableF
      * @return list of the mutations
      * @throws InvalidRequestException on invalid requests
      */
-    public Collection<? extends IMutation> getMutations(List<ByteBuffer> variables,
boolean local, ConsistencyLevel cl, long now, boolean isBatch)
+    private Collection<? extends IMutation> getMutations(List<ByteBuffer> variables,
boolean local, ConsistencyLevel cl, long now)
     throws RequestExecutionException, RequestValidationException
     {
         List<ByteBuffer> keys = buildPartitionKeyNames(variables);
         ColumnNameBuilder clusteringPrefix = createClusteringPrefixBuilder(variables);
 
-        // Some lists operation requires reading
-        Map<ByteBuffer, ColumnGroupMap> rows = readRequiredRows(keys, clusteringPrefix,
local, cl);
-        UpdateParameters params = new UpdateParameters(cfm, variables, getTimestamp(now,
variables), getTimeToLive(variables), rows);
+        UpdateParameters params = makeUpdateParameters(keys, clusteringPrefix, variables,
local, cl, now);
 
         Collection<IMutation> mutations = new ArrayList<IMutation>();
         for (ByteBuffer key: keys)
@@ -683,25 +681,23 @@ public abstract class ModificationStatement implements CQLStatement,
MeasurableF
             ThriftValidation.validateKey(cfm, key);
             ColumnFamily cf = UnsortedColumns.factory.create(cfm);
             addUpdateForKey(cf, key, clusteringPrefix, params);
-            mutations.add(makeMutation(key, cf, cl, isBatch));
+            RowMutation rm = new RowMutation(cfm.ksName, key, cf);
+            mutations.add(isCounter() ? new CounterMutation(rm, cl) : rm);
         }
         return mutations;
     }
 
-    private IMutation makeMutation(ByteBuffer key, ColumnFamily cf, ConsistencyLevel cl,
boolean isBatch)
+    public UpdateParameters makeUpdateParameters(Collection<ByteBuffer> keys,
+                                                 ColumnNameBuilder prefix,
+                                                 List<ByteBuffer> variables,
+                                                 boolean local,
+                                                 ConsistencyLevel cl,
+                                                 long now)
+    throws RequestExecutionException, RequestValidationException
     {
-        RowMutation rm;
-        if (isBatch)
-        {
-            // we might group other mutations together with this one later, so make it mutable
-            rm = new RowMutation(cfm.ksName, key);
-            rm.add(cf);
-        }
-        else
-        {
-            rm = new RowMutation(cfm.ksName, key, cf);
-        }
-        return isCounter() ? new CounterMutation(rm, cl) : rm;
+        // Some list operations require reading
+        Map<ByteBuffer, ColumnGroupMap> rows = readRequiredRows(keys, prefix, local,
cl);
+        return new UpdateParameters(cfm, variables, getTimestamp(now, variables), getTimeToLive(variables),
rows);
     }
 
     public static abstract class Parsed extends CFStatement

http://git-wip-us.apache.org/repos/asf/cassandra/blob/54a7e003/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
index 0e6481b..dcf22ef 100644
--- a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
@@ -98,14 +98,6 @@ public class UpdateStatement extends ModificationStatement
         }
     }
 
-    public ColumnFamily updateForKey(ByteBuffer key, ColumnNameBuilder builder, UpdateParameters
params)
-    throws InvalidRequestException
-    {
-        ColumnFamily cf = UnsortedColumns.factory.create(cfm);
-        addUpdateForKey(cf, key, builder, params);
-        return cf;
-    }
-
     public static class ParsedInsert extends ModificationStatement.Parsed
     {
         private final List<ColumnIdentifier> columnNames;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/54a7e003/src/java/org/apache/cassandra/db/RowMutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RowMutation.java b/src/java/org/apache/cassandra/db/RowMutation.java
index e9d177b..49ee2c5 100644
--- a/src/java/org/apache/cassandra/db/RowMutation.java
+++ b/src/java/org/apache/cassandra/db/RowMutation.java
@@ -125,10 +125,15 @@ public class RowMutation implements IMutation
 
     public ColumnFamily addOrGet(CFMetaData cfm)
     {
+        return addOrGet(cfm, TreeMapBackedSortedColumns.factory);
+    }
+
+    public ColumnFamily addOrGet(CFMetaData cfm, ColumnFamily.Factory factory)
+    {
         ColumnFamily cf = modifications.get(cfm.cfId);
         if (cf == null)
         {
-            cf = TreeMapBackedSortedColumns.factory.create(cfm);
+            cf = factory.create(cfm);
             modifications.put(cfm.cfId, cf);
         }
         return cf;


Mime
View raw message