cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From slebre...@apache.org
Subject svn commit: r1142690 - in /cassandra/trunk: ./ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/db/filter/ test/unit/org/apache/cassandra/db/ test/unit/org/apache/cassandra/db/compaction/ test/unit/org/apache/cassandra/service/
Date Mon, 04 Jul 2011 14:36:12 GMT
Author: slebresne
Date: Mon Jul  4 14:36:11 2011
New Revision: 1142690

URL: http://svn.apache.org/viewvc?rev=1142690&view=rev
Log:
Reset CF and SC deletion time after gc_grace
patch by slebresne; reviewed by jbellis for CASSANDRA-2317

Added:
    cassandra/trunk/src/java/org/apache/cassandra/db/AbstractColumnContainer.java
Modified:
    cassandra/trunk/CHANGES.txt
    cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java
    cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
    cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    cassandra/trunk/src/java/org/apache/cassandra/db/IColumnContainer.java
    cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
    cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java
    cassandra/trunk/src/java/org/apache/cassandra/db/filter/QueryFilter.java
    cassandra/trunk/test/unit/org/apache/cassandra/db/RowTest.java
    cassandra/trunk/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
    cassandra/trunk/test/unit/org/apache/cassandra/service/RowResolverTest.java

Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1142690&r1=1142689&r2=1142690&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Mon Jul  4 14:36:11 2011
@@ -10,6 +10,7 @@
  * clean up tmp files after failed compaction (CASSANDRA-2468)
  * restrict repair streaming to specific columnfamilies (CASSANDRA-2280)
  * don't bother persisting columns shadowed by a row tombstone (CASSANDRA-2589)
+ * reset CF and SC deletion times after gc_grace (CASSANDRA-2317)
 
 
 0.8.2

Added: cassandra/trunk/src/java/org/apache/cassandra/db/AbstractColumnContainer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/AbstractColumnContainer.java?rev=1142690&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/AbstractColumnContainer.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/AbstractColumnContainer.java Mon Jul
 4 14:36:11 2011
