cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tylerho...@apache.org
Subject [2/3] Support multi-row selects within a partition using IN
Date Thu, 22 May 2014 19:20:05 GMT
http://git-wip-us.apache.org/repos/asf/cassandra/blob/43496384/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index 6b4309f..e339ccb 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@ -30,7 +30,6 @@ import org.apache.cassandra.auth.Permission;
 import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.transport.messages.ResultMessage;
 import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.context.CounterContext;
 import org.apache.cassandra.db.filter.*;
@@ -71,8 +70,13 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
     private final Selection selection;
     private final Term limit;
 
+    /** Restrictions on partitioning columns */
     private final Restriction[] keyRestrictions;
+
+    /** Restrictions on clustering columns */
     private final Restriction[] columnRestrictions;
+
+    /** Restrictions on non-primary key columns (i.e. secondary index restrictions) */
     private final Map<CFDefinition.Name, Restriction> metadataRestrictions = new HashMap<CFDefinition.Name, Restriction>();
 
     // The name of all restricted names not covered by the key or index filter
@@ -288,9 +292,9 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
         return rows;
     }
 
-    public ResultMessage.Rows executeInternal(QueryState state) throws RequestExecutionException, RequestValidationException
+    public ResultMessage.Rows executeInternal(QueryState state, QueryOptions options) throws RequestExecutionException, RequestValidationException
     {
-        List<ByteBuffer> variables = Collections.emptyList();
+        List<ByteBuffer> variables = options.getValues();
         int limit = getLimit(variables);
         int limitForQuery = updateLimitForQuery(limit);
         long now = System.currentTimeMillis();
@@ -331,7 +335,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
         if (keys.isEmpty()) // in case of IN () for (the last column of) the partition key.
             return null;
 
-        List<ReadCommand> commands = new ArrayList<ReadCommand>(keys.size());
+        List<ReadCommand> commands = new ArrayList<>(keys.size());
 
         IDiskAtomFilter filter = makeFilter(variables, limit);
         if (filter == null)
@@ -627,7 +631,11 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
     {
         assert onToken;
 
-        Restriction keyRestriction = keyRestrictions[0];
+        Restriction restriction = keyRestrictions[0];
+
+        assert !restriction.isMultiColumn() : "Unexpectedly got a multi-column restriction on a partition key for a range query";
+        SingleColumnRestriction keyRestriction = (SingleColumnRestriction)restriction;
+
         ByteBuffer value;
         if (keyRestriction.isEQ())
         {
@@ -635,7 +643,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
         }
         else
         {
-            Restriction.Slice slice = (Restriction.Slice)keyRestriction;
+            SingleColumnRestriction.Slice slice = (SingleColumnRestriction.Slice)keyRestriction;
             if (!slice.hasBound(b))
                 return p.getMinimumToken();
 
@@ -654,7 +662,10 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
             if (r == null)
                 return true;
             else if (r.isSlice())
-                return ((Restriction.Slice)r).isInclusive(b);
+            {
+                assert !r.isMultiColumn() : "Unexpectedly got multi-column restriction on partition key";
+                return ((SingleColumnRestriction.Slice)r).isInclusive(b);
+            }
         }
         // All equality
         return true;
@@ -687,7 +698,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
         Iterator<CFDefinition.Name> idIter = cfDef.clusteringColumns().iterator();
         for (Restriction r : columnRestrictions)
         {
-            ColumnIdentifier id = idIter.next().name;
+            CFDefinition.Name name = idIter.next();
             assert r != null && !r.isSlice();
 
             List<ByteBuffer> values = r.values(variables);
@@ -695,7 +706,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
             {
                 ByteBuffer val = values.get(0);
                 if (val == null)
-                    throw new InvalidRequestException(String.format("Invalid null value for clustering key part %s", id));
+                    throw new InvalidRequestException(String.format("Invalid null value for clustering key part %s", name.name));
                 builder.add(val);
             }
             else
@@ -712,7 +723,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
                     ByteBuffer val = iter.next();
                     ColumnNameBuilder b = iter.hasNext() ? builder.copy() : builder;
                     if (val == null)
-                        throw new InvalidRequestException(String.format("Invalid null value for clustering key part %s", id));
+                        throw new InvalidRequestException(String.format("Invalid null value for clustering key part %s", name.name));
                     b.add(val);
                     if (cfDef.isCompact)
                         columns.add(b.build());
@@ -790,6 +801,22 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
                                         ColumnNameBuilder builder,
                                         List<ByteBuffer> variables) throws InvalidRequestException
     {
+
+        // check the first restriction to see if we're dealing with a multi-column restriction
+        if (!names.isEmpty())
+        {
+            Restriction firstRestriction = restrictions[0];
+            if (firstRestriction != null && firstRestriction.isMultiColumn())
+            {
+                if (firstRestriction.isSlice())
+                    return buildMultiColumnSliceBound(bound, names, (MultiColumnRestriction.Slice) firstRestriction, isReversed, builder, variables);
+                else if (firstRestriction.isIN())
+                    return buildMultiColumnInBound(bound, names, (MultiColumnRestriction.IN) firstRestriction, isReversed, builder, variables);
+                else
+                    return buildMultiColumnEQBound(bound, (MultiColumnRestriction.EQ) firstRestriction, isReversed, builder, variables);
+            }
+        }
+
         // The end-of-component of composite doesn't depend on whether the
         // component type is reversed or not (i.e. the ReversedType is applied
         // to the component comparator but not to the end-of-component itself),
@@ -813,23 +840,10 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
                                                  ? builder.buildAsEndOfRange()
                                                  : builder.build());
             }
-
             if (r.isSlice())
             {
-                builder.add(getSliceValue(name, r, b, variables));
+                builder.add(getSliceValue(r, b, variables));
                 Relation.Type relType = ((Restriction.Slice)r).getRelation(eocBound, b);
-
-                // We can have more non null restriction if the "scalar" notation was used for the bound (#4851).
-                // In that case, we need to add them all, and end the cell name with the correct end-of-component.
-                while (iter.hasNext())
-                {
-                    name = iter.next();
-                    r = restrictions[name.position];
-                    if (isNullRestriction(r, b))
-                        break;
-
-                    builder.add(getSliceValue(name, r, b, variables));
-                }
                 return Collections.singletonList(builder.buildForRelation(relType));
             }
             else
@@ -837,11 +851,11 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
                 List<ByteBuffer> values = r.values(variables);
                 if (values.size() != 1)
                 {
-                    // IN query, we only support it on the clustering column
+                    // IN query, we only support it on the clustering columns
                     assert name.position == names.size() - 1;
                     // The IN query might not have listed the values in comparator order, so we need to re-sort
                     // the bounds lists to make sure the slices works correctly (also, to avoid duplicates).
-                    TreeSet<ByteBuffer> s = new TreeSet<ByteBuffer>(isReversed ? cfDef.cfm.comparator.reverseComparator : cfDef.cfm.comparator);
+                    TreeSet<ByteBuffer> s = new TreeSet<>(isReversed ? cfDef.cfm.comparator.reverseComparator : cfDef.cfm.comparator);
                     for (ByteBuffer val : values)
                     {
                         if (val == null)
@@ -850,7 +864,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
                         // See below for why this
                         s.add((b == Bound.END && copy.remainingCount() > 0) ? copy.buildAsEndOfRange() : copy.build());
                     }
-                    return new ArrayList<ByteBuffer>(s);
+                    return new ArrayList<>(s);
                 }
 
                 ByteBuffer val = values.get(0);
@@ -868,18 +882,92 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
         return Collections.singletonList((bound == Bound.END && builder.remainingCount() > 0) ? builder.buildAsEndOfRange() : builder.build());
     }
 
