cassandra-commits mailing list archives

From jbel...@apache.org
Subject svn commit: r799331 [7/29] - in /incubator/cassandra/trunk: ./ src/java/org/apache/cassandra/concurrent/ src/java/org/apache/cassandra/config/ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/dht/ src/java/org/apache/cassandra/gms/ src/j...
Date Thu, 30 Jul 2009 15:30:27 GMT
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java Thu Jul 30 15:30:21 2009
@@ -1,280 +1,280 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.db;
-
-import java.util.Collection;
-import java.util.Set;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-import java.io.IOException;
-
-import org.apache.log4j.Logger;
-
-import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
-import org.apache.cassandra.concurrent.ThreadFactoryImpl;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.gms.FailureDetector;
-import org.apache.cassandra.net.EndPoint;
-import org.apache.cassandra.net.Message;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.service.*;
-import org.apache.cassandra.db.filter.IdentityQueryFilter;
-import org.apache.cassandra.db.filter.QueryPath;
-
-
-/**
- * There are two ways hinted data gets delivered to the intended nodes.
- *
- * deliverAllHints(), scheduled periodically via submit(), pushes the hinted
- * data on this node to every intended node.
- *
- * deliverHintsToEndpoint(), invoked via deliverHints() when some other node
- * starts up (potentially from a failure), delivers the hinted data just to that node.
- */
-public class HintedHandOffManager
-{
-    private static volatile HintedHandOffManager instance_; // volatile is required for safe double-checked locking
-    private static Lock lock_ = new ReentrantLock();
-    private static Logger logger_ = Logger.getLogger(HintedHandOffManager.class);
-    final static long intervalInMins_ = 60;
-    private ScheduledExecutorService executor_ = new DebuggableScheduledThreadPoolExecutor(1, new ThreadFactoryImpl("HINTED-HANDOFF-POOL"));
-    public static final String HINTS_CF = "HintsColumnFamily";
-
-
-    public static HintedHandOffManager instance()
-    {
-        if ( instance_ == null )
-        {
-            lock_.lock();
-            try
-            {
-                if ( instance_ == null )
-                    instance_ = new HintedHandOffManager();
-            }
-            finally
-            {
-                lock_.unlock();
-            }
-        }
-        return instance_;
-    }
-
-    private static boolean sendMessage(String endpointAddress, String tableName, String key) throws DigestMismatchException, TimeoutException, IOException, InvalidRequestException
-    {
-        EndPoint endPoint = new EndPoint(endpointAddress, DatabaseDescriptor.getStoragePort());
-        if (!FailureDetector.instance().isAlive(endPoint))
-        {
-            return false;
-        }
-
-        Table table = Table.open(tableName);
-        Row row = table.get(key);
-        Row purgedRow = new Row(tableName,key);
-        for (ColumnFamily cf : row.getColumnFamilies())
-        {
-            purgedRow.addColumnFamily(ColumnFamilyStore.removeDeleted(cf));
-        }
-        RowMutation rm = new RowMutation(tableName, purgedRow);
-        Message message = rm.makeRowMutationMessage();
-        QuorumResponseHandler<Boolean> quorumResponseHandler = new QuorumResponseHandler<Boolean>(1, new WriteResponseResolver());
-        MessagingService.getMessagingInstance().sendRR(message, new EndPoint[]{ endPoint }, quorumResponseHandler);
-
-        return quorumResponseHandler.get();
-    }
-
-    private static void deleteEndPoint(byte[] endpointAddress, String tableName, byte[] key, long timestamp) throws IOException
-    {
-        RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, tableName);
-        rm.delete(new QueryPath(HINTS_CF, key, endpointAddress), timestamp);
-        rm.apply();
-    }
-
-    private static void deleteHintedData(String tableName, String key) throws IOException
-    {
-        // delete the row from Application CFs: find the largest timestamp in any of
-        // the data columns, and delete the entire CF with that value for the tombstone.
-
-        // Note that we delete all data associated with the key: this may be more than
-        // we sent earlier in sendMessage, since HH is not serialized with writes.
-        // This is sub-optimal but okay, since HH is just an effort to make a recovering
-        // node more consistent than it would have been; we can rely on the other
-        // consistency mechanisms to finish the job in this corner case.
-        RowMutation rm = new RowMutation(tableName, key);
-        Table table = Table.open(tableName);
-        Row row = table.get(key); // not necessary to do removeDeleted here
-        Collection<ColumnFamily> cfs = row.getColumnFamilies();
-        for (ColumnFamily cf : cfs)
-        {
-            long maxTS = Long.MIN_VALUE;
-            if (!cf.isSuper())
-            {
-                for (IColumn col : cf.getSortedColumns())
-                    maxTS = Math.max(maxTS, col.timestamp());
-            }
-            else
-            {
-                for (IColumn col : cf.getSortedColumns())
-                {
-                    maxTS = Math.max(maxTS, col.timestamp());
-                    Collection<IColumn> subColumns = col.getSubColumns();
-                    for (IColumn subCol : subColumns)
-                        maxTS = Math.max(maxTS, subCol.timestamp());
-                }
-            }
-            rm.delete(new QueryPath(cf.name()), maxTS);
-        }
-        rm.apply();
-    }
-
-    /** hintStore must be the hints columnfamily from the system table */
-    private static void deliverAllHints(ColumnFamilyStore hintStore) throws DigestMismatchException, IOException, InvalidRequestException, TimeoutException
-    {
-        if (logger_.isDebugEnabled())
-          logger_.debug("Started deliverAllHints");
-
-        // 1. Scan through all the keys that we need to hand off
-        // 2. For each key, read the list of recipients and send
-        // 3. Delete that recipient from the key if the write was successful
-        // 4. If all writes were successful for a given key, we can delete the key itself
-        // 5. Now force a flush
-        // 6. Do a major compaction to clean up all the deletes, etc.
-        // 7. Done
-        for (String tableName : DatabaseDescriptor.getTables())
-        {
-            ColumnFamily hintColumnFamily = ColumnFamilyStore.removeDeleted(hintStore.getColumnFamily(new IdentityQueryFilter(tableName, new QueryPath(HINTS_CF))), Integer.MAX_VALUE);
-            if (hintColumnFamily == null)
-            {
-                continue;
-            }
-            Collection<IColumn> keys = hintColumnFamily.getSortedColumns();
-
-            for (IColumn keyColumn : keys)
-            {
-                Collection<IColumn> endpoints = keyColumn.getSubColumns();
-                String keyStr = new String(keyColumn.name(), "UTF-8");
-                int deleted = 0;
-                for (IColumn endpoint : endpoints)
-                {
-                    String endpointStr = new String(endpoint.name(), "UTF-8");
-                    if (sendMessage(endpointStr, tableName, keyStr))
-                    {
-                        deleteEndPoint(endpoint.name(), tableName, keyColumn.name(), keyColumn.timestamp());
-                        deleted++;
-                    }
-                }
-                if (deleted == endpoints.size())
-                {
-                    deleteHintedData(tableName, keyStr);
-                }
-            }
-        }
-        hintStore.forceFlush();
-        hintStore.forceCompaction(null, null, 0, null);
-
-        if (logger_.isDebugEnabled())
-          logger_.debug("Finished deliverAllHints");
-    }
-
-    private static void deliverHintsToEndpoint(EndPoint endPoint) throws IOException, DigestMismatchException, InvalidRequestException, TimeoutException
-    {
-        if (logger_.isDebugEnabled())
-          logger_.debug("Started hinted handoff for endPoint " + endPoint.getHost());
-
-        String targetEP = endPoint.getHost();
-        // 1. Scan through all the keys that we need to hand off
-        // 2. For each key, read the list of recipients; if the endpoint matches, send
-        // 3. Delete that recipient from the key if the write was successful
-        Table systemTable = Table.open(Table.SYSTEM_TABLE);
-        for (String tableName : DatabaseDescriptor.getTables())
-        {
-            ColumnFamily hintedColumnFamily = systemTable.get(tableName, HINTS_CF);
-            if (hintedColumnFamily == null)
-            {
-                continue;
-            }
-            Collection<IColumn> keys = hintedColumnFamily.getSortedColumns();
-
-            for (IColumn keyColumn : keys)
-            {
-                String keyStr = new String(keyColumn.name(), "UTF-8");
-                Collection<IColumn> endpoints = keyColumn.getSubColumns();
-                for (IColumn hintEndPoint : endpoints)
-                {
-                    // hintEndPoint.name() is a byte[]; decode it before comparing, since byte[].equals(String) is always false
-                    if (new String(hintEndPoint.name(), "UTF-8").equals(targetEP) && sendMessage(endPoint.getHost(), tableName, keyStr))
-                    {
-                        deleteEndPoint(hintEndPoint.name(), tableName, keyColumn.name(), keyColumn.timestamp());
-                        if (endpoints.size() == 1)
-                        {
-                            deleteHintedData(tableName, keyStr);
-                        }
-                    }
-                }
-            }
-        }
-
-        if (logger_.isDebugEnabled())
-          logger_.debug("Finished hinted handoff for endpoint " + endPoint.getHost());
-    }
-
-    public void submit(final ColumnFamilyStore columnFamilyStore)
-    {
-        Runnable r = new Runnable()
-        {
-            public void run()
-            {
-                try
-                {
-                    deliverAllHints(columnFamilyStore);
-                }
-                catch (Exception e)
-                {
-                    throw new RuntimeException(e);
-                }
-            }
-        };
-    	executor_.scheduleWithFixedDelay(r, HintedHandOffManager.intervalInMins_, HintedHandOffManager.intervalInMins_, TimeUnit.MINUTES);
-    }
-
-    /*
-     * Delivers hints to a particular endpoint.
-     * When we learn that some endpoint is back up, we deliver the data
-     * to it via an event-driven mechanism.
-    */
-    public void deliverHints(final EndPoint to)
-    {
-        Runnable r = new Runnable()
-        {
-            public void run()
-            {
-                try
-                {
-                    deliverHintsToEndpoint(to);
-                }
-                catch (Exception e)
-                {
-                    throw new RuntimeException(e);
-                }
-            }
-        };
-    	executor_.submit(r);
-    }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.io.IOException;
+
+import org.apache.log4j.Logger;
+
+import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
+import org.apache.cassandra.concurrent.ThreadFactoryImpl;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.*;
+import org.apache.cassandra.db.filter.IdentityQueryFilter;
+import org.apache.cassandra.db.filter.QueryPath;
+
+
+/**
+ * There are two ways hinted data gets delivered to the intended nodes.
+ *
+ * deliverAllHints(), scheduled periodically via submit(), pushes the hinted
+ * data on this node to every intended node.
+ *
+ * deliverHintsToEndpoint(), invoked via deliverHints() when some other node
+ * starts up (potentially from a failure), delivers the hinted data just to that node.
+ */
+public class HintedHandOffManager
+{
+    private static volatile HintedHandOffManager instance_; // volatile is required for safe double-checked locking
+    private static Lock lock_ = new ReentrantLock();
+    private static Logger logger_ = Logger.getLogger(HintedHandOffManager.class);
+    final static long intervalInMins_ = 60;
+    private ScheduledExecutorService executor_ = new DebuggableScheduledThreadPoolExecutor(1, new ThreadFactoryImpl("HINTED-HANDOFF-POOL"));
+    public static final String HINTS_CF = "HintsColumnFamily";
+
+
+    public static HintedHandOffManager instance()
+    {
+        if ( instance_ == null )
+        {
+            lock_.lock();
+            try
+            {
+                if ( instance_ == null )
+                    instance_ = new HintedHandOffManager();
+            }
+            finally
+            {
+                lock_.unlock();
+            }
+        }
+        return instance_;
+    }
+
+    private static boolean sendMessage(String endpointAddress, String tableName, String key) throws DigestMismatchException, TimeoutException, IOException, InvalidRequestException
+    {
+        EndPoint endPoint = new EndPoint(endpointAddress, DatabaseDescriptor.getStoragePort());
+        if (!FailureDetector.instance().isAlive(endPoint))
+        {
+            return false;
+        }
+
+        Table table = Table.open(tableName);
+        Row row = table.get(key);
+        Row purgedRow = new Row(tableName,key);
+        for (ColumnFamily cf : row.getColumnFamilies())
+        {
+            purgedRow.addColumnFamily(ColumnFamilyStore.removeDeleted(cf));
+        }
+        RowMutation rm = new RowMutation(tableName, purgedRow);
+        Message message = rm.makeRowMutationMessage();
+        QuorumResponseHandler<Boolean> quorumResponseHandler = new QuorumResponseHandler<Boolean>(1, new WriteResponseResolver());
+        MessagingService.getMessagingInstance().sendRR(message, new EndPoint[]{ endPoint }, quorumResponseHandler);
+
+        return quorumResponseHandler.get();
+    }
+
+    private static void deleteEndPoint(byte[] endpointAddress, String tableName, byte[] key, long timestamp) throws IOException
+    {
+        RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, tableName);
+        rm.delete(new QueryPath(HINTS_CF, key, endpointAddress), timestamp);
+        rm.apply();
+    }
+
+    private static void deleteHintedData(String tableName, String key) throws IOException
+    {
+        // delete the row from Application CFs: find the largest timestamp in any of
+        // the data columns, and delete the entire CF with that value for the tombstone.
+
+        // Note that we delete all data associated with the key: this may be more than
+        // we sent earlier in sendMessage, since HH is not serialized with writes.
+        // This is sub-optimal but okay, since HH is just an effort to make a recovering
+        // node more consistent than it would have been; we can rely on the other
+        // consistency mechanisms to finish the job in this corner case.
+        RowMutation rm = new RowMutation(tableName, key);
+        Table table = Table.open(tableName);
+        Row row = table.get(key); // not necessary to do removeDeleted here
+        Collection<ColumnFamily> cfs = row.getColumnFamilies();
+        for (ColumnFamily cf : cfs)
+        {
+            long maxTS = Long.MIN_VALUE;
+            if (!cf.isSuper())
+            {
+                for (IColumn col : cf.getSortedColumns())
+                    maxTS = Math.max(maxTS, col.timestamp());
+            }
+            else
+            {
+                for (IColumn col : cf.getSortedColumns())
+                {
+                    maxTS = Math.max(maxTS, col.timestamp());
+                    Collection<IColumn> subColumns = col.getSubColumns();
+                    for (IColumn subCol : subColumns)
+                        maxTS = Math.max(maxTS, subCol.timestamp());
+                }
+            }
+            rm.delete(new QueryPath(cf.name()), maxTS);
+        }
+        rm.apply();
+    }
+
+    /** hintStore must be the hints columnfamily from the system table */
+    private static void deliverAllHints(ColumnFamilyStore hintStore) throws DigestMismatchException, IOException, InvalidRequestException, TimeoutException
+    {
+        if (logger_.isDebugEnabled())
+          logger_.debug("Started deliverAllHints");
+
+        // 1. Scan through all the keys that we need to hand off
+        // 2. For each key, read the list of recipients and send
+        // 3. Delete that recipient from the key if the write was successful
+        // 4. If all writes were successful for a given key, we can delete the key itself
+        // 5. Now force a flush
+        // 6. Do a major compaction to clean up all the deletes, etc.
+        // 7. Done
+        for (String tableName : DatabaseDescriptor.getTables())
+        {
+            ColumnFamily hintColumnFamily = ColumnFamilyStore.removeDeleted(hintStore.getColumnFamily(new IdentityQueryFilter(tableName, new QueryPath(HINTS_CF))), Integer.MAX_VALUE);
+            if (hintColumnFamily == null)
+            {
+                continue;
+            }
+            Collection<IColumn> keys = hintColumnFamily.getSortedColumns();
+
+            for (IColumn keyColumn : keys)
+            {
+                Collection<IColumn> endpoints = keyColumn.getSubColumns();
+                String keyStr = new String(keyColumn.name(), "UTF-8");
+                int deleted = 0;
+                for (IColumn endpoint : endpoints)
+                {
+                    String endpointStr = new String(endpoint.name(), "UTF-8");
+                    if (sendMessage(endpointStr, tableName, keyStr))
+                    {
+                        deleteEndPoint(endpoint.name(), tableName, keyColumn.name(), keyColumn.timestamp());
+                        deleted++;
+                    }
+                }
+                if (deleted == endpoints.size())
+                {
+                    deleteHintedData(tableName, keyStr);
+                }
+            }
+        }
+        hintStore.forceFlush();
+        hintStore.forceCompaction(null, null, 0, null);
+
+        if (logger_.isDebugEnabled())
+          logger_.debug("Finished deliverAllHints");
+    }
+
+    private static void deliverHintsToEndpoint(EndPoint endPoint) throws IOException, DigestMismatchException, InvalidRequestException, TimeoutException
+    {
+        if (logger_.isDebugEnabled())
+          logger_.debug("Started hinted handoff for endPoint " + endPoint.getHost());
+
+        String targetEP = endPoint.getHost();
+        // 1. Scan through all the keys that we need to hand off
+        // 2. For each key, read the list of recipients; if the endpoint matches, send
+        // 3. Delete that recipient from the key if the write was successful
+        Table systemTable = Table.open(Table.SYSTEM_TABLE);
+        for (String tableName : DatabaseDescriptor.getTables())
+        {
+            ColumnFamily hintedColumnFamily = systemTable.get(tableName, HINTS_CF);
+            if (hintedColumnFamily == null)
+            {
+                continue;
+            }
+            Collection<IColumn> keys = hintedColumnFamily.getSortedColumns();
+
+            for (IColumn keyColumn : keys)
+            {
+                String keyStr = new String(keyColumn.name(), "UTF-8");
+                Collection<IColumn> endpoints = keyColumn.getSubColumns();
+                for (IColumn hintEndPoint : endpoints)
+                {
+                    // hintEndPoint.name() is a byte[]; decode it before comparing, since byte[].equals(String) is always false
+                    if (new String(hintEndPoint.name(), "UTF-8").equals(targetEP) && sendMessage(endPoint.getHost(), tableName, keyStr))
+                    {
+                        deleteEndPoint(hintEndPoint.name(), tableName, keyColumn.name(), keyColumn.timestamp());
+                        if (endpoints.size() == 1)
+                        {
+                            deleteHintedData(tableName, keyStr);
+                        }
+                    }
+                }
+            }
+        }
+
+        if (logger_.isDebugEnabled())
+          logger_.debug("Finished hinted handoff for endpoint " + endPoint.getHost());
+    }
+
+    public void submit(final ColumnFamilyStore columnFamilyStore)
+    {
+        Runnable r = new Runnable()
+        {
+            public void run()
+            {
+                try
+                {
+                    deliverAllHints(columnFamilyStore);
+                }
+                catch (Exception e)
+                {
+                    throw new RuntimeException(e);
+                }
+            }
+        };
+    	executor_.scheduleWithFixedDelay(r, HintedHandOffManager.intervalInMins_, HintedHandOffManager.intervalInMins_, TimeUnit.MINUTES);
+    }
+
+    /*
+     * Delivers hints to a particular endpoint.
+     * When we learn that some endpoint is back up, we deliver the data
+     * to it via an event-driven mechanism.
+    */
+    public void deliverHints(final EndPoint to)
+    {
+        Runnable r = new Runnable()
+        {
+            public void run()
+            {
+                try
+                {
+                    deliverHintsToEndpoint(to);
+                }
+                catch (Exception e)
+                {
+                    throw new RuntimeException(e);
+                }
+            }
+        };
+    	executor_.submit(r);
+    }
+}
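
A note on the singleton above: instance() relies on double-checked locking, which is only safe under the Java memory model when the instance field is volatile. A minimal sketch (not part of this patch; HolderSingletonSketch is a hypothetical stand-in for HintedHandOffManager) of the initialization-on-demand holder idiom, which gives the same lazy, thread-safe construction with no explicit locking:

    public class HolderSingletonSketch
    {
        private HolderSingletonSketch() {}

        // The JVM initializes Holder (and thus INSTANCE) at most once, on
        // first access, with the necessary happens-before guarantees.
        private static class Holder
        {
            static final HolderSingletonSketch INSTANCE = new HolderSingletonSketch();
        }

        public static HolderSingletonSketch instance()
        {
            return Holder.INSTANCE;
        }
    }

The ReentrantLock-based version above behaves the same once instance_ is volatile.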

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/IColumn.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/IColumn.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/IColumn.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/IColumn.java Thu Jul 30 15:30:21 2009
@@ -1,49 +1,49 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.db;
-
-import java.util.Collection;
-
-import org.apache.cassandra.db.marshal.AbstractType;
-
-/**
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
-public interface IColumn
-{
-    public static short UtfPrefix_ = 2;
-    public boolean isMarkedForDelete();
-    public long getMarkedForDeleteAt();
-    public byte[] name();
-    public int size();
-    public int serializedSize();
-    public long timestamp();
-    public long timestamp(byte[] columnName);
-    public byte[] value();
-    public byte[] value(byte[] columnName);
-    public Collection<IColumn> getSubColumns();
-    public IColumn getSubColumn(byte[] columnName);
-    public void addColumn(IColumn column);
-    public IColumn diff(IColumn column);
-    public int getObjectCount();
-    public byte[] digest();
-    public int getLocalDeletionTime(); // for tombstone GC, so int is sufficient granularity
-    public String getString(AbstractType comparator);
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.util.Collection;
+
+import org.apache.cassandra.db.marshal.AbstractType;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public interface IColumn
+{
+    public static short UtfPrefix_ = 2;
+    public boolean isMarkedForDelete();
+    public long getMarkedForDeleteAt();
+    public byte[] name();
+    public int size();
+    public int serializedSize();
+    public long timestamp();
+    public long timestamp(byte[] columnName);
+    public byte[] value();
+    public byte[] value(byte[] columnName);
+    public Collection<IColumn> getSubColumns();
+    public IColumn getSubColumn(byte[] columnName);
+    public void addColumn(IColumn column);
+    public IColumn diff(IColumn column);
+    public int getObjectCount();
+    public byte[] digest();
+    public int getLocalDeletionTime(); // for tombstone GC, so int is sufficient granularity
+    public String getString(AbstractType comparator);
+}
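
For illustration, a hedged sketch of how the IColumn API composes for nested columns, mirroring the timestamp scan in HintedHandOffManager.deleteHintedData above. maxTimestamp is a hypothetical helper, and it assumes getSubColumns() yields an empty collection (rather than throwing) for standard columns:

    static long maxTimestamp(IColumn column)
    {
        long maxTS = column.timestamp();
        // for super columns this walks the subcolumns; for standard columns
        // the collection is assumed empty and the loop is a no-op
        for (IColumn sub : column.getSubColumns())
            maxTS = Math.max(maxTS, sub.timestamp());
        return maxTS;
    }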

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/IScanner.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/IScanner.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/IScanner.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/IScanner.java Thu Jul 30 15:30:21 2009
@@ -16,14 +16,14 @@
 * specific language governing permissions and limitations
 * under the License.
 */
-package org.apache.cassandra.db;
-
-import java.io.Closeable;
-import java.io.IOException;
-
-public interface IScanner<T> extends Closeable
-{
-    public boolean hasNext() throws IOException;
-    public T next() throws IOException;
-    public void fetch(String key, String cf) throws IOException;    
-}
+package org.apache.cassandra.db;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+public interface IScanner<T> extends Closeable
+{
+    public boolean hasNext() throws IOException;
+    public T next() throws IOException;
+    public void fetch(String key, String cf) throws IOException;    
+}
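
A toy in-memory implementation (assumed, for illustration only) of the IScanner contract above; since IScanner extends Closeable, close() must be implemented as well:

    import java.io.IOException;
    import java.util.Iterator;
    import java.util.List;

    public class ListScanner implements IScanner<String>
    {
        private final Iterator<String> iter;

        public ListScanner(List<String> rows)
        {
            iter = rows.iterator();
        }

        public boolean hasNext() throws IOException { return iter.hasNext(); }
        public String next() throws IOException { return iter.next(); }
        public void fetch(String key, String cf) throws IOException { /* no-op: all rows are already in memory */ }
        public void close() throws IOException { /* nothing to release */ }
    }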

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/LoadVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/LoadVerbHandler.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/LoadVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/LoadVerbHandler.java Thu Jul 30 15:30:21 2009
@@ -1,71 +1,71 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.db;
-
-import org.apache.cassandra.net.EndPoint;
-import org.apache.cassandra.net.IVerbHandler;
-import org.apache.cassandra.net.Message;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.LogUtil;
-import org.apache.cassandra.io.DataInputBuffer;
-import org.apache.log4j.Logger;
-
-/**
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
-public class LoadVerbHandler implements IVerbHandler
-{
-    private static Logger logger_ = Logger.getLogger(LoadVerbHandler.class);    
-    
-    public void doVerb(Message message)
-    { 
-        try
-        {
-            byte[] body = message.getMessageBody();
-            DataInputBuffer buffer = new DataInputBuffer();
-            buffer.reset(body, body.length);
-            RowMutationMessage rmMsg = RowMutationMessage.serializer().deserialize(buffer);
-
-            EndPoint[] endpoints = StorageService.instance().getNStorageEndPoint(rmMsg.getRowMutation().key());
-
-            Message messageInternal = new Message(StorageService.getLocalStorageEndPoint(),
-                    StorageService.mutationStage_,
-                    StorageService.mutationVerbHandler_,
-                    body
-            );
-
-            StringBuilder sb = new StringBuilder();
-            for (EndPoint endPoint : endpoints)
-            {
-                sb.append(endPoint);
-                MessagingService.getMessagingInstance().sendOneWay(messageInternal, endPoint);
-            }
-            if (logger_.isDebugEnabled())
-                logger_.debug("Sent data to " + sb.toString());
-        }        
-        catch ( Exception e )
-        {
-            if (logger_.isDebugEnabled())
-                logger_.debug(LogUtil.throwableToString(e));            
-        }        
-    }
-
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.cassandra.io.DataInputBuffer;
+import org.apache.log4j.Logger;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class LoadVerbHandler implements IVerbHandler
+{
+    private static Logger logger_ = Logger.getLogger(LoadVerbHandler.class);    
+    
+    public void doVerb(Message message)
+    { 
+        try
+        {
+            byte[] body = message.getMessageBody();
+            DataInputBuffer buffer = new DataInputBuffer();
+            buffer.reset(body, body.length);
+            RowMutationMessage rmMsg = RowMutationMessage.serializer().deserialize(buffer);
+
+            EndPoint[] endpoints = StorageService.instance().getNStorageEndPoint(rmMsg.getRowMutation().key());
+
+            Message messageInternal = new Message(StorageService.getLocalStorageEndPoint(),
+                    StorageService.mutationStage_,
+                    StorageService.mutationVerbHandler_,
+                    body
+            );
+
+            StringBuilder sb = new StringBuilder();
+            for (EndPoint endPoint : endpoints)
+            {
+                sb.append(endPoint);
+                MessagingService.getMessagingInstance().sendOneWay(messageInternal, endPoint);
+            }
+            if (logger_.isDebugEnabled())
+                logger_.debug("Sent data to " + sb.toString());
+        }        
+        catch ( Exception e )
+        {
+            if (logger_.isDebugEnabled())
+                logger_.debug(LogUtil.throwableToString(e));            
+        }        
+    }
+
+}
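
The handler above follows the usual verb-handler deserialization pattern: wrap the raw message body in a DataInputBuffer, then hand it to the message's serializer. A hedged sketch of that step in isolation (parseRowMutationMessage is a hypothetical helper; the deserialize call is assumed to throw IOException, which the handler above swallows via its catch-all):

    static RowMutationMessage parseRowMutationMessage(byte[] body) throws IOException
    {
        DataInputBuffer buffer = new DataInputBuffer();
        buffer.reset(body, body.length); // point the buffer at the raw bytes
        return RowMutationMessage.serializer().deserialize(buffer);
    }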

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java Thu Jul 30 15:30:21 2009
@@ -1,342 +1,342 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.db;
-
-import java.io.IOException;
-import java.util.*;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.commons.lang.ArrayUtils;
-
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.io.DataOutputBuffer;
-import org.apache.cassandra.io.SSTableReader;
-import org.apache.cassandra.io.SSTableWriter;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.DestructivePQIterator;
-import org.apache.cassandra.db.filter.*;
-import org.apache.cassandra.db.marshal.AbstractType;
-
-import org.apache.log4j.Logger;
-
-/**
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
-public class Memtable implements Comparable<Memtable>
-{
-	private static Logger logger_ = Logger.getLogger( Memtable.class );
-
-    private boolean isFrozen_;
-    private volatile boolean isDirty_;
-    private volatile boolean isFlushed_; // for tests, in particular forceBlockingFlush asserts this
-
-    private int threshold_ = DatabaseDescriptor.getMemtableSize()*1024*1024;
-    private int thresholdCount_ = (int)(DatabaseDescriptor.getMemtableObjectCount()*1024*1024);
-    private AtomicInteger currentSize_ = new AtomicInteger(0);
-    private AtomicInteger currentObjectCount_ = new AtomicInteger(0);
-
-    /* Table and ColumnFamily name are used to determine the ColumnFamilyStore */
-    private String table_;
-    private String cfName_;
-    /* Creation time of this Memtable */
-    private long creationTime_;
-    private Map<String, ColumnFamily> columnFamilies_ = new HashMap<String, ColumnFamily>();
-
-    Memtable(String table, String cfName)
-    {
-        table_ = table;
-        cfName_ = cfName;
-        creationTime_ = System.currentTimeMillis();
-    }
-
-    public boolean isFlushed()
-    {
-        return isFlushed_;
-    }
-
-    /**
-     * Compares two Memtables based on creation time.
-     * @param rhs Memtable to compare to.
-     * @return a negative integer, zero, or a positive integer as this object
-     * is less than, equal to, or greater than the specified object.
-     */
-    public int compareTo(Memtable rhs)
-    {
-    	long diff = creationTime_ - rhs.creationTime_;
-    	if ( diff > 0 )
-    		return 1;
-    	else if ( diff < 0 )
-    		return -1;
-    	else
-    		return 0;
-    }
-
-    public int getCurrentSize()
-    {
-        return currentSize_.get();
-    }
-    
-    public int getCurrentObjectCount()
-    {
-        return currentObjectCount_.get();
-    }
-
-    void resolveSize(int oldSize, int newSize)
-    {
-        currentSize_.addAndGet(newSize - oldSize);
-    }
-
-    void resolveCount(int oldCount, int newCount)
-    {
-        currentObjectCount_.addAndGet(newCount - oldCount);
-    }
-
-    boolean isThresholdViolated()
-    {
-        return currentSize_.get() >= threshold_ ||  currentObjectCount_.get() >= thresholdCount_;
-    }
-
-    String getColumnFamily()
-    {
-    	return cfName_;
-    }
-
-    boolean isFrozen()
-    {
-        return isFrozen_;
-    }
-
-    void freeze()
-    {
-        isFrozen_ = true;
-    }
-
-    /**
-     * Should only be called by ColumnFamilyStore.apply.  NOT a public API.
-     * (CFS handles locking to avoid submitting an op
-     *  to a flushing memtable.  Any other way is unsafe.)
-    */
-    void put(String key, ColumnFamily columnFamily)
-    {
-        assert !isFrozen_; // not 100% foolproof but hell, it's an assert
-        isDirty_ = true;
-        resolve(key, columnFamily);
-    }
-
-    /** flush synchronously (in the current thread, not on the executor).
-     *  only the recover code should call this. */
-    void flushOnRecovery() throws IOException {
-        if (!isClean())
-            flush(CommitLog.CommitLogContext.NULL);
-    }
-
-    private void resolve(String key, ColumnFamily columnFamily)
-    {
-    	ColumnFamily oldCf = columnFamilies_.get(key);
-        if ( oldCf != null )
-        {
-            int oldSize = oldCf.size();
-            int oldObjectCount = oldCf.getColumnCount();
-            oldCf.addColumns(columnFamily);
-            int newSize = oldCf.size();
-            int newObjectCount = oldCf.getColumnCount();
-            resolveSize(oldSize, newSize);
-            resolveCount(oldObjectCount, newObjectCount);
-            oldCf.delete(columnFamily);
-        }
-        else
-        {
-            columnFamilies_.put(key, columnFamily);
-            currentSize_.addAndGet(columnFamily.size() + key.length());
-            currentObjectCount_.addAndGet(columnFamily.getColumnCount());
-        }
-    }
-
-    // for debugging
-    public String contents()
-    {
-        StringBuilder builder = new StringBuilder();
-        builder.append("{");
-        for (Map.Entry<String, ColumnFamily> entry : columnFamilies_.entrySet())
-        {
-            builder.append(entry.getKey()).append(": ").append(entry.getValue()).append(", ");
-        }
-        builder.append("}");
-        return builder.toString();
-    }
-
-    void flush(CommitLog.CommitLogContext cLogCtx) throws IOException
-    {
-        logger_.info("Flushing " + this);
-        ColumnFamilyStore cfStore = Table.open(table_).getColumnFamilyStore(cfName_);
-
-        SSTableWriter writer = new SSTableWriter(cfStore.getTempSSTablePath(), columnFamilies_.size(), StorageService.getPartitioner());
-
-        // sort keys in the order they would be in when decorated
-        final IPartitioner partitioner = StorageService.getPartitioner();
-        final Comparator<String> dc = partitioner.getDecoratedKeyComparator();
-        ArrayList<String> orderedKeys = new ArrayList<String>(columnFamilies_.keySet());
-        Collections.sort(orderedKeys, new Comparator<String>()
-        {
-            public int compare(String o1, String o2)
-            {
-                return dc.compare(partitioner.decorateKey(o1), partitioner.decorateKey(o2));
-            }
-        });
-        DataOutputBuffer buffer = new DataOutputBuffer();
-        for (String key : orderedKeys)
-        {
-            buffer.reset();
-            ColumnFamily columnFamily = columnFamilies_.get(key);
-            if ( columnFamily != null )
-            {
-                /* serialize the cf with column indexes */
-                ColumnFamily.serializerWithIndexes().serialize( columnFamily, buffer );
-                /* Now write the key and value to disk */
-                writer.append(partitioner.decorateKey(key), buffer);
-            }
-        }
-        SSTableReader ssTable = writer.closeAndOpenReader(DatabaseDescriptor.getKeysCachedFraction(table_));
-        cfStore.onMemtableFlush(cLogCtx);
-        cfStore.storeLocation(ssTable);
-        buffer.close();
-        isFlushed_ = true;
-        logger_.info("Completed flushing " + this);
-    }
-
-    public String toString()
-    {
-        return "Memtable(" + cfName_ + ")@" + hashCode();
-    }
-
-    /**
-     * there does not appear to be any data structure that we can pass to PriorityQueue that will
-     * get it to heapify in-place instead of copying first, so we might as well return a Set.
-    */
-    Set<String> getKeys() throws ExecutionException, InterruptedException
-    {
-        return new HashSet<String>(columnFamilies_.keySet());
-    }
-
-    public static Iterator<String> getKeyIterator(Set<String> keys)
-    {
-        if (keys.size() == 0)
-        {
-            // cannot create a PQ of size zero (wtf?)
-            return Arrays.asList(new String[0]).iterator();
-        }
-        PriorityQueue<String> pq = new PriorityQueue<String>(keys.size(), StorageService.getPartitioner().getDecoratedKeyComparator());
-        pq.addAll(keys);
-        return new DestructivePQIterator<String>(pq);
-    }
-
-    public boolean isClean()
-    {
-        // executor taskcount is inadequate for our needs here -- it can return zero under certain
-        // race conditions even though a task has been processed.
-        return !isDirty_;
-    }
-
-    /**
-     * obtain an iterator of columns in this memtable in the specified order starting from a given column.
-     */
-    public ColumnIterator getSliceIterator(SliceQueryFilter filter, AbstractType typeComparator)
-    {
-        ColumnFamily cf = columnFamilies_.get(filter.key);
-        final ColumnFamily columnFamily = cf == null ? ColumnFamily.create(table_, filter.getColumnFamilyName()) : cf.cloneMeShallow();
-
-        final IColumn columns[] = (cf == null ? columnFamily : cf).getSortedColumns().toArray(new IColumn[columnFamily.getSortedColumns().size()]);
-        // TODO if we are dealing with supercolumns, we need to clone them while we have the read lock since they can be modified later
-        if (!filter.isAscending)
-            ArrayUtils.reverse(columns);
-        IColumn startIColumn;
-        if (DatabaseDescriptor.getColumnFamilyType(table_, filter.getColumnFamilyName()).equals("Standard"))
-            startIColumn = new Column(filter.start);
-        else
-            startIColumn = new SuperColumn(filter.start, null); // ok to not have subcolumnComparator since we won't be adding columns to this object
-
-        // can't use a ColumnComparatorFactory comparator since those compare on both name and time (and thus will fail to match
-        // our dummy column, since the time there is arbitrary).
-        Comparator<IColumn> comparator = filter.getColumnComparator(typeComparator);
-        int index = Arrays.binarySearch(columns, startIColumn, comparator);
-        final int startIndex = index < 0 ? -(index + 1) : index;
-
-        return new AbstractColumnIterator()
-        {
-            private int curIndex_ = startIndex;
-
-            public ColumnFamily getColumnFamily()
-            {
-                return columnFamily;
-            }
-
-            public boolean hasNext()
-            {
-                return curIndex_ < columns.length;
-            }
-
-            public IColumn next()
-            {
-                return columns[curIndex_++];
-            }
-        };
-    }
-
-    public ColumnIterator getNamesIterator(final NamesQueryFilter filter)
-    {
-        final ColumnFamily cf = columnFamilies_.get(filter.key);
-        final ColumnFamily columnFamily = cf == null ? ColumnFamily.create(table_, filter.getColumnFamilyName()) : cf.cloneMeShallow();
-
-        return new SimpleAbstractColumnIterator()
-        {
-            private Iterator<byte[]> iter = filter.columns.iterator();
-            private byte[] current;
-
-            public ColumnFamily getColumnFamily()
-            {
-                return columnFamily;
-            }
-
-            protected IColumn computeNext()
-            {
-                if (cf == null)
-                {
-                    return endOfData();
-                }
-                while (iter.hasNext())
-                {
-                    current = iter.next();
-                    IColumn column = cf.getColumn(current);
-                    if (column != null)
-                        return column;
-                }
-                return endOfData();
-            }
-        };
-    }
-    
-    void clearUnsafe()
-    {
-        columnFamilies_.clear();
-    }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.lang.ArrayUtils;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.io.DataOutputBuffer;
+import org.apache.cassandra.io.SSTableReader;
+import org.apache.cassandra.io.SSTableWriter;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.DestructivePQIterator;
+import org.apache.cassandra.db.filter.*;
+import org.apache.cassandra.db.marshal.AbstractType;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class Memtable implements Comparable<Memtable>
+{
+	private static Logger logger_ = Logger.getLogger( Memtable.class );
+
+    private boolean isFrozen_;
+    private volatile boolean isDirty_;
+    private volatile boolean isFlushed_; // for tests, in particular forceBlockingFlush asserts this
+
+    private int threshold_ = DatabaseDescriptor.getMemtableSize()*1024*1024;
+    private int thresholdCount_ = (int)(DatabaseDescriptor.getMemtableObjectCount()*1024*1024);
+    private AtomicInteger currentSize_ = new AtomicInteger(0);
+    private AtomicInteger currentObjectCount_ = new AtomicInteger(0);
+
+    /* Table and ColumnFamily name are used to determine the ColumnFamilyStore */
+    private String table_;
+    private String cfName_;
+    /* Creation time of this Memtable */
+    private long creationTime_;
+    private Map<String, ColumnFamily> columnFamilies_ = new HashMap<String, ColumnFamily>();
+
+    Memtable(String table, String cfName)
+    {
+        table_ = table;
+        cfName_ = cfName;
+        creationTime_ = System.currentTimeMillis();
+    }
+
+    public boolean isFlushed()
+    {
+        return isFlushed_;
+    }
+
+    /**
+     * Compares two Memtables based on creation time.
+     * @param rhs Memtable to compare to.
+     * @return a negative integer, zero, or a positive integer as this object
+     * is less than, equal to, or greater than the specified object.
+     */
+    public int compareTo(Memtable rhs)
+    {
+    	long diff = creationTime_ - rhs.creationTime_;
+    	if ( diff > 0 )
+    		return 1;
+    	else if ( diff < 0 )
+    		return -1;
+    	else
+    		return 0;
+    }
+
+    public int getCurrentSize()
+    {
+        return currentSize_.get();
+    }
+    
+    public int getCurrentObjectCount()
+    {
+        return currentObjectCount_.get();
+    }
+
+    void resolveSize(int oldSize, int newSize)
+    {
+        currentSize_.addAndGet(newSize - oldSize);
+    }
+
+    void resolveCount(int oldCount, int newCount)
+    {
+        currentObjectCount_.addAndGet(newCount - oldCount);
+    }
+
+    boolean isThresholdViolated()
+    {
+        return currentSize_.get() >= threshold_ ||  currentObjectCount_.get() >= thresholdCount_;
+    }
+
+    String getColumnFamily()
+    {
+    	return cfName_;
+    }
+
+    boolean isFrozen()
+    {
+        return isFrozen_;
+    }
+
+    void freeze()
+    {
+        isFrozen_ = true;
+    }
+
+    /**
+     * Should only be called by ColumnFamilyStore.apply.  NOT a public API.
+     * (CFS handles locking to avoid submitting an op
+     *  to a flushing memtable.  Any other way is unsafe.)
+    */
+    void put(String key, ColumnFamily columnFamily)
+    {
+        assert !isFrozen_; // not 100% foolproof but hell, it's an assert
+        isDirty_ = true;
+        resolve(key, columnFamily);
+    }
+
+    /** flush synchronously (in the current thread, not on the executor).
+     *  only the recover code should call this. */
+    void flushOnRecovery() throws IOException {
+        if (!isClean())
+            flush(CommitLog.CommitLogContext.NULL);
+    }
+
+    private void resolve(String key, ColumnFamily columnFamily)
+    {
+    	ColumnFamily oldCf = columnFamilies_.get(key);
+        if ( oldCf != null )
+        {
+            int oldSize = oldCf.size();
+            int oldObjectCount = oldCf.getColumnCount();
+            oldCf.addColumns(columnFamily);
+            int newSize = oldCf.size();
+            int newObjectCount = oldCf.getColumnCount();
+            resolveSize(oldSize, newSize);
+            resolveCount(oldObjectCount, newObjectCount);
+            oldCf.delete(columnFamily);
+        }
+        else
+        {
+            columnFamilies_.put(key, columnFamily);
+            currentSize_.addAndGet(columnFamily.size() + key.length());
+            currentObjectCount_.addAndGet(columnFamily.getColumnCount());
+        }
+    }
+
+    // for debugging
+    public String contents()
+    {
+        StringBuilder builder = new StringBuilder();
+        builder.append("{");
+        for (Map.Entry<String, ColumnFamily> entry : columnFamilies_.entrySet())
+        {
+            builder.append(entry.getKey()).append(": ").append(entry.getValue()).append(", ");
+        }
+        builder.append("}");
+        return builder.toString();
+    }
+
+    void flush(CommitLog.CommitLogContext cLogCtx) throws IOException
+    {
+        logger_.info("Flushing " + this);
+        ColumnFamilyStore cfStore = Table.open(table_).getColumnFamilyStore(cfName_);
+
+        SSTableWriter writer = new SSTableWriter(cfStore.getTempSSTablePath(), columnFamilies_.size(), StorageService.getPartitioner());
+
+        // sort keys in the order they would be in when decorated
+        final IPartitioner partitioner = StorageService.getPartitioner();
+        final Comparator<String> dc = partitioner.getDecoratedKeyComparator();
+        ArrayList<String> orderedKeys = new ArrayList<String>(columnFamilies_.keySet());
+        Collections.sort(orderedKeys, new Comparator<String>()
+        {
+            public int compare(String o1, String o2)
+            {
+                return dc.compare(partitioner.decorateKey(o1), partitioner.decorateKey(o2));
+            }
+        });
+        DataOutputBuffer buffer = new DataOutputBuffer();
+        for (String key : orderedKeys)
+        {
+            buffer.reset();
+            ColumnFamily columnFamily = columnFamilies_.get(key);
+            if ( columnFamily != null )
+            {
+                /* serialize the cf with column indexes */
+                ColumnFamily.serializerWithIndexes().serialize( columnFamily, buffer );
+                /* Now write the key and value to disk */
+                writer.append(partitioner.decorateKey(key), buffer);
+            }
+        }
+        SSTableReader ssTable = writer.closeAndOpenReader(DatabaseDescriptor.getKeysCachedFraction(table_));
+        cfStore.onMemtableFlush(cLogCtx);
+        cfStore.storeLocation(ssTable);
+        buffer.close();
+        isFlushed_ = true;
+        logger_.info("Completed flushing " + this);
+    }
+
+    public String toString()
+    {
+        return "Memtable(" + cfName_ + ")@" + hashCode();
+    }
+
+    /**
+     * there does not appear to be any data structure that we can pass to PriorityQueue that will
+     * get it to heapify in-place instead of copying first, so we might as well return a Set.
+    */
+    Set<String> getKeys() throws ExecutionException, InterruptedException
+    {
+        return new HashSet<String>(columnFamilies_.keySet());
+    }
+
+    public static Iterator<String> getKeyIterator(Set<String> keys)
+    {
+        if (keys.size() == 0)
+        {
+            // cannot create a PQ of size zero (wtf?)
+            return Arrays.asList(new String[0]).iterator();
+        }
+        PriorityQueue<String> pq = new PriorityQueue<String>(keys.size(), StorageService.getPartitioner().getDecoratedKeyComparator());
+        pq.addAll(keys);
+        return new DestructivePQIterator<String>(pq);
+    }
+
+    public boolean isClean()
+    {
+        // executor taskcount is inadequate for our needs here -- it can return zero under certain
+        // race conditions even though a task has been processed.
+        return !isDirty_;
+    }
+
+    /**
+     * obtain an iterator of columns in this memtable in the specified order starting from a given column.
+     */
+    public ColumnIterator getSliceIterator(SliceQueryFilter filter, AbstractType typeComparator)
+    {
+        ColumnFamily cf = columnFamilies_.get(filter.key);
+        final ColumnFamily columnFamily = cf == null ? ColumnFamily.create(table_, filter.getColumnFamilyName()) : cf.cloneMeShallow();
+
+        final IColumn[] columns = (cf == null ? columnFamily : cf).getSortedColumns().toArray(new IColumn[0]);
+        // TODO if we are dealing with supercolumns, we need to clone them while we have the read lock since they can be modified later
+        if (!filter.isAscending)
+            ArrayUtils.reverse(columns);
+        IColumn startIColumn;
+        if (DatabaseDescriptor.getColumnFamilyType(table_, filter.getColumnFamilyName()).equals("Standard"))
+            startIColumn = new Column(filter.start);
+        else
+            startIColumn = new SuperColumn(filter.start, null); // ok to not have subcolumnComparator since we won't be adding columns to this object
+
+        // can't use a ColumnComparatorFactory comparator since those compare on both name and time (and thus will fail to match
+        // our dummy column, since the time there is arbitrary).
+        Comparator<IColumn> comparator = filter.getColumnComparator(typeComparator);
+        int index = Arrays.binarySearch(columns, startIColumn, comparator);
+        final int startIndex = index < 0 ? -(index + 1) : index;
+
+        return new AbstractColumnIterator()
+        {
+            private int curIndex_ = startIndex;
+
+            public ColumnFamily getColumnFamily()
+            {
+                return columnFamily;
+            }
+
+            public boolean hasNext()
+            {
+                return curIndex_ < columns.length;
+            }
+
+            public IColumn next()
+            {
+                return columns[curIndex_++];
+            }
+        };
+    }
+
+    public ColumnIterator getNamesIterator(final NamesQueryFilter filter)
+    {
+        final ColumnFamily cf = columnFamilies_.get(filter.key);
+        final ColumnFamily columnFamily = cf == null ? ColumnFamily.create(table_, filter.getColumnFamilyName()) : cf.cloneMeShallow();
+
+        return new SimpleAbstractColumnIterator()
+        {
+            private Iterator<byte[]> iter = filter.columns.iterator();
+            private byte[] current;
+
+            public ColumnFamily getColumnFamily()
+            {
+                return columnFamily;
+            }
+
+            protected IColumn computeNext()
+            {
+                if (cf == null)
+                {
+                    return endOfData();
+                }
+                while (iter.hasNext())
+                {
+                    current = iter.next();
+                    IColumn column = cf.getColumn(current);
+                    if (column != null)
+                        return column;
+                }
+                return endOfData();
+            }
+        };
+    }
+    
+    void clearUnsafe()
+    {
+        columnFamilies_.clear();
+    }
+}

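A note on the slice lookup in Memtable.getSliceIterator() above: Arrays.binarySearch
returns -(insertionPoint) - 1 when the probe element is absent, so the expression
"index < 0 ? -(index + 1) : index" resolves to the first column >= the requested
start whether or not that exact column exists. A minimal, self-contained sketch of
the idiom (the String array stands in for the sorted IColumn[]; all names here are
illustrative, not part of the commit):

import java.util.Arrays;

public class SliceStartDemo
{
    public static void main(String[] args)
    {
        // Column names sorted the way getSortedColumns() would return them.
        String[] columns = { "age", "email", "name", "zip" };

        // Probe for a start column that is not present.
        int index = Arrays.binarySearch(columns, "city");

        // On a miss, binarySearch returns -(insertionPoint) - 1, so this
        // recovers the index of the first element >= the requested start.
        int startIndex = index < 0 ? -(index + 1) : index;

        System.out.println(columns[startIndex]); // prints "email"
    }
}
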
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/MinorCompactionManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/MinorCompactionManager.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/MinorCompactionManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/MinorCompactionManager.java Thu Jul 30 15:30:21 2009
@@ -1,191 +1,191 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.db;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
-import org.apache.cassandra.concurrent.ThreadFactoryImpl;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.net.EndPoint;
-import org.apache.cassandra.service.StorageService;
-import org.apache.log4j.Logger;
-
-/**
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
-class MinorCompactionManager
-{
-    private static MinorCompactionManager instance_;
-    private static Lock lock_ = new ReentrantLock();
-    private static Logger logger_ = Logger.getLogger(MinorCompactionManager.class);
-    private static final long intervalInMins_ = 5;
-    static final int COMPACTION_THRESHOLD = 4; // compact this many sstables at a time
-
-    public static MinorCompactionManager instance()
-    {
-        if ( instance_ == null )
-        {
-            lock_.lock();
-            try
-            {
-                if ( instance_ == null )
-                    instance_ = new MinorCompactionManager();
-            }
-            finally
-            {
-                lock_.unlock();
-            }
-        }
-        return instance_;
-    }
-
-    class FileCompactor2 implements Callable<Boolean>
-    {
-        private ColumnFamilyStore columnFamilyStore_;
-        private List<Range> ranges_;
-        private EndPoint target_;
-        private List<String> fileList_;
-
-        FileCompactor2(ColumnFamilyStore columnFamilyStore, List<Range> ranges, EndPoint target, List<String> fileList)
-        {
-            columnFamilyStore_ = columnFamilyStore;
-            ranges_ = ranges;
-            target_ = target;
-            fileList_ = fileList;
-        }
-
-        public Boolean call()
-        {
-        	boolean result;
-            if (logger_.isDebugEnabled())
-              logger_.debug("Started  compaction ..."+columnFamilyStore_.columnFamily_);
-            try
-            {
-                result = columnFamilyStore_.doAntiCompaction(ranges_, target_, fileList_);
-            }
-            catch (IOException e)
-            {
-                throw new RuntimeException(e);
-            }
-            if (logger_.isDebugEnabled())
-              logger_.debug("Finished compaction ..."+columnFamilyStore_.columnFamily_);
-            return result;
-        }
-    }
-
-    class OnDemandCompactor implements Runnable
-    {
-        private ColumnFamilyStore columnFamilyStore_;
-        private long skip_ = 0L;
-
-        OnDemandCompactor(ColumnFamilyStore columnFamilyStore, long skip)
-        {
-            columnFamilyStore_ = columnFamilyStore;
-            skip_ = skip;
-        }
-
-        public void run()
-        {
-            if (logger_.isDebugEnabled())
-              logger_.debug("Started  Major compaction for " + columnFamilyStore_.columnFamily_);
-            try
-            {
-                columnFamilyStore_.doMajorCompaction(skip_);
-            }
-            catch (IOException e)
-            {
-                throw new RuntimeException(e);
-            }
-            if (logger_.isDebugEnabled())
-              logger_.debug("Finished Major compaction for " + columnFamilyStore_.columnFamily_);
-        }
-    }
-
-    class CleanupCompactor implements Runnable
-    {
-        private ColumnFamilyStore columnFamilyStore_;
-
-        CleanupCompactor(ColumnFamilyStore columnFamilyStore)
-        {
-        	columnFamilyStore_ = columnFamilyStore;
-        }
-
-        public void run()
-        {
-            if (logger_.isDebugEnabled())
-              logger_.debug("Started  compaction ..."+columnFamilyStore_.columnFamily_);
-            try
-            {
-                columnFamilyStore_.doCleanupCompaction();
-            }
-            catch (IOException e)
-            {
-                throw new RuntimeException(e);
-            }
-            if (logger_.isDebugEnabled())
-              logger_.debug("Finished compaction ..."+columnFamilyStore_.columnFamily_);
-        }
-    }
-    
-    
-    private ScheduledExecutorService compactor_ = new DebuggableScheduledThreadPoolExecutor(1, new ThreadFactoryImpl("MINOR-COMPACTION-POOL"));
-
-    public Future<Integer> submit(final ColumnFamilyStore columnFamilyStore)
-    {
-        return submit(columnFamilyStore, COMPACTION_THRESHOLD);
-    }
-
-    Future<Integer> submit(final ColumnFamilyStore columnFamilyStore, final int threshold)
-    {
-        Callable<Integer> callable = new Callable<Integer>()
-        {
-            public Integer call() throws IOException
-            {
-                return columnFamilyStore.doCompaction(threshold);
-            }
-        };
-        return compactor_.submit(callable);
-    }
-
-    public void submitCleanup(ColumnFamilyStore columnFamilyStore)
-    {
-        compactor_.submit(new CleanupCompactor(columnFamilyStore));
-    }
-
-    public Future<Boolean> submit(ColumnFamilyStore columnFamilyStore, List<Range> ranges, EndPoint target, List<String> fileList)
-    {
-        return compactor_.submit( new FileCompactor2(columnFamilyStore, ranges, target, fileList) );
-    }
-
-    public void submitMajor(ColumnFamilyStore columnFamilyStore, long skip)
-    {
-        compactor_.submit( new OnDemandCompactor(columnFamilyStore, skip) );
-    }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
+import org.apache.cassandra.concurrent.ThreadFactoryImpl;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.service.StorageService;
+import org.apache.log4j.Logger;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+class MinorCompactionManager
+{
+    private static MinorCompactionManager instance_;
+    private static Lock lock_ = new ReentrantLock();
+    private static Logger logger_ = Logger.getLogger(MinorCompactionManager.class);
+    private static final long intervalInMins_ = 5;
+    static final int COMPACTION_THRESHOLD = 4; // compact this many sstables at a time
+
+    public static MinorCompactionManager instance()
+    {
+        if ( instance_ == null )
+        {
+            lock_.lock();
+            try
+            {
+                if ( instance_ == null )
+                    instance_ = new MinorCompactionManager();
+            }
+            finally
+            {
+                lock_.unlock();
+            }
+        }
+        return instance_;
+    }
+
+    class FileCompactor2 implements Callable<Boolean>
+    {
+        private ColumnFamilyStore columnFamilyStore_;
+        private List<Range> ranges_;
+        private EndPoint target_;
+        private List<String> fileList_;
+
+        FileCompactor2(ColumnFamilyStore columnFamilyStore, List<Range> ranges, EndPoint target, List<String> fileList)
+        {
+            columnFamilyStore_ = columnFamilyStore;
+            ranges_ = ranges;
+            target_ = target;
+            fileList_ = fileList;
+        }
+
+        public Boolean call()
+        {
+        	boolean result;
+            if (logger_.isDebugEnabled())
+              logger_.debug("Started  compaction ..."+columnFamilyStore_.columnFamily_);
+            try
+            {
+                result = columnFamilyStore_.doAntiCompaction(ranges_, target_, fileList_);
+            }
+            catch (IOException e)
+            {
+                throw new RuntimeException(e);
+            }
+            if (logger_.isDebugEnabled())
+              logger_.debug("Finished compaction ..."+columnFamilyStore_.columnFamily_);
+            return result;
+        }
+    }
+
+    class OnDemandCompactor implements Runnable
+    {
+        private ColumnFamilyStore columnFamilyStore_;
+        private long skip_ = 0L;
+
+        OnDemandCompactor(ColumnFamilyStore columnFamilyStore, long skip)
+        {
+            columnFamilyStore_ = columnFamilyStore;
+            skip_ = skip;
+        }
+
+        public void run()
+        {
+            if (logger_.isDebugEnabled())
+              logger_.debug("Started  Major compaction for " + columnFamilyStore_.columnFamily_);
+            try
+            {
+                columnFamilyStore_.doMajorCompaction(skip_);
+            }
+            catch (IOException e)
+            {
+                throw new RuntimeException(e);
+            }
+            if (logger_.isDebugEnabled())
+              logger_.debug("Finished Major compaction for " + columnFamilyStore_.columnFamily_);
+        }
+    }
+
+    class CleanupCompactor implements Runnable
+    {
+        private ColumnFamilyStore columnFamilyStore_;
+
+        CleanupCompactor(ColumnFamilyStore columnFamilyStore)
+        {
+        	columnFamilyStore_ = columnFamilyStore;
+        }
+
+        public void run()
+        {
+            if (logger_.isDebugEnabled())
+              logger_.debug("Started  compaction ..."+columnFamilyStore_.columnFamily_);
+            try
+            {
+                columnFamilyStore_.doCleanupCompaction();
+            }
+            catch (IOException e)
+            {
+                throw new RuntimeException(e);
+            }
+            if (logger_.isDebugEnabled())
+              logger_.debug("Finished compaction ..."+columnFamilyStore_.columnFamily_);
+        }
+    }
+    
+    
+    private ScheduledExecutorService compactor_ = new DebuggableScheduledThreadPoolExecutor(1, new ThreadFactoryImpl("MINOR-COMPACTION-POOL"));
+
+    public Future<Integer> submit(final ColumnFamilyStore columnFamilyStore)
+    {
+        return submit(columnFamilyStore, COMPACTION_THRESHOLD);
+    }
+
+    Future<Integer> submit(final ColumnFamilyStore columnFamilyStore, final int threshold)
+    {
+        Callable<Integer> callable = new Callable<Integer>()
+        {
+            public Integer call() throws IOException
+            {
+                return columnFamilyStore.doCompaction(threshold);
+            }
+        };
+        return compactor_.submit(callable);
+    }
+
+    public void submitCleanup(ColumnFamilyStore columnFamilyStore)
+    {
+        compactor_.submit(new CleanupCompactor(columnFamilyStore));
+    }
+
+    public Future<Boolean> submit(ColumnFamilyStore columnFamilyStore, List<Range> ranges, EndPoint target, List<String> fileList)
+    {
+        return compactor_.submit( new FileCompactor2(columnFamilyStore, ranges, target, fileList) );
+    }
+
+    public void submitMajor(ColumnFamilyStore columnFamilyStore, long skip)
+    {
+        compactor_.submit( new OnDemandCompactor(columnFamilyStore, skip) );
+    }
+}

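One observation on the singleton above (PeriodicFlushManager below follows the same
pattern): instance() double-checks a non-volatile instance_ field and takes the lock
only on the slow path. Under the Java memory model the unlocked first read can, in
principle, observe a partially constructed object. A hedged sketch of the
initialization-on-demand holder idiom, which gives the same lazy, thread-safe
initialization without locking; the class name is illustrative, and this is not a
patch proposed by the commit:

// The JVM initializes the nested Holder class (and so INSTANCE) at most
// once, on first use, with safe publication guaranteed by class loading.
class SingletonHolderDemo
{
    private SingletonHolderDemo() {}

    private static class Holder
    {
        static final SingletonHolderDemo INSTANCE = new SingletonHolderDemo();
    }

    public static SingletonHolderDemo instance()
    {
        return Holder.INSTANCE; // triggers Holder initialization exactly once
    }
}
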
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/PeriodicFlushManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/PeriodicFlushManager.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/PeriodicFlushManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/PeriodicFlushManager.java Thu Jul 30 15:30:21 2009
@@ -1,70 +1,70 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.db;
-
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-import org.apache.log4j.Logger;
-
-import org.apache.cassandra.concurrent.*;
-import org.apache.cassandra.service.StorageService;
-
-/**
- *  Background flusher that force-flushes a column family periodically.
- */
-class PeriodicFlushManager
-{
-    private static Logger logger_ = Logger.getLogger(PeriodicFlushManager.class);
-    private static PeriodicFlushManager instance_;
-    private static Lock lock_ = new ReentrantLock();
-    private ScheduledExecutorService flusher_ = new DebuggableScheduledThreadPoolExecutor(1, new ThreadFactoryImpl("PERIODIC-FLUSHER-POOL"));
-
-    public static PeriodicFlushManager instance()
-    {
-        if ( instance_ == null )
-        {
-            lock_.lock();
-            try
-            {
-                if ( instance_ == null )
-                    instance_ = new PeriodicFlushManager();
-            }
-            finally
-            {
-                lock_.unlock();
-            }
-        }
-        return instance_;
-    }
-
-    public void submitPeriodicFlusher(final ColumnFamilyStore columnFamilyStore, int flushPeriodInMinutes)
-    {        
-        Runnable runnable = new Runnable()
-        {
-            public void run()
-            {
-                columnFamilyStore.forceFlush();
-            }
-        };
-        flusher_.scheduleWithFixedDelay(runnable, flushPeriodInMinutes, flushPeriodInMinutes, TimeUnit.MINUTES);       
-    }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.log4j.Logger;
+
+import org.apache.cassandra.concurrent.*;
+import org.apache.cassandra.service.StorageService;
+
+/**
+ *  Background flusher that force-flushes a column family periodically.
+ */
+class PeriodicFlushManager
+{
+    private static Logger logger_ = Logger.getLogger(PeriodicFlushManager.class);
+    private static PeriodicFlushManager instance_;
+    private static Lock lock_ = new ReentrantLock();
+    private ScheduledExecutorService flusher_ = new DebuggableScheduledThreadPoolExecutor(1, new ThreadFactoryImpl("PERIODIC-FLUSHER-POOL"));
+
+    public static PeriodicFlushManager instance()
+    {
+        if ( instance_ == null )
+        {
+            lock_.lock();
+            try
+            {
+                if ( instance_ == null )
+                    instance_ = new PeriodicFlushManager();
+            }
+            finally
+            {
+                lock_.unlock();
+            }
+        }
+        return instance_;
+    }
+
+    public void submitPeriodicFlusher(final ColumnFamilyStore columnFamilyStore, int flushPeriodInMinutes)
+    {        
+        Runnable runnable = new Runnable()
+        {
+            public void run()
+            {
+                columnFamilyStore.forceFlush();
+            }
+        };
+        flusher_.scheduleWithFixedDelay(runnable, flushPeriodInMinutes, flushPeriodInMinutes, TimeUnit.MINUTES);       
+    }
+}

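Worth noting about submitPeriodicFlusher() above: scheduleWithFixedDelay measures
the period from the end of one run to the start of the next, so a slow forceFlush()
pushes later flushes back rather than letting executions pile up (scheduleAtFixedRate
would instead try to keep the original cadence). A small runnable sketch of that
behavior; the task body and one-second period are illustrative only:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class FixedDelayDemo
{
    public static void main(String[] args) throws InterruptedException
    {
        ScheduledExecutorService flusher = Executors.newScheduledThreadPool(1);

        Runnable task = new Runnable()
        {
            public void run()
            {
                System.out.println("flush at " + System.currentTimeMillis());
                try
                {
                    Thread.sleep(500); // simulate a slow flush
                }
                catch (InterruptedException e)
                {
                    Thread.currentThread().interrupt();
                }
            }
        };

        // Each run starts one second after the previous run finishes,
        // so ticks land roughly 1.5s apart here, not 1s.
        flusher.scheduleWithFixedDelay(task, 1, 1, TimeUnit.SECONDS);

        Thread.sleep(5000);
        flusher.shutdown();
    }
}
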
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java Thu Jul 30 15:30:21 2009
@@ -1,115 +1,115 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.db;
-
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.cassandra.io.ICompactSerializer;
-import org.apache.cassandra.net.Message;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.config.DatabaseDescriptor;
-
-
-public abstract class ReadCommand
-{
-    public static final String DO_REPAIR = "READ-REPAIR";
-    public static final byte CMD_TYPE_GET_SLICE_BY_NAMES = 1;
-    public static final byte CMD_TYPE_GET_SLICE = 2;
-
-    public static final String EMPTY_CF = "";
-    
-    private static ReadCommandSerializer serializer = new ReadCommandSerializer();
-
-    public static ReadCommandSerializer serializer()
-    {
-        return serializer;
-    }
-
-    public Message makeReadMessage() throws IOException
-    {
-        ByteArrayOutputStream bos = new ByteArrayOutputStream();
-        DataOutputStream dos = new DataOutputStream(bos);
-        ReadCommand.serializer().serialize(this, dos);
-        return new Message(StorageService.getLocalStorageEndPoint(), StorageService.readStage_, StorageService.readVerbHandler_, bos.toByteArray());
-    }
-
-    public final String table;
-    public final String key;
-    private boolean isDigestQuery = false;    
-    protected final byte commandType;
-
-    protected ReadCommand(String table, String key, byte cmdType)
-    {
-        this.table = table;
-        this.key = key;
-        this.commandType = cmdType;
-    }
-    
-    public boolean isDigestQuery()
-    {
-        return isDigestQuery;
-    }
-
-    public void setDigestQuery(boolean isDigestQuery)
-    {
-        this.isDigestQuery = isDigestQuery;
-    }
-
-    public abstract String getColumnFamilyName();
-    
-    public abstract ReadCommand copy();
-
-    public abstract Row getRow(Table table) throws IOException;
-
-    protected AbstractType getComparator()
-    {
-        return DatabaseDescriptor.getComparator(table, getColumnFamilyName());
-    }
-}
-
-class ReadCommandSerializer implements ICompactSerializer<ReadCommand>
-{
-    private static final Map<Byte, ReadCommandSerializer> CMD_SERIALIZER_MAP = new HashMap<Byte, ReadCommandSerializer>(); 
-    static 
-    {
-        CMD_SERIALIZER_MAP.put(ReadCommand.CMD_TYPE_GET_SLICE_BY_NAMES, new SliceByNamesReadCommandSerializer());
-        CMD_SERIALIZER_MAP.put(ReadCommand.CMD_TYPE_GET_SLICE, new SliceFromReadCommandSerializer());
-    }
-
-
-    public void serialize(ReadCommand rm, DataOutputStream dos) throws IOException
-    {
-        dos.writeByte(rm.commandType);
-        ReadCommandSerializer ser = CMD_SERIALIZER_MAP.get(rm.commandType);
-        ser.serialize(rm, dos);
-    }
-
-    public ReadCommand deserialize(DataInputStream dis) throws IOException
-    {
-        byte msgType = dis.readByte();
-        return CMD_SERIALIZER_MAP.get(msgType).deserialize(dis);
-    }
-        
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.config.DatabaseDescriptor;
+
+
+public abstract class ReadCommand
+{
+    public static final String DO_REPAIR = "READ-REPAIR";
+    public static final byte CMD_TYPE_GET_SLICE_BY_NAMES = 1;
+    public static final byte CMD_TYPE_GET_SLICE = 2;
+
+    public static final String EMPTY_CF = "";
+    
+    private static ReadCommandSerializer serializer = new ReadCommandSerializer();
+
+    public static ReadCommandSerializer serializer()
+    {
+        return serializer;
+    }
+
+    public Message makeReadMessage() throws IOException
+    {
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        DataOutputStream dos = new DataOutputStream(bos);
+        ReadCommand.serializer().serialize(this, dos);
+        return new Message(StorageService.getLocalStorageEndPoint(), StorageService.readStage_, StorageService.readVerbHandler_, bos.toByteArray());
+    }
+
+    public final String table;
+    public final String key;
+    private boolean isDigestQuery = false;    
+    protected final byte commandType;
+
+    protected ReadCommand(String table, String key, byte cmdType)
+    {
+        this.table = table;
+        this.key = key;
+        this.commandType = cmdType;
+    }
+    
+    public boolean isDigestQuery()
+    {
+        return isDigestQuery;
+    }
+
+    public void setDigestQuery(boolean isDigestQuery)
+    {
+        this.isDigestQuery = isDigestQuery;
+    }
+
+    public abstract String getColumnFamilyName();
+    
+    public abstract ReadCommand copy();
+
+    public abstract Row getRow(Table table) throws IOException;
+
+    protected AbstractType getComparator()
+    {
+        return DatabaseDescriptor.getComparator(table, getColumnFamilyName());
+    }
+}
+
+class ReadCommandSerializer implements ICompactSerializer<ReadCommand>
+{
+    private static final Map<Byte, ReadCommandSerializer> CMD_SERIALIZER_MAP = new HashMap<Byte, ReadCommandSerializer>(); 
+    static 
+    {
+        CMD_SERIALIZER_MAP.put(ReadCommand.CMD_TYPE_GET_SLICE_BY_NAMES, new SliceByNamesReadCommandSerializer());
+        CMD_SERIALIZER_MAP.put(ReadCommand.CMD_TYPE_GET_SLICE, new SliceFromReadCommandSerializer());
+    }
+
+
+    public void serialize(ReadCommand rm, DataOutputStream dos) throws IOException
+    {
+        dos.writeByte(rm.commandType);
+        ReadCommandSerializer ser = CMD_SERIALIZER_MAP.get(rm.commandType);
+        ser.serialize(rm, dos);
+    }
+
+    public ReadCommand deserialize(DataInputStream dis) throws IOException
+    {
+        byte msgType = dis.readByte();
+        return CMD_SERIALIZER_MAP.get(msgType).deserialize(dis);
+    }
+        
+}



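ReadCommandSerializer above dispatches on a leading type byte: serialize() writes
the command's type tag and then delegates to the per-type serializer registered in
CMD_SERIALIZER_MAP, while deserialize() reads the tag back to pick the matching
deserializer. A self-contained sketch of that round trip with a single dummy type
(DemoCodec, TYPE_PING, and the string payload are illustrative, not the commit's
wire format):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class TagDispatchDemo
{
    static final byte TYPE_PING = 1;

    interface DemoCodec
    {
        void serialize(String body, DataOutputStream dos) throws IOException;
        String deserialize(DataInputStream dis) throws IOException;
    }

    static final Map<Byte, DemoCodec> CODECS = new HashMap<Byte, DemoCodec>();
    static
    {
        CODECS.put(TYPE_PING, new DemoCodec()
        {
            public void serialize(String body, DataOutputStream dos) throws IOException
            {
                dos.writeUTF(body);
            }

            public String deserialize(DataInputStream dis) throws IOException
            {
                return dis.readUTF();
            }
        });
    }

    public static void main(String[] args) throws IOException
    {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream dos = new DataOutputStream(bos);
        dos.writeByte(TYPE_PING);                      // tag first, then payload
        CODECS.get(TYPE_PING).serialize("hello", dos);

        DataInputStream dis = new DataInputStream(new ByteArrayInputStream(bos.toByteArray()));
        byte tag = dis.readByte();                     // read tag, dispatch on it
        System.out.println(CODECS.get(tag).deserialize(dis)); // prints "hello"
    }
}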