cassandra-commits mailing list archives

From j...@apache.org
Subject [2/3] cassandra git commit: Reuse TemporalRow when updating multiple MaterializedViews
Date Mon, 24 Aug 2015 17:50:35 GMT
Reuse TemporalRow when updating multiple MaterializedViews

Patch by tjake; reviewed by carl for CASSANDRA-10060


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1fc31216
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1fc31216
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1fc31216

Branch: refs/heads/trunk
Commit: 1fc31216cbf2fc4b2fa687cca20c59eb3c4246e3
Parents: a3d9e6c
Author: T Jake Luciani <jake@apache.org>
Authored: Wed Aug 12 15:56:25 2015 -0400
Committer: T Jake Luciani <jake@apache.org>
Committed: Mon Aug 24 13:49:11 2015 -0400

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +-
 src/java/org/apache/cassandra/db/Keyspace.java  |  2 +-
 .../cassandra/db/view/MaterializedView.java     | 86 ++++++++++++++------
 .../db/view/MaterializedViewBuilder.java        |  5 +-
 .../db/view/MaterializedViewManager.java        | 10 ++-
 .../apache/cassandra/db/view/TemporalRow.java   | 58 ++++++++++++-
 .../apache/cassandra/service/StorageProxy.java  |  6 +-
 7 files changed, 133 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
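
For readers skimming the diff below, the heart of the change is the new call pattern in MaterializedViewManager.pushViewReplicaUpdates: the TemporalRow.Set built for the first materialized view of a base table is handed back into getTemporalRowSet for each subsequent view instead of being rebuilt (and the existing base rows re-read) per view. A minimal sketch of that pattern, using simplified stand-in types rather than the real Cassandra classes:

    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.List;

    // Stand-in types so the sketch compiles on its own; the real classes are
    // PartitionUpdate, TemporalRow.Set, Mutation and MaterializedView.
    interface Update {}
    interface RowSet {}
    interface Mutation {}

    interface View
    {
        // Returns a shared row set; when `existing` is non-null the expensive
        // work (splitting rows, reading current base-table state) is reused.
        RowSet getTemporalRowSet(Update update, RowSet existing, boolean isBuilding);

        Collection<Mutation> createMutations(Update update, RowSet rowSet, boolean isBuilding);
    }

    final class ViewUpdateLoop
    {
        // One base-table update fans out to every view, but the row set is
        // built (and the existing rows read) at most once.
        static List<Mutation> pushUpdates(Iterable<View> views, Update update)
        {
            List<Mutation> result = new ArrayList<>();
            RowSet shared = null;
            for (View view : views)
            {
                shared = view.getTemporalRowSet(update, shared, false);
                if (shared == null)
                    continue; // this update does not affect the view

                Collection<Mutation> viewMutations = view.createMutations(update, shared, false);
                if (viewMutations != null)
                    result.addAll(viewMutations);
            }
            return result;
        }
    }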


http://git-wip-us.apache.org/repos/asf/cassandra/blob/1fc31216/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c866905..9cfbd64 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,8 +1,8 @@
 3.0.0-beta2
+ * Reuse TemporalRow when updating multiple MaterializedViews (CASSANDRA-10060)
  * Validate gc_grace_seconds for batchlog writes and MVs (CASSANDRA-9917)
  * Fix sstablerepairedset (CASSANDRA-10132)
 
-
 3.0.0-beta1
  * Redesign secondary index API (CASSANDRA-9459, 7771, 9041)
  * Fix throwing ReadFailure instead of ReadTimeout on range queries (CASSANDRA-10125)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1fc31216/src/java/org/apache/cassandra/db/Keyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Keyspace.java b/src/java/org/apache/cassandra/db/Keyspace.java
index 199cd25..f5a047f 100644
--- a/src/java/org/apache/cassandra/db/Keyspace.java
+++ b/src/java/org/apache/cassandra/db/Keyspace.java
@@ -457,7 +457,7 @@ public class Keyspace
                     try
                     {
                         Tracing.trace("Create materialized view mutations from replica");
-                        cfs.materializedViewManager.pushViewReplicaUpdates(upd.partitionKey().getKey(), upd);
+                        cfs.materializedViewManager.pushViewReplicaUpdates(upd);
                     }
                     catch (Exception e)
                     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1fc31216/src/java/org/apache/cassandra/db/view/MaterializedView.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/MaterializedView.java b/src/java/org/apache/cassandra/db/view/MaterializedView.java
index 7337e4b..0f6cf06 100644
--- a/src/java/org/apache/cassandra/db/view/MaterializedView.java
+++ b/src/java/org/apache/cassandra/db/view/MaterializedView.java
@@ -481,26 +481,36 @@ public class MaterializedView
 
         if (command != null)
         {
-            QueryPager pager = command.getPager(null);
 
-            // Add all of the rows which were recovered from the query to the row set
-            while (!pager.isExhausted())
+            //We may have already done this work for
+            //another MV update so check
+
+            if (!rowSet.hasTombstonedExisting())
             {
-                try (ReadOrderGroup orderGroup = pager.startOrderGroup();
-                     PartitionIterator iter = pager.fetchPageInternal(128, orderGroup))
-                {
-                    if (!iter.hasNext())
-                        break;
+                QueryPager pager = command.getPager(null);
 
-                    try (RowIterator rowIterator = iter.next())
+                // Add all of the rows which were recovered from the query to the row set
+                while (!pager.isExhausted())
+                {
+                    try (ReadOrderGroup orderGroup = pager.startOrderGroup();
+                         PartitionIterator iter = pager.fetchPageInternal(128, orderGroup))
                     {
-                        while (rowIterator.hasNext())
+                        if (!iter.hasNext())
+                            break;
+
+                        try (RowIterator rowIterator = iter.next())
                         {
-                            Row row = rowIterator.next();
-                            rowSet.addRow(row, false);
+                            while (rowIterator.hasNext())
+                            {
+                                Row row = rowIterator.next();
+                                rowSet.addRow(row, false);
+                            }
                         }
                     }
                 }
+
+                //In case we fetched nothing, avoid re-checking on another MV update
+                rowSet.setTombstonedExisting();
             }
 
             // If the temporal row has been deleted by the deletion info, we generate the corresponding range tombstone
@@ -558,13 +568,11 @@ public class MaterializedView
     /**
      * @return Set of rows which are contained in the partition update {@param partition}
      */
-    private TemporalRow.Set separateRows(ByteBuffer key, AbstractBTreePartition partition)
+    private TemporalRow.Set separateRows(AbstractBTreePartition partition, Set<ColumnIdentifier> viewPrimaryKeyCols)
     {
-        Set<ColumnIdentifier> columns = new HashSet<>();
-        for (ColumnDefinition def : this.columns.primaryKeyDefs)
-            columns.add(def.name);
 
-        TemporalRow.Set rowSet = new TemporalRow.Set(baseCfs, columns, key);
+        TemporalRow.Set rowSet = new TemporalRow.Set(baseCfs, viewPrimaryKeyCols, partition.partitionKey().getKey());
+
         for (Row row : partition)
             rowSet.addRow(row, true);
 
@@ -572,23 +580,53 @@ public class MaterializedView
     }
 
     /**
+     * Splits the partition update up and adds the existing state to each row.
+     * This data can be reused for multiple MV updates on the same base table
+     *
+     * @param partition the mutation
+     * @param isBuilding If the view is currently being built, we do not query the values which are already stored,
+     *                   since all of the update will already be present in the base table.
+     * @return The set of temporal rows contained in this update
+     */
+    public TemporalRow.Set getTemporalRowSet(AbstractBTreePartition partition, TemporalRow.Set existing, boolean isBuilding)
+    {
+        if (!updateAffectsView(partition))
+            return null;
+
+        Set<ColumnIdentifier> columns = new HashSet<>(this.columns.primaryKeyDefs.size());
+        for (ColumnDefinition def : this.columns.primaryKeyDefs)
+            columns.add(def.name);
+
+        TemporalRow.Set rowSet = null;
+        if (existing == null)
+        {
+            rowSet = separateRows(partition, columns);
+
+            // If we are building the view, we do not want to add old values; they will always be the same
+            if (!isBuilding)
+                readLocalRows(rowSet);
+        }
+        else
+        {
+            rowSet = existing.withNewViewPrimaryKey(columns);
+        }
+
+        return rowSet;
+    }
+
+
+    /**
      * @param isBuilding If the view is currently being built, we do not query the values which are already stored,
      *                   since all of the update will already be present in the base table.
      * @return View mutations which represent the changes necessary as long as previously created mutations for the view
      *         have been applied successfully. This is based solely on the changes that are necessary given the current
      *         state of the base table and the newly applying partition data.
      */
-    public Collection<Mutation> createMutations(ByteBuffer key, AbstractBTreePartition partition, boolean isBuilding)
+    public Collection<Mutation> createMutations(AbstractBTreePartition partition, TemporalRow.Set rowSet, boolean isBuilding)
     {
         if (!updateAffectsView(partition))
             return null;
 
-        TemporalRow.Set rowSet = separateRows(key, partition);
-
-        // If we are building the view, we do not want to add old values; they will always be the same
-        if (!isBuilding)
-            readLocalRows(rowSet);
-
         Collection<Mutation> mutations = null;
         for (TemporalRow temporalRow : rowSet)
         {
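
The hasTombstonedExisting()/setTombstonedExisting() pair used above is a run-once guard: reading the existing partition state through the pager is expensive, so once one view's update has folded it into the shared TemporalRow.Set, later views on the same base update skip the read entirely. A rough, generic sketch of that guard, with hypothetical names rather than the real classes:

    import java.util.function.Consumer;

    // Illustrative only: run an expensive load at most once per shared state
    // object, mirroring TemporalRow.Set.hasTombstonedExisting()/setTombstonedExisting().
    final class SharedRowState
    {
        private boolean existingLoaded = false;

        // `loader` stands in for the paged read of existing base-table rows;
        // it runs at most once even if several views ask for the same data.
        void loadExistingOnce(Consumer<SharedRowState> loader)
        {
            if (existingLoaded)
                return;

            loader.accept(this);

            // Mark as done even if nothing was fetched, so other view updates
            // on the same shared state do not repeat the query.
            existingLoaded = true;
        }
    }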

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1fc31216/src/java/org/apache/cassandra/db/view/MaterializedViewBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/MaterializedViewBuilder.java b/src/java/org/apache/cassandra/db/view/MaterializedViewBuilder.java
index e8842ed..5fa5a82 100644
--- a/src/java/org/apache/cassandra/db/view/MaterializedViewBuilder.java
+++ b/src/java/org/apache/cassandra/db/view/MaterializedViewBuilder.java
@@ -89,7 +89,10 @@ public class MaterializedViewBuilder extends CompactionInfo.Holder
 
                try (RowIterator rowIterator = partitionIterator.next())
                {
-                   Collection<Mutation> mutations = view.createMutations(key.getKey(), FilteredPartition.create(rowIterator), true);
+                   FilteredPartition partition = FilteredPartition.create(rowIterator);
+                   TemporalRow.Set temporalRows = view.getTemporalRowSet(partition, null, true);
+
+                   Collection<Mutation> mutations = view.createMutations(partition, temporalRows, true);
 
                    if (mutations != null)
                    {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1fc31216/src/java/org/apache/cassandra/db/view/MaterializedViewManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/MaterializedViewManager.java b/src/java/org/apache/cassandra/db/view/MaterializedViewManager.java
index 7f97728..5184d8d 100644
--- a/src/java/org/apache/cassandra/db/view/MaterializedViewManager.java
+++ b/src/java/org/apache/cassandra/db/view/MaterializedViewManager.java
@@ -150,16 +150,20 @@ public class MaterializedViewManager
      * Calculates and pushes updates to the views replicas. The replicas are determined by
      * {@link MaterializedViewUtils#getViewNaturalEndpoint(String, Token, Token)}.
      */
-    public void pushViewReplicaUpdates(ByteBuffer key, PartitionUpdate update) throws UnavailableException, OverloadedException, WriteTimeoutException
+    public void pushViewReplicaUpdates(PartitionUpdate update) throws UnavailableException, OverloadedException, WriteTimeoutException
     {
         // This happens when we are replaying from commitlog. In that case, we have already sent this commit off to the
         // view node.
         if (!StorageService.instance.isJoined()) return;
 
         List<Mutation> mutations = null;
+        TemporalRow.Set temporalRows = null;
         for (Map.Entry<String, MaterializedView> view : viewsByName.entrySet())
         {
-            Collection<Mutation> viewMutations = view.getValue().createMutations(key, update, false);
+
+            temporalRows = view.getValue().getTemporalRowSet(update, temporalRows, false);
+
+            Collection<Mutation> viewMutations = view.getValue().createMutations(update, temporalRows, false);
             if (viewMutations != null && !viewMutations.isEmpty())
             {
                 if (mutations == null)
@@ -169,7 +173,7 @@ public class MaterializedViewManager
         }
         if (mutations != null)
         {
-            StorageProxy.mutateMV(key, mutations);
+            StorageProxy.mutateMV(update.partitionKey().getKey(), mutations);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1fc31216/src/java/org/apache/cassandra/db/view/TemporalRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/TemporalRow.java b/src/java/org/apache/cassandra/db/view/TemporalRow.java
index 00fdf48..3ba91ee 100644
--- a/src/java/org/apache/cassandra/db/view/TemporalRow.java
+++ b/src/java/org/apache/cassandra/db/view/TemporalRow.java
@@ -162,7 +162,10 @@ public class TemporalRow
     private final ColumnFamilyStore baseCfs;
     private final java.util.Set<ColumnIdentifier> viewPrimaryKey;
     private final ByteBuffer basePartitionKey;
-    public final Map<ColumnIdentifier, ByteBuffer> clusteringColumns;
+    private final Map<ColumnIdentifier, ByteBuffer> clusteringColumns;
+    private final Row startRow;
+    private final boolean startIsNew;
+
     public final int nowInSec;
     private final Map<ColumnIdentifier, Map<CellPath, SortedMap<Long, TemporalCell>>> columnValues = new HashMap<>();
     private int viewClusteringTtl = NO_TTL;
@@ -174,14 +177,18 @@ public class TemporalRow
         this.baseCfs = baseCfs;
         this.viewPrimaryKey = viewPrimaryKey;
         this.basePartitionKey = key;
+        this.startRow = row;
+        this.startIsNew = isNew;
         this.nowInSec = nowInSec;
-        clusteringColumns = new HashMap<>();
+
         LivenessInfo liveness = row.primaryKeyLivenessInfo();
         this.viewClusteringLocalDeletionTime = minValueIfSet(viewClusteringLocalDeletionTime, row.deletion().localDeletionTime(), NO_DELETION_TIME);
         this.viewClusteringTimestamp = minValueIfSet(viewClusteringTimestamp, liveness.timestamp(), NO_TIMESTAMP);
         this.viewClusteringTtl = minValueIfSet(viewClusteringTtl, liveness.ttl(), NO_TTL);
 
         List<ColumnDefinition> clusteringDefs = baseCfs.metadata.clusteringColumns();
+        clusteringColumns = new HashMap<>();
+
         for (int i = 0; i < clusteringDefs.size(); i++)
         {
             ColumnDefinition cdef = clusteringDefs.get(i);
@@ -371,6 +378,7 @@ public class TemporalRow
         public final DecoratedKey dk;
         private final Map<Clustering, TemporalRow> clusteringToRow;
         final int nowInSec = FBUtilities.nowInSeconds();
+        private boolean hasTombstonedExisting = false;
 
         Set(ColumnFamilyStore baseCfs, java.util.Set<ColumnIdentifier> viewPrimaryKey, ByteBuffer key)
         {
@@ -400,12 +408,56 @@ public class TemporalRow
                 clusteringToRow.put(row.clustering(), temporalRow);
             }
 
-            for (Cell cell: row.cells())
+            for (Cell cell : row.cells())
             {
                 temporalRow.addCell(cell, isNew);
             }
         }
 
+        private void addRow(TemporalRow row)
+        {
+            TemporalRow newRow = new TemporalRow(baseCfs, viewPrimaryKey, key, row.startRow, nowInSec, row.startIsNew);
+
+            TemporalRow existing = clusteringToRow.put(row.startRow.clustering(), newRow);
+            assert existing == null;
+
+
+            for (Map.Entry<ColumnIdentifier, Map<CellPath, SortedMap<Long, TemporalCell>>> entry : row.columnValues.entrySet())
+            {
+                for (Map.Entry<CellPath, SortedMap<Long, TemporalCell>> cellPathEntry : entry.getValue().entrySet())
+                {
+                    SortedMap<Long, TemporalCell> oldCells = cellPathEntry.getValue();
+
+                    for (Map.Entry<Long, TemporalCell> cellEntry : oldCells.entrySet())
+                    {
+                        newRow.addColumnValue(entry.getKey(), cellPathEntry.getKey(), cellEntry.getKey(),
+                                              cellEntry.getValue().ttl, cellEntry.getValue().localDeletionTime,
+                                              cellEntry.getValue().value, cellEntry.getValue().isNew);
+                    }
+                }
+            }
+        }
+
+        public TemporalRow.Set withNewViewPrimaryKey(java.util.Set<ColumnIdentifier> viewPrimaryKey)
+        {
+            TemporalRow.Set newSet = new Set(baseCfs, viewPrimaryKey, key);
+
+            for (TemporalRow row : this)
+                newSet.addRow(row);
+
+            return newSet;
+        }
+
+        public boolean hasTombstonedExisting()
+        {
+            return hasTombstonedExisting;
+        }
+
+        public void setTombstonedExisting()
+        {
+            hasTombstonedExisting = true;
+        }
+
         public int size()
         {
             return clusteringToRow.size();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1fc31216/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 9d999ee..f58ac56 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -685,11 +685,11 @@ public class StorageProxy implements StorageProxyMBean
                                                                                  cleanup);
 
                 wrappers.add(wrapper);
-
-                //Apply to local batchlog memtable in this thread
-                BatchlogManager.getBatchlogMutationFor(mutations, batchUUID, MessagingService.current_version).apply();
             }
 
+            //Apply to local batchlog memtable in this thread
+            BatchlogManager.getBatchlogMutationFor(mutations, batchUUID, MessagingService.current_version).apply();
+
             // now actually perform the writes and wait for them to complete
             asyncWriteBatchedMutations(wrappers, localDataCenter, Stage.MATERIALIZED_VIEW_MUTATION);
         }
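
The StorageProxy hunk is an independent fix riding along with this patch: the local batchlog mutation was being applied inside the per-mutation loop, i.e. once per wrapper, and is now applied exactly once for the whole batch. The shape of that fix in isolation, with hypothetical names:

    import java.util.List;

    // Illustrative only: hoist a once-per-batch side effect out of the
    // per-mutation loop, as the StorageProxy change does for the batchlog write.
    final class BatchWriteSketch
    {
        interface BatchLog
        {
            void applyLocally(List<String> mutations, long batchId);
        }

        static void writeBatch(List<String> mutations, long batchId, BatchLog batchLog)
        {
            for (String mutation : mutations)
            {
                // per-mutation work: build the wrapper, pick replicas, enqueue, ...
            }

            // Applied once for the whole batch rather than once per mutation.
            batchLog.applyLocally(mutations, batchId);
        }
    }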