+    private List<ByteBuffer> buildMultiColumnSliceBound(Bound bound,
+                                                        Collection<CFDefinition.Name> names,
+                                                        MultiColumnRestriction.Slice slice,
+                                                        boolean isReversed,
+                                                        ColumnNameBuilder builder,
+                                                        List<ByteBuffer> variables) throws InvalidRequestException
+    {
+        Bound eocBound = isReversed ? Bound.reverse(bound) : bound;
+
+        Iterator<CFDefinition.Name> iter = names.iterator();
+        CFDefinition.Name firstName = iter.next();
+        // A hack to preserve pre-6875 behavior for tuple-notation slices where the comparator mixes ASCENDING
+        // and DESCENDING orders.  This stores the bound for the first component; we will re-use it for all following
+        // components, even if they don't match the first component's reversal/non-reversal.  Note that this does *not*
+        // guarantee correct query results, it just preserves the previous behavior.
+        Bound firstComponentBound = isReversed == isReversedType(firstName) ? bound : Bound.reverse(bound);
+
+        if (!slice.hasBound(firstComponentBound))
+            return Collections.singletonList(builder.componentCount() > 0 && eocBound == Bound.END
+                    ? builder.buildAsEndOfRange()
+                    : builder.build());
+
+        List<ByteBuffer> vals = slice.componentBounds(firstComponentBound, variables);
+        builder.add(vals.get(firstName.position));
+
+        while(iter.hasNext())
+        {
+            CFDefinition.Name name = iter.next();
+            if (name.position >= vals.size())
+                break;
+
+            builder.add(vals.get(name.position));
+        }
+        Relation.Type relType = slice.getRelation(eocBound, firstComponentBound);
+        return Collections.singletonList(builder.buildForRelation(relType));
+    }
+
+    private List<ByteBuffer> buildMultiColumnInBound(Bound bound,
+                                                     Collection<CFDefinition.Name> names,
+                                                     MultiColumnRestriction.IN restriction,
+                                                     boolean isReversed,
+                                                     ColumnNameBuilder builder,
+                                                     List<ByteBuffer> variables) throws InvalidRequestException
+    {
+        List<List<ByteBuffer>> splitInValues = restriction.splitValues(variables);
+
+        // The IN query might not have listed the values in comparator order, so we need to re-sort
+        // the bounds lists to make sure the slices works correctly (also, to avoid duplicates).
+        TreeSet<ByteBuffer> inValues = new TreeSet<>(isReversed ? cfDef.cfm.comparator.reverseComparator : cfDef.cfm.comparator);
+        Iterator<CFDefinition.Name> iter = names.iterator();
+        for (List<ByteBuffer> components : splitInValues)
+        {
+            ColumnNameBuilder nameBuilder = builder.copy();
+            for (ByteBuffer component : components)
+                nameBuilder.add(component);
+
+            Bound b = isReversed == isReversedType(iter.next()) ? bound : Bound.reverse(bound);
+            inValues.add((bound == Bound.END && nameBuilder.remainingCount() > 0) ? nameBuilder.buildAsEndOfRange() : nameBuilder.build());
+        }
+        return new ArrayList<>(inValues);
+    }
+
+    private List<ByteBuffer> buildMultiColumnEQBound(Bound bound, MultiColumnRestriction.EQ restriction, boolean isReversed, ColumnNameBuilder builder, List<ByteBuffer> variables) throws InvalidRequestException
+    {
+        Bound eocBound = isReversed ? Bound.reverse(bound) : bound;
+        for (ByteBuffer component : restriction.values(variables))
+            builder.add(component);
+
+        ByteBuffer result = builder.componentCount() > 0 && eocBound == Bound.END
+                ? builder.buildAsEndOfRange()
+                : builder.build();
+        return Collections.singletonList(result);
+    }
+
     private static boolean isNullRestriction(Restriction r, Bound b)
     {
         return r == null || (r.isSlice() && !((Restriction.Slice)r).hasBound(b));
     }
 
-    private static ByteBuffer getSliceValue(CFDefinition.Name name, Restriction r, Bound b, List<ByteBuffer> variables) throws InvalidRequestException
+    private static ByteBuffer getSliceValue(Restriction r, Bound b, List<ByteBuffer> variables) throws InvalidRequestException
     {
         Restriction.Slice slice = (Restriction.Slice)r;
         assert slice.hasBound(b);
         ByteBuffer val = slice.bound(b, variables);
         if (val == null)
-            throw new InvalidRequestException(String.format("Invalid null clustering key part %s", name));
+            throw new InvalidRequestException(String.format("Invalid null clustering key part %s", r));
         return val;
     }
 
@@ -923,11 +1011,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
                     if (slice.hasBound(b))
                     {
                         ByteBuffer value = slice.bound(b, variables);
-                        if (value == null)
-                            throw new InvalidRequestException(String.format("Unsupported null value for indexed column %s", name));
-                        if (value.remaining() > 0xFFFF)
-                            throw new InvalidRequestException("Index expression values may not be larger than 64K");
-
+                        validateIndexExpressionValue(value, name);
                         IndexOperator op = slice.getIndexOperator(b);
                         // If the underlying comparator for name is reversed, we need to reverse the IndexOperator: user operation
                         // always refer to the "forward" sorting even if the clustering order is reversed, but the 2ndary code does
@@ -946,16 +1030,21 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
                     throw new InvalidRequestException("IN restrictions are not supported on indexed columns");
 
                 ByteBuffer value = values.get(0);
-                if (value == null)
-                    throw new InvalidRequestException(String.format("Unsupported null value for indexed column %s", name));
-                if (value.remaining() > 0xFFFF)
-                    throw new InvalidRequestException("Index expression values may not be larger than 64K");
+                validateIndexExpressionValue(value, name);
                 expressions.add(new IndexExpression(name.name.key, IndexOperator.EQ, value));
             }
         }
         return expressions;
     }
 
