cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject svn commit: r758974 - in /incubator/cassandra/trunk/src/org/apache/cassandra: db/ io/ service/
Date Fri, 27 Mar 2009 02:19:19 GMT
Author: jbellis
Date: Fri Mar 27 02:19:19 2009
New Revision: 758974

URL: http://svn.apache.org/viewvc?rev=758974&view=rev
Log:
add deletion marker support to Column (boolean isMarkedForDelete), SuperColumn, and ColumnFamily (change boolean isMarkedForDelete to long markedForDeleteAt).  Column is also made immutable to avoid confusion as to how to handle merging different versions of Columns (you don't; you replace the old one with the new).  This also makes concurrency bugs impossible, which remove will rely on.  Removed old broken remove/delete support in preparation for working new code.

Modified:
    incubator/cassandra/trunk/src/org/apache/cassandra/db/AbstractColumnFactory.java
    incubator/cassandra/trunk/src/org/apache/cassandra/db/Column.java
    incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamily.java
    incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java
    incubator/cassandra/trunk/src/org/apache/cassandra/db/IColumn.java
    incubator/cassandra/trunk/src/org/apache/cassandra/db/Memtable.java
    incubator/cassandra/trunk/src/org/apache/cassandra/db/RowMutation.java
    incubator/cassandra/trunk/src/org/apache/cassandra/db/SuperColumn.java
    incubator/cassandra/trunk/src/org/apache/cassandra/db/Table.java
    incubator/cassandra/trunk/src/org/apache/cassandra/io/SequenceFile.java
    incubator/cassandra/trunk/src/org/apache/cassandra/service/CassandraServer.java

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/AbstractColumnFactory.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/AbstractColumnFactory.java?rev=758974&r1=758973&r2=758974&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/AbstractColumnFactory.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/AbstractColumnFactory.java Fri Mar 27 02:19:19 2009
@@ -31,13 +31,13 @@
 abstract class AbstractColumnFactory
 {
     private static Map<String, AbstractColumnFactory> columnFactory_ = new HashMap<String, AbstractColumnFactory>();
-	
+
 	static
 	{
 		columnFactory_.put(ColumnFamily.getColumnType("Standard"),new ColumnFactory());
 		columnFactory_.put(ColumnFamily.getColumnType("Super"),new SuperColumnFactory());
 	}
-	
+
 	static AbstractColumnFactory getColumnFactory(String columnType)
 	{
 		/* Create based on the type required. */
@@ -46,10 +46,11 @@
 		else
 			return columnFactory_.get("Super");
 	}
-    
+
 	public abstract IColumn createColumn(String name);
 	public abstract IColumn createColumn(String name, byte[] value);
-	public abstract IColumn createColumn(String name, byte[] value, long timestamp);
+    public abstract IColumn createColumn(String name, byte[] value, long timestamp);
+    public abstract IColumn createColumn(String name, byte[] value, long timestamp, boolean deleted);
     public abstract ICompactSerializer2<IColumn> createColumnSerializer();
 }
 
@@ -59,17 +60,21 @@
 	{
 		return new Column(name);
 	}
-	
+
 	public IColumn createColumn(String name, byte[] value)
 	{
 		return new Column(name, value);
 	}
-	
+
 	public IColumn createColumn(String name, byte[] value, long timestamp)
 	{
 		return new Column(name, value, timestamp);
 	}
-    
+
+    public IColumn createColumn(String name, byte[] value, long timestamp, boolean deleted) {
+        return new Column(name, value, timestamp, deleted);
+    }
+
     public ICompactSerializer2<IColumn> createColumnSerializer()
     {
         return Column.serializer();
@@ -103,29 +108,28 @@
         }
 		return superColumn;
 	}
-	
+
 	public IColumn createColumn(String name, byte[] value)
 	{
-		String[] values = SuperColumnFactory.getSuperColumnAndColumn(name);
-        if ( values.length != 2 )
-            throw new IllegalArgumentException("Super Column " + name + " in invalid format. Must be in <super column name>:<column name> format.");
-        IColumn superColumn = new SuperColumn(values[0]);
-        IColumn subColumn = new Column(values[1], value);
-        superColumn.addColumn(values[1], subColumn);
-		return superColumn;
+        return createColumn(name, value, 0);
 	}
-	
-	public IColumn createColumn(String name, byte[] value, long timestamp)
+
+    public IColumn createColumn(String name, byte[] value, long timestamp)
+    {
+        return createColumn(name, value, timestamp, false);
+    }
+
+    public IColumn createColumn(String name, byte[] value, long timestamp, boolean deleted)
 	{
 		String[] values = SuperColumnFactory.getSuperColumnAndColumn(name);
         if ( values.length != 2 )
             throw new IllegalArgumentException("Super Column " + name + " in invalid format. Must be in <super column name>:<column name> format.");
         IColumn superColumn = new SuperColumn(values[0]);
-        IColumn subColumn = new Column(values[1], value, timestamp);
+        IColumn subColumn = new Column(values[1], value, timestamp, deleted);
         superColumn.addColumn(values[1], subColumn);
 		return superColumn;
 	}