@@ -0,0 +1,212 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.nio.ByteBuffer;
+import java.security.MessageDigest;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.io.ICompactSerializer2;
+import org.apache.cassandra.io.util.IIterableColumns;
+import org.apache.cassandra.utils.FBUtilities;
+
+public abstract class AbstractColumnContainer implements IColumnContainer, IIterableColumns
+{
+    private static Logger logger = LoggerFactory.getLogger(AbstractColumnContainer.class);
+
+    protected final ConcurrentSkipListMap<ByteBuffer, IColumn> columns;
+    protected final AtomicReference<DeletionInfo> deletionInfo = new AtomicReference<DeletionInfo>(new
DeletionInfo());
+
+    protected AbstractColumnContainer(ConcurrentSkipListMap<ByteBuffer, IColumn> columns)
+    {
+        this.columns = columns;
+    }
+
+    @Deprecated // TODO this is a hack to set initial value outside constructor
+    public void delete(int localtime, long timestamp)
+    {
+        deletionInfo.set(new DeletionInfo(timestamp, localtime));
+    }
+
+    public void delete(AbstractColumnContainer cc2)
+    {
+        // Keeping deletion info for max markedForDeleteAt value
+        DeletionInfo current;
+        DeletionInfo cc2Info = cc2.deletionInfo.get();
+        while (true)
+        {
+             current = deletionInfo.get();
+             if (current.markedForDeleteAt >= cc2Info.markedForDeleteAt || deletionInfo.compareAndSet(current,
cc2Info))
+                 break;
+        }
+    }
+
+    public boolean isMarkedForDelete()
+    {
+        return getMarkedForDeleteAt() > Long.MIN_VALUE;
+    }
+
+    public long getMarkedForDeleteAt()
+    {
+        return deletionInfo.get().markedForDeleteAt;
+    }
+
+    public int getLocalDeletionTime()
+    {
+        return deletionInfo.get().localDeletionTime;
+    }
+
+    public AbstractType getComparator()
+    {
+        return (AbstractType)columns.comparator();
+    }
+
+    public void maybeResetDeletionTimes(int gcBefore)
+    {
+        while (true)
+        {
+            DeletionInfo current = deletionInfo.get();
+            // Stop if either we don't need to change the deletion info (it's
+            // still MIN_VALUE or not expired yet) or we've succesfully changed it
+            if (current.localDeletionTime == Integer.MIN_VALUE
+             || current.localDeletionTime > gcBefore
+             || deletionInfo.compareAndSet(current, new DeletionInfo()))
+                break;
+        }
+    }
+
+    /**
+     * We need to go through each column in the column container and resolve it before adding
+     */
+    public void addAll(AbstractColumnContainer cc)
+    {
+        for (IColumn column : cc.getSortedColumns())
+            addColumn(column);
+        delete(cc);
+    }
+
+    /*
+     * If we find an old column that has the same name
+     * the ask it to resolve itself else add the new column
+    */
+    public void addColumn(IColumn column)
+    {
+        ByteBuffer name = column.name();
+        IColumn oldColumn;
+        while ((oldColumn = columns.putIfAbsent(name, column)) != null)
+        {
+            if (oldColumn instanceof SuperColumn)
+            {
+                assert column instanceof SuperColumn;
+                ((SuperColumn) oldColumn).putColumn((SuperColumn)column);
+                break;  // Delegated to SuperColumn
+            }
+            else
+            {
+                // calculate reconciled col from old (existing) col and new col
+                IColumn reconciledColumn = column.reconcile(oldColumn);
+                if (columns.replace(name, oldColumn, reconciledColumn))
+                    break;
+
+                // We failed to replace column due to a concurrent update or a concurrent
removal. Keep trying.
+                // (Currently, concurrent removal should not happen (only updates), but let
us support that anyway.)
+            }
+        }
+    }
+
+    abstract protected void putColumn(SuperColumn sc);
+
+    public IColumn getColumn(ByteBuffer name)
+    {
+        return columns.get(name);
+    }
+
+    public SortedSet<ByteBuffer> getColumnNames()
+    {
+        return columns.keySet();
+    }
+
+    public Collection<IColumn> getSortedColumns()
+    {
+        return columns.values();
+    }
+
+    public Collection<IColumn> getReverseSortedColumns()
+    {
+        return columns.descendingMap().values();
+    }
+
+    public Map<ByteBuffer, IColumn> getColumnsMap()
+    {
+        return columns;
+    }
+
+    public void remove(ByteBuffer columnName)
+    {
+        columns.remove(columnName);
+    }
+
+    public int getColumnCount()
+    {
+        return columns.size();
+    }
+
+    public boolean isEmpty()
+    {
+        return columns.isEmpty();
+    }
+
+    public int getEstimatedColumnCount()
+    {
+        return getColumnCount();
+    }
+
+    public Iterator<IColumn> iterator()
+    {
+        return columns.values().iterator();
+    }
+
+    private static class DeletionInfo
+    {
+        public final long markedForDeleteAt;
+        public final int localDeletionTime;
+
+        public DeletionInfo()
+        {
+            this(Long.MIN_VALUE, Integer.MIN_VALUE);
+        }
+
+        public DeletionInfo(long markedForDeleteAt, int localDeletionTime)
+        {
+            this.markedForDeleteAt = markedForDeleteAt;
+            this.localDeletionTime = localDeletionTime;
+        }
+    }
+}

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java?rev=1142690&r1=1142689&r2=1142690&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java Mon Jul  4 14:36:11
2011
@@ -22,16 +22,9 @@ import static org.apache.cassandra.db.DB
 
 import java.nio.ByteBuffer;
 import java.security.MessageDigest;
-import java.util.Collection;
-import java.util.Iterator;
 import java.util.Map;
 import java.util.SortedSet;
 import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
@@ -42,10 +35,8 @@ import org.apache.cassandra.io.IColumnSe
 import org.apache.cassandra.io.util.IIterableColumns;
 import org.apache.cassandra.utils.FBUtilities;
 
-public class ColumnFamily implements IColumnContainer, IIterableColumns
+public class ColumnFamily extends AbstractColumnContainer
 {
-    private static Logger logger = LoggerFactory.getLogger(ColumnFamily.class);
-
     /* The column serializer for this Column Family. Create based on config. */
     private static ColumnFamilySerializer serializer = new ColumnFamilySerializer();
     private final CFMetaData cfm;
@@ -71,23 +62,25 @@ public class ColumnFamily implements ICo
     }
 
     private transient IColumnSerializer columnSerializer;
