cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From slebre...@apache.org
Subject git commit: Allow paging through non-ordered partitioner results in CQL3
Date Mon, 07 May 2012 11:47:59 GMT
Updated Branches:
  refs/heads/cassandra-1.1 1ab4ec174 -> 8b81c8f2f


Allow paging through non-ordered partitioner results in CQL3

patch by slebresne; reviewed by xedin for CASSANDRA-3771


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/8b81c8f2
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/8b81c8f2
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/8b81c8f2

Branch: refs/heads/cassandra-1.1
Commit: 8b81c8f2fb93a051fbd317c03af727ab919034f2
Parents: 1ab4ec1
Author: Sylvain Lebresne <sylvain@datastax.com>
Authored: Mon May 7 13:47:11 2012 +0200
Committer: Sylvain Lebresne <sylvain@datastax.com>
Committed: Mon May 7 13:47:11 2012 +0200

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 src/java/org/apache/cassandra/cql3/Cql.g           |    7 +
 src/java/org/apache/cassandra/cql3/Relation.java   |   13 +-
 src/java/org/apache/cassandra/cql3/Term.java       |   70 ++++---
 .../cassandra/cql3/statements/SelectStatement.java |  157 +++++++++------
 5 files changed, 152 insertions(+), 96 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b81c8f2/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 0c8e022..baf899f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -35,6 +35,7 @@
  * (cql3) Move max/min compaction thresholds to compaction strategy options
    (CASSANDRA-4187)
  * Fix exception during move when localhost is the only source (CASSANDRA-4200)
+ * (cql3) Allow paging through non-ordered partitioner results (CASSANDRA-3771)
 Merged from 1.0:
  * Fix super columns bug where cache is not updated (CASSANDRA-4190)
  * fix maxTimestamp to include row tombstones (CASSANDRA-4116)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b81c8f2/src/java/org/apache/cassandra/cql3/Cql.g
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Cql.g b/src/java/org/apache/cassandra/cql3/Cql.g
index bcfac8a..f567958 100644
--- a/src/java/org/apache/cassandra/cql3/Cql.g
+++ b/src/java/org/apache/cassandra/cql3/Cql.g
@@ -461,6 +461,11 @@ cidentList returns [List<ColumnIdentifier> items]
     ;
 
 // Values (includes prepared statement markers)
+extendedTerm returns [Term term]
+    : K_TOKEN '(' t=term ')' { $term = Term.tokenOf(t); }
+    | t=term                 { $term = t; }
+    ;
+
 term returns [Term term]
     : t=(STRING_LITERAL | UUID | INTEGER | FLOAT ) { $term = new Term($t.text, $t.type);
}
     | t=QMARK                                      { $term = new Term($t.text, $t.type, ++currentBindMarkerIdx);
}
@@ -502,6 +507,7 @@ properties returns [Map<String, String> props]
 
 relation returns [Relation rel]
     : name=cident type=('=' | '<' | '<=' | '>=' | '>') t=term { $rel = new Relation($name.id,
$type.text, $t.term); }
+    | K_TOKEN '(' name=cident ')' type=('=' |'<' | '<=' | '>=' | '>') t=extendedTerm
{ $rel = new Relation($name.id, $type.text, $t.term, true); }
     | name=cident K_IN { $rel = Relation.createInRelation($name.id); }
       '(' f1=term { $rel.addInValue(f1); } (',' fN=term { $rel.addInValue(fN); } )* ')'
     ;
@@ -615,6 +621,7 @@ K_UUID:        U U I D;
 K_VARCHAR:     V A R C H A R;
 K_VARINT:      V A R I N T;
 K_TIMEUUID:    T I M E U U I D;
+K_TOKEN:       T O K E N;
 
 // Case-insensitive alpha characters
 fragment A: ('a'|'A');

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b81c8f2/src/java/org/apache/cassandra/cql3/Relation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Relation.java b/src/java/org/apache/cassandra/cql3/Relation.java
index 0724abd..e7bf616 100644
--- a/src/java/org/apache/cassandra/cql3/Relation.java
+++ b/src/java/org/apache/cassandra/cql3/Relation.java
@@ -32,6 +32,7 @@ public class Relation
     private final Type relationType;
     private final Term value;
     private final List<Term> inValues;
+    public final boolean onToken;
 
     public static enum Type
     {
@@ -54,12 +55,13 @@ public class Relation
         }
     }
 
