cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yu...@apache.org
Subject [44/50] [abbrv] Add lists, sets and maps support
Date Fri, 27 Jul 2012 15:19:39 GMT
http://git-wip-us.apache.org/repos/asf/cassandra/blob/91bdf7fb/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index c74b08f..dd729fb 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@ -48,8 +48,8 @@ import org.apache.cassandra.thrift.IndexExpression;
 import org.apache.cassandra.thrift.IndexOperator;
 import org.apache.cassandra.thrift.InvalidRequestException;
 import org.apache.cassandra.thrift.RequestType;
-import org.apache.cassandra.thrift.ThriftValidation;
 import org.apache.cassandra.thrift.TimedOutException;
+import org.apache.cassandra.thrift.ThriftValidation;
 import org.apache.cassandra.thrift.UnavailableException;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
@@ -117,20 +117,27 @@ public class SelectStatement implements CQLStatement
 
     public ResultSet executeInternal(ClientState state, List<ByteBuffer> variables) throws InvalidRequestException, UnavailableException, TimedOutException
     {
-        List<Row> rows;
-        if (isKeyRange())
+        try
         {
-            rows = multiRangeSlice(variables);
+            List<Row> rows;
+            if (isKeyRange())
+            {
+                rows = multiRangeSlice(variables);
+            }
+            else
+            {
+                rows = getSlice(variables);
+            }
+
+            // Even for count, we need to process the result as it'll group some column together in sparse column families
+            ResultSet rset = process(rows, variables);
+            rset = parameters.isCount ? rset.makeCountResult() : rset;
+            return rset;
         }
-        else
+        catch (TimeoutException e)
         {
-            rows = getSlice(variables);
+            throw new TimedOutException();
         }
-
-        // Even for count, we need to process the result as it'll group some column together in sparse column families
-        ResultSet rset = process(rows, variables);
-        rset = parameters.isCount ? rset.makeCountResult() : rset;
-        return rset;
     }
 
     public ResultSet process(List<Row> rows) throws InvalidRequestException
@@ -149,40 +156,32 @@ public class SelectStatement implements CQLStatement
         return cfDef.cfm.cfName;
     }
 
-    private List<Row> getSlice(List<ByteBuffer> variables) throws InvalidRequestException, TimedOutException, UnavailableException
+    private List<Row> getSlice(List<ByteBuffer> variables) throws InvalidRequestException, TimeoutException, UnavailableException
     {
         QueryPath queryPath = new QueryPath(columnFamily());
         Collection<ByteBuffer> keys = getKeys(variables);
         List<ReadCommand> commands = new ArrayList<ReadCommand>(keys.size());
 
+        IFilter filter = makeFilter(variables);
         // ...a range (slice) of column names
         if (isColumnRange())
         {
-            ByteBuffer start = getColumnStart(variables);
-            ByteBuffer finish = getColumnEnd(variables);
-
-            SliceQueryFilter filter = new SliceQueryFilter(start, finish, isReversed, getLimit());
-            QueryProcessor.validateSliceFilter(cfDef.cfm, filter);
-
             // Note that we use the total limit for every key. This is
             // potentially inefficient, but then again, IN + LIMIT is not a
             // very sensible choice
             for (ByteBuffer key : keys)
             {
                 QueryProcessor.validateKey(key);
-                commands.add(new SliceFromReadCommand(keyspace(), key, queryPath, filter));
+                commands.add(new SliceFromReadCommand(keyspace(), key, queryPath, (SliceQueryFilter)filter));
             }
         }
         // ...of a list of column names
         else
         {
-            Collection<ByteBuffer> columnNames = getRequestedColumns(variables);
-            QueryProcessor.validateColumnNames(columnNames);
-
             for (ByteBuffer key: keys)
             {
                 QueryProcessor.validateKey(key);
-                commands.add(new SliceByNamesReadCommand(keyspace(), key, queryPath, columnNames));
+                commands.add(new SliceByNamesReadCommand(keyspace(), key, queryPath, (NamesQueryFilter)filter));
             }
         }
 
@@ -190,23 +189,16 @@ public class SelectStatement implements CQLStatement
         {
             return StorageProxy.read(commands, parameters.consistencyLevel);
         }
-        catch (TimeoutException e)
-        {
-            throw new TimedOutException();
-        }
         catch (IOException e)
         {
             throw new RuntimeException(e);
         }
     }
 
-    private List<Row> multiRangeSlice(List<ByteBuffer> variables) throws InvalidRequestException, TimedOutException, UnavailableException
+    private List<Row> multiRangeSlice(List<ByteBuffer> variables) throws InvalidRequestException, TimeoutException, UnavailableException
     {
         List<Row> rows;
-
-        IFilter filter =  makeFilter(variables);
-        QueryProcessor.validateFilter(cfDef.cfm, filter);
-
+        IFilter filter = makeFilter(variables);
         List<IndexExpression> expressions = getIndexExpressions(variables);
 
         try
@@ -226,10 +218,6 @@ public class SelectStatement implements CQLStatement
         {
             throw new RuntimeException(e);
         }
-        catch (TimeoutException e)
-        {
-            throw new TimedOutException();
-        }
         return rows;
     }
 
@@ -282,14 +270,26 @@ public class SelectStatement implements CQLStatement
     {
         if (isColumnRange())
         {
-            return new SliceQueryFilter(getRequestedBound(isReversed ? Bound.END : Bound.START, variables),
-                                        getRequestedBound(isReversed ? Bound.START : Bound.END, variables),
-                                        isReversed,
-                                        -1); // We use this for range slices, where the count is ignored in favor of the global column count
+            // For sparse, we used to ask for 'defined columns' * 'asked limit' to account for the grouping of columns.
+            // Since that doesn't work for maps/sets/lists, we use the compositesToGroup option of SliceQueryFilter.
+            // But we must preserver backward compatibility too.
+            int multiplier = cfDef.isCompact ? 1 : cfDef.metadata.size();
+            int toGroup = cfDef.isCompact ? -1 : cfDef.columns.size();
+            ColumnSlice slice = new ColumnSlice(getRequestedBound(isReversed ? Bound.END : Bound.START, variables),
+                                                getRequestedBound(isReversed ? Bound.START : Bound.END, variables));
+            SliceQueryFilter filter = new SliceQueryFilter(new ColumnSlice[]{slice},
+                                                           isReversed,
+                                                           getLimit(),
+                                                           toGroup,
+                                                           multiplier);
+            QueryProcessor.validateSliceFilter(cfDef.cfm, filter);
+            return filter;
         }
         else
         {
-            return new NamesQueryFilter(getRequestedColumns(variables));
+            SortedSet<ByteBuffer> columnNames = getRequestedColumns(variables);
+            QueryProcessor.validateColumnNames(columnNames);
+            return new NamesQueryFilter(columnNames);
         }
     }
 
@@ -297,11 +297,7 @@ public class SelectStatement implements CQLStatement
     {
         // Internally, we don't support exclusive bounds for slices. Instead,
         // we query one more element if necessary and exclude
-        int limit = sliceRestriction != null && !sliceRestriction.isInclusive(Bound.START) ? parameters.limit + 1 : parameters.limit;
-        // For sparse, we'll end up merging all defined colums into the same CqlRow. Thus we should query up
-        // to 'defined columns' * 'asked limit' to be sure to have enough columns. We'll trim after query if
-        // this end being too much.
-        return cfDef.isCompact ? limit : cfDef.metadata.size() * limit;
+        return sliceRestriction != null && !sliceRestriction.isInclusive(Bound.START) ? parameters.limit + 1 : parameters.limit;
     }
 
     private boolean isKeyRange()
@@ -370,6 +366,10 @@ public class SelectStatement implements CQLStatement
         if (!cfDef.isCompact && !cfDef.isComposite)
             return false;
 
+        // However, collections always entails one
+        if (cfDef.hasCollections)
+            return true;
+
         // Otherwise, it is a range query if it has at least one the column alias
         // for which no relation is defined or is not EQ.
         for (Restriction r : columnRestrictions)
@@ -468,17 +468,7 @@ public class SelectStatement implements CQLStatement
             }
         }
         // Means no relation at all or everything was an equal
-        return builder.build();
-    }
-
-    public ByteBuffer getColumnStart(List<ByteBuffer> variables) throws InvalidRequestException
-    {
-        return getRequestedBound(isReversed ? Bound.END : Bound.START, variables);
-    }
-
-    public ByteBuffer getColumnEnd(List<ByteBuffer> variables) throws InvalidRequestException
-    {
-        return getRequestedBound(isReversed ? Bound.START : Bound.END, variables);
+        return (b == Bound.END) ? builder.buildAsEndOfRange() : builder.build();
     }
 
     private List<IndexExpression> getIndexExpressions(List<ByteBuffer> variables) throws InvalidRequestException