-    
+
     public ICompactSerializer2<IColumn> createColumnSerializer()
     {
         return SuperColumn.serializer();

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/Column.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/Column.java?rev=758974&r1=758973&r2=758974&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/Column.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/Column.java Fri Mar 27 02:19:19 2009
@@ -18,60 +18,41 @@
 
 package org.apache.cassandra.db;
 
-import java.io.DataInput;
 import java.io.DataInputStream;
-import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.io.IOException;
-import java.io.Serializable;
 import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.cassandra.io.DataInputBuffer;
-import org.apache.cassandra.io.IFileReader;
-import org.apache.cassandra.io.IFileWriter;
+
+import org.apache.commons.lang.ArrayUtils;
+
 import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.HashingSchemes;
-import org.apache.cassandra.utils.LogUtil;
-import org.apache.log4j.Logger;
 
 
 /**
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ * Column is immutable, which prevents all kinds of confusion in a multithreaded environment.
+ * (TODO: look at making SuperColumn immutable too.  This is trickier but is probably doable
+ *  with something like PCollections -- http://code.google.com
+ *
+ * Author : Avinash Lakshman ( alakshman@facebook.com ) & Prashant Malik ( pmalik@facebook.com )
  */
 
-public final class Column implements IColumn, Serializable
+public final class Column implements IColumn
 {
-	private static Logger logger_ = Logger.getLogger(SuperColumn.class);
-    private static ICompactSerializer2<IColumn> serializer_;
-	private final static String seperator_ = ":";
-    static
-    {
-        serializer_ = new ColumnSerializer();
-    }
+    private static ColumnSerializer serializer_ = new ColumnSerializer();
 
-    static ICompactSerializer2<IColumn> serializer()
+    static ColumnSerializer serializer()
     {
         return serializer_;
     }
 
-    private String name_;
-    private byte[] value_ = new byte[0];
-    private long timestamp_ = 0;
-
-    private transient AtomicBoolean isMarkedForDelete_;
-
-    /* CTOR for JAXB */
-    Column()
-    {
-    }
+    private final String name;
+    private final byte[] value;
+    private final long timestamp;
+    private final boolean isMarkedForDelete;
 
     Column(String name)
     {
-        name_ = name;
+        this(name, ArrayUtils.EMPTY_BYTE_ARRAY);
     }
 
     Column(String name, byte[] value)
@@ -81,54 +62,71 @@
 
     Column(String name, byte[] value, long timestamp)
     {
-        this(name);
-        value_ = value;
-        timestamp_ = timestamp;
+        this(name, value, timestamp, false);
+    }
+
+    Column(String name, byte[] value, long timestamp, boolean isDeleted)
+    {
+        assert name != null;
+        assert value != null;
+        this.name = name;
+        this.value = value;
+        this.timestamp = timestamp;
+        isMarkedForDelete = isDeleted;
     }
 
     public String name()
     {
-        return name_;
+        return name;
     }
 
-    public byte[] value()
+    public IColumn getSubColumn(String columnName)
     {
-        return value_;
+        throw new UnsupportedOperationException("This operation is unsupported on simple columns.");
     }
 
-    public byte[] value(String key)
+    public byte[] value()
     {
-    	throw new UnsupportedOperationException("This operation is unsupported on simple columns.");
+        return value;
     }
 
-    public Collection<IColumn> getSubColumns()
+    public byte[] value(String key)
     {
-    	throw new UnsupportedOperationException("This operation is unsupported on simple columns.");
+        throw new UnsupportedOperationException("This operation is unsupported on simple columns.");
     }
 
-    public IColumn getSubColumn( String columnName )
+    public Collection<IColumn> getSubColumns()
     {
-    	throw new UnsupportedOperationException("This operation is unsupported on simple columns.");
+        throw new UnsupportedOperationException("This operation is unsupported on simple columns.");
     }
 
     public int getObjectCount()
     {
-    	return 1;
+        return 1;
     }
 
     public long timestamp()
     {
-        return timestamp_;
+        return timestamp;
     }
 
     public long timestamp(String key)
     {
-    	throw new UnsupportedOperationException("This operation is unsupported on simple columns.");
+        throw new UnsupportedOperationException("This operation is unsupported on simple columns.");
     }
 
     public boolean isMarkedForDelete()
     {
-        return (isMarkedForDelete_ != null) ? isMarkedForDelete_.get() : false;
+        return isMarkedForDelete;
+    }
+
+    public long getMarkedForDeleteAt()
+    {
+        if (!isMarkedForDelete())
+        {
+            throw new IllegalStateException("column is not marked for delete");
+        }
+        return timestamp;
     }
 
     public int size()
@@ -142,11 +140,11 @@
          * + entire byte array.
         */
 
-    	/*
-    	 * We store the string as UTF-8 encoded, so when we calculate the length, it
-    	 * should be converted to UTF-8.
-    	 */
-        return IColumn.UtfPrefix_ + FBUtilities.getUTF8Length(name_) + DBConstants.boolSize_ + DBConstants.tsSize_ + DBConstants.intSize_ + value_.length;
+        /*
+           * We store the string as UTF-8 encoded, so when we calculate the length, it
+           * should be converted to UTF-8.
+           */
+        return IColumn.UtfPrefix_ + FBUtilities.getUTF8Length(name) + DBConstants.boolSize_ + DBConstants.tsSize_ + DBConstants.intSize_ + value.length;
     }
 
     /*
@@ -155,112 +153,43 @@
     */
     public int serializedSize()
     {
-    	return size();
+        return size();
     }
 
     public void addColumn(String name, IColumn column)
     {
-    	throw new UnsupportedOperationException("This operation is not supported for simple columns.");
-    }
-
-    public void delete()
-    {
-        if ( isMarkedForDelete_ == null )
-            isMarkedForDelete_ = new AtomicBoolean(true);
-        else
-            isMarkedForDelete_.set(true);
-    	value_ = new byte[0];
+        throw new UnsupportedOperationException("This operation is not supported for simple columns.");
     }
 
-    public void repair(IColumn column)
-    {
-    	if( timestamp() < column.timestamp() )
-    	{
-    		value_ = column.value();
-    		timestamp_ = column.timestamp();
-    	}
-    }
     public IColumn diff(IColumn column)
     {
-    	IColumn  columnDiff = null;
-    	if( timestamp() < column.timestamp() )
-    	{
-    		columnDiff = new Column(column.name(),column.value(),column.timestamp());
-    	}
-    	return columnDiff;
-    }
-
-    /*
-     * Resolve the column by comparing timestamps
-     * if a newer vaue is being input
-     * take the change else ignore .
-     *
-     */
-    public boolean putColumn(IColumn column)
-    {
-    	if ( !(column instanceof Column))
-    		throw new UnsupportedOperationException("Only Column objects should be put here");
-    	if( !name_.equals(column.name()))
-    		throw new IllegalArgumentException("The name should match the name of the current column or super column");
-    	if(timestamp_ <= column.timestamp())
-    	{
-            return true;
-    	}
-        return false;
+        if (timestamp() < column.timestamp())
+        {
+            return column;
+        }
+        return null;
     }
 
     public String toString()
     {
-    	StringBuilder sb = new StringBuilder();
-    	sb.append(name_);
-    	sb.append(":");
-    	sb.append(isMarkedForDelete());
-    	sb.append(":");
-    	sb.append(timestamp());
-    	sb.append(":");
-    	sb.append(value().length);
-    	sb.append(":");
-    	sb.append(value());
-    	sb.append(":");
-    	return sb.toString();
+        StringBuilder sb = new StringBuilder();
+        sb.append(name);
+        sb.append(":");
+        sb.append(isMarkedForDelete());
+        sb.append(":");
+        sb.append(value().length);
+        sb.append("@");
+        sb.append(timestamp());
+        return sb.toString();
     }
 
     public byte[] digest()
     {
-    	StringBuilder stringBuilder = new StringBuilder();
-  		stringBuilder.append(name_);
-  		stringBuilder.append(seperator_);
-  		stringBuilder.append(timestamp_);
-    	return stringBuilder.toString().getBytes();
-    }
-    
-    /**
-     * This method is basically implemented for Writable interface
-     * for M/R. 
-     */
-    public void readFields(DataInput in) throws IOException
-    {
-        name_ = in.readUTF();
-        boolean delete = in.readBoolean();
-        long ts = in.readLong();
-        int size = in.readInt();
-        byte[] value = new byte[size];
-        in.readFully(value);        
-        if ( delete )
-            delete();
-    }
-    
-    /**
-     * This method is basically implemented for Writable interface
-     * for M/R. 
-     */
-    public void write(DataOutput out) throws IOException
-    {
-        out.writeUTF(name_);
-        out.writeBoolean(isMarkedForDelete());
-        out.writeLong(timestamp_);
-        out.writeInt(value().length);
-        out.write(value());
+        StringBuilder stringBuilder = new StringBuilder();
+        stringBuilder.append(name);
+        stringBuilder.append(":");
+        stringBuilder.append(timestamp);
+        return stringBuilder.toString().getBytes();
     }
 
 }