-    final AtomicLong markedForDeleteAt = new AtomicLong(Long.MIN_VALUE);
-    final AtomicInteger localDeletionTime = new AtomicInteger(Integer.MIN_VALUE);
-    private ConcurrentSkipListMap<ByteBuffer, IColumn> columns;
     
     public ColumnFamily(CFMetaData cfm)
     {
+        this(cfm, new ConcurrentSkipListMap<ByteBuffer, IColumn>(cfm.comparator));
+    }
+
+    private ColumnFamily(CFMetaData cfm, ConcurrentSkipListMap<ByteBuffer, IColumn>
map)
+    {
+        super(map);
         assert cfm != null;
         this.cfm = cfm;
         columnSerializer = cfm.cfType == ColumnFamilyType.Standard ? Column.serializer()
: SuperColumn.serializer(cfm.subcolumnComparator);
-        columns = new ConcurrentSkipListMap<ByteBuffer, IColumn>(cfm.comparator);
      }
     
     public ColumnFamily cloneMeShallow()
     {
         ColumnFamily cf = new ColumnFamily(cfm);
-        cf.markedForDeleteAt.set(markedForDeleteAt.get());
-        cf.localDeletionTime.set(localDeletionTime.get());
+        // since deletion info is immutable, aliasing it is fine
+        cf.deletionInfo.set(deletionInfo.get());
         return cf;
     }
 
@@ -103,8 +96,9 @@ public class ColumnFamily implements ICo
 
     public ColumnFamily cloneMe()
     {
-        ColumnFamily cf = cloneMeShallow();
-        cf.columns = columns.clone();
+        ColumnFamily cf = new ColumnFamily(cfm, columns.clone());
+        // since deletion info is immutable, aliasing it is fine
+        cf.deletionInfo.set(deletionInfo.get());
         return cf;
     }
 
@@ -121,32 +115,11 @@ public class ColumnFamily implements ICo
         return cfm;
     }
 
-    /*
-     *  We need to go through each column
-     *  in the column family and resolve it before adding
-    */
-    public void addAll(ColumnFamily cf)
-    {
-        for (IColumn column : cf.getSortedColumns())
-            addColumn(column);
-        delete(cf);
-    }
-
     public IColumnSerializer getColumnSerializer()
     {
         return columnSerializer;
     }
 
-    public int getColumnCount()
-    {
-        return columns.size();
-    }
-
-    public boolean isEmpty()
-    {
-        return columns.isEmpty();
-    }
-
     public boolean isSuper()
     {
         return getType() == ColumnFamilyType.Super;
@@ -214,82 +187,6 @@ public class ColumnFamily implements ICo
     }
 
     /*
-     * If we find an old column that has the same name
-     * the ask it to resolve itself else add the new column .
-    */
-    public void addColumn(IColumn column)
-    {
-        ByteBuffer name = column.name();
-        IColumn oldColumn;
-        while ((oldColumn = columns.putIfAbsent(name, column)) != null)
-        {
-            if (oldColumn instanceof SuperColumn)
-            {
-                ((SuperColumn) oldColumn).putColumn(column);
-                break;  // Delegated to SuperColumn
-            }
-            else
-            {
-                // calculate reconciled col from old (existing) col and new col
-                IColumn reconciledColumn = column.reconcile(oldColumn);
-                if (columns.replace(name, oldColumn, reconciledColumn))
-                    break;
-
-                // We failed to replace column due to a concurrent update or a concurrent
removal. Keep trying.
-                // (Currently, concurrent removal should not happen (only updates), but let
us support that anyway.)
-            }
-        }
-    }
-
-    public IColumn getColumn(ByteBuffer name)
-    {
-        return columns.get(name);
-    }
-
-    public SortedSet<ByteBuffer> getColumnNames()
-    {
-        return columns.keySet();
-    }
-
-    public Collection<IColumn> getSortedColumns()
-    {
-        return columns.values();
-    }
-
-    public Collection<IColumn> getReverseSortedColumns()
-    {
-        return columns.descendingMap().values();
-    }
-
-    public Map<ByteBuffer, IColumn> getColumnsMap()
-    {
-        return columns;
-    }
-
-    public void remove(ByteBuffer columnName)
-    {
-        columns.remove(columnName);
-    }
-
-    @Deprecated // TODO this is a hack to set initial value outside constructor
-    public void delete(int localtime, long timestamp)
-    {
-        localDeletionTime.set(localtime);
-        markedForDeleteAt.set(timestamp);
-    }
-
-    public void delete(ColumnFamily cf2)
-    {
-        FBUtilities.atomicSetMax(localDeletionTime, cf2.getLocalDeletionTime()); // do this
first so we won't have a column that's "deleted" but has no local deletion time
-        FBUtilities.atomicSetMax(markedForDeleteAt, cf2.getMarkedForDeleteAt());
-    }
-
-    public boolean isMarkedForDelete()
-    {
-        return markedForDeleteAt.get() > Long.MIN_VALUE;
-    }
-
-    /*
      * This function will calculate the difference between 2 column families.
      * The external input is assumed to be a superset of internal.
      */