@@ -708,30 +698,14 @@ public class SelectStatement implements CQLStatement
             {
                 // Sparse case: group column in cqlRow when composite prefix is equal
                 CompositeType composite = (CompositeType)cfDef.cfm.comparator;
-                int last = composite.types.size() - 1;
 
-                ByteBuffer[] previous = null;
-                Map<ByteBuffer, IColumn> group = new HashMap<ByteBuffer, IColumn>();
-                for (IColumn c : row.cf)
-                {
-                    if (c.isMarkedForDelete())
-                        continue;
+                ColumnGroupMap.Builder builder = new ColumnGroupMap.Builder(composite, cfDef.hasCollections);
 
-                    ByteBuffer[] current = composite.split(c.name());
-                    // If current differs from previous, we've just finished a group
-                    if (previous != null && !isSameRow(previous, current))
-                    {
-                        handleGroup(selection, row.key.key, previous, group, cqlRows);
-                        group = new HashMap<ByteBuffer, IColumn>();
-                    }
+                for (IColumn c : row.cf)
+                    builder.add(c);
 
-                    // Accumulate the current column
-                    group.put(current[last], c);
-                    previous = current;
-                }
-                // Handle the last group
-                if (previous != null)
-                    handleGroup(selection, row.key.key, previous, group, cqlRows);
+                for (ColumnGroupMap group : builder.groups())
+                    handleGroup(selection, row.key.key, group, cqlRows);
             }
             else
             {
@@ -808,30 +782,7 @@ public class SelectStatement implements CQLStatement
         Collections.sort(cqlRows.rows, new CompositeComparator(startPosition, types));
     }
 
-
-    /**
-     * For sparse composite, returns wheter two columns belong to the same
-     * cqlRow base on the full list of component in the name.
-     * Two columns do belong together if they differ only by the last
-     * component.
-     */
-    private static boolean isSameRow(ByteBuffer[] c1, ByteBuffer[] c2)
-    {
-        // Cql don't allow to insert columns who doesn't have all component of
-        // the composite set for sparse composite. Someone coming from thrift
-        // could hit that though. But since we have no way to handle this
-        // correctly, better fail here and tell whomever may hit that (if
-        // someone ever do) to change the definition to a dense composite
-        assert c1.length == c2.length : "Sparse composite should not have partial column names";
-        for (int i = 0; i < c1.length - 1; i++)
-        {
-            if (!c1[i].equals(c2[i]))
-                return false;
-        }
-        return true;
-    }
-
-    private void handleGroup(List<Pair<CFDefinition.Name, Selector>> selection, ByteBuffer key, ByteBuffer[] components, Map<ByteBuffer, IColumn> columns, ResultSet cqlRows)
+    private void handleGroup(List<Pair<CFDefinition.Name, Selector>> selection, ByteBuffer key, ColumnGroupMap columns, ResultSet cqlRows)
     {
         // Respect requested order
         for (Pair<CFDefinition.Name, Selector> p : selection)
@@ -844,13 +795,18 @@ public class SelectStatement implements CQLStatement
                     cqlRows.addColumnValue(key);
                     break;
                 case COLUMN_ALIAS:
-                    cqlRows.addColumnValue(components[name.position]);
+                    cqlRows.addColumnValue(columns.getKeyComponent(name.position));
                     break;
                 case VALUE_ALIAS:
                     // This should not happen for SPARSE
                     throw new AssertionError();
                 case COLUMN_METADATA:
-                    IColumn c = columns.get(name.name.key);
+                    if (name.type instanceof CollectionType)
+                    {
+                         cqlRows.addColumnValue(((CollectionType)name.type).serializeForThrift(columns.getCollection(name.name.key)));
+                        break;
+                    }
+                    IColumn c = columns.getSimple(name.name.key);
                     addReturnValue(cqlRows, selector, c);
                     break;
                 default:

http://git-wip-us.apache.org/repos/asf/cassandra/blob/91bdf7fb/src/java/org/apache/cassandra/cql3/statements/Selector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/Selector.java b/src/java/org/apache/cassandra/cql3/statements/Selector.java
index 21105c0..5847b1c 100644
--- a/src/java/org/apache/cassandra/cql3/statements/Selector.java
+++ b/src/java/org/apache/cassandra/cql3/statements/Selector.java
@@ -21,8 +21,9 @@ package org.apache.cassandra.cql3.statements;
 import com.google.common.base.Objects;
 
 import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.cql3.Term;
 
-public interface Selector
+public abstract class Selector
 {
     public enum Function
     {
@@ -42,11 +43,29 @@ public interface Selector
         }
     }
 
-    public ColumnIdentifier id();
-    public boolean hasFunction();
-    public Function function();
+    public abstract ColumnIdentifier id();
 
-    public static class WithFunction implements Selector
+    public boolean hasFunction()
+    {
+        return false;
+    }
+
+    public Function function()
+    {
+        return null;
+    }
+
+    public boolean hasKey()
+    {
+        return false;
+    }
+
+    public Term key()
+    {
+        return null;
+    }
+
+    public static class WithFunction extends Selector
     {
         private final Function function;
         private final ColumnIdentifier id;
@@ -95,4 +114,53 @@ public interface Selector
             return function + "(" + id + ")";
         }
     }