@@ -284,9 +213,7 @@
         int size = dis.readInt();
         byte[] value = new byte[size];
         dis.readFully(value);
-        column = new Column(name, value, ts);
-        if ( delete )
-            column.delete();
+        column = new Column(name, value, ts, delete);
         return column;
     }
 
@@ -303,7 +230,7 @@
     {
         if ( dis.available() == 0 )
             return null;
-                
+
         String name = dis.readUTF();
         IColumn column = new Column(name);
         column = filter.filter(column, dis);
@@ -339,8 +266,8 @@
             	/*
             	 * If this is being called with identity filter
             	 * since a column name is passed in we know
-            	 * that this is a final call 
-            	 * Hence if the column is found set the filter to done 
+            	 * that this is a final call
+            	 * Hence if the column is found set the filter to done
             	 * so that we do not look for the column in further files
             	 */
             	IdentityFilter f = (IdentityFilter)filter;
@@ -368,5 +295,6 @@
         /* size of the column */
         int size = dis.readInt();
         dis.skip(size);
-    }    
+    }
 }
+

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamily.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamily.java?rev=758974&r1=758973&r2=758974&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamily.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamily.java Fri Mar 27 02:19:19 2009
@@ -18,28 +18,29 @@
 
 package org.apache.cassandra.db;
 
-import java.io.DataInput;
 import java.io.DataInputStream;
-import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.io.IOException;
-import java.io.Serializable;
 import java.lang.reflect.Proxy;
-import java.util.*;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.commons.lang.StringUtils;
+import org.apache.log4j.Logger;
+
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.HashingSchemes;
-import org.apache.log4j.Logger;
-import org.apache.cassandra.io.*;
 
 /**
  * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
  */
-
-public final class ColumnFamily implements Serializable
+public final class ColumnFamily
 {
     private static ICompactSerializer2<ColumnFamily> serializer_;
     public static final short utfPrefix_ = 2;   
@@ -56,7 +57,7 @@
         /* TODO: These are the various column types. Hard coded for now. */
         columnTypes_.put("Standard", "Standard");
         columnTypes_.put("Super", "Super");
-        
+
         indexTypes_.put("Name", "Name");
         indexTypes_.put("Time", "Time");
     }
@@ -94,9 +95,9 @@
 
     private String name_;
 
-    private  transient ICompactSerializer2<IColumn> columnSerializer_;
-    private transient AtomicBoolean isMarkedForDelete_;
-    private  AtomicInteger size_ = new AtomicInteger(0);
+    private transient ICompactSerializer2<IColumn> columnSerializer_;
+    private long markedForDeleteAt = Long.MIN_VALUE;
+    private AtomicInteger size_ = new AtomicInteger(0);
     private EfficientBidiMap columns_;
 
     private Comparator<IColumn> columnComparator_;
@@ -122,20 +123,16 @@
 
 		return columnComparator_;
 	}
-    
-    ColumnFamily()
-    {
-    }
 
-    public ColumnFamily(String cf)
+    public ColumnFamily(String cfName)
     {
-        name_ = cf;
+        name_ = cfName;
         createColumnFactoryAndColumnSerializer();
     }
 
-    public ColumnFamily(String cf, String columnType)
+    public ColumnFamily(String cfName, String columnType)
     {
-        name_ = cf;
+        this(cfName);
         createColumnFactoryAndColumnSerializer(columnType);
     }
 
@@ -168,7 +165,7 @@
     ColumnFamily cloneMe()
     {
     	ColumnFamily cf = new ColumnFamily(name_);
-    	cf.isMarkedForDelete_ = isMarkedForDelete_;
+    	cf.markedForDeleteAt = markedForDeleteAt;
     	cf.columns_ = columns_.cloneMe();
     	return cf;
     }
@@ -178,13 +175,14 @@
         return name_;
     }
 
-    /**
+    /*
      *  We need to go through each column
      *  in the column family and resolve it before adding
     */
     void addColumns(ColumnFamily cf)
     {
-        for (IColumn column : cf.getAllColumns()) {
+        for (IColumn column : cf.getAllColumns())
+        {
             addColumn(column);
         }
     }
@@ -197,7 +195,7 @@
 
     public void addColumn(String name)
     {
-        addColumn(columnFactory_.createColumn(name));
+    	addColumn(columnFactory_.createColumn(name));
     }
 
     int getColumnCount()
@@ -206,7 +204,7 @@
     	Map<String, IColumn> columns = columns_.getColumns();
     	if( columns != null )
     	{
-    		if(!DatabaseDescriptor.getColumnType(name_).equals("Super"))
+    		if(!isSuper())
     		{
     			count = columns.size();
     		}
@@ -222,14 +220,25 @@
     	return count;
     }
 
+    public boolean isSuper()
+    {
+        return DatabaseDescriptor.getColumnType(name_).equals("Super");
+    }
+
     public void addColumn(String name, byte[] value)
     {
     	addColumn(name, value, 0);
     }
 
     public void addColumn(String name, byte[] value, long timestamp)
+    {
+        addColumn(name, value, timestamp, false);
+    }
+
+    public void addColumn(String name, byte[] value, long timestamp, boolean deleted)
 	{
-        addColumn(columnFactory_.createColumn(name, value, timestamp));
+		IColumn column = columnFactory_.createColumn(name, value, timestamp, deleted);
+		addColumn(column);
     }
 
     void clear()
@@ -243,28 +252,28 @@
     */
     void addColumn(IColumn column)
     {
-    	int newSize = 0;
         String name = column.name();
         IColumn oldColumn = columns_.get(name);
-        if ( oldColumn != null )
+        if (oldColumn != null)
         {
-            int oldSize = oldColumn.size();
-            if( oldColumn.putColumn(column))
+            if (oldColumn instanceof SuperColumn)
             {
-            	// This will never be called for super column as put column always returns false.
-                columns_.put(name, column);
-            	newSize = column.size();
+                int oldSize = oldColumn.size();
+                ((SuperColumn) oldColumn).putColumn(column);
+                size_.addAndGet(oldColumn.size() - oldSize);
             }
             else
             {
-            	newSize = oldColumn.size();
+                if (oldColumn.timestamp() <= column.timestamp())
+                {
+                    columns_.put(name, column);
+                    size_.addAndGet(column.size());
+                }
             }
-            size_.addAndGet(newSize - oldSize);
         }
         else
         {
-            newSize = column.size();
-            size_.addAndGet(newSize);
+            size_.addAndGet(column.size());
             columns_.put(name, column);
         }
     }
@@ -274,7 +283,7 @@
         return columns_.get( name );
     }
 