@@ -330,11 +227,6 @@ public class ColumnFamily implements ICo
         return null;
     }
 
-    public AbstractType getComparator()
-    {
-        return (AbstractType)columns.comparator();
-    }
-
     int size()
     {
         int size = 0;
@@ -382,16 +274,6 @@ public class ColumnFamily implements ICo
             column.updateDigest(digest);
     }
 
-    public long getMarkedForDeleteAt()
-    {
-        return markedForDeleteAt.get();
-    }
-
-    public int getLocalDeletionTime()
-    {
-        return localDeletionTime.get();
-    }
-
     public static AbstractType getComparatorFor(String table, String columnFamilyName, ByteBuffer
superColumnName)
     {
         return superColumnName == null
@@ -414,16 +296,6 @@ public class ColumnFamily implements ICo
         addAll(cf);
     }
 
-    public int getEstimatedColumnCount()
-    {
-        return getColumnCount();
-    }
-
-    public Iterator<IColumn> iterator()
-    {
-        return columns.values().iterator();
-    }
-
     public long serializedSize()
     {
         int size = boolSize_ // bool
@@ -449,4 +321,9 @@ public class ColumnFamily implements ICo
             column.validateFields(metadata);
         }
     }
+
+    protected void putColumn(SuperColumn sc)
+    {
+        throw new UnsupportedOperationException("Unsupported operation for a column family");
+    }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java?rev=1142690&r1=1142689&r2=1142690&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java Mon Jul 
4 14:36:11 2011
@@ -97,8 +97,8 @@ public class ColumnFamilySerializer impl
 
     public void serializeCFInfo(ColumnFamily columnFamily, DataOutput dos) throws IOException
     {
-        dos.writeInt(columnFamily.localDeletionTime.get());
-        dos.writeLong(columnFamily.markedForDeleteAt.get());
+        dos.writeInt(columnFamily.getLocalDeletionTime());
+        dos.writeLong(columnFamily.getMarkedForDeleteAt());
     }
 
     public int serializeWithIndexes(ColumnFamily columnFamily, DataOutput dos)

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1142690&r1=1142689&r2=1142690&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Mon Jul  4 14:36:11
2011
@@ -775,6 +775,10 @@ public class ColumnFamilyStore implement
         // (we want this to be deterministic to avoid confusion.)
         if (cf.getColumnCount() == 0 && cf.getLocalDeletionTime() <= gcBefore)
             return null;
+
+        // If there is non deleted columns, we still need to reset the column family
+        // deletion times since gc_grace seconds had elapsed
+        cf.maybeResetDeletionTimes(gcBefore);
         return cf;
     }
 
@@ -844,6 +848,12 @@ public class ColumnFamilyStore implement
             {
                 cf.remove(c.name());
             }
+            else
+            {
+                // If there is non deleted columns, we still need to reset the column family
+                // deletion times since gc_grace seconds had elapsed
+                c.maybeResetDeletionTimes(gcBefore);
+            }
         }
     }
 

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/IColumnContainer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/IColumnContainer.java?rev=1142690&r1=1142689&r2=1142690&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/IColumnContainer.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/IColumnContainer.java Mon Jul  4 14:36:11
2011
@@ -32,6 +32,7 @@ public interface IColumnContainer
 
     public boolean isMarkedForDelete();
     public long getMarkedForDeleteAt();
+    public int getLocalDeletionTime();
 
     public AbstractType getComparator();
 

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java?rev=1142690&r1=1142689&r2=1142690&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java Mon Jul  4 14:36:11
2011
@@ -190,7 +190,7 @@ public class RowMutation implements IMut
         else if (path.columnName == null)
         {
             SuperColumn sc = new SuperColumn(path.superColumnName, columnFamily.getSubComparator());
-            sc.markForDeleteAt(localDeleteTime, timestamp);
+            sc.delete(localDeleteTime, timestamp);
             columnFamily.addColumn(sc);
         }
         else

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java?rev=1142690&r1=1142689&r2=1142690&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java Mon Jul  4 14:36:11
2011
@@ -28,8 +28,6 @@ import java.util.Comparator;
 import java.util.IdentityHashMap;
 import java.util.Map;
 import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.marshal.AbstractType;
@@ -42,7 +40,7 @@ import org.apache.cassandra.utils.FBUtil
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
 
 
-public class SuperColumn implements IColumn, IColumnContainer
+public class SuperColumn extends AbstractColumnContainer implements IColumn
 {
     private static NonBlockingHashMap<Comparator, SuperColumnSerializer> serializers
= new NonBlockingHashMap<Comparator, SuperColumnSerializer>();
     public static SuperColumnSerializer serializer(AbstractType comparator)
@@ -56,10 +54,7 @@ public class SuperColumn implements ICol
         return serializer;
     }
 
