cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From slebre...@apache.org
Subject git commit: Add support for batchlog in CQL3
Date Mon, 01 Oct 2012 14:48:43 GMT
Updated Branches:
  refs/heads/trunk 859473db5 -> 5dc8ba837


Add support for batchlog in CQL3

patch by iamaleksey; reviewed by slebresne for CASSANDRA-4545


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5dc8ba83
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5dc8ba83
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5dc8ba83

Branch: refs/heads/trunk
Commit: 5dc8ba837a706be7f4f8e180b9bafdb61aa4ccac
Parents: 859473d
Author: Sylvain Lebresne <sylvain@datastax.com>
Authored: Mon Oct 1 16:47:46 2012 +0200
Committer: Sylvain Lebresne <sylvain@datastax.com>
Committed: Mon Oct 1 16:47:46 2012 +0200

----------------------------------------------------------------------
 pylib/cqlshlib/cql3handling.py                     |    2 +-
 src/java/org/apache/cassandra/cql3/Cql.g           |   12 ++-
 .../cassandra/cql3/statements/BatchStatement.java  |   94 ++++++++-------
 .../cassandra/cql3/statements/DeleteStatement.java |    6 +-
 .../cql3/statements/ModificationStatement.java     |    4 +-
 .../cassandra/cql3/statements/UpdateStatement.java |   10 +-
 .../org/apache/cassandra/service/StorageProxy.java |   14 ++-
 7 files changed, 73 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dc8ba83/pylib/cqlshlib/cql3handling.py