+    private void validateIndexExpressionValue(ByteBuffer value, CFDefinition.Name name) throws InvalidRequestException
+    {
+        if (value == null)
+            throw new InvalidRequestException(String.format("Unsupported null value for indexed column %s", name));
+        if (value.remaining() > 0xFFFF)
+            throw new InvalidRequestException("Index expression values may not be larger than 64K");
+    }
+
     private static IndexOperator reverse(IndexOperator op)
     {
         switch (op)
@@ -1266,7 +1355,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
 
             CFDefinition cfDef = cfm.getCfDef();
 
-            VariableSpecifications names = getBoundVariables();
+            VariableSpecifications boundNames = getBoundVariables();
 
             // Select clause
             if (parameters.isCount && !selectClause.isEmpty())
@@ -1279,14 +1368,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
             if (parameters.isDistinct)
                 validateDistinctSelection(selection.getColumns(), cfDef.partitionKeys());
 
-            Term prepLimit = null;
-            if (limit != null)
-            {
-                prepLimit = limit.prepare(limitReceiver());
-                prepLimit.collectMarkerSpecification(names);
-            }
-
-            SelectStatement stmt = new SelectStatement(cfDef, names.size(), parameters, selection, prepLimit);
+            SelectStatement stmt = new SelectStatement(cfDef, boundNames.size(), parameters, selection, prepareLimit(boundNames));
 
             /*
              * WHERE clause. For a given entity, rules are:
@@ -1298,52 +1380,318 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
              */
             boolean hasQueriableIndex = false;
             boolean hasQueriableClusteringColumnIndex = false;
-            for (Relation rel : whereClause)
+            for (Relation relation : whereClause)
             {
-                CFDefinition.Name name = cfDef.get(rel.getEntity());
-                if (name == null)
+                if (relation.isMultiColumn())
+                {
+                    MultiColumnRelation rel = (MultiColumnRelation) relation;
+                    List<CFDefinition.Name> names = new ArrayList<>(rel.getEntities().size());
+                    for (ColumnIdentifier entity : rel.getEntities())
+                    {
+                        boolean[] queriable = processRelationEntity(stmt, relation, entity, cfDef);
+                        hasQueriableIndex |= queriable[0];
+                        hasQueriableClusteringColumnIndex |= queriable[1];
+                        names.add(cfDef.get(entity));
+                    }
+                    updateRestrictionsForRelation(stmt, names, rel, boundNames);
+                }
+                else
+                {
+                    SingleColumnRelation rel = (SingleColumnRelation) relation;
+                    boolean[] queriable = processRelationEntity(stmt, relation, rel.getEntity(), cfDef);
+                    hasQueriableIndex |= queriable[0];
+                    hasQueriableClusteringColumnIndex |= queriable[1];
+                    updateRestrictionsForRelation(stmt, cfDef.get(rel.getEntity()), rel, boundNames);
+                }
+            }
+
+             // At this point, the select statement if fully constructed, but we still have a few things to validate
+            processPartitionKeyRestrictions(stmt, cfDef, hasQueriableIndex);
+
+            // All (or none) of the partition key columns have been specified;
+            // hence there is no need to turn these restrictions into index expressions.
+            if (!stmt.usesSecondaryIndexing)
+                stmt.restrictedNames.removeAll(cfDef.partitionKeys());
+
+            if (stmt.selectsOnlyStaticColumns && stmt.hasClusteringColumnsRestriction())
+                throw new InvalidRequestException("Cannot restrict clustering columns when selecting only static columns");
+
+            processColumnRestrictions(stmt, cfDef, hasQueriableIndex);
+
+            // Covers indexes on the first clustering column (among others).
+            if (stmt.isKeyRange && hasQueriableClusteringColumnIndex)
+                stmt.usesSecondaryIndexing = true;
+
+            if (!stmt.usesSecondaryIndexing)
+                stmt.restrictedNames.removeAll(cfDef.clusteringColumns());
+
+            // Even if usesSecondaryIndexing is false at this point, we'll still have to use one if
+            // there is restrictions not covered by the PK.
+            if (!stmt.metadataRestrictions.isEmpty())
+            {
+                if (!hasQueriableIndex)
+                    throw new InvalidRequestException("No indexed columns present in by-columns clause with Equal operator");
+                stmt.usesSecondaryIndexing = true;
+            }
+
+            if (stmt.usesSecondaryIndexing)
+                validateSecondaryIndexSelections(stmt);
+
+            if (!stmt.parameters.orderings.isEmpty())
+                processOrderingClause(stmt, cfDef);
+
+            checkNeedsFiltering(stmt);
+
+            return new ParsedStatement.Prepared(stmt, boundNames);
+        }
+
+        /** Returns a pair of (hasQueriableIndex, hasQueriableClusteringColumnIndex) */
+        private boolean[] processRelationEntity(SelectStatement stmt, Relation relation, ColumnIdentifier entity, CFDefinition cfDef) throws InvalidRequestException
+        {
+            CFDefinition.Name name = cfDef.get(entity);
+            if (name == null)
+                handleUnrecognizedEntity(entity, relation);
+
+            stmt.restrictedNames.add(name);
+            if (cfDef.cfm.getColumnDefinition(name.name.key).isIndexed() && relation.operator() == Relation.Type.EQ)
+                return new boolean[]{true, name.kind == CFDefinition.Name.Kind.COLUMN_ALIAS};
+            return new boolean[]{false, false};
+        }
+
+        /** Throws an InvalidRequestException for an unrecognized identifier in the WHERE clause */
+        private void handleUnrecognizedEntity(ColumnIdentifier entity, Relation relation) throws InvalidRequestException
+        {
+            if (containsAlias(entity))
+                throw new InvalidRequestException(String.format("Aliases aren't allowed in the where clause ('%s')", relation));
+            else
+                throw new InvalidRequestException(String.format("Undefined name %s in where clause ('%s')", entity, relation));
+        }
+
+        /** Returns a Term for the limit or null if no limit is set */
+        private Term prepareLimit(VariableSpecifications boundNames) throws InvalidRequestException
+        {
+            if (limit == null)
+                return null;
+
+            Term prepLimit = limit.prepare(limitReceiver());
+            prepLimit.collectMarkerSpecification(boundNames);
+            return prepLimit;
+        }
+
+        private void updateRestrictionsForRelation(SelectStatement stmt, List<CFDefinition.Name> names, MultiColumnRelation relation, VariableSpecifications boundNames) throws InvalidRequestException
+        {
+            List<CFDefinition.Name> restrictedColumns = new ArrayList<>();
+            Set<CFDefinition.Name> seen = new HashSet<>();
+
+            int previousPosition = -1;
+            for (CFDefinition.Name name : names)
+            {
+                // ensure multi-column restriction only applies to clustering columns
+                if (name.kind != CFDefinition.Name.Kind.COLUMN_ALIAS)
+                    throw new InvalidRequestException(String.format("Multi-column relations can only be applied to clustering columns: %s", name));
+
+                if (seen.contains(name))
+                    throw new InvalidRequestException(String.format("Column \"%s\" appeared twice in a relation: %s", name, relation));
+                seen.add(name);
+
+                // check that no clustering columns were skipped
+                if (name.position != previousPosition + 1)
                 {
-                    if (containsAlias(rel.getEntity()))
-                        throw new InvalidRequestException(String.format("Aliases aren't allowed in where clause ('%s')", rel));
+                    if (previousPosition == -1)
+                        throw new InvalidRequestException(String.format(
+                                "Clustering columns may not be skipped in multi-column relations. " +
+                                "They should appear in the PRIMARY KEY order. Got %s", relation));
                     else
-                        throw new InvalidRequestException(String.format("Undefined name %s in where clause ('%s')", rel.getEntity(), rel));
+                        throw new InvalidRequestException(String.format(
+                                "Clustering columns must appear in the PRIMARY KEY order in multi-column relations: %s", relation));
                 }
+                previousPosition++;
 
-                ColumnDefinition def = cfDef.cfm.getColumnDefinition(name.name.key);
-                stmt.restrictedNames.add(name);
-                if (def.isIndexed() && rel.operator() == Relation.Type.EQ)
+                Restriction existing = getExistingRestriction(stmt, name);
+                Relation.Type operator = relation.operator();
+                if (existing != null)
                 {
-                    hasQueriableIndex = true;
-                    if (name.kind == CFDefinition.Name.Kind.COLUMN_ALIAS)
-                        hasQueriableClusteringColumnIndex = true;
+                    if (operator == Relation.Type.EQ || operator == Relation.Type.IN)
+                        throw new InvalidRequestException(String.format("Column \"%s\" cannot be restricted by more than one relation if it is in an %s relation", name, relation.operator()));
+                    else if (!existing.isSlice())
+                        throw new InvalidRequestException(String.format("Column \"%s\" cannot be restricted by an equality relation and an inequality relation", name));
                 }
+                restrictedColumns.add(name);
+            }
+
+            boolean onToken = false;
 
-                switch (name.kind)
+            switch (relation.operator())
+            {
+                case EQ:
                 {
-                    case KEY_ALIAS:
-                        stmt.keyRestrictions[name.position] = updateRestriction(cfm, name, stmt.keyRestrictions[name.position], rel, names);
-                        break;
-                    case COLUMN_ALIAS:
-                        stmt.columnRestrictions[name.position] = updateRestriction(cfm, name, stmt.columnRestrictions[name.position], rel, names);
-                        break;
-                    case VALUE_ALIAS:
-                        throw new InvalidRequestException(String.format("Predicates on the non-primary-key column (%s) of a COMPACT table are not yet supported", name.name));
-                    case COLUMN_METADATA:
-                    case STATIC:
-                        // We only all IN on the row key and last clustering key so far, never on non-PK columns, and this even if there's an index
-                        Restriction r = updateRestriction(cfm, name, stmt.metadataRestrictions.get(name), rel, names);
-                        if (r.isIN() && !((Restriction.IN)r).canHaveOnlyOneValue())
-                            // Note: for backward compatibility reason, we conside a IN of 1 value the same as a EQ, so we let that slide.
-                            throw new InvalidRequestException(String.format("IN predicates on non-primary-key columns (%s) is not yet supported", name));
-                        stmt.metadataRestrictions.put(name, r);
-                        break;
+                    Term t = relation.getValue().prepare(names);
+                    t.collectMarkerSpecification(boundNames);
+                    Restriction restriction = new MultiColumnRestriction.EQ(t, onToken);
+                    for (CFDefinition.Name name : restrictedColumns)
+                        stmt.columnRestrictions[name.position] = restriction;
+                    break;
+                }
+                case IN:
+                {
+                    Restriction restriction;
+                    List<? extends Term.MultiColumnRaw> inValues = relation.getInValues();
+                    if (inValues != null)
+                    {
+                        // we have something like "(a, b, c) IN ((1, 2, 3), (4, 5, 6), ...) or
+                        // "(a, b, c) IN (?, ?, ?)
+                        List<Term> terms = new ArrayList<>(inValues.size());
+                        for (Term.MultiColumnRaw tuple : inValues)
+                        {
+                            Term t = tuple.prepare(names);
+                            t.collectMarkerSpecification(boundNames);
+                            terms.add(t);
+                        }
+                         restriction = new MultiColumnRestriction.InWithValues(terms);
+                    }
+                    else
+                    {
+                        Tuples.INRaw rawMarker = relation.getInMarker();
+                        AbstractMarker t = rawMarker.prepare(names);
+                        t.collectMarkerSpecification(boundNames);
+                        restriction = new MultiColumnRestriction.InWithMarker(t);
+                    }
+                    for (CFDefinition.Name name : restrictedColumns)
+                        stmt.columnRestrictions[name.position] = restriction;
+
+                    break;
+                }
+                case LT:
+                case LTE:
+                case GT:
+                case GTE:
+                {
+                    Term t = relation.getValue().prepare(names);
+                    t.collectMarkerSpecification(boundNames);
+                    for (CFDefinition.Name name : names)
+                    {
+                        Restriction.Slice restriction = (Restriction.Slice)getExistingRestriction(stmt, name);
+                        if (restriction == null)
+                            restriction = new MultiColumnRestriction.Slice(onToken);
+                        else if (!restriction.isMultiColumn())
+                            throw new InvalidRequestException(String.format("Column \"%s\" cannot have both tuple-notation inequalities and single-column inequalities", name, relation));
+                        restriction.setBound(relation.operator(), t);
+                        stmt.columnRestrictions[name.position] = restriction;
+                    }
                 }
             }
+        }
 