-    private ByteBuffer name_;
-    private ConcurrentSkipListMap<ByteBuffer, IColumn> columns_;
-    private AtomicInteger localDeletionTime = new AtomicInteger(Integer.MIN_VALUE);
-    private AtomicLong markedForDeleteAt = new AtomicLong(Long.MIN_VALUE);
+    private ByteBuffer name;
 
     public SuperColumn(ByteBuffer name, AbstractType comparator)
     {
@@ -68,54 +63,41 @@ public class SuperColumn implements ICol
 
     SuperColumn(ByteBuffer name, ConcurrentSkipListMap<ByteBuffer, IColumn> columns)
     {
+        super(columns);
         assert name != null;
         assert name.remaining() <= IColumn.MAX_NAME_LENGTH;
-        name_ = name;
-        columns_ = columns;
-    }
-
-    public AbstractType getComparator()
-    {
-        return (AbstractType)columns_.comparator();
+        this.name = name;
     }
 
     public SuperColumn cloneMeShallow()
     {
-        SuperColumn sc = new SuperColumn(name_, getComparator());
-        sc.markForDeleteAt(localDeletionTime.get(), markedForDeleteAt.get());
+        SuperColumn sc = new SuperColumn(name, getComparator());
+        // since deletion info is immutable, aliasing it is fine
+        sc.deletionInfo.set(deletionInfo.get());
         return sc;
     }
 
     public IColumn cloneMe()
     {
-        SuperColumn sc = new SuperColumn(name_, new ConcurrentSkipListMap<ByteBuffer,
IColumn>(columns_));
-        sc.markForDeleteAt(localDeletionTime.get(), markedForDeleteAt.get());
+        SuperColumn sc = new SuperColumn(name, new ConcurrentSkipListMap<ByteBuffer, IColumn>(columns));
+        // since deletion info is immutable, aliasing it is fine
+        sc.deletionInfo.set(deletionInfo.get());
         return sc;
     }
 
-	public boolean isMarkedForDelete()
-	{
-        return markedForDeleteAt.get() > Long.MIN_VALUE;
-	}
-
     public ByteBuffer name()
     {
-    	return name_;
+        return name;
     }
 
     public Collection<IColumn> getSubColumns()
     {
-    	return columns_.values();
-    }
-
-    public Collection<IColumn> getSortedColumns()
-    {
-        return getSubColumns();
+        return getSortedColumns();
     }
 
     public IColumn getSubColumn(ByteBuffer columnName)
     {
-        IColumn column = columns_.get(columnName);
+        IColumn column = columns.get(columnName);
         assert column == null || column instanceof Column;
         return column;
     }
@@ -143,12 +125,7 @@ public class SuperColumn implements ICol
     	 * We need to keep the way we are calculating the column size in sync with the
     	 * way we are calculating the size for the column family serializer.
     	 */
-      return DBConstants.shortSize_ + name_.remaining() + DBConstants.intSize_ + DBConstants.longSize_
+ DBConstants.intSize_ + size();
-    }
-
-    public void remove(ByteBuffer columnName)
-    {
-    	columns_.remove(columnName);
+      return DBConstants.shortSize_ + name.remaining() + DBConstants.intSize_ + DBConstants.longSize_
+ DBConstants.intSize_ + size();
     }
 
     public long timestamp()
