cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject [1/3] git commit: Ensure that PerRowSecondaryIndex updates see the most recent values patch by Sam Tunnicliffe; reviewed by jbellis for CASSANDRA-5397
Date Thu, 04 Apr 2013 18:33:01 GMT
Updated Branches:
  refs/heads/cassandra-1.2 a19a15a2b -> 6a4af0c77
  refs/heads/trunk a7b2ff65a -> 76c8fe467


Ensure that PerRowSecondaryIndex updates see the most recent values
patch by Sam Tunnicliffe; reviewed by jbellis for CASSANDRA-5397


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6a4af0c7
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6a4af0c7
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6a4af0c7

Branch: refs/heads/cassandra-1.2
Commit: 6a4af0c77ac0aa3600e0a778497e6856d3e356cb
Parents: a19a15a
Author: Jonathan Ellis <jbellis@apache.org>
Authored: Thu Apr 4 10:55:17 2013 -0500
Committer: Jonathan Ellis <jbellis@apache.org>
Committed: Thu Apr 4 13:23:35 2013 -0500

----------------------------------------------------------------------
 CHANGES.txt                                        |    2 +
 .../apache/cassandra/db/AtomicSortedColumns.java   |    2 +
 .../cassandra/db/index/SecondaryIndexManager.java  |   49 +++--
 test/unit/org/apache/cassandra/SchemaLoader.java   |   29 +++
 .../db/index/PerRowSecondaryIndexTest.java         |  151 +++++++++++++++
 5 files changed, 217 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/6a4af0c7/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c1e2540..e64358f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,6 @@
 1.2.4
+ * Ensure that PerRowSecondaryIndex updates see the most recent values
+   (CASSANDRA-5397)
  * avoid duplicate index entries ind PrecompactedRow and 
    ParallelCompactionIterable (CASSANDRA-5395)
  * remove the index entry on oldColumn when new column is a tombstone 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6a4af0c7/src/java/org/apache/cassandra/db/AtomicSortedColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/AtomicSortedColumns.java b/src/java/org/apache/cassandra/db/AtomicSortedColumns.java
index 83aabea..552ad6a 100644
--- a/src/java/org/apache/cassandra/db/AtomicSortedColumns.java
+++ b/src/java/org/apache/cassandra/db/AtomicSortedColumns.java
@@ -195,6 +195,8 @@ public class AtomicSortedColumns implements ISortedColumns
         }
         while (!ref.compareAndSet(current, modified));
 
+        indexer.commit();
+
         return sizeDelta;
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6a4af0c7/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
index 83374d9..df7ceff 100644
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
+++ b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
@@ -19,21 +19,18 @@ package org.apache.cassandra.db.index;
 
 import java.nio.ByteBuffer;
 import java.util.*;
-import java.util.concurrent.ConcurrentNavigableMap;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
+import java.util.concurrent.*;
 
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.db.filter.IDiskAtomFilter;
-import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.filter.IDiskAtomFilter;
 import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.sstable.ReducingKeyIterator;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.thrift.Column;
@@ -54,6 +51,8 @@ public class SecondaryIndexManager
         public void update(IColumn oldColumn, IColumn column) { }
 
         public void remove(IColumn current) { }
+
+        public void commit() {}
     };
 
     /**
@@ -580,11 +579,17 @@ public class SecondaryIndexManager
 
     public static interface Updater
     {
+        /** called when constructing the index against pre-existing data */
         public void insert(IColumn column);
 
+        /** called when updating the index from a memtable */
         public void update(IColumn oldColumn, IColumn column);
 
+        /** called when lazy-updating the index during compaction (CASSANDRA-2897) */
         public void remove(IColumn current);
+
+        /** called after memtable updates are complete (CASSANDRA-5397) */
+        public void commit();
     }
 
     private class PerColumnIndexUpdater implements Updater
@@ -630,12 +635,17 @@ public class SecondaryIndexManager
 
             ((PerColumnSecondaryIndex) index).delete(key.key, column);
         }