-            /*
-             * At this point, the select statement if fully constructed, but we still have a few things to validate
-             */
+        private Restriction getExistingRestriction(SelectStatement stmt, CFDefinition.Name name)
+        {
+            switch (name.kind)
+            {
+                case KEY_ALIAS:
+                    return stmt.keyRestrictions[name.position];
+                case COLUMN_ALIAS:
+                    return stmt.columnRestrictions[name.position];
+                case VALUE_ALIAS:
+                    return null;
+                default:
+                    return stmt.metadataRestrictions.get(name);
+            }
+        }
 
+        private void updateRestrictionsForRelation(SelectStatement stmt, CFDefinition.Name name, SingleColumnRelation relation, VariableSpecifications names) throws InvalidRequestException
+        {
+            switch (name.kind)
+            {
+                case KEY_ALIAS:
+                    stmt.keyRestrictions[name.position] = updateSingleColumnRestriction(name, stmt.keyRestrictions[name.position], relation, names);
+                    break;
+                case COLUMN_ALIAS:
+                    stmt.columnRestrictions[name.position] = updateSingleColumnRestriction(name, stmt.columnRestrictions[name.position], relation, names);
+                    break;
+                case VALUE_ALIAS:
+                    throw new InvalidRequestException(String.format("Predicates on the non-primary-key column (%s) of a COMPACT table are not yet supported", name.name));
+                case COLUMN_METADATA:
+                case STATIC:
+                    // We only all IN on the row key and last clustering key so far, never on non-PK columns, and this even if there's an index
+                    Restriction r = updateSingleColumnRestriction(name, stmt.metadataRestrictions.get(name), relation, names);
+                    if (r.isIN() && !((Restriction.IN)r).canHaveOnlyOneValue())
+                        // Note: for backward compatibility reason, we conside a IN of 1 value the same as a EQ, so we let that slide.
+                        throw new InvalidRequestException(String.format("IN predicates on non-primary-key columns (%s) is not yet supported", name));
+                    stmt.metadataRestrictions.put(name, r);
+                    break;
+            }
+        }
+
+        Restriction updateSingleColumnRestriction(CFDefinition.Name name, Restriction existingRestriction, SingleColumnRelation newRel, VariableSpecifications boundNames) throws InvalidRequestException
+        {
+            ColumnSpecification receiver = name;
+            if (newRel.onToken)
+            {
+                if (name.kind != CFDefinition.Name.Kind.KEY_ALIAS)
+                    throw new InvalidRequestException(String.format("The token() function is only supported on the partition key, found on %s", name));
+
+                receiver = new ColumnSpecification(name.ksName,
+                                                   name.cfName,
+                                                   new ColumnIdentifier("partition key token", true),
+                                                   StorageService.getPartitioner().getTokenValidator());
+            }
+
+            switch (newRel.operator())
+            {
+                case EQ:
+                {
+                    if (existingRestriction != null)
+                        throw new InvalidRequestException(String.format("%s cannot be restricted by more than one relation if it includes an Equal", name));
+                    Term t = newRel.getValue().prepare(receiver);
+                    t.collectMarkerSpecification(boundNames);
+                    existingRestriction = new SingleColumnRestriction.EQ(t, newRel.onToken);
+                }
+                break;
+                case IN:
+                    if (existingRestriction != null)
+                        throw new InvalidRequestException(String.format("%s cannot be restricted by more than one relation if it includes a IN", name));
+
+                    if (newRel.getInValues() == null)
+                    {
+                        // Means we have a "SELECT ... IN ?"
+                        assert newRel.getValue() != null;
+                        Term t = newRel.getValue().prepare(receiver);
+                        t.collectMarkerSpecification(boundNames);
+                        existingRestriction = new SingleColumnRestriction.InWithMarker((Lists.Marker)t);
+                    }
+                    else
+                    {
+                        List<Term> inValues = new ArrayList<Term>(newRel.getInValues().size());
+                        for (Term.Raw raw : newRel.getInValues())
+                        {
+                            Term t = raw.prepare(receiver);
+                            t.collectMarkerSpecification(boundNames);
+                            inValues.add(t);
+                        }
+                        existingRestriction = new SingleColumnRestriction.InWithValues(inValues);
+                    }
+                    break;
+                case GT:
+                case GTE:
+                case LT:
+                case LTE:
+                {
+                    if (existingRestriction == null)
+                        existingRestriction = new SingleColumnRestriction.Slice(newRel.onToken);
+                    else if (!existingRestriction.isSlice())
+                        throw new InvalidRequestException(String.format("Column \"%s\" cannot be restricted by both an equality and an inequality relation", name));
+                    else if (existingRestriction.isMultiColumn())
+                        throw new InvalidRequestException(String.format("Column \"%s\" cannot be restricted by both a tuple notation inequality and a single column inequality (%s)", name, newRel));
+                    Term t = newRel.getValue().prepare(receiver);
+                    t.collectMarkerSpecification(boundNames);
+                    ((SingleColumnRestriction.Slice)existingRestriction).setBound(newRel.operator(), t);
+                }
+                break;
+            }
+            return existingRestriction;
+        }
+
+        private void processPartitionKeyRestrictions(SelectStatement stmt, CFDefinition cfDef, boolean hasQueriableIndex) throws InvalidRequestException
+        {
             // If there is a queriable index, no special condition are required on the other restrictions.
             // But we still need to know 2 things:
             //   - If we don't have a queriable index, is the query ok
@@ -1386,7 +1734,9 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
                         stmt.usesSecondaryIndexing = true;
                         break;
                     }
-                    throw new InvalidRequestException(String.format("partition key part %s cannot be restricted (preceding part %s is either not restricted or by a non-EQ relation)", cname, previous));
+                    throw new InvalidRequestException(String.format(
+                            "Partitioning column \"%s\" cannot be restricted because the preceding column (\"%s\") is " +
+                            "either not restricted or is restricted by a non-EQ relation", cname, previous));
                 }
                 else if (restriction.isOnToken())
                 {
@@ -1418,22 +1768,17 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
                 }
                 previous = cname;
             }
+        }
 
