cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From s...@apache.org
Subject cassandra git commit: Make custom filter expressions more extensible
Date Wed, 06 Apr 2016 15:35:42 GMT
Repository: cassandra
Updated Branches:
  refs/heads/trunk b680ddd61 -> 1a73af768


Make custom filter expressions more extensible

Patch by Sam Tunnicliffe; reviewed by Sylvain Lebresne for
CASSANDRA-11295


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1a73af76
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1a73af76
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1a73af76

Branch: refs/heads/trunk
Commit: 1a73af7686e41cf8c2c0a031c9ef466b13a4b794
Parents: b680ddd
Author: Sam Tunnicliffe <sam@beobal.com>
Authored: Thu Feb 25 15:32:34 2016 +0000
Committer: Sam Tunnicliffe <sam@beobal.com>
Committed: Wed Apr 6 16:29:01 2016 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 NEWS.txt                                        |   2 +
 .../cql3/restrictions/Restrictions.java         |   2 +-
 .../restrictions/StatementRestrictions.java     |   6 +
 .../cql3/statements/ModificationStatement.java  |   8 ++
 .../apache/cassandra/db/filter/RowFilter.java   | 144 ++++++++++++++++++-
 .../cassandra/index/SecondaryIndexManager.java  |   2 +-
 7 files changed, 156 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/1a73af76/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 95ff24c..e522035 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.6
+ * Make custom filtering more extensible with UserExpression (CASSANDRA-11295)
  * Improve field-checking and error reporting in cassandra.yaml (CASSANDRA-10649)
  * Print CAS stats in nodetool proxyhistograms (CASSANDRA-11507)
  * More user friendly error when providing an invalid token to nodetool (CASSANDRA-9348)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1a73af76/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index cc3e9c2..e073592 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -23,6 +23,8 @@ New features
    - Startup is now aborted if corrupted transaction log files are found. The details
      of the affected log files are now logged, allowing the operator to decide how
      to resolve the situation.
+   - Filtering expressions are made more pluggable and can be added programmatically via
+     a QueryHandler implementation. See CASSANDRA-11295 for more details.
 
 3.4
 =====

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1a73af76/src/java/org/apache/cassandra/cql3/restrictions/Restrictions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/restrictions/Restrictions.java b/src/java/org/apache/cassandra/cql3/restrictions/Restrictions.java
index 705d66d..f46f176 100644
--- a/src/java/org/apache/cassandra/cql3/restrictions/Restrictions.java
+++ b/src/java/org/apache/cassandra/cql3/restrictions/Restrictions.java
@@ -29,7 +29,7 @@ import org.apache.cassandra.index.SecondaryIndexManager;
 /**
  * Sets of restrictions
  */