+
+        public void commit()
+        {
+            // this is a no-op as per-column index updates are applied immediately
+        }
     }
 
     private class MixedIndexUpdater implements Updater
     {
         private final DecoratedKey key;
-        Set<Class<? extends SecondaryIndex>> appliedRowLevelIndexes = new HashSet<Class<? extends SecondaryIndex>>();
+        ConcurrentHashMap<SecondaryIndex, ByteBuffer> deferredUpdates = new ConcurrentHashMap<SecondaryIndex, ByteBuffer>();
 
         public MixedIndexUpdater(DecoratedKey key)
         {
@@ -651,14 +661,13 @@ public class SecondaryIndexManager
             if (index == null)
                 return;
 
-            if (index instanceof  PerColumnSecondaryIndex)
+            if (index instanceof PerColumnSecondaryIndex)
             {
                 ((PerColumnSecondaryIndex) index).insert(key.key, column);
             }
             else
             {
-                if (appliedRowLevelIndexes.add(index.getClass()))
-                    ((PerRowSecondaryIndex) index).index(key.key);
+                deferredUpdates.putIfAbsent(index, key.key);
             }
         }
 
@@ -668,7 +677,7 @@ public class SecondaryIndexManager
             if (index == null)
                 return;
 
-            if (index instanceof  PerColumnSecondaryIndex)
+            if (index instanceof PerColumnSecondaryIndex)
             {
                 ((PerColumnSecondaryIndex) index).delete(key.key, oldColumn);
                 if (!column.isMarkedForDelete())
@@ -676,8 +685,7 @@ public class SecondaryIndexManager
             }
             else
             {
-                if (appliedRowLevelIndexes.add(index.getClass()))
-                    ((PerRowSecondaryIndex) index).index(key.key);
+                deferredUpdates.putIfAbsent(index, key.key);
             }
         }
 
@@ -690,14 +698,23 @@ public class SecondaryIndexManager
             if (index == null)
                 return;
 
-            if (index instanceof  PerColumnSecondaryIndex)
+            if (index instanceof PerColumnSecondaryIndex)
             {
                 ((PerColumnSecondaryIndex) index).delete(key.key, column);
             }
             else
             {
-                if (appliedRowLevelIndexes.add(index.getClass()))
-                    ((PerRowSecondaryIndex) index).index(key.key);
+                // per-row secondary indexes are assumed to keep the index up-to-date at insert time, rather
+                // than performing lazy updates
+            }
+        }
+
+        public void commit()
+        {
+            for (Map.Entry<SecondaryIndex, ByteBuffer> update : deferredUpdates.entrySet())
+            {
+                assert update.getKey() instanceof PerRowSecondaryIndex;
+                ((PerRowSecondaryIndex) update.getKey()).index(update.getValue());
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6a4af0c7/test/unit/org/apache/cassandra/SchemaLoader.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/SchemaLoader.java b/test/unit/org/apache/cassandra/SchemaLoader.java
index 48fbc04..cb17665 100644
--- a/test/unit/org/apache/cassandra/SchemaLoader.java
+++ b/test/unit/org/apache/cassandra/SchemaLoader.java
@@ -24,6 +24,8 @@ import java.util.*;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import com.google.common.base.Charsets;
+import org.apache.cassandra.db.index.PerRowSecondaryIndexTest;
+import org.apache.cassandra.db.index.SecondaryIndex;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.slf4j.Logger;
@@ -113,6 +115,8 @@ public class SchemaLoader
         String ks_kcs = "KeyCacheSpace";
         String ks_rcs = "RowCacheSpace";
         String ks_nocommit = "NoCommitlogSpace";
+        String ks_prsi = "PerRowSecondaryIndex";
+
 
         Class<? extends AbstractReplicationStrategy> simple = SimpleStrategy.class;
 
@@ -290,6 +294,12 @@ public class SchemaLoader
                                                      opts_rf1,
                                                      standardCFMD(ks_nocommit, "Standard1", withOldCfIds)));
 
+        // PerRowSecondaryIndexTest
+        schema.add(KSMetaData.testMetadata(ks_prsi,
+                                           simple,
+                                           opts_rf1,
+                                           perRowIndexedCFMD(ks_prsi, "Indexed1", withOldCfIds)));
+
 
         if (Boolean.parseBoolean(System.getProperty("cassandra.test.compression", "false")))
             useCompression(schema);
@@ -297,6 +307,25 @@ public class SchemaLoader
         return schema;
     }
 