-            // All (or none) of the partition key columns have been specified;
-            // hence there is no need to turn these restrictions into index expressions.
-            if (!stmt.usesSecondaryIndexing)
-                stmt.restrictedNames.removeAll(cfDef.partitionKeys());
-
-            if (stmt.selectsOnlyStaticColumns && stmt.hasClusteringColumnsRestriction())
-                throw new InvalidRequestException("Cannot restrict clustering columns when selecting only static columns");
-
+        private void processColumnRestrictions(SelectStatement stmt, CFDefinition cfDef, boolean hasQueriableIndex) throws InvalidRequestException
+        {
             // If a clustering key column is restricted by a non-EQ relation, all preceding
             // columns must have a EQ, and all following must have no restriction. Unless
             // the column is indexed that is.
-            canRestrictFurtherComponents = true;
-            previous = null;
+            boolean canRestrictFurtherComponents = true;
+            CFDefinition.Name previous = null;
             boolean previousIsSlice = false;
-            iter = cfDef.clusteringColumns().iterator();
+            Iterator<CFDefinition.Name> iter = cfDef.clusteringColumns().iterator();
             for (int i = 0; i < stmt.columnRestrictions.length; i++)
             {
                 CFDefinition.Name cname = iter.next();
@@ -1451,20 +1796,21 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
                     //   1) we're in the special case of the 'tuple' notation from #4851 which we expand as multiple
                     //      consecutive slices: in which case we're good with this restriction and we continue
                     //   2) we have a 2ndary index, in which case we have to use it but can skip more validation
-                    if (!(previousIsSlice && restriction.isSlice() && ((Restriction.Slice)restriction).isPartOfTuple()))
+                    if (!(previousIsSlice && restriction.isSlice() && restriction.isMultiColumn()))
                     {
                         if (hasQueriableIndex)
                         {
                             stmt.usesSecondaryIndexing = true; // handle gaps and non-keyrange cases.
                             break;
                         }
-                        throw new InvalidRequestException(String.format("PRIMARY KEY part %s cannot be restricted (preceding part %s is either not restricted or by a non-EQ relation)", cname, previous));
+                        throw new InvalidRequestException(String.format(
+                                "PRIMARY KEY column \"%s\" cannot be restricted (preceding column \"%s\" is either not restricted or by a non-EQ relation)", cname, previous));
                     }
                 }
                 else if (restriction.isSlice())
                 {
-                    canRestrictFurtherComponents = false;
                     previousIsSlice = true;
+                    canRestrictFurtherComponents = false;
                     Restriction.Slice slice = (Restriction.Slice)restriction;
                     // For non-composite slices, we don't support internally the difference between exclusive and
                     // inclusive bounds, so we deal with it manually.
@@ -1473,156 +1819,148 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
                 }
                 else if (restriction.isIN())
                 {
-                    // We only support IN for the last name and for compact storage so far
-                    // TODO: #3885 allows us to extend to non compact as well, but that remains to be done
-                    if (i != stmt.columnRestrictions.length - 1)
-                        throw new InvalidRequestException(String.format("PRIMARY KEY part %s cannot be restricted by IN relation", cname));
-                    else if (stmt.selectACollection())
-                        throw new InvalidRequestException(String.format("Cannot restrict PRIMARY KEY part %s by IN relation as a collection is selected by the query", cname));
+                    if (!restriction.isMultiColumn() && i != stmt.columnRestrictions.length - 1)
+                        throw new InvalidRequestException(String.format("Clustering column \"%s\" cannot be restricted by an IN relation", cname));
+                    if (stmt.selectACollection())
+                        throw new InvalidRequestException(String.format("Cannot restrict column \"%s\" by IN relation as a collection is selected by the query", cname));
                 }
 
                 previous = cname;
             }
+        }
 
-            // Covers indexes on the first clustering column (among others).
-            if (stmt.isKeyRange && hasQueriableClusteringColumnIndex)
-                stmt.usesSecondaryIndexing = true;
-
-            if (!stmt.usesSecondaryIndexing)
-                stmt.restrictedNames.removeAll(cfDef.clusteringColumns());
-
-            // Even if usesSecondaryIndexing is false at this point, we'll still have to use one if
-            // there is restrictions not covered by the PK.
-            if (!stmt.metadataRestrictions.isEmpty())
-            {
-                if (!hasQueriableIndex)
-                    throw new InvalidRequestException("No indexed columns present in by-columns clause with Equal operator");
-                stmt.usesSecondaryIndexing = true;
-            }
+        private void validateSecondaryIndexSelections(SelectStatement stmt) throws InvalidRequestException
+        {
+            if (stmt.keyIsInRelation)
+                throw new InvalidRequestException("Select on indexed columns and with IN clause for the PRIMARY KEY are not supported");
+            // When the user only select static columns, the intent is that we don't query the whole partition but just
+            // the static parts. But 1) we don't have an easy way to do that with 2i and 2) since we don't support index on static columns
+            // so far, 2i means that you've restricted a non static column, so the query is somewhat non-sensical.
+            if (stmt.selectsOnlyStaticColumns)
+                throw new InvalidRequestException("Queries using 2ndary indexes don't support selecting only static columns");
+        }
 
