Return-Path: X-Original-To: apmail-cassandra-commits-archive@www.apache.org Delivered-To: apmail-cassandra-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id D58F511C7A for ; Wed, 30 Apr 2014 18:23:08 +0000 (UTC) Received: (qmail 80243 invoked by uid 500); 30 Apr 2014 18:23:05 -0000 Delivered-To: apmail-cassandra-commits-archive@cassandra.apache.org Received: (qmail 80081 invoked by uid 500); 30 Apr 2014 18:23:03 -0000 Mailing-List: contact commits-help@cassandra.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cassandra.apache.org Delivered-To: mailing list commits@cassandra.apache.org Received: (qmail 80060 invoked by uid 99); 30 Apr 2014 18:23:02 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 30 Apr 2014 18:23:02 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 92FEB88A3BD; Wed, 30 Apr 2014 18:23:02 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: slebresne@apache.org To: commits@cassandra.apache.org Date: Wed, 30 Apr 2014 18:23:03 -0000 Message-Id: In-Reply-To: <85b92dcf8fe14e2591008280fa843182@git.apache.org> References: <85b92dcf8fe14e2591008280fa843182@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/5] Native protocol v3 http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java index d79bd5b..b9ccd1a 100644 --- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java @@ -185,23 +185,22 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache public ResultMessage.Rows execute(QueryState state, QueryOptions options) throws RequestExecutionException, RequestValidationException { ConsistencyLevel cl = options.getConsistency(); - List variables = options.getValues(); if (cl == null) throw new InvalidRequestException("Invalid empty consistency level"); cl.validateForRead(keyspace()); - int limit = getLimit(variables); + int limit = getLimit(options); int limitForQuery = updateLimitForQuery(limit); long now = System.currentTimeMillis(); Pageable command; if (isKeyRange || usesSecondaryIndexing) { - command = getRangeCommand(variables, limitForQuery, now); + command = getRangeCommand(options, limitForQuery, now); } else { - List commands = getSliceCommands(variables, limitForQuery, now); + List commands = getSliceCommands(options, limitForQuery, now); command = commands == null ? null : new Pageable.ReadCommands(commands); } @@ -214,13 +213,13 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache if (pageSize <= 0 || command == null || !QueryPagers.mayNeedPaging(command, pageSize)) { - return execute(command, cl, variables, limit, now); + return execute(command, options, limit, now); } else { QueryPager pager = QueryPagers.pager(command, cl, options.getPagingState()); if (parameters.isCount) - return pageCountQuery(pager, variables, pageSize, now, limit); + return pageCountQuery(pager, options, pageSize, now, limit); // We can't properly do post-query ordering if we page (see #6722) if (needsPostQueryOrdering()) @@ -228,14 +227,14 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache + "ORDER BY or the IN and sort client side, or disable paging for this query"); List page = pager.fetchPage(pageSize); - ResultMessage.Rows msg = processResults(page, variables, limit, now); + ResultMessage.Rows msg = processResults(page, options, limit, now); if (!pager.isExhausted()) msg.result.metadata.setHasMorePages(pager.state()); return msg; } } - private ResultMessage.Rows execute(Pageable command, ConsistencyLevel cl, List variables, int limit, long now) throws RequestValidationException, RequestExecutionException + private ResultMessage.Rows execute(Pageable command, QueryOptions options, int limit, long now) throws RequestValidationException, RequestExecutionException { List rows; if (command == null) @@ -245,21 +244,21 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache else { rows = command instanceof Pageable.ReadCommands - ? StorageProxy.read(((Pageable.ReadCommands)command).commands, cl) - : StorageProxy.getRangeSlice((RangeSliceCommand)command, cl); + ? StorageProxy.read(((Pageable.ReadCommands)command).commands, options.getConsistency()) + : StorageProxy.getRangeSlice((RangeSliceCommand)command, options.getConsistency()); } - return processResults(rows, variables, limit, now); + return processResults(rows, options, limit, now); } - private ResultMessage.Rows pageCountQuery(QueryPager pager, List variables, int pageSize, long now, int limit) throws RequestValidationException, RequestExecutionException + private ResultMessage.Rows pageCountQuery(QueryPager pager, QueryOptions options, int pageSize, long now, int limit) throws RequestValidationException, RequestExecutionException { int count = 0; while (!pager.isExhausted()) { int maxLimit = pager.maxRemaining(); logger.debug("New maxLimit for paged count query is {}", maxLimit); - ResultSet rset = process(pager.fetchPage(pageSize), variables, maxLimit, now); + ResultSet rset = process(pager.fetchPage(pageSize), options, maxLimit, now); count += rset.rows.size(); } @@ -269,10 +268,10 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache return new ResultMessage.Rows(result); } - public ResultMessage.Rows processResults(List rows, List variables, int limit, long now) throws RequestValidationException + public ResultMessage.Rows processResults(List rows, QueryOptions options, int limit, long now) throws RequestValidationException { // Even for count, we need to process the result as it'll group some column together in sparse column families - ResultSet rset = process(rows, variables, limit, now); + ResultSet rset = process(rows, options, limit, now); rset = parameters.isCount ? rset.makeCountResult(parameters.countAlias) : rset; return new ResultMessage.Rows(rset); } @@ -288,29 +287,30 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache public ResultMessage.Rows executeInternal(QueryState state) throws RequestExecutionException, RequestValidationException { - List variables = Collections.emptyList(); - int limit = getLimit(variables); + QueryOptions options = QueryOptions.DEFAULT; + int limit = getLimit(options); int limitForQuery = updateLimitForQuery(limit); long now = System.currentTimeMillis(); List rows; if (isKeyRange || usesSecondaryIndexing) { - RangeSliceCommand command = getRangeCommand(variables, limitForQuery, now); + RangeSliceCommand command = getRangeCommand(options, limitForQuery, now); rows = command == null ? Collections.emptyList() : command.executeLocally(); } else { - List commands = getSliceCommands(variables, limitForQuery, now); + List commands = getSliceCommands(options, limitForQuery, now); rows = commands == null ? Collections.emptyList() : readLocally(keyspace(), commands); } - return processResults(rows, variables, limit, now); + return processResults(rows, options, limit, now); } public ResultSet process(List rows) throws InvalidRequestException { assert !parameters.isCount; // not yet needed - return process(rows, Collections.emptyList(), getLimit(Collections.emptyList()), System.currentTimeMillis()); + QueryOptions options = QueryOptions.DEFAULT; + return process(rows, options, getLimit(options), System.currentTimeMillis()); } public String keyspace() @@ -323,15 +323,15 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache return cfm.cfName; } - private List getSliceCommands(List variables, int limit, long now) throws RequestValidationException + private List getSliceCommands(QueryOptions options, int limit, long now) throws RequestValidationException { - Collection keys = getKeys(variables); + Collection keys = getKeys(options); if (keys.isEmpty()) // in case of IN () for (the last column of) the partition key. return null; List commands = new ArrayList(keys.size()); - IDiskAtomFilter filter = makeFilter(variables, limit); + IDiskAtomFilter filter = makeFilter(options, limit); if (filter == null) return null; @@ -349,29 +349,29 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache return commands; } - private RangeSliceCommand getRangeCommand(List variables, int limit, long now) throws RequestValidationException + private RangeSliceCommand getRangeCommand(QueryOptions options, int limit, long now) throws RequestValidationException { - IDiskAtomFilter filter = makeFilter(variables, limit); + IDiskAtomFilter filter = makeFilter(options, limit); if (filter == null) return null; - List expressions = getIndexExpressions(variables); + List expressions = getIndexExpressions(options); // The LIMIT provided by the user is the number of CQL row he wants returned. // We want to have getRangeSlice to count the number of columns, not the number of keys. - AbstractBounds keyBounds = getKeyBounds(variables); + AbstractBounds keyBounds = getKeyBounds(options); return keyBounds == null ? null : new RangeSliceCommand(keyspace(), columnFamily(), now, filter, keyBounds, expressions, limit, !parameters.isDistinct, false); } - private AbstractBounds getKeyBounds(List variables) throws InvalidRequestException + private AbstractBounds getKeyBounds(QueryOptions options) throws InvalidRequestException { IPartitioner p = StorageService.getPartitioner(); if (onToken) { - Token startToken = getTokenBound(Bound.START, variables, p); - Token endToken = getTokenBound(Bound.END, variables, p); + Token startToken = getTokenBound(Bound.START, options, p); + Token endToken = getTokenBound(Bound.END, options, p); boolean includeStart = includeKeyBound(Bound.START); boolean includeEnd = includeKeyBound(Bound.END); @@ -397,8 +397,8 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache } else { - ByteBuffer startKeyBytes = getKeyBound(Bound.START, variables); - ByteBuffer finishKeyBytes = getKeyBound(Bound.END, variables); + ByteBuffer startKeyBytes = getKeyBound(Bound.START, options); + ByteBuffer finishKeyBytes = getKeyBound(Bound.END, options); RowPosition startKey = RowPosition.ForKey.get(startKeyBytes, p); RowPosition finishKey = RowPosition.ForKey.get(finishKeyBytes, p); @@ -421,7 +421,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache } } - private IDiskAtomFilter makeFilter(List variables, int limit) + private IDiskAtomFilter makeFilter(QueryOptions options, int limit) throws InvalidRequestException { if (parameters.isDistinct) @@ -431,8 +431,8 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache else if (isColumnRange()) { int toGroup = cfm.comparator.isDense() ? -1 : cfm.clusteringColumns().size(); - List startBounds = getRequestedBound(Bound.START, variables); - List endBounds = getRequestedBound(Bound.END, variables); + List startBounds = getRequestedBound(Bound.START, options); + List endBounds = getRequestedBound(Bound.END, options); assert startBounds.size() == endBounds.size(); // Handles fetching static columns. Note that for 2i, the filter is just used to restrict @@ -516,7 +516,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache } else { - SortedSet cellNames = getRequestedColumns(variables); + SortedSet cellNames = getRequestedColumns(options); if (cellNames == null) // in case of IN () for the last column of the key return null; QueryProcessor.validateCellNames(cellNames, cfm.comparator); @@ -534,12 +534,12 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache return new SliceQueryFilter(slices, isReversed, limit, toGroup); } - private int getLimit(List variables) throws InvalidRequestException + private int getLimit(QueryOptions options) throws InvalidRequestException { int l = Integer.MAX_VALUE; if (limit != null) { - ByteBuffer b = limit.bindAndGet(variables); + ByteBuffer b = limit.bindAndGet(options); if (b == null) throw new InvalidRequestException("Invalid null value of limit"); @@ -569,7 +569,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache : limit; } - private Collection getKeys(final List variables) throws InvalidRequestException + private Collection getKeys(final QueryOptions options) throws InvalidRequestException { List keys = new ArrayList(); CBuilder builder = cfm.getKeyValidatorAsCType().builder(); @@ -578,7 +578,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache Restriction r = keyRestrictions[def.position()]; assert r != null && !r.isSlice(); - List values = r.values(variables); + List values = r.values(options); if (builder.remainingCount() == 1) { @@ -603,7 +603,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache return keys; } - private ByteBuffer getKeyBound(Bound b, List variables) throws InvalidRequestException + private ByteBuffer getKeyBound(Bound b, QueryOptions options) throws InvalidRequestException { // Deal with unrestricted partition key components (special-casing is required to deal with 2i queries on the first // component of a composite partition key). @@ -612,10 +612,10 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache return ByteBufferUtil.EMPTY_BYTE_BUFFER; // We deal with IN queries for keys in other places, so we know buildBound will return only one result - return buildBound(b, cfm.partitionKeyColumns(), keyRestrictions, false, cfm.getKeyValidatorAsCType(), variables).get(0).toByteBuffer(); + return buildBound(b, cfm.partitionKeyColumns(), keyRestrictions, false, cfm.getKeyValidatorAsCType(), options).get(0).toByteBuffer(); } - private Token getTokenBound(Bound b, List variables, IPartitioner p) throws InvalidRequestException + private Token getTokenBound(Bound b, QueryOptions options, IPartitioner p) throws InvalidRequestException { assert onToken; @@ -623,7 +623,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache ByteBuffer value; if (keyRestriction.isEQ()) { - value = keyRestriction.values(variables).get(0); + value = keyRestriction.values(options).get(0); } else { @@ -631,7 +631,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache if (!slice.hasBound(b)) return p.getMinimumToken(); - value = slice.bound(b, variables); + value = slice.bound(b, options); } if (value == null) @@ -669,7 +669,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache return false; } - private SortedSet getRequestedColumns(List variables) throws InvalidRequestException + private SortedSet getRequestedColumns(QueryOptions options) throws InvalidRequestException { // Note: getRequestedColumns don't handle static columns, but due to CASSANDRA-5762 // we always do a slice for CQL3 tables, so it's ok to ignore them here @@ -682,7 +682,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache ColumnDefinition def = idIter.next(); assert r != null && !r.isSlice(); - List values = r.values(variables); + List values = r.values(options); if (values.size() == 1) { ByteBuffer val = values.get(0); @@ -772,7 +772,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache Restriction[] restrictions, boolean isReversed, CType type, - List variables) throws InvalidRequestException + QueryOptions options) throws InvalidRequestException { CBuilder builder = type.builder(); @@ -801,7 +801,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache if (r.isSlice()) { - builder.add(getSliceValue(def, r, b, variables)); + builder.add(getSliceValue(def, r, b, options)); Relation.Type relType = ((Restriction.Slice)r).getRelation(eocBound, b); // We can have more non null restriction if the "scalar" notation was used for the bound (#4851). @@ -813,13 +813,13 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache if (isNullRestriction(r, b)) break; - builder.add(getSliceValue(def, r, b, variables)); + builder.add(getSliceValue(def, r, b, options)); } return Collections.singletonList(builder.build().withEOC(eocForRelation(relType))); } else { - List values = r.values(variables); + List values = r.values(options); if (values.size() != 1) { // IN query, we only support it on the clustering column @@ -878,23 +878,23 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache return r == null || (r.isSlice() && !((Restriction.Slice)r).hasBound(b)); } - private static ByteBuffer getSliceValue(ColumnDefinition def, Restriction r, Bound b, List variables) throws InvalidRequestException + private static ByteBuffer getSliceValue(ColumnDefinition def, Restriction r, Bound b, QueryOptions options) throws InvalidRequestException { Restriction.Slice slice = (Restriction.Slice)r; assert slice.hasBound(b); - ByteBuffer val = slice.bound(b, variables); + ByteBuffer val = slice.bound(b, options); if (val == null) throw new InvalidRequestException(String.format("Invalid null clustering key part %s", def.name)); return val; } - private List getRequestedBound(Bound b, List variables) throws InvalidRequestException + private List getRequestedBound(Bound b, QueryOptions options) throws InvalidRequestException { assert isColumnRange(); - return buildBound(b, cfm.clusteringColumns(), columnRestrictions, isReversed, cfm.comparator, variables); + return buildBound(b, cfm.clusteringColumns(), columnRestrictions, isReversed, cfm.comparator, options); } - public List getIndexExpressions(List variables) throws InvalidRequestException + public List getIndexExpressions(QueryOptions options) throws InvalidRequestException { if (!usesSecondaryIndexing || restrictedColumns.isEmpty()) return Collections.emptyList(); @@ -927,7 +927,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache { if (slice.hasBound(b)) { - ByteBuffer value = validateIndexedValue(def, slice.bound(b, variables)); + ByteBuffer value = validateIndexedValue(def, slice.bound(b, options)); expressions.add(new IndexExpression(def.name.bytes, slice.getIndexOperator(b), value)); } } @@ -935,12 +935,12 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache else if (restriction.isContains()) { Restriction.Contains contains = (Restriction.Contains)restriction; - for (ByteBuffer value : contains.values(variables)) + for (ByteBuffer value : contains.values(options)) { validateIndexedValue(def, value); expressions.add(new IndexExpression(def.name.bytes, IndexExpression.Operator.CONTAINS, value)); } - for (ByteBuffer key : contains.keys(variables)) + for (ByteBuffer key : contains.keys(options)) { validateIndexedValue(def, key); expressions.add(new IndexExpression(def.name.bytes, IndexExpression.Operator.CONTAINS_KEY, key)); @@ -948,7 +948,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache } else { - List values = restriction.values(variables); + List values = restriction.values(options); if (values.size() != 1) throw new InvalidRequestException("IN restrictions are not supported on indexed columns"); @@ -969,13 +969,13 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache return value; } - private Iterator applySliceRestriction(final Iterator cells, final List variables) throws InvalidRequestException + private Iterator applySliceRestriction(final Iterator cells, final QueryOptions options) throws InvalidRequestException { assert sliceRestriction != null; final CellNameType type = cfm.comparator; - final CellName excludedStart = sliceRestriction.isInclusive(Bound.START) ? null : type.makeCellName(sliceRestriction.bound(Bound.START, variables)); - final CellName excludedEnd = sliceRestriction.isInclusive(Bound.END) ? null : type.makeCellName(sliceRestriction.bound(Bound.END, variables)); + final CellName excludedStart = sliceRestriction.isInclusive(Bound.START) ? null : type.makeCellName(sliceRestriction.bound(Bound.START, options)); + final CellName excludedEnd = sliceRestriction.isInclusive(Bound.END) ? null : type.makeCellName(sliceRestriction.bound(Bound.END, options)); return new AbstractIterator() { @@ -998,7 +998,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache }; } - private ResultSet process(List rows, List variables, int limit, long now) throws InvalidRequestException + private ResultSet process(List rows, QueryOptions options, int limit, long now) throws InvalidRequestException { Selection.ResultSetBuilder result = selection.resultSetBuilder(now); for (org.apache.cassandra.db.Row row : rows) @@ -1007,12 +1007,12 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache if (row.cf == null) continue; - processColumnFamily(row.key.getKey(), row.cf, variables, now, result); + processColumnFamily(row.key.getKey(), row.cf, options, now, result); } ResultSet cqlRows = result.build(); - orderResults(cqlRows, variables); + orderResults(cqlRows); // Internal calls always return columns in the comparator order, even when reverse was set if (isReversed) @@ -1024,7 +1024,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache } // Used by ModificationStatement for CAS operations - void processColumnFamily(ByteBuffer key, ColumnFamily cf, List variables, long now, Selection.ResultSetBuilder result) + void processColumnFamily(ByteBuffer key, ColumnFamily cf, QueryOptions options, long now, Selection.ResultSetBuilder result) throws InvalidRequestException { CFMetaData cfm = cf.metadata(); @@ -1040,7 +1040,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache Iterator cells = cf.getSortedColumns().iterator(); if (sliceRestriction != null) - cells = applySliceRestriction(cells, variables); + cells = applySliceRestriction(cells, options); CQL3Row.RowIterator iter = cfm.comparator.CQL3RowBuilder(cfm, now).group(cells); @@ -1059,7 +1059,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache result.add(keyComponents[def.position()]); break; case STATIC: - addValue(result, def, staticRow); + addValue(result, def, staticRow, options); break; default: result.add((ByteBuffer)null); @@ -1089,17 +1089,17 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache result.add(cql3Row.getColumn(null)); break; case REGULAR: - addValue(result, def, cql3Row); + addValue(result, def, cql3Row, options); break; case STATIC: - addValue(result, def, staticRow); + addValue(result, def, staticRow, options); break; } } } } - private static void addValue(Selection.ResultSetBuilder result, ColumnDefinition def, CQL3Row row) + private static void addValue(Selection.ResultSetBuilder result, ColumnDefinition def, CQL3Row row, QueryOptions options) { if (row == null) { @@ -1112,7 +1112,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache List collection = row.getCollection(def.name); ByteBuffer value = collection == null ? null - : ((CollectionType)def.type).serialize(collection); + : ((CollectionType)def.type).serializeForNativeProtocol(collection, options.getProtocolVersion()); result.add(value); return; } @@ -1137,7 +1137,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache /** * Orders results when multiple keys are selected (using IN) */ - private void orderResults(ResultSet cqlRows, List variables) throws InvalidRequestException + private void orderResults(ResultSet cqlRows) throws InvalidRequestException { if (cqlRows.size() == 0 || !needsPostQueryOrdering()) return; http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/db/DefsTables.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/DefsTables.java b/src/java/org/apache/cassandra/db/DefsTables.java index 6f9a270..3902e05 100644 --- a/src/java/org/apache/cassandra/db/DefsTables.java +++ b/src/java/org/apache/cassandra/db/DefsTables.java @@ -362,7 +362,7 @@ public class DefsTables dropType(type); for (MapDifference.ValueDifference tdiff : typesDiff.entriesDiffering().values()) - addType(tdiff.rightValue()); // use the most recent value + updateType(tdiff.rightValue()); // use the most recent value } } } @@ -412,7 +412,7 @@ public class DefsTables ksm.userTypes.addType(ut); if (!StorageService.instance.isClientMode()) - MigrationManager.instance.notifyUpdateKeyspace(ksm); + MigrationManager.instance.notifyCreateUserType(ut); } private static void updateKeyspace(KSMetaData newState) @@ -444,6 +444,19 @@ public class DefsTables } } + private static void updateType(UserType ut) + { + KSMetaData ksm = Schema.instance.getKSMetaData(ut.keyspace); + assert ksm != null; + + logger.info("Updating {}", ut); + + ksm.userTypes.addType(ut); + + if (!StorageService.instance.isClientMode()) + MigrationManager.instance.notifyUpdateUserType(ut); + } + private static void dropKeyspace(String ksName) { KSMetaData ksm = Schema.instance.getKSMetaData(ksName); @@ -515,7 +528,7 @@ public class DefsTables ksm.userTypes.removeType(ut); if (!StorageService.instance.isClientMode()) - MigrationManager.instance.notifyUpdateKeyspace(ksm); + MigrationManager.instance.notifyUpdateUserType(ut); } private static KSMetaData makeNewKeyspaceDefinition(KSMetaData ksm, CFMetaData toExclude) http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/db/marshal/CollectionType.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/marshal/CollectionType.java b/src/java/org/apache/cassandra/db/marshal/CollectionType.java index 5db4ba0..7f75a5f 100644 --- a/src/java/org/apache/cassandra/db/marshal/CollectionType.java +++ b/src/java/org/apache/cassandra/db/marshal/CollectionType.java @@ -25,6 +25,7 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.cql3.CQL3Type; import org.apache.cassandra.db.Cell; +import org.apache.cassandra.serializers.CollectionSerializer; import org.apache.cassandra.serializers.MarshalException; import org.apache.cassandra.utils.ByteBufferUtil; @@ -57,7 +58,7 @@ public abstract class CollectionType extends AbstractType protected abstract void appendToStringBuilder(StringBuilder sb); - public abstract ByteBuffer serialize(List cells); + public abstract List serializedValues(List cells); @Override public String toString() @@ -110,22 +111,9 @@ public abstract class CollectionType extends AbstractType return true; } - // Utilitary method - protected static ByteBuffer pack(List buffers, int elements, int size) + protected List enforceLimit(List cells, int version) { - ByteBuffer result = ByteBuffer.allocate(2 + size); - result.putShort((short)elements); - for (ByteBuffer bb : buffers) - { - result.putShort((short)bb.remaining()); - result.put(bb.duplicate()); - } - return (ByteBuffer)result.flip(); - } - - protected List enforceLimit(List cells) - { - if (cells.size() <= MAX_ELEMENTS) + if (version >= 3 || cells.size() <= MAX_ELEMENTS) return cells; logger.error("Detected collection with {} elements, more than the {} limit. Only the first {} elements will be returned to the client. " @@ -133,12 +121,11 @@ public abstract class CollectionType extends AbstractType return cells.subList(0, MAX_ELEMENTS); } - public static ByteBuffer pack(List buffers, int elements) + public ByteBuffer serializeForNativeProtocol(List cells, int version) { - int size = 0; - for (ByteBuffer bb : buffers) - size += 2 + bb.remaining(); - return pack(buffers, elements, size); + cells = enforceLimit(cells, version); + List values = serializedValues(cells); + return CollectionSerializer.pack(values, cells.size(), version); } public CQL3Type asCQL3Type() http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/db/marshal/ListType.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/marshal/ListType.java b/src/java/org/apache/cassandra/db/marshal/ListType.java index 43ace65..6e6821b 100644 --- a/src/java/org/apache/cassandra/db/marshal/ListType.java +++ b/src/java/org/apache/cassandra/db/marshal/ListType.java @@ -72,7 +72,7 @@ public class ListType extends CollectionType> return elements; } - public TypeSerializer> getSerializer() + public ListSerializer getSerializer() { return serializer; } @@ -112,17 +112,11 @@ public class ListType extends CollectionType> sb.append(getClass().getName()).append(TypeParser.stringifyTypeParameters(Collections.>singletonList(elements))); } - public ByteBuffer serialize(List cells) + public List serializedValues(List cells) { - cells = enforceLimit(cells); - List bbs = new ArrayList(cells.size()); - int size = 0; for (Cell c : cells) - { bbs.add(c.value()); - size += 2 + c.value().remaining(); - } - return pack(bbs, cells.size(), size); + return bbs; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/db/marshal/MapType.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/marshal/MapType.java b/src/java/org/apache/cassandra/db/marshal/MapType.java index 213e213..71023a7 100644 --- a/src/java/org/apache/cassandra/db/marshal/MapType.java +++ b/src/java/org/apache/cassandra/db/marshal/MapType.java @@ -108,7 +108,7 @@ public class MapType extends CollectionType> } @Override - public TypeSerializer> getSerializer() + public MapSerializer getSerializer() { return serializer; } @@ -123,23 +123,14 @@ public class MapType extends CollectionType> sb.append(getClass().getName()).append(TypeParser.stringifyTypeParameters(Arrays.asList(keys, values))); } - /** - * Creates the same output than serialize, but from the internal representation. - */ - public ByteBuffer serialize(List cells) + public List serializedValues(List cells) { - cells = enforceLimit(cells); - - List bbs = new ArrayList(2 * cells.size()); - int size = 0; + List bbs = new ArrayList(cells.size() * 2); for (Cell c : cells) { - ByteBuffer key = c.name().collectionElement(); - ByteBuffer value = c.value(); - bbs.add(key); - bbs.add(value); - size += 4 + key.remaining() + value.remaining(); + bbs.add(c.name().collectionElement()); + bbs.add(c.value()); } - return pack(bbs, cells.size(), size); + return bbs; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/db/marshal/SetType.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/marshal/SetType.java b/src/java/org/apache/cassandra/db/marshal/SetType.java index 3b686b8..d2f7f12 100644 --- a/src/java/org/apache/cassandra/db/marshal/SetType.java +++ b/src/java/org/apache/cassandra/db/marshal/SetType.java @@ -77,7 +77,7 @@ public class SetType extends CollectionType> return ListType.compareListOrSet(elements, o1, o2); } - public TypeSerializer> getSerializer() + public SetSerializer getSerializer() { return serializer; } @@ -92,18 +92,11 @@ public class SetType extends CollectionType> sb.append(getClass().getName()).append(TypeParser.stringifyTypeParameters(Collections.>singletonList(elements))); } - public ByteBuffer serialize(List cells) + public List serializedValues(List cells) { - cells = enforceLimit(cells); - List bbs = new ArrayList(cells.size()); - int size = 0; for (Cell c : cells) - { - ByteBuffer key = c.name().collectionElement(); - bbs.add(key); - size += 2 + key.remaining(); - } - return pack(bbs, cells.size(), size); + bbs.add(c.name().collectionElement()); + return bbs; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/db/marshal/UserType.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/marshal/UserType.java b/src/java/org/apache/cassandra/db/marshal/UserType.java index eb95fb9..973a5be 100644 --- a/src/java/org/apache/cassandra/db/marshal/UserType.java +++ b/src/java/org/apache/cassandra/db/marshal/UserType.java @@ -64,6 +64,11 @@ public class UserType extends CompositeType return new UserType(keyspace, name, columnNames, columnTypes); } + public String getNameAsString() + { + return UTF8Type.instance.compose(name); + } + @Override public final int hashCode() { http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java index af88853..9e3abcf 100644 --- a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java +++ b/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java @@ -35,6 +35,7 @@ import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.ColumnDefinition; import org.apache.cassandra.db.marshal.*; import org.apache.cassandra.db.marshal.AbstractCompositeType.CompositeComponent; +import org.apache.cassandra.serializers.CollectionSerializer; import org.apache.cassandra.hadoop.*; import org.apache.cassandra.thrift.*; import org.apache.cassandra.utils.ByteBufferUtil; @@ -431,8 +432,10 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store { ByteBuffer buffer = objToBB(sub); serialized.add(buffer); - } - return CollectionType.pack(serialized, objects.size()); + } + // NOTE: using protocol v1 serialization format for collections so as to not break + // compatibility. Not sure if that's the right thing. + return CollectionSerializer.pack(serialized, objects.size(), 1); } private ByteBuffer objToMapBB(List objects) @@ -447,7 +450,9 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store serialized.add(buffer); } } - return CollectionType.pack(serialized, objects.size()); + // NOTE: using protocol v1 serialization format for collections so as to not break + // compatibility. Not sure if that's the right thing. + return CollectionSerializer.pack(serialized, objects.size(), 1); } private ByteBuffer objToCompositeBB(List objects) http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java index 9b7a8e7..6993b19 100644 --- a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java @@ -195,14 +195,15 @@ public class CQLSSTableWriter if (values.size() != boundNames.size()) throw new InvalidRequestException(String.format("Invalid number of arguments, expecting %d values but got %d", boundNames.size(), values.size())); - List keys = insert.buildPartitionKeyNames(values); - Composite clusteringPrefix = insert.createClusteringPrefix(values); + QueryOptions options = QueryOptions.forInternalCalls(null, values); + List keys = insert.buildPartitionKeyNames(options); + Composite clusteringPrefix = insert.createClusteringPrefix(options); long now = System.currentTimeMillis() * 1000; UpdateParameters params = new UpdateParameters(insert.cfm, - values, - insert.getTimestamp(now, values), - insert.getTimeToLive(values), + options, + insert.getTimestamp(now, options), + insert.getTimeToLive(options), Collections.emptyMap()); for (ByteBuffer key: keys) http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/serializers/CollectionSerializer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/serializers/CollectionSerializer.java b/src/java/org/apache/cassandra/serializers/CollectionSerializer.java index 83a391d..0e16fda 100644 --- a/src/java/org/apache/cassandra/serializers/CollectionSerializer.java +++ b/src/java/org/apache/cassandra/serializers/CollectionSerializer.java @@ -21,6 +21,8 @@ package org.apache.cassandra.serializers; import java.nio.ByteBuffer; import java.util.List; +import org.apache.cassandra.utils.ByteBufferUtil; + public abstract class CollectionSerializer implements TypeSerializer { public void validate(ByteBuffer bytes) throws MarshalException @@ -28,24 +30,104 @@ public abstract class CollectionSerializer implements TypeSerializer // The collection is not currently being properly validated. } - // Utilitary method - protected static ByteBuffer pack(List buffers, int elements, int size) + protected abstract List serializeValues(T value); + protected abstract int getElementCount(T value); + + public abstract T deserializeForNativeProtocol(ByteBuffer buffer, int version); + + public ByteBuffer serialize(T value) + { + List values = serializeValues(value); + // The only case we serialize/deserialize collections internally (i.e. not for the protocol sake), + // is when collections are in UDT values. There, we use the protocol 3 version since it's more flexible. + return pack(values, getElementCount(value), 3); + } + + public T deserialize(ByteBuffer bytes) { - ByteBuffer result = ByteBuffer.allocate(2 + size); - result.putShort((short)elements); + // The only case we serialize/deserialize collections internally (i.e. not for the protocol sake), + // is when collections are in UDT values. There, we use the protocol 3 version since it's more flexible. + return deserializeForNativeProtocol(bytes, 3); + } + + public static ByteBuffer pack(List buffers, int elements, int version) + { + int size = 0; + for (ByteBuffer bb : buffers) + size += sizeOfValue(bb, version); + + ByteBuffer result = ByteBuffer.allocate(sizeOfCollectionSize(elements, version) + size); + writeCollectionSize(result, elements, version); for (ByteBuffer bb : buffers) + writeValue(result, bb, version); + return (ByteBuffer)result.flip(); + } + + protected static void writeCollectionSize(ByteBuffer output, int elements, int version) + { + if (version >= 3) + output.putInt(elements); + else + output.putShort((short)elements); + } + + protected static int readCollectionSize(ByteBuffer input, int version) + { + return version >= 3 ? input.getInt() : ByteBufferUtil.readShortLength(input); + } + + protected static int sizeOfCollectionSize(int elements, int version) + { + return version >= 3 ? 4 : 2; + } + + protected static void writeValue(ByteBuffer output, ByteBuffer value, int version) + { + if (version >= 3) { - result.putShort((short)bb.remaining()); - result.put(bb.duplicate()); + if (value == null) + { + output.putInt(-1); + return; + } + + output.putInt(value.remaining()); + output.put(value.duplicate()); + } + else + { + assert value != null; + output.putShort((short)value.remaining()); + output.put(value.duplicate()); } - return (ByteBuffer)result.flip(); } - public static ByteBuffer pack(List buffers, int elements) + protected static ByteBuffer readValue(ByteBuffer input, int version) { - int size = 0; - for (ByteBuffer bb : buffers) - size += 2 + bb.remaining(); - return pack(buffers, elements, size); + if (version >= 3) + { + int size = input.getInt(); + if (size < 0) + return null; + + return ByteBufferUtil.readBytes(input, size); + } + else + { + return ByteBufferUtil.readBytesWithShortLength(input); + } + } + + protected static int sizeOfValue(ByteBuffer value, int version) + { + if (version >= 3) + { + return value == null ? 4 : 4 + value.remaining(); + } + else + { + assert value != null; + return 2 + value.remaining(); + } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/serializers/ListSerializer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/serializers/ListSerializer.java b/src/java/org/apache/cassandra/serializers/ListSerializer.java index 59f25d2..e662341 100644 --- a/src/java/org/apache/cassandra/serializers/ListSerializer.java +++ b/src/java/org/apache/cassandra/serializers/ListSerializer.java @@ -47,16 +47,29 @@ public class ListSerializer extends CollectionSerializer> this.elements = elements; } - public List deserialize(ByteBuffer bytes) + public List serializeValues(List values) + { + List buffers = new ArrayList<>(values.size()); + for (T value : values) + buffers.add(elements.serialize(value)); + return buffers; + } + + public int getElementCount(List value) + { + return value.size(); + } + + public List deserializeForNativeProtocol(ByteBuffer bytes, int version) { try { ByteBuffer input = bytes.duplicate(); - int n = ByteBufferUtil.readShortLength(input); + int n = readCollectionSize(input, version); List l = new ArrayList(n); for (int i = 0; i < n; i++) { - ByteBuffer databb = ByteBufferUtil.readBytesWithShortLength(input); + ByteBuffer databb = readValue(input, version); elements.validate(databb); l.add(elements.deserialize(databb)); } @@ -68,26 +81,6 @@ public class ListSerializer extends CollectionSerializer> } } - /** - * Layout is: {@code ... } - * where: - * n is the number of elements - * s_i is the number of bytes composing the ith element - * b_i is the s_i bytes composing the ith element - */ - public ByteBuffer serialize(List value) - { - List bbs = new ArrayList(value.size()); - int size = 0; - for (T elt : value) - { - ByteBuffer bb = elements.serialize(elt); - bbs.add(bb); - size += 2 + bb.remaining(); - } - return pack(bbs, value.size(), size); - } - public String toString(List value) { StringBuilder sb = new StringBuilder(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/serializers/MapSerializer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/serializers/MapSerializer.java b/src/java/org/apache/cassandra/serializers/MapSerializer.java index f79d07f..5d349dd 100644 --- a/src/java/org/apache/cassandra/serializers/MapSerializer.java +++ b/src/java/org/apache/cassandra/serializers/MapSerializer.java @@ -51,19 +51,35 @@ public class MapSerializer extends CollectionSerializer> this.values = values; } - public Map deserialize(ByteBuffer bytes) + public List serializeValues(Map map) + { + List buffers = new ArrayList<>(map.size() * 2); + for (Map.Entry entry : map.entrySet()) + { + buffers.add(keys.serialize(entry.getKey())); + buffers.add(values.serialize(entry.getValue())); + } + return buffers; + } + + public int getElementCount(Map value) + { + return value.size(); + } + + public Map deserializeForNativeProtocol(ByteBuffer bytes, int version) { try { ByteBuffer input = bytes.duplicate(); - int n = ByteBufferUtil.readShortLength(input); + int n = readCollectionSize(input, version); Map m = new LinkedHashMap(n); for (int i = 0; i < n; i++) { - ByteBuffer kbb = ByteBufferUtil.readBytesWithShortLength(input); + ByteBuffer kbb = readValue(input, version); keys.validate(kbb); - ByteBuffer vbb = ByteBufferUtil.readBytesWithShortLength(input); + ByteBuffer vbb = readValue(input, version); values.validate(vbb); m.put(keys.deserialize(kbb), values.deserialize(vbb)); @@ -76,30 +92,6 @@ public class MapSerializer extends CollectionSerializer> } } - /** - * Layout is: {@code ... } - * where: - * n is the number of elements - * sk_i is the number of bytes composing the ith key k_i - * k_i is the sk_i bytes composing the ith key - * sv_i is the number of bytes composing the ith value v_i - * v_i is the sv_i bytes composing the ith value - */ - public ByteBuffer serialize(Map value) - { - List bbs = new ArrayList(2 * value.size()); - int size = 0; - for (Map.Entry entry : value.entrySet()) - { - ByteBuffer bbk = keys.serialize(entry.getKey()); - ByteBuffer bbv = values.serialize(entry.getValue()); - bbs.add(bbk); - bbs.add(bbv); - size += 4 + bbk.remaining() + bbv.remaining(); - } - return pack(bbs, value.size(), size); - } - public String toString(Map value) { StringBuilder sb = new StringBuilder(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/serializers/SetSerializer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/serializers/SetSerializer.java b/src/java/org/apache/cassandra/serializers/SetSerializer.java index d6d7062..812dd68 100644 --- a/src/java/org/apache/cassandra/serializers/SetSerializer.java +++ b/src/java/org/apache/cassandra/serializers/SetSerializer.java @@ -47,16 +47,29 @@ public class SetSerializer extends CollectionSerializer> this.elements = elements; } - public Set deserialize(ByteBuffer bytes) + public List serializeValues(Set values) + { + List buffers = new ArrayList<>(values.size()); + for (T value : values) + buffers.add(elements.serialize(value)); + return buffers; + } + + public int getElementCount(Set value) + { + return value.size(); + } + + public Set deserializeForNativeProtocol(ByteBuffer bytes, int version) { try { ByteBuffer input = bytes.duplicate(); - int n = ByteBufferUtil.readShortLength(input); + int n = readCollectionSize(input, version); Set l = new LinkedHashSet(n); for (int i = 0; i < n; i++) { - ByteBuffer databb = ByteBufferUtil.readBytesWithShortLength(input); + ByteBuffer databb = readValue(input, version); elements.validate(databb); l.add(elements.deserialize(databb)); } @@ -68,26 +81,6 @@ public class SetSerializer extends CollectionSerializer> } } - /** - * Layout is: {@code ... } - * where: - * n is the number of elements - * s_i is the number of bytes composing the ith element - * b_i is the s_i bytes composing the ith element - */ - public ByteBuffer serialize(Set value) - { - List bbs = new ArrayList(value.size()); - int size = 0; - for (T elt : value) - { - ByteBuffer bb = elements.serialize(elt); - bbs.add(bb); - size += 2 + bb.remaining(); - } - return pack(bbs, value.size(), size); - } - public String toString(Set value) { StringBuilder sb = new StringBuilder(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/service/IMigrationListener.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/IMigrationListener.java b/src/java/org/apache/cassandra/service/IMigrationListener.java index e16ac62..4d142bd 100644 --- a/src/java/org/apache/cassandra/service/IMigrationListener.java +++ b/src/java/org/apache/cassandra/service/IMigrationListener.java @@ -21,10 +21,13 @@ public interface IMigrationListener { public void onCreateKeyspace(String ksName); public void onCreateColumnFamily(String ksName, String cfName); + public void onCreateUserType(String ksName, String typeName); public void onUpdateKeyspace(String ksName); public void onUpdateColumnFamily(String ksName, String cfName); + public void onUpdateUserType(String ksName, String typeName); public void onDropKeyspace(String ksName); public void onDropColumnFamily(String ksName, String cfName); + public void onDropUserType(String ksName, String typeName); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/service/MigrationManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/MigrationManager.java b/src/java/org/apache/cassandra/service/MigrationManager.java index 7eb7282..ec46d3f 100644 --- a/src/java/org/apache/cassandra/service/MigrationManager.java +++ b/src/java/org/apache/cassandra/service/MigrationManager.java @@ -167,6 +167,12 @@ public class MigrationManager listener.onCreateColumnFamily(cfm.ksName, cfm.cfName); } + public void notifyCreateUserType(UserType ut) + { + for (IMigrationListener listener : listeners) + listener.onCreateUserType(ut.keyspace, ut.getNameAsString()); + } + public void notifyUpdateKeyspace(KSMetaData ksm) { for (IMigrationListener listener : listeners) @@ -179,6 +185,12 @@ public class MigrationManager listener.onUpdateColumnFamily(cfm.ksName, cfm.cfName); } + public void notifyUpdateUserType(UserType ut) + { + for (IMigrationListener listener : listeners) + listener.onUpdateUserType(ut.keyspace, ut.getNameAsString()); + } + public void notifyDropKeyspace(KSMetaData ksm) { for (IMigrationListener listener : listeners) @@ -191,6 +203,12 @@ public class MigrationManager listener.onDropColumnFamily(cfm.ksName, cfm.cfName); } + public void notifyDropUserType(UserType ut) + { + for (IMigrationListener listener : listeners) + listener.onDropUserType(ut.keyspace, ut.getNameAsString()); + } + public static void announceNewKeyspace(KSMetaData ksm) throws ConfigurationException { announceNewKeyspace(ksm, FBUtilities.timestampMicros()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/thrift/CassandraServer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java index c4abe0b..3040aaf 100644 --- a/src/java/org/apache/cassandra/thrift/CassandraServer.java +++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java @@ -1969,7 +1969,7 @@ public class CassandraServer implements Cassandra.Iface } ThriftClientState cState = state(); - return cState.getCQLQueryHandler().process(queryString, cState.getQueryState(), new QueryOptions(ThriftConversion.fromThrift(cLevel), Collections.emptyList())).toThriftResult(); + return cState.getCQLQueryHandler().process(queryString, cState.getQueryState(), QueryOptions.fromProtocolV2(ThriftConversion.fromThrift(cLevel), Collections.emptyList())).toThriftResult(); } catch (RequestExecutionException e) { @@ -2100,7 +2100,7 @@ public class CassandraServer implements Cassandra.Iface return cState.getCQLQueryHandler().processPrepared(statement, cState.getQueryState(), - new QueryOptions(ThriftConversion.fromThrift(cLevel), bindVariables)).toThriftResult(); + QueryOptions.fromProtocolV2(ThriftConversion.fromThrift(cLevel), bindVariables)).toThriftResult(); } catch (RequestExecutionException e) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/transport/CBUtil.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/CBUtil.java b/src/java/org/apache/cassandra/transport/CBUtil.java index e5222a1..36a7e71 100644 --- a/src/java/org/apache/cassandra/transport/CBUtil.java +++ b/src/java/org/apache/cassandra/transport/CBUtil.java @@ -36,6 +36,7 @@ import io.netty.util.CharsetUtil; import org.apache.cassandra.db.ConsistencyLevel; import org.apache.cassandra.db.TypeSizes; +import org.apache.cassandra.utils.Pair; import org.apache.cassandra.utils.UUIDGen; /** @@ -363,6 +364,22 @@ public abstract class CBUtil return size; } + public static Pair, List> readNameAndValueList(ByteBuf cb) + { + int size = cb.readUnsignedShort(); + if (size == 0) + return Pair.create(Collections.emptyList(), Collections.emptyList()); + + List s = new ArrayList<>(size); + List l = new ArrayList<>(size); + for (int i = 0; i < size; i++) + { + s.add(readString(cb)); + l.add(readValue(cb)); + } + return Pair.create(s, l); + } + public static InetSocketAddress readInet(ByteBuf cb) { int addrSize = cb.readByte(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/transport/Client.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/Client.java b/src/java/org/apache/cassandra/transport/Client.java index 4a50bde..989b954 100644 --- a/src/java/org/apache/cassandra/transport/Client.java +++ b/src/java/org/apache/cassandra/transport/Client.java @@ -128,7 +128,7 @@ public class Client extends SimpleClient return null; } } - return new QueryMessage(query, new QueryOptions(ConsistencyLevel.ONE, Collections.emptyList(), false, pageSize, null, null)); + return new QueryMessage(query, QueryOptions.create(ConsistencyLevel.ONE, Collections.emptyList(), false, pageSize, null, null)); } else if (msgType.equals("PREPARE")) { @@ -156,7 +156,7 @@ public class Client extends SimpleClient } values.add(bb); } - return new ExecuteMessage(MD5Digest.wrap(id), new QueryOptions(ConsistencyLevel.ONE, values)); + return new ExecuteMessage(MD5Digest.wrap(id), QueryOptions.forInternalCalls(ConsistencyLevel.ONE, values)); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/transport/DataType.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/DataType.java b/src/java/org/apache/cassandra/transport/DataType.java index f0b5d95..3cff973 100644 --- a/src/java/org/apache/cassandra/transport/DataType.java +++ b/src/java/org/apache/cassandra/transport/DataType.java @@ -17,7 +17,7 @@ */ package org.apache.cassandra.transport; -import java.nio.charset.StandardCharsets; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -51,7 +51,9 @@ public enum DataType implements OptionCodec.Codecable INET (16, InetAddressType.instance), LIST (32, null), MAP (33, null), - SET (34, null); + SET (34, null), + UDT (48, null); + public static final OptionCodec codec = new OptionCodec(DataType.class); @@ -78,27 +80,39 @@ public enum DataType implements OptionCodec.Codecable return id; } - public Object readValue(ByteBuf cb) + public Object readValue(ByteBuf cb, int version) { switch (this) { case CUSTOM: return CBUtil.readString(cb); case LIST: - return DataType.toType(codec.decodeOne(cb)); + return DataType.toType(codec.decodeOne(cb, version)); case SET: - return DataType.toType(codec.decodeOne(cb)); + return DataType.toType(codec.decodeOne(cb, version)); case MAP: List l = new ArrayList(2); - l.add(DataType.toType(codec.decodeOne(cb))); - l.add(DataType.toType(codec.decodeOne(cb))); + l.add(DataType.toType(codec.decodeOne(cb, version))); + l.add(DataType.toType(codec.decodeOne(cb, version))); return l; + case UDT: + String ks = CBUtil.readString(cb); + ByteBuffer name = UTF8Type.instance.decompose(CBUtil.readString(cb)); + int n = cb.readUnsignedShort(); + List fieldNames = new ArrayList<>(n); + List> fieldTypes = new ArrayList<>(n); + for (int i = 0; i < n; i++) + { + fieldNames.add(UTF8Type.instance.decompose(CBUtil.readString(cb))); + fieldTypes.add(DataType.toType(codec.decodeOne(cb, version))); + } + return new UserType(ks, name, fieldNames, fieldTypes); default: return null; } } - public void writeValue(Object value, ByteBuf cb) + public void writeValue(Object value, ByteBuf cb, int version) { switch (this) { @@ -107,40 +121,63 @@ public enum DataType implements OptionCodec.Codecable CBUtil.writeString((String)value, cb); break; case LIST: - codec.writeOne(DataType.fromType((AbstractType)value), cb); + codec.writeOne(DataType.fromType((AbstractType)value, version), cb, version); break; case SET: - codec.writeOne(DataType.fromType((AbstractType)value), cb); + codec.writeOne(DataType.fromType((AbstractType)value, version), cb, version); break; case MAP: List l = (List)value; - codec.writeOne(DataType.fromType(l.get(0)), cb); - codec.writeOne(DataType.fromType(l.get(1)), cb); + codec.writeOne(DataType.fromType(l.get(0), version), cb, version); + codec.writeOne(DataType.fromType(l.get(1), version), cb, version); + break; + case UDT: + UserType udt = (UserType)value; + CBUtil.writeString(udt.keyspace, cb); + CBUtil.writeString(UTF8Type.instance.compose(udt.name), cb); + cb.writeShort(udt.columnNames.size()); + for (int i = 0; i < udt.columnNames.size(); i++) + { + CBUtil.writeString(UTF8Type.instance.compose(udt.columnNames.get(i)), cb); + codec.writeOne(DataType.fromType(udt.types.get(i), version), cb, version); + } break; } } - public int serializedValueSize(Object value) + public int serializedValueSize(Object value, int version) { switch (this) { case CUSTOM: - return 2 + ((String)value).getBytes(StandardCharsets.UTF_8).length; + return CBUtil.sizeOfString((String)value); case LIST: case SET: - return codec.oneSerializedSize(DataType.fromType((AbstractType)value)); + return codec.oneSerializedSize(DataType.fromType((AbstractType)value, version), version); case MAP: List l = (List)value; int s = 0; - s += codec.oneSerializedSize(DataType.fromType(l.get(0))); - s += codec.oneSerializedSize(DataType.fromType(l.get(1))); + s += codec.oneSerializedSize(DataType.fromType(l.get(0), version), version); + s += codec.oneSerializedSize(DataType.fromType(l.get(1), version), version); return s; + case UDT: + UserType udt = (UserType)value; + int size = 0; + size += CBUtil.sizeOfString(udt.keyspace); + size += CBUtil.sizeOfString(UTF8Type.instance.compose(udt.name)); + size += 2; + for (int i = 0; i < udt.columnNames.size(); i++) + { + size += CBUtil.sizeOfString(UTF8Type.instance.compose(udt.columnNames.get(i))); + size += codec.oneSerializedSize(DataType.fromType(udt.types.get(i), version), version); + } + return size; default: return 0; } } - public static Pair fromType(AbstractType type) + public static Pair fromType(AbstractType type, int version) { // For CQL3 clients, ReversedType is an implementation detail and they // shouldn't have to care about it. @@ -170,6 +207,10 @@ public enum DataType implements OptionCodec.Codecable return Pair.create(SET, ((SetType)type).elements); } } + + if (type instanceof UserType && version >= 3) + return Pair.create(UDT, type); + return Pair.create(CUSTOM, type.toString()); } else @@ -193,6 +234,8 @@ public enum DataType implements OptionCodec.Codecable case MAP: List l = (List)entry.right; return MapType.getInstance(l.get(0), l.get(1)); + case UDT: + return (AbstractType)entry.right; default: return entry.left.type; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/transport/Event.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/Event.java b/src/java/org/apache/cassandra/transport/Event.java index 242ad64..7ec026e 100644 --- a/src/java/org/apache/cassandra/transport/Event.java +++ b/src/java/org/apache/cassandra/transport/Event.java @@ -20,6 +20,7 @@ package org.apache.cassandra.transport; import java.net.InetAddress; import java.net.InetSocketAddress; +import com.google.common.base.Objects; import io.netty.buffer.ByteBuf; public abstract class Event @@ -33,33 +34,33 @@ public abstract class Event this.type = type; } - public static Event deserialize(ByteBuf cb) + public static Event deserialize(ByteBuf cb, int version) { switch (CBUtil.readEnumValue(Type.class, cb)) { case TOPOLOGY_CHANGE: - return TopologyChange.deserializeEvent(cb); + return TopologyChange.deserializeEvent(cb, version); case STATUS_CHANGE: - return StatusChange.deserializeEvent(cb); + return StatusChange.deserializeEvent(cb, version); case SCHEMA_CHANGE: - return SchemaChange.deserializeEvent(cb); + return SchemaChange.deserializeEvent(cb, version); } throw new AssertionError(); } - public void serialize(ByteBuf dest) + public void serialize(ByteBuf dest, int version) { CBUtil.writeEnumValue(type, dest); - serializeEvent(dest); + serializeEvent(dest, version); } - public int serializedSize() + public int serializedSize(int version) { - return CBUtil.sizeOfEnumValue(type) + eventSerializedSize(); + return CBUtil.sizeOfEnumValue(type) + eventSerializedSize(version); } - protected abstract void serializeEvent(ByteBuf dest); - protected abstract int eventSerializedSize(); + protected abstract void serializeEvent(ByteBuf dest, int version); + protected abstract int eventSerializedSize(int version); public static class TopologyChange extends Event { @@ -91,20 +92,20 @@ public abstract class Event } // Assumes the type has already been deserialized - private static TopologyChange deserializeEvent(ByteBuf cb) + private static TopologyChange deserializeEvent(ByteBuf cb, int version) { Change change = CBUtil.readEnumValue(Change.class, cb); InetSocketAddress node = CBUtil.readInet(cb); return new TopologyChange(change, node); } - protected void serializeEvent(ByteBuf dest) + protected void serializeEvent(ByteBuf dest, int version) { CBUtil.writeEnumValue(change, dest); CBUtil.writeInet(node, dest); } - protected int eventSerializedSize() + protected int eventSerializedSize(int version) { return CBUtil.sizeOfEnumValue(change) + CBUtil.sizeOfInet(node); } @@ -114,6 +115,23 @@ public abstract class Event { return change + " " + node; } + + @Override + public int hashCode() + { + return Objects.hashCode(change, node); + } + + @Override + public boolean equals(Object other) + { + if (!(other instanceof TopologyChange)) + return false; + + TopologyChange tpc = (TopologyChange)other; + return Objects.equal(change, tpc.change) + && Objects.equal(node, tpc.node); + } } public static class StatusChange extends Event @@ -141,20 +159,20 @@ public abstract class Event } // Assumes the type has already been deserialized - private static StatusChange deserializeEvent(ByteBuf cb) + private static StatusChange deserializeEvent(ByteBuf cb, int version) { Status status = CBUtil.readEnumValue(Status.class, cb); InetSocketAddress node = CBUtil.readInet(cb); return new StatusChange(status, node); } - protected void serializeEvent(ByteBuf dest) + protected void serializeEvent(ByteBuf dest, int version) { CBUtil.writeEnumValue(status, dest); CBUtil.writeInet(node, dest); } - protected int eventSerializedSize() + protected int eventSerializedSize(int version) { return CBUtil.sizeOfEnumValue(status) + CBUtil.sizeOfInet(node); } @@ -164,56 +182,130 @@ public abstract class Event { return status + " " + node; } + + @Override + public int hashCode() + { + return Objects.hashCode(status, node); + } + + @Override + public boolean equals(Object other) + { + if (!(other instanceof StatusChange)) + return false; + + StatusChange stc = (StatusChange)other; + return Objects.equal(status, stc.status) + && Objects.equal(node, stc.node); + } } public static class SchemaChange extends Event { public enum Change { CREATED, UPDATED, DROPPED } + public enum Target { KEYSPACE, TABLE, TYPE } public final Change change; + public final Target target; public final String keyspace; - public final String table; + public final String tableOrType; - public SchemaChange(Change change, String keyspace, String table) + public SchemaChange(Change change, Target target, String keyspace, String tableOrType) { super(Type.SCHEMA_CHANGE); this.change = change; + this.target = target; this.keyspace = keyspace; - this.table = table; + this.tableOrType = tableOrType; } public SchemaChange(Change change, String keyspace) { - this(change, keyspace, ""); + this(change, Target.KEYSPACE, keyspace, null); } // Assumes the type has already been deserialized - private static SchemaChange deserializeEvent(ByteBuf cb) + private static SchemaChange deserializeEvent(ByteBuf cb, int version) { Change change = CBUtil.readEnumValue(Change.class, cb); - String keyspace = CBUtil.readString(cb); - String table = CBUtil.readString(cb); - return new SchemaChange(change, keyspace, table); + if (version >= 3) + { + Target target = CBUtil.readEnumValue(Target.class, cb); + String keyspace = CBUtil.readString(cb); + String tableOrType = target == Target.KEYSPACE ? null : CBUtil.readString(cb); + return new SchemaChange(change, target, keyspace, tableOrType); + } + else + { + String keyspace = CBUtil.readString(cb); + String table = CBUtil.readString(cb); + return new SchemaChange(change, table.isEmpty() ? Target.KEYSPACE : Target.TABLE, keyspace, table.isEmpty() ? null : table); + } } - protected void serializeEvent(ByteBuf dest) + protected void serializeEvent(ByteBuf dest, int version) { - CBUtil.writeEnumValue(change, dest); - CBUtil.writeString(keyspace, dest); - CBUtil.writeString(table, dest); + if (version >= 3) + { + CBUtil.writeEnumValue(change, dest); + CBUtil.writeEnumValue(target, dest); + CBUtil.writeString(keyspace, dest); + if (target != Target.KEYSPACE) + CBUtil.writeString(tableOrType, dest); + } + else + { + CBUtil.writeEnumValue(change, dest); + CBUtil.writeString(keyspace, dest); + CBUtil.writeString(target == Target.KEYSPACE ? "" : tableOrType, dest); + } } - protected int eventSerializedSize() + protected int eventSerializedSize(int version) { - return CBUtil.sizeOfEnumValue(change) - + CBUtil.sizeOfString(keyspace) - + CBUtil.sizeOfString(table); + if (version >= 3) + { + int size = CBUtil.sizeOfEnumValue(change) + + CBUtil.sizeOfEnumValue(target) + + CBUtil.sizeOfString(keyspace); + + if (target != Target.KEYSPACE) + size += CBUtil.sizeOfString(tableOrType); + + return size; + } + else + { + return CBUtil.sizeOfEnumValue(change) + + CBUtil.sizeOfString(keyspace) + + CBUtil.sizeOfString(target == Target.KEYSPACE ? "" : tableOrType); + } } @Override public String toString() { - return change + " " + keyspace + (table.isEmpty() ? "" : "." + table); + return change + " " + target + " " + keyspace + (tableOrType == null ? "" : "." + tableOrType); + } + + @Override + public int hashCode() + { + return Objects.hashCode(change, target, keyspace, tableOrType); + } + + @Override + public boolean equals(Object other) + { + if (!(other instanceof SchemaChange)) + return false; + + SchemaChange scc = (SchemaChange)other; + return Objects.equal(change, scc.change) + && Objects.equal(target, scc.target) + && Objects.equal(keyspace, scc.keyspace) + && Objects.equal(tableOrType, scc.tableOrType); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/transport/OptionCodec.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/OptionCodec.java b/src/java/org/apache/cassandra/transport/OptionCodec.java index 9b82bda..ec2a1fa 100644 --- a/src/java/org/apache/cassandra/transport/OptionCodec.java +++ b/src/java/org/apache/cassandra/transport/OptionCodec.java @@ -32,9 +32,9 @@ public class OptionCodec & OptionCodec.Codecable> { public int getId(); - public Object readValue(ByteBuf cb); - public void writeValue(Object value, ByteBuf cb); - public int serializedValueSize(Object obj); + public Object readValue(ByteBuf cb, int version); + public void writeValue(Object value, ByteBuf cb, int version); + public int serializedValueSize(Object obj, int version); } private final Class klass; @@ -66,14 +66,14 @@ public class OptionCodec & OptionCodec.Codecable> return opt; } - public Map decode(ByteBuf body) + public Map decode(ByteBuf body, int version) { EnumMap options = new EnumMap(klass); int n = body.readUnsignedShort(); for (int i = 0; i < n; i++) { T opt = fromId(body.readUnsignedShort()); - Object value = opt.readValue(body); + Object value = opt.readValue(body, version); if (options.containsKey(opt)) throw new ProtocolException(String.format("Duplicate option %s in message", opt.name())); options.put(opt, value); @@ -81,41 +81,41 @@ public class OptionCodec & OptionCodec.Codecable> return options; } - public ByteBuf encode(Map options) + public ByteBuf encode(Map options, int version) { int optLength = 2; for (Map.Entry entry : options.entrySet()) - optLength += 2 + entry.getKey().serializedValueSize(entry.getValue()); + optLength += 2 + entry.getKey().serializedValueSize(entry.getValue(), version); ByteBuf cb = Unpooled.buffer(optLength); cb.writeShort(options.size()); for (Map.Entry entry : options.entrySet()) { T opt = entry.getKey(); cb.writeShort(opt.getId()); - opt.writeValue(entry.getValue(), cb); + opt.writeValue(entry.getValue(), cb, version); } return cb; } - public Pair decodeOne(ByteBuf body) + public Pair decodeOne(ByteBuf body, int version) { T opt = fromId(body.readUnsignedShort()); - Object value = opt.readValue(body); + Object value = opt.readValue(body, version); return Pair.create(opt, value); } - public void writeOne(Pair option, ByteBuf dest) + public void writeOne(Pair option, ByteBuf dest, int version) { T opt = option.left; Object obj = option.right; dest.writeShort(opt.getId()); - opt.writeValue(obj, dest); + opt.writeValue(obj, dest, version); } - public int oneSerializedSize(Pair option) + public int oneSerializedSize(Pair option, int version) { T opt = option.left; Object obj = option.right; - return 2 + opt.serializedValueSize(obj); + return 2 + opt.serializedValueSize(obj, version); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/transport/Server.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/Server.java b/src/java/org/apache/cassandra/transport/Server.java index 8d08ffd..eb2b043 100644 --- a/src/java/org/apache/cassandra/transport/Server.java +++ b/src/java/org/apache/cassandra/transport/Server.java @@ -378,7 +378,12 @@ public class Server implements CassandraDaemon.Server public void onCreateColumnFamily(String ksName, String cfName) { - server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.CREATED, ksName, cfName)); + server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.CREATED, Event.SchemaChange.Target.TABLE, ksName, cfName)); + } + + public void onCreateUserType(String ksName, String typeName) + { + server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.CREATED, Event.SchemaChange.Target.TYPE, ksName, typeName)); } public void onUpdateKeyspace(String ksName) @@ -388,7 +393,12 @@ public class Server implements CassandraDaemon.Server public void onUpdateColumnFamily(String ksName, String cfName) { - server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.UPDATED, ksName, cfName)); + server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.UPDATED, Event.SchemaChange.Target.TABLE, ksName, cfName)); + } + + public void onUpdateUserType(String ksName, String typeName) + { + server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.UPDATED, Event.SchemaChange.Target.TYPE, ksName, typeName)); } public void onDropKeyspace(String ksName) @@ -398,7 +408,12 @@ public class Server implements CassandraDaemon.Server public void onDropColumnFamily(String ksName, String cfName) { - server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.DROPPED, ksName, cfName)); + server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.DROPPED, Event.SchemaChange.Target.TABLE, ksName, cfName)); + } + + public void onDropUserType(String ksName, String typeName) + { + server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.DROPPED, Event.SchemaChange.Target.TYPE, ksName, typeName)); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/transport/SimpleClient.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/SimpleClient.java b/src/java/org/apache/cassandra/transport/SimpleClient.java index ef56882..3cf9b7b 100644 --- a/src/java/org/apache/cassandra/transport/SimpleClient.java +++ b/src/java/org/apache/cassandra/transport/SimpleClient.java @@ -157,7 +157,7 @@ public class SimpleClient public ResultMessage execute(String query, List values, ConsistencyLevel consistencyLevel) { - Message.Response msg = execute(new QueryMessage(query, new QueryOptions(consistencyLevel, values))); + Message.Response msg = execute(new QueryMessage(query, QueryOptions.forInternalCalls(consistencyLevel, values))); assert msg instanceof ResultMessage; return (ResultMessage)msg; } @@ -171,7 +171,7 @@ public class SimpleClient public ResultMessage executePrepared(byte[] statementId, List values, ConsistencyLevel consistency) { - Message.Response msg = execute(new ExecuteMessage(MD5Digest.wrap(statementId), new QueryOptions(consistency, values))); + Message.Response msg = execute(new ExecuteMessage(MD5Digest.wrap(statementId), QueryOptions.forInternalCalls(consistency, values))); assert msg instanceof ResultMessage; return (ResultMessage)msg; }