+    private static CFMetaData perRowIndexedCFMD(String ksName, String cfName, boolean withOldCfIds)
+    {
+        final Map<String, String> indexOptions = Collections.singletonMap(
+                                                      SecondaryIndex.CUSTOM_INDEX_OPTION_NAME,
+                                                      PerRowSecondaryIndexTest.TestIndex.class.getName());
+        return standardCFMD(ksName, cfName, withOldCfIds)
+                .keyValidator(AsciiType.instance)
+                .columnMetadata(new HashMap<ByteBuffer, ColumnDefinition>()
+                {{
+                        ByteBuffer cName = ByteBuffer.wrap("indexed".getBytes(Charsets.UTF_8));
+                        put(cName, new ColumnDefinition(cName,
+                                AsciiType.instance,
+                                IndexType.CUSTOM,
+                                indexOptions,
+                                ByteBufferUtil.bytesToHex(cName),
+                                null));
+                    }});
+    }
+
     private static void useCompression(List<KSMetaData> schema)
     {
         for (KSMetaData ksm : schema)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6a4af0c7/test/unit/org/apache/cassandra/db/index/PerRowSecondaryIndexTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/index/PerRowSecondaryIndexTest.java b/test/unit/org/apache/cassandra/db/index/PerRowSecondaryIndexTest.java
new file mode 100644
index 0000000..3a4f947
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/index/PerRowSecondaryIndexTest.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.index;
+
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.QueryFilter;
+import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Set;
+
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertNotNull;
+
+public class PerRowSecondaryIndexTest extends SchemaLoader
+{
+
+    // test that when index(key) is called on a PRSI index,
+    // the data to be indexed can be read using the supplied
+    // key. TestIndex.index(key) simply reads the data to be
+    // indexed & stashes it in a static variable for inspection
+    // in the test.
+
+    @Test
+    public void testIndexInsertAndUpdate() throws IOException
+    {
+        // create a row then test that the configured index instance was able to read the row
+        RowMutation rm;
+        rm = new RowMutation("PerRowSecondaryIndex", ByteBufferUtil.bytes("k1"));
+        rm.add(new QueryPath("Indexed1", null, ByteBufferUtil.bytes("indexed")), ByteBufferUtil.bytes("foo"), 1);
+        rm.apply();
+
+        ColumnFamily indexedRow = TestIndex.LAST_INDEXED_ROW;
+        assertNotNull(indexedRow);
+        assertEquals(ByteBufferUtil.bytes("foo"), indexedRow.getColumn(ByteBufferUtil.bytes("indexed")).value());
+
+        // update the row and verify what was indexed
+        rm = new RowMutation("PerRowSecondaryIndex", ByteBufferUtil.bytes("k1"));
+        rm.add(new QueryPath("Indexed1", null, ByteBufferUtil.bytes("indexed")), ByteBufferUtil.bytes("bar"), 2);
+        rm.apply();
+
+        indexedRow = TestIndex.LAST_INDEXED_ROW;
+        assertNotNull(indexedRow);
+        assertEquals(ByteBufferUtil.bytes("bar"), indexedRow.getColumn(ByteBufferUtil.bytes("indexed")).value());
+    }
+
+    public static class TestIndex extends PerRowSecondaryIndex
+    {
+        public static ColumnFamily LAST_INDEXED_ROW;
+
+        @Override
+        public void index(ByteBuffer rowKey, ColumnFamily cf)
+        {
+        }
+
+        @Override
+        public void index(ByteBuffer rowKey)
+        {
+            QueryFilter filter = QueryFilter.getIdentityFilter(DatabaseDescriptor.getPartitioner().decorateKey(rowKey),
+                                                               new QueryPath(baseCfs.getColumnFamilyName()));
+            LAST_INDEXED_ROW = baseCfs.getColumnFamily(filter);
+        }
+
+        @Override
+        public void delete(DecoratedKey key)
+        {
+        }
+
+        @Override
+        public void init()
+        {
+        }
+
+        @Override
+        public void reload()
+        {
+        }
+
+        @Override
+        public void validateOptions() throws ConfigurationException
+        {
+        }
+
+        @Override
+        public String getIndexName()
+        {
+            return null;
+        }
+
+        @Override
+        protected SecondaryIndexSearcher createSecondaryIndexSearcher(Set<ByteBuffer> columns)
+        {
+            return null;
+        }
+
+        @Override
+        public void forceBlockingFlush()
+        {
+        }
+
+        @Override
+        public long getLiveSize()
+        {
+            return 0;
+        }
+
+        @Override
+        public ColumnFamilyStore getIndexCfs()
+        {
+            return null;
+        }
+
+        @Override
+        public void removeIndex(ByteBuffer columnName)
+        {
+        }
+
+        @Override
+        public void invalidate()
+        {
+        }
+
+        @Override
+        public void truncate(long truncatedAt)
+        {
+        }
+    }
+}


Mime
View raw message