Return-Path: X-Original-To: apmail-cassandra-commits-archive@www.apache.org Delivered-To: apmail-cassandra-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 2B4C8D42E for ; Fri, 27 Jul 2012 15:19:51 +0000 (UTC) Received: (qmail 90920 invoked by uid 500); 27 Jul 2012 15:19:41 -0000 Delivered-To: apmail-cassandra-commits-archive@cassandra.apache.org Received: (qmail 90796 invoked by uid 500); 27 Jul 2012 15:19:40 -0000 Mailing-List: contact commits-help@cassandra.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cassandra.apache.org Delivered-To: mailing list commits@cassandra.apache.org Received: (qmail 89479 invoked by uid 99); 27 Jul 2012 15:19:39 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 27 Jul 2012 15:19:39 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 78BAA1964C; Fri, 27 Jul 2012 15:19:39 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: yukim@apache.org To: commits@cassandra.apache.org X-Mailer: ASF-Git Admin Mailer Subject: [44/50] [abbrv] Add lists, sets and maps support Message-Id: <20120727151939.78BAA1964C@tyr.zones.apache.org> Date: Fri, 27 Jul 2012 15:19:39 +0000 (UTC) http://git-wip-us.apache.org/repos/asf/cassandra/blob/91bdf7fb/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java index c74b08f..dd729fb 100644 --- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java @@ -48,8 +48,8 @@ import org.apache.cassandra.thrift.IndexExpression; import org.apache.cassandra.thrift.IndexOperator; import org.apache.cassandra.thrift.InvalidRequestException; import org.apache.cassandra.thrift.RequestType; -import org.apache.cassandra.thrift.ThriftValidation; import org.apache.cassandra.thrift.TimedOutException; +import org.apache.cassandra.thrift.ThriftValidation; import org.apache.cassandra.thrift.UnavailableException; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; @@ -117,20 +117,27 @@ public class SelectStatement implements CQLStatement public ResultSet executeInternal(ClientState state, List variables) throws InvalidRequestException, UnavailableException, TimedOutException { - List rows; - if (isKeyRange()) + try { - rows = multiRangeSlice(variables); + List rows; + if (isKeyRange()) + { + rows = multiRangeSlice(variables); + } + else + { + rows = getSlice(variables); + } + + // Even for count, we need to process the result as it'll group some column together in sparse column families + ResultSet rset = process(rows, variables); + rset = parameters.isCount ? rset.makeCountResult() : rset; + return rset; } - else + catch (TimeoutException e) { - rows = getSlice(variables); + throw new TimedOutException(); } - - // Even for count, we need to process the result as it'll group some column together in sparse column families - ResultSet rset = process(rows, variables); - rset = parameters.isCount ? rset.makeCountResult() : rset; - return rset; } public ResultSet process(List rows) throws InvalidRequestException @@ -149,40 +156,32 @@ public class SelectStatement implements CQLStatement return cfDef.cfm.cfName; } - private List getSlice(List variables) throws InvalidRequestException, TimedOutException, UnavailableException + private List getSlice(List variables) throws InvalidRequestException, TimeoutException, UnavailableException { QueryPath queryPath = new QueryPath(columnFamily()); Collection keys = getKeys(variables); List commands = new ArrayList(keys.size()); + IFilter filter = makeFilter(variables); // ...a range (slice) of column names if (isColumnRange()) { - ByteBuffer start = getColumnStart(variables); - ByteBuffer finish = getColumnEnd(variables); - - SliceQueryFilter filter = new SliceQueryFilter(start, finish, isReversed, getLimit()); - QueryProcessor.validateSliceFilter(cfDef.cfm, filter); - // Note that we use the total limit for every key. This is // potentially inefficient, but then again, IN + LIMIT is not a // very sensible choice for (ByteBuffer key : keys) { QueryProcessor.validateKey(key); - commands.add(new SliceFromReadCommand(keyspace(), key, queryPath, filter)); + commands.add(new SliceFromReadCommand(keyspace(), key, queryPath, (SliceQueryFilter)filter)); } } // ...of a list of column names else { - Collection columnNames = getRequestedColumns(variables); - QueryProcessor.validateColumnNames(columnNames); - for (ByteBuffer key: keys) { QueryProcessor.validateKey(key); - commands.add(new SliceByNamesReadCommand(keyspace(), key, queryPath, columnNames)); + commands.add(new SliceByNamesReadCommand(keyspace(), key, queryPath, (NamesQueryFilter)filter)); } } @@ -190,23 +189,16 @@ public class SelectStatement implements CQLStatement { return StorageProxy.read(commands, parameters.consistencyLevel); } - catch (TimeoutException e) - { - throw new TimedOutException(); - } catch (IOException e) { throw new RuntimeException(e); } } - private List multiRangeSlice(List variables) throws InvalidRequestException, TimedOutException, UnavailableException + private List multiRangeSlice(List variables) throws InvalidRequestException, TimeoutException, UnavailableException { List rows; - - IFilter filter = makeFilter(variables); - QueryProcessor.validateFilter(cfDef.cfm, filter); - + IFilter filter = makeFilter(variables); List expressions = getIndexExpressions(variables); try @@ -226,10 +218,6 @@ public class SelectStatement implements CQLStatement { throw new RuntimeException(e); } - catch (TimeoutException e) - { - throw new TimedOutException(); - } return rows; } @@ -282,14 +270,26 @@ public class SelectStatement implements CQLStatement { if (isColumnRange()) { - return new SliceQueryFilter(getRequestedBound(isReversed ? Bound.END : Bound.START, variables), - getRequestedBound(isReversed ? Bound.START : Bound.END, variables), - isReversed, - -1); // We use this for range slices, where the count is ignored in favor of the global column count + // For sparse, we used to ask for 'defined columns' * 'asked limit' to account for the grouping of columns. + // Since that doesn't work for maps/sets/lists, we use the compositesToGroup option of SliceQueryFilter. + // But we must preserver backward compatibility too. + int multiplier = cfDef.isCompact ? 1 : cfDef.metadata.size(); + int toGroup = cfDef.isCompact ? -1 : cfDef.columns.size(); + ColumnSlice slice = new ColumnSlice(getRequestedBound(isReversed ? Bound.END : Bound.START, variables), + getRequestedBound(isReversed ? Bound.START : Bound.END, variables)); + SliceQueryFilter filter = new SliceQueryFilter(new ColumnSlice[]{slice}, + isReversed, + getLimit(), + toGroup, + multiplier); + QueryProcessor.validateSliceFilter(cfDef.cfm, filter); + return filter; } else { - return new NamesQueryFilter(getRequestedColumns(variables)); + SortedSet columnNames = getRequestedColumns(variables); + QueryProcessor.validateColumnNames(columnNames); + return new NamesQueryFilter(columnNames); } } @@ -297,11 +297,7 @@ public class SelectStatement implements CQLStatement { // Internally, we don't support exclusive bounds for slices. Instead, // we query one more element if necessary and exclude - int limit = sliceRestriction != null && !sliceRestriction.isInclusive(Bound.START) ? parameters.limit + 1 : parameters.limit; - // For sparse, we'll end up merging all defined colums into the same CqlRow. Thus we should query up - // to 'defined columns' * 'asked limit' to be sure to have enough columns. We'll trim after query if - // this end being too much. - return cfDef.isCompact ? limit : cfDef.metadata.size() * limit; + return sliceRestriction != null && !sliceRestriction.isInclusive(Bound.START) ? parameters.limit + 1 : parameters.limit; } private boolean isKeyRange() @@ -370,6 +366,10 @@ public class SelectStatement implements CQLStatement if (!cfDef.isCompact && !cfDef.isComposite) return false; + // However, collections always entails one + if (cfDef.hasCollections) + return true; + // Otherwise, it is a range query if it has at least one the column alias // for which no relation is defined or is not EQ. for (Restriction r : columnRestrictions) @@ -468,17 +468,7 @@ public class SelectStatement implements CQLStatement } } // Means no relation at all or everything was an equal - return builder.build(); - } - - public ByteBuffer getColumnStart(List variables) throws InvalidRequestException - { - return getRequestedBound(isReversed ? Bound.END : Bound.START, variables); - } - - public ByteBuffer getColumnEnd(List variables) throws InvalidRequestException - { - return getRequestedBound(isReversed ? Bound.START : Bound.END, variables); + return (b == Bound.END) ? builder.buildAsEndOfRange() : builder.build(); } private List getIndexExpressions(List variables) throws InvalidRequestException @@ -708,30 +698,14 @@ public class SelectStatement implements CQLStatement { // Sparse case: group column in cqlRow when composite prefix is equal CompositeType composite = (CompositeType)cfDef.cfm.comparator; - int last = composite.types.size() - 1; - ByteBuffer[] previous = null; - Map group = new HashMap(); - for (IColumn c : row.cf) - { - if (c.isMarkedForDelete()) - continue; + ColumnGroupMap.Builder builder = new ColumnGroupMap.Builder(composite, cfDef.hasCollections); - ByteBuffer[] current = composite.split(c.name()); - // If current differs from previous, we've just finished a group - if (previous != null && !isSameRow(previous, current)) - { - handleGroup(selection, row.key.key, previous, group, cqlRows); - group = new HashMap(); - } + for (IColumn c : row.cf) + builder.add(c); - // Accumulate the current column - group.put(current[last], c); - previous = current; - } - // Handle the last group - if (previous != null) - handleGroup(selection, row.key.key, previous, group, cqlRows); + for (ColumnGroupMap group : builder.groups()) + handleGroup(selection, row.key.key, group, cqlRows); } else { @@ -808,30 +782,7 @@ public class SelectStatement implements CQLStatement Collections.sort(cqlRows.rows, new CompositeComparator(startPosition, types)); } - - /** - * For sparse composite, returns wheter two columns belong to the same - * cqlRow base on the full list of component in the name. - * Two columns do belong together if they differ only by the last - * component. - */ - private static boolean isSameRow(ByteBuffer[] c1, ByteBuffer[] c2) - { - // Cql don't allow to insert columns who doesn't have all component of - // the composite set for sparse composite. Someone coming from thrift - // could hit that though. But since we have no way to handle this - // correctly, better fail here and tell whomever may hit that (if - // someone ever do) to change the definition to a dense composite - assert c1.length == c2.length : "Sparse composite should not have partial column names"; - for (int i = 0; i < c1.length - 1; i++) - { - if (!c1[i].equals(c2[i])) - return false; - } - return true; - } - - private void handleGroup(List> selection, ByteBuffer key, ByteBuffer[] components, Map columns, ResultSet cqlRows) + private void handleGroup(List> selection, ByteBuffer key, ColumnGroupMap columns, ResultSet cqlRows) { // Respect requested order for (Pair p : selection) @@ -844,13 +795,18 @@ public class SelectStatement implements CQLStatement cqlRows.addColumnValue(key); break; case COLUMN_ALIAS: - cqlRows.addColumnValue(components[name.position]); + cqlRows.addColumnValue(columns.getKeyComponent(name.position)); break; case VALUE_ALIAS: // This should not happen for SPARSE throw new AssertionError(); case COLUMN_METADATA: - IColumn c = columns.get(name.name.key); + if (name.type instanceof CollectionType) + { + cqlRows.addColumnValue(((CollectionType)name.type).serializeForThrift(columns.getCollection(name.name.key))); + break; + } + IColumn c = columns.getSimple(name.name.key); addReturnValue(cqlRows, selector, c); break; default: http://git-wip-us.apache.org/repos/asf/cassandra/blob/91bdf7fb/src/java/org/apache/cassandra/cql3/statements/Selector.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/Selector.java b/src/java/org/apache/cassandra/cql3/statements/Selector.java index 21105c0..5847b1c 100644 --- a/src/java/org/apache/cassandra/cql3/statements/Selector.java +++ b/src/java/org/apache/cassandra/cql3/statements/Selector.java @@ -21,8 +21,9 @@ package org.apache.cassandra.cql3.statements; import com.google.common.base.Objects; import org.apache.cassandra.cql3.ColumnIdentifier; +import org.apache.cassandra.cql3.Term; -public interface Selector +public abstract class Selector { public enum Function { @@ -42,11 +43,29 @@ public interface Selector } } - public ColumnIdentifier id(); - public boolean hasFunction(); - public Function function(); + public abstract ColumnIdentifier id(); - public static class WithFunction implements Selector + public boolean hasFunction() + { + return false; + } + + public Function function() + { + return null; + } + + public boolean hasKey() + { + return false; + } + + public Term key() + { + return null; + } + + public static class WithFunction extends Selector { private final Function function; private final ColumnIdentifier id; @@ -95,4 +114,53 @@ public interface Selector return function + "(" + id + ")"; } } + + public static class WithKey extends Selector + { + private final ColumnIdentifier id; + private final Term key; + + public WithKey(ColumnIdentifier id, Term key) + { + this.id = id; + this.key = key; + } + + public ColumnIdentifier id() + { + return id; + } + + @Override + public boolean hasKey() + { + return true; + } + + public Term key() + { + return key; + } + + @Override + public final int hashCode() + { + return Objects.hashCode(id, key); + } + + @Override + public final boolean equals(Object o) + { + if(!(o instanceof WithKey)) + return false; + WithKey that = (WithKey)o; + return id().equals(that.id()) && key.equals(that.key); + } + + @Override + public String toString() + { + return id + "[" + key + "]"; + } + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/91bdf7fb/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java index 3167b40..dc670dc 100644 --- a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java @@ -17,18 +17,25 @@ */ package org.apache.cassandra.cql3.statements; +import java.io.IOError; +import java.io.IOException; import java.nio.ByteBuffer; import java.util.*; +import java.util.concurrent.TimeoutException; + +import com.google.common.collect.ArrayListMultimap; import org.apache.cassandra.cql3.*; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.db.*; import org.apache.cassandra.db.filter.QueryPath; -import org.apache.cassandra.db.marshal.AbstractType; -import org.apache.cassandra.db.marshal.LongType; +import org.apache.cassandra.db.marshal.*; import org.apache.cassandra.service.ClientState; +import org.apache.cassandra.service.StorageProxy; import org.apache.cassandra.thrift.InvalidRequestException; +import org.apache.cassandra.thrift.UnavailableException; import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.Pair; import static org.apache.cassandra.cql.QueryProcessor.validateColumnName; import static org.apache.cassandra.cql.QueryProcessor.validateKey; @@ -43,12 +50,12 @@ import static org.apache.cassandra.thrift.ThriftValidation.validateCommutativeFo public class UpdateStatement extends ModificationStatement { private CFDefinition cfDef; - private final Map columns; + private final List> columns; private final List columnNames; - private final List columnValues; + private final List columnValues; private final List whereClause; - private final Map processedColumns = new HashMap(); + private final ArrayListMultimap processedColumns = ArrayListMultimap.create(); private final Map> processedKeys = new HashMap>(); /** @@ -61,7 +68,7 @@ public class UpdateStatement extends ModificationStatement * @param attrs additional attributes for statement (CL, timestamp, timeToLive) */ public UpdateStatement(CFName name, - Map columns, + List> columns, List whereClause, Attributes attrs) { @@ -84,9 +91,9 @@ public class UpdateStatement extends ModificationStatement * @param attrs additional attributes for statement (CL, timestamp, timeToLive) */ public UpdateStatement(CFName name, + Attributes attrs, List columnNames, - List columnValues, - Attributes attrs) + List columnValues) { super(name, attrs); @@ -96,8 +103,9 @@ public class UpdateStatement extends ModificationStatement this.columns = null; } + /** {@inheritDoc} */ - public List getMutations(ClientState clientState, List variables) throws InvalidRequestException + public List getMutations(ClientState clientState, List variables) throws UnavailableException, TimeoutException, InvalidRequestException { // Check key List keys = processedKeys.get(cfDef.key.name); @@ -127,14 +135,41 @@ public class UpdateStatement extends ModificationStatement } } - List rowMutations = new LinkedList(); - + List rawKeys = new ArrayList(keys.size()); for (Term key: keys) + rawKeys.add(key.getByteBuffer(cfDef.key.type, variables)); + + // Lists SET operation incurs a read. Do that now. Note that currently, + // if there is at least one list, we just read the whole "row" (in the CQL sense of + // row) to simplify. Once #3885 is in, we can improve. + boolean needsReading = false; + for (Map.Entry entry : processedColumns.entries()) { - ByteBuffer rawKey = key.getByteBuffer(cfDef.key.type, variables); - rowMutations.add(mutationForKey(cfDef, clientState, rawKey, builder, variables)); + CFDefinition.Name name = entry.getKey(); + Operation value = entry.getValue(); + + if (!(name.type instanceof ListType)) + continue; + + if (value == null || value.type != Operation.Type.FUNCTION) + continue; + + Operation.Function fOp = (Operation.Function)value; + if (fOp.fct.needsReading) + { + needsReading = true; + break; + } } + Map rows = needsReading ? readRows(rawKeys, builder, (CompositeType)cfDef.cfm.comparator) : null; + + List rowMutations = new LinkedList(); + UpdateParameters params = new UpdateParameters(variables, getTimestamp(clientState), timeToLive); + + for (ByteBuffer key: rawKeys) + rowMutations.add(mutationForKey(cfDef, key, builder, params, rows == null ? null : rows.get(key))); + return rowMutations; } @@ -151,7 +186,7 @@ public class UpdateStatement extends ModificationStatement * * @throws InvalidRequestException on the wrong request */ - private IMutation mutationForKey(CFDefinition cfDef, ClientState clientState, ByteBuffer key, ColumnNameBuilder builder, List variables) + private IMutation mutationForKey(CFDefinition cfDef, ByteBuffer key, ColumnNameBuilder builder, UpdateParameters params, ColumnGroupMap group) throws InvalidRequestException { validateKey(key); @@ -167,70 +202,92 @@ public class UpdateStatement extends ModificationStatement if (builder.componentCount() == 0) throw new InvalidRequestException(String.format("Missing PRIMARY KEY part %s", cfDef.columns.values().iterator().next())); - Operation value = processedColumns.get(cfDef.value.name); - if (value == null) + List value = processedColumns.get(cfDef.value); + if (value.isEmpty()) throw new InvalidRequestException(String.format("Missing mandatory column %s", cfDef.value)); - hasCounterColumn = addToMutation(clientState, cf, builder.build(), cfDef.value, value, variables); + assert value.size() == 1; + hasCounterColumn = addToMutation(cf, builder, cfDef.value, value.get(0), params, null); } else { - for (CFDefinition.Name name : cfDef.metadata.values()) + for (Map.Entry entry : processedColumns.entries()) { - Operation value = processedColumns.get(name.name); - if (value == null) - continue; + CFDefinition.Name name = entry.getKey(); + Operation value = entry.getValue(); - ByteBuffer colName = builder.copy().add(name.name.key).build(); - hasCounterColumn |= addToMutation(clientState, cf, colName, name, value, variables); + hasCounterColumn |= addToMutation(cf, builder.copy().add(name.name.key), name, value, params, group == null ? null : group.getCollection(name.name.key)); } } return (hasCounterColumn) ? new CounterMutation(rm, getConsistencyLevel()) : rm; } - private boolean addToMutation(ClientState clientState, - ColumnFamily cf, - ByteBuffer colName, + private boolean addToMutation(ColumnFamily cf, + ColumnNameBuilder builder, CFDefinition.Name valueDef, Operation value, - List variables) throws InvalidRequestException + UpdateParameters params, + List> list) throws InvalidRequestException { - if (value.isUnary()) + switch (value.type) { - validateColumnName(colName); - ByteBuffer valueBytes = value.value.getByteBuffer(valueDef.type, variables); - Column c = timeToLive > 0 - ? new ExpiringColumn(colName, valueBytes, getTimestamp(clientState), timeToLive) - : new Column(colName, valueBytes, getTimestamp(clientState)); - cf.addColumn(c); - return false; - } - else - { - if (!valueDef.name.equals(value.ident)) - throw new InvalidRequestException("Only expressions like X = X + are supported."); + case SET: + Value v = ((Operation.Set)value).value; + if (v instanceof Term) + { + ByteBuffer colName = builder.build(); + validateColumnName(colName); + ByteBuffer valueBytes = ((Term)v).getByteBuffer(valueDef.type, params.variables); + cf.addColumn(params.makeColumn(colName, valueBytes)); + } + else + { + assert v instanceof Value.CollectionLiteral; + Value.CollectionLiteral l = (Value.CollectionLiteral)v; + l.validateType(valueDef); - long val; - try - { - val = ByteBufferUtil.toLong(value.value.getByteBuffer(LongType.instance, variables)); - } - catch (NumberFormatException e) - { - throw new InvalidRequestException(String.format("'%s' is an invalid value, should be a long.", - value.value.getText())); - } + // Remove previous + cf.addAtom(params.makeTombstoneForOverwrite(builder.copy().build(), builder.copy().buildAsEndOfRange())); - if (value.type == Operation.Type.MINUS) - { - if (val == Long.MIN_VALUE) - throw new InvalidRequestException("The negation of " + val + " overflows supported integer precision (signed 8 bytes integer)"); + if (!l.isEmpty()) + addToMutation(cf, builder, valueDef, new Operation.Function(l.constructionFunction(), l.asList()), params, null); + } + return false; + case COUNTER: + Operation.Counter cOp = (Operation.Counter)value; + long val; + try + { + val = ByteBufferUtil.toLong(cOp.value.getByteBuffer(LongType.instance, params.variables)); + } + catch (NumberFormatException e) + { + throw new InvalidRequestException(String.format("'%s' is an invalid value, should be a long.", + cOp.value.getText())); + } + + if (cOp.isSubstraction) + { + if (val == Long.MIN_VALUE) + throw new InvalidRequestException("The negation of " + val + " overflows supported integer precision (signed 8 bytes integer)"); + else + val = -val; + } + cf.addCounter(new QueryPath(columnFamily(), null, builder.build()), val); + return true; + case FUNCTION: + Operation.Function fOp = (Operation.Function)value; + if (!(valueDef.type instanceof CollectionType)) + throw new InvalidRequestException(String.format("Invalid operation %s, %s is not a collection", fOp.fct, valueDef.name)); + + if ((valueDef.type instanceof ListType) && fOp.fct.needsReading) + ((ListType)valueDef.type).execute(cf, builder, fOp.fct, fOp.arguments, params, list); else - val = -val; - } - cf.addCounter(new QueryPath(columnFamily(), null, colName), val); - return true; + ((CollectionType)valueDef.type).execute(cf, builder, fOp.fct, fOp.arguments, params); + + return false; } + throw new AssertionError(); } public ParsedStatement.Prepared prepare(CFDefinition.Name[] boundNames) throws InvalidRequestException @@ -239,13 +296,13 @@ public class UpdateStatement extends ModificationStatement if (columns != null) { - for (Map.Entry column : columns.entrySet()) + for (Pair column : columns) { - if (!column.getValue().isUnary()) + if (column.right.type == Operation.Type.COUNTER) hasCommutativeOperation = true; - if (hasCommutativeOperation && column.getValue().isUnary()) - throw new InvalidRequestException("Mix of commutative and non-commutative operations is not allowed."); + if (hasCommutativeOperation && column.right.type != Operation.Type.COUNTER) + throw new InvalidRequestException("Mix of counter and non-counter operations is not allowed."); } } @@ -271,23 +328,26 @@ public class UpdateStatement extends ModificationStatement if (name == null) throw new InvalidRequestException(String.format("Unknown identifier %s", columnNames.get(i))); - Term value = columnValues.get(i); - if (value.isBindMarker()) - boundNames[value.bindIndex] = name; + Value value = columnValues.get(i); + for (Term t : value.asList()) + if (t.isBindMarker()) + boundNames[t.bindIndex] = name; switch (name.kind) { case KEY_ALIAS: case COLUMN_ALIAS: if (processedKeys.containsKey(name.name)) - throw new InvalidRequestException(String.format("Multiple definition found for PRIMARY KEY part %s", name)); - processedKeys.put(name.name, Collections.singletonList(value)); + throw new InvalidRequestException(String.format("Multiple definitions found for PRIMARY KEY part %s", name)); + if (!(value instanceof Term)) + throw new InvalidRequestException(String.format("Invalid definition for %s, not a collection type", name)); + processedKeys.put(name.name, Collections.singletonList((Term)value)); break; case VALUE_ALIAS: case COLUMN_METADATA: - if (processedColumns.containsKey(name.name)) - throw new InvalidRequestException(String.format("Multiple definition found for column %s", name)); - processedColumns.put(name.name, new Operation(value)); + if (processedColumns.containsKey(name)) + throw new InvalidRequestException(String.format("Multiple definitions found for column %s", name)); + processedColumns.put(name, new Operation.Set(value)); break; } } @@ -295,25 +355,28 @@ public class UpdateStatement extends ModificationStatement else { // Created from an UPDATE - for (Map.Entry entry : columns.entrySet()) + for (Pair entry : columns) { - CFDefinition.Name name = cfDef.get(entry.getKey()); + CFDefinition.Name name = cfDef.get(entry.left); if (name == null) - throw new InvalidRequestException(String.format("Unknown identifier %s", entry.getKey())); + throw new InvalidRequestException(String.format("Unknown identifier %s", entry.left)); switch (name.kind) { case KEY_ALIAS: case COLUMN_ALIAS: - throw new InvalidRequestException(String.format("PRIMARY KEY part %s found in SET part", entry.getKey())); + throw new InvalidRequestException(String.format("PRIMARY KEY part %s found in SET part", entry.left)); case VALUE_ALIAS: case COLUMN_METADATA: - if (processedColumns.containsKey(name.name)) - throw new InvalidRequestException(String.format("Multiple definition found for column %s", name)); - Operation op = entry.getValue(); - if (op.value.isBindMarker()) - boundNames[op.value.bindIndex] = name; - processedColumns.put(name.name, op); + for (Operation op : processedColumns.get(name)) + if (op.type != Operation.Type.FUNCTION) + throw new InvalidRequestException(String.format("Multiple definitions found for column %s", name)); + + Operation op = entry.right; + for (Term t : op.allTerms()) + if (t.isBindMarker()) + boundNames[t.bindIndex] = name; + processedColumns.put(name, op); break; } } @@ -351,7 +414,7 @@ public class UpdateStatement extends ModificationStatement throw new InvalidRequestException(String.format("Invalid operator %s for key %s", rel.operator(), rel.getEntity())); if (processed.containsKey(name.name)) - throw new InvalidRequestException(String.format("Multiple definition found for PRIMARY KEY part %s", name)); + throw new InvalidRequestException(String.format("Multiple definitions found for PRIMARY KEY part %s", name)); for (Term value : values) if (value.isBindMarker()) names[value.bindIndex] = name; http://git-wip-us.apache.org/repos/asf/cassandra/blob/91bdf7fb/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index 3ab570a..f335386 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -1448,7 +1448,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean data = filter.prune(data); rows.add(new Row(rawRow.key, data)); if (data != null) - columnsCount += data.getLiveColumnCount(); + columnsCount += filter.lastCounted(data); // Update the underlying filter to avoid querying more columns per slice than necessary and to handle paging filter.updateFilter(columnsCount); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/91bdf7fb/src/java/org/apache/cassandra/db/SliceFromReadCommand.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SliceFromReadCommand.java b/src/java/org/apache/cassandra/db/SliceFromReadCommand.java index b9d8a17..2f90106 100644 --- a/src/java/org/apache/cassandra/db/SliceFromReadCommand.java +++ b/src/java/org/apache/cassandra/db/SliceFromReadCommand.java @@ -55,7 +55,6 @@ public class SliceFromReadCommand extends ReadCommand this.filter = filter; } - public ReadCommand copy() { ReadCommand readCommand = new SliceFromReadCommand(table, key, queryPath, filter); @@ -86,7 +85,7 @@ public class SliceFromReadCommand extends ReadCommand // columns, only l/t end up live after reconciliation. So for next // round we want to ask x column so that x * (l/t) == t, i.e. x = t^2/l. int retryCount = liveColumnsInRow == 0 ? count + 1 : ((count * count) / liveColumnsInRow) + 1; - SliceQueryFilter newFilter = new SliceQueryFilter(filter.slices, filter.reversed, retryCount); + SliceQueryFilter newFilter = filter.withUpdatedCount(retryCount); return new RetriedSliceFromReadCommand(table, key, queryPath, newFilter, getOriginalRequestedCount()); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/91bdf7fb/src/java/org/apache/cassandra/db/filter/ColumnCounter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/filter/ColumnCounter.java b/src/java/org/apache/cassandra/db/filter/ColumnCounter.java new file mode 100644 index 0000000..c642b82 --- /dev/null +++ b/src/java/org/apache/cassandra/db/filter/ColumnCounter.java @@ -0,0 +1,108 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.cassandra.db.filter; + +import java.nio.ByteBuffer; + +import org.apache.cassandra.db.IColumn; +import org.apache.cassandra.db.IColumnContainer; +import org.apache.cassandra.db.marshal.CompositeType; +import org.apache.cassandra.utils.ByteBufferUtil; + +public class ColumnCounter +{ + protected int count; + + public void countColum(IColumn column, IColumnContainer container) + { + if (isLive(column, container)) + count++; + } + + protected static boolean isLive(IColumn column, IColumnContainer container) + { + return column.isLive() && (!container.deletionInfo().isDeleted(column)); + } + + public int count() + { + return count; + } + + public static class GroupByPrefix extends ColumnCounter + { + private final CompositeType type; + private final int toGroup; + private ByteBuffer[] last; + + /** + * A column counter that count only 1 for all the columns sharing a + * given prefix of the key. + * + * @param type the type of the column name. This can be null if {@code + * toGroup} is 0, otherwise it should be a composite. + * @param toGroup the number of composite components on which to group + * column. If 0, all columns are grouped, otherwise we group + * those for which the {@code toGroup} first component are equals. + */ + public GroupByPrefix(CompositeType type, int toGroup) + { + this.type = type; + this.toGroup = toGroup; + + assert toGroup == 0 || type != null; + } + + public void countColum(IColumn column, IColumnContainer container) + { + if (!isLive(column, container)) + return; + + if (toGroup == 0) + { + count = 1; + return; + } + + ByteBuffer[] current = type.split(column.name()); + assert current.length >= toGroup; + + if (last != null) + { + boolean isSameGroup = true; + for (int i = 0; i < toGroup; i++) + { + if (ByteBufferUtil.compareUnsigned(last[i], current[i]) != 0) + { + isSameGroup = false; + break; + } + } + + if (isSameGroup) + return; + } + + count++; + last = current; + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/91bdf7fb/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java b/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java index b3736e8..ec6f6ad 100644 --- a/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java +++ b/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java @@ -101,6 +101,14 @@ public abstract class ExtendedFilter initialFilter().updateColumnsLimit(remaining); } + public int lastCounted(ColumnFamily data) + { + if (initialFilter() instanceof SliceQueryFilter) + return ((SliceQueryFilter)initialFilter()).lastCounted(); + else + return data.getLiveColumnCount(); + } + /** The initial filter we'll do our first slice with (either the original or a superset of it) */ public abstract IFilter initialFilter(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/91bdf7fb/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java b/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java index adf4204..2037d66 100644 --- a/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java +++ b/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java @@ -34,6 +34,7 @@ import org.apache.cassandra.db.*; import org.apache.cassandra.db.columniterator.OnDiskAtomIterator; import org.apache.cassandra.db.columniterator.SSTableSliceIterator; import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.db.marshal.CompositeType; import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.sstable.SSTableReader; import org.apache.cassandra.io.util.FileDataInput; @@ -48,6 +49,12 @@ public class SliceQueryFilter implements IFilter public final ColumnSlice[] slices; public final boolean reversed; public volatile int count; + private final int compositesToGroup; + // This is a hack to allow rolling upgrade with pre-1.2 nodes + private final int countMutliplierForCompatibility; + + // Not serialized, just a ack for range slices to find the number of live column counted, even when we group + private ColumnCounter columnCounter; public SliceQueryFilter(ByteBuffer start, ByteBuffer finish, boolean reversed, int count) { @@ -60,9 +67,21 @@ public class SliceQueryFilter implements IFilter */ public SliceQueryFilter(ColumnSlice[] slices, boolean reversed, int count) { + this(slices, reversed, count, -1, 1); + } + + public SliceQueryFilter(ColumnSlice[] slices, boolean reversed, int count, int compositesToGroup, int countMutliplierForCompatibility) + { this.slices = slices; this.reversed = reversed; this.count = count; + this.compositesToGroup = compositesToGroup; + this.countMutliplierForCompatibility = countMutliplierForCompatibility; + } + + public SliceQueryFilter withUpdatedCount(int newCount) + { + return new SliceQueryFilter(slices, reversed, newCount, compositesToGroup, countMutliplierForCompatibility); } public OnDiskAtomIterator getMemtableColumnIterator(ColumnFamily cf, DecoratedKey key) @@ -119,25 +138,26 @@ public class SliceQueryFilter implements IFilter public void collectReducedColumns(IColumnContainer container, Iterator reducedColumns, int gcBefore) { - int liveColumns = 0; AbstractType comparator = container.getComparator(); + if (compositesToGroup < 0) + columnCounter = new ColumnCounter(); + else if (compositesToGroup == 0) + columnCounter = new ColumnCounter.GroupByPrefix(null, 0); + else + columnCounter = new ColumnCounter.GroupByPrefix((CompositeType)comparator, compositesToGroup); + while (reducedColumns.hasNext()) { - if (liveColumns >= count) + if (columnCounter.count() >= count) break; IColumn column = reducedColumns.next(); if (logger.isDebugEnabled()) logger.debug(String.format("collecting %s of %s: %s", - liveColumns, count, column.getString(comparator))); + columnCounter.count(), count, column.getString(comparator))); - // only count live columns towards the `count` criteria - if (column.isLive() - && (!container.deletionInfo().isDeleted(column))) - { - liveColumns++; - } + columnCounter.countColum(column, container); // but we need to add all non-gc-able columns to the result for read repair: if (QueryFilter.isRelevant(column, container, gcBefore)) @@ -161,6 +181,11 @@ public class SliceQueryFilter implements IFilter this.slices[0] = new ColumnSlice(start, this.slices[0].finish); } + public int lastCounted() + { + return columnCounter == null ? 0 : columnCounter.count(); + } + @Override public String toString() { @@ -194,7 +219,15 @@ public class SliceQueryFilter implements IFilter ColumnSlice.serializer.serialize(slice, dos, version); } dos.writeBoolean(f.reversed); - dos.writeInt(f.count); + int count = f.count; + if (f.compositesToGroup > 0 && version < MessagingService.VERSION_12) + count *= f.countMutliplierForCompatibility; + dos.writeInt(count); + + if (version < MessagingService.VERSION_12) + return; + + dos.writeInt(f.compositesToGroup); } public SliceQueryFilter deserialize(DataInput dis, int version) throws IOException @@ -212,7 +245,11 @@ public class SliceQueryFilter implements IFilter } boolean reversed = dis.readBoolean(); int count = dis.readInt(); - return new SliceQueryFilter(slices, reversed, count); + int compositesToGroup = -1; + if (version >= MessagingService.VERSION_12) + compositesToGroup = dis.readInt(); + + return new SliceQueryFilter(slices, reversed, count, compositesToGroup, 1); } public long serializedSize(SliceQueryFilter f, int version) @@ -232,6 +269,9 @@ public class SliceQueryFilter implements IFilter } size += sizes.sizeof(f.reversed); size += sizes.sizeof(f.count); + + if (version >= MessagingService.VERSION_12) + size += sizes.sizeof(f.compositesToGroup); return size; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/91bdf7fb/src/java/org/apache/cassandra/db/marshal/AbstractCompositeType.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/marshal/AbstractCompositeType.java b/src/java/org/apache/cassandra/db/marshal/AbstractCompositeType.java index 6536318..e138b67 100644 --- a/src/java/org/apache/cassandra/db/marshal/AbstractCompositeType.java +++ b/src/java/org/apache/cassandra/db/marshal/AbstractCompositeType.java @@ -68,6 +68,8 @@ public abstract class AbstractCompositeType extends AbstractType ByteBuffer bb2 = o2.duplicate(); int i = 0; + ByteBuffer previous = null; + while (bb1.remaining() > 0 && bb2.remaining() > 0) { AbstractType comparator = getComparator(i, bb1, bb2); @@ -75,10 +77,12 @@ public abstract class AbstractCompositeType extends AbstractType ByteBuffer value1 = getWithShortLength(bb1); ByteBuffer value2 = getWithShortLength(bb2); - int cmp = comparator.compare(value1, value2); + int cmp = comparator.compareCollectionMembers(value1, value2, previous); if (cmp != 0) return cmp; + previous = value1; + byte b1 = bb1.get(); byte b2 = bb2.get(); if (b1 < 0) @@ -238,6 +242,7 @@ public abstract class AbstractCompositeType extends AbstractType ByteBuffer bb = bytes.duplicate(); int i = 0; + ByteBuffer previous = null; while (bb.remaining() > 0) { AbstractType comparator = validateComparator(i, bb); @@ -250,13 +255,15 @@ public abstract class AbstractCompositeType extends AbstractType throw new MarshalException("Not enough bytes to read value of component " + i); ByteBuffer value = getBytes(bb, length); - comparator.validate(value); + comparator.validateCollectionMember(value, previous); if (bb.remaining() == 0) throw new MarshalException("Not enough bytes to read the end-of-component byte of component" + i); byte b = bb.get(); if (b != 0 && bb.remaining() != 0) throw new MarshalException("Invalid bytes remaining after an end-of-component at component" + i); + + previous = value; ++i; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/91bdf7fb/src/java/org/apache/cassandra/db/marshal/AbstractType.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/marshal/AbstractType.java b/src/java/org/apache/cassandra/db/marshal/AbstractType.java index 7eab64c..148138a 100644 --- a/src/java/org/apache/cassandra/db/marshal/AbstractType.java +++ b/src/java/org/apache/cassandra/db/marshal/AbstractType.java @@ -204,6 +204,29 @@ public abstract class AbstractType implements Comparator } /** + * An alternative comparison function used by CollectionsType in conjunction with CompositeType. + * + * This comparator is only called to compare components of a CompositeType. It gets the value of the + * previous component as argument (or null if it's the first component of the composite). + * + * Unless you're doing something very similar to CollectionsType, you shouldn't override this. + */ + public int compareCollectionMembers(ByteBuffer v1, ByteBuffer v2, ByteBuffer collectionName) + { + return compare(v1, v2); + } + + /** + * An alternative validation function used by CollectionsType in conjunction with CompositeType. + * + * This is similar to the compare function above. + */ + public void validateCollectionMember(ByteBuffer bytes, ByteBuffer collectionName) throws MarshalException + { + validate(bytes); + } + + /** * This must be overriden by subclasses if necessary so that for any * AbstractType, this == TypeParser.parse(toString()). * http://git-wip-us.apache.org/repos/asf/cassandra/blob/91bdf7fb/src/java/org/apache/cassandra/db/marshal/CollectionType.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/marshal/CollectionType.java b/src/java/org/apache/cassandra/db/marshal/CollectionType.java new file mode 100644 index 0000000..ab8f15b --- /dev/null +++ b/src/java/org/apache/cassandra/db/marshal/CollectionType.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.marshal; + +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.EnumSet; +import java.util.List; + +import org.apache.cassandra.cql3.ColumnNameBuilder; +import org.apache.cassandra.cql3.Term; +import org.apache.cassandra.cql3.UpdateParameters; +import org.apache.cassandra.db.ColumnFamily; +import org.apache.cassandra.db.IColumn; +import org.apache.cassandra.config.ConfigurationException; +import org.apache.cassandra.thrift.InvalidRequestException; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.Pair; + +/** + * The abstract validator that is the base for maps, sets and lists. + * + * Please note that this comparator shouldn't be used "manually" (through thrift for instance). + * + */ +public abstract class CollectionType extends AbstractType +{ + public enum Kind + { + MAP, SET, LIST + } + + public enum Function + { + APPEND (false, Kind.LIST), + PREPEND (false, Kind.LIST), + SET ( true, Kind.LIST, Kind.MAP), + ADD (false, Kind.SET), + DISCARD_LIST ( true, Kind.LIST), + DISCARD_SET (false, Kind.SET), + DISCARD_KEY ( true, Kind.LIST, Kind.MAP); + + public final boolean needsReading; + public final EnumSet validReceivers; + + private Function(boolean needsReading, Kind ... validReceivers) + { + this.needsReading = needsReading; + this.validReceivers = EnumSet.copyOf(Arrays.asList(validReceivers)); + } + } + + public final Kind kind; + + protected CollectionType(Kind kind) + { + this.kind = kind; + } + + protected abstract AbstractType nameComparator(); + protected abstract AbstractType valueComparator(); + protected abstract void appendToStringBuilder(StringBuilder sb); + + public void execute(ColumnFamily cf, ColumnNameBuilder fullPath, Function fct, List args, UpdateParameters params) throws InvalidRequestException + { + if (!fct.validReceivers.contains(kind)) + throw new InvalidRequestException(String.format("Invalid operation %s for %s collection", fct, kind)); + + executeFunction(cf, fullPath, fct, args, params); + } + + public abstract void executeFunction(ColumnFamily cf, ColumnNameBuilder fullPath, Function fct, List args, UpdateParameters params) throws InvalidRequestException; + + public abstract ByteBuffer serializeForThrift(List> columns); + + @Override + public String toString() + { + StringBuilder sb = new StringBuilder(); + appendToStringBuilder(sb); + return sb.toString(); + } + + public int compare(ByteBuffer o1, ByteBuffer o2) + { + throw new UnsupportedOperationException("CollectionType should not be use directly as a comparator"); + } + + public ByteBuffer compose(ByteBuffer bytes) + { + return BytesType.instance.compose(bytes); + } + + public ByteBuffer decompose(ByteBuffer value) + { + return BytesType.instance.decompose(value); + } + + public String getString(ByteBuffer bytes) + { + return BytesType.instance.getString(bytes); + } + + public ByteBuffer fromString(String source) + { + try + { + return ByteBufferUtil.hexToBytes(source); + } + catch (NumberFormatException e) + { + throw new MarshalException(String.format("cannot parse '%s' as hex bytes", source), e); + } + } + + public void validate(ByteBuffer bytes) + { + valueComparator().validate(bytes); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/91bdf7fb/src/java/org/apache/cassandra/db/marshal/ColumnToCollectionType.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/marshal/ColumnToCollectionType.java b/src/java/org/apache/cassandra/db/marshal/ColumnToCollectionType.java new file mode 100644 index 0000000..4ba73aa --- /dev/null +++ b/src/java/org/apache/cassandra/db/marshal/ColumnToCollectionType.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.marshal; + +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; + +import com.google.common.collect.ImmutableMap; + +import org.apache.cassandra.config.ConfigurationException; +import org.apache.cassandra.utils.ByteBufferUtil; + +public class ColumnToCollectionType extends AbstractType +{ + // interning instances + private static final Map, ColumnToCollectionType> instances = new HashMap, ColumnToCollectionType>(); + + public final Map defined; + + public static ColumnToCollectionType getInstance(TypeParser parser) throws ConfigurationException + { + return getInstance(parser.getCollectionsParameters()); + } + + public static synchronized ColumnToCollectionType getInstance(Map defined) + { + assert defined != null; + + ColumnToCollectionType t = instances.get(defined); + if (t == null) + { + t = new ColumnToCollectionType(defined); + instances.put(defined, t); + } + return t; + } + + private ColumnToCollectionType(Map defined) + { + this.defined = ImmutableMap.copyOf(defined); + } + + public int compare(ByteBuffer o1, ByteBuffer o2) + { + throw new UnsupportedOperationException("ColumnToCollectionType should only be used in composite types, never alone"); + } + + public int compareCollectionMembers(ByteBuffer o1, ByteBuffer o2, ByteBuffer collectionName) + { + CollectionType t = defined.get(collectionName); + if (t == null) + throw new RuntimeException(ByteBufferUtil.bytesToHex(collectionName) + " is not defined as a collection"); + + return t.nameComparator().compare(o1, o2); + } + + public ByteBuffer compose(ByteBuffer bytes) + { + return BytesType.instance.compose(bytes); + } + + public ByteBuffer decompose(ByteBuffer value) + { + return BytesType.instance.decompose(value); + } + + public String getString(ByteBuffer bytes) + { + return BytesType.instance.getString(bytes); + } + + public ByteBuffer fromString(String source) + { + try + { + return ByteBufferUtil.hexToBytes(source); + } + catch (NumberFormatException e) + { + throw new MarshalException(String.format("cannot parse '%s' as hex bytes", source), e); + } + } + + public void validate(ByteBuffer bytes) + { + throw new UnsupportedOperationException("ColumnToCollectionType should only be used in composite types, never alone"); + } + + public void validateCollectionMember(ByteBuffer bytes, ByteBuffer collectionName) throws MarshalException + { + CollectionType t = defined.get(collectionName); + if (t == null) + throw new MarshalException(ByteBufferUtil.bytesToHex(collectionName) + " is not defined as a collection"); + + t.nameComparator().validate(bytes); + } + + @Override + public boolean isCompatibleWith(AbstractType previous) + { + if (!(previous instanceof ColumnToCollectionType)) + return false; + + ColumnToCollectionType prev = (ColumnToCollectionType)previous; + // We are compatible if we have all the definitions previous have (but we can have more). + for (Map.Entry entry : prev.defined.entrySet()) + { + if (!entry.getValue().isCompatibleWith(defined.get(entry.getKey()))) + return false; + } + return true; + } + + @Override + public String toString() + { + return getClass().getName() + TypeParser.stringifyCollectionsParameters(defined); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/91bdf7fb/src/java/org/apache/cassandra/db/marshal/CompositeType.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/marshal/CompositeType.java b/src/java/org/apache/cassandra/db/marshal/CompositeType.java index 4d26dac..516c44b 100644 --- a/src/java/org/apache/cassandra/db/marshal/CompositeType.java +++ b/src/java/org/apache/cassandra/db/marshal/CompositeType.java @@ -18,11 +18,14 @@ package org.apache.cassandra.db.marshal; import java.nio.ByteBuffer; +import java.util.Arrays; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import com.google.common.collect.ImmutableList; + import org.apache.cassandra.config.ConfigurationException; import org.apache.cassandra.cql3.ColumnNameBuilder; import org.apache.cassandra.cql3.Relation; @@ -78,7 +81,7 @@ public class CompositeType extends AbstractCompositeType private CompositeType(List> types) { - this.types = types; + this.types = ImmutableList.copyOf(types); } protected AbstractType getComparator(int i, ByteBuffer bb) @@ -195,18 +198,29 @@ public class CompositeType extends AbstractCompositeType private final CompositeType composite; private int current; - private final DataOutputBuffer out = new DataOutputBuffer(); + private final List components; + private final byte[] endOfComponents; + private int serializedSize; public Builder(CompositeType composite) { + this(composite, new ArrayList(composite.types.size()), new byte[composite.types.size()]); + } + + public Builder(CompositeType composite, List components, byte[] endOfComponents) + { + assert endOfComponents.length == composite.types.size(); + this.composite = composite; + this.components = components; + this.endOfComponents = endOfComponents; } private Builder(Builder b) { - this(b.composite); + this(b.composite, new ArrayList(b.components), Arrays.copyOf(b.endOfComponents, b.endOfComponents.length)); this.current = b.current; - out.write(b.out.getData(), 0, b.out.getLength()); + this.serializedSize = b.serializedSize; } public Builder add(Term t, Relation.Type op, List variables) throws InvalidRequestException @@ -214,9 +228,9 @@ public class CompositeType extends AbstractCompositeType if (current >= composite.types.size()) throw new IllegalStateException("Composite column is already fully constructed"); - AbstractType currentType = composite.types.get(current++); + AbstractType currentType = composite.types.get(current); ByteBuffer buffer = t.getByteBuffer(currentType, variables); - ByteBufferUtil.writeWithShortLength(buffer, out); + components.add(buffer); /* * Given the rules for eoc (end-of-component, see AbstractCompositeType.compare()), @@ -230,16 +244,17 @@ public class CompositeType extends AbstractCompositeType switch (op) { case LT: - out.write((byte) -1); + endOfComponents[current] = (byte) -1; break; case GT: case LTE: - out.write((byte) 1); + endOfComponents[current] = (byte) 1; break; default: - out.write((byte) 0); + endOfComponents[current] = (byte) 0; break; } + ++current; return this; } @@ -248,8 +263,8 @@ public class CompositeType extends AbstractCompositeType if (current >= composite.types.size()) throw new IllegalStateException("Composite column is already fully constructed"); - ByteBufferUtil.writeWithShortLength(bb, out); - out.write((byte) 0); + components.add(bb); + endOfComponents[current++] = (byte) 0; return this; } @@ -260,7 +275,12 @@ public class CompositeType extends AbstractCompositeType public ByteBuffer build() { - // potentially slightly space-wasteful in favor of avoiding a copy + DataOutputBuffer out = new DataOutputBuffer(serializedSize); + for (int i = 0; i < components.size(); i++) + { + ByteBufferUtil.writeWithShortLength(components.get(i), out); + out.write(endOfComponents[i]); + } return ByteBuffer.wrap(out.getData(), 0, out.getLength()); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/91bdf7fb/src/java/org/apache/cassandra/db/marshal/EmptyType.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/marshal/EmptyType.java b/src/java/org/apache/cassandra/db/marshal/EmptyType.java new file mode 100644 index 0000000..6bbd006 --- /dev/null +++ b/src/java/org/apache/cassandra/db/marshal/EmptyType.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.marshal; + +import java.nio.ByteBuffer; + +import org.apache.cassandra.utils.ByteBufferUtil; + +/** + * A type that only accept empty data. + * It is only useful as a value validation type, not as a comparator since column names can't be empty. + */ +public class EmptyType extends AbstractType +{ + public static final EmptyType instance = new EmptyType(); + + private EmptyType() {} // singleton + + public Void compose(ByteBuffer bytes) + { + return null; + } + + public ByteBuffer decompose(Void value) + { + return ByteBufferUtil.EMPTY_BYTE_BUFFER; + } + + public int compare(ByteBuffer o1, ByteBuffer o2) + { + return 0; + } + + public String getString(ByteBuffer bytes) + { + return ""; + } + + public ByteBuffer fromString(String source) throws MarshalException + { + if (!source.isEmpty()) + throw new MarshalException(String.format("'%s' is not empty", source)); + + return ByteBufferUtil.EMPTY_BYTE_BUFFER; + } + + public void validate(ByteBuffer bytes) throws MarshalException + { + if (bytes.remaining() > 0) + throw new MarshalException("EmptyType only accept empty values"); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/91bdf7fb/src/java/org/apache/cassandra/db/marshal/ListType.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/marshal/ListType.java b/src/java/org/apache/cassandra/db/marshal/ListType.java new file mode 100644 index 0000000..8ad4590 --- /dev/null +++ b/src/java/org/apache/cassandra/db/marshal/ListType.java @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.marshal; + +import java.nio.ByteBuffer; +import java.util.*; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.cassandra.cql3.ColumnNameBuilder; +import org.apache.cassandra.cql3.Term; +import org.apache.cassandra.cql3.UpdateParameters; +import org.apache.cassandra.db.ColumnFamily; +import org.apache.cassandra.db.IColumn; +import org.apache.cassandra.config.ConfigurationException; +import org.apache.cassandra.thrift.InvalidRequestException; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.Pair; +import org.apache.cassandra.utils.UUIDGen; + +public class ListType extends CollectionType +{ + // interning instances + private static final Map, ListType> instances = new HashMap, ListType>(); + + // Our reference time (1 jan 2010, 00:00:00) in milliseconds. + private static final long REFERENCE_TIME = 1262304000000L; + + /* + * For prepend, we need to be able to generate unique but decreasing time + * UUID, which is a bit challenging. To do that, given a time in milliseconds, + * we adds a number represening the 100-nanoseconds precision and make sure + * that within the same millisecond, that number is always increasing. We + * do rely on the fact that the user will only provide decreasing + * milliseconds timestamp for that purpose. + */ + private static class PrecisionTime + { + public final long millis; + public final int nanos; + + public PrecisionTime(long millis, int nanos) + { + this.millis = millis; + this.nanos = nanos; + } + } + + private static final AtomicReference last = new AtomicReference(new PrecisionTime(Long.MAX_VALUE, 0)); + + private static PrecisionTime getNextTime(long millis) + { + while (true) + { + PrecisionTime current = last.get(); + assert millis <= current.millis; + PrecisionTime next = millis < current.millis + ? new PrecisionTime(millis, 0) + : new PrecisionTime(millis, current.nanos + 1); + if (last.compareAndSet(current, next)) + return next; + } + } + + public final AbstractType elements; + + public static ListType getInstance(TypeParser parser) throws ConfigurationException + { + List> l = parser.getTypeParameters(); + if (l.size() != 1) + throw new ConfigurationException("ListType takes exactly 1 type parameter"); + + return getInstance(l.get(0)); + } + + public static synchronized ListType getInstance(AbstractType elements) + { + ListType t = instances.get(elements); + if (t == null) + { + t = new ListType(elements); + instances.put(elements, t); + } + return t; + } + + private ListType(AbstractType elements) + { + super(Kind.LIST); + this.elements = elements; + } + + protected AbstractType nameComparator() + { + return TimeUUIDType.instance; + } + + protected AbstractType valueComparator() + { + return elements; + } + + protected void appendToStringBuilder(StringBuilder sb) + { + sb.append(getClass().getName()).append(TypeParser.stringifyTypeParameters(Collections.>singletonList(elements))); + } + + public void executeFunction(ColumnFamily cf, ColumnNameBuilder fullPath, Function fct, List args, UpdateParameters params) throws InvalidRequestException + { + switch (fct) + { + case APPEND: + doAppend(cf, fullPath, args, params); + break; + case PREPEND: + doPrepend(cf, fullPath, args, params); + break; + default: + throw new AssertionError("Unsupported function " + fct); + } + } + + public void execute(ColumnFamily cf, ColumnNameBuilder fullPath, Function fct, List args, UpdateParameters params, List> list) throws InvalidRequestException + { + switch (fct) + { + case SET: + doSet(cf, fullPath, validateIdx(fct, args.get(0), list), args.get(1), params, list); + break; + case DISCARD_LIST: + // If list is empty, do nothing + if (list != null) + doDiscard(cf, fullPath, args, params, list); + break; + case DISCARD_KEY: + doDiscardIdx(cf, fullPath, validateIdx(fct, args.get(0), list), params, list); + break; + default: + throw new AssertionError(); + } + } + + private int validateIdx(Function fct, Term value, List> list) throws InvalidRequestException + { + try + { + if (value.getType() != Term.Type.INTEGER) + throw new InvalidRequestException(String.format("Invalid argument %s for %s, must be an integer", value.getText(), fct)); + int idx = Integer.parseInt(value.getText()); + if (list == null || list.size() <= idx) + throw new InvalidRequestException(String.format("Invalid index %d, list has size %d", idx, list == null ? 0 : list.size())); + return idx; + } + catch (NumberFormatException e) + { + // This should not happen, unless we screwed up the parser + throw new RuntimeException(); + } + } + + private void doPrepend(ColumnFamily cf, ColumnNameBuilder builder, List values, UpdateParameters params) throws InvalidRequestException + { + long time = REFERENCE_TIME - (System.currentTimeMillis() - REFERENCE_TIME); + // We do the loop in reverse order because getNext() will create increasing time but we want the last + // value in the prepended list to have the lower time + for (int i = values.size() - 1; i >= 0; i--) + { + ColumnNameBuilder b = i == 0 ? builder : builder.copy(); + PrecisionTime pt = getNextTime(time); + ByteBuffer uuid = ByteBuffer.wrap(UUIDGen.getTimeUUIDBytes(pt.millis, pt.nanos)); + ByteBuffer name = b.add(uuid).build(); + cf.addColumn(params.makeColumn(name, values.get(i).getByteBuffer(elements, params.variables))); + } + } + + private void doAppend(ColumnFamily cf, ColumnNameBuilder builder, List values, UpdateParameters params) throws InvalidRequestException + { + for (int i = 0; i < values.size(); i++) + { + ColumnNameBuilder b = i == values.size() - 1 ? builder : builder.copy(); + ByteBuffer uuid = ByteBuffer.wrap(UUIDGen.getTimeUUIDBytes()); + ByteBuffer name = b.add(uuid).build(); + cf.addColumn(params.makeColumn(name, values.get(i).getByteBuffer(elements, params.variables))); + } + } + + public void doSet(ColumnFamily cf, ColumnNameBuilder builder, int idx, Term value, UpdateParameters params, List> list) throws InvalidRequestException + { + ByteBuffer name = list.get(idx).right.name(); + cf.addColumn(params.makeColumn(name, value.getByteBuffer(elements, params.variables))); + } + + public void doDiscard(ColumnFamily cf, ColumnNameBuilder builder, List values, UpdateParameters params, List> list) throws InvalidRequestException + { + Set toDiscard = new HashSet(); + for (Term value : values) + toDiscard.add(value.getByteBuffer(elements, params.variables)); + + for (Pair p : list) + { + IColumn c = p.right; + if (toDiscard.contains(c.value())) + cf.addColumn(params.makeTombstone(c.name())); + } + } + + public void doDiscardIdx(ColumnFamily cf, ColumnNameBuilder builder, int idx, UpdateParameters params, List> list) throws InvalidRequestException + { + ByteBuffer name = list.get(idx).right.name(); + cf.addColumn(params.makeTombstone(name)); + } + + public ByteBuffer serializeForThrift(List> columns) + { + List l = new ArrayList(columns.size()); + for (Pair p : columns) + l.add(elements.compose(p.right.value())); + return ByteBufferUtil.bytes(FBUtilities.json(l)); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/91bdf7fb/src/java/org/apache/cassandra/db/marshal/MapType.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/marshal/MapType.java b/src/java/org/apache/cassandra/db/marshal/MapType.java new file mode 100644 index 0000000..8e29520 --- /dev/null +++ b/src/java/org/apache/cassandra/db/marshal/MapType.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.marshal; + +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Iterator; +import java.util.HashMap; +import java.util.List; +import java.util.LinkedHashMap; +import java.util.Map; + +import org.apache.cassandra.cql3.ColumnNameBuilder; +import org.apache.cassandra.cql3.Term; +import org.apache.cassandra.cql3.UpdateParameters; +import org.apache.cassandra.db.ColumnFamily; +import org.apache.cassandra.db.IColumn; +import org.apache.cassandra.config.ConfigurationException; +import org.apache.cassandra.thrift.InvalidRequestException; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.Pair; + +public class MapType extends CollectionType +{ + // interning instances + private static final Map, AbstractType>, MapType> instances = new HashMap, AbstractType>, MapType>(); + + public final AbstractType keys; + public final AbstractType values; + + public static MapType getInstance(TypeParser parser) throws ConfigurationException + { + List> l = parser.getTypeParameters(); + if (l.size() != 2) + throw new ConfigurationException("MapType takes exactly 2 type parameters"); + + return getInstance(l.get(0), l.get(1)); + } + + public static synchronized MapType getInstance(AbstractType keys, AbstractType values) + { + Pair, AbstractType> p = Pair., AbstractType>create(keys, values); + MapType t = instances.get(p); + if (t == null) + { + t = new MapType(keys, values); + instances.put(p, t); + } + return t; + } + + private MapType(AbstractType keys, AbstractType values) + { + super(Kind.MAP); + this.keys = keys; + this.values = values; + } + + protected AbstractType nameComparator() + { + return keys; + } + + protected AbstractType valueComparator() + { + return values; + } + + protected void appendToStringBuilder(StringBuilder sb) + { + sb.append(getClass().getName()).append(TypeParser.stringifyTypeParameters(Arrays.asList(keys, values))); + } + + public void executeFunction(ColumnFamily cf, ColumnNameBuilder fullPath, Function fct, List args, UpdateParameters params) throws InvalidRequestException + { + switch (fct) + { + case SET: + doPut(cf, fullPath, args, params); + break; + case DISCARD_KEY: + doDiscard(cf, fullPath, args.get(0), params); + break; + default: + throw new AssertionError("Unsupported function " + fct); + } + } + + private void doPut(ColumnFamily cf, ColumnNameBuilder builder, List args, UpdateParameters params) throws InvalidRequestException + { + assert args.size() % 2 == 0; + Iterator iter = args.iterator(); + while (iter.hasNext()) + { + ByteBuffer name = builder.copy().add(iter.next().getByteBuffer(keys, params.variables)).build(); + ByteBuffer value = iter.next().getByteBuffer(values, params.variables); + cf.addColumn(params.makeColumn(name, value)); + } + } + + private void doDiscard(ColumnFamily cf, ColumnNameBuilder builder, Term value, UpdateParameters params) throws InvalidRequestException + { + ByteBuffer name = builder.add(value.getByteBuffer(keys, params.variables)).build(); + cf.addColumn(params.makeTombstone(name)); + } + + public ByteBuffer serializeForThrift(List> columns) + { + Map m = new LinkedHashMap(); + for (Pair p : columns) + m.put(keys.getString(p.left), values.compose(p.right.value())); + return ByteBufferUtil.bytes(FBUtilities.json(m)); + } +}