-    private Relation(ColumnIdentifier entity, Type type, Term value, List<Term> inValues)
+    private Relation(ColumnIdentifier entity, Type type, Term value, List<Term> inValues,
boolean onToken)
     {
         this.entity = entity;
         this.relationType = type;
         this.value = value;
         this.inValues = inValues;
+        this.onToken = onToken;
     }
 
     /**
@@ -71,12 +73,17 @@ public class Relation
      */
     public Relation(ColumnIdentifier entity, String type, Term value)
     {
-        this(entity, Type.forString(type), value, null);
+        this(entity, Type.forString(type), value, null, false);
+    }
+
+    public Relation(ColumnIdentifier entity, String type, Term value, boolean onToken)
+    {
+        this(entity, Type.forString(type), value, null, onToken);
     }
 
     public static Relation createInRelation(ColumnIdentifier entity)
     {
-        return new Relation(entity, Type.IN, null, new ArrayList<Term>());
+        return new Relation(entity, Type.IN, null, new ArrayList<Term>(), false);
     }
 
     public Type operator()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b81c8f2/src/java/org/apache/cassandra/cql3/Term.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Term.java b/src/java/org/apache/cassandra/cql3/Term.java
index 07e1625..c86f0e2 100644
--- a/src/java/org/apache/cassandra/cql3/Term.java
+++ b/src/java/org/apache/cassandra/cql3/Term.java
@@ -21,6 +21,9 @@ package org.apache.cassandra.cql3;
 import java.nio.ByteBuffer;
 import java.util.List;
 
+import org.apache.cassandra.config.ConfigurationException;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.AsciiType;
 import org.apache.cassandra.db.marshal.FloatType;
@@ -35,12 +38,19 @@ public class Term
     private final String text;
     private final TermType type;
     public final int bindIndex;
+    public final boolean isToken;
 