+
+    public static class WithKey extends Selector
+    {
+        private final ColumnIdentifier id;
+        private final Term key;
+
+        public WithKey(ColumnIdentifier id, Term key)
+        {
+            this.id = id;
+            this.key = key;
+        }
+
+        public ColumnIdentifier id()
+        {
+            return id;
+        }
+
+        @Override
+        public boolean hasKey()
+        {
+            return true;
+        }
+
+        public Term key()
+        {
+            return key;
+        }
+
+        @Override
+        public final int hashCode()
+        {
+            return Objects.hashCode(id, key);
+        }
+
+        @Override
+        public final boolean equals(Object o)
+        {
+            if(!(o instanceof WithKey))
+                return false;
+            WithKey that = (WithKey)o;
+            return id().equals(that.id()) && key.equals(that.key);
+        }
+
+        @Override
+        public String toString()
+        {
+            return id + "[" + key + "]";
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/91bdf7fb/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
index 3167b40..dc670dc 100644
--- a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
@@ -17,18 +17,25 @@
  */
 package org.apache.cassandra.cql3.statements;
 
+import java.io.IOError;
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.*;
+import java.util.concurrent.TimeoutException;
+
+import com.google.common.collect.ArrayListMultimap;
 
 import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.filter.QueryPath;
-import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.db.marshal.LongType;
+import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.StorageProxy;
 import org.apache.cassandra.thrift.InvalidRequestException;
+import org.apache.cassandra.thrift.UnavailableException;
 import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.Pair;
 
 import static org.apache.cassandra.cql.QueryProcessor.validateColumnName;
 import static org.apache.cassandra.cql.QueryProcessor.validateKey;
@@ -43,12 +50,12 @@ import static org.apache.cassandra.thrift.ThriftValidation.validateCommutativeFo
 public class UpdateStatement extends ModificationStatement
 {
     private CFDefinition cfDef;
-    private final Map<ColumnIdentifier, Operation> columns;
+    private final List<Pair<ColumnIdentifier, Operation>> columns;
     private final List<ColumnIdentifier> columnNames;
-    private final List<Term> columnValues;
+    private final List<Value> columnValues;
     private final List<Relation> whereClause;
 
-    private final Map<ColumnIdentifier, Operation> processedColumns = new HashMap<ColumnIdentifier, Operation>();
+    private final ArrayListMultimap<CFDefinition.Name, Operation> processedColumns = ArrayListMultimap.create();
     private final Map<ColumnIdentifier, List<Term>> processedKeys = new HashMap<ColumnIdentifier, List<Term>>();
 
     /**
@@ -61,7 +68,7 @@ public class UpdateStatement extends ModificationStatement
      * @param attrs additional attributes for statement (CL, timestamp, timeToLive)
      */
     public UpdateStatement(CFName name,
-                           Map<ColumnIdentifier, Operation> columns,
+                           List<Pair<ColumnIdentifier, Operation>> columns,
                            List<Relation> whereClause,
                            Attributes attrs)
     {
@@ -84,9 +91,9 @@ public class UpdateStatement extends ModificationStatement
      * @param attrs additional attributes for statement (CL, timestamp, timeToLive)
      */
     public UpdateStatement(CFName name,
+                           Attributes attrs,
                            List<ColumnIdentifier> columnNames,
-                           List<Term> columnValues,
-                           Attributes attrs)
+                           List<Value> columnValues)
     {
         super(name, attrs);
 
@@ -96,8 +103,9 @@ public class UpdateStatement extends ModificationStatement
         this.columns = null;
     }
 
+
     /** {@inheritDoc} */
-    public List<IMutation> getMutations(ClientState clientState, List<ByteBuffer> variables) throws InvalidRequestException
+    public List<IMutation> getMutations(ClientState clientState, List<ByteBuffer> variables) throws UnavailableException, TimeoutException, InvalidRequestException
     {
         // Check key
         List<Term> keys = processedKeys.get(cfDef.key.name);
@@ -127,14 +135,41 @@ public class UpdateStatement extends ModificationStatement
             }
         }
 
-        List<IMutation> rowMutations = new LinkedList<IMutation>();
-
+        List<ByteBuffer> rawKeys = new ArrayList<ByteBuffer>(keys.size());
         for (Term key: keys)
+            rawKeys.add(key.getByteBuffer(cfDef.key.type, variables));
+
+        // Lists SET operation incurs a read. Do that now. Note that currently,
+        // if there is at least one list, we just read the whole "row" (in the CQL sense of
+        // row) to simplify. Once #3885 is in, we can improve.
+        boolean needsReading = false;
+        for (Map.Entry<CFDefinition.Name, Operation> entry : processedColumns.entries())
         {
-            ByteBuffer rawKey = key.getByteBuffer(cfDef.key.type, variables);
-            rowMutations.add(mutationForKey(cfDef, clientState, rawKey, builder, variables));
+            CFDefinition.Name name = entry.getKey();
+            Operation value = entry.getValue();
+
+            if (!(name.type instanceof ListType))
+                continue;
+
+            if (value == null || value.type != Operation.Type.FUNCTION)
+                continue;
+
+            Operation.Function fOp = (Operation.Function)value;
+            if (fOp.fct.needsReading)
+            {
+                needsReading = true;
+                break;
+            }
         }
 
+        Map<ByteBuffer, ColumnGroupMap> rows = needsReading ? readRows(rawKeys, builder, (CompositeType)cfDef.cfm.comparator) : null;
+
+        List<IMutation> rowMutations = new LinkedList<IMutation>();
+        UpdateParameters params = new UpdateParameters(variables, getTimestamp(clientState), timeToLive);
+
+        for (ByteBuffer key: rawKeys)
+            rowMutations.add(mutationForKey(cfDef, key, builder, params, rows == null ? null : rows.get(key)));
+
         return rowMutations;
     }
 
@@ -151,7 +186,7 @@ public class UpdateStatement extends ModificationStatement
      *
      * @throws InvalidRequestException on the wrong request
      */
-    private IMutation mutationForKey(CFDefinition cfDef, ClientState clientState, ByteBuffer key, ColumnNameBuilder builder, List<ByteBuffer> variables)
+    private IMutation mutationForKey(CFDefinition cfDef, ByteBuffer key, ColumnNameBuilder builder, UpdateParameters params, ColumnGroupMap group)
     throws InvalidRequestException
     {
         validateKey(key);
@@ -167,70 +202,92 @@ public class UpdateStatement extends ModificationStatement
             if (builder.componentCount() == 0)
                 throw new InvalidRequestException(String.format("Missing PRIMARY KEY part %s", cfDef.columns.values().iterator().next()));
 
-            Operation value = processedColumns.get(cfDef.value.name);
-            if (value == null)
+            List<Operation> value = processedColumns.get(cfDef.value);
+            if (value.isEmpty())
                 throw new InvalidRequestException(String.format("Missing mandatory column %s", cfDef.value));
-            hasCounterColumn = addToMutation(clientState, cf, builder.build(), cfDef.value, value, variables);
+            assert value.size() == 1;
+            hasCounterColumn = addToMutation(cf, builder, cfDef.value, value.get(0), params, null);
         }
         else
         {
-            for (CFDefinition.Name name : cfDef.metadata.values())
+            for (Map.Entry<CFDefinition.Name, Operation> entry : processedColumns.entries())
             {
-                Operation value = processedColumns.get(name.name);
-                if (value == null)
-                    continue;
+                CFDefinition.Name name = entry.getKey();
+                Operation value = entry.getValue();
 
-                ByteBuffer colName = builder.copy().add(name.name.key).build();
-                hasCounterColumn |= addToMutation(clientState, cf, colName, name, value, variables);
+                hasCounterColumn |= addToMutation(cf, builder.copy().add(name.name.key), name, value, params, group == null ? null : group.getCollection(name.name.key));
             }
         }
 
         return (hasCounterColumn) ? new CounterMutation(rm, getConsistencyLevel()) : rm;
     }
 
-    private boolean addToMutation(ClientState clientState,
-                                  ColumnFamily cf,
-                                  ByteBuffer colName,
+    private boolean addToMutation(ColumnFamily cf,
+                                  ColumnNameBuilder builder,
                                   CFDefinition.Name valueDef,
                                   Operation value,
-                                  List<ByteBuffer> variables) throws InvalidRequestException
+                                  UpdateParameters params,
+                                  List<Pair<ByteBuffer, IColumn>> list) throws InvalidRequestException
     {
-        if (value.isUnary())
+        switch (value.type)
         {
-            validateColumnName(colName);
-            ByteBuffer valueBytes = value.value.getByteBuffer(valueDef.type, variables);
-            Column c = timeToLive > 0
-                       ? new ExpiringColumn(colName, valueBytes, getTimestamp(clientState), timeToLive)
-                       : new Column(colName, valueBytes, getTimestamp(clientState));
-            cf.addColumn(c);
-            return false;
-        }
-        else
-        {
-            if (!valueDef.name.equals(value.ident))
-                throw new InvalidRequestException("Only expressions like X = X + <long> are supported.");
+            case SET:
+                Value v = ((Operation.Set)value).value;
+                if (v instanceof Term)
+                {
+                    ByteBuffer colName = builder.build();
+                    validateColumnName(colName);
+                    ByteBuffer valueBytes = ((Term)v).getByteBuffer(valueDef.type, params.variables);
+                    cf.addColumn(params.makeColumn(colName, valueBytes));
+                }
+                else
+                {
+                    assert v instanceof Value.CollectionLiteral;
+                    Value.CollectionLiteral l = (Value.CollectionLiteral)v;
+                    l.validateType(valueDef);
 
-            long val;
-            try
-            {
-                val = ByteBufferUtil.toLong(value.value.getByteBuffer(LongType.instance, variables));
-            }
-            catch (NumberFormatException e)
-            {
-                throw new InvalidRequestException(String.format("'%s' is an invalid value, should be a long.",
-                            value.value.getText()));
-            }
+                    // Remove previous
+                    cf.addAtom(params.makeTombstoneForOverwrite(builder.copy().build(), builder.copy().buildAsEndOfRange()));
 
-            if (value.type == Operation.Type.MINUS)
-            {
-                if (val == Long.MIN_VALUE)
-                    throw new InvalidRequestException("The negation of " + val + " overflows supported integer precision (signed 8 bytes integer)");
+                    if (!l.isEmpty())
+                        addToMutation(cf, builder, valueDef, new Operation.Function(l.constructionFunction(), l.asList()), params, null);
+                }
+                return false;
+            case COUNTER:
+                Operation.Counter cOp = (Operation.Counter)value;
+                long val;
+                try
+                {
+                    val = ByteBufferUtil.toLong(cOp.value.getByteBuffer(LongType.instance, params.variables));
+                }
+                catch (NumberFormatException e)
+                {
+                    throw new InvalidRequestException(String.format("'%s' is an invalid value, should be a long.",
+                                cOp.value.getText()));
+                }
+
+                if (cOp.isSubstraction)
+                {
+                    if (val == Long.MIN_VALUE)
+                        throw new InvalidRequestException("The negation of " + val + " overflows supported integer precision (signed 8 bytes integer)");
+                    else
+                        val = -val;
+                }
+                cf.addCounter(new QueryPath(columnFamily(), null, builder.build()), val);
+                return true;
+            case FUNCTION:
+                Operation.Function fOp = (Operation.Function)value;
+                if (!(valueDef.type instanceof CollectionType))
+                    throw new InvalidRequestException(String.format("Invalid operation %s, %s is not a collection", fOp.fct, valueDef.name));
+
+                if ((valueDef.type instanceof ListType) && fOp.fct.needsReading)
+                    ((ListType)valueDef.type).execute(cf, builder, fOp.fct, fOp.arguments, params, list);
                 else
-                    val = -val;
-            }
-            cf.addCounter(new QueryPath(columnFamily(), null, colName), val);
-            return true;
+                    ((CollectionType)valueDef.type).execute(cf, builder, fOp.fct, fOp.arguments, params);
+
+                return false;
         }
+        throw new AssertionError();
     }
 
     public ParsedStatement.Prepared prepare(CFDefinition.Name[] boundNames) throws InvalidRequestException
