Return-Path: X-Original-To: apmail-cassandra-commits-archive@www.apache.org Delivered-To: apmail-cassandra-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id B2EAE19B4C for ; Wed, 6 Apr 2016 15:35:42 +0000 (UTC) Received: (qmail 21033 invoked by uid 500); 6 Apr 2016 15:35:42 -0000 Delivered-To: apmail-cassandra-commits-archive@cassandra.apache.org Received: (qmail 21000 invoked by uid 500); 6 Apr 2016 15:35:42 -0000 Mailing-List: contact commits-help@cassandra.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cassandra.apache.org Delivered-To: mailing list commits@cassandra.apache.org Received: (qmail 20989 invoked by uid 99); 6 Apr 2016 15:35:42 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 06 Apr 2016 15:35:42 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 66069DFF81; Wed, 6 Apr 2016 15:35:42 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: samt@apache.org To: commits@cassandra.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: cassandra git commit: Make custom filter expressions more extensible Date: Wed, 6 Apr 2016 15:35:42 +0000 (UTC) Repository: cassandra Updated Branches: refs/heads/trunk b680ddd61 -> 1a73af768 Make custom filter expressions more extensible Patch by Sam Tunnicliffe; reviewed by Sylvain Lebresne for CASSANDRA-11295 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1a73af76 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1a73af76 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1a73af76 Branch: refs/heads/trunk Commit: 1a73af7686e41cf8c2c0a031c9ef466b13a4b794 Parents: b680ddd 
Author: Sam Tunnicliffe Authored: Thu Feb 25 15:32:34 2016 +0000 Committer: Sam Tunnicliffe Committed: Wed Apr 6 16:29:01 2016 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + NEWS.txt | 2 + .../cql3/restrictions/Restrictions.java | 2 +- .../restrictions/StatementRestrictions.java | 6 + .../cql3/statements/ModificationStatement.java | 8 ++ .../apache/cassandra/db/filter/RowFilter.java | 144 ++++++++++++++++++- .../cassandra/index/SecondaryIndexManager.java | 2 +- 7 files changed, 156 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/1a73af76/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 95ff24c..e522035 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.6 + * Make custom filtering more extensible with UserExpression (CASSANDRA-11295) * Improve field-checking and error reporting in cassandra.yaml (CASSANDRA-10649) * Print CAS stats in nodetool proxyhistograms (CASSANDRA-11507) * More user friendly error when providing an invalid token to nodetool (CASSANDRA-9348) http://git-wip-us.apache.org/repos/asf/cassandra/blob/1a73af76/NEWS.txt ---------------------------------------------------------------------- diff --git a/NEWS.txt b/NEWS.txt index cc3e9c2..e073592 100644 --- a/NEWS.txt +++ b/NEWS.txt @@ -23,6 +23,8 @@ New features - Startup is now aborted if corrupted transaction log files are found. The details of the affected log files are now logged, allowing the operator to decide how to resolve the situation. + - Filtering expressions are made more pluggable and can be added programmatically via + a QueryHandler implementation. See CASSANDRA-11295 for more details. 
3.4 ===== http://git-wip-us.apache.org/repos/asf/cassandra/blob/1a73af76/src/java/org/apache/cassandra/cql3/restrictions/Restrictions.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/restrictions/Restrictions.java b/src/java/org/apache/cassandra/cql3/restrictions/Restrictions.java index 705d66d..f46f176 100644 --- a/src/java/org/apache/cassandra/cql3/restrictions/Restrictions.java +++ b/src/java/org/apache/cassandra/cql3/restrictions/Restrictions.java @@ -29,7 +29,7 @@ import org.apache.cassandra.index.SecondaryIndexManager; /** * Sets of restrictions */ -interface Restrictions +public interface Restrictions { /** * Returns the column definitions in position order. http://git-wip-us.apache.org/repos/asf/cassandra/blob/1a73af76/src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java b/src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java index a35d86b..b00214c 100644 --- a/src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java +++ b/src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java @@ -270,6 +270,12 @@ public final class StatementRestrictions nonPrimaryKeyRestrictions.getFunctions()); } + // may be used by QueryHandler implementations + public IndexRestrictions getIndexRestrictions() + { + return indexRestrictions; + } + private void addSingleColumnRestriction(SingleColumnRestriction restriction) { ColumnDefinition def = restriction.columnDef; http://git-wip-us.apache.org/repos/asf/cassandra/blob/1a73af76/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java index 614e47a..06ff5d4 100644 --- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java @@ -147,6 +147,14 @@ public abstract class ModificationStatement implements CQLStatement conditions.getFunctions()); } + /* + * May be used by QueryHandler implementations + */ + public StatementRestrictions getRestrictions() + { + return restrictions; + } + public abstract void addUpdateForKey(PartitionUpdate update, Clustering clustering, UpdateParameters params); public abstract void addUpdateForKey(PartitionUpdate update, Slice slice, UpdateParameters params); http://git-wip-us.apache.org/repos/asf/cassandra/blob/1a73af76/src/java/org/apache/cassandra/db/filter/RowFilter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/filter/RowFilter.java b/src/java/org/apache/cassandra/db/filter/RowFilter.java index fea8ea8..0ef29c2 100644 --- a/src/java/org/apache/cassandra/db/filter/RowFilter.java +++ b/src/java/org/apache/cassandra/db/filter/RowFilter.java @@ -20,15 +20,21 @@ package org.apache.cassandra.db.filter; import java.io.IOException; import java.nio.ByteBuffer; import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicInteger; import com.google.common.base.Objects; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.ColumnDefinition; import org.apache.cassandra.cql3.Operator; import org.apache.cassandra.db.*; import org.apache.cassandra.db.marshal.*; -import org.apache.cassandra.db.partitions.*; +import org.apache.cassandra.db.partitions.ImmutableBTreePartition; +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; import 
org.apache.cassandra.db.rows.*; import org.apache.cassandra.db.transform.Transformation; import org.apache.cassandra.exceptions.InvalidRequestException; @@ -53,6 +59,8 @@ import static org.apache.cassandra.cql3.statements.RequestValidations.checkNotNu */ public abstract class RowFilter implements Iterable { + private static final Logger logger = LoggerFactory.getLogger(RowFilter.class); + public static final Serializer serializer = new Serializer(); public static final RowFilter NONE = new CQLFilter(Collections.emptyList()); @@ -107,6 +115,11 @@ public abstract class RowFilter implements Iterable expressions.add(expression); } + public void addUserExpression(UserExpression e) + { + expressions.add(e); + } + public List getExpressions() { return expressions; @@ -245,13 +258,13 @@ public abstract class RowFilter implements Iterable DecoratedKey pk; public UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition) { + pk = partition.partitionKey(); + // The filter might be on static columns, so need to check static row first. if (filterStaticColumns && applyToRow(partition.staticRow()) == null) return null; - pk = partition.partitionKey(); UnfilteredRowIterator iterator = Transformation.apply(partition, this); - return (filterNonStaticColumns && !iterator.hasNext()) ? 
null : iterator; } @@ -327,7 +340,7 @@ public abstract class RowFilter implements Iterable private static final Serializer serializer = new Serializer(); // Note: the order of this enum matter, it's used for serialization - protected enum Kind { SIMPLE, MAP_EQUALITY, THRIFT_DYN_EXPR, CUSTOM } + protected enum Kind { SIMPLE, MAP_EQUALITY, THRIFT_DYN_EXPR, CUSTOM, USER } protected abstract Kind kind(); protected final ColumnDefinition column; @@ -346,6 +359,11 @@ public abstract class RowFilter implements Iterable return kind() == Kind.CUSTOM; } + public boolean isUserDefined() + { + return kind() == Kind.USER; + } + public ColumnDefinition column() { return column; @@ -468,6 +486,13 @@ public abstract class RowFilter implements Iterable return; } + if (expression.kind() == Kind.USER) + { + assert version >= MessagingService.VERSION_30; + UserExpression.serialize((UserExpression)expression, out, version); + return; + } + ByteBufferUtil.writeWithShortLength(expression.column.name.bytes, out); expression.operator.writeTo(out); @@ -511,6 +536,11 @@ public abstract class RowFilter implements Iterable IndexMetadata.serializer.deserialize(in, version, metadata), ByteBufferUtil.readWithShortLength(in)); } + + if (kind == Kind.USER) + { + return UserExpression.deserialize(in, version, metadata); + } } name = ByteBufferUtil.readWithShortLength(in); @@ -560,8 +590,11 @@ public abstract class RowFilter implements Iterable // version 3.0+ includes a byte for Kind long size = version >= MessagingService.VERSION_30 ? 1 : 0; - // custom expressions don't include a column or operator, all other expressions do - if (expression.kind() != Kind.CUSTOM) + // Custom expressions include neither a column or operator, but all + // other expressions do. 
Also, custom expressions are 3.0+ only, so + // the column & operator will always be the first things written for + // any pre-3.0 version + if (expression.kind() != Kind.CUSTOM && expression.kind() != Kind.USER) size += ByteBufferUtil.serializedSizeWithShortLength(expression.column().name.bytes) + expression.operator.serializedSize(); @@ -584,8 +617,11 @@ public abstract class RowFilter implements Iterable case CUSTOM: if (version >= MessagingService.VERSION_30) size += IndexMetadata.serializer.serializedSize(((CustomExpression)expression).targetIndex, version) - + ByteBufferUtil.serializedSizeWithShortLength(expression.value); + + ByteBufferUtil.serializedSizeWithShortLength(expression.value); break; + case USER: + if (version >= MessagingService.VERSION_30) + size += UserExpression.serializedSize((UserExpression)expression, version); } return size; } @@ -920,6 +956,100 @@ public abstract class RowFilter implements Iterable } } + /** + * A user defined filtering expression. These may be added to RowFilter programmatically by a + * QueryHandler implementation. No concrete implementations are provided and adding custom impls + * to the classpath is a task for operators (needless to say, this is something of a power + * user feature). Care must also be taken to register implementations, via the static register + * method during system startup. An implementation and its corresponding Deserializer must be + * registered before sending or receiving any messages containing expressions of that type. + * Use of custom filtering expressions in a mixed version cluster should be handled with caution + * as the order in which types are registered is significant: if continuity of use during upgrades + * is important, new types should be registered last & obsoleted types should still be registered ( + * or dummy implementations registered in their place) to preserve consistent identifiers across + * the cluster. 
+ * + * During serialization, the identifier for the Deserializer implementation is prepended to the + * implementation specific payload. To deserialize, the identifier is read first to obtain the + * Deserializer, which then provides the concrete expression instance. + */ + public static abstract class UserExpression extends Expression + { + private static final DeserializerRegistry deserializers = new DeserializerRegistry(); + private static final class DeserializerRegistry + { + private final AtomicInteger counter = new AtomicInteger(0); + private final ConcurrentMap deserializers = new ConcurrentHashMap<>(); + private final ConcurrentMap, Integer> registeredClasses = new ConcurrentHashMap<>(); + + public void registerUserExpressionClass(Class expressionClass, + UserExpression.Deserializer deserializer) + { + int id = registeredClasses.computeIfAbsent(expressionClass, (cls) -> counter.getAndIncrement()); + deserializers.put(id, deserializer); + + logger.debug("Registered user defined expression type {} and serializer {} with identifier {}", + expressionClass.getName(), deserializer.getClass().getName(), id); + } + + public Integer getId(UserExpression expression) + { + return registeredClasses.get(expression.getClass()); + } + + public Deserializer getDeserializer(int id) + { + return deserializers.get(id); + } + } + + protected static abstract class Deserializer + { + protected abstract UserExpression deserialize(DataInputPlus in, + int version, + CFMetaData metadata) throws IOException; + } + + public static void register(Class expressionClass, Deserializer deserializer) + { + deserializers.registerUserExpressionClass(expressionClass, deserializer); + } + + private static UserExpression deserialize(DataInputPlus in, int version, CFMetaData metadata) throws IOException + { + int id = in.readInt(); + Deserializer deserializer = deserializers.getDeserializer(id); + assert deserializer != null : "No user defined expression type registered with id " + id; + return 
deserializer.deserialize(in, version, metadata); + } + + private static void serialize(UserExpression expression, DataOutputPlus out, int version) throws IOException + { + Integer id = deserializers.getId(expression); + assert id != null : "User defined expression type " + expression.getClass().getName() + " is not registered"; + out.writeInt(id); + expression.serialize(out, version); + } + + private static long serializedSize(UserExpression expression, int version) + { // 4 bytes for the expression type id + return 4 + expression.serializedSize(version); + } + + protected UserExpression(ColumnDefinition column, Operator operator, ByteBuffer value) + { + super(column, operator, value); + } + + protected Kind kind() + { + return Kind.USER; + } + + protected abstract void serialize(DataOutputPlus out, int version) throws IOException; + protected abstract long serializedSize(int version); + } + public static class Serializer { public void serialize(RowFilter filter, DataOutputPlus out, int version) throws IOException http://git-wip-us.apache.org/repos/asf/cassandra/blob/1a73af76/src/java/org/apache/cassandra/index/SecondaryIndexManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java index 09eabc7..cdb478c 100644 --- a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java +++ b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java @@ -632,7 +632,7 @@ public class SecondaryIndexManager implements IndexRegistry Tracing.trace("Command contains a custom index expression, using target index {}", customExpression.getTargetIndex().name); return indexes.get(customExpression.getTargetIndex().name); } - else + else if (!expression.isUserDefined()) { indexes.values().stream() .filter(index -> index.supportsExpression(expression.column(), expression.operator()))