-    public Term(String text, TermType type)
+    private Term(String text, TermType type, int bindIndex, boolean isToken)
     {
         this.text = text == null ? "" : text;
         this.type = type;
-        this.bindIndex = -1;
+        this.bindIndex = bindIndex;
+        this.isToken = isToken;
+    }
+
+    public Term(String text, TermType type)
+    {
+        this(text, type, -1, false);
     }
 
     /**
@@ -62,9 +72,12 @@ public class Term
 
     public Term(String text, int type, int index)
     {
-        this.text = text == null ? "" : text;
-        this.type = TermType.forInt(type);
-        this.bindIndex = index;
+        this(text, TermType.forInt(type), index, false);
+    }
+
+    public static Term tokenOf(Term t)
+    {
+        return new Term(t.text, t.type, t.bindIndex, true);
     }
 
     /**
@@ -74,7 +87,7 @@ public class Term
      */
     public String getText()
     {
-        return text;
+        return isToken ? "token(" + text + ")" : text;
     }
 
     /**
@@ -105,29 +118,28 @@ public class Term
         }
     }
 
-    /**
-     * Returns the typed value, serialized to a ByteBuffer.
-     *
-     * @return a ByteBuffer of the value.
-     * @throws InvalidRequestException if unable to coerce the string to its type.
-     */
-    public ByteBuffer getByteBuffer() throws InvalidRequestException
+    public Token getAsToken(AbstractType<?> validator, List<ByteBuffer> variables,
IPartitioner<?> p) throws InvalidRequestException
     {
-        switch (type)
+        if (!(isToken || type == TermType.STRING))
+            throw new InvalidRequestException("Invalid value for token (use a string literal
of the token value or the token() function)");
+
+        try
         {
-            case STRING:
-                return AsciiType.instance.fromString(text);
-            case INTEGER:
-                return IntegerType.instance.fromString(text);
-            case UUID:
-                // we specifically want the Lexical class here, not "UUIDType," because we're
supposed to have
-                // a uuid-shaped string here, and UUIDType also accepts integer or date strings
(and turns them into version 1 uuids).
-                return LexicalUUIDType.instance.fromString(text);
-            case FLOAT:
-              return FloatType.instance.fromString(text);
+            if (isToken)
+            {
+                ByteBuffer value = getByteBuffer(validator, variables);
+                return p.getToken(value);
+            }
+            else
+            {
+                p.getTokenFactory().validate(text);
+                return p.getTokenFactory().fromString(text);
+            }
+        }
+        catch (ConfigurationException e)
+        {
+            throw new InvalidRequestException(e.getMessage());
         }
-
-        throw new IllegalStateException();
     }
 
     /**
@@ -148,14 +160,14 @@ public class Term
     @Override
     public String toString()
     {
-        return String.format("Term(%s, type=%s)", getText(), type);
+        return String.format("Term(%s, type=%s%s)", getText(), type, isToken ? ", isToken"
: "");
     }
 
     @Override
     public int hashCode()
     {
         final int prime = 31;
-        int result = 1;
+        int result = 1 + (isToken ? 1 : 0);
         result = prime * result + ((text == null) ? 0 : text.hashCode());
         result = prime * result + ((type == null) ? 0 : type.hashCode());
         return result;
@@ -180,6 +192,8 @@ public class Term
             return false;
         if (type != other.type)
             return false;
+        if (isToken != other.isToken)
+            return false;
         return true;
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b81c8f2/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index b9d1c4f..04f16cc 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@ -46,13 +46,7 @@ import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.CompositeType;
 import org.apache.cassandra.db.marshal.ReversedType;
 import org.apache.cassandra.db.marshal.TypeParser;
-import org.apache.cassandra.dht.AbstractBounds;
-import org.apache.cassandra.dht.Bounds;
-import org.apache.cassandra.dht.ExcludingBounds;
-import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.dht.IncludingExcludingBounds;
-import org.apache.cassandra.dht.RandomPartitioner;
-import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.*;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.StorageProxy;
 import org.apache.cassandra.service.StorageService;
@@ -250,33 +244,6 @@ public class SelectStatement implements CQLStatement
     private List<Row> multiRangeSlice(List<ByteBuffer> variables) throws InvalidRequestException,
TimedOutException, UnavailableException
     {
         List<Row> rows;
-        IPartitioner<?> p = StorageService.getPartitioner();
-
-        ByteBuffer startKeyBytes = getKeyBound(Bound.START, variables);
-        ByteBuffer finishKeyBytes = getKeyBound(Bound.END, variables);
-
-        RowPosition startKey = RowPosition.forKey(startKeyBytes, p);
-        RowPosition finishKey = RowPosition.forKey(finishKeyBytes, p);
-        if (startKey.compareTo(finishKey) > 0 && !finishKey.isMinimum(p))
-        {
-            if (p instanceof RandomPartitioner)
-                throw new InvalidRequestException("Start key sorts after end key. This is
not allowed; you probably should not specify end key at all, under RandomPartitioner");
-            else
-                throw new InvalidRequestException("Start key must sort before (or equal to)
finish key in your partitioner!");
-        }
-        AbstractBounds<RowPosition> bounds;
-        if (includeKeyBound(Bound.START))
-        {
-            bounds = includeKeyBound(Bound.END)
-                   ? new Bounds<RowPosition>(startKey, finishKey)
-                   : new IncludingExcludingBounds<RowPosition>(startKey, finishKey);
-        }
-        else
-        {
-            bounds = includeKeyBound(Bound.END)
-                   ? new Range<RowPosition>(startKey, finishKey)
-                   : new ExcludingBounds<RowPosition>(startKey, finishKey);
-        }
 
         // XXX: Our use of Thrift structs internally makes me Sad. :(
         SlicePredicate thriftSlicePredicate = makeSlicePredicate(variables);
@@ -290,7 +257,7 @@ public class SelectStatement implements CQLStatement
                                                                     columnFamily(),
                                                                     null,
                                                                     thriftSlicePredicate,
-                                                                    bounds,
+                                                                    getKeyBounds(variables),
                                                                     expressions,
                                                                     getLimit(),
                                                                     true, // limit by columns,
not keys
@@ -308,6 +275,50 @@ public class SelectStatement implements CQLStatement
         return rows;
     }
 
+    private AbstractBounds<RowPosition> getKeyBounds(List<ByteBuffer> variables)
throws InvalidRequestException
+    {
+        IPartitioner<?> p = StorageService.getPartitioner();
+        AbstractBounds<RowPosition> bounds;
+
+        if (keyRestriction != null && keyRestriction.onToken)
+        {
+            Token startToken = getTokenBound(Bound.START, variables, p);
+            Token endToken = getTokenBound(Bound.END, variables, p);
+
+            RowPosition start = includeKeyBound(Bound.START) ? startToken.minKeyBound() :
startToken.maxKeyBound();
+            RowPosition end = includeKeyBound(Bound.END) ? endToken.maxKeyBound() : endToken.minKeyBound();
+            bounds = new Range<RowPosition>(start, end);
+        }
+        else
+        {
+            ByteBuffer startKeyBytes = getKeyBound(Bound.START, variables);
+            ByteBuffer finishKeyBytes = getKeyBound(Bound.END, variables);
+
+            RowPosition startKey = RowPosition.forKey(startKeyBytes, p);
+            RowPosition finishKey = RowPosition.forKey(finishKeyBytes, p);
+            if (startKey.compareTo(finishKey) > 0 && !finishKey.isMinimum(p))
+            {
+                if (p instanceof RandomPartitioner)
+                    throw new InvalidRequestException("Start key sorts after end key. This
is not allowed; you probably should not specify end key at all, under RandomPartitioner");
+                else
+                    throw new InvalidRequestException("Start key must sort before (or equal
to) finish key in your partitioner!");
+            }
+            if (includeKeyBound(Bound.START))
+            {
+                bounds = includeKeyBound(Bound.END)
+                    ? new Bounds<RowPosition>(startKey, finishKey)
+                    : new IncludingExcludingBounds<RowPosition>(startKey, finishKey);
+            }
+            else
+            {
+                bounds = includeKeyBound(Bound.END)
+                    ? new Range<RowPosition>(startKey, finishKey)
+                    : new ExcludingBounds<RowPosition>(startKey, finishKey);
+            }
+        }
+        return bounds;
+    }
+
     private SlicePredicate makeSlicePredicate(List<ByteBuffer> variables)
     throws InvalidRequestException
     {
@@ -342,11 +353,11 @@ public class SelectStatement implements CQLStatement
 
     private boolean isKeyRange()
     {
-        // If indexed columns, they always use getRangeSlices
+        // If indexed columns or a token range, they always use getRangeSlices
         if (!metadataRestrictions.isEmpty())
             return true;
 
-        return keyRestriction == null || !keyRestriction.isEquality();
+        return keyRestriction == null || !keyRestriction.isEquality() || keyRestriction.onToken;
     }
 
     private Collection<ByteBuffer> getKeys(final List<ByteBuffer> variables)
throws InvalidRequestException
@@ -377,6 +388,21 @@ public class SelectStatement implements CQLStatement
         }
     }
 
+    private Token getTokenBound(Bound b, List<ByteBuffer> variables, IPartitioner<?>
p) throws InvalidRequestException
+    {
+        assert keyRestriction != null;
+        if (keyRestriction.isEquality())
+        {
+            assert keyRestriction.eqValues.size() == 1;
+            return keyRestriction.eqValues.get(0).getAsToken(cfDef.key.type, variables, p);
+        }
+        else
+        {
+            Term bound = keyRestriction.bound(b);
+            return bound == null ? p.getMinimumToken() : bound.getAsToken(cfDef.key.type,
variables, p);
+        }
+    }
+
     private boolean includeKeyBound(Bound b)
     {
         if (keyRestriction == null || keyRestriction.isEquality())
@@ -871,17 +897,17 @@ public class SelectStatement implements CQLStatement
                 switch (name.kind)
                 {
                     case KEY_ALIAS:
-                        if (rel.operator() != Relation.Type.EQ && rel.operator()
!= Relation.Type.IN && !StorageService.getPartitioner().preservesOrder())
-                            throw new InvalidRequestException("Only EQ and IN relation are
supported on first component of the PRIMARY KEY for RandomPartitioner");
-                        stmt.keyRestriction = updateRestriction(name.name, stmt.keyRestriction,
rel);
+                        if (rel.operator() != Relation.Type.EQ && rel.operator()
!= Relation.Type.IN && !rel.onToken && !StorageService.getPartitioner().preservesOrder())
+                            throw new InvalidRequestException("Only EQ and IN relation are
supported on first component of the PRIMARY KEY for RandomPartitioner (unless you use the
token() function)");
+                        stmt.keyRestriction = updateRestriction(name, stmt.keyRestriction,
rel);
                         break;
                     case COLUMN_ALIAS:
-                        stmt.columnRestrictions[name.position] = updateRestriction(name.name,
stmt.columnRestrictions[name.position], rel);
+                        stmt.columnRestrictions[name.position] = updateRestriction(name,
stmt.columnRestrictions[name.position], rel);
                         break;
                     case VALUE_ALIAS:
                         throw new InvalidRequestException(String.format("Restricting the
value of a compact CF (%s) is not supported", name.name));
                     case COLUMN_METADATA:
-                        stmt.metadataRestrictions.put(name, updateRestriction(name.name,
stmt.metadataRestrictions.get(name), rel));
+                        stmt.metadataRestrictions.put(name, updateRestriction(name, stmt.metadataRestrictions.get(name),
rel));
                         break;
                 }
             }
@@ -941,21 +967,9 @@ public class SelectStatement implements CQLStatement
                 if (!hasEq)
                     throw new InvalidRequestException("No indexed columns present in by-columns
clause with Equal operator");
 
-                // If we have indexed columns and the key = X clause, we transform it into
a key >= X AND key <= X clause.
-                // If it's a IN relation however, we reject it.
-                if (stmt.keyRestriction != null && stmt.keyRestriction.isEquality())
-                {
-                    if (stmt.keyRestriction.eqValues.size() > 1)
-                        throw new InvalidRequestException("Select on indexed columns and
with IN clause for the PRIMARY KEY are not supported");
-
-                    Restriction newRestriction = new Restriction();
-                    for (Bound b : Bound.values())
-                    {
-                        newRestriction.setBound(b, stmt.keyRestriction.eqValues.get(0));
-                        newRestriction.setInclusive(b);
-                    }
-                    stmt.keyRestriction = newRestriction;
-                }
+                // If we have indexed columns and the key = X clause, we will do a range
query, but if it's a IN relation, we don't know how to handle it.
+                if (stmt.keyRestriction != null && stmt.keyRestriction.isEquality()
&& stmt.keyRestriction.eqValues.size() > 1)
+                    throw new InvalidRequestException("Select on indexed columns and with
IN clause for the PRIMARY KEY are not supported");
             }
 
             if (!stmt.parameters.orderings.isEmpty())
@@ -1008,6 +1022,10 @@ public class SelectStatement implements CQLStatement
                     throw new InvalidRequestException("Descending order is only supported
is the first part of the PRIMARY KEY is restricted by an Equal or a IN");
             }
 
+            // If this is a query on tokens, it's necessary a range query (there can be more
than one key per token), so reject IN queries (as we don't know how to do them)
+            if (stmt.keyRestriction != null && stmt.keyRestriction.onToken &&
stmt.keyRestriction.isEquality() && stmt.keyRestriction.eqValues.size() > 1)
+                throw new InvalidRequestException("Select using the token() function don't
support IN clause");
+
             return new ParsedStatement.Prepared(stmt, Arrays.<AbstractType<?>>asList(types));
         }
 
@@ -1016,14 +1034,17 @@ public class SelectStatement implements CQLStatement
             return name.type instanceof ReversedType;
         }
 
-        Restriction updateRestriction(ColumnIdentifier name, Restriction restriction, Relation
newRel) throws InvalidRequestException
+        Restriction updateRestriction(CFDefinition.Name name, Restriction restriction, Relation
newRel) throws InvalidRequestException
         {
+            if (newRel.onToken && name.kind != CFDefinition.Name.Kind.KEY_ALIAS)
+                throw new InvalidRequestException(String.format("The token() function is
only supported on the partition key, found on %s", name));
+
             switch (newRel.operator())
             {
                 case EQ:
                     if (restriction != null)
                         throw new InvalidRequestException(String.format("%s cannot be restricted
by more than one relation if it includes an Equal", name));
-                    restriction = new Restriction(newRel.getValue());
+                    restriction = new Restriction(newRel.getValue(), newRel.onToken);
                     break;
                 case IN:
                     if (restriction != null)
@@ -1035,8 +1056,8 @@ public class SelectStatement implements CQLStatement
                 case LT:
                 case LTE:
                     if (restriction == null)
-                        restriction = new Restriction();
-                    restriction.setBound(name, newRel.operator(), newRel.getValue());
+                        restriction = new Restriction(newRel.onToken);
+                    restriction.setBound(name.name, newRel.operator(), newRel.getValue());
                     break;
             }
             return restriction;
@@ -1066,23 +1087,27 @@ public class SelectStatement implements CQLStatement
         private final Term[] bounds;
         private final boolean[] boundInclusive;
 
+        final boolean onToken;
+
         Restriction(List<Term> values)
         {
             this.eqValues = values;
             this.bounds = null;
             this.boundInclusive = null;
+            this.onToken = false;
         }
 
-        Restriction(Term value)
+        Restriction(Term value, boolean onToken)
         {
             this(Collections.singletonList(value));
         }
 
-        Restriction()
+        Restriction(boolean onToken)
         {
             this.eqValues = null;
             this.bounds = new Term[2];
             this.boundInclusive = new boolean[2];
+            this.onToken = onToken;
         }
 
         boolean isEquality()
@@ -1167,17 +1192,19 @@ public class SelectStatement implements CQLStatement
         @Override
         public String toString()
         {
+            String s;
             if (eqValues == null)
             {
-                return String.format("SLICE(%s %s, %s %s)", boundInclusive[0] ? ">=" :
">",
+                s = String.format("SLICE(%s %s, %s %s)", boundInclusive[0] ? ">=" : ">",
                                                             bounds[0],
                                                             boundInclusive[1] ? "<=" :
"<",
                                                             bounds[1]);
             }
             else
             {
-                return String.format("EQ(%s)", eqValues);
+                s = String.format("EQ(%s)", eqValues);
             }
+            return onToken ? s + "*" : s;
         }
     }
 


Mime
View raw message