@@ -239,13 +296,13 @@ public class UpdateStatement extends ModificationStatement
 
         if (columns != null)
         {
-            for (Map.Entry<ColumnIdentifier, Operation> column : columns.entrySet())
+            for (Pair<ColumnIdentifier, Operation> column : columns)
             {
-                if (!column.getValue().isUnary())
+                if (column.right.type == Operation.Type.COUNTER)
                     hasCommutativeOperation = true;
 
-                if (hasCommutativeOperation && column.getValue().isUnary())
-                    throw new InvalidRequestException("Mix of commutative and non-commutative operations is not allowed.");
+                if (hasCommutativeOperation && column.right.type != Operation.Type.COUNTER)
+                    throw new InvalidRequestException("Mix of counter and non-counter operations is not allowed.");
             }
         }
 
@@ -271,23 +328,26 @@ public class UpdateStatement extends ModificationStatement
                 if (name == null)
                     throw new InvalidRequestException(String.format("Unknown identifier %s", columnNames.get(i)));
 
-                Term value = columnValues.get(i);
-                if (value.isBindMarker())
-                    boundNames[value.bindIndex] = name;
+                Value value = columnValues.get(i);
+                for (Term t : value.asList())
+                    if (t.isBindMarker())
+                        boundNames[t.bindIndex] = name;
 
                 switch (name.kind)
                 {
                     case KEY_ALIAS:
                     case COLUMN_ALIAS:
                         if (processedKeys.containsKey(name.name))
-                            throw new InvalidRequestException(String.format("Multiple definition found for PRIMARY KEY part %s", name));
-                        processedKeys.put(name.name, Collections.singletonList(value));
+                            throw new InvalidRequestException(String.format("Multiple definitions found for PRIMARY KEY part %s", name));
+                        if (!(value instanceof Term))
+                            throw new InvalidRequestException(String.format("Invalid definition for %s, not a collection type", name));
+                        processedKeys.put(name.name, Collections.singletonList((Term)value));
                         break;
                     case VALUE_ALIAS:
                     case COLUMN_METADATA:
-                        if (processedColumns.containsKey(name.name))
-                            throw new InvalidRequestException(String.format("Multiple definition found for column %s", name));
-                        processedColumns.put(name.name, new Operation(value));
+                        if (processedColumns.containsKey(name))
+                            throw new InvalidRequestException(String.format("Multiple definitions found for column %s", name));
+                        processedColumns.put(name, new Operation.Set(value));
                         break;
                 }
             }
@@ -295,25 +355,28 @@ public class UpdateStatement extends ModificationStatement
         else
         {
             // Created from an UPDATE
-            for (Map.Entry<ColumnIdentifier, Operation> entry : columns.entrySet())
+            for (Pair<ColumnIdentifier, Operation> entry : columns)
             {
-                CFDefinition.Name name = cfDef.get(entry.getKey());
+                CFDefinition.Name name = cfDef.get(entry.left);
                 if (name == null)
-                    throw new InvalidRequestException(String.format("Unknown identifier %s", entry.getKey()));
+                    throw new InvalidRequestException(String.format("Unknown identifier %s", entry.left));
 
                 switch (name.kind)
                 {
                     case KEY_ALIAS:
                     case COLUMN_ALIAS:
-                        throw new InvalidRequestException(String.format("PRIMARY KEY part %s found in SET part", entry.getKey()));
+                        throw new InvalidRequestException(String.format("PRIMARY KEY part %s found in SET part", entry.left));
                     case VALUE_ALIAS:
                     case COLUMN_METADATA:
-                        if (processedColumns.containsKey(name.name))
-                            throw new InvalidRequestException(String.format("Multiple definition found for column %s", name));
-                        Operation op = entry.getValue();
-                        if (op.value.isBindMarker())
-                            boundNames[op.value.bindIndex] = name;
-                        processedColumns.put(name.name, op);
+                        for (Operation op : processedColumns.get(name))
+                            if (op.type != Operation.Type.FUNCTION)
+                                throw new InvalidRequestException(String.format("Multiple definitions found for column %s", name));
+
+                        Operation op = entry.right;
+                        for (Term t : op.allTerms())
+                            if (t.isBindMarker())
+                                boundNames[t.bindIndex] = name;
+                        processedColumns.put(name, op);
                         break;
                 }
             }
@@ -351,7 +414,7 @@ public class UpdateStatement extends ModificationStatement
                         throw new InvalidRequestException(String.format("Invalid operator %s for key %s", rel.operator(), rel.getEntity()));
 
                     if (processed.containsKey(name.name))
-                        throw new InvalidRequestException(String.format("Multiple definition found for PRIMARY KEY part %s", name));
+                        throw new InvalidRequestException(String.format("Multiple definitions found for PRIMARY KEY part %s", name));
                     for (Term value : values)
                         if (value.isBindMarker())
                             names[value.bindIndex] = name;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/91bdf7fb/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 3ab570a..f335386 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -1448,7 +1448,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
                 data = filter.prune(data);
                 rows.add(new Row(rawRow.key, data));
                 if (data != null)
-                    columnsCount += data.getLiveColumnCount();
+                    columnsCount += filter.lastCounted(data);
                 // Update the underlying filter to avoid querying more columns per slice than necessary and to handle paging
                 filter.updateFilter(columnsCount);
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/91bdf7fb/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SliceFromReadCommand.java b/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
index b9d8a17..2f90106 100644
--- a/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
@@ -55,7 +55,6 @@ public class SliceFromReadCommand extends ReadCommand
         this.filter = filter;
     }
 