+        private void verifyOrderingIsAllowed(SelectStatement stmt) throws InvalidRequestException
+        {
             if (stmt.usesSecondaryIndexing)
-            {
-                if (stmt.keyIsInRelation)
-                    throw new InvalidRequestException("Select on indexed columns and with IN clause for the PRIMARY KEY are not supported");
-                // When the user only select static columns, the intent is that we don't query the whole partition but just
-                // the static parts. But 1) we don't have an easy way to do that with 2i and 2) since we don't support index on static columns
-                // so far, 2i means that you've restricted a non static column, so the query is somewhat non-sensical.
-                if (stmt.selectsOnlyStaticColumns)
-                    throw new InvalidRequestException("Queries using 2ndary indexes don't support selecting only static columns");
-            }
+                throw new InvalidRequestException("ORDER BY with 2ndary indexes is not supported.");
 
-            if (!stmt.parameters.orderings.isEmpty())
-            {
-                if (stmt.usesSecondaryIndexing)
-                    throw new InvalidRequestException("ORDER BY with 2ndary indexes is not supported.");
+            if (stmt.isKeyRange)
+                throw new InvalidRequestException("ORDER BY is only supported when the partition key is restricted by an EQ or an IN.");
+        }
 
-                if (stmt.isKeyRange)
-                    throw new InvalidRequestException("ORDER BY is only supported when the partition key is restricted by an EQ or an IN.");
+        private void handleUnrecognizedOrderingColumn(ColumnIdentifier column) throws InvalidRequestException
+        {
+            if (containsAlias(column))
+                throw new InvalidRequestException(String.format("Aliases are not allowed in order by clause ('%s')", column));
+            else
+                throw new InvalidRequestException(String.format("Order by on unknown column %s", column));
+        }
 
-                // If we order an IN query, we'll have to do a manual sort post-query. Currently, this sorting requires that we
-                // have queried the column on which we sort (TODO: we should update it to add the column on which we sort to the one
-                // queried automatically, and then removing it from the resultSet afterwards if needed)
-                if (stmt.keyIsInRelation)
+        private void processOrderingClause(SelectStatement stmt, CFDefinition cfDef) throws InvalidRequestException
+        {
+            verifyOrderingIsAllowed(stmt);
+
+            // If we order an IN query, we'll have to do a manual sort post-query. Currently, this sorting requires that we
+            // have queried the column on which we sort (TODO: we should update it to add the column on which we sort to the one
+            // queried automatically, and then removing it from the resultSet afterwards if needed)
+            if (stmt.keyIsInRelation)
+            {
+                stmt.orderingIndexes = new HashMap<CFDefinition.Name, Integer>();
+                for (ColumnIdentifier column : stmt.parameters.orderings.keySet())
                 {
-                    stmt.orderingIndexes = new HashMap<CFDefinition.Name, Integer>();
-                    for (ColumnIdentifier column : stmt.parameters.orderings.keySet())
-                    {
-                        final CFDefinition.Name name = cfDef.get(column);
-                        if (name == null)
-                        {
-                            if (containsAlias(column))
-                                throw new InvalidRequestException(String.format("Aliases are not allowed in order by clause ('%s')", column));
-                            else
-                                throw new InvalidRequestException(String.format("Order by on unknown column %s", column));
-                        }
+                    final CFDefinition.Name name = cfDef.get(column);
+                    if (name == null)
+                        handleUnrecognizedOrderingColumn(column);
 
-                        if (selectClause.isEmpty()) // wildcard
+                    if (selectClause.isEmpty()) // wildcard
+                    {
+                        stmt.orderingIndexes.put(name, Iterables.indexOf(cfDef, new Predicate<CFDefinition.Name>()
                         {
-                            stmt.orderingIndexes.put(name, Iterables.indexOf(cfDef, new Predicate<CFDefinition.Name>()
-                                                                                    {
-                                                                                        public boolean apply(CFDefinition.Name n)
-                                                                                        {
-                                                                                            return name.equals(n);
-                                                                                        }
-                                                                                    }));
-                        }
-                        else
+                            public boolean apply(CFDefinition.Name n)
+                            {
+                                return name.equals(n);
+                            }
+                        }));
+                    }
+                    else
+                    {
+                        boolean hasColumn = false;
+                        for (int i = 0; i < selectClause.size(); i++)
                         {
-                            boolean hasColumn = false;
-                            for (int i = 0; i < selectClause.size(); i++)
+                            RawSelector selector = selectClause.get(i);
+                            if (name.name.equals(selector.selectable))
                             {
-                                RawSelector selector = selectClause.get(i);
-                                if (name.name.equals(selector.selectable))
-                                {
-                                    stmt.orderingIndexes.put(name, i);
-                                    hasColumn = true;
-                                    break;
-                                }
+                                stmt.orderingIndexes.put(name, i);
+                                hasColumn = true;
+                                break;
                             }
-
-                            if (!hasColumn)
-                                throw new InvalidRequestException("ORDER BY could not be used on columns missing in select clause.");
                         }
+
+                        if (!hasColumn)
+                            throw new InvalidRequestException("ORDER BY could not be used on columns missing in select clause.");
                     }
                 }
+            }
+            stmt.isReversed = isReversed(stmt, cfDef);
+        }
 
-                Boolean[] reversedMap = new Boolean[cfDef.clusteringColumnsCount()];
-                int i = 0;
-                for (Map.Entry<ColumnIdentifier, Boolean> entry : stmt.parameters.orderings.entrySet())
-                {
-                    ColumnIdentifier column = entry.getKey();
-                    boolean reversed = entry.getValue();
+        private boolean isReversed(SelectStatement stmt, CFDefinition cfDef) throws InvalidRequestException
+        {
+            Boolean[] reversedMap = new Boolean[cfDef.clusteringColumnsCount()];
+            int i = 0;
+            for (Map.Entry<ColumnIdentifier, Boolean> entry : stmt.parameters.orderings.entrySet())
+            {
+                ColumnIdentifier column = entry.getKey();
+                boolean reversed = entry.getValue();
 
-                    CFDefinition.Name name = cfDef.get(column);
-                    if (name == null)
-                    {
-                        if (containsAlias(column))
-                            throw new InvalidRequestException(String.format("Aliases are not allowed in order by clause ('%s')", column));
-                        else
-                            throw new InvalidRequestException(String.format("Order by on unknown column %s", column));
-                    }
+                CFDefinition.Name name = cfDef.get(column);
+                if (name == null)
+                    handleUnrecognizedOrderingColumn(column);
 
-                    if (name.kind != CFDefinition.Name.Kind.COLUMN_ALIAS)
-                        throw new InvalidRequestException(String.format("Order by is currently only supported on the clustered columns of the PRIMARY KEY, got %s", column));
+                if (name.kind != CFDefinition.Name.Kind.COLUMN_ALIAS)
+                    throw new InvalidRequestException(String.format("Order by is currently only supported on the clustered columns of the PRIMARY KEY, got %s", column));
 
-                    if (i++ != name.position)
-                        throw new InvalidRequestException(String.format("Order by currently only support the ordering of columns following their declared order in the PRIMARY KEY"));
+                if (i++ != name.position)
+                    throw new InvalidRequestException(String.format("Order by currently only support the ordering of columns following their declared order in the PRIMARY KEY"));
 
-                    reversedMap[name.position] = (reversed != isReversedType(name));
-                }
+                reversedMap[name.position] = (reversed != isReversedType(name));
+            }
 
-                // Check that all boolean in reversedMap, if set, agrees
-                Boolean isReversed = null;
-                for (Boolean b : reversedMap)
-                {
-                    // Column on which order is specified can be in any order
-                    if (b == null)
-                        continue;
+            // Check that all boolean in reversedMap, if set, agrees
+            Boolean isReversed = null;
+            for (Boolean b : reversedMap)
+            {
+                // Column on which order is specified can be in any order
+                if (b == null)
+                    continue;
 
-                    if (isReversed == null)
-                    {
-                        isReversed = b;
-                        continue;
-                    }
-                    if (isReversed != b)
-                        throw new InvalidRequestException(String.format("Unsupported order by relation"));
+                if (isReversed == null)
+                {
+                    isReversed = b;
+                    continue;
                 }
-                assert isReversed != null;
-                stmt.isReversed = isReversed;
+                if (isReversed != b)
+                    throw new InvalidRequestException(String.format("Unsupported order by relation"));
             }
+            assert isReversed != null;
+            return isReversed;
+        }
 
-            // Make sure this queries is allowed (note: non key range non indexed cannot involve filtering underneath)
+        /** If ALLOW FILTERING was not specified, this verifies that it is not needed */
+        private void checkNeedsFiltering(SelectStatement stmt) throws InvalidRequestException
+        {
+            // non-key-range non-indexed queries cannot involve filtering underneath
             if (!parameters.allowFiltering && (stmt.isKeyRange || stmt.usesSecondaryIndexing))
             {
                 // We will potentially filter data if either:
                 //  - Have more than one IndexExpression
                 //  - Have no index expression and the column filter is not the identity
                 if (stmt.restrictedNames.size() > 1 || (stmt.restrictedNames.isEmpty() && !stmt.columnFilterIsIdentity()))
-                    throw new InvalidRequestException("Cannot execute this query as it might involve data filtering and thus may have unpredictable performance. "
-                                                    + "If you want to execute this query despite the performance unpredictability, use ALLOW FILTERING");
+                    throw new InvalidRequestException("Cannot execute this query as it might involve data filtering and " +
+                                                      "thus may have unpredictable performance. If you want to execute " +
+                                                      "this query despite the performance unpredictability, use ALLOW FILTERING");
             }
-
-            return new ParsedStatement.Prepared(stmt, names);
         }
 
         private void validateDistinctSelection(Collection<CFDefinition.Name> requestedColumns, Collection<CFDefinition.Name> partitionKey)
@@ -1653,79 +1991,6 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
             return new ColumnSpecification(keyspace(), columnFamily(), new ColumnIdentifier("[limit]", true), Int32Type.instance);
         }
 
-        Restriction updateRestriction(CFMetaData cfm, CFDefinition.Name name, Restriction restriction, Relation newRel, VariableSpecifications boundNames) throws InvalidRequestException
-        {
-            ColumnSpecification receiver = name;
-            if (newRel.onToken)
-            {
-                if (name.kind != CFDefinition.Name.Kind.KEY_ALIAS)
-                    throw new InvalidRequestException(String.format("The token() function is only supported on the partition key, found on %s", name));
-
-                receiver = new ColumnSpecification(name.ksName,
-                                                   name.cfName,
-                                                   new ColumnIdentifier("partition key token", true),
-                                                   StorageService.getPartitioner().getTokenValidator());
-            }
-
-            // We can only use the tuple notation of #4851 on clustering columns for now
-            if (newRel.previousInTuple != null && name.kind != CFDefinition.Name.Kind.COLUMN_ALIAS)
-                throw new InvalidRequestException(String.format("Tuple notation can only be used on clustering columns but found on %s", name));
-
-            switch (newRel.operator())
-            {
-                case EQ:
-                    {
-                        if (restriction != null)
-                            throw new InvalidRequestException(String.format("%s cannot be restricted by more than one relation if it includes an Equal", name));
-                        Term t = newRel.getValue().prepare(receiver);
-                        t.collectMarkerSpecification(boundNames);
-                        restriction = new Restriction.EQ(t, newRel.onToken);
-                    }
-                    break;
-                case IN:
-                    if (restriction != null)
-                        throw new InvalidRequestException(String.format("%s cannot be restricted by more than one relation if it includes a IN", name));
-
-                    if (newRel.getInValues() == null)
-                    {
-                        // Means we have a "SELECT ... IN ?"
-                        assert newRel.getValue() != null;
-                        Term t = newRel.getValue().prepare(receiver);
-                        t.collectMarkerSpecification(boundNames);
-                        restriction = Restriction.IN.create(t);
-                    }
-                    else
-                    {
-                        List<Term> inValues = new ArrayList<Term>(newRel.getInValues().size());
-                        for (Term.Raw raw : newRel.getInValues())
-                        {
-                            Term t = raw.prepare(receiver);
-                            t.collectMarkerSpecification(boundNames);
-                            inValues.add(t);
-                        }
-                        restriction = Restriction.IN.create(inValues);
-                    }
-                    break;
-                case GT:
-                case GTE:
-                case LT:
-                case LTE:
-                    {
-                        if (restriction == null)
-                            restriction = new Restriction.Slice(newRel.onToken);
-                        else if (!restriction.isSlice())
-                            throw new InvalidRequestException(String.format("%s cannot be restricted by both an equal and an inequal relation", name));
-                        Term t = newRel.getValue().prepare(receiver);
-                        t.collectMarkerSpecification(boundNames);
-                        if (newRel.previousInTuple != null && (name.position == 0 || !cfm.clusteringKeyColumns().get(name.position - 1).name.equals(newRel.previousInTuple.key)))
-                            throw new InvalidRequestException(String.format("Invalid tuple notation, column %s is not before column %s in the clustering order", newRel.previousInTuple, name.name));
-                        ((Restriction.Slice)restriction).setBound(name.name, newRel.operator(), t, newRel.previousInTuple);
-                    }
-                    break;
-            }
-            return restriction;
-        }
-
         @Override
         public String toString()
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/43496384/src/java/org/apache/cassandra/cql3/statements/SingleColumnRestriction.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SingleColumnRestriction.java b/src/java/org/apache/cassandra/cql3/statements/SingleColumnRestriction.java
new file mode 100644
index 0000000..2e63272
--- /dev/null
+++ b/src/java/org/apache/cassandra/cql3/statements/SingleColumnRestriction.java
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3.statements;
+
+import org.apache.cassandra.cql3.*;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.thrift.IndexOperator;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+public abstract class SingleColumnRestriction implements Restriction
+{
+    public boolean isMultiColumn()
+    {
+        return false;
+    }
+
+    public static class EQ extends SingleColumnRestriction implements Restriction.EQ
+    {
+        protected final Term value;
+        private final boolean onToken;
+
+        public EQ(Term value, boolean onToken)
+        {
+            this.value = value;
+            this.onToken = onToken;
+        }
+
+        public List<ByteBuffer> values(List<ByteBuffer> variables) throws InvalidRequestException
+        {
+            return Collections.singletonList(value.bindAndGet(variables));
+        }
+
+        public boolean isSlice()
+        {
+            return false;
+        }
+
+        public boolean isEQ()
+        {
+            return true;
+        }
+
+        public boolean isIN()
+        {
+            return false;
+        }
+
+        public boolean isOnToken()
+        {
+            return onToken;
+        }
+
+        @Override
+        public String toString()
+        {
+            return String.format("EQ(%s)%s", value, onToken ? "*" : "");
+        }
+    }
+
+    public static class InWithValues extends SingleColumnRestriction implements Restriction.IN
+    {
+        protected final List<Term> values;
+
+        public InWithValues(List<Term> values)
+        {
+            this.values = values;
+        }
+
+        public List<ByteBuffer> values(List<ByteBuffer> variables) throws InvalidRequestException
+        {
+            List<ByteBuffer> buffers = new ArrayList<>(values.size());
+            for (Term value : values)
+                buffers.add(value.bindAndGet(variables));
+            return buffers;
+        }
+
+        public boolean canHaveOnlyOneValue()
+        {
+            return values.size() == 1;
+        }
+
+        public boolean isSlice()
+        {
+            return false;
+        }
+
+        public boolean isEQ()
+        {
+            return false;
+        }
+
+        public boolean isIN()
+        {
+            return true;
+        }
+
+        public boolean isOnToken()
+        {
+            return false;
+        }
+
+        @Override
+        public String toString()
+        {
+            return String.format("IN(%s)", values);
+        }
+    }
+
+    public static class InWithMarker extends SingleColumnRestriction implements Restriction.IN
+    {
+        protected final AbstractMarker marker;
+
+        public InWithMarker(AbstractMarker marker)
+        {
+            this.marker = marker;
+        }
+
+        public List<ByteBuffer> values(List<ByteBuffer> variables) throws InvalidRequestException
+        {
+            Term.MultiItemTerminal lval = (Term.MultiItemTerminal)marker.bind(variables);
+            if (lval == null)
+                throw new InvalidRequestException("Invalid null value for IN restriction");
+            return lval.getElements();
+        }
+
+        public boolean canHaveOnlyOneValue()
+        {
+            return false;
+        }
+
+        public boolean isSlice()
+        {
+            return false;
+        }
+
+        public boolean isEQ()
+        {
+            return false;
+        }
+
+        public boolean isIN()
+        {
+            return true;
+        }
+
+        public boolean isOnToken()
+        {
+            return false;
+        }
+
+        @Override
+        public String toString()
+        {
+            return "IN ?";
+        }
+    }
+
+    public static class Slice extends SingleColumnRestriction implements Restriction.Slice
+    {
+        protected final Term[] bounds;
+        protected final boolean[] boundInclusive;
+        protected final boolean onToken;
+
+        public Slice(boolean onToken)
+        {
+            this.bounds = new Term[2];
+            this.boundInclusive = new boolean[2];
+            this.onToken = onToken;
+        }
+
+        public boolean isSlice()
+        {
+            return true;
+        }
+
+        public boolean isEQ()
+        {
+            return false;
+        }
+
+        public boolean isIN()
+        {
+            return false;
+        }
+
+        public List<ByteBuffer> values(List<ByteBuffer> variables) throws InvalidRequestException
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        public boolean isOnToken()
+        {
+            return onToken;
+        }
+
+        /** Returns true if the start or end bound (depending on the argument) is set, false otherwise */
+        public boolean hasBound(Bound b)
+        {
+            return bounds[b.idx] != null;
+        }
+
+        public ByteBuffer bound(Bound b, List<ByteBuffer> variables) throws InvalidRequestException
+        {
+            return bounds[b.idx].bindAndGet(variables);
+        }
+
+        /** Returns true if the start or end bound (depending on the argument) is inclusive, false otherwise */
+        public boolean isInclusive(Bound b)
+        {
+            return bounds[b.idx] == null || boundInclusive[b.idx];
+        }
+
+        public Relation.Type getRelation(Bound eocBound, Bound inclusiveBound)
+        {
+            switch (eocBound)
+            {
+                case START:
+                    return boundInclusive[inclusiveBound.idx] ? Relation.Type.GTE : Relation.Type.GT;
+                case END:
+                    return boundInclusive[inclusiveBound.idx] ? Relation.Type.LTE : Relation.Type.LT;
+            }
+            throw new AssertionError();
+        }
+
+        public IndexOperator getIndexOperator(Bound b)
+        {
+            switch (b)
+            {
+                case START:
+                    return boundInclusive[b.idx] ? IndexOperator.GTE : IndexOperator.GT;
+                case END:
+                    return boundInclusive[b.idx] ? IndexOperator.LTE : IndexOperator.LT;
+            }
+            throw new AssertionError();
+        }
+
+        public void setBound(Relation.Type type, Term t) throws InvalidRequestException
+        {
+            Bound b;
+            boolean inclusive;
+            switch (type)
+            {
+                case GT:
+                    b = Bound.START;
+                    inclusive = false;
+                    break;
+                case GTE:
+                    b = Bound.START;
+                    inclusive = true;
+                    break;
+                case LT:
+                    b = Bound.END;
+                    inclusive = false;
+                    break;
+                case LTE:
+                    b = Bound.END;
+                    inclusive = true;
+                    break;
+                default:
+                    throw new AssertionError();
+            }
+
+            if (bounds[b.idx] != null)
+                throw new InvalidRequestException(String.format(
+                        "More than one restriction was found for the %s bound", b.name().toLowerCase()));
+
+            bounds[b.idx] = t;
+            boundInclusive[b.idx] = inclusive;
+        }
+
+        @Override
+        public String toString()
+        {
+            return String.format("SLICE(%s %s, %s %s)%s", boundInclusive[0] ? ">=" : ">",
+                                 bounds[0],
+                                 boundInclusive[1] ? "<=" : "<",
+                                 bounds[1],
+                                 onToken ? "*" : "");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/43496384/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java b/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java
index 30e57d5..ef1c4a4 100644
--- a/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java
@@ -77,7 +77,7 @@ public class TruncateStatement extends CFStatement implements CQLStatement
         return null;
     }
 
-    public ResultMessage executeInternal(QueryState state)
+    public ResultMessage executeInternal(QueryState state, QueryOptions options)
     {
         throw new UnsupportedOperationException();
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/43496384/src/java/org/apache/cassandra/cql3/statements/UseStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/UseStatement.java b/src/java/org/apache/cassandra/cql3/statements/UseStatement.java
index ee70f9d..efda72d 100644
--- a/src/java/org/apache/cassandra/cql3/statements/UseStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/UseStatement.java
@@ -59,7 +59,7 @@ public class UseStatement extends ParsedStatement implements CQLStatement
         return new ResultMessage.SetKeyspace(keyspace);
     }
 
-    public ResultMessage executeInternal(QueryState state)
+    public ResultMessage executeInternal(QueryState state, QueryOptions options)
     {
         // Internal queries are exclusively on the system keyspace and 'use' is thus useless
         throw new UnsupportedOperationException();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/43496384/src/java/org/apache/cassandra/db/marshal/TupleType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/TupleType.java b/src/java/org/apache/cassandra/db/marshal/TupleType.java
new file mode 100644
index 0000000..74211c8
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/marshal/TupleType.java
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.marshal;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import com.google.common.base.Objects;
+
+import org.apache.cassandra.cql3.CQL3Type;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.exceptions.SyntaxException;
+import org.apache.cassandra.serializers.*;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.Pair;
+
+/**
+ * This is essentially like a CompositeType, but it's not primarily meant for comparison, just
+ * to pack multiple values together so has a more friendly encoding.
+ */
+public class TupleType extends AbstractType<ByteBuffer>
+{
+    protected final List<AbstractType<?>> types;
+
+    public TupleType(List<AbstractType<?>> types)
+    {
+        this.types = types;
+    }
+
+    public static TupleType getInstance(TypeParser parser) throws ConfigurationException, SyntaxException
+    {
+        return new TupleType(parser.getTypeParameters());
+    }
+
+    public AbstractType<?> type(int i)
+    {
+        return types.get(i);
+    }
+
+    public int size()
+    {
+        return types.size();
+    }
+
+    public List<AbstractType<?>> allTypes()
+    {
+        return types;
+    }
+
+    public int compare(ByteBuffer o1, ByteBuffer o2)
+    {
+        if (!o1.hasRemaining() || !o2.hasRemaining())
+            return o1.hasRemaining() ? 1 : o2.hasRemaining() ? -1 : 0;
+
+        ByteBuffer bb1 = o1.duplicate();
+        ByteBuffer bb2 = o2.duplicate();
+
+        int i = 0;
+        while (bb1.remaining() > 0 && bb2.remaining() > 0)
+        {
+            AbstractType<?> comparator = types.get(i);
+
+            int size1 = bb1.getInt();
+            int size2 = bb2.getInt();
+
+            // Handle nulls
+            if (size1 < 0)
+            {
+                if (size2 < 0)
+                    continue;
+                return -1;
+            }
+            if (size2 < 0)
+                return 1;
+
+            ByteBuffer value1 = ByteBufferUtil.readBytes(bb1, size1);
+            ByteBuffer value2 = ByteBufferUtil.readBytes(bb2, size2);
+            int cmp = comparator.compare(value1, value2);
+            if (cmp != 0)
+                return cmp;
+
+            ++i;
+        }
+
+        if (bb1.remaining() == 0)
+            return bb2.remaining() == 0 ? 0 : -1;
+
+        // bb1.remaining() > 0 && bb2.remaining() == 0
+        return 1;
+    }
+
+    @Override
+    public void validate(ByteBuffer bytes) throws MarshalException
+    {
+        ByteBuffer input = bytes.duplicate();
+        for (int i = 0; i < size(); i++)
+        {
+            // we allow the input to have less fields than declared so as to support field addition.
+            if (!input.hasRemaining())
+                return;
+
+            if (input.remaining() < 4)
+                throw new MarshalException(String.format("Not enough bytes to read size of %dth component", i));
+
+            int size = input.getInt();
+            // We don't handle null just yet, but we should fix that soon (CASSANDRA-7206)
+            if (size < 0)
+                throw new MarshalException("Nulls are not yet supported inside tuple values");
+
+            if (input.remaining() < size)
+                throw new MarshalException(String.format("Not enough bytes to read %dth component", i));
+
+            ByteBuffer field = ByteBufferUtil.readBytes(input, size);
+            types.get(i).validate(field);
+        }
+
+        // We're allowed to get less fields than declared, but not more
+        if (input.hasRemaining())
+            throw new MarshalException("Invalid remaining data after end of tuple value");
+    }
+
+    /**
+     * Split a tuple value into its component values.
+     */
+    public ByteBuffer[] split(ByteBuffer value)
+    {
+        ByteBuffer[] components = new ByteBuffer[size()];
+        ByteBuffer input = value.duplicate();
+        for (int i = 0; i < size(); i++)
+        {
+            if (!input.hasRemaining())
+                return Arrays.copyOfRange(components, 0, i);
+
+            int size = input.getInt();
+            components[i] = size < 0 ? null : ByteBufferUtil.readBytes(input, size);
+        }
+        return components;
+    }
+
+    public static ByteBuffer buildValue(ByteBuffer[] components)
+    {
+        int totalLength = 0;
+        for (ByteBuffer component : components)
+            totalLength += 4 + component.remaining();
+
+        ByteBuffer result = ByteBuffer.allocate(totalLength);
+        for (ByteBuffer component : components)
+        {
+            result.putInt(component.remaining());
+            result.put(component.duplicate());
+        }
+        result.rewind();
+        return result;
+    }
+
+    @Override
+    public String getString(ByteBuffer value)
+    {
+        StringBuilder sb = new StringBuilder();
+        ByteBuffer input = value.duplicate();
+        for (int i = 0; i < size(); i++)
+        {
+            if (!input.hasRemaining())
+                return sb.toString();
+
+            if (i > 0)
+                sb.append(":");
+
+            int size = input.getInt();
+            assert size >= 0; // We don't support nulls yet, but we will likely do with #7206 and we'll need
+                              // a way to represent it as a string (without it conflicting with a user value)
+            ByteBuffer field = ByteBufferUtil.readBytes(input, size);
+            // We use ':' as delimiter so escape it if it's in the generated string
+            sb.append(field == null ? "null" : type(i).getString(value).replaceAll(":", "\\\\:"));
+        }
+        return sb.toString();
+    }
+
+    public ByteBuffer fromString(String source)
+    {
+        // Split the input on non-escaped ':' characters
+        List<String> strings = AbstractCompositeType.split(source);
+        ByteBuffer[] components = new ByteBuffer[strings.size()];
+        for (int i = 0; i < strings.size(); i++)
+        {
+            // TODO: we'll need to handle null somehow here once we support them
+            String str = strings.get(i).replaceAll("\\\\:", ":");
+            components[i] = type(i).fromString(str);
+        }
+        return buildValue(components);
+    }
+
+    public TypeSerializer<ByteBuffer> getSerializer()
+    {
+        return BytesSerializer.instance;
+    }
+
+    @Override
+    public boolean isCompatibleWith(AbstractType<?> previous)
+    {
+        if (!(previous instanceof TupleType))
+            return false;
+
+        // Extending with new components is fine, removing is not
+        TupleType tt = (TupleType)previous;
+        if (size() < tt.size())
+            return false;
+
+        for (int i = 0; i < tt.size(); i++)
+        {
+            AbstractType<?> tprev = tt.type(i);
+            AbstractType<?> tnew = type(i);
+            if (!tnew.isCompatibleWith(tprev))
+                return false;
+        }
+        return true;
+    }
+
+    @Override
+    public boolean isValueCompatibleWith(AbstractType<?> previous)
+    {
+        if (!(previous instanceof TupleType))
+            return false;
+
+        // Extending with new components is fine, removing is not
+        TupleType tt = (TupleType)previous;
+        if (size() < tt.size())
+            return false;
+
+        for (int i = 0; i < tt.size(); i++)
+        {
+            AbstractType<?> tprev = tt.type(i);
+            AbstractType<?> tnew = type(i);
+            if (!tnew.isValueCompatibleWith(tprev))
+                return false;
+        }
+        return true;
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Objects.hashCode(types);
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if(!(o instanceof TupleType))
+            return false;
+
+        TupleType that = (TupleType)o;
+        return types.equals(that.types);
+    }
+
+    @Override
+    public String toString()
+    {
+        return getClass().getName() + TypeParser.stringifyTypeParameters(types);
+    }
+}
+


Mime
View raw message