-    public Collection<IColumn> getAllColumns()
+    public SortedSet<IColumn> getAllColumns()
     {
         return columns_.getSortedColumns();
     }
@@ -289,17 +298,14 @@
     	columns_.remove(columnName);
     }
 
-    void delete()
+    void delete(long timestamp)
     {
-        if ( isMarkedForDelete_ == null )
-            isMarkedForDelete_ = new AtomicBoolean(true);
-        else
-            isMarkedForDelete_.set(true);
+        markedForDeleteAt = timestamp;
     }
 
-    boolean isMarkedForDelete()
+    public boolean isMarkedForDelete()
     {
-        return ( ( isMarkedForDelete_ == null ) ? false : isMarkedForDelete_.get() );
+        return markedForDeleteAt > Long.MIN_VALUE;
     }
 
     /*
@@ -328,28 +334,8 @@
      */
     void repair(ColumnFamily columnFamily)
     {
-        Map<String, IColumn> columns = columnFamily.getColumns();
-        Set<String> cNames = columns.keySet();
-
-        for ( String cName : cNames )
-        {
-        	IColumn columnInternal = columns_.get(cName);
-        	IColumn columnExternal = columns.get(cName);
-
-        	if( columnInternal == null )
-        	{                
-        		if(DatabaseDescriptor.getColumnFamilyType(name_).equals(ColumnFamily.getColumnType("Super")))
-        		{
-        			columnInternal = new SuperColumn(columnExternal.name());
-        			columns_.put(cName, columnInternal);
-        		}
-        		if(DatabaseDescriptor.getColumnFamilyType(name_).equals(ColumnFamily.getColumnType("Standard")))
-        		{
-        			columnInternal = columnExternal;
-        			columns_.put(cName, columnInternal);
-        		}
-        	}
-       		columnInternal.repair(columnExternal);
+        for (IColumn column : columnFamily.getAllColumns()) {
+            addColumn(column);
         }
     }
 
@@ -417,193 +403,174 @@
     public String toString()
     {
     	StringBuilder sb = new StringBuilder();
+        sb.append("ColumnFamily(");
     	sb.append(name_);
-    	sb.append(":");
-    	sb.append(isMarkedForDelete());
-    	sb.append(":");
-    	Collection<IColumn> columns = getAllColumns();
-        sb.append(columns.size());
-        sb.append(":");
 
-        for ( IColumn column : columns )
-        {
-            sb.append(column.toString());
+        if (isMarkedForDelete()) {
+            sb.append(" -delete at " + getMarkedForDeleteAt() + "-");
         }
-        sb.append(":");
+
+    	sb.append(" [");
+        sb.append(StringUtils.join(getAllColumns(), ", "));
+        sb.append("])");
+
     	return sb.toString();
     }
 
     public byte[] digest()
     {
     	Set<IColumn> columns = columns_.getSortedColumns();
-    	byte[] xorHash = new byte[0];
-    	byte[] tmpHash = new byte[0];
+    	byte[] xorHash = null;
     	for(IColumn column : columns)
     	{
-    		if(xorHash.length == 0)
+    		if(xorHash == null)
     		{
     			xorHash = column.digest();
     		}
     		else
     		{
-    			tmpHash = column.digest();
-    			xorHash = FBUtilities.xor(xorHash, tmpHash);
+                xorHash = FBUtilities.xor(xorHash, column.digest());
     		}
     	}
     	return xorHash;
     }
-}
 
-class ColumnFamilySerializer implements ICompactSerializer2<ColumnFamily>
-{
-	/*
-	 * We are going to create indexes, and write out that information as well. The format
-	 * of the data serialized is as follows.
-	 *
-	 * 1) Without indexes:
-     *  // written by the data
-	 * 	<boolean false (index is not present)>
-	 * 	<column family id>
-	 * 	<is marked for delete>
-	 * 	<total number of columns>
-	 * 	<columns data>
-
-	 * 	<boolean true (index is present)>
-	 *
-	 *  This part is written by the column indexer
-	 * 	<size of index in bytes>
-	 * 	<list of column names and their offsets relative to the first column>
-	 *
-	 *  <size of the cf in bytes>
-	 * 	<column family id>
-	 * 	<is marked for delete>
-	 * 	<total number of columns>
-	 * 	<columns data>
-	*/
-    public void serialize(ColumnFamily columnFamily, DataOutputStream dos) throws IOException
-    {
-    	Collection<IColumn> columns = columnFamily.getAllColumns();
-
-        /* write the column family id */
-        dos.writeUTF(columnFamily.name());
-        /* write if this cf is marked for delete */
-        dos.writeBoolean(columnFamily.isMarkedForDelete());
-    	/* write the size is the number of columns */
-        dos.writeInt(columns.size());                    
-        /* write the column data */
-    	for ( IColumn column : columns )
-        {
-            columnFamily.getColumnSerializer().serialize(column, dos);            
-        }
+    public long getMarkedForDeleteAt() {
+        return markedForDeleteAt;
     }
 
-    /*
-     * Use this method to create a bare bones Column Family. This column family
-     * does not have any of the Column information.
-    */
-    private ColumnFamily defreezeColumnFamily(DataInputStream dis) throws IOException
+    public static class ColumnFamilySerializer implements ICompactSerializer2<ColumnFamily>
     {
-        String name = dis.readUTF();
-        boolean delete = dis.readBoolean();
-        ColumnFamily cf = new ColumnFamily(name);
-        if ( delete )
-            cf.delete();
-        return cf;
-    }
+        /*
+         * We are going to create indexes, and write out that information as well. The format
+         * of the data serialized is as follows.
+         *
+         * 1) Without indexes:
+         *  // written by the data
+         * 	<boolean false (index is not present)>
+         * 	<column family id>
+         * 	<is marked for delete>
+         * 	<total number of columns>
+         * 	<columns data>
+
+         * 	<boolean true (index is present)>
+         *
+         *  This part is written by the column indexer
+         * 	<size of index in bytes>
+         * 	<list of column names and their offsets relative to the first column>
+         *
+         *  <size of the cf in bytes>
+         * 	<column family id>
+         * 	<is marked for delete>
+         * 	<total number of columns>
+         * 	<columns data>
+        */
+        public void serialize(ColumnFamily columnFamily, DataOutputStream dos) throws IOException
+        {
+            Collection<IColumn> columns = columnFamily.getAllColumns();
+
+            /* write the column family id */
+            dos.writeUTF(columnFamily.name());
+            /* write if this cf is marked for delete */
+            dos.writeLong(columnFamily.getMarkedForDeleteAt());
 
-    /*
-     * This method fills the Column Family object with the column information
-     * from the DataInputStream. The "items" parameter tells us whether we need
-     * all the columns or just a subset of all the Columns that make up the
-     * Column Family. If "items" is -1 then we need all the columns if not we
-     * deserialize only as many columns as indicated by the "items" parameter.
-    */
-    private void fillColumnFamily(ColumnFamily cf,  DataInputStream dis) throws IOException
-    {
-        int size = dis.readInt();        	        	
-    	IColumn column = null;           
-        for ( int i = 0; i < size; ++i )
-        {
-        	column = cf.getColumnSerializer().deserialize(dis);
-        	if(column != null)
-        	{
-        		cf.addColumn(column);
-        	}
+            /* write the size is the number of columns */
+            dos.writeInt(columns.size());
+
+            /* write the column data */
+            for ( IColumn column : columns )
+            {
+                columnFamily.getColumnSerializer().serialize(column, dos);
+            }
         }
-    }
 
-    public ColumnFamily deserialize(DataInputStream dis) throws IOException
-    {       
-        ColumnFamily cf = defreezeColumnFamily(dis);
-        if ( !cf.isMarkedForDelete() )
-            fillColumnFamily(cf,dis);
-        return cf;
-    }
+        /*
+         * Use this method to create a bare bones Column Family. This column family
+         * does not have any of the Column information.
+        */
+        private ColumnFamily defreezeColumnFamily(DataInputStream dis) throws IOException
+        {
+            String name = dis.readUTF();
+            ColumnFamily cf = new ColumnFamily(name);
+            cf.delete(dis.readLong());
+            return cf;
+        }
+
+        public ColumnFamily deserialize(DataInputStream dis) throws IOException
+        {
+            ColumnFamily cf = defreezeColumnFamily(dis);
+            int size = dis.readInt();
+            IColumn column = null;
+            for ( int i = 0; i < size; ++i )
+            {
+                column = cf.getColumnSerializer().deserialize(dis);
+                if(column != null)
+                {
+                    cf.addColumn(column);
+                }
+            }
+            return cf;
+        }
 
-    /*
-     * This version of deserialize is used when we need a specific set if columns for
-     * a column family specified in the name cfName parameter.
-    */
-    public ColumnFamily deserialize(DataInputStream dis, IFilter filter) throws IOException
-    {        
-        ColumnFamily cf = defreezeColumnFamily(dis);
-        if ( !cf.isMarkedForDelete() )
-        {
-            int size = dis.readInt();        	        	
-        	IColumn column = null;
+        /*
+         * This version of deserialize is used when we need a specific set of columns for
+         * a column family specified in the name cfName parameter.
+        */
+        public ColumnFamily deserialize(DataInputStream dis, IFilter filter) throws IOException
+        {
+            ColumnFamily cf = defreezeColumnFamily(dis);
+            int size = dis.readInt();
+            IColumn column = null;
             for ( int i = 0; i < size; ++i )
             {
-            	column = cf.getColumnSerializer().deserialize(dis, filter);
-            	if(column != null)
-            	{
-            		cf.addColumn(column);
-            		column = null;
-            		if(filter.isDone())
-            		{
-            			break;
-            		}
-            	}
+                column = cf.getColumnSerializer().deserialize(dis, filter);
+                if(column != null)
+                {
+                    cf.addColumn(column);
+                    column = null;
+                    if(filter.isDone())
+                    {
+                        break;
+                    }
+                }
             }
+            return cf;
         }
-        return cf;
-    }
 
-    /*
-     * Deserialize a particular column or super column or the entire columnfamily given a : seprated name
-     * name could be of the form cf:superColumn:column  or cf:column or cf
-     */
-    public ColumnFamily deserialize(DataInputStream dis, String name, IFilter filter) throws IOException
-    {        
-        String[] names = RowMutation.getColumnAndColumnFamily(name);
-        String columnName = "";
-        if ( names.length == 1 )
-            return deserialize(dis, filter);
-        if( names.length == 2 )
-            columnName = names[1];
-        if( names.length == 3 )
-            columnName = names[1]+ ":" + names[2];
+        /*
+         * Deserialize a particular column or super column or the entire columnfamily given a : separated name
+         * name could be of the form cf:superColumn:column  or cf:column or cf
+         */
+        public ColumnFamily deserialize(DataInputStream dis, String name, IFilter filter) throws IOException
+        {
+            String[] names = RowMutation.getColumnAndColumnFamily(name);
+            String columnName = "";
+            if ( names.length == 1 )
+                return deserialize(dis, filter);
+            if( names.length == 2 )
+                columnName = names[1];
+            if( names.length == 3 )
+                columnName = names[1]+ ":" + names[2];
 
-        ColumnFamily cf = defreezeColumnFamily(dis);
-        if ( !cf.isMarkedForDelete() )
-        {
+            ColumnFamily cf = defreezeColumnFamily(dis);
             /* read the number of columns */
-            int size = dis.readInt();            
+            int size = dis.readInt();
             for ( int i = 0; i < size; ++i )
             {
-	            IColumn column = cf.getColumnSerializer().deserialize(dis, columnName, filter);
-	            if ( column != null )
-	            {
-	                cf.addColumn(column);
-	                break;
-	            }
+                IColumn column = cf.getColumnSerializer().deserialize(dis, columnName, filter);
+                if ( column != null )
+                {
+                    cf.addColumn(column);
+                    break;
+                }
             }
+            return cf;
         }
-        return cf;
-    }
 
-    public void skip(DataInputStream dis) throws IOException
-    {
-        throw new UnsupportedOperationException("This operation is not yet supported.");
+        public void skip(DataInputStream dis) throws IOException
+        {
+            throw new UnsupportedOperationException("This operation is not yet supported.");
+        }
     }
-
 }
+

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java?rev=758974&r1=758973&r2=758974&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java Fri Mar 27 02:19:19 2009
@@ -615,17 +615,6 @@
     }
 
     /*
-     * Delete doesn't mean we can blindly delete. We need to write this to disk
-     * as being marked for delete. This is to prevent a previous value from
-     * resuscitating a column family that has been deleted.
-     */
-    void delete(String key, ColumnFamily columnFamily)
-            throws IOException
-    {
-        memtable_.get().remove(key, columnFamily);
-    }
-
-    /*
      * This method is called when the Memtable is frozen and ready to be flushed
      * to disk. This method informs the CommitLog that a particular ColumnFamily
      * is being flushed to disk.

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/IColumn.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/IColumn.java?rev=758974&r1=758973&r2=758974&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/IColumn.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/IColumn.java Fri Mar 27 02:19:19 2009
@@ -29,6 +29,7 @@
 {
     public static short UtfPrefix_ = 2;
     public boolean isMarkedForDelete();
+    public long getMarkedForDeleteAt();
     public String name();
     public int size();
     public int serializedSize();
@@ -39,10 +40,7 @@
     public Collection<IColumn> getSubColumns();
     public IColumn getSubColumn(String columnName);
     public void addColumn(String name, IColumn column);
-    public void delete();
-    public void repair(IColumn column);
     public IColumn diff(IColumn column);
-    public boolean putColumn(IColumn column);
     public int getObjectCount();
-    public byte[] digest();    
+    public byte[] digest();
 }

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/Memtable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/Memtable.java?rev=758974&r1=758973&r2=758974&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/Memtable.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/Memtable.java Fri Mar 27 02:19:19 2009
@@ -143,24 +143,6 @@
         }
     }
 
-    class Remover implements Runnable
-    {
-        private String key_;
-        private ColumnFamily columnFamily_;
-
-        Remover(String key, ColumnFamily columnFamily)
-        {
-            key_ = key;
-            columnFamily_ = columnFamily;
-        }
-
-        public void run()
-        {
-        	columnFamily_.delete();
-            columnFamilies_.put(key_, columnFamily_);
-        }
-    }
-    
     /**
      * Flushes the current memtable to disk.
      * 
@@ -426,19 +408,6 @@
     	return cf;
     }
 
-    /*
-     * Although the method is named remove() we cannot remove the key
-     * from memtable. We add it to the memtable but mark it as deleted.
-     * The reason for this because we do not want a successive get()
-     * for the same key to scan the ColumnFamilyStore files for this key.
-    */
-    void remove(String key, ColumnFamily columnFamily) throws IOException
-    {
-    	printExecutorStats();
-    	Runnable deleter = new Remover(key, columnFamily);
-    	apartments_.get(cfName_).submit(deleter);
-    }
-
     void flush(CommitLog.CommitLogContext cLogCtx) throws IOException
     {
         ColumnFamilyStore cfStore = Table.open(table_).getColumnFamilyStore(cfName_);

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/RowMutation.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/RowMutation.java?rev=758974&r1=758973&r2=758974&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/RowMutation.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/RowMutation.java Fri Mar 27 02:19:19 2009
@@ -24,6 +24,8 @@
 import java.io.IOException;
 import java.io.Serializable;
 
+import org.apache.commons.lang.ArrayUtils;
+
 import org.apache.cassandra.io.ICompactSerializer;
 
 
@@ -166,39 +168,23 @@
         }
         modifications_.put(values[0], columnFamily);
     }
-    
+
     /*
      * Specify a column name to be deleted. Column name is
      * specified as <column family>:column. This will result
      * in a ColumnFamily associated with <column family> as
      * name and perhaps Column with <column> as name being
      * marked as deleted.
-     * TODO : Delete is NOT correct as we do not know 
+     * TODO : Delete is NOT correct as we do not know
      * the CF type so we need to fix that.
-     * param @ cf - column name as <column family>:<column>     
+     * param @ cf - column name as <column family>:<column>
     */
-    public void delete(String cf)
-    {        
-        String[] values = RowMutation.getColumnAndColumnFamily(cf);
-        
-        if ( values.length == 0 || values.length > 3 )
-            throw new IllegalArgumentException("Column Family " + cf + " in invalid format. Must be in <column family>:<column> format.");
-     
-        ColumnFamily columnFamily = modifications_.get(values[0]);
-        if ( columnFamily == null )
-            columnFamily = new ColumnFamily(values[0]);
-        if(values.length == 2 )
-        {
-	        columnFamily.addColumn( values[1]);
-        }
-        if(values.length == 3 )
-        {
-	        columnFamily.addColumn( values[1] + ":" + values[2]);
-        }
-        deletions_.put(values[0], columnFamily);
+    public void delete(String columnFamilyColumn)
+    {
+        throw new UnsupportedOperationException();
     }
-    
-    /* 
+
+    /*
      * This is equivalent to calling commit. Applies the changes to
      * to the table that is obtained by calling Table.open().
     */
@@ -214,16 +200,6 @@
             row.addColumnFamily( modifications_.get(cfName) );            
         }
         table.apply(row);
-                
-        Set<String> cfNames2 = deletions_.keySet();
-        for (String cfName : cfNames2 )
-        {    
-            if ( !table.isValidColumnFamily(cfName) )
-                throw new ColumnFamilyNotDefinedException("Column Family " + cfName + " has not been defined.");
-            row.addColumnFamily( deletions_.get(cfName) );        
-        }
-        if ( deletions_.size() > 0 )
-            table.delete(row);
     }
     
     /* 
@@ -241,16 +217,6 @@
             row.addColumnFamily( modifications_.get(cfName) );            
         }
         table.apply(row);
-                
-        Set<String> cfNames2 = deletions_.keySet();
-        for (String cfName : cfNames2 )
-        {    
-            if ( !table.isValidColumnFamily(cfName) )
-                throw new ColumnFamilyNotDefinedException("Column Family " + cfName + " has not been defined.");
-            row.addColumnFamily( deletions_.get(cfName) );        
-        }
-        if ( deletions_.size() > 0 )
-            table.delete(row);
     }
     
     /* 

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/SuperColumn.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/SuperColumn.java?rev=758974&r1=758973&r2=758974&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/SuperColumn.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/SuperColumn.java Fri Mar 27 02:19:19 2009
@@ -18,22 +18,20 @@
 
 package org.apache.cassandra.db;
 
-import java.io.DataInput;
 import java.io.DataInputStream;
-import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.Serializable;
-import java.io.UnsupportedEncodingException;
-import java.util.*;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.Collection;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.HashingSchemes;
-import org.apache.cassandra.utils.LogUtil;
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.commons.lang.StringUtils;
 import org.apache.log4j.Logger;
 
+import org.apache.cassandra.utils.FBUtilities;
+
 /**
  * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
  */
@@ -56,7 +54,7 @@
 
 	private String name_;
     private EfficientBidiMap columns_ = new EfficientBidiMap(ColumnComparatorFactory.getComparator(ColumnComparatorFactory.ComparatorType.TIMESTAMP));
-	private AtomicBoolean isMarkedForDelete_ = new AtomicBoolean(false);
+	private long markedForDeleteAt = Long.MIN_VALUE;
     private AtomicInteger size_ = new AtomicInteger(0);
 
     SuperColumn()
@@ -70,7 +68,7 @@
 
 	public boolean isMarkedForDelete()
 	{
-		return isMarkedForDelete_.get();
+		return markedForDeleteAt > Long.MIN_VALUE;
 	}
 
     public String name()
@@ -83,12 +81,11 @@
     	return columns_.getSortedColumns();
     }
 
-    public IColumn getSubColumn( String columnName )
+    public IColumn getSubColumn(String columnName)
     {
-    	IColumn column = columns_.get(columnName);
-    	if ( column instanceof SuperColumn )
-    		throw new UnsupportedOperationException("A super column cannot hold other super columns.");
-		return column;
+        IColumn column = columns_.get(columnName);
+        assert column instanceof Column;
+        return column;
     }
 
     public int compareTo(IColumn superColumn)
@@ -148,7 +145,7 @@
         return size;
     }
 