-
     public ReadCommand copy()
     {
         ReadCommand readCommand = new SliceFromReadCommand(table, key, queryPath, filter);
@@ -86,7 +85,7 @@ public class SliceFromReadCommand extends ReadCommand
             // columns, only l/t end up live after reconciliation. So for next
             // round we want to ask x column so that x * (l/t) == t, i.e. x = t^2/l.
             int retryCount = liveColumnsInRow == 0 ? count + 1 : ((count * count) / liveColumnsInRow) + 1;
-            SliceQueryFilter newFilter = new SliceQueryFilter(filter.slices, filter.reversed, retryCount);
+            SliceQueryFilter newFilter = filter.withUpdatedCount(retryCount);
             return new RetriedSliceFromReadCommand(table, key, queryPath, newFilter, getOriginalRequestedCount());
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/91bdf7fb/src/java/org/apache/cassandra/db/filter/ColumnCounter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/ColumnCounter.java b/src/java/org/apache/cassandra/db/filter/ColumnCounter.java
new file mode 100644
index 0000000..c642b82
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/filter/ColumnCounter.java
@@ -0,0 +1,108 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.cassandra.db.filter;
+
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.db.IColumn;
+import org.apache.cassandra.db.IColumnContainer;
+import org.apache.cassandra.db.marshal.CompositeType;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+public class ColumnCounter
+{
+    protected int count;
+
+    public void countColum(IColumn column, IColumnContainer container)
+    {
+        if (isLive(column, container))
+            count++;
+    }
+
+    protected static boolean isLive(IColumn column, IColumnContainer container)
+    {
+        return column.isLive() && (!container.deletionInfo().isDeleted(column));
+    }
+
+    public int count()
+    {
+        return count;
+    }
+
+    public static class GroupByPrefix extends ColumnCounter
+    {
+        private final CompositeType type;
+        private final int toGroup;
+        private ByteBuffer[] last;
+
+        /**
+         * A column counter that count only 1 for all the columns sharing a
+         * given prefix of the key.
+         *
+         * @param type the type of the column name. This can be null if {@code
+         *             toGroup} is 0, otherwise it should be a composite.
+         * @param toGroup the number of composite components on which to group
+         *                column. If 0, all columns are grouped, otherwise we group
+         *                those for which the {@code toGroup} first component are equals.
+         */
+        public GroupByPrefix(CompositeType type, int toGroup)
+        {
+            this.type = type;
+            this.toGroup = toGroup;
+
+            assert toGroup == 0 || type != null;
+        }
+
+        public void countColum(IColumn column, IColumnContainer container)
+        {
+            if (!isLive(column, container))
+                return;
+
+            if (toGroup == 0)
+            {
+                count = 1;
+                return;
+            }
+
+            ByteBuffer[] current = type.split(column.name());
+            assert current.length >= toGroup;
+
+            if (last != null)
+            {
+                boolean isSameGroup = true;
+                for (int i = 0; i < toGroup; i++)
+                {
+                    if (ByteBufferUtil.compareUnsigned(last[i], current[i]) != 0)
+                    {
+                        isSameGroup = false;
+                        break;
+                    }
+                }
+
+                if (isSameGroup)
+                    return;
+            }
+
+            count++;
+            last = current;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/91bdf7fb/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java b/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
index b3736e8..ec6f6ad 100644
--- a/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
@@ -101,6 +101,14 @@ public abstract class ExtendedFilter
         initialFilter().updateColumnsLimit(remaining);
     }
 
+    public int lastCounted(ColumnFamily data)
+    {
+        if (initialFilter() instanceof SliceQueryFilter)
+            return ((SliceQueryFilter)initialFilter()).lastCounted();
+        else
+            return data.getLiveColumnCount();
+    }
+
     /** The initial filter we'll do our first slice with (either the original or a superset of it) */
     public abstract IFilter initialFilter();
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/91bdf7fb/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java b/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
index adf4204..2037d66 100644
--- a/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
@@ -34,6 +34,7 @@ import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
 import org.apache.cassandra.db.columniterator.SSTableSliceIterator;
 import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.CompositeType;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.io.util.FileDataInput;
@@ -48,6 +49,12 @@ public class SliceQueryFilter implements IFilter
     public final ColumnSlice[] slices;
     public final boolean reversed;
     public volatile int count;
+    private final int compositesToGroup;
+    // This is a hack to allow rolling upgrade with pre-1.2 nodes
+    private final int countMutliplierForCompatibility;
+
+    // Not serialized, just a ack for range slices to find the number of live column counted, even when we group
+    private ColumnCounter columnCounter;
 
     public SliceQueryFilter(ByteBuffer start, ByteBuffer finish, boolean reversed, int count)
     {
@@ -60,9 +67,21 @@ public class SliceQueryFilter implements IFilter
      */
     public SliceQueryFilter(ColumnSlice[] slices, boolean reversed, int count)
     {
+        this(slices, reversed, count, -1, 1);
+    }
+
+    public SliceQueryFilter(ColumnSlice[] slices, boolean reversed, int count, int compositesToGroup, int countMutliplierForCompatibility)
+    {
         this.slices = slices;
         this.reversed = reversed;
         this.count = count;
+        this.compositesToGroup = compositesToGroup;
+        this.countMutliplierForCompatibility = countMutliplierForCompatibility;
+    }
+
+    public SliceQueryFilter withUpdatedCount(int newCount)
+    {
+        return new SliceQueryFilter(slices, reversed, newCount, compositesToGroup, countMutliplierForCompatibility);
     }
 
     public OnDiskAtomIterator getMemtableColumnIterator(ColumnFamily cf, DecoratedKey key)
@@ -119,25 +138,26 @@ public class SliceQueryFilter implements IFilter
 
     public void collectReducedColumns(IColumnContainer container, Iterator<IColumn> reducedColumns, int gcBefore)
     {
-        int liveColumns = 0;
         AbstractType<?> comparator = container.getComparator();
 
+        if (compositesToGroup < 0)
+            columnCounter = new ColumnCounter();
+        else if (compositesToGroup == 0)
+            columnCounter = new ColumnCounter.GroupByPrefix(null, 0);
+        else
+            columnCounter = new ColumnCounter.GroupByPrefix((CompositeType)comparator, compositesToGroup);
+
         while (reducedColumns.hasNext())
         {
-            if (liveColumns >= count)
+            if (columnCounter.count() >= count)
                 break;
 
             IColumn column = reducedColumns.next();
             if (logger.isDebugEnabled())
                 logger.debug(String.format("collecting %s of %s: %s",
-                                           liveColumns, count, column.getString(comparator)));
+                                           columnCounter.count(), count, column.getString(comparator)));
 
-            // only count live columns towards the `count` criteria
-            if (column.isLive()
-                && (!container.deletionInfo().isDeleted(column)))
-            {
-                liveColumns++;
-            }
+            columnCounter.countColum(column, container);
 
             // but we need to add all non-gc-able columns to the result for read repair:
             if (QueryFilter.isRelevant(column, container, gcBefore))
@@ -161,6 +181,11 @@ public class SliceQueryFilter implements IFilter
         this.slices[0] = new ColumnSlice(start, this.slices[0].finish);
     }
 
+    public int lastCounted()
+    {
+        return columnCounter == null ? 0 : columnCounter.count();
+    }
+
     @Override
     public String toString()
     {
@@ -194,7 +219,15 @@ public class SliceQueryFilter implements IFilter
                     ColumnSlice.serializer.serialize(slice, dos, version);
             }
             dos.writeBoolean(f.reversed);
-            dos.writeInt(f.count);
+            int count = f.count;
+            if (f.compositesToGroup > 0 && version < MessagingService.VERSION_12)
+                count *= f.countMutliplierForCompatibility;
+            dos.writeInt(count);
+
+            if (version < MessagingService.VERSION_12)
+                return;
+
+            dos.writeInt(f.compositesToGroup);
         }
 
         public SliceQueryFilter deserialize(DataInput dis, int version) throws IOException
@@ -212,7 +245,11 @@ public class SliceQueryFilter implements IFilter
             }
             boolean reversed = dis.readBoolean();
             int count = dis.readInt();
-            return new SliceQueryFilter(slices, reversed, count);
+            int compositesToGroup = -1;
+            if (version >= MessagingService.VERSION_12)
+                compositesToGroup = dis.readInt();
+
+            return new SliceQueryFilter(slices, reversed, count, compositesToGroup, 1);
         }
 
         public long serializedSize(SliceQueryFilter f, int version)
@@ -232,6 +269,9 @@ public class SliceQueryFilter implements IFilter
             }
             size += sizes.sizeof(f.reversed);
             size += sizes.sizeof(f.count);
+
+            if (version >= MessagingService.VERSION_12)
+                size += sizes.sizeof(f.compositesToGroup);
             return size;
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/91bdf7fb/src/java/org/apache/cassandra/db/marshal/AbstractCompositeType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/AbstractCompositeType.java b/src/java/org/apache/cassandra/db/marshal/AbstractCompositeType.java
index 6536318..e138b67 100644
--- a/src/java/org/apache/cassandra/db/marshal/AbstractCompositeType.java
+++ b/src/java/org/apache/cassandra/db/marshal/AbstractCompositeType.java
@@ -68,6 +68,8 @@ public abstract class AbstractCompositeType extends AbstractType<ByteBuffer>
         ByteBuffer bb2 = o2.duplicate();
         int i = 0;
 
