Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 7524E200B66 for ; Wed, 3 Aug 2016 17:45:16 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 742FD160AB1; Wed, 3 Aug 2016 15:45:16 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 861D4160AB0 for ; Wed, 3 Aug 2016 17:45:14 +0200 (CEST) Received: (qmail 96894 invoked by uid 500); 3 Aug 2016 15:45:08 -0000 Mailing-List: contact commits-help@cassandra.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cassandra.apache.org Delivered-To: mailing list commits@cassandra.apache.org Received: (qmail 96065 invoked by uid 99); 3 Aug 2016 15:45:08 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 03 Aug 2016 15:45:08 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id C81E1EEB00; Wed, 3 Aug 2016 15:45:07 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: slebresne@apache.org To: commits@cassandra.apache.org Date: Wed, 03 Aug 2016 15:45:14 -0000 Message-Id: <4023432e23914d26af270867dd95dabe@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [08/14] cassandra git commit: Fix RTE on mixed-version cluster due to CDC schema changes. archived-at: Wed, 03 Aug 2016 15:45:16 -0000 Fix RTE on mixed-version cluster due to CDC schema changes. Patch by jmckenzie and slebresne; reviewed by ayeschenko for CASSANDRA-12236 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/26838063 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/26838063 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/26838063 Branch: refs/heads/cassandra-3.8 Commit: 26838063de6246e3a1e18062114ca92fb81c00cf Parents: b27e2f9 Author: Josh McKenzie Authored: Thu Jul 21 12:45:13 2016 -0400 Committer: Sylvain Lebresne Committed: Wed Aug 3 17:41:24 2016 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + NEWS.txt | 10 +- .../cassandra/batchlog/BatchlogManager.java | 19 +- .../batchlog/LegacyBatchlogMigrator.java | 9 +- src/java/org/apache/cassandra/db/Mutation.java | 66 +++ .../apache/cassandra/db/RowUpdateBuilder.java | 400 ---------------- .../org/apache/cassandra/db/SimpleBuilders.java | 461 +++++++++++++++++++ .../org/apache/cassandra/db/SystemKeyspace.java | 11 +- .../db/partitions/AbstractBTreePartition.java | 2 +- .../db/partitions/PartitionUpdate.java | 154 +++++++ src/java/org/apache/cassandra/db/rows/Row.java | 99 ++++ src/java/org/apache/cassandra/db/rows/Rows.java | 16 + .../apache/cassandra/db/transform/BaseRows.java | 3 +- .../cassandra/schema/LegacySchemaMigrator.java | 12 +- .../apache/cassandra/schema/SchemaKeyspace.java | 427 +++++++++-------- .../cassandra/service/MigrationManager.java | 8 +- .../apache/cassandra/tracing/TraceKeyspace.java | 52 ++- .../org/apache/cassandra/UpdateBuilder.java | 56 +-- test/unit/org/apache/cassandra/Util.java | 26 +- .../apache/cassandra/batchlog/BatchTest.java | 17 +- .../apache/cassandra/config/CFMetaDataTest.java | 2 +- .../apache/cassandra/cql3/CDCStatementTest.java | 10 + .../entities/RowUpdateBuilderTest.java | 79 ---- .../db/RecoveryManagerMissingHeaderTest.java | 4 +- .../cassandra/db/RecoveryManagerTest.java | 8 +- .../apache/cassandra/db/RowUpdateBuilder.java | 196 ++++++++ .../cassandra/db/compaction/TTLExpiryTest.java | 2 +- .../db/partition/PartitionUpdateTest.java | 23 +- .../org/apache/cassandra/hints/HintTest.java | 56 +-- .../hints/LegacyHintsMigratorTest.java | 3 +- .../org/apache/cassandra/schema/DefsTest.java | 2 +- .../schema/LegacySchemaMigratorTest.java | 111 ++--- .../cassandra/schema/SchemaKeyspaceTest.java | 6 +- .../cassandra/service/DataResolverTest.java | 2 +- .../streaming/StreamingTransferTest.java | 2 +- 35 files changed, 1448 insertions(+), 907 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/26838063/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 4330fde..388a290 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.8 + * RTE from new CDC column breaks in flight queries (CASSANDRA-12236) * Fix hdr logging for single operation workloads (CASSANDRA-12145) * Fix SASI PREFIX search in CONTAINS mode with partial terms (CASSANDRA-12073) * Increase size of flushExecutor thread pool (CASSANDRA-12071) http://git-wip-us.apache.org/repos/asf/cassandra/blob/26838063/NEWS.txt ---------------------------------------------------------------------- diff --git a/NEWS.txt b/NEWS.txt index 7418f3a..d8d84f5 100644 --- a/NEWS.txt +++ b/NEWS.txt @@ -39,6 +39,9 @@ New features the data/cdc_raw directory until removed by the user and writes to CDC-enabled tables will be rejected with a WriteTimeoutException once cdc_total_space_in_mb is reached between unflushed CommitLogSegments and cdc_raw. + NOTE: CDC is disabled by default in the .yaml file. Do not enable CDC on a mixed-version + cluster as it will lead to exceptions which can interrupt traffic. Once all nodes + have been upgraded to 3.8 it is safe to enable this feature and restart the cluster. Upgrading --------- @@ -48,13 +51,6 @@ Upgrading those under a different name, change your code to use the new names and drop the old versions, and this _before_ upgrade (see CASSANDRA-10783 for more details). - - Due to changes in schema migration handling and the storage format after 3.0, you will - see error messages such as: - "java.lang.RuntimeException: Unknown column cdc during deserialization" - in your system logs on a mixed-version cluster during upgrades. This error message - is harmless and due to the 3.8 nodes having cdc added to their schema tables while - the <3.8 nodes do not. This message should cease once all nodes are upgraded to 3.8. - As always, refrain from schema changes during cluster upgrades. Deprecation ----------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/26838063/src/java/org/apache/cassandra/batchlog/BatchlogManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/batchlog/BatchlogManager.java b/src/java/org/apache/cassandra/batchlog/BatchlogManager.java index f5133bb..0bc9185 100644 --- a/src/java/org/apache/cassandra/batchlog/BatchlogManager.java +++ b/src/java/org/apache/cassandra/batchlog/BatchlogManager.java @@ -121,20 +121,15 @@ public class BatchlogManager implements BatchlogManagerMBean public static void store(Batch batch, boolean durableWrites) { - RowUpdateBuilder builder = - new RowUpdateBuilder(SystemKeyspace.Batches, batch.creationTime, batch.id) - .clustering() - .add("version", MessagingService.current_version); - - for (ByteBuffer mutation : batch.encodedMutations) - builder.addListEntry("mutations", mutation); + List mutations = new ArrayList<>(batch.encodedMutations.size() + batch.decodedMutations.size()); + mutations.addAll(batch.encodedMutations); for (Mutation mutation : batch.decodedMutations) { try (DataOutputBuffer buffer = new DataOutputBuffer()) { Mutation.serializer.serialize(mutation, buffer, MessagingService.current_version); - builder.addListEntry("mutations", buffer.buffer()); + mutations.add(buffer.buffer()); } catch (IOException e) { @@ -143,7 +138,13 @@ public class BatchlogManager implements BatchlogManagerMBean } } - builder.build().apply(durableWrites); + PartitionUpdate.SimpleBuilder builder = PartitionUpdate.simpleBuilder(SystemKeyspace.Batches, batch.id); + builder.row() + .timestamp(batch.creationTime) + .add("version", MessagingService.current_version) + .appendAll("mutations", mutations); + + builder.buildAsMutation().apply(durableWrites); } @VisibleForTesting http://git-wip-us.apache.org/repos/asf/cassandra/blob/26838063/src/java/org/apache/cassandra/batchlog/LegacyBatchlogMigrator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/batchlog/LegacyBatchlogMigrator.java b/src/java/org/apache/cassandra/batchlog/LegacyBatchlogMigrator.java index dd19f19..3a8bf83 100644 --- a/src/java/org/apache/cassandra/batchlog/LegacyBatchlogMigrator.java +++ b/src/java/org/apache/cassandra/batchlog/LegacyBatchlogMigrator.java @@ -162,12 +162,13 @@ public final class LegacyBatchlogMigrator @SuppressWarnings("deprecation") static Mutation getStoreMutation(Batch batch, int version) { - return new RowUpdateBuilder(SystemKeyspace.LegacyBatchlog, batch.creationTime, batch.id) - .clustering() + PartitionUpdate.SimpleBuilder builder = PartitionUpdate.simpleBuilder(SystemKeyspace.LegacyBatchlog, batch.id); + builder.row() + .timestamp(batch.creationTime) .add("written_at", new Date(batch.creationTime / 1000)) .add("data", getSerializedMutations(version, batch.decodedMutations)) - .add("version", version) - .build(); + .add("version", version); + return builder.buildAsMutation(); } @SuppressWarnings("deprecation") http://git-wip-us.apache.org/repos/asf/cassandra/blob/26838063/src/java/org/apache/cassandra/db/Mutation.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Mutation.java b/src/java/org/apache/cassandra/db/Mutation.java index 61e5ee9..b8639a7 100644 --- a/src/java/org/apache/cassandra/db/Mutation.java +++ b/src/java/org/apache/cassandra/db/Mutation.java @@ -302,6 +302,72 @@ public class Mutation implements IMutation return buff.append("])").toString(); } + /** + * Creates a new simple mutuation builder. + * + * @param keyspaceName the name of the keyspace this is a mutation for. + * @param partitionKey the key of partition this if a mutation for. + * @return a newly created builder. + */ + public static SimpleBuilder simpleBuilder(String keyspaceName, DecoratedKey partitionKey) + { + return new SimpleBuilders.MutationBuilder(keyspaceName, partitionKey); + } + + /** + * Interface for building mutations geared towards human. + *