-    protected void remove(String columnName)
+    public void remove(String columnName)
     {
     	columns_.remove(columnName);
     }
@@ -176,8 +173,6 @@
     public byte[] value(String key)
     {
     	IColumn column = columns_.get(key);
-    	if ( column instanceof SuperColumn )
-    		throw new UnsupportedOperationException("A super column cannot hold other super columns.");
     	if ( column != null )
     		return column.value();
     	throw new IllegalArgumentException("Value was requested for a column that does not exist.");
@@ -211,19 +206,18 @@
      * Go through each sub column if it exists then as it to resolve itself
      * if the column does not exist then create it.
      */
-    public boolean putColumn(IColumn column)
+    public void putColumn(IColumn column)
     {
     	if ( !(column instanceof SuperColumn))
     		throw new UnsupportedOperationException("Only Super column objects should be put here");
     	if( !name_.equals(column.name()))
     		throw new IllegalArgumentException("The name should match the name of the current column or super column");
-    	Collection<IColumn> columns = column.getSubColumns();
 
-        for ( IColumn subColumn : columns )
+        for (IColumn subColumn : column.getSubColumns())
         {
-       		addColumn(subColumn.name(), subColumn);
+        	addColumn(subColumn.name(), subColumn);
         }
-        return false;
+        markedForDeleteAt = Math.max(markedForDeleteAt, column.getMarkedForDeleteAt());
     }
 
     public int getObjectCount()
@@ -231,10 +225,8 @@
     	return 1 + columns_.size();
     }
 