-interface Restrictions
+public interface Restrictions
 {
     /**
      * Returns the column definitions in position order.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1a73af76/src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java b/src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java
index a35d86b..b00214c 100644
--- a/src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java
+++ b/src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java
@@ -270,6 +270,12 @@ public final class StatementRestrictions
                                 nonPrimaryKeyRestrictions.getFunctions());
     }
 
+    // may be used by QueryHandler implementations
+    public IndexRestrictions getIndexRestrictions()
+    {
+        return indexRestrictions;
+    }
+
     private void addSingleColumnRestriction(SingleColumnRestriction restriction)
     {
         ColumnDefinition def = restriction.columnDef;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1a73af76/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index 614e47a..06ff5d4 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -147,6 +147,14 @@ public abstract class ModificationStatement implements CQLStatement
                                 conditions.getFunctions());
     }
 
+    /*
+     * May be used by QueryHandler implementations
+     */
+    public StatementRestrictions getRestrictions()
+    {
+        return restrictions;
+    }
+
     public abstract void addUpdateForKey(PartitionUpdate update, Clustering clustering, UpdateParameters
params);
 
     public abstract void addUpdateForKey(PartitionUpdate update, Slice slice, UpdateParameters
params);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1a73af76/src/java/org/apache/cassandra/db/filter/RowFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/RowFilter.java b/src/java/org/apache/cassandra/db/filter/RowFilter.java
index fea8ea8..0ef29c2 100644
--- a/src/java/org/apache/cassandra/db/filter/RowFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/RowFilter.java
@@ -20,15 +20,21 @@ package org.apache.cassandra.db.filter;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import com.google.common.base.Objects;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.cql3.Operator;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.marshal.*;
-import org.apache.cassandra.db.partitions.*;
+import org.apache.cassandra.db.partitions.ImmutableBTreePartition;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.transform.Transformation;
 import org.apache.cassandra.exceptions.InvalidRequestException;
@@ -53,6 +59,8 @@ import static org.apache.cassandra.cql3.statements.RequestValidations.checkNotNu
  */
 public abstract class RowFilter implements Iterable<RowFilter.Expression>
 {
+    private static final Logger logger = LoggerFactory.getLogger(RowFilter.class);
+
     public static final Serializer serializer = new Serializer();
     public static final RowFilter NONE = new CQLFilter(Collections.emptyList());
 
@@ -107,6 +115,11 @@ public abstract class RowFilter implements Iterable<RowFilter.Expression>
         expressions.add(expression);
     }
 
+    public void addUserExpression(UserExpression e)
+    {
+        expressions.add(e);
+    }
+
     public List<Expression> getExpressions()
     {
         return expressions;
@@ -245,13 +258,13 @@ public abstract class RowFilter implements Iterable<RowFilter.Expression>
                 DecoratedKey pk;
                 public UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition)
                 {
+                    pk = partition.partitionKey();
+
                     // The filter might be on static columns, so need to check static row
first.
                     if (filterStaticColumns && applyToRow(partition.staticRow())
== null)
                         return null;
 
-                    pk = partition.partitionKey();
                     UnfilteredRowIterator iterator = Transformation.apply(partition, this);
-
                     return (filterNonStaticColumns && !iterator.hasNext()) ? null
: iterator;
                 }
 
@@ -327,7 +340,7 @@ public abstract class RowFilter implements Iterable<RowFilter.Expression>
         private static final Serializer serializer = new Serializer();
 
         // Note: the order of this enum matter, it's used for serialization
-        protected enum Kind { SIMPLE, MAP_EQUALITY, THRIFT_DYN_EXPR, CUSTOM }
+        protected enum Kind { SIMPLE, MAP_EQUALITY, THRIFT_DYN_EXPR, CUSTOM, USER }
 
         protected abstract Kind kind();
         protected final ColumnDefinition column;
@@ -346,6 +359,11 @@ public abstract class RowFilter implements Iterable<RowFilter.Expression>
             return kind() == Kind.CUSTOM;
         }
 
+        public boolean isUserDefined()
+        {
+            return kind() == Kind.USER;
+        }
+
         public ColumnDefinition column()
         {
             return column;
@@ -468,6 +486,13 @@ public abstract class RowFilter implements Iterable<RowFilter.Expression>
                     return;
                 }
 
+                if (expression.kind() == Kind.USER)
+                {
+                    assert version >= MessagingService.VERSION_30;
+                    UserExpression.serialize((UserExpression)expression, out, version);
+                    return;
+                }
+
                 ByteBufferUtil.writeWithShortLength(expression.column.name.bytes, out);
                 expression.operator.writeTo(out);
 
@@ -511,6 +536,11 @@ public abstract class RowFilter implements Iterable<RowFilter.Expression>
                                                     IndexMetadata.serializer.deserialize(in,
version, metadata),
                                                     ByteBufferUtil.readWithShortLength(in));
                     }
+
+                    if (kind == Kind.USER)
+                    {
+                        return UserExpression.deserialize(in, version, metadata);
+                    }
                 }
 
                 name = ByteBufferUtil.readWithShortLength(in);
@@ -560,8 +590,11 @@ public abstract class RowFilter implements Iterable<RowFilter.Expression>
                 // version 3.0+ includes a byte for Kind
                 long size = version >= MessagingService.VERSION_30 ? 1 : 0;
 
-                // custom expressions don't include a column or operator, all other expressions
do
-                if (expression.kind() != Kind.CUSTOM)
+                // Custom expressions include neither a column nor an operator, but all
+                // other expressions do. Also, custom expressions are 3.0+ only, so
+                // the column & operator will always be the first things written for
+                // any pre-3.0 version
+                if (expression.kind() != Kind.CUSTOM && expression.kind() != Kind.USER)
                     size += ByteBufferUtil.serializedSizeWithShortLength(expression.column().name.bytes)
                             + expression.operator.serializedSize();
 
@@ -584,8 +617,11 @@ public abstract class RowFilter implements Iterable<RowFilter.Expression>
                     case CUSTOM:
                         if (version >= MessagingService.VERSION_30)
                             size += IndexMetadata.serializer.serializedSize(((CustomExpression)expression).targetIndex,
version)
-                                  + ByteBufferUtil.serializedSizeWithShortLength(expression.value);
+                                   + ByteBufferUtil.serializedSizeWithShortLength(expression.value);
                         break;
+                    case USER:
+                        if (version >= MessagingService.VERSION_30)
+                            size += UserExpression.serializedSize((UserExpression)expression,
version);
                 }
                 return size;
             }
