Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id A87092009FB for ; Fri, 6 May 2016 13:47:43 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id A7140160A0C; Fri, 6 May 2016 11:47:43 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id ADB21160A11 for ; Fri, 6 May 2016 13:47:41 +0200 (CEST) Received: (qmail 4377 invoked by uid 500); 6 May 2016 11:47:40 -0000 Mailing-List: contact commits-help@cassandra.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cassandra.apache.org Delivered-To: mailing list commits@cassandra.apache.org Received: (qmail 3974 invoked by uid 99); 6 May 2016 11:47:40 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 06 May 2016 11:47:40 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id D517AE0381; Fri, 6 May 2016 11:47:39 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: slebresne@apache.org To: commits@cassandra.apache.org Date: Fri, 06 May 2016 11:47:43 -0000 Message-Id: <8e5e777cf1e44ec9bd14433c097b69cf@git.apache.org> In-Reply-To: <4ca5881daa03456e9398d2b799c8b096@git.apache.org> References: <4ca5881daa03456e9398d2b799c8b096@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [5/9] cassandra git commit: Refactor MV code archived-at: Fri, 06 May 2016 11:47:43 -0000 http://git-wip-us.apache.org/repos/asf/cassandra/blob/86ba2274/src/java/org/apache/cassandra/db/view/View.java ---------------------------------------------------------------------- diff --git 
a/src/java/org/apache/cassandra/db/view/View.java b/src/java/org/apache/cassandra/db/view/View.java index 1b823aa..845a6ab 100644 --- a/src/java/org/apache/cassandra/db/view/View.java +++ b/src/java/org/apache/cassandra/db/view/View.java @@ -32,17 +32,15 @@ import org.apache.cassandra.cql3.statements.SelectStatement; import org.apache.cassandra.db.*; import org.apache.cassandra.config.*; import org.apache.cassandra.cql3.ColumnIdentifier; -import org.apache.cassandra.db.AbstractReadCommandBuilder.SinglePartitionSliceBuilder; import org.apache.cassandra.db.compaction.CompactionManager; -import org.apache.cassandra.db.partitions.AbstractBTreePartition; -import org.apache.cassandra.db.partitions.PartitionIterator; -import org.apache.cassandra.db.partitions.PartitionUpdate; +import org.apache.cassandra.db.partitions.*; import org.apache.cassandra.db.rows.*; import org.apache.cassandra.schema.KeyspaceMetadata; import org.apache.cassandra.service.ClientState; import org.apache.cassandra.service.pager.QueryPager; import org.apache.cassandra.transport.Server; import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.btree.BTreeSet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,46 +48,18 @@ import org.slf4j.LoggerFactory; * A View copies data from a base table into a view table which can be queried independently from the * base. Every update which targets the base table must be fed through the {@link ViewManager} to ensure * that if a view needs to be updated, the updates are properly created and fed into the view. - * - * This class does the job of translating the base row to the view row. - * - * It handles reading existing state and figuring out what tombstones need to be generated. 
- * - * {@link View#createMutations(AbstractBTreePartition, TemporalRow.Set, boolean)} is the "main method" - * */ public class View { private static final Logger logger = LoggerFactory.getLogger(View.class); - /** - * The columns should all be updated together, so we use this object as group. - */ - private static class Columns - { - //These are the base column definitions in terms of the *views* partitioning. - //Meaning we can see (for example) the partition key of the view contains a clustering key - //from the base table. - public final List partitionDefs; - public final List primaryKeyDefs; - public final List baseComplexColumns; - - private Columns(List partitionDefs, List primaryKeyDefs, List baseComplexColumns) - { - this.partitionDefs = partitionDefs; - this.primaryKeyDefs = primaryKeyDefs; - this.baseComplexColumns = baseComplexColumns; - } - } - public final String name; private volatile ViewDefinition definition; private final ColumnFamilyStore baseCfs; - private Columns columns; + public volatile List baseNonPKColumnsInViewPK; - private final boolean viewPKIncludesOnlyBasePKColumns; private final boolean includeAllColumns; private ViewBuilder builder; @@ -104,12 +74,11 @@ public class View ColumnFamilyStore baseCfs) { this.baseCfs = baseCfs; - - name = definition.viewName; - includeAllColumns = definition.includeAllColumns; - - viewPKIncludesOnlyBasePKColumns = updateDefinition(definition); + this.name = definition.viewName; + this.includeAllColumns = definition.includeAllColumns; this.rawSelect = definition.select; + + updateDefinition(definition); } public ViewDefinition getDefinition() @@ -118,513 +87,100 @@ public class View } /** - * Lookup column definitions in the base table that correspond to the view columns (should be 1:1) - * - * Notify caller if all primary keys in the view are ALL primary keys in the base. We do this to simplify - * tombstone checks. 
- * - * @param columns a list of columns to lookup in the base table - * @param definitions lists to populate for the base table definitions - * @return true if all view PKs are also Base PKs - */ - private boolean resolveAndAddColumns(Iterable columns, List... definitions) - { - boolean allArePrimaryKeys = true; - for (ColumnIdentifier identifier : columns) - { - ColumnDefinition cdef = baseCfs.metadata.getColumnDefinition(identifier); - assert cdef != null : "Could not resolve column " + identifier.toString(); - - for (List list : definitions) - { - list.add(cdef); - } - - allArePrimaryKeys = allArePrimaryKeys && cdef.isPrimaryKeyColumn(); - } - - return allArePrimaryKeys; - } - - /** * This updates the columns stored which are dependent on the base CFMetaData. * * @return true if the view contains only columns which are part of the base's primary key; false if there is at * least one column which is not. */ - public boolean updateDefinition(ViewDefinition definition) + public void updateDefinition(ViewDefinition definition) { this.definition = definition; CFMetaData viewCfm = definition.metadata; - List partitionDefs = new ArrayList<>(viewCfm.partitionKeyColumns().size()); - List primaryKeyDefs = new ArrayList<>(viewCfm.partitionKeyColumns().size() - + viewCfm.clusteringColumns().size()); - List baseComplexColumns = new ArrayList<>(); - - // We only add the partition columns to the partitions list, but both partition columns and clustering - // columns are added to the primary keys list - boolean partitionAllPrimaryKeyColumns = resolveAndAddColumns(Iterables.transform(viewCfm.partitionKeyColumns(), cd -> cd.name), primaryKeyDefs, partitionDefs); - boolean clusteringAllPrimaryKeyColumns = resolveAndAddColumns(Iterables.transform(viewCfm.clusteringColumns(), cd -> cd.name), primaryKeyDefs); - - for (ColumnDefinition cdef : baseCfs.metadata.allColumns()) + List nonPKDefPartOfViewPK = new ArrayList<>(); + for (ColumnDefinition baseColumn : 
baseCfs.metadata.allColumns()) { - if (cdef.isComplex() && definition.includes(cdef.name)) - { - baseComplexColumns.add(cdef); - } + ColumnDefinition viewColumn = getViewColumn(baseColumn); + if (viewColumn != null && !baseColumn.isPrimaryKeyColumn() && viewColumn.isPrimaryKeyColumn()) + nonPKDefPartOfViewPK.add(baseColumn); } - - this.columns = new Columns(partitionDefs, primaryKeyDefs, baseComplexColumns); - - return partitionAllPrimaryKeyColumns && clusteringAllPrimaryKeyColumns; + this.baseNonPKColumnsInViewPK = nonPKDefPartOfViewPK; } /** - * Check to see if the update could possibly modify a view. Cases where the view may be updated are: - *
    - *
  • View selects all columns
  • - *
  • Update contains any range tombstones
  • - *
  • Update touches one of the columns included in the view
  • - *
- * - * If the update contains any range tombstones, there is a possibility that it will not touch a range that is - * currently included in the view. - * - * @return true if {@param partition} modifies a column included in the view + * The view column corresponding to the provided base column. This can + * return {@code null} if the column is denormalized in the view. */ - public boolean updateAffectsView(AbstractBTreePartition partition) + public ColumnDefinition getViewColumn(ColumnDefinition baseColumn) { - ReadQuery selectQuery = getReadQuery(); - - if (!partition.metadata().cfId.equals(definition.baseTableId)) - return false; - - if (!selectQuery.selectsKey(partition.partitionKey())) - return false; - - // If there are range tombstones, tombstones will also need to be generated for the view - // This requires a query of the base rows and generating tombstones for all of those values - if (!partition.deletionInfo().isLive()) - return true; - - // Check each row for deletion or update - for (Row row : partition) - { - if (!selectQuery.selectsClustering(partition.partitionKey(), row.clustering())) - continue; - - if (includeAllColumns || !row.deletion().isLive()) - return true; - - if (row.primaryKeyLivenessInfo().isLive(FBUtilities.nowInSeconds())) - return true; - - for (ColumnData data : row) - { - if (definition.metadata.getColumnDefinition(data.column().name) != null) - return true; - } - } - - return false; - } - - /** - * Creates the clustering columns for the view based on the specified row and resolver policy - * - * @param temporalRow The current row - * @param resolver The policy to use when selecting versions of cells use - * @return The clustering object to use for the view - */ - private Clustering viewClustering(TemporalRow temporalRow, TemporalRow.Resolver resolver) - { - CFMetaData viewCfm = definition.metadata; - int numViewClustering = viewCfm.clusteringColumns().size(); - CBuilder clustering = CBuilder.create(viewCfm.comparator); - for (int i 
= 0; i < numViewClustering; i++) - { - ColumnDefinition definition = viewCfm.clusteringColumns().get(i); - clustering.add(temporalRow.clusteringValue(definition, resolver)); - } - - return clustering.build(); - } - - /** - * @return Mutation containing a range tombstone for a base partition key and TemporalRow. - */ - private PartitionUpdate createTombstone(TemporalRow temporalRow, - DecoratedKey partitionKey, - Row.Deletion deletion, - TemporalRow.Resolver resolver, - int nowInSec) - { - CFMetaData viewCfm = definition.metadata; - Row.Builder builder = BTreeRow.unsortedBuilder(nowInSec); - builder.newRow(viewClustering(temporalRow, resolver)); - builder.addRowDeletion(deletion); - return PartitionUpdate.singleRowUpdate(viewCfm, partitionKey, builder.build()); - } - - /** - * @return PartitionUpdate containing a complex tombstone for a TemporalRow, and the collection's column identifier. - */ - private PartitionUpdate createComplexTombstone(TemporalRow temporalRow, - DecoratedKey partitionKey, - ColumnDefinition deletedColumn, - DeletionTime deletionTime, - TemporalRow.Resolver resolver, - int nowInSec) - { - CFMetaData viewCfm = definition.metadata; - Row.Builder builder = BTreeRow.unsortedBuilder(nowInSec); - builder.newRow(viewClustering(temporalRow, resolver)); - builder.addComplexDeletion(deletedColumn, deletionTime); - return PartitionUpdate.singleRowUpdate(viewCfm, partitionKey, builder.build()); + return definition.metadata.getColumnDefinition(baseColumn.name); } /** - * @return View's DecoratedKey or null, if one of the view's primary key components has an invalid resolution from - * the TemporalRow and its Resolver + * The base column corresponding to the provided view column. This should + * never return {@code null} since a view can't have its "own" columns. 
*/ - private DecoratedKey viewPartitionKey(TemporalRow temporalRow, TemporalRow.Resolver resolver) + public ColumnDefinition getBaseColumn(ColumnDefinition viewColumn) { - List partitionDefs = this.columns.partitionDefs; - Object[] partitionKey = new Object[partitionDefs.size()]; - - for (int i = 0; i < partitionKey.length; i++) - { - ByteBuffer value = temporalRow.clusteringValue(partitionDefs.get(i), resolver); - - if (value == null) - return null; - - partitionKey[i] = value; - } - - CFMetaData metadata = definition.metadata; - return metadata.decorateKey(CFMetaData.serializePartitionKey(metadata - .getKeyValidatorAsClusteringComparator() - .make(partitionKey))); + ColumnDefinition baseColumn = baseCfs.metadata.getColumnDefinition(viewColumn.name); + assert baseColumn != null; + return baseColumn; } /** - * @return mutation which contains the tombstone for the referenced TemporalRow, or null if not necessary. - * TemporalRow's can reference at most one view row; there will be at most one row to be tombstoned, so only one - * mutation is necessary - */ - private PartitionUpdate createRangeTombstoneForRow(TemporalRow temporalRow) - { - // Primary Key and Clustering columns do not generate tombstones - if (viewPKIncludesOnlyBasePKColumns) - return null; - - boolean hasUpdate = false; - List primaryKeyDefs = this.columns.primaryKeyDefs; - for (ColumnDefinition viewPartitionKeys : primaryKeyDefs) - { - if (!viewPartitionKeys.isPrimaryKeyColumn() && temporalRow.clusteringValue(viewPartitionKeys, TemporalRow.oldValueIfUpdated) != null) - hasUpdate = true; - } - - if (!hasUpdate) - return null; - - TemporalRow.Resolver resolver = TemporalRow.earliest; - return createTombstone(temporalRow, - viewPartitionKey(temporalRow, resolver), - Row.Deletion.shadowable(new DeletionTime(temporalRow.viewClusteringTimestamp(), temporalRow.nowInSec)), - resolver, - temporalRow.nowInSec); - } - - /** - * @return Mutation which is the transformed base table mutation for the view. 
- */ - private PartitionUpdate createUpdatesForInserts(TemporalRow temporalRow) - { - TemporalRow.Resolver resolver = TemporalRow.latest; - - DecoratedKey partitionKey = viewPartitionKey(temporalRow, resolver); - CFMetaData viewCfm = definition.metadata; - - if (partitionKey == null) - { - // Not having a partition key means we aren't updating anything - return null; - } - - Row.Builder regularBuilder = BTreeRow.unsortedBuilder(temporalRow.nowInSec); - - CBuilder clustering = CBuilder.create(viewCfm.comparator); - for (int i = 0; i < viewCfm.clusteringColumns().size(); i++) - { - ColumnDefinition column = viewCfm.clusteringColumns().get(i); - ByteBuffer value = temporalRow.clusteringValue(column, resolver); - - // handle single-column deletions correctly to avoid nulls in the view primary key - if (value == null) - return null; - - clustering.add(value); - } - regularBuilder.newRow(clustering.build()); - regularBuilder.addPrimaryKeyLivenessInfo(LivenessInfo.create(viewCfm, - temporalRow.viewClusteringTimestamp(), - temporalRow.viewClusteringTtl(), - temporalRow.viewClusteringLocalDeletionTime())); - - for (ColumnDefinition columnDefinition : viewCfm.allColumns()) - { - if (columnDefinition.isPrimaryKeyColumn()) - continue; - - for (Cell cell : temporalRow.values(columnDefinition, resolver)) - { - regularBuilder.addCell(cell); - } - } - - Row row = regularBuilder.build(); - - // although we check for empty rows in updateAppliesToView(), if there are any good rows in the PartitionUpdate, - // all rows in the partition will be processed, and we need to handle empty/non-live rows here (CASSANDRA-10614) - if (row.isEmpty()) - return null; - - return PartitionUpdate.singleRowUpdate(viewCfm, partitionKey, row); - } - - /** - * @param partition Update which possibly contains deletion info for which to generate view tombstones. 
- * @return View Tombstones which delete all of the rows which have been removed from the base table with - * {@param partition} - */ - private Collection createForDeletionInfo(TemporalRow.Set rowSet, AbstractBTreePartition partition) - { - final TemporalRow.Resolver resolver = TemporalRow.earliest; - - DeletionInfo deletionInfo = partition.deletionInfo(); - - List mutations = new ArrayList<>(); - - // Check the complex columns to see if there are any which may have tombstones we need to create for the view - if (!columns.baseComplexColumns.isEmpty()) - { - for (Row row : partition) - { - if (!row.hasComplexDeletion()) - continue; - - TemporalRow temporalRow = rowSet.getClustering(row.clustering()); - - assert temporalRow != null; - - for (ColumnDefinition definition : columns.baseComplexColumns) - { - ComplexColumnData columnData = row.getComplexColumnData(definition); - - if (columnData != null) - { - DeletionTime time = columnData.complexDeletion(); - if (!time.isLive()) - { - DecoratedKey targetKey = viewPartitionKey(temporalRow, resolver); - if (targetKey != null) - mutations.add(new Mutation(createComplexTombstone(temporalRow, targetKey, definition, time, resolver, temporalRow.nowInSec))); - } - } - } - } - } - - ReadCommand command = null; - - if (!deletionInfo.isLive()) - { - // We have to generate tombstones for all of the affected rows, but we don't have the information in order - // to create them. This requires that we perform a read for the entire range that is being tombstoned, and - // generate a tombstone for each. This may be slow, because a single range tombstone can cover up to an - // entire partition of data which is not distributed on a single partition node. 
- DecoratedKey dk = rowSet.dk; - - if (!deletionInfo.getPartitionDeletion().isLive()) - { - command = getSelectStatement().internalReadForView(dk, rowSet.nowInSec); - } - else - { - SinglePartitionSliceBuilder builder = new SinglePartitionSliceBuilder(baseCfs, dk); - Iterator tombstones = deletionInfo.rangeIterator(false); - while (tombstones.hasNext()) - { - RangeTombstone tombstone = tombstones.next(); - - builder.addSlice(tombstone.deletedSlice()); - } - - command = builder.build(); - } - } - - if (command == null) - { - ReadQuery selectQuery = getReadQuery(); - SinglePartitionSliceBuilder builder = null; - for (Row row : partition) - { - if (!row.deletion().isLive()) - { - if (!selectQuery.selectsClustering(rowSet.dk, row.clustering())) - continue; - - if (builder == null) - builder = new SinglePartitionSliceBuilder(baseCfs, rowSet.dk); - builder.addSlice(Slice.make(row.clustering())); - } - } - - if (builder != null) - command = builder.build(); - } - - if (command != null) - { - ReadQuery selectQuery = getReadQuery(); - assert selectQuery.selectsKey(rowSet.dk); - - // We may have already done this work for another MV update so check - if (!rowSet.hasTombstonedExisting()) - { - QueryPager pager = command.getPager(null, Server.CURRENT_VERSION); - - // Add all of the rows which were recovered from the query to the row set - while (!pager.isExhausted()) - { - try (ReadOrderGroup orderGroup = pager.startOrderGroup(); - PartitionIterator iter = pager.fetchPageInternal(128, orderGroup)) - { - if (!iter.hasNext()) - break; - - try (RowIterator rowIterator = iter.next()) - { - while (rowIterator.hasNext()) - { - Row row = rowIterator.next(); - if (selectQuery.selectsClustering(rowSet.dk, row.clustering())) - rowSet.addRow(row, false); - } - } - } - } - - //Incase we fetched nothing, avoid re checking on another MV update - rowSet.setTombstonedExisting(); - } - - // If the temporal row has been deleted by the deletion info, we generate the corresponding range tombstone 
- // for the view. - for (TemporalRow temporalRow : rowSet) - { - DeletionTime deletionTime = temporalRow.deletionTime(partition); - if (!deletionTime.isLive()) - { - DecoratedKey value = viewPartitionKey(temporalRow, resolver); - if (value != null) - { - PartitionUpdate update = createTombstone(temporalRow, value, Row.Deletion.regular(deletionTime), resolver, temporalRow.nowInSec); - if (update != null) - mutations.add(new Mutation(update)); - } - } - } - } - - return !mutations.isEmpty() ? mutations : null; - } - - /** - * Read and update temporal rows in the set which have corresponding values stored on the local node + * Whether the view might be affected by the provided update. + *

+ * Note that having this method return {@code true} is not an absolute guarantee that the view will be + * updated, just that it most likely will, but a {@code false} return guarantees it won't be affected). + * + * @param partitionKey the partition key that is updated. + * @param update the update being applied. + * @return {@code false} if we can guarantee that inserting {@code update} for key {@code partitionKey} + * won't affect the view in any way, {@code true} otherwise. */ - private void readLocalRows(TemporalRow.Set rowSet) + public boolean mayBeAffectedBy(DecoratedKey partitionKey, Row update) { - long start = System.currentTimeMillis(); - SinglePartitionSliceBuilder builder = new SinglePartitionSliceBuilder(baseCfs, rowSet.dk); - - for (TemporalRow temporalRow : rowSet) - builder.addSlice(temporalRow.baseSlice()); + // We can guarantee that the view won't be affected if: + // - the clustering is excluded by the view filter (note that this isn't true of the filter on regular columns: + // even if an update don't match a view condition on a regular column, that update can still invalidate an pre-existing + // entry). + // - or the update don't modify any of the columns impacting the view (where "impacting" the view means that column is + // neither included in the view, nor used by the view filter). + if (!getReadQuery().selectsClustering(partitionKey, update.clustering())) + return false; - QueryPager pager = builder.build().getPager(null, Server.CURRENT_VERSION); + // We want to find if the update modify any of the columns that are part of the view (in which case the view is affected). 
+ // But if the view include all the base table columns, or the update has either a row deletion or a row liveness (note + // that for the liveness, it would be more "precise" to check if it's live, but pushing an update that is already expired + // is dump so it's ok not to optimize for it and it saves us from having to pass nowInSec to the method), we know the view + // is affected right away. + if (includeAllColumns || !update.deletion().isLive() || !update.primaryKeyLivenessInfo().isEmpty()) + return true; - while (!pager.isExhausted()) + for (ColumnData data : update) { - try (ReadOrderGroup orderGroup = pager.startOrderGroup(); - PartitionIterator iter = pager.fetchPageInternal(128, orderGroup)) - { - while (iter.hasNext()) - { - try (RowIterator rows = iter.next()) - { - while (rows.hasNext()) - { - rowSet.addRow(rows.next(), false); - } - } - } - } + if (definition.metadata.getColumnDefinition(data.column().name) != null) + return true; } - baseCfs.metric.viewReadTime.update(System.currentTimeMillis() - start, TimeUnit.MILLISECONDS); - } - - /** - * @return Set of rows which are contained in the partition update {@param partition} - */ - private TemporalRow.Set separateRows(AbstractBTreePartition partition, Set viewPrimaryKeyCols) - { - - TemporalRow.Set rowSet = new TemporalRow.Set(baseCfs, viewPrimaryKeyCols, partition.partitionKey().getKey()); - - for (Row row : partition) - rowSet.addRow(row, true); - - return rowSet; + return false; } /** - * Splits the partition update up and adds the existing state to each row. - * This data can be reused for multiple MV updates on the same base table + * Whether a given base row matches the view filter (and thus if is should have a corresponding entry). + *

+ * Note that this differs from {@link #mayBeAffectedBy} in that the provide row must be the current + * state of the base row, not just some updates to it. This method also has no false positive: a base + * row either do or don't match the view filter. * - * @param partition the mutation - * @param isBuilding If the view is currently being built, we do not query the values which are already stored, - * since all of the update will already be present in the base table. - * @return The set of temoral rows contained in this update + * @param partitionKey the partition key that is updated. + * @param baseRow the current state of a particular base row. + * @param nowInSec the current time in seconds (to decide what is live and what isn't). + * @return {@code true} if {@code baseRow} matches the view filters, {@code false} otherwise. */ - public TemporalRow.Set getTemporalRowSet(AbstractBTreePartition partition, TemporalRow.Set existing, boolean isBuilding) + public boolean matchesViewFilter(DecoratedKey partitionKey, Row baseRow, int nowInSec) { - if (!updateAffectsView(partition)) - return existing; - - Set columns = new HashSet<>(this.columns.primaryKeyDefs.size()); - for (ColumnDefinition def : this.columns.primaryKeyDefs) - columns.add(def.name); - - TemporalRow.Set rowSet; - if (existing == null) - { - rowSet = separateRows(partition, columns); - - // If we are building the view, we do not want to add old values; they will always be the same - if (!isBuilding) - readLocalRows(rowSet); - } - else - { - rowSet = existing.withNewViewPrimaryKey(columns); - } - - return rowSet; + return getReadQuery().selectsClustering(partitionKey, baseRow.clustering()) + && getSelectStatement().rowFilterForInternalCalls().isSatisfiedBy(baseCfs.metadata, partitionKey, baseRow, nowInSec); } /** @@ -656,61 +212,6 @@ public class View return query; } - /** - * @param isBuilding If the view is currently being built, we do not query the values which are already stored, - * since all of the 
update will already be present in the base table. - * @return View mutations which represent the changes necessary as long as previously created mutations for the view - * have been applied successfully. This is based solely on the changes that are necessary given the current - * state of the base table and the newly applying partition data. - */ - public Collection createMutations(AbstractBTreePartition partition, TemporalRow.Set rowSet, boolean isBuilding) - { - if (!updateAffectsView(partition)) - return null; - - ReadQuery selectQuery = getReadQuery(); - Collection mutations = null; - for (TemporalRow temporalRow : rowSet) - { - // In updateAffectsView, we check the partition to see if there is at least one row that matches the - // filters and is live. If there is more than one row in the partition, we need to re-check each one - // individually. - if (partition.rowCount() != 1 && !selectQuery.selectsClustering(partition.partitionKey(), temporalRow.baseClustering())) - continue; - - // If we are building, there is no need to check for partition tombstones; those values will not be present - // in the partition data - if (!isBuilding) - { - PartitionUpdate partitionTombstone = createRangeTombstoneForRow(temporalRow); - if (partitionTombstone != null) - { - if (mutations == null) mutations = new LinkedList<>(); - mutations.add(new Mutation(partitionTombstone)); - } - } - - PartitionUpdate insert = createUpdatesForInserts(temporalRow); - if (insert != null) - { - if (mutations == null) mutations = new LinkedList<>(); - mutations.add(new Mutation(insert)); - } - } - - if (!isBuilding) - { - Collection deletion = createForDeletionInfo(rowSet, partition); - if (deletion != null && !deletion.isEmpty()) - { - if (mutations == null) mutations = new LinkedList<>(); - mutations.addAll(deletion); - } - } - - return mutations; - } - public synchronized void build() { if (this.builder != null) 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/86ba2274/src/java/org/apache/cassandra/db/view/ViewBuilder.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/view/ViewBuilder.java b/src/java/org/apache/cassandra/db/view/ViewBuilder.java index 35b023b..b2b409b 100644 --- a/src/java/org/apache/cassandra/db/view/ViewBuilder.java +++ b/src/java/org/apache/cassandra/db/view/ViewBuilder.java @@ -19,6 +19,7 @@ package org.apache.cassandra.db.view; import java.util.Collection; +import java.util.Collections; import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -35,9 +36,8 @@ import org.apache.cassandra.db.compaction.CompactionInfo; import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.db.compaction.OperationType; import org.apache.cassandra.db.lifecycle.SSTableSet; -import org.apache.cassandra.db.partitions.FilteredPartition; -import org.apache.cassandra.db.partitions.PartitionIterator; -import org.apache.cassandra.db.rows.RowIterator; +import org.apache.cassandra.db.partitions.*; +import org.apache.cassandra.db.rows.*; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.io.sstable.ReducingKeyIterator; @@ -76,28 +76,22 @@ public class ViewBuilder extends CompactionInfo.Holder if (!selectQuery.selectsKey(key)) return; - QueryPager pager = view.getSelectStatement().internalReadForView(key, FBUtilities.nowInSeconds()).getPager(null, Server.CURRENT_VERSION); + int nowInSec = FBUtilities.nowInSeconds(); + SinglePartitionReadCommand command = view.getSelectStatement().internalReadForView(key, nowInSec); - while (!pager.isExhausted()) + // We're rebuilding everything from what's on disk, so we read everything, consider that as new updates + // and pretend that there is nothing pre-existing. 
+ UnfilteredRowIterator empty = UnfilteredRowIterators.noRowsIterator(baseCfs.metadata, key, Rows.EMPTY_STATIC_ROW, DeletionTime.LIVE, false); + + Collection mutations; + try (ReadOrderGroup orderGroup = command.startOrderGroup(); + UnfilteredRowIterator data = UnfilteredPartitionIterators.getOnlyElement(command.executeLocally(orderGroup), command)) { - try (ReadOrderGroup orderGroup = pager.startOrderGroup(); - PartitionIterator partitionIterator = pager.fetchPageInternal(128, orderGroup)) - { - if (!partitionIterator.hasNext()) - return; - - try (RowIterator rowIterator = partitionIterator.next()) - { - FilteredPartition partition = FilteredPartition.create(rowIterator); - TemporalRow.Set temporalRows = view.getTemporalRowSet(partition, null, true); - - Collection mutations = view.createMutations(partition, temporalRows, true); - - if (mutations != null) - StorageProxy.mutateMV(key.getKey(), mutations, true, noBase); - } - } + mutations = baseCfs.keyspace.viewManager.forTable(baseCfs.metadata).generateViewUpdates(Collections.singleton(view), data, empty, nowInSec); } + + if (!mutations.isEmpty()) + StorageProxy.mutateMV(key.getKey(), mutations, true, noBase); } public void run() http://git-wip-us.apache.org/repos/asf/cassandra/blob/86ba2274/src/java/org/apache/cassandra/db/view/ViewManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/view/ViewManager.java b/src/java/org/apache/cassandra/db/view/ViewManager.java index 9fe0544..fd04b97 100644 --- a/src/java/org/apache/cassandra/db/view/ViewManager.java +++ b/src/java/org/apache/cassandra/db/view/ViewManager.java @@ -19,23 +19,21 @@ package org.apache.cassandra.db.view; import java.nio.ByteBuffer; import java.util.*; -import java.util.concurrent.ConcurrentNavigableMap; -import java.util.concurrent.ConcurrentSkipListMap; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.ConcurrentMap; +import 
java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.Lock; -import com.google.common.collect.Lists; import com.google.common.util.concurrent.Striped; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.ViewDefinition; import org.apache.cassandra.db.*; -import org.apache.cassandra.db.commitlog.ReplayPosition; -import org.apache.cassandra.db.partitions.PartitionUpdate; -import org.apache.cassandra.dht.Token; -import org.apache.cassandra.service.StorageProxy; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.cassandra.db.rows.*; +import org.apache.cassandra.db.partitions.*; + /** * Manages {@link View}'s for a single {@link ColumnFamilyStore}. All of the views for that table are created when this @@ -45,110 +43,30 @@ import org.slf4j.LoggerFactory; * any views {@link ViewManager#updatesAffectView(Collection, boolean)}, provide locks to prevent multiple * updates from creating incoherent updates in the view {@link ViewManager#acquireLockFor(ByteBuffer)}, and * to affect change on the view. + * + * TODO: I think we can get rid of that class. For addition/removal of view by names, we could move it Keyspace. And we + * not sure it's even worth keeping viewsByName as none of the related operation are performance sensitive so we could + * find the view by iterating over the CFStore.viewManager directly. + * For the lock, it could move to Keyspace too, but I don't remmenber why it has to be at the keyspace level and if it + * can be at the table level, maybe that's where it should be. 
*/ public class ViewManager { private static final Logger logger = LoggerFactory.getLogger(ViewManager.class); - public class ForStore - { - private final ConcurrentNavigableMap viewsByName; - - public ForStore() - { - this.viewsByName = new ConcurrentSkipListMap<>(); - } - - public Iterable allViews() - { - return viewsByName.values(); - } - - public Iterable allViewsCfs() - { - List viewColumnFamilies = new ArrayList<>(); - for (View view : allViews()) - viewColumnFamilies.add(keyspace.getColumnFamilyStore(view.getDefinition().viewName)); - return viewColumnFamilies; - } - - public void forceBlockingFlush() - { - for (ColumnFamilyStore viewCfs : allViewsCfs()) - viewCfs.forceBlockingFlush(); - } - - public void dumpMemtables() - { - for (ColumnFamilyStore viewCfs : allViewsCfs()) - viewCfs.dumpMemtable(); - } - - public void truncateBlocking(long truncatedAt) - { - for (ColumnFamilyStore viewCfs : allViewsCfs()) - { - ReplayPosition replayAfter = viewCfs.discardSSTables(truncatedAt); - SystemKeyspace.saveTruncationRecord(viewCfs, truncatedAt, replayAfter); - } - } - - public void addView(View view) - { - viewsByName.put(view.name, view); - } - - public void removeView(String name) - { - viewsByName.remove(name); - } - } - private static final Striped LOCKS = Striped.lazyWeakLock(DatabaseDescriptor.getConcurrentViewWriters() * 1024); private static final boolean enableCoordinatorBatchlog = Boolean.getBoolean("cassandra.mv_enable_coordinator_batchlog"); - private final ConcurrentNavigableMap viewManagersByStore; - private final ConcurrentNavigableMap viewsByName; + private final ConcurrentMap viewsByName = new ConcurrentHashMap<>(); + private final ConcurrentMap viewsByBaseTable = new ConcurrentHashMap<>(); private final Keyspace keyspace; public ViewManager(Keyspace keyspace) { - this.viewManagersByStore = new ConcurrentSkipListMap<>(); - this.viewsByName = new ConcurrentSkipListMap<>(); this.keyspace = keyspace; } - /** - * Calculates and pushes updates to the 
views replicas. The replicas are determined by - * {@link ViewUtils#getViewNaturalEndpoint(String, Token, Token)}. - */ - public void pushViewReplicaUpdates(PartitionUpdate update, boolean writeCommitLog, AtomicLong baseComplete) - { - List mutations = null; - TemporalRow.Set temporalRows = null; - for (Map.Entry view : viewsByName.entrySet()) - { - // Make sure that we only get mutations from views which are affected since the set includes all views for a - // keyspace. This will prevent calling getTemporalRowSet for the wrong base table. - if (view.getValue().updateAffectsView(update)) - { - temporalRows = view.getValue().getTemporalRowSet(update, temporalRows, false); - - Collection viewMutations = view.getValue().createMutations(update, temporalRows, false); - if (viewMutations != null && !viewMutations.isEmpty()) - { - if (mutations == null) - mutations = Lists.newLinkedList(); - mutations.addAll(viewMutations); - } - } - } - - if (mutations != null) - StorageProxy.mutateMV(update.partitionKey().getKey(), mutations, writeCommitLog, baseComplete); - } - public boolean updatesAffectView(Collection mutations, boolean coordinatorBatchlog) { if (coordinatorBatchlog && !enableCoordinatorBatchlog) @@ -156,25 +74,22 @@ public class ViewManager for (IMutation mutation : mutations) { - for (PartitionUpdate cf : mutation.getPartitionUpdates()) + for (PartitionUpdate update : mutation.getPartitionUpdates()) { - assert keyspace.getName().equals(cf.metadata().ksName); + assert keyspace.getName().equals(update.metadata().ksName); if (coordinatorBatchlog && keyspace.getReplicationStrategy().getReplicationFactor() == 1) continue; - for (View view : allViews()) - { - if (view.updateAffectsView(cf)) - return true; - } + if (!forTable(update.metadata()).updatedViews(update).isEmpty()) + return true; } } return false; } - public Iterable allViews() + private Iterable allViews() { return viewsByName.values(); } @@ -222,7 +137,7 @@ public class ViewManager public void 
addView(ViewDefinition definition) { View view = new View(definition, keyspace.getColumnFamilyStore(definition.baseTableId)); - forTable(view.getDefinition().baseTableId).addView(view); + forTable(view.getDefinition().baseTableMetadata()).add(view); viewsByName.put(definition.viewName, view); } @@ -233,7 +148,7 @@ public class ViewManager if (view == null) return; - forTable(view.getDefinition().baseTableId).removeView(name); + forTable(view.getDefinition().baseTableMetadata()).removeByName(name); SystemKeyspace.setViewRemoved(keyspace.getName(), view.name); } @@ -243,17 +158,18 @@ public class ViewManager view.build(); } - public ForStore forTable(UUID baseId) + public TableViews forTable(CFMetaData metadata) { - ForStore forStore = viewManagersByStore.get(baseId); - if (forStore == null) + UUID baseId = metadata.cfId; + TableViews views = viewsByBaseTable.get(baseId); + if (views == null) { - forStore = new ForStore(); - ForStore previous = viewManagersByStore.put(baseId, forStore); + views = new TableViews(metadata); + TableViews previous = viewsByBaseTable.putIfAbsent(baseId, views); if (previous != null) - forStore = previous; + views = previous; } - return forStore; + return views; } public static Lock acquireLockFor(ByteBuffer key) http://git-wip-us.apache.org/repos/asf/cassandra/blob/86ba2274/src/java/org/apache/cassandra/db/view/ViewUpdateGenerator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/view/ViewUpdateGenerator.java b/src/java/org/apache/cassandra/db/view/ViewUpdateGenerator.java new file mode 100644 index 0000000..af025cb --- /dev/null +++ b/src/java/org/apache/cassandra/db/view/ViewUpdateGenerator.java @@ -0,0 +1,549 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.view; + +import java.nio.ByteBuffer; +import java.util.*; + +import com.google.common.collect.Iterators; +import com.google.common.collect.PeekingIterator; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.config.ViewDefinition; +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.rows.*; +import org.apache.cassandra.db.partitions.*; +import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.db.marshal.CompositeType; + +/** + * Creates the updates to apply to a view given the existing rows in the base + * table and the updates that we're applying to them (this handles updates + * on a single partition only). + * + * This class is used by passing the updates made to the base table to + * {@link #addBaseTableUpdate} and calling {@link #generateViewUpdates} once all updates have + * been handled to get the resulting view mutations. 
+ */ +public class ViewUpdateGenerator +{ + private final View view; + private final int nowInSec; + + private final CFMetaData baseMetadata; + private final DecoratedKey baseDecoratedKey; + private final ByteBuffer[] basePartitionKey; + + private final CFMetaData viewMetadata; + + private final Map updates = new HashMap<>(); + + // Reused internally to build a new entry + private final ByteBuffer[] currentViewEntryPartitionKey; + private final Row.Builder currentViewEntryBuilder; + + /** + * The type of update action to perform to the view for a given base table + * update. + */ + private enum UpdateAction + { + NONE, // There was no view entry and none should be added + NEW_ENTRY, // There was no entry but there is one post-update + DELETE_OLD, // There was an entry but there is nothing after update + UPDATE_EXISTING, // There was an entry and the update modifies it + SWITCH_ENTRY // There was an entry and there is still one after update, + // but they are not the same one. + }; + + /** + * Creates a new {@code ViewUpdateGenerator}. + * + * @param view the view for which this will be building updates for. + * @param basePartitionKey the partition key for the base table partition for which + * we'll handle updates for. + * @param nowInSec the current time in seconds. Used to decide if data are live or not + * and as base reference for new deletions. 
+ */ + public ViewUpdateGenerator(View view, DecoratedKey basePartitionKey, int nowInSec) + { + this.view = view; + this.nowInSec = nowInSec; + + this.baseMetadata = view.getDefinition().baseTableMetadata(); + this.baseDecoratedKey = basePartitionKey; + this.basePartitionKey = extractKeyComponents(basePartitionKey, baseMetadata.getKeyValidator()); + + this.viewMetadata = view.getDefinition().metadata; + + this.currentViewEntryPartitionKey = new ByteBuffer[viewMetadata.partitionKeyColumns().size()]; + this.currentViewEntryBuilder = BTreeRow.sortedBuilder(); + } + + private static ByteBuffer[] extractKeyComponents(DecoratedKey partitionKey, AbstractType type) + { + return type instanceof CompositeType + ? ((CompositeType)type).split(partitionKey.getKey()) + : new ByteBuffer[]{ partitionKey.getKey() }; + } + + /** + * Adds to this generator the updates to be made to the view given a base table row + * before and after an update. + * + * @param existingBaseRow the base table row as it is before an update. + * @param mergedBaseRow the base table row after the update is applied (note that + * this is not just the new update, but rather the resulting row). + */ + public void addBaseTableUpdate(Row existingBaseRow, Row mergedBaseRow) + { + switch (updateAction(existingBaseRow, mergedBaseRow)) + { + case NONE: + return; + case NEW_ENTRY: + createEntry(mergedBaseRow); + return; + case DELETE_OLD: + deleteOldEntry(existingBaseRow); + return; + case UPDATE_EXISTING: + updateEntry(existingBaseRow, mergedBaseRow); + return; + case SWITCH_ENTRY: + createEntry(mergedBaseRow); + deleteOldEntry(existingBaseRow); + return; + } + } + + /** + * Returns the updates that needs to be done to the view given the base table updates + * passed to {@link #generateViewMutations}. + * + * @return the updates to do to the view. 
+ */ + public Collection generateViewUpdates() + { + return updates.values(); + } + + /** + * Compute which type of action needs to be performed to the view for a base table row + * before and after an update. + */ + private UpdateAction updateAction(Row existingBaseRow, Row mergedBaseRow) + { + // Having existing empty is useful, it just means we'll insert a brand new entry for mergedBaseRow, + // but if we have no update at all, we shouldn't get there. + assert !mergedBaseRow.isEmpty(); + + // Note that none of the base PK columns will differ since we're intrinsically dealing + // with the same base row. So we have to check 3 things: + // 1) that the clustering doesn't have a null, which can happen for compact tables. If that's the case, + // there is no corresponding entries. + // 2) if there is a column not part of the base PK in the view PK, whether it is changed by the update. + // 3) whether mergedBaseRow actually match the view SELECT filter + + if (baseMetadata.isCompactTable()) + { + Clustering clustering = mergedBaseRow.clustering(); + for (int i = 0; i < clustering.size(); i++) + { + if (clustering.get(i) == null) + return UpdateAction.NONE; + } + } + + assert view.baseNonPKColumnsInViewPK.size() <= 1 : "We currently only support one base non-PK column in the view PK"; + if (view.baseNonPKColumnsInViewPK.isEmpty()) + { + // The view entry is necessarily the same pre and post update. + + // Note that we allow existingBaseRow to be null and treat it as empty (see MultiViewUpdateBuilder.generateViewsMutations). + boolean existingHasLiveData = existingBaseRow != null && existingBaseRow.hasLiveData(nowInSec); + boolean mergedHasLiveData = mergedBaseRow.hasLiveData(nowInSec); + return existingHasLiveData + ? (mergedHasLiveData ? UpdateAction.UPDATE_EXISTING : UpdateAction.DELETE_OLD) + : (mergedHasLiveData ? 
UpdateAction.NEW_ENTRY : UpdateAction.NONE); + } + + ColumnDefinition baseColumn = view.baseNonPKColumnsInViewPK.get(0); + assert !baseColumn.isComplex() : "A complex column couldn't be part of the view PK"; + Cell before = existingBaseRow == null ? null : existingBaseRow.getCell(baseColumn); + Cell after = mergedBaseRow.getCell(baseColumn); + + // If the update didn't modify this column, the cells will be the same object so it's worth checking + if (before == after) + return before == null ? UpdateAction.NONE : UpdateAction.UPDATE_EXISTING; + + if (!isLive(before)) + return isLive(after) ? UpdateAction.NEW_ENTRY : UpdateAction.NONE; + if (!isLive(after)) + return UpdateAction.DELETE_OLD; + + return baseColumn.cellValueType().compare(before.value(), after.value()) == 0 + ? UpdateAction.UPDATE_EXISTING + : UpdateAction.SWITCH_ENTRY; + } + + private boolean matchesViewFilter(Row baseRow) + { + return view.matchesViewFilter(baseDecoratedKey, baseRow, nowInSec); + } + + private boolean isLive(Cell cell) + { + return cell != null && cell.isLive(nowInSec); + } + + /** + * Creates a view entry corresponding to the provided base row. + *

+ This method checks that the base row does match the view filter before applying it. + */ + private void createEntry(Row baseRow) + { + // Before creating a new entry, make sure it matches the view filter + if (!matchesViewFilter(baseRow)) + return; + + startNewUpdate(baseRow); + currentViewEntryBuilder.addPrimaryKeyLivenessInfo(computeLivenessInfoForEntry(baseRow)); + currentViewEntryBuilder.addRowDeletion(baseRow.deletion()); + + for (ColumnData data : baseRow) + { + ColumnDefinition viewColumn = view.getViewColumn(data.column()); + // If that base table column is not denormalized in the view, we have nothing to do. + // Also, if it's part of the view PK it's already been taken into account in the clustering. + if (viewColumn == null || viewColumn.isPrimaryKeyColumn()) + continue; + + addColumnData(viewColumn, data); + } + + submitUpdate(); + } + + /** + * Creates the updates to apply to the existing view entry given the base table row before + * and after the update, assuming that the update hasn't changed which view entry the + * row corresponds to (that is, we know the columns composing the view PK haven't changed). + *

+ This method checks that the base row (before and after) does match the view filter before + applying anything. + */ + private void updateEntry(Row existingBaseRow, Row mergedBaseRow) + { + // While we know existingBaseRow and mergedBaseRow are corresponding to the same view entry, + // they may not match the view filter. + if (!matchesViewFilter(existingBaseRow)) + { + createEntry(mergedBaseRow); + return; + } + if (!matchesViewFilter(mergedBaseRow)) + { + deleteOldEntryInternal(existingBaseRow); + return; + } + + startNewUpdate(mergedBaseRow); + + // In theory, it may be the PK liveness and row deletion hasn't been changed by the update + // and we could condition the 2 additions below. In practice though, it's as fast (if not + // faster) to compute those info than to check if they have changed so we keep it simple. + currentViewEntryBuilder.addPrimaryKeyLivenessInfo(computeLivenessInfoForEntry(mergedBaseRow)); + currentViewEntryBuilder.addRowDeletion(mergedBaseRow.deletion()); + + // We only add to the view update the cells from mergedBaseRow that differ from + // existingBaseRow. For that and for speed we can just use cell pointer equality: if the update + // hasn't touched a cell, we know it will be the same object in existingBaseRow and + // mergedBaseRow (note that including more cells than we strictly should isn't a problem + // for correctness, so even if the code changes and pointer equality doesn't work anymore, it'll + // only be a slight inefficiency which we can fix then). + // Note: we could alternatively use Rows.diff() for this, but because it is a bit more generic + // than what we need here, it's also a bit less efficient (it allocates more in particular), + // and this might be called a lot of times for view updates. So, given that this is not a whole + // lot of code anyway, it's probably worth doing the diff manually. 
+ PeekingIterator existingIter = Iterators.peekingIterator(existingBaseRow.iterator()); + for (ColumnData mergedData : mergedBaseRow) + { + ColumnDefinition baseColumn = mergedData.column(); + ColumnDefinition viewColumn = view.getViewColumn(baseColumn); + // If that base table column is not denormalized in the view, we had nothing to do. + // Alose, if it's part of the view PK it's already been taken into account in the clustering. + if (viewColumn == null || viewColumn.isPrimaryKeyColumn()) + continue; + + ColumnData existingData = null; + // Find if there is data for that column in the existing row + while (existingIter.hasNext()) + { + int cmp = baseColumn.compareTo(existingIter.peek().column()); + if (cmp < 0) + break; + + ColumnData next = existingIter.next(); + if (cmp == 0) + { + existingData = next; + break; + } + } + + if (existingData == null) + { + addColumnData(viewColumn, mergedData); + continue; + } + + if (mergedData == existingData) + continue; + + if (baseColumn.isComplex()) + { + ComplexColumnData mergedComplexData = (ComplexColumnData)mergedData; + ComplexColumnData existingComplexData = (ComplexColumnData)existingData; + if (mergedComplexData.complexDeletion().supersedes(existingComplexData.complexDeletion())) + currentViewEntryBuilder.addComplexDeletion(viewColumn, mergedComplexData.complexDeletion()); + + PeekingIterator existingCells = Iterators.peekingIterator(existingComplexData.iterator()); + for (Cell mergedCell : mergedComplexData) + { + Cell existingCell = null; + // Find if there is corresponding cell in the existing row + while (existingCells.hasNext()) + { + int cmp = baseColumn.cellPathComparator().compare(mergedCell.path(), existingCells.peek().path()); + if (cmp > 0) + break; + + Cell next = existingCells.next(); + if (cmp == 0) + { + existingCell = next; + break; + } + } + + if (mergedCell != existingCell) + addCell(viewColumn, mergedCell); + } + } + else + { + // Note that we've already eliminated the case where merged == 
existing + addCell(viewColumn, (Cell)mergedData); + } + } + + submitUpdate(); + } + + /** + * Deletes the view entry corresponding to the provided base row. + *

+ This method checks that the base row does match the view filter before bothering. + */ + private void deleteOldEntry(Row existingBaseRow) + { + // Before deleting an old entry, make sure it was matching the view filter (otherwise there is nothing to delete) + if (!matchesViewFilter(existingBaseRow)) + return; + + deleteOldEntryInternal(existingBaseRow); + } + + private void deleteOldEntryInternal(Row existingBaseRow) + { + startNewUpdate(existingBaseRow); + DeletionTime dt = new DeletionTime(computeTimestampForEntryDeletion(existingBaseRow), nowInSec); + currentViewEntryBuilder.addRowDeletion(Row.Deletion.shadowable(dt)); + submitUpdate(); + } + + /** + * Computes the partition key and clustering for a new view entry, and sets up the internal + * row builder for the new row. + * + * This assumes that there is a corresponding entry, i.e. no values for the partition key and + * clustering are null (since we have eliminated that case through updateAction). + */ + private void startNewUpdate(Row baseRow) + { + ByteBuffer[] clusteringValues = new ByteBuffer[viewMetadata.clusteringColumns().size()]; + for (ColumnDefinition viewColumn : viewMetadata.primaryKeyColumns()) + { + ColumnDefinition baseColumn = view.getBaseColumn(viewColumn); + ByteBuffer value = getValueForPK(baseColumn, baseRow); + if (viewColumn.isPartitionKey()) + currentViewEntryPartitionKey[viewColumn.position()] = value; + else + clusteringValues[viewColumn.position()] = value; + } + + currentViewEntryBuilder.newRow(new Clustering(clusteringValues)); + } + + private LivenessInfo computeLivenessInfoForEntry(Row baseRow) + { + /* + * We need to compute both the timestamp and expiration. + * + * For the timestamp, it makes sense to use the bigger timestamp for all view PK columns. + * + * This is more complex for the expiration. 
We want to maintain consistency between the base and the view, so the + * entry should only exist as long as the base row exists _and_ has non-null values for all the columns that are part + * of the view PK. + * Which means we really have 2 cases: + * 1) either the columns for the base and view PKs are exactly the same: in that case, the view entry should live + * as long as the base row lives. This means the view entry should only expire once *everything* in the base row + * has expired. Which means the row TTL should be the max of any other TTL. + * 2) or there is a column that is not in the base PK but is in the view PK (we can only have one so far, we'll need + * to slightly adapt if we allow more later): in that case, as long as that column lives the entry does too, but + * as soon as it expires (or is deleted for that matter) the entry also should expire. So the expiration for the + * view is the one of that column, irregarding of any other expiration. + * To take an example of that case, if you have: + * CREATE TABLE t (a int, b int, c int, PRIMARY KEY (a, b)) + * CREATE MATERIALIZED VIEW mv AS SELECT * FROM t WHERE c IS NOT NULL AND a IS NOT NULL AND b IS NOT NULL PRIMARY KEY (c, a, b) + * INSERT INTO t(a, b) VALUES (0, 0) USING TTL 3; + * UPDATE t SET c = 0 WHERE a = 0 AND b = 0; + * then even after 3 seconds elapsed, the row will still exist (it just won't have a "row marker" anymore) and so + * the MV should still have a corresponding entry. + */ + assert view.baseNonPKColumnsInViewPK.size() <= 1; // This may change, but is currently an enforced limitation + + LivenessInfo baseLiveness = baseRow.primaryKeyLivenessInfo(); + + if (view.baseNonPKColumnsInViewPK.isEmpty()) + { + int ttl = baseLiveness.ttl(); + int expirationTime = baseLiveness.localExpirationTime(); + for (Cell cell : baseRow.cells()) + { + if (cell.ttl() > ttl) + { + ttl = cell.ttl(); + expirationTime = cell.localDeletionTime(); + } + } + return ttl == baseLiveness.ttl() + ? 
baseLiveness + : LivenessInfo.create(baseLiveness.timestamp(), ttl, expirationTime); + } + + ColumnDefinition baseColumn = view.baseNonPKColumnsInViewPK.get(0); + Cell cell = baseRow.getCell(baseColumn); + assert isLive(cell) : "We shouldn't have got there is the base row had no associated entry"; + + long timestamp = Math.max(baseLiveness.timestamp(), cell.timestamp()); + return LivenessInfo.create(timestamp, cell.ttl(), cell.localDeletionTime()); + } + + private long computeTimestampForEntryDeletion(Row baseRow) + { + // We delete the old row with its row entry timestamp using a shadowable deletion. + // We must make sure that the deletion deletes everything in the entry (or the entry will + // still show up), so we must use the bigger timestamp found in the existing row (for any + // column included in the view at least). + // TODO: We have a problem though: if the entry is "resurrected" by a later update, we would + // need to ensure that the timestamp for the entry then is bigger than the tombstone + // we're just inserting, which is not currently guaranteed. + // This is a bug for a separate ticket though. 
+ long timestamp = baseRow.primaryKeyLivenessInfo().timestamp(); + for (ColumnData data : baseRow) + { + if (!view.getDefinition().includes(data.column().name)) + continue; + + timestamp = Math.max(timestamp, data.maxTimestamp()); + } + return timestamp; + } + + private void addColumnData(ColumnDefinition viewColumn, ColumnData baseTableData) + { + assert viewColumn.isComplex() == baseTableData.column().isComplex(); + if (!viewColumn.isComplex()) + { + addCell(viewColumn, (Cell)baseTableData); + return; + } + + ComplexColumnData complexData = (ComplexColumnData)baseTableData; + currentViewEntryBuilder.addComplexDeletion(viewColumn, complexData.complexDeletion()); + for (Cell cell : complexData) + addCell(viewColumn, cell); + } + + private void addCell(ColumnDefinition viewColumn, Cell baseTableCell) + { + assert !viewColumn.isPrimaryKeyColumn(); + currentViewEntryBuilder.addCell(baseTableCell.withUpdatedColumn(viewColumn)); + } + + /** + * Finish building the currently updated view entry and add it to the other built + * updates. + */ + private void submitUpdate() + { + Row row = currentViewEntryBuilder.build(); + // I'm not sure we can reach here if nothing is updated, but adding an empty row breaks things + // and it costs us nothing to be prudent here. + if (row.isEmpty()) + return; + + DecoratedKey partitionKey = makeCurrentPartitionKey(); + PartitionUpdate update = updates.get(partitionKey); + if (update == null) + { + // We can't really know which columns of the view will be updated nor how many rows will be updated for this key + // so we rely on hopefully sane defaults. + update = new PartitionUpdate(viewMetadata, partitionKey, viewMetadata.partitionColumns(), 4); + updates.put(partitionKey, update); + } + update.add(row); + } + + private DecoratedKey makeCurrentPartitionKey() + { + ByteBuffer rawKey = viewMetadata.partitionKeyColumns().size() == 1 + ? 
currentViewEntryPartitionKey[0] + : CompositeType.build(currentViewEntryPartitionKey); + + return viewMetadata.decorateKey(rawKey); + } + + private ByteBuffer getValueForPK(ColumnDefinition column, Row row) + { + switch (column.kind) + { + case PARTITION_KEY: + return basePartitionKey[column.position()]; + case CLUSTERING: + return row.clustering().get(column.position()); + default: + // This shouldn't NPE as we shouldn't get there if the value can be null (or there is a bug in updateAction()) + return row.getCell(column).value(); + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/86ba2274/test/unit/org/apache/cassandra/cql3/ViewTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/cql3/ViewTest.java b/test/unit/org/apache/cassandra/cql3/ViewTest.java index ac4becb..4a1dc07 100644 --- a/test/unit/org/apache/cassandra/cql3/ViewTest.java +++ b/test/unit/org/apache/cassandra/cql3/ViewTest.java @@ -98,7 +98,7 @@ public class ViewTest extends CQLTester @Test public void testPartitionTombstone() throws Throwable { - createTable("CREATE TABLE %s (k1 int, c1 int , val int, PRIMARY KEY (k1))"); + createTable("CREATE TABLE %s (k1 int, c1 int , val int, PRIMARY KEY (k1, c1))"); execute("USE " + keyspace()); executeNet(protocolVersion, "USE " + keyspace()); @@ -108,8 +108,8 @@ public class ViewTest extends CQLTester updateView("INSERT INTO %s (k1, c1, val) VALUES (1, 2, 200)"); updateView("INSERT INTO %s (k1, c1, val) VALUES (1, 3, 300)"); - Assert.assertEquals(1, execute("select * from %s").size()); - Assert.assertEquals(1, execute("select * from view1").size()); + Assert.assertEquals(2, execute("select * from %s").size()); + Assert.assertEquals(2, execute("select * from view1").size()); updateView("DELETE FROM %s WHERE k1 = 1"); @@ -814,18 +814,58 @@ public class ViewTest extends CQLTester createView("mv", "CREATE MATERIALIZED VIEW %s AS SELECT * FROM %%s WHERE c IS NOT NULL AND a IS NOT 
NULL AND b IS NOT NULL PRIMARY KEY (c, a, b)"); - updateView("INSERT INTO %s (a, b, c, d) VALUES (?, ?, ?, ?) USING TTL 5", 1, 1, 1, 1); + updateView("INSERT INTO %s (a, b, c, d) VALUES (?, ?, ?, ?) USING TTL 3", 1, 1, 1, 1); - Thread.sleep(TimeUnit.SECONDS.toMillis(3)); + Thread.sleep(TimeUnit.SECONDS.toMillis(1)); updateView("INSERT INTO %s (a, b, c) VALUES (?, ?, ?)", 1, 1, 2); - Thread.sleep(TimeUnit.SECONDS.toMillis(3)); + Thread.sleep(TimeUnit.SECONDS.toMillis(5)); List results = executeNet(protocolVersion, "SELECT d FROM mv WHERE c = 2 AND a = 1 AND b = 1").all(); Assert.assertEquals(1, results.size()); Assert.assertTrue("There should be a null result given back due to ttl expiry", results.get(0).isNull(0)); } @Test + public void ttlExpirationTest() throws Throwable + { + createTable("CREATE TABLE %s (" + + "a int," + + "b int," + + "c int," + + "d int," + + "PRIMARY KEY (a, b))"); + + executeNet(protocolVersion, "USE " + keyspace()); + + createView("mv", "CREATE MATERIALIZED VIEW %s AS SELECT * FROM %%s WHERE c IS NOT NULL AND a IS NOT NULL AND b IS NOT NULL PRIMARY KEY (c, a, b)"); + + updateView("INSERT INTO %s (a, b, c, d) VALUES (?, ?, ?, ?) USING TTL 3", 1, 1, 1, 1); + + Thread.sleep(TimeUnit.SECONDS.toMillis(4)); + Assert.assertEquals(0, executeNet(protocolVersion, "SELECT * FROM mv WHERE c = 1 AND a = 1 AND b = 1").all().size()); + } + + @Test + public void rowDeletionTest() throws Throwable + { + createTable("CREATE TABLE %s (" + + "a int," + + "b int," + + "c int," + + "d int," + + "PRIMARY KEY (a, b))"); + + executeNet(protocolVersion, "USE " + keyspace()); + + createView("mv", "CREATE MATERIALIZED VIEW %s AS SELECT * FROM %%s WHERE c IS NOT NULL AND a IS NOT NULL AND b IS NOT NULL PRIMARY KEY (c, a, b)"); + + String table = keyspace() + "." + currentTable(); + updateView("DELETE FROM " + table + " USING TIMESTAMP 6 WHERE a = 1 AND b = 1;"); + updateView("INSERT INTO %s (a, b, c, d) VALUES (?, ?, ?, ?) 
USING TIMESTAMP 3", 1, 1, 1, 1); + Assert.assertEquals(0, executeNet(protocolVersion, "SELECT * FROM mv WHERE c = 1 AND a = 1 AND b = 1").all().size()); + } + + @Test public void conflictingTimestampTest() throws Throwable { createTable("CREATE TABLE %s (" + http://git-wip-us.apache.org/repos/asf/cassandra/blob/86ba2274/test/unit/org/apache/cassandra/db/rows/RowsTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/rows/RowsTest.java b/test/unit/org/apache/cassandra/db/rows/RowsTest.java index dede867..b47bea2 100644 --- a/test/unit/org/apache/cassandra/db/rows/RowsTest.java +++ b/test/unit/org/apache/cassandra/db/rows/RowsTest.java @@ -239,7 +239,8 @@ public class RowsTest BufferCell.live(kcvm, m, secondToTs(now), BB1, CellPath.create(BB1)), BufferCell.live(kcvm, m, secondToTs(now), BB2, CellPath.create(BB2))); expectedCells.forEach(originalBuilder::addCell); - Row.Deletion rowDeletion = new Row.Deletion(new DeletionTime(ts, now), false); + // We need to use ts-1 so the deletion doesn't shadow what we've created + Row.Deletion rowDeletion = new Row.Deletion(new DeletionTime(ts-1, now), false); originalBuilder.addRowDeletion(rowDeletion); RowBuilder builder = new RowBuilder(); @@ -267,7 +268,8 @@ public class RowsTest BufferCell.live(kcvm, m, ts, BB1, CellPath.create(BB1)), BufferCell.live(kcvm, m, ts, BB2, CellPath.create(BB2))); expectedCells.forEach(builder::addCell); - Row.Deletion rowDeletion = new Row.Deletion(new DeletionTime(ts, now), false); + // We need to use ts-1 so the deletion doesn't shadow what we've created + Row.Deletion rowDeletion = new Row.Deletion(new DeletionTime(ts-1, now), false); builder.addRowDeletion(rowDeletion); StatsCollector collector = new StatsCollector();