-    public void delete()
-    {
-    	columns_.clear();
-    	isMarkedForDelete_.set(true);
+    public long getMarkedForDeleteAt() {
+        return markedForDeleteAt;
     }
 
     int getColumnCount()
@@ -242,21 +234,6 @@
     	return columns_.size();
     }
 
-    public void repair(IColumn column)
-    {
-    	Collection<IColumn> columns = column.getSubColumns();
-
-        for ( IColumn subColumn : columns )
-        {
-        	IColumn columnInternal = columns_.get(subColumn.name());
-        	if( columnInternal == null )
-        		columns_.put(subColumn.name(), subColumn);
-        	else
-        		columnInternal.repair(subColumn);
-        }
-    }
-
-
     public IColumn diff(IColumn column)
     {
     	IColumn  columnDiff = new SuperColumn(column.name());
@@ -287,7 +264,7 @@
     public byte[] digest()
     {
     	Set<IColumn> columns = columns_.getSortedColumns();
-    	byte[] xorHash = new byte[0];
+    	byte[] xorHash = ArrayUtils.EMPTY_BYTE_ARRAY;
     	if(name_ == null)
     		return xorHash;
     	xorHash = name_.getBytes();
@@ -302,23 +279,23 @@
     public String toString()
     {
     	StringBuilder sb = new StringBuilder();
+        sb.append("SuperColumn(");
     	sb.append(name_);
-    	sb.append(":");
-        sb.append(isMarkedForDelete());
-        sb.append(":");
-
-        Collection<IColumn> columns  = getSubColumns();
-        sb.append(columns.size());
-        sb.append(":");
-        sb.append(size());
-        sb.append(":");
-        for ( IColumn subColumn : columns )
-        {
-            sb.append(subColumn.toString());
+
+        if (isMarkedForDelete()) {
+            sb.append(" -delete at " + getMarkedForDeleteAt() + "-");
         }
-        sb.append(":");
+
+        sb.append(" [");
+        sb.append(StringUtils.join(getSubColumns(), ", "));
+        sb.append("])");
+
         return sb.toString();
     }
+
+    public void markForDeleteAt(long timestamp) {
+        this.markedForDeleteAt = timestamp;
+    }
 }
 
 class SuperColumnSerializer implements ICompactSerializer2<IColumn>
@@ -327,7 +304,7 @@
     {
     	SuperColumn superColumn = (SuperColumn)column;
         dos.writeUTF(superColumn.name());
-        dos.writeBoolean(superColumn.isMarkedForDelete());
+        dos.writeLong(superColumn.getMarkedForDeleteAt());
 
         Collection<IColumn> columns  = column.getSubColumns();
         int size = columns.size();
@@ -354,18 +331,15 @@
     private SuperColumn defreezeSuperColumn(DataInputStream dis) throws IOException
     {
         String name = dis.readUTF();
-        boolean delete = dis.readBoolean();
         SuperColumn superColumn = new SuperColumn(name);
-        if ( delete )
-            superColumn.delete();
+        superColumn.markForDeleteAt(dis.readLong());
         return superColumn;
     }
 
     public IColumn deserialize(DataInputStream dis) throws IOException
     {
         SuperColumn superColumn = defreezeSuperColumn(dis);
-        if ( !superColumn.isMarkedForDelete() )
-            fillSuperColumn(superColumn, dis);
+        fillSuperColumn(superColumn, dis);
         return superColumn;
     }
 
@@ -378,12 +352,11 @@
         int size = dis.readInt();
         dis.skip(size);
     }
-    
+
     private void fillSuperColumn(IColumn superColumn, DataInputStream dis) throws IOException
     {
-        if ( dis.available() == 0 )
-            return;
-        
+        assert dis.available() != 0;
+
         /* read the number of columns */
         int size = dis.readInt();
         /* read the size of all columns */
@@ -399,13 +372,12 @@
     {
         if ( dis.available() == 0 )
             return null;
-        
+
         IColumn superColumn = defreezeSuperColumn(dis);
         superColumn = filter.filter(superColumn, dis);
         if(superColumn != null)
         {
-            if ( !superColumn.isMarkedForDelete() )
-                fillSuperColumn(superColumn, dis);
+            fillSuperColumn(superColumn, dis);
             return superColumn;
         }
         else
@@ -427,32 +399,29 @@
     {
         if ( dis.available() == 0 )
             return null;
-        
+
         String[] names = RowMutation.getColumnAndColumnFamily(name);
         if ( names.length == 1 )
         {
             IColumn superColumn = defreezeSuperColumn(dis);
             if(name.equals(superColumn.name()))
             {
-                if ( !superColumn.isMarkedForDelete() )
+                /* read the number of columns stored */
+                int size = dis.readInt();
+                /* read the size of all columns */
+                dis.readInt();
+                IColumn column = null;
+                for ( int i = 0; i < size; ++i )
                 {
-                    /* read the number of columns stored */
-                    int size = dis.readInt();
-                    /* read the size of all columns */
-                    dis.readInt();     
-                	IColumn column = null;
-                    for ( int i = 0; i < size; ++i )
+                    column = Column.serializer().deserialize(dis, filter);
+                    if(column != null)
                     {
-                    	column = Column.serializer().deserialize(dis, filter);
-                    	if(column != null)
-                    	{
-                            superColumn.addColumn(column.name(), column);
-                    		column = null;
-                    		if(filter.isDone())
-                    		{
-                    			break;
-                    		}
-                    	}
+                        superColumn.addColumn(column.name(), column);
+                        column = null;
+                        if(filter.isDone())
+                        {
+                            break;
+                        }
                     }
                 }
                 return superColumn;

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/Table.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/Table.java?rev=758974&r1=758973&r2=758974&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/Table.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/Table.java Fri Mar 27 02:19:19 2009
@@ -855,24 +855,6 @@
         }
     }
 
-
-    void delete(Row row) throws IOException
-    {
-        String key = row.key();
-        Map<String, ColumnFamily> columnFamilies = row.getColumnFamilyMap();
-
-        /* Add row to commit log */
-        CommitLog.open(table_).add(row);
-        Set<String> cNames = columnFamilies.keySet();
-
-        for ( String cName : cNames )
-        {
-        	ColumnFamily columnFamily = columnFamilies.get(cName);
-            ColumnFamilyStore cfStore = columnFamilyStores_.get(columnFamily.name());
-            cfStore.delete( key, columnFamily );
-        }
-    }
-
     void load(Row row) throws IOException
     {
         String key = row.key();

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/io/SequenceFile.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/io/SequenceFile.java?rev=758974&r1=758973&r2=758974&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/io/SequenceFile.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/io/SequenceFile.java Fri Mar 27 02:19:19 2009
@@ -1179,8 +1179,8 @@
                         dataSize -= (utfPrefix_ + cfName.length());
 
                         /* read if this cf is marked for delete */
-                        boolean markedForDelete = file_.readBoolean();
-                        dataSize -= 1;
+                        long markedForDeleteAt = file_.readLong();
+                        dataSize -= 8;
 
                         /* read the total number of columns */
                         int totalNumCols = file_.readInt();
@@ -1213,7 +1213,7 @@
                         /* write the column family name */
                         bufOut.writeUTF(cfName);
                         /* write if this cf is marked for delete */
-                        bufOut.writeBoolean(markedForDelete);
+                        bufOut.writeLong(markedForDeleteAt);
                         /* write number of columns */
                         bufOut.writeInt(numColsReturned);
                         int prevPosition = 0;

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/service/CassandraServer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/CassandraServer.java?rev=758974&r1=758973&r2=758974&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/service/CassandraServer.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/service/CassandraServer.java Fri Mar 27 02:19:19 2009
@@ -563,20 +563,6 @@
 					}
 				}
 			}
-			if(batchMutation.cfmapdel != null)
-			{
-				Set keys = batchMutation.cfmapdel.keySet();
-				Iterator keyIter = keys.iterator();
-				while (keyIter.hasNext())
-				{
-					Object key = keyIter.next(); // Get the next key.
-					List<column_t> list = batchMutation.cfmapdel.get(key);
-					for (column_t columnData : list)
-					{
-						rm.delete(key.toString() + ":" + columnData.columnName);
-					}
-				}            
-			}
 			StorageProxy.insert(rm);
 		}
 		catch (Exception e)
@@ -588,18 +574,7 @@
 
     public void remove(String tablename, String key, String columnFamily_column)
 	{
-		try
-		{
-			validateTable(tablename);
-			RowMutation rm = new RowMutation(tablename, key.trim());
-			rm.delete(columnFamily_column);
-            StorageProxy.insert(rm);
-		}
-		catch (Exception e)
-		{
-			logger_.debug( LogUtil.throwableToString(e) );
-		}
-		return;
+		throw new UnsupportedOperationException();
 	}
 
     public List<superColumn_t> get_slice_super_by_names(String tablename, String key, String columnFamily, List<String> superColumnNames) throws CassandraException, TException
@@ -880,30 +855,6 @@
 					}
 				} 
 			}
-			if(batchMutationSuper.cfmapdel != null)
-			{
-				Set keys = batchMutationSuper.cfmapdel.keySet();
-				Iterator keyIter = keys.iterator();
-				while (keyIter.hasNext())
-				{
-					Object key = keyIter.next(); // Get the next key.
-					List<superColumn_t> list = batchMutationSuper.cfmapdel.get(key);
-					for (superColumn_t superColumnData : list)
-					{
-						if(superColumnData.columns.size() != 0 )
-						{
-							for (column_t columnData : superColumnData.columns)
-							{
-								rm.delete(key.toString() + ":" + superColumnData.name  +":" + columnData.columnName);
-							}
-						}
-						else
-						{
-							rm.delete(key.toString() + ":" + superColumnData.name);
-						}
-					}
-				} 
-			}
             StorageProxy.insert(rm);
 		}
 		catch (Exception e)



Mime
View raw message