@@ -920,6 +956,100 @@ public abstract class RowFilter implements Iterable<RowFilter.Expression>
         }
     }
 
+    /**
+     * A user defined filtering expression. These may be added to RowFilter programmatically
by a
+     * QueryHandler implementation. No concrete implementations are provided and adding custom
impls
+     * to the classpath is a task for operators (needless to say, this is something of a
power
+     * user feature). Care must also be taken to register implementations, via the static
register
+     * method during system startup. An implementation and its corresponding Deserializer
must be
+     * registered before sending or receiving any messages containing expressions of that
type.
+     * Use of custom filtering expressions in a mixed version cluster should be handled with
caution
+     * as the order in which types are registered is significant: if continuity of use during
upgrades
+     * is important, new types should be registered last & obsoleted types should still
be registered (
+     * or dummy implementations registered in their place) to preserve consistent identifiers
across
+     * the cluster.
+     *
+     * During serialization, the identifier for the Deserializer implementation is prepended
to the
+     * implementation specific payload. To deserialize, the identifier is read first to obtain
the
+     * Deserializer, which then provides the concrete expression instance.
+     */
+    public static abstract class UserExpression extends Expression
+    {
+        private static final DeserializerRegistry deserializers = new DeserializerRegistry();
+        private static final class DeserializerRegistry
+        {
+            private final AtomicInteger counter = new AtomicInteger(0);
+            private final ConcurrentMap<Integer, Deserializer> deserializers = new
ConcurrentHashMap<>();
+            private final ConcurrentMap<Class<? extends UserExpression>, Integer>
registeredClasses = new ConcurrentHashMap<>();
+
+            public void registerUserExpressionClass(Class<? extends UserExpression>
expressionClass,
+                                                    UserExpression.Deserializer deserializer)
+            {
+                int id = registeredClasses.computeIfAbsent(expressionClass, (cls) -> counter.getAndIncrement());
+                deserializers.put(id, deserializer);
+
+                logger.debug("Registered user defined expression type {} and serializer {}
with identifier {}",
+                             expressionClass.getName(), deserializer.getClass().getName(),
id);
+            }
+
+            public Integer getId(UserExpression expression)
+            {
+                return registeredClasses.get(expression.getClass());
+            }
+
+            public Deserializer getDeserializer(int id)
+            {
+                return deserializers.get(id);
+            }
+        }
+
+        protected static abstract class Deserializer
+        {
+            protected abstract UserExpression deserialize(DataInputPlus in,
+                                                          int version,
+                                                          CFMetaData metadata) throws IOException;
+        }
+
+        public static void register(Class<? extends UserExpression> expressionClass,
Deserializer deserializer)
+        {
+            deserializers.registerUserExpressionClass(expressionClass, deserializer);
+        }
+
+        private static UserExpression deserialize(DataInputPlus in, int version, CFMetaData
metadata) throws IOException
+        {
+            int id = in.readInt();
+            Deserializer deserializer = deserializers.getDeserializer(id);
+            assert deserializer != null : "No user defined expression type registered with
id " + id;
+            return deserializer.deserialize(in, version, metadata);
+        }
+
+        private static void serialize(UserExpression expression, DataOutputPlus out, int
version) throws IOException
+        {
+            Integer id = deserializers.getId(expression);
+            assert id != null : "User defined expression type " + expression.getClass().getName()
+ " is not registered";
+            out.writeInt(id);
+            expression.serialize(out, version);
+        }
+
+        private static long serializedSize(UserExpression expression, int version)
+        {   // 4 bytes for the expression type id
+            return 4 + expression.serializedSize(version);
+        }
+
+        protected UserExpression(ColumnDefinition column, Operator operator, ByteBuffer value)
+        {
+            super(column, operator, value);
+        }
+
+        protected Kind kind()
+        {
+            return Kind.USER;
+        }
+
+        protected abstract void serialize(DataOutputPlus out, int version) throws IOException;
+        protected abstract long serializedSize(int version);
+    }
+
     public static class Serializer
     {
         public void serialize(RowFilter filter, DataOutputPlus out, int version) throws IOException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1a73af76/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
index 09eabc7..cdb478c 100644
--- a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
+++ b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
@@ -632,7 +632,7 @@ public class SecondaryIndexManager implements IndexRegistry
                 Tracing.trace("Command contains a custom index expression, using target index
{}", customExpression.getTargetIndex().name);
                 return indexes.get(customExpression.getTargetIndex().name);
             }
-            else
+            else if (!expression.isUserDefined())
             {
                 indexes.values().stream()
                        .filter(index -> index.supportsExpression(expression.column(),
expression.operator()))


Mime
View raw message