@@ -159,7 +136,7 @@ public class SuperColumn implements ICol
     public long mostRecentLiveChangeAt()
     {
         long max = Long.MIN_VALUE;
-        for (IColumn column : columns_.values())
+        for (IColumn column : getSubColumns())
         {
             if (!column.isMarkedForDelete() && column.timestamp() > max)
             {
@@ -174,42 +151,24 @@ public class SuperColumn implements ICol
     	throw new UnsupportedOperationException("This operation is not supported for Super Columns.");
     }
 
+    @Override
     public void addColumn(IColumn column)
     {
         assert column instanceof Column : "A super column can only contain simple columns";
-
-        ByteBuffer name = column.name();
-        IColumn oldColumn;
-        while ((oldColumn = columns_.putIfAbsent(name, column)) != null)
-        {
-            IColumn reconciledColumn = column.reconcile(oldColumn);
-            if (columns_.replace(name, oldColumn, reconciledColumn))
-                break;
-
-            // We failed to replace column due to a concurrent update or a concurrent removal.
Keep trying.
-            // (Currently, concurrent removal should not happen (only updates), but let us
support that anyway.)
-        }
+        super.addColumn((Column)column);
     }
 
     /*
      * Go through each sub column if it exists then as it to resolve itself
      * if the column does not exist then create it.
      */
-    public void putColumn(IColumn column)
+    protected void putColumn(SuperColumn column)
     {
-        assert column instanceof SuperColumn;
-
         for (IColumn subColumn : column.getSubColumns())
         {
         	addColumn(subColumn);
         }
-        FBUtilities.atomicSetMax(localDeletionTime, column.getLocalDeletionTime()); // do
this first so we won't have a column that's "deleted" but has no local deletion time
-        FBUtilities.atomicSetMax(markedForDeleteAt, column.getMarkedForDeleteAt());
-    }
-
-    public long getMarkedForDeleteAt()
-    {
-        return markedForDeleteAt.get();
+        delete(column);
     }
 
     public IColumn diff(IColumn columnNew)
@@ -217,7 +176,7 @@ public class SuperColumn implements ICol
     	IColumn columnDiff = new SuperColumn(columnNew.name(), ((SuperColumn)columnNew).getComparator());
         if (columnNew.getMarkedForDeleteAt() > getMarkedForDeleteAt())
         {
-            ((SuperColumn)columnDiff).markForDeleteAt(columnNew.getLocalDeletionTime(), columnNew.getMarkedForDeleteAt());
+            ((SuperColumn)columnDiff).delete(columnNew.getLocalDeletionTime(), columnNew.getMarkedForDeleteAt());
         }
 
         // (don't need to worry about columnNew containing subColumns that are shadowed by
@@ -225,7 +184,7 @@ public class SuperColumn implements ICol
         // takes care of those for us.)
         for (IColumn subColumn : columnNew.getSubColumns())
         {
-        	IColumn columnInternal = columns_.get(subColumn.name());
+        	IColumn columnInternal = columns.get(subColumn.name());
         	if(columnInternal == null )
         	{
         		columnDiff.addColumn(subColumn);
@@ -248,19 +207,19 @@ public class SuperColumn implements ICol
 
     public void updateDigest(MessageDigest digest)
     {
-        assert name_ != null;
-        digest.update(name_.duplicate());
+        assert name != null;
+        digest.update(name.duplicate());
         DataOutputBuffer buffer = new DataOutputBuffer();
         try
         {
-            buffer.writeLong(markedForDeleteAt.get());
+            buffer.writeLong(getMarkedForDeleteAt());
         }
         catch (IOException e)
         {
             throw new RuntimeException(e);
         }
         digest.update(buffer.getData(), 0, buffer.getLength());
-        for (IColumn column : columns_.values())
+        for (IColumn column : getSubColumns())
         {
             column.updateDigest(digest);
         }
@@ -270,14 +229,14 @@ public class SuperColumn implements ICol
     {
     	StringBuilder sb = new StringBuilder();
         sb.append("SuperColumn(");
-    	sb.append(comparator.getString(name_));
+    	sb.append(comparator.getString(name));
 
         if (isMarkedForDelete()) {
             sb.append(" -delete at ").append(getMarkedForDeleteAt()).append("-");
         }
 
         sb.append(" [");
-        sb.append(getComparator().getColumnsString(columns_.values()));
+        sb.append(getComparator().getColumnsString(getSubColumns()));
         sb.append("])");
 
         return sb.toString();
@@ -285,26 +244,14 @@ public class SuperColumn implements ICol
 
     public boolean isLive()
     {
-        return mostRecentLiveChangeAt() > markedForDeleteAt.get();
-    }
-
-    public int getLocalDeletionTime()
-    {
-        return localDeletionTime.get();
-    }
-
-    @Deprecated // TODO this is a hack to set initial value outside constructor
-    public void markForDeleteAt(int localDeleteTime, long timestamp)
-    {
-        this.localDeletionTime.set(localDeleteTime);
-        this.markedForDeleteAt.set(timestamp);
+        return mostRecentLiveChangeAt() > getMarkedForDeleteAt();
     }
 
     public IColumn shallowCopy()
     {
-        SuperColumn sc = new SuperColumn(ByteBufferUtil.clone(name_), this.getComparator());
-        sc.localDeletionTime = localDeletionTime;
-        sc.markedForDeleteAt = markedForDeleteAt;
+        SuperColumn sc = new SuperColumn(ByteBufferUtil.clone(name()), this.getComparator());
+        // since deletion info is immutable, aliasing it is fine
+        sc.deletionInfo.set(deletionInfo.get());
         return sc;
     }
     
@@ -312,11 +259,9 @@ public class SuperColumn implements ICol
     {
         // we don't try to intern supercolumn names, because if we're using Cassandra correctly
it's almost
         // certainly just going to pollute our interning map with unique, dynamic values
-        SuperColumn sc = new SuperColumn(ByteBufferUtil.clone(name_), this.getComparator());
-        sc.localDeletionTime = localDeletionTime;
-        sc.markedForDeleteAt = markedForDeleteAt;
-        
-        for(Map.Entry<ByteBuffer, IColumn> c : columns_.entrySet())
+        SuperColumn sc = (SuperColumn)shallowCopy();
+
+        for(Map.Entry<ByteBuffer, IColumn> c : columns.entrySet())
         {
             sc.addColumn(c.getValue().localCopy(cfs));
         }
@@ -414,7 +359,7 @@ class SuperColumnSerializer implements I
         {
             throw new IOException("Invalid localDeleteTime read: " + localDeleteTime);
         }
-        superColumn.markForDeleteAt(localDeleteTime, markedForDeleteAt);
+        superColumn.delete(localDeleteTime, markedForDeleteAt);
         return superColumn;
     }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/filter/QueryFilter.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/filter/QueryFilter.java?rev=1142690&r1=1142689&r2=1142690&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/filter/QueryFilter.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/filter/QueryFilter.java Mon Jul  4 14:36:11
2011
@@ -129,10 +129,10 @@ public class QueryFilter
                     // time of the cf, if that is greater.
                     long deletedAt = c.getMarkedForDeleteAt();
                     if (returnCF.getMarkedForDeleteAt() > deletedAt)
-                        ((SuperColumn)c).markForDeleteAt(c.getLocalDeletionTime(), returnCF.getMarkedForDeleteAt());
+                        ((SuperColumn)c).delete(c.getLocalDeletionTime(), returnCF.getMarkedForDeleteAt());
 
                     c = filter.filterSuperColumn((SuperColumn)c, gcBefore);
-                    ((SuperColumn)c).markForDeleteAt(c.getLocalDeletionTime(), deletedAt);
// reset sc tombstone time to what it should be
+                    ((SuperColumn)c).delete(c.getLocalDeletionTime(), deletedAt); // reset
sc tombstone time to what it should be
                 }
                 curCF.clear();           
 

Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/RowTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/RowTest.java?rev=1142690&r1=1142689&r2=1142690&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/RowTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/RowTest.java Mon Jul  4 14:36:11 2011
@@ -54,7 +54,7 @@ public class RowTest extends SchemaLoade
         sc1.addColumn(column("subcolumn", "A", 0));
 
         SuperColumn sc2 = new SuperColumn(ByteBufferUtil.bytes("one"), AsciiType.instance);
-        sc2.markForDeleteAt(0, 0);
+        sc2.delete(0, 0);
 
         SuperColumn scDiff = (SuperColumn)sc1.diff(sc2);
         assertEquals(scDiff.getSubColumns().size(), 0);

Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java?rev=1142690&r1=1142689&r2=1142690&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
(original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
Mon Jul  4 14:36:11 2011
@@ -19,6 +19,7 @@
 package org.apache.cassandra.db.compaction;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.concurrent.ExecutionException;
 
@@ -31,6 +32,7 @@ import org.apache.cassandra.db.Decorated
 import org.apache.cassandra.db.RowMutation;
 import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.SuperColumn;
 import org.apache.cassandra.db.filter.QueryFilter;
 import org.apache.cassandra.db.filter.QueryPath;
 import org.apache.cassandra.io.sstable.SSTableReader;
@@ -184,7 +186,7 @@ public class CompactionsPurgeTest extend
     }
 
     @Test
-    public void testCompactionPurgeTombstonedRow() throws IOException, ExecutionException,
InterruptedException
+    public void testCompactionPurgeCachedRow() throws IOException, ExecutionException, InterruptedException
     {
         CompactionManager.instance.disableAutoCompaction();
 
@@ -230,4 +232,96 @@ public class CompactionsPurgeTest extend
         for (IColumn c : cf)
             assert !c.isMarkedForDelete();
     }
+
+    @Test
+    public void testCompactionPurgeTombstonedRow() throws IOException, ExecutionException,
InterruptedException
+    {
+        CompactionManager.instance.disableAutoCompaction();
+
+        String tableName = "Keyspace1";
+        String cfName = "Standard1";
+        Table table = Table.open(tableName);
+        ColumnFamilyStore cfs = table.getColumnFamilyStore(cfName);
+
+        DecoratedKey key = Util.dk("key3");
+        RowMutation rm;
+
+        // inserts
+        rm = new RowMutation(tableName, key.key);
+        for (int i = 0; i < 10; i++)
+        {
+            rm.add(new QueryPath(cfName, null, ByteBufferUtil.bytes(String.valueOf(i))),
ByteBufferUtil.EMPTY_BYTE_BUFFER, i);
+        }
+        rm.apply();
+
+        // deletes row with timestamp such that not all columns are deleted
+        rm = new RowMutation(tableName, key.key);
+        rm.delete(new QueryPath(cfName, null, null), 4);
+        rm.apply();
+
+        // flush and major compact (with tombstone purging)
+        cfs.forceBlockingFlush();
+        Util.compactAll(cfs).get();
+
+        // re-inserts with timestamp lower than delete
+        rm = new RowMutation(tableName, key.key);
+        for (int i = 0; i < 5; i++)
+        {
+            rm.add(new QueryPath(cfName, null, ByteBufferUtil.bytes(String.valueOf(i))),
ByteBufferUtil.EMPTY_BYTE_BUFFER, i);
+        }
+        rm.apply();
+
+        // Check that the second insert did went in
+        ColumnFamily cf = cfs.getColumnFamily(QueryFilter.getIdentityFilter(key, new QueryPath(cfName)));
+        assertEquals(10, cf.getColumnCount());
+        for (IColumn c : cf)
+            assert !c.isMarkedForDelete();
+    }
+
+    @Test
+    public void testCompactionPurgeTombstonedSuperColumn() throws IOException, ExecutionException,
InterruptedException
+    {
+        CompactionManager.instance.disableAutoCompaction();
+
+        String tableName = "Keyspace1";
+        String cfName = "Super5";
+        Table table = Table.open(tableName);
+        ColumnFamilyStore cfs = table.getColumnFamilyStore(cfName);
+
+        DecoratedKey key = Util.dk("key5");
+        RowMutation rm;
+
+        ByteBuffer scName = ByteBufferUtil.bytes("sc");
+
+        // inserts
+        rm = new RowMutation(tableName, key.key);
+        for (int i = 0; i < 10; i++)
+        {
+            rm.add(new QueryPath(cfName, scName, ByteBuffer.wrap(String.valueOf(i).getBytes())),
ByteBufferUtil.EMPTY_BYTE_BUFFER, i);
+        }
+        rm.apply();
+
+        // deletes supercolumn with timestamp such that not all columns go
+        rm = new RowMutation(tableName, key.key);
+        rm.delete(new QueryPath(cfName, scName, null), 4);
+        rm.apply();
+
+        // flush and major compact
+        cfs.forceBlockingFlush();
+        Util.compactAll(cfs).get();
+
+        // re-inserts with timestamp lower than delete
+        rm = new RowMutation(tableName, key.key);
+        for (int i = 0; i < 5; i++)
+        {
+            rm.add(new QueryPath(cfName, scName, ByteBuffer.wrap(String.valueOf(i).getBytes())),
ByteBufferUtil.EMPTY_BYTE_BUFFER, i);
+        }
+        rm.apply();
+
+        // Check that the second insert did went in
+        ColumnFamily cf = cfs.getColumnFamily(QueryFilter.getIdentityFilter(key, new QueryPath(cfName)));
+        SuperColumn sc = (SuperColumn)cf.getColumn(scName);
+        assert sc != null;
+        assertEquals(10, sc.getColumnCount());
+    }
 }

Modified: cassandra/trunk/test/unit/org/apache/cassandra/service/RowResolverTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/service/RowResolverTest.java?rev=1142690&r1=1142689&r2=1142690&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/service/RowResolverTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/service/RowResolverTest.java Mon Jul  4
14:36:11 2011
@@ -131,7 +131,7 @@ public class RowResolverTest extends Sch
         // subcolumn is newer than a tombstone on its parent, but not newer than the row
deletion
         ColumnFamily scf1 = ColumnFamily.create("Keyspace1", "Super1");
         SuperColumn sc = superColumn(scf1, "super-foo", column("one", "A", 1));
-        sc.markForDeleteAt((int) (System.currentTimeMillis() / 1000), 0);
+        sc.delete((int) (System.currentTimeMillis() / 1000), 0);
         scf1.addColumn(sc);
 
         ColumnFamily scf2 = ColumnFamily.create("Keyspace1", "Super1");



Mime
View raw message