----------------------------------------------------------------------
diff --git a/pylib/cqlshlib/cql3handling.py b/pylib/cqlshlib/cql3handling.py
index f3c338e..4fc7714 100644
--- a/pylib/cqlshlib/cql3handling.py
+++ b/pylib/cqlshlib/cql3handling.py
@@ -449,7 +449,7 @@ def delete_opt_completer(ctxt, cass):
 explain_completion('deleteStatement', 'delcol', '<column_to_delete>')
 
 syntax_rules += r'''
-<batchStatement> ::= "BEGIN" "BATCH"
+<batchStatement> ::= "BEGIN" ( "UNLOGGED" | "COUNTER" )? "BATCH"
                         ( "USING" [batchopt]=<usingOption>
                                   ( "AND" [batchopt]=<usingOption> )* )?
                         [batchstmt]=<batchStatementMember> ";"

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dc8ba83/src/java/org/apache/cassandra/cql3/Cql.g
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Cql.g b/src/java/org/apache/cassandra/cql3/Cql.g
index 1379b9a..320e934 100644
--- a/src/java/org/apache/cassandra/cql3/Cql.g
+++ b/src/java/org/apache/cassandra/cql3/Cql.g
@@ -353,14 +353,17 @@ deleteSelector returns [Selector s]
  */
 batchStatement returns [BatchStatement expr]
     @init {
-        Attributes attrs = new Attributes();
+        BatchStatement.Type type = BatchStatement.Type.LOGGED;
         List<ModificationStatement> statements = new ArrayList<ModificationStatement>();
+        Attributes attrs = new Attributes();
     }
-    : K_BEGIN K_BATCH ( usingClause[attrs] )?
+    : K_BEGIN
+      ( K_UNLOGGED { type = BatchStatement.Type.UNLOGGED; } | K_COUNTER { type = BatchStatement.Type.COUNTER; } )?
+      K_BATCH ( usingClause[attrs] )?
           s1=batchStatementObjective ';'? { statements.add(s1); } ( sN=batchStatementObjective ';'? { statements.add(sN); } )*
       K_APPLY K_BATCH
       {
-          return new BatchStatement(statements, attrs);
+          return new BatchStatement(type, statements, attrs);
       }
     ;
 
@@ -767,8 +770,9 @@ K_USE:         U S E;
 K_COUNT:       C O U N T;
 K_SET:         S E T;
 K_BEGIN:       B E G I N;
-K_APPLY:       A P P L Y;
+K_UNLOGGED:    U N L O G G E D;
 K_BATCH:       B A T C H;
+K_APPLY:       A P P L Y;
 K_TRUNCATE:    T R U N C A T E;
 K_DELETE:      D E L E T E;
 K_IN:          I N;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dc8ba83/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
index 9e5cb40..38df9bd 100644
--- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
@@ -19,7 +19,6 @@ package org.apache.cassandra.cql3.statements;
 
 import java.nio.ByteBuffer;
 import java.util.*;
-import java.util.concurrent.TimeoutException;
 
 import org.apache.cassandra.auth.Permission;
 import org.apache.cassandra.cql3.*;
@@ -29,8 +28,8 @@ import org.apache.cassandra.db.IMutation;
 import org.apache.cassandra.db.RowMutation;
 import org.apache.cassandra.exceptions.*;
 import org.apache.cassandra.service.ClientState;
-import org.apache.cassandra.thrift.RequestType;
-import org.apache.cassandra.thrift.ThriftValidation;
+import org.apache.cassandra.service.StorageProxy;
+import org.apache.cassandra.transport.messages.ResultMessage;
 import org.apache.cassandra.utils.Pair;
 
 /**
@@ -39,6 +38,12 @@ import org.apache.cassandra.utils.Pair;
  */
 public class BatchStatement extends ModificationStatement
 {
+    public static enum Type
+    {
+        LOGGED, UNLOGGED, COUNTER
+    }
+
+    protected final Type type;
     // statements to execute
     protected final List<ModificationStatement> statements;
 
@@ -46,12 +51,14 @@ public class BatchStatement extends ModificationStatement
      * Creates a new BatchStatement from a list of statements and a
      * Thrift consistency level.
      *
+     * @param type type of the batch
      * @param statements a list of UpdateStatements
      * @param attrs additional attributes for statement (CL, timestamp, timeToLive)
      */
-    public BatchStatement(List<ModificationStatement> statements, Attributes attrs)
+    public BatchStatement(Type type, List<ModificationStatement> statements, Attributes attrs)
     {
         super(null, attrs);
+        this.type = type;
         this.statements = statements;
     }
 
@@ -78,6 +85,28 @@ public class BatchStatement extends ModificationStatement
     }
 
     @Override
+    public ResultMessage execute(ClientState state, List<ByteBuffer> variables) throws RequestExecutionException, RequestValidationException
+    {
+        Collection<? extends IMutation> mutations = getMutations(state, variables, false);
+        ConsistencyLevel cl = getConsistencyLevel();
+
+        switch (type)
+        {
+            case LOGGED:
+                StorageProxy.mutateAtomically((Collection<RowMutation>) mutations, cl);
+                break;
+            case UNLOGGED:
+            case COUNTER:
+                StorageProxy.mutate(mutations, cl);
+                break;
+            default:
+                throw new AssertionError();
+        }
+
+        return null;
+    }
+
+    @Override
     public ConsistencyLevel getConsistencyLevel()
     {
         // We have validated that either the consistency is set, or all statements have the same default CL (see validate())
@@ -119,54 +148,35 @@ public class BatchStatement extends ModificationStatement
         }
     }
 
-    public List<IMutation> getMutations(ClientState clientState, List<ByteBuffer> variables, boolean local)
+    public Collection<? extends IMutation> getMutations(ClientState clientState, List<ByteBuffer> variables, boolean local)
     throws RequestExecutionException, RequestValidationException
     {
-        Map<Pair<String, ByteBuffer>, RowAndCounterMutation> mutations = new HashMap<Pair<String, ByteBuffer>, RowAndCounterMutation>();
+        Map<Pair<String, ByteBuffer>, IMutation> mutations = new HashMap<Pair<String, ByteBuffer>, IMutation>();
         for (ModificationStatement statement : statements)
         {
             if (isSetTimestamp())
                 statement.setTimestamp(getTimestamp(clientState));
 
-            List<IMutation> lm = statement.getMutations(clientState, variables, local);
             // Group mutation together, otherwise they won't get applied atomically
-            for (IMutation m : lm)
+            for (IMutation m : statement.getMutations(clientState, variables, local))
             {
+                if (m instanceof CounterMutation && type != Type.COUNTER)
+                    throw new InvalidRequestException("Counter mutations are only allowed in COUNTER batches");
+
+                if (m instanceof RowMutation && type == Type.COUNTER)
+                    throw new InvalidRequestException("Only counter mutations are allowed in COUNTER batches");
+
                 Pair<String, ByteBuffer> key = Pair.create(m.getTable(), m.key());
-                RowAndCounterMutation racm = mutations.get(key);
-                if (racm == null)
-                {
-                    racm = new RowAndCounterMutation();
-                    mutations.put(key, racm);
-                }
-
-                if (m instanceof CounterMutation)
-                {
-                    if (racm.cm == null)
-                        racm.cm = (CounterMutation)m;
-                    else
-                        racm.cm.addAll(m);
-                }
+                IMutation existing = mutations.get(key);
+
+                if (existing == null)
+                    mutations.put(key, m);
                 else
-                {
-                    assert m instanceof RowMutation;
-                    if (racm.rm == null)
-                        racm.rm = (RowMutation)m;
-                    else
-                        racm.rm.addAll(m);
-                }
+                    existing.addAll(m);
             }
         }
 
-        List<IMutation> batch = new LinkedList<IMutation>();
-        for (RowAndCounterMutation racm : mutations.values())
-        {
-            if (racm.rm != null)
-                batch.add(racm.rm);
-            if (racm.cm != null)
-                batch.add(racm.cm);
-        }
-        return batch;
+        return mutations.values();
     }
 
     public ParsedStatement.Prepared prepare(CFDefinition.Name[] boundNames) throws InvalidRequestException
@@ -187,12 +197,6 @@ public class BatchStatement extends ModificationStatement
 
     public String toString()
     {
-        return String.format("BatchStatement(statements=%s, consistency=%s)", statements, getConsistencyLevel());
-    }
-
-    private static class RowAndCounterMutation
-    {
-        public RowMutation rm;
-        public CounterMutation cm;
+        return String.format("BatchStatement(type=%s, statements=%s, consistency=%s)", type, statements, getConsistencyLevel());
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dc8ba83/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java b/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
index 50a279f..fc0efb8 100644
--- a/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
@@ -19,7 +19,6 @@ package org.apache.cassandra.cql3.statements;
 
 import java.nio.ByteBuffer;
 import java.util.*;
-import java.util.concurrent.TimeoutException;
 
 import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.config.CFMetaData;
@@ -29,7 +28,6 @@ import org.apache.cassandra.cql3.operations.Operation;
 import org.apache.cassandra.cql3.operations.SetOperation;
 import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.db.DeletionInfo;
-import org.apache.cassandra.db.IMutation;
 import org.apache.cassandra.db.RowMutation;
 import org.apache.cassandra.db.marshal.CollectionType;
 import org.apache.cassandra.db.marshal.CompositeType;
@@ -62,7 +60,7 @@ public class DeleteStatement extends ModificationStatement
         this.toRemove = new ArrayList<Pair<CFDefinition.Name, Term>>(columns.size());
     }
 
-    public List<IMutation> getMutations(ClientState clientState, List<ByteBuffer> variables, boolean local)
+    public Collection<RowMutation> getMutations(ClientState clientState, List<ByteBuffer> variables, boolean local)
     throws RequestExecutionException, RequestValidationException
     {
         // keys
@@ -94,7 +92,7 @@ public class DeleteStatement extends ModificationStatement
 
         Map<ByteBuffer, ColumnGroupMap> rows = needsReading ? readRows(keys, builder, (CompositeType)cfDef.cfm.comparator, local) : null;
 
-        List<IMutation> rowMutations = new ArrayList<IMutation>(keys.size());
+        Collection<RowMutation> rowMutations = new ArrayList<RowMutation>(keys.size());
         UpdateParameters params = new UpdateParameters(variables, getTimestamp(clientState), -1);
 
         for (ByteBuffer key : keys)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dc8ba83/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index 6072c24..192d837 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -33,8 +33,6 @@ import org.apache.cassandra.db.marshal.CompositeType;
 import org.apache.cassandra.exceptions.*;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.StorageProxy;
-import org.apache.cassandra.thrift.RequestType;
-import org.apache.cassandra.thrift.ThriftValidation;
 
 /**
  * Abstract class for statements that apply on a given column family.
@@ -179,7 +177,7 @@ public abstract class ModificationStatement extends CFStatement implements CQLSt
      * @return list of the mutations
      * @throws InvalidRequestException on invalid requests
      */
-    protected abstract List<IMutation> getMutations(ClientState clientState, List<ByteBuffer> variables, boolean local)
+    protected abstract Collection<? extends IMutation> getMutations(ClientState clientState, List<ByteBuffer> variables, boolean local)
     throws RequestExecutionException, RequestValidationException;
 
     public abstract ParsedStatement.Prepared prepare(CFDefinition.Name[] boundNames) throws InvalidRequestException;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dc8ba83/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
index 877d0d8..a4a310d 100644
--- a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
@@ -19,7 +19,6 @@ package org.apache.cassandra.cql3.statements;
 
 import java.nio.ByteBuffer;
 import java.util.*;
-import java.util.concurrent.TimeoutException;
 
 import com.google.common.collect.ArrayListMultimap;
 
@@ -35,7 +34,6 @@ import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.Pair;
 
 import static org.apache.cassandra.cql.QueryProcessor.validateKey;
-
 import static org.apache.cassandra.thrift.ThriftValidation.validateColumnFamily;
 
 /**
@@ -100,7 +98,7 @@ public class UpdateStatement extends ModificationStatement
 
 
     /** {@inheritDoc} */
-    public List<IMutation> getMutations(ClientState clientState, List<ByteBuffer> variables, boolean local)
+    public Collection<IMutation> getMutations(ClientState clientState, List<ByteBuffer> variables, boolean local)
     throws RequestExecutionException, RequestValidationException
     {
         List<ByteBuffer> keys = buildKeyNames(cfDef, processedKeys, variables);
@@ -129,13 +127,13 @@ public class UpdateStatement extends ModificationStatement
 
         Map<ByteBuffer, ColumnGroupMap> rows = needsReading ? readRows(keys, builder, (CompositeType)cfDef.cfm.comparator, local) : null;
 
-        List<IMutation> rowMutations = new LinkedList<IMutation>();
+        Collection<IMutation> mutations = new LinkedList<IMutation>();
         UpdateParameters params = new UpdateParameters(variables, getTimestamp(clientState), getTimeToLive());
 
         for (ByteBuffer key: keys)
-            rowMutations.add(mutationForKey(cfDef, key, builder, params, rows == null ? null : rows.get(key)));
+            mutations.add(mutationForKey(cfDef, key, builder, params, rows == null ? null : rows.get(key)));
 
-        return rowMutations;
+        return mutations;
     }
 
     // Returns the first empty component or null if none are

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dc8ba83/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 122d3fd..df6a36a 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -167,7 +167,7 @@ public class StorageProxy implements StorageProxyMBean
      * @param mutations the mutations to be applied across the replicas
      * @param consistency_level the consistency level for the operation
      */
-    public static void mutate(List<? extends IMutation> mutations, ConsistencyLevel consistency_level)
+    public static void mutate(Collection<? extends IMutation> mutations, ConsistencyLevel consistency_level)
     throws UnavailableException, OverloadedException, WriteTimeoutException
     {
         logger.debug("Mutations/ConsistencyLevel are {}/{}", mutations, consistency_level);
@@ -243,7 +243,7 @@ public class StorageProxy implements StorageProxyMBean
      * @param mutations the RowMutations to be applied across the replicas
      * @param consistency_level the consistency level for the operation
      */
-    public static void mutateAtomically(List<RowMutation> mutations, ConsistencyLevel consistency_level)
+    public static void mutateAtomically(Collection<RowMutation> mutations, ConsistencyLevel consistency_level)
     throws UnavailableException, WriteTimeoutException
     {
         long startTime = System.nanoTime();
@@ -294,13 +294,15 @@ public class StorageProxy implements StorageProxyMBean
         }
     }
 
-    private static void syncWriteToBatchlog(List<RowMutation> mutations,
-                                            Collection<InetAddress> endpoints,
-                                            UUID uuid)
+    private static void syncWriteToBatchlog(Collection<RowMutation> mutations, Collection<InetAddress> endpoints, UUID uuid)
     throws WriteTimeoutException
     {
         RowMutation rm = BatchlogManager.getBatchlogMutationFor(mutations, uuid);
-        AbstractWriteResponseHandler handler = WriteResponseHandler.create(endpoints, Collections.<InetAddress>emptyList(), ConsistencyLevel.ONE, Table.SYSTEM_KS, null);
+        AbstractWriteResponseHandler handler = WriteResponseHandler.create(endpoints,
+                                                                           Collections.<InetAddress>emptyList(),
+                                                                           ConsistencyLevel.ONE,
+                                                                           Table.SYSTEM_KS,
+                                                                           null);
 
         try
         {


Mime
View raw message