+        ByteBuffer previous = null;
+
         while (bb1.remaining() > 0 && bb2.remaining() > 0)
         {
             AbstractType<?> comparator = getComparator(i, bb1, bb2);
@@ -75,10 +77,12 @@ public abstract class AbstractCompositeType extends AbstractType<ByteBuffer>
             ByteBuffer value1 = getWithShortLength(bb1);
             ByteBuffer value2 = getWithShortLength(bb2);
 
-            int cmp = comparator.compare(value1, value2);
+            int cmp = comparator.compareCollectionMembers(value1, value2, previous);
             if (cmp != 0)
                 return cmp;
 
+            previous = value1;
+
             byte b1 = bb1.get();
             byte b2 = bb2.get();
             if (b1 < 0)
@@ -238,6 +242,7 @@ public abstract class AbstractCompositeType extends AbstractType<ByteBuffer>
         ByteBuffer bb = bytes.duplicate();
 
         int i = 0;
+        ByteBuffer previous = null;
         while (bb.remaining() > 0)
         {
             AbstractType<?> comparator = validateComparator(i, bb);
@@ -250,13 +255,15 @@ public abstract class AbstractCompositeType extends AbstractType<ByteBuffer>
                 throw new MarshalException("Not enough bytes to read value of component " + i);
             ByteBuffer value = getBytes(bb, length);
 
-            comparator.validate(value);
+            comparator.validateCollectionMember(value, previous);
 
             if (bb.remaining() == 0)
                 throw new MarshalException("Not enough bytes to read the end-of-component byte of component" + i);
             byte b = bb.get();
             if (b != 0 && bb.remaining() != 0)
                 throw new MarshalException("Invalid bytes remaining after an end-of-component at component" + i);
+
+            previous = value;
             ++i;
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/91bdf7fb/src/java/org/apache/cassandra/db/marshal/AbstractType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/AbstractType.java b/src/java/org/apache/cassandra/db/marshal/AbstractType.java
index 7eab64c..148138a 100644
--- a/src/java/org/apache/cassandra/db/marshal/AbstractType.java
+++ b/src/java/org/apache/cassandra/db/marshal/AbstractType.java
@@ -204,6 +204,29 @@ public abstract class AbstractType<T> implements Comparator<ByteBuffer>
     }
 
     /**
+     * An alternative comparison function used by CollectionsType in conjunction with CompositeType.
+     *
+     * This comparator is only called to compare components of a CompositeType. It gets the value of the
+     * previous component as argument (or null if it's the first component of the composite).
+     *
+     * Unless you're doing something very similar to CollectionsType, you shouldn't override this.
+     */
+    public int compareCollectionMembers(ByteBuffer v1, ByteBuffer v2, ByteBuffer collectionName)
+    {
+        return compare(v1, v2);
+    }
+
+    /**
+     * An alternative validation function used by CollectionsType in conjunction with CompositeType.
+     *
+     * This is similar to the compare function above.
+     */
+    public void validateCollectionMember(ByteBuffer bytes, ByteBuffer collectionName) throws MarshalException
+    {
+        validate(bytes);
+    }
+
+    /**
      * This must be overriden by subclasses if necessary so that for any
      * AbstractType, this == TypeParser.parse(toString()).
      *

http://git-wip-us.apache.org/repos/asf/cassandra/blob/91bdf7fb/src/java/org/apache/cassandra/db/marshal/CollectionType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/CollectionType.java b/src/java/org/apache/cassandra/db/marshal/CollectionType.java
new file mode 100644
index 0000000..ab8f15b
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/marshal/CollectionType.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.marshal;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.List;
+
+import org.apache.cassandra.cql3.ColumnNameBuilder;
+import org.apache.cassandra.cql3.Term;
+import org.apache.cassandra.cql3.UpdateParameters;
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.IColumn;
+import org.apache.cassandra.config.ConfigurationException;
+import org.apache.cassandra.thrift.InvalidRequestException;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.Pair;
+
+/**
+ * The abstract validator that is the base for maps, sets and lists.
+ *
+ * Please note that this comparator shouldn't be used "manually" (through thrift for instance).
+ *
+ */
+public abstract class CollectionType extends AbstractType<ByteBuffer>
+{
+    public enum Kind
+    {
+        MAP, SET, LIST
+    }
+
+    public enum Function
+    {
+        APPEND       (false, Kind.LIST),
+        PREPEND      (false, Kind.LIST),
+        SET          ( true, Kind.LIST, Kind.MAP),
+        ADD          (false, Kind.SET),
+        DISCARD_LIST ( true, Kind.LIST),
+        DISCARD_SET  (false, Kind.SET),
+        DISCARD_KEY  ( true, Kind.LIST, Kind.MAP);
+
+        public final boolean needsReading;
+        public final EnumSet<Kind> validReceivers;
+
+        private Function(boolean needsReading, Kind ... validReceivers)
+        {
+            this.needsReading = needsReading;
+            this.validReceivers = EnumSet.copyOf(Arrays.asList(validReceivers));
+        }
+    }
+
+    public final Kind kind;
+
+    protected CollectionType(Kind kind)
+    {
+        this.kind = kind;
+    }
+
+    protected abstract AbstractType<?> nameComparator();
+    protected abstract AbstractType<?> valueComparator();
+    protected abstract void appendToStringBuilder(StringBuilder sb);
+
+    public void execute(ColumnFamily cf, ColumnNameBuilder fullPath, Function fct, List<Term> args, UpdateParameters params) throws InvalidRequestException
+    {
+        if (!fct.validReceivers.contains(kind))
+            throw new InvalidRequestException(String.format("Invalid operation %s for %s collection", fct, kind));
+
+        executeFunction(cf, fullPath, fct, args, params);
+    }
+
+    public abstract void executeFunction(ColumnFamily cf, ColumnNameBuilder fullPath, Function fct, List<Term> args, UpdateParameters params) throws InvalidRequestException;
+
+    public abstract ByteBuffer serializeForThrift(List<Pair<ByteBuffer, IColumn>> columns);
+
+    @Override
+    public String toString()
+    {
+        StringBuilder sb = new StringBuilder();
+        appendToStringBuilder(sb);
+        return sb.toString();
+    }
+
+    public int compare(ByteBuffer o1, ByteBuffer o2)
+    {
+        throw new UnsupportedOperationException("CollectionType should not be use directly as a comparator");
+    }
+
+    public ByteBuffer compose(ByteBuffer bytes)
+    {
+        return BytesType.instance.compose(bytes);
+    }
+
+    public ByteBuffer decompose(ByteBuffer value)
+    {
+        return BytesType.instance.decompose(value);
+    }
+
+    public String getString(ByteBuffer bytes)
+    {
+        return BytesType.instance.getString(bytes);
+    }
+
+    public ByteBuffer fromString(String source)
+    {
+        try
+        {
+            return ByteBufferUtil.hexToBytes(source);
+        }
+        catch (NumberFormatException e)
+        {
+            throw new MarshalException(String.format("cannot parse '%s' as hex bytes", source), e);
+        }
+    }
+
+    public void validate(ByteBuffer bytes)
+    {
+        valueComparator().validate(bytes);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/91bdf7fb/src/java/org/apache/cassandra/db/marshal/ColumnToCollectionType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/ColumnToCollectionType.java b/src/java/org/apache/cassandra/db/marshal/ColumnToCollectionType.java
new file mode 100644
index 0000000..4ba73aa
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/marshal/ColumnToCollectionType.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.marshal;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+
+import com.google.common.collect.ImmutableMap;
+
+import org.apache.cassandra.config.ConfigurationException;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+public class ColumnToCollectionType extends AbstractType<ByteBuffer>
+{
+    // interning instances
+    private static final Map<Map<ByteBuffer, CollectionType>, ColumnToCollectionType> instances = new HashMap<Map<ByteBuffer, CollectionType>, ColumnToCollectionType>();
+
+    public final Map<ByteBuffer, CollectionType> defined;
+
+    public static ColumnToCollectionType getInstance(TypeParser parser) throws ConfigurationException
+    {
+        return getInstance(parser.getCollectionsParameters());
+    }
+
+    public static synchronized ColumnToCollectionType getInstance(Map<ByteBuffer, CollectionType> defined)
+    {
+        assert defined != null;
+
+        ColumnToCollectionType t = instances.get(defined);
+        if (t == null)
+        {
+            t = new ColumnToCollectionType(defined);
+            instances.put(defined, t);
+        }
+        return t;
+    }
+
+    private ColumnToCollectionType(Map<ByteBuffer, CollectionType> defined)
+    {
+        this.defined = ImmutableMap.copyOf(defined);
+    }
+
+    public int compare(ByteBuffer o1, ByteBuffer o2)
+    {
+        throw new UnsupportedOperationException("ColumnToCollectionType should only be used in composite types, never alone");
+    }
+
+    public int compareCollectionMembers(ByteBuffer o1, ByteBuffer o2, ByteBuffer collectionName)
+    {
+        CollectionType t = defined.get(collectionName);
+        if (t == null)
+            throw new RuntimeException(ByteBufferUtil.bytesToHex(collectionName) + " is not defined as a collection");
+
+        return t.nameComparator().compare(o1, o2);
+    }
+
+    public ByteBuffer compose(ByteBuffer bytes)
+    {
+        return BytesType.instance.compose(bytes);
+    }
+
+    public ByteBuffer decompose(ByteBuffer value)
+    {
+        return BytesType.instance.decompose(value);
+    }
+
+    public String getString(ByteBuffer bytes)
+    {
+        return BytesType.instance.getString(bytes);
+    }
+
+    public ByteBuffer fromString(String source)
+    {
+        try
+        {
+            return ByteBufferUtil.hexToBytes(source);
+        }
+        catch (NumberFormatException e)
+        {
+            throw new MarshalException(String.format("cannot parse '%s' as hex bytes", source), e);
+        }
+    }
+
+    public void validate(ByteBuffer bytes)
+    {
+        throw new UnsupportedOperationException("ColumnToCollectionType should only be used in composite types, never alone");
+    }
+
+    public void validateCollectionMember(ByteBuffer bytes, ByteBuffer collectionName) throws MarshalException
+    {
+        CollectionType t = defined.get(collectionName);
+        if (t == null)
+            throw new MarshalException(ByteBufferUtil.bytesToHex(collectionName) + " is not defined as a collection");
+
+        t.nameComparator().validate(bytes);
+    }
+
+    @Override
+    public boolean isCompatibleWith(AbstractType<?> previous)
+    {
+        if (!(previous instanceof ColumnToCollectionType))
+            return false;
+
+        ColumnToCollectionType prev = (ColumnToCollectionType)previous;
+        // We are compatible if we have all the definitions previous have (but we can have more).
+        for (Map.Entry<ByteBuffer, CollectionType> entry : prev.defined.entrySet())
+        {
+            if (!entry.getValue().isCompatibleWith(defined.get(entry.getKey())))
+                return false;
+        }
+        return true;
+    }
+
+    @Override
+    public String toString()
+    {
+        return getClass().getName() + TypeParser.stringifyCollectionsParameters(defined);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/91bdf7fb/src/java/org/apache/cassandra/db/marshal/CompositeType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/CompositeType.java b/src/java/org/apache/cassandra/db/marshal/CompositeType.java
index 4d26dac..516c44b 100644
--- a/src/java/org/apache/cassandra/db/marshal/CompositeType.java
+++ b/src/java/org/apache/cassandra/db/marshal/CompositeType.java
@@ -18,11 +18,14 @@
 package org.apache.cassandra.db.marshal;
 
 import java.nio.ByteBuffer;
+import java.util.Arrays;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import com.google.common.collect.ImmutableList;
+
 import org.apache.cassandra.config.ConfigurationException;
 import org.apache.cassandra.cql3.ColumnNameBuilder;
 import org.apache.cassandra.cql3.Relation;
@@ -78,7 +81,7 @@ public class CompositeType extends AbstractCompositeType
 
     private CompositeType(List<AbstractType<?>> types)
     {
-        this.types = types;
+        this.types = ImmutableList.copyOf(types);
     }
 
     protected AbstractType<?> getComparator(int i, ByteBuffer bb)
@@ -195,18 +198,29 @@ public class CompositeType extends AbstractCompositeType
         private final CompositeType composite;
         private int current;
 
-        private final DataOutputBuffer out = new DataOutputBuffer();
+        private final List<ByteBuffer> components;
+        private final byte[] endOfComponents;
+        private int serializedSize;
 
         public Builder(CompositeType composite)
         {
+            this(composite, new ArrayList<ByteBuffer>(composite.types.size()), new byte[composite.types.size()]);
+        }
+
+        public Builder(CompositeType composite, List<ByteBuffer> components, byte[] endOfComponents)
+        {
+            assert endOfComponents.length == composite.types.size();
+
             this.composite = composite;
+            this.components = components;
+            this.endOfComponents = endOfComponents;
         }
 
         private Builder(Builder b)
         {
-            this(b.composite);
+            this(b.composite, new ArrayList<ByteBuffer>(b.components), Arrays.copyOf(b.endOfComponents, b.endOfComponents.length));
             this.current = b.current;
-            out.write(b.out.getData(), 0, b.out.getLength());
+            this.serializedSize = b.serializedSize;
         }
 
         public Builder add(Term t, Relation.Type op, List<ByteBuffer> variables) throws InvalidRequestException
@@ -214,9 +228,9 @@ public class CompositeType extends AbstractCompositeType
             if (current >= composite.types.size())
                 throw new IllegalStateException("Composite column is already fully constructed");
 
-            AbstractType currentType = composite.types.get(current++);
+            AbstractType currentType = composite.types.get(current);
             ByteBuffer buffer = t.getByteBuffer(currentType, variables);
-            ByteBufferUtil.writeWithShortLength(buffer, out);
+            components.add(buffer);
 
             /*
              * Given the rules for eoc (end-of-component, see AbstractCompositeType.compare()),
@@ -230,16 +244,17 @@ public class CompositeType extends AbstractCompositeType
             switch (op)
             {
                 case LT:
-                    out.write((byte) -1);
+                    endOfComponents[current] = (byte) -1;
                     break;
                 case GT:
                 case LTE:
-                    out.write((byte) 1);
+                    endOfComponents[current] = (byte) 1;
                     break;
                 default:
-                    out.write((byte) 0);
+                    endOfComponents[current] = (byte) 0;
                     break;
             }
+            ++current;
             return this;
         }
 
@@ -248,8 +263,8 @@ public class CompositeType extends AbstractCompositeType
             if (current >= composite.types.size())
                 throw new IllegalStateException("Composite column is already fully constructed");
 
-            ByteBufferUtil.writeWithShortLength(bb, out);
-            out.write((byte) 0);
+            components.add(bb);
+            endOfComponents[current++] = (byte) 0;
             return this;
         }
 
@@ -260,7 +275,12 @@ public class CompositeType extends AbstractCompositeType
 
         public ByteBuffer build()
         {
-            // potentially slightly space-wasteful in favor of avoiding a copy
+            DataOutputBuffer out = new DataOutputBuffer(serializedSize);
+            for (int i = 0; i < components.size(); i++)
+            {
+                ByteBufferUtil.writeWithShortLength(components.get(i), out);
+                out.write(endOfComponents[i]);
+            }
             return ByteBuffer.wrap(out.getData(), 0, out.getLength());
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/91bdf7fb/src/java/org/apache/cassandra/db/marshal/EmptyType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/EmptyType.java b/src/java/org/apache/cassandra/db/marshal/EmptyType.java
new file mode 100644
index 0000000..6bbd006
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/marshal/EmptyType.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.marshal;
+
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+/**
+ * A type that only accept empty data.
+ * It is only useful as a value validation type, not as a comparator since column names can't be empty.
+ */
+public class EmptyType extends AbstractType<Void>
+{
+    public static final EmptyType instance = new EmptyType();
+
+    private EmptyType() {} // singleton
+
+    public Void compose(ByteBuffer bytes)
+    {
+        return null;
+    }
+
+    public ByteBuffer decompose(Void value)
+    {
+        return ByteBufferUtil.EMPTY_BYTE_BUFFER;
+    }
+
+    public int compare(ByteBuffer o1, ByteBuffer o2)
+    {
+        return 0;
+    }
+
+    public String getString(ByteBuffer bytes)
+    {
+        return "";
+    }
+
+    public ByteBuffer fromString(String source) throws MarshalException
+    {
+        if (!source.isEmpty())
+            throw new MarshalException(String.format("'%s' is not empty", source));
+
+        return ByteBufferUtil.EMPTY_BYTE_BUFFER;
+    }
+
+    public void validate(ByteBuffer bytes) throws MarshalException
+    {
+        if (bytes.remaining() > 0)
+            throw new MarshalException("EmptyType only accept empty values");
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/91bdf7fb/src/java/org/apache/cassandra/db/marshal/ListType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/ListType.java b/src/java/org/apache/cassandra/db/marshal/ListType.java
new file mode 100644
index 0000000..8ad4590
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/marshal/ListType.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.marshal;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.cassandra.cql3.ColumnNameBuilder;
+import org.apache.cassandra.cql3.Term;
+import org.apache.cassandra.cql3.UpdateParameters;
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.IColumn;
+import org.apache.cassandra.config.ConfigurationException;
+import org.apache.cassandra.thrift.InvalidRequestException;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.UUIDGen;
+
+public class ListType extends CollectionType
+{
+    // interning instances
+    private static final Map<AbstractType<?>, ListType> instances = new HashMap<AbstractType<?>, ListType>();
+
+    // Our reference time (1 jan 2010, 00:00:00) in milliseconds.
+    private static final long REFERENCE_TIME = 1262304000000L;
+
+    /*
+     * For prepend, we need to be able to generate unique but decreasing time
+     * UUID, which is a bit challenging. To do that, given a time in milliseconds,
+     * we adds a number represening the 100-nanoseconds precision and make sure
+     * that within the same millisecond, that number is always increasing. We
+     * do rely on the fact that the user will only provide decreasing
+     * milliseconds timestamp for that purpose.
+     */
+    private static class PrecisionTime
+    {
+        public final long millis;
+        public final int nanos;
+
+        public PrecisionTime(long millis, int nanos)
+        {
+            this.millis = millis;
+            this.nanos = nanos;
+        }
+    }
+
+    private static final AtomicReference<PrecisionTime> last = new AtomicReference<PrecisionTime>(new PrecisionTime(Long.MAX_VALUE, 0));
+
+    private static PrecisionTime getNextTime(long millis)
+    {
+        while (true)
+        {
+            PrecisionTime current = last.get();
+            assert millis <= current.millis;
+            PrecisionTime next = millis < current.millis
+                               ? new PrecisionTime(millis, 0)
+                               : new PrecisionTime(millis, current.nanos + 1);
+            if (last.compareAndSet(current, next))
+                return next;
+        }
+    }
+
+    public final AbstractType<?> elements;
+
+    public static ListType getInstance(TypeParser parser) throws ConfigurationException
+    {
+        List<AbstractType<?>> l = parser.getTypeParameters();
+        if (l.size() != 1)
+            throw new ConfigurationException("ListType takes exactly 1 type parameter");
+
+        return getInstance(l.get(0));
+    }
+
+    public static synchronized ListType getInstance(AbstractType<?> elements)
+    {
+        ListType t = instances.get(elements);
+        if (t == null)
+        {
+            t = new ListType(elements);
+            instances.put(elements, t);
+        }
+        return t;
+    }
+
+    private ListType(AbstractType<?> elements)
+    {
+        super(Kind.LIST);
+        this.elements = elements;
+    }
+
+    protected AbstractType<?> nameComparator()
+    {
+        return TimeUUIDType.instance;
+    }
+
+    protected AbstractType<?> valueComparator()
+    {
+        return elements;
+    }
+
+    protected void appendToStringBuilder(StringBuilder sb)
+    {
+        sb.append(getClass().getName()).append(TypeParser.stringifyTypeParameters(Collections.<AbstractType<?>>singletonList(elements)));
+    }
+
+    public void executeFunction(ColumnFamily cf, ColumnNameBuilder fullPath, Function fct, List<Term> args, UpdateParameters params) throws InvalidRequestException
+    {
+        switch (fct)
+        {
+            case APPEND:
+                doAppend(cf, fullPath, args, params);
+                break;
+            case PREPEND:
+                doPrepend(cf, fullPath, args, params);
+                break;
+            default:
+                throw new AssertionError("Unsupported function " + fct);
+        }
+    }
+
+    public void execute(ColumnFamily cf, ColumnNameBuilder fullPath, Function fct, List<Term> args, UpdateParameters params, List<Pair<ByteBuffer, IColumn>> list) throws InvalidRequestException
+    {
+        switch (fct)
+        {
+            case SET:
+                doSet(cf, fullPath, validateIdx(fct, args.get(0), list), args.get(1), params, list);
+                break;
+            case DISCARD_LIST:
+                // If list is empty, do nothing
+                if (list != null)
+                    doDiscard(cf, fullPath, args, params, list);
+                break;
+            case DISCARD_KEY:
+                doDiscardIdx(cf, fullPath, validateIdx(fct, args.get(0), list), params, list);
+                break;
+            default:
+                throw new AssertionError();
+        }
+    }
+
+    private int validateIdx(Function fct, Term value, List<Pair<ByteBuffer, IColumn>> list) throws InvalidRequestException
+    {
+        try
+        {
+            if (value.getType() != Term.Type.INTEGER)
+                throw new InvalidRequestException(String.format("Invalid argument %s for %s, must be an integer", value.getText(), fct));
+            int idx = Integer.parseInt(value.getText());
+            if (list == null || list.size() <= idx)
+                throw new InvalidRequestException(String.format("Invalid index %d, list has size %d", idx, list == null ? 0 : list.size()));
+            return idx;
+        }
+        catch (NumberFormatException e)
+        {
+            // This should not happen, unless we screwed up the parser
+            throw new RuntimeException();
+        }
+    }
+
+    private void doPrepend(ColumnFamily cf, ColumnNameBuilder builder, List<Term> values, UpdateParameters params) throws InvalidRequestException
+    {
+        long time = REFERENCE_TIME - (System.currentTimeMillis() - REFERENCE_TIME);
+        // We do the loop in reverse order because getNext() will create increasing time but we want the last
+        // value in the prepended list to have the lower time
+        for (int i = values.size() - 1; i >= 0; i--)
+        {
+            ColumnNameBuilder b = i == 0 ? builder : builder.copy();
+            PrecisionTime pt = getNextTime(time);
+            ByteBuffer uuid = ByteBuffer.wrap(UUIDGen.getTimeUUIDBytes(pt.millis, pt.nanos));
+            ByteBuffer name = b.add(uuid).build();
+            cf.addColumn(params.makeColumn(name, values.get(i).getByteBuffer(elements, params.variables)));
+        }
+    }
+
+    private void doAppend(ColumnFamily cf, ColumnNameBuilder builder, List<Term> values, UpdateParameters params) throws InvalidRequestException
+    {
+        for (int i = 0; i < values.size(); i++)
+        {
+            ColumnNameBuilder b = i == values.size() - 1 ? builder : builder.copy();
+            ByteBuffer uuid = ByteBuffer.wrap(UUIDGen.getTimeUUIDBytes());
+            ByteBuffer name = b.add(uuid).build();
+            cf.addColumn(params.makeColumn(name, values.get(i).getByteBuffer(elements, params.variables)));
+        }
+    }
+
+    public void doSet(ColumnFamily cf, ColumnNameBuilder builder, int idx, Term value, UpdateParameters params, List<Pair<ByteBuffer, IColumn>> list) throws InvalidRequestException
+    {
+        ByteBuffer name = list.get(idx).right.name();
+        cf.addColumn(params.makeColumn(name, value.getByteBuffer(elements, params.variables)));
+    }
+
+    public void doDiscard(ColumnFamily cf, ColumnNameBuilder builder, List<Term> values, UpdateParameters params, List<Pair<ByteBuffer, IColumn>> list) throws InvalidRequestException
+    {
+        Set<ByteBuffer> toDiscard = new HashSet<ByteBuffer>();
+        for (Term value : values)
+            toDiscard.add(value.getByteBuffer(elements, params.variables));
+
+        for (Pair<ByteBuffer, IColumn> p : list)
+        {
+            IColumn c = p.right;
+            if (toDiscard.contains(c.value()))
+                cf.addColumn(params.makeTombstone(c.name()));
+        }
+    }
+
+    public void doDiscardIdx(ColumnFamily cf, ColumnNameBuilder builder, int idx, UpdateParameters params, List<Pair<ByteBuffer, IColumn>> list) throws InvalidRequestException
+    {
+        ByteBuffer name = list.get(idx).right.name();
+        cf.addColumn(params.makeTombstone(name));
+    }
+
+    public ByteBuffer serializeForThrift(List<Pair<ByteBuffer, IColumn>> columns)
+    {
+        List<Object> l = new ArrayList<Object>(columns.size());
+        for (Pair<ByteBuffer, IColumn> p : columns)
+            l.add(elements.compose(p.right.value()));
+        return ByteBufferUtil.bytes(FBUtilities.json(l));
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/91bdf7fb/src/java/org/apache/cassandra/db/marshal/MapType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/MapType.java b/src/java/org/apache/cassandra/db/marshal/MapType.java
new file mode 100644
index 0000000..8e29520
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/marshal/MapType.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.marshal;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import org.apache.cassandra.cql3.ColumnNameBuilder;
+import org.apache.cassandra.cql3.Term;
+import org.apache.cassandra.cql3.UpdateParameters;
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.IColumn;
+import org.apache.cassandra.config.ConfigurationException;
+import org.apache.cassandra.thrift.InvalidRequestException;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Pair;
+
+public class MapType extends CollectionType
+{
+    // interning instances
+    private static final Map<Pair<AbstractType<?>, AbstractType<?>>, MapType> instances = new HashMap<Pair<AbstractType<?>, AbstractType<?>>, MapType>();
+
+    public final AbstractType<?> keys;
+    public final AbstractType<?> values;
+
+    public static MapType getInstance(TypeParser parser) throws ConfigurationException
+    {
+        List<AbstractType<?>> l = parser.getTypeParameters();
+        if (l.size() != 2)
+            throw new ConfigurationException("MapType takes exactly 2 type parameters");
+
+        return getInstance(l.get(0), l.get(1));
+    }
+
+    public static synchronized MapType getInstance(AbstractType<?> keys, AbstractType<?> values)
+    {
+        Pair<AbstractType<?>, AbstractType<?>> p = Pair.<AbstractType<?>, AbstractType<?>>create(keys, values);
+        MapType t = instances.get(p);
+        if (t == null)
+        {
+            t = new MapType(keys, values);
+            instances.put(p, t);
+        }
+        return t;
+    }
+
+    private MapType(AbstractType<?> keys, AbstractType<?> values)
+    {
+        super(Kind.MAP);
+        this.keys = keys;
+        this.values = values;
+    }
+
+    protected AbstractType<?> nameComparator()
+    {
+        return keys;
+    }
+
+    protected AbstractType<?> valueComparator()
+    {
+        return values;
+    }
+
+    protected void appendToStringBuilder(StringBuilder sb)
+    {
+        sb.append(getClass().getName()).append(TypeParser.stringifyTypeParameters(Arrays.asList(keys, values)));
+    }
+
+    public void executeFunction(ColumnFamily cf, ColumnNameBuilder fullPath, Function fct, List<Term> args, UpdateParameters params) throws InvalidRequestException
+    {
+        switch (fct)
+        {
+            case SET:
+                doPut(cf, fullPath, args, params);
+                break;
+            case DISCARD_KEY:
+                doDiscard(cf, fullPath, args.get(0), params);
+                break;
+            default:
+                throw new AssertionError("Unsupported function " + fct);
+        }
+    }
+
+    private void doPut(ColumnFamily cf, ColumnNameBuilder builder, List<Term> args, UpdateParameters params) throws InvalidRequestException
+    {
+        assert args.size() % 2 == 0;
+        Iterator<Term> iter = args.iterator();
+        while (iter.hasNext())
+        {
+            ByteBuffer name = builder.copy().add(iter.next().getByteBuffer(keys, params.variables)).build();
+            ByteBuffer value = iter.next().getByteBuffer(values, params.variables);
+            cf.addColumn(params.makeColumn(name, value));
+        }
+    }
+
+    private void doDiscard(ColumnFamily cf, ColumnNameBuilder builder, Term value, UpdateParameters params) throws InvalidRequestException
+    {
+        ByteBuffer name = builder.add(value.getByteBuffer(keys, params.variables)).build();
+        cf.addColumn(params.makeTombstone(name));
+    }
+
+    public ByteBuffer serializeForThrift(List<Pair<ByteBuffer, IColumn>> columns)
+    {
+        Map<String, Object> m = new LinkedHashMap<String, Object>();
+        for (Pair<ByteBuffer, IColumn> p : columns)
+            m.put(keys.getString(p.left), values.compose(p.right.value()));
+        return ByteBufferUtil.bytes(FBUtilities.json(m));
+    }
+}


Mime
View raw message