+ * This should generally not be used when performance matters too much, but provides a more convenient interface to + * build a mutation than using the class constructor when performance is not of the utmost importance. + */ + public interface SimpleBuilder + { + /** + * Sets the timestamp to use for the following additions to this builder or any derived (update or row) builder. + * + * @param timestamp the timestamp to use for following additions. If that timestamp hasn't been set, the current + * time in microseconds will be used. + * @return this builder. + */ + public SimpleBuilder timestamp(long timestamp); + + /** + * Sets the ttl to use for the following additions to this builder or any derived (update or row) builder. + *

+ * Note that the for non-compact tables, this method must be called before any column addition for this + * ttl to be used for the row {@code LivenessInfo}. + * + * @param ttl the ttl to use for following additions. If that ttl hasn't been set, no ttl will be used. + * @return this builder. + */ + public SimpleBuilder ttl(int ttl); + + /** + * Adds an update for table identified by the provided metadata and return a builder for that partition. + * + * @param metadata the metadata of the table for which to add an update. + * @return a builder for the partition identified by {@code metadata} (and the partition key for which this is a + * mutation of). + */ + public PartitionUpdate.SimpleBuilder update(CFMetaData metadata); + + /** + * Adds an update for table identified by the provided name and return a builder for that partition. + * + * @param tableName the name of the table for which to add an update. + * @return a builder for the partition identified by {@code metadata} (and the partition key for which this is a + * mutation of). + */ + public PartitionUpdate.SimpleBuilder update(String tableName); + + /** + * Build the mutation represented by this builder. + * + * @return the built mutation. + */ + public Mutation build(); + } + public static class MutationSerializer implements IVersionedSerializer { public void serialize(Mutation mutation, DataOutputPlus out, int version) throws IOException http://git-wip-us.apache.org/repos/asf/cassandra/blob/26838063/src/java/org/apache/cassandra/db/RowUpdateBuilder.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/RowUpdateBuilder.java b/src/java/org/apache/cassandra/db/RowUpdateBuilder.java deleted file mode 100644 index b414eba..0000000 --- a/src/java/org/apache/cassandra/db/RowUpdateBuilder.java +++ /dev/null @@ -1,400 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.db; - -import java.nio.ByteBuffer; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import org.apache.cassandra.cql3.ColumnIdentifier; -import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.config.ColumnDefinition; -import org.apache.cassandra.db.marshal.SetType; -import org.apache.cassandra.db.rows.*; -import org.apache.cassandra.db.context.CounterContext; -import org.apache.cassandra.db.partitions.*; -import org.apache.cassandra.db.marshal.AbstractType; -import org.apache.cassandra.db.marshal.ListType; -import org.apache.cassandra.db.marshal.MapType; -import org.apache.cassandra.utils.*; - -/** - * Convenience object to create single row updates. - * - * This is meant for system table update, when performance is not of the utmost importance. - */ -public class RowUpdateBuilder -{ - private final PartitionUpdate update; - - private final long timestamp; - private final int ttl; - private final int localDeletionTime; - - private final DeletionTime deletionTime; - - private final Mutation mutation; - - private Row.Builder regularBuilder; - private Row.Builder staticBuilder; - - private boolean useRowMarker = true; - - private RowUpdateBuilder(PartitionUpdate update, long timestamp, int ttl, int localDeletionTime, Mutation mutation) - { - this.update = update; - - this.timestamp = timestamp; - this.ttl = ttl; - this.localDeletionTime = localDeletionTime; - this.deletionTime = new DeletionTime(timestamp, localDeletionTime); - - // note that the created mutation may get further update later on, so we don't use the ctor that create a singletonMap - // underneath (this class if for convenience, not performance) - this.mutation = mutation == null ? new Mutation(update.metadata().ksName, update.partitionKey()).add(update) : mutation; - } - - private RowUpdateBuilder(PartitionUpdate update, long timestamp, int ttl, Mutation mutation) - { - this(update, timestamp, ttl, FBUtilities.nowInSeconds(), mutation); - } - - private void startRow(Clustering clustering) - { - assert staticBuilder == null : "Cannot update both static and non-static columns with the same RowUpdateBuilder object"; - assert regularBuilder == null : "Cannot add the clustering twice to the same row"; - - regularBuilder = BTreeRow.unsortedBuilder(FBUtilities.nowInSeconds()); - regularBuilder.newRow(clustering); - - // If a CQL table, add the "row marker" - if (update.metadata().isCQLTable() && useRowMarker) - regularBuilder.addPrimaryKeyLivenessInfo(LivenessInfo.create(timestamp, ttl, localDeletionTime)); - } - - private Row.Builder builder() - { - assert staticBuilder == null : "Cannot update both static and non-static columns with the same RowUpdateBuilder object"; - if (regularBuilder == null) - { - // we don't force people to call clustering() if the table has no clustering, so call it ourselves - assert update.metadata().comparator.size() == 0 : "Missing call to clustering()"; - startRow(Clustering.EMPTY); - } - return regularBuilder; - } - - private Row.Builder staticBuilder() - { - assert regularBuilder == null : "Cannot update both static and non-static columns with the same RowUpdateBuilder object"; - if (staticBuilder == null) - { - staticBuilder = BTreeRow.unsortedBuilder(FBUtilities.nowInSeconds()); - staticBuilder.newRow(Clustering.STATIC_CLUSTERING); - } - return staticBuilder; - } - - private Row.Builder builder(ColumnDefinition c) - { - return c.isStatic() ? staticBuilder() : builder(); - } - - public RowUpdateBuilder(CFMetaData metadata, long timestamp, Object partitionKey) - { - this(metadata, FBUtilities.nowInSeconds(), timestamp, partitionKey); - } - - public RowUpdateBuilder(CFMetaData metadata, int localDeletionTime, long timestamp, Object partitionKey) - { - this(metadata, localDeletionTime, timestamp, metadata.params.defaultTimeToLive, partitionKey); - } - - public RowUpdateBuilder(CFMetaData metadata, long timestamp, int ttl, Object partitionKey) - { - this(metadata, FBUtilities.nowInSeconds(), timestamp, ttl, partitionKey); - } - - public RowUpdateBuilder(CFMetaData metadata, int localDeletionTime, long timestamp, int ttl, Object partitionKey) - { - this(new PartitionUpdate(metadata, makeKey(metadata, partitionKey), metadata.partitionColumns(), 1), timestamp, ttl, localDeletionTime, null); - } - - public RowUpdateBuilder(CFMetaData metadata, long timestamp, Mutation mutation) - { - this(metadata, timestamp, LivenessInfo.NO_TTL, mutation); - } - - public RowUpdateBuilder(CFMetaData metadata, long timestamp, int ttl, Mutation mutation) - { - this(getOrAdd(metadata, mutation), timestamp, ttl, mutation); - } - - public RowUpdateBuilder(PartitionUpdate update, long timestamp, int ttl) - { - this(update, timestamp, ttl, null); - } - - // This must be called before any addition or deletion if used. - public RowUpdateBuilder noRowMarker() - { - this.useRowMarker = false; - return this; - } - - public RowUpdateBuilder clustering(Object... clusteringValues) - { - assert clusteringValues.length == update.metadata().comparator.size() - : "Invalid clustering values length. Expected: " + update.metadata().comparator.size() + " got: " + clusteringValues.length; - - startRow(clusteringValues.length == 0 ? Clustering.EMPTY : update.metadata().comparator.make(clusteringValues)); - return this; - } - - public Mutation build() - { - Row.Builder builder = regularBuilder == null ? staticBuilder : regularBuilder; - if (builder != null) - update.add(builder.build()); - return mutation; - } - - public PartitionUpdate buildUpdate() - { - build(); - return update; - } - - private static void deleteRow(PartitionUpdate update, long timestamp, int localDeletionTime, Object... clusteringValues) - { - assert clusteringValues.length == update.metadata().comparator.size() || (clusteringValues.length == 0 && !update.columns().statics.isEmpty()); - - boolean isStatic = clusteringValues.length != update.metadata().comparator.size(); - Row.Builder builder = BTreeRow.sortedBuilder(); - - if (isStatic) - builder.newRow(Clustering.STATIC_CLUSTERING); - else - builder.newRow(clusteringValues.length == 0 ? Clustering.EMPTY : update.metadata().comparator.make(clusteringValues)); - builder.addRowDeletion(Row.Deletion.regular(new DeletionTime(timestamp, localDeletionTime))); - - update.add(builder.build()); - } - - public static Mutation deleteRow(CFMetaData metadata, long timestamp, Mutation mutation, Object... clusteringValues) - { - deleteRow(getOrAdd(metadata, mutation), timestamp, FBUtilities.nowInSeconds(), clusteringValues); - return mutation; - } - - public static Mutation deleteRow(CFMetaData metadata, long timestamp, Object key, Object... clusteringValues) - { - return deleteRowAt(metadata, timestamp, FBUtilities.nowInSeconds(), key, clusteringValues); - } - - public static Mutation deleteRowAt(CFMetaData metadata, long timestamp, int localDeletionTime, Object key, Object... clusteringValues) - { - PartitionUpdate update = new PartitionUpdate(metadata, makeKey(metadata, key), metadata.partitionColumns(), 0); - deleteRow(update, timestamp, localDeletionTime, clusteringValues); - // note that the created mutation may get further update later on, so we don't use the ctor that create a singletonMap - // underneath (this class if for convenience, not performance) - return new Mutation(update.metadata().ksName, update.partitionKey()).add(update); - } - - private static DecoratedKey makeKey(CFMetaData metadata, Object... partitionKey) - { - if (partitionKey.length == 1 && partitionKey[0] instanceof DecoratedKey) - return (DecoratedKey)partitionKey[0]; - - ByteBuffer key = CFMetaData.serializePartitionKey(metadata.getKeyValidatorAsClusteringComparator().make(partitionKey)); - return metadata.decorateKey(key); - } - - private static PartitionUpdate getOrAdd(CFMetaData metadata, Mutation mutation) - { - PartitionUpdate upd = mutation.get(metadata); - if (upd == null) - { - upd = new PartitionUpdate(metadata, mutation.key(), metadata.partitionColumns(), 1); - mutation.add(upd); - } - return upd; - } - - public RowUpdateBuilder resetCollection(String columnName) - { - ColumnDefinition c = getDefinition(columnName); - assert c != null : "Cannot find column " + columnName; - assert c.isStatic() || update.metadata().comparator.size() == 0 || regularBuilder != null : "Cannot set non static column " + c + " since no clustering has been provided"; - assert c.type.isCollection() && c.type.isMultiCell(); - builder(c).addComplexDeletion(c, new DeletionTime(timestamp - 1, localDeletionTime)); - return this; - } - - public RowUpdateBuilder addRangeTombstone(RangeTombstone rt) - { - update.add(rt); - return this; - } - - public RowUpdateBuilder addRangeTombstone(Slice slice) - { - return addRangeTombstone(new RangeTombstone(slice, deletionTime)); - } - - public RowUpdateBuilder addRangeTombstone(Object start, Object end) - { - ClusteringComparator cmp = update.metadata().comparator; - Slice slice = Slice.make(cmp.make(start), cmp.make(end)); - return addRangeTombstone(slice); - } - - public RowUpdateBuilder add(String columnName, Object value) - { - ColumnDefinition c = getDefinition(columnName); - assert c != null : "Cannot find column " + columnName; - return add(c, value); - } - - private Cell makeCell(ColumnDefinition c, ByteBuffer value, CellPath path) - { - return value == null - ? BufferCell.tombstone(c, timestamp, localDeletionTime) - : (ttl == LivenessInfo.NO_TTL ? BufferCell.live(c, timestamp, value, path) : BufferCell.expiring(c, timestamp, ttl, localDeletionTime, value, path)); - } - - public RowUpdateBuilder add(ColumnDefinition columnDefinition, Object value) - { - assert columnDefinition.isStatic() || update.metadata().comparator.size() == 0 || regularBuilder != null : "Cannot set non static column " + columnDefinition + " since no clustering hasn't been provided"; - builder(columnDefinition).addCell(makeCell(columnDefinition, bb(value, columnDefinition.type), null)); - return this; - } - - public RowUpdateBuilder delete(String columnName) - { - ColumnDefinition c = getDefinition(columnName); - assert c != null : "Cannot find column " + columnName; - return delete(c); - } - - public RowUpdateBuilder delete(ColumnDefinition columnDefinition) - { - return add(columnDefinition, null); - } - - private static ByteBuffer bb(Object value, AbstractType type) - { - if (value == null) - return null; - - if (value instanceof ByteBuffer) - return (ByteBuffer)value; - - if (type.isCounter()) - { - // See UpdateParameters.addCounter() - assert value instanceof Long : "Attempted to adjust Counter cell with non-long value."; - return CounterContext.instance().createGlobal(CounterId.getLocalId(), 1, (Long)value); - } - return ((AbstractType)type).decompose(value); - } - - public RowUpdateBuilder map(String columnName, Map map) - { - resetCollection(columnName); - for (Map.Entry entry : map.entrySet()) - addMapEntry(columnName, entry.getKey(), entry.getValue()); - return this; - } - - public RowUpdateBuilder set(String columnName, Set set) - { - resetCollection(columnName); - for (Object element : set) - addSetEntry(columnName, element); - return this; - } - - public RowUpdateBuilder frozenList(String columnName, List list) - { - ColumnDefinition c = getDefinition(columnName); - assert c.isStatic() || regularBuilder != null : "Cannot set non static column " + c + " since no clustering has been provided"; - assert c.type instanceof ListType && !c.type.isMultiCell() : "Column " + c + " is not a frozen list"; - builder(c).addCell(makeCell(c, bb(((AbstractType)c.type).decompose(list), c.type), null)); - return this; - } - - public RowUpdateBuilder frozenSet(String columnName, Set set) - { - ColumnDefinition c = getDefinition(columnName); - assert c.isStatic() || regularBuilder != null : "Cannot set non static column " + c + " since no clustering has been provided"; - assert c.type instanceof SetType && !c.type.isMultiCell() : "Column " + c + " is not a frozen set"; - builder(c).addCell(makeCell(c, bb(((AbstractType)c.type).decompose(set), c.type), null)); - return this; - } - - public RowUpdateBuilder frozenMap(String columnName, Map map) - { - ColumnDefinition c = getDefinition(columnName); - assert c.isStatic() || regularBuilder != null : "Cannot set non static column " + c + " since no clustering has been provided"; - assert c.type instanceof MapType && !c.type.isMultiCell() : "Column " + c + " is not a frozen map"; - builder(c).addCell(makeCell(c, bb(((AbstractType)c.type).decompose(map), c.type), null)); - return this; - } - - public RowUpdateBuilder addMapEntry(String columnName, Object key, Object value) - { - ColumnDefinition c = getDefinition(columnName); - assert c.isStatic() || update.metadata().comparator.size() == 0 || regularBuilder != null : "Cannot set non static column " + c + " since no clustering has been provided"; - assert c.type instanceof MapType && c.type.isMultiCell() : "Column " + c + " is not a non-frozen map"; - MapType mt = (MapType)c.type; - builder(c).addCell(makeCell(c, bb(value, mt.getValuesType()), CellPath.create(bb(key, mt.getKeysType())))); - return this; - } - - public RowUpdateBuilder addListEntry(String columnName, Object value) - { - ColumnDefinition c = getDefinition(columnName); - assert c.isStatic() || regularBuilder != null : "Cannot set non static column " + c + " since no clustering has been provided"; - assert c.type instanceof ListType && c.type.isMultiCell() : "Column " + c + " is not a non-frozen list"; - ListType lt = (ListType)c.type; - builder(c).addCell(makeCell(c, bb(value, lt.getElementsType()), CellPath.create(ByteBuffer.wrap(UUIDGen.getTimeUUIDBytes())))); - return this; - } - - public RowUpdateBuilder addSetEntry(String columnName, Object value) - { - ColumnDefinition c = getDefinition(columnName); - assert c.isStatic() || regularBuilder != null : "Cannot set non static column " + c + " since no clustering has been provided"; - assert c.type instanceof SetType && c.type.isMultiCell() : "Column " + c + " is not a non-frozen set"; - SetType st = (SetType)c.type; - builder(c).addCell(makeCell(c, ByteBufferUtil.EMPTY_BYTE_BUFFER, CellPath.create(bb(value, st.getElementsType())))); - return this; - } - - private ColumnDefinition getDefinition(String name) - { - return update.metadata().getColumnDefinition(new ColumnIdentifier(name, true)); - } - - public UnfilteredRowIterator unfilteredIterator() - { - return update.unfilteredIterator(); - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/26838063/src/java/org/apache/cassandra/db/SimpleBuilders.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SimpleBuilders.java b/src/java/org/apache/cassandra/db/SimpleBuilders.java new file mode 100644 index 0000000..6e65743 --- /dev/null +++ b/src/java/org/apache/cassandra/db/SimpleBuilders.java @@ -0,0 +1,461 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db; + +import java.nio.ByteBuffer; +import java.util.*; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.config.Schema; +import org.apache.cassandra.cql3.ColumnIdentifier; +import org.apache.cassandra.db.context.CounterContext; +import org.apache.cassandra.db.partitions.PartitionUpdate; +import org.apache.cassandra.db.rows.BTreeRow; +import org.apache.cassandra.db.rows.BufferCell; +import org.apache.cassandra.db.rows.Cell; +import org.apache.cassandra.db.rows.CellPath; +import org.apache.cassandra.db.rows.Row; +import org.apache.cassandra.db.marshal.*; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.CounterId; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.UUIDGen; + +public abstract class SimpleBuilders +{ + private SimpleBuilders() + { + } + + private static DecoratedKey makePartitonKey(CFMetaData metadata, Object... partitionKey) + { + if (partitionKey.length == 1 && partitionKey[0] instanceof DecoratedKey) + return (DecoratedKey)partitionKey[0]; + + ByteBuffer key = CFMetaData.serializePartitionKey(metadata.getKeyValidatorAsClusteringComparator().make(partitionKey)); + return metadata.decorateKey(key); + } + + private static Clustering makeClustering(CFMetaData metadata, Object... clusteringColumns) + { + if (clusteringColumns.length == 1 && clusteringColumns[0] instanceof Clustering) + return (Clustering)clusteringColumns[0]; + + if (clusteringColumns.length == 0) + { + // If the table has clustering columns, passing no values is for updating the static values, so check we + // do have some static columns defined. + assert metadata.comparator.size() == 0 || !metadata.partitionColumns().statics.isEmpty(); + return metadata.comparator.size() == 0 ? Clustering.EMPTY : Clustering.STATIC_CLUSTERING; + } + else + { + return metadata.comparator.make(clusteringColumns); + } + } + + private static class AbstractBuilder + { + protected long timestamp = FBUtilities.timestampMicros(); + protected int ttl = 0; + protected int nowInSec = FBUtilities.nowInSeconds(); + + protected void copyParams(AbstractBuilder other) + { + other.timestamp = timestamp; + other.ttl = ttl; + other.nowInSec = nowInSec; + } + + public T timestamp(long timestamp) + { + this.timestamp = timestamp; + return (T)this; + } + + public T ttl(int ttl) + { + this.ttl = ttl; + return (T)this; + } + + public T nowInSec(int nowInSec) + { + this.nowInSec = nowInSec; + return (T)this; + } + } + + public static class MutationBuilder extends AbstractBuilder implements Mutation.SimpleBuilder + { + private final String keyspaceName; + private final DecoratedKey key; + + private final Map updateBuilders = new HashMap<>(); + + public MutationBuilder(String keyspaceName, DecoratedKey key) + { + this.keyspaceName = keyspaceName; + this.key = key; + } + + public PartitionUpdate.SimpleBuilder update(CFMetaData metadata) + { + assert metadata.ksName.equals(keyspaceName); + + PartitionUpdateBuilder builder = updateBuilders.get(metadata.cfId); + if (builder == null) + { + builder = new PartitionUpdateBuilder(metadata, key); + updateBuilders.put(metadata.cfId, builder); + } + + copyParams(builder); + + return builder; + } + + public PartitionUpdate.SimpleBuilder update(String tableName) + { + CFMetaData metadata = Schema.instance.getCFMetaData(keyspaceName, tableName); + assert metadata != null : "Unknown table " + tableName + " in keyspace " + keyspaceName; + return update(metadata); + } + + public Mutation build() + { + assert !updateBuilders.isEmpty() : "Cannot create empty mutation"; + + if (updateBuilders.size() == 1) + return new Mutation(updateBuilders.values().iterator().next().build()); + + Mutation mutation = new Mutation(keyspaceName, key); + for (PartitionUpdateBuilder builder : updateBuilders.values()) + mutation.add(builder.build()); + return mutation; + } + } + + public static class PartitionUpdateBuilder extends AbstractBuilder implements PartitionUpdate.SimpleBuilder + { + private final CFMetaData metadata; + private final DecoratedKey key; + private final Map rowBuilders = new HashMap<>(); + private List rangeBuilders = null; // We use that rarely, so create lazily + + private DeletionTime partitionDeletion = DeletionTime.LIVE; + + public PartitionUpdateBuilder(CFMetaData metadata, Object... partitionKeyValues) + { + this.metadata = metadata; + this.key = makePartitonKey(metadata, partitionKeyValues); + } + + public CFMetaData metadata() + { + return metadata; + } + + public Row.SimpleBuilder row(Object... clusteringValues) + { + Clustering clustering = makeClustering(metadata, clusteringValues); + RowBuilder builder = rowBuilders.get(clustering); + if (builder == null) + { + builder = new RowBuilder(metadata, clustering); + rowBuilders.put(clustering, builder); + } + + copyParams(builder); + + return builder; + } + + public PartitionUpdate.SimpleBuilder delete() + { + this.partitionDeletion = new DeletionTime(timestamp, nowInSec); + return this; + } + + public RangeTombstoneBuilder addRangeTombstone() + { + if (rangeBuilders == null) + rangeBuilders = new ArrayList<>(); + + RTBuilder builder = new RTBuilder(metadata.comparator, new DeletionTime(timestamp, nowInSec)); + rangeBuilders.add(builder); + return builder; + } + + public PartitionUpdate build() + { + // Collect all updated columns + PartitionColumns.Builder columns = PartitionColumns.builder(); + for (RowBuilder builder : rowBuilders.values()) + columns.addAll(builder.columns()); + + // Note that rowBuilders.size() could include the static column so could be 1 off the really need capacity + // of the final PartitionUpdate, but as that's just a sizing hint, we'll live. + PartitionUpdate update = new PartitionUpdate(metadata, key, columns.build(), rowBuilders.size()); + + update.addPartitionDeletion(partitionDeletion); + if (rangeBuilders != null) + { + for (RTBuilder builder : rangeBuilders) + update.add(builder.build()); + } + + for (RowBuilder builder : rowBuilders.values()) + update.add(builder.build()); + + return update; + } + + public Mutation buildAsMutation() + { + return new Mutation(build()); + } + + private static class RTBuilder implements RangeTombstoneBuilder + { + private final ClusteringComparator comparator; + private final DeletionTime deletionTime; + + private Object[] start; + private Object[] end; + + private boolean startInclusive = true; + private boolean endInclusive = true; + + private RTBuilder(ClusteringComparator comparator, DeletionTime deletionTime) + { + this.comparator = comparator; + this.deletionTime = deletionTime; + } + + public RangeTombstoneBuilder start(Object... values) + { + this.start = values; + return this; + } + + public RangeTombstoneBuilder end(Object... values) + { + this.end = values; + return this; + } + + public RangeTombstoneBuilder inclStart() + { + this.startInclusive = true; + return this; + } + + public RangeTombstoneBuilder exclStart() + { + this.startInclusive = false; + return this; + } + + public RangeTombstoneBuilder inclEnd() + { + this.endInclusive = true; + return this; + } + + public RangeTombstoneBuilder exclEnd() + { + this.endInclusive = false; + return this; + } + + private RangeTombstone build() + { + ClusteringBound startBound = ClusteringBound.create(comparator, true, startInclusive, start); + ClusteringBound endBound = ClusteringBound.create(comparator, false, endInclusive, end); + return new RangeTombstone(Slice.make(startBound, endBound), deletionTime); + } + } + } + + public static class RowBuilder extends AbstractBuilder implements Row.SimpleBuilder + { + private final CFMetaData metadata; + + private final Set columns = new HashSet<>(); + private final Row.Builder builder; + + private boolean initiated; + private boolean noPrimaryKeyLivenessInfo; + + public RowBuilder(CFMetaData metadata, Object... clusteringColumns) + { + this.metadata = metadata; + this.builder = BTreeRow.unsortedBuilder(FBUtilities.nowInSeconds()); + + this.builder.newRow(makeClustering(metadata, clusteringColumns)); + } + + Set columns() + { + return columns; + } + + private void maybeInit() + { + // We're working around the fact that Row.Builder requires that addPrimaryKeyLivenessInfo() and + // addRowDeletion() are called before any cell addition (which is done so the builder can more easily skip + // shadowed cells). + if (initiated) + return; + + // If a CQL table, add the "row marker" + if (metadata.isCQLTable() && !noPrimaryKeyLivenessInfo) + builder.addPrimaryKeyLivenessInfo(LivenessInfo.create(timestamp, ttl, nowInSec)); + + initiated = true; + } + + public Row.SimpleBuilder add(String columnName, Object value) + { + return add(columnName, value, true); + } + + public Row.SimpleBuilder appendAll(String columnName, Object value) + { + return add(columnName, value, false); + } + + private Row.SimpleBuilder add(String columnName, Object value, boolean overwriteForCollection) + { + maybeInit(); + ColumnDefinition column = getColumn(columnName); + + if (!overwriteForCollection && !(column.type.isMultiCell() && column.type.isCollection())) + throw new IllegalArgumentException("appendAll() can only be called on non-frozen colletions"); + + columns.add(column); + + if (!column.type.isMultiCell()) + { + builder.addCell(cell(column, toByteBuffer(value, column.type), null)); + return this; + } + + assert column.type instanceof CollectionType : "Collection are the only multi-cell types supported so far"; + + if (value == null) + { + builder.addComplexDeletion(column, new DeletionTime(timestamp, nowInSec)); + return this; + } + + // Erase previous entry if any. + if (overwriteForCollection) + builder.addComplexDeletion(column, new DeletionTime(timestamp - 1, nowInSec)); + switch (((CollectionType)column.type).kind) + { + case LIST: + ListType lt = (ListType)column.type; + assert value instanceof List; + for (Object elt : (List)value) + builder.addCell(cell(column, toByteBuffer(elt, lt.getElementsType()), CellPath.create(ByteBuffer.wrap(UUIDGen.getTimeUUIDBytes())))); + break; + case SET: + SetType st = (SetType)column.type; + assert value instanceof Set; + for (Object elt : (Set)value) + builder.addCell(cell(column, ByteBufferUtil.EMPTY_BYTE_BUFFER, CellPath.create(toByteBuffer(elt, st.getElementsType())))); + break; + case MAP: + MapType mt = (MapType)column.type; + assert value instanceof Map; + for (Map.Entry entry : ((Map)value).entrySet()) + builder.addCell(cell(column, + toByteBuffer(entry.getValue(), mt.getValuesType()), + CellPath.create(toByteBuffer(entry.getKey(), mt.getKeysType())))); + break; + default: + throw new AssertionError(); + } + return this; + } + + public Row.SimpleBuilder delete() + { + assert !initiated : "If called, delete() should be called before any other column value addition"; + builder.addRowDeletion(Row.Deletion.regular(new DeletionTime(timestamp, nowInSec))); + return this; + } + + public Row.SimpleBuilder delete(String columnName) + { + return add(columnName, null); + } + + public Row.SimpleBuilder noPrimaryKeyLivenessInfo() + { + this.noPrimaryKeyLivenessInfo = true; + return this; + } + + public Row build() + { + maybeInit(); + return builder.build(); + } + + private ColumnDefinition getColumn(String columnName) + { + ColumnDefinition column = metadata.getColumnDefinition(new ColumnIdentifier(columnName, true)); + assert column != null : "Cannot find column " + columnName; + assert !column.isPrimaryKeyColumn(); + assert !column.isStatic() || builder.clustering() == Clustering.STATIC_CLUSTERING : "Cannot add non-static column to static-row"; + return column; + } + + private Cell cell(ColumnDefinition column, ByteBuffer value, CellPath path) + { + if (value == null) + return BufferCell.tombstone(column, timestamp, nowInSec, path); + + return ttl == LivenessInfo.NO_TTL + ? BufferCell.live(column, timestamp, value, path) + : BufferCell.expiring(column, timestamp, ttl, nowInSec, value, path); + } + + private ByteBuffer toByteBuffer(Object value, AbstractType type) + { + if (value == null) + return null; + + if (value instanceof ByteBuffer) + return (ByteBuffer)value; + + if (type.isCounter()) + { + // See UpdateParameters.addCounter() + assert value instanceof Long : "Attempted to adjust Counter cell with non-long value."; + return CounterContext.instance().createGlobal(CounterId.getLocalId(), 1, (Long)value); + } + + return ((AbstractType)type).decompose(value); + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/26838063/src/java/org/apache/cassandra/db/SystemKeyspace.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java index 584279d..36629a1 100644 --- a/src/java/org/apache/cassandra/db/SystemKeyspace.java +++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java @@ -44,6 +44,7 @@ import org.apache.cassandra.cql3.functions.*; import org.apache.cassandra.db.commitlog.CommitLogPosition; import org.apache.cassandra.db.compaction.CompactionHistoryTabularData; import org.apache.cassandra.db.marshal.*; +import org.apache.cassandra.db.rows.Rows; import org.apache.cassandra.db.partitions.PartitionUpdate; import org.apache.cassandra.dht.*; import org.apache.cassandra.exceptions.ConfigurationException; @@ -1233,11 +1234,11 @@ public final class SystemKeyspace { Range range = entry.getKey(); Pair values = entry.getValue(); - new RowUpdateBuilder(SizeEstimates, timestamp, mutation) - .clustering(table, range.left.toString(), range.right.toString()) - .add("partitions_count", values.left) - .add("mean_partition_size", values.right) - .build(); + update.add(Rows.simpleBuilder(SizeEstimates, table, range.left.toString(), range.right.toString()) + .timestamp(timestamp) + .add("partitions_count", values.left) + .add("mean_partition_size", values.right) + .build()); } mutation.apply(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/26838063/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java b/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java index 1c05f3c..954168d 100644 --- a/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java +++ b/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java @@ -169,7 +169,7 @@ public abstract class AbstractBTreePartition implements Partition, Iterable public UnfilteredRowIterator unfilteredIterator() { - return unfilteredIterator(ColumnFilter.all(metadata()), Slices.ALL, false); + return unfilteredIterator(ColumnFilter.selection(columns()), Slices.ALL, false); } public UnfilteredRowIterator unfilteredIterator(ColumnFilter selection, Slices slices, boolean reversed) http://git-wip-us.apache.org/repos/asf/cassandra/blob/26838063/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java index d18392c..7796fd9 100644 --- a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java +++ b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java @@ -614,6 +614,160 @@ public class PartitionUpdate extends AbstractBTreePartition return sb.toString(); } + /** + * Creates a new simple partition update builder. + * + * @param metadata the metadata for the table this is a partition of. + * @param partitionKeyValues the values for partition key columns identifying this partition. The values for each + * partition key column can be passed either directly as {@code ByteBuffer} or using a "native" value (int for + * Int32Type, string for UTF8Type, ...). It is also allowed to pass a single {@code DecoratedKey} value directly. + * @return a newly created builder. + */ + public static SimpleBuilder simpleBuilder(CFMetaData metadata, Object... partitionKeyValues) + { + return new SimpleBuilders.PartitionUpdateBuilder(metadata, partitionKeyValues); + } + + /** + * Interface for building partition updates geared towards human. + *

+ * This should generally not be used when performance matters too much, but provides a more convenient interface to + * build an update than using the class constructor when performance is not of the utmost importance. + */ + public interface SimpleBuilder + { + /** + * The metadata of the table this is a builder on. + */ + public CFMetaData metadata(); + + /** + * Sets the timestamp to use for the following additions to this builder or any derived (row) builder. + * + * @param timestamp the timestamp to use for following additions. If that timestamp hasn't been set, the current + * time in microseconds will be used. + * @return this builder. + */ + public SimpleBuilder timestamp(long timestamp); + + /** + * Sets the ttl to use for the following additions to this builder or any derived (row) builder. + * + * @param ttl the ttl to use for following additions. If that ttl hasn't been set, no ttl will be used. + * @return this builder. + */ + public SimpleBuilder ttl(int ttl); + + /** + * Sets the current time to use for the following additions to this builder or any derived (row) builder. + * + * @param nowInSec the current time to use for following additions. If the current time hasn't been set, the current + * time in seconds will be used. + * @return this builder. + */ + public SimpleBuilder nowInSec(int nowInSec); + + /** + * Adds the row identifier by the provided clustering and return a builder for that row. + * + * @param clusteringValues the value for the clustering columns of the row to add to this build. There may be no + * values if either the table has no clustering column, or if you want to edit the static row. Note that as a + * shortcut it is also allowed to pass a {@code Clustering} object directly, in which case that should be the + * only argument. + * @return a builder for the row identified by {@code clusteringValues}. + */ + public Row.SimpleBuilder row(Object... clusteringValues); + + /** + * Deletes the partition identified by this builder (using a partition level deletion). + * + * @return this builder. + */ + public SimpleBuilder delete(); + + /** + * Adds a new range tombstone to this update, returning a builder for that range. + * + * @return the range tombstone builder for the newly added range. + */ + public RangeTombstoneBuilder addRangeTombstone(); + + /** + * Build the update represented by this builder. + * + * @return the built update. + */ + public PartitionUpdate build(); + + /** + * As shortcut for {@code new Mutation(build())}. + * + * @return the built update, wrapped in a {@code Mutation}. + */ + public Mutation buildAsMutation(); + + /** + * Interface to build range tombstone. + * + * By default, if no other methods are called, the represented range is inclusive of both start and end and + * includes everything (its start is {@code BOTTOM} and it's end is {@code TOP}). + */ + public interface RangeTombstoneBuilder + { + /** + * Sets the start for the built range using the provided values. + * + * @param values the value for the start of the range. They act like the {@code clusteringValues} argument + * of the {@link PartitionUpdate.SimpleBuilder#row()} method, except that it doesn't have to be a full + * clustering, it can only be a prefix. + * @return this builder. + */ + public RangeTombstoneBuilder start(Object... values); + + /** + * Sets the end for the built range using the provided values. + * + * @param values the value for the end of the range. They act like the {@code clusteringValues} argument + * of the {@link PartitionUpdate.SimpleBuilder#row()} method, except that it doesn't have to be a full + * clustering, it can only be a prefix. + * @return this builder. + */ + public RangeTombstoneBuilder end(Object... values); + + /** + * Sets the start of this range as inclusive. + *

+ * This is the default and don't need to be called, but can for explicitness. + * + * @return this builder. + */ + public RangeTombstoneBuilder inclStart(); + + /** + * Sets the start of this range as exclusive. + * + * @return this builder. + */ + public RangeTombstoneBuilder exclStart(); + + /** + * Sets the end of this range as inclusive. + *

+ * This is the default and don't need to be called, but can for explicitness. + * + * @return this builder. + */ + public RangeTombstoneBuilder inclEnd(); + + /** + * Sets the end of this range as exclusive. + * + * @return this builder. + */ + public RangeTombstoneBuilder exclEnd(); + } + } + public static class PartitionUpdateSerializer { public void serialize(PartitionUpdate update, DataOutputPlus out, int version) throws IOException http://git-wip-us.apache.org/repos/asf/cassandra/blob/26838063/src/java/org/apache/cassandra/db/rows/Row.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/Row.java b/src/java/org/apache/cassandra/db/rows/Row.java index 4fc3e22..7e6d141 100644 --- a/src/java/org/apache/cassandra/db/rows/Row.java +++ b/src/java/org/apache/cassandra/db/rows/Row.java @@ -468,6 +468,105 @@ public interface Row extends Unfiltered, Collection } /** + * Row builder interface geared towards human. + *

+ * Where the {@link Builder} deals with building rows efficiently from internal objects ({@code Cell}, {@code + * LivenessInfo}, ...), the {@code SimpleBuilder} is geared towards building rows from string column name and + * 'native' values (string for text, ints for numbers, et...). In particular, it is meant to be convenient, not + * efficient, and should be used only in place where performance is not of the utmost importance (it is used to + * build schema mutation for instance). + *

+ * Also note that contrarily to {@link Builder}, the {@code SimpleBuilder} API has no {@code newRow()} method: it is + * expected that the clustering of the row built is provided by the constructor of the builder. + */ + public interface SimpleBuilder + { + /** + * Sets the timestamp to use for the following additions. + *

+ * Note that the for non-compact tables, this method must be called before any column addition for this + * timestamp to be used for the row {@code LivenessInfo}. + * + * @param timestamp the timestamp to use for following additions. If that timestamp hasn't been set, the current + * time in microseconds will be used. + * @return this builder. + */ + public SimpleBuilder timestamp(long timestamp); + + /** + * Sets the ttl to use for the following additions. + *

+ * Note that the for non-compact tables, this method must be called before any column addition for this + * ttl to be used for the row {@code LivenessInfo}. + * + * @param ttl the ttl to use for following additions. If that ttl hasn't been set, no ttl will be used. + * @return this builder. + */ + public SimpleBuilder ttl(int ttl); + + /** + * Adds a value to a given column. + * + * @param columnName the name of the column for which to add a new value. + * @param value the value to add, which must be of the proper type for {@code columnName}. This can be {@code + * null} in which case the this is equivalent to {@code delete(columnName)}. + * @return this builder. + */ + public SimpleBuilder add(String columnName, Object value); + + /** + * Appends new values to a given non-frozen collection column. + *

+ * This method is similar to {@code add()} but the collection elements added through this method are "appended" + * to any pre-exising elements. In other words, this is like {@code add()} except that it doesn't delete the + * previous value of the collection. This can only be called on non-frozen collection columns. + *

+ * Note that this method can be used in replacement of {@code add()} if you know that there can't be any + * pre-existing value for that column, in which case this is slightly less expensive as it avoid the collection + * tombstone inherent to {@code add()}. + * + * @param columnName the name of the column for which to add a new value, which must be a non-frozen collection. + * @param value the value to add, which must be of the proper type for {@code columnName} (in other words, it + * must be a collection). + * @return this builder. + * + * @throws IllegalArgumentException if columnName is not a non-frozen collection column. + */ + public SimpleBuilder appendAll(String columnName, Object value); + + /** + * Deletes the whole row. + *

+ * If called, this is generally the only method called on the builder (outside of {@code timestamp()}. + * + * @return this builder. + */ + public SimpleBuilder delete(); + + /** + * Removes the value for a given column (creating a tombstone). + * + * @param columnName the name of the column to delete. + * @return this builder. + */ + public SimpleBuilder delete(String columnName); + + /** + * Don't include any primary key {@code LivenessInfo} in the built row. + * + * @return this builder. + */ + public SimpleBuilder noPrimaryKeyLivenessInfo(); + + /** + * Returns the built row. + * + * @return the built row. + */ + public Row build(); + } + + /** * Utility class to help merging rows from multiple inputs (UnfilteredRowIterators). */ public static class Merger http://git-wip-us.apache.org/repos/asf/cassandra/blob/26838063/src/java/org/apache/cassandra/db/rows/Rows.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/Rows.java b/src/java/org/apache/cassandra/db/rows/Rows.java index e325091..976d37e 100644 --- a/src/java/org/apache/cassandra/db/rows/Rows.java +++ b/src/java/org/apache/cassandra/db/rows/Rows.java @@ -22,6 +22,7 @@ import java.util.*; import com.google.common.collect.Iterators; import com.google.common.collect.PeekingIterator; +import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.ColumnDefinition; import org.apache.cassandra.db.*; import org.apache.cassandra.db.partitions.PartitionStatisticsCollector; @@ -59,6 +60,21 @@ public abstract class Rows } /** + * Creates a new simple row builder. + * + * @param metadata the metadata of the table this is a row of. + * @param clusteringValues the value for the clustering columns of the row to add to this build. There may be no + * values if either the table has no clustering column, or if you want to edit the static row. Note that as a + * shortcut it is also allowed to pass a {@code Clustering} object directly, in which case that should be the + * only argument. + * @return a newly created builder. + */ + public static Row.SimpleBuilder simpleBuilder(CFMetaData metadata, Object... clusteringValues) + { + return new SimpleBuilders.RowBuilder(metadata, clusteringValues); + } + + /** * Collect statistics on a given row. * * @param row the row for which to collect stats. http://git-wip-us.apache.org/repos/asf/cassandra/blob/26838063/src/java/org/apache/cassandra/db/transform/BaseRows.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/transform/BaseRows.java b/src/java/org/apache/cassandra/db/transform/BaseRows.java index fb3b9f9..ce4e458 100644 --- a/src/java/org/apache/cassandra/db/transform/BaseRows.java +++ b/src/java/org/apache/cassandra/db/transform/BaseRows.java @@ -105,7 +105,8 @@ implements BaseRowIterator super.add(transformation); // transform any existing data - staticRow = transformation.applyToStatic(staticRow); + if (staticRow != null) + staticRow = transformation.applyToStatic(staticRow); next = applyOne(next, transformation); partitionKey = transformation.applyToPartitionKey(partitionKey); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/26838063/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java b/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java index 93591f0..7cc822f 100644 --- a/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java +++ b/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java @@ -148,20 +148,20 @@ public final class LegacySchemaMigrator { logger.info("Migrating keyspace {}", keyspace); - Mutation mutation = SchemaKeyspace.makeCreateKeyspaceMutation(keyspace.name, keyspace.params, keyspace.timestamp); + Mutation.SimpleBuilder builder = SchemaKeyspace.makeCreateKeyspaceMutation(keyspace.name, keyspace.params, keyspace.timestamp); for (Table table : keyspace.tables) - SchemaKeyspace.addTableToSchemaMutation(table.metadata, table.timestamp, true, mutation); + SchemaKeyspace.addTableToSchemaMutation(table.metadata, true, builder.timestamp(table.timestamp)); for (Type type : keyspace.types) - SchemaKeyspace.addTypeToSchemaMutation(type.metadata, type.timestamp, mutation); + SchemaKeyspace.addTypeToSchemaMutation(type.metadata, builder.timestamp(type.timestamp)); for (Function function : keyspace.functions) - SchemaKeyspace.addFunctionToSchemaMutation(function.metadata, function.timestamp, mutation); + SchemaKeyspace.addFunctionToSchemaMutation(function.metadata, builder.timestamp(function.timestamp)); for (Aggregate aggregate : keyspace.aggregates) - SchemaKeyspace.addAggregateToSchemaMutation(aggregate.metadata, aggregate.timestamp, mutation); + SchemaKeyspace.addAggregateToSchemaMutation(aggregate.metadata, builder.timestamp(aggregate.timestamp)); - mutation.apply(); + builder.build().apply(); } /*