cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject svn commit: r911224 - in /incubator/cassandra/trunk/src/java/org/apache/cassandra/db: ColumnFamilyStore.java Memtable.java
Date Wed, 17 Feb 2010 23:22:42 GMT
Author: jbellis
Date: Wed Feb 17 23:22:41 2010
New Revision: 911224

URL: http://svn.apache.org/viewvc?rev=911224&view=rev
Log:
refactor to make memtablesPendingFlush a member variable instead of a static, and Memtable
to have a reference to CFS instead of table/cfname pair.
patch by jbellis; reviewed by Stu Hood for CASSANDRA-799

Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=911224&r1=911223&r2=911224&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Wed
Feb 17 23:22:41 2010
@@ -61,7 +61,6 @@
 
 import org.apache.commons.collections.IteratorUtils;
 
-import org.cliffc.high_scale_lib.NonBlockingHashMap;
 import com.google.common.collect.Iterators;
 import com.google.common.base.Predicate;
 
@@ -84,7 +83,6 @@
      * which is necessary for replay in case of a restart since CommitLog assumes that when
onMF is
      * called, all data up to the given context has been persisted to SSTables.
      */
-    private static NonBlockingHashMap<String, Set<Memtable>> memtablesPendingFlush
= new NonBlockingHashMap<String, Set<Memtable>>();
     private static ExecutorService flushSorter_
             = new JMXEnabledThreadPoolExecutor(1,
                                                Runtime.getRuntime().availableProcessors(),
@@ -103,6 +101,8 @@
 
     private static final int KEY_RANGE_FILE_BUFFER_SIZE = 256 * 1024;
 
+    private Set<Memtable> memtablesPendingFlush = new ConcurrentSkipListSet<Memtable>();
+
     private final String table_;
     public final String columnFamily_;
     private final boolean isSuper_;
@@ -132,7 +132,7 @@
         columnFamily_ = columnFamilyName;
         isSuper_ = isSuper;
         fileIndexGenerator_.set(indexValue);
-        memtable_ = new Memtable(table_, columnFamily_);
+        memtable_ = new Memtable(this);
         binaryMemtable_ = new AtomicReference<BinaryMemtable>(new BinaryMemtable(this));
 
         if (logger_.isDebugEnabled())
@@ -377,7 +377,7 @@
             final CommitLogSegment.CommitLogContext ctx = writeCommitLog ? CommitLog.instance().getContext()
: null;
             logger_.info(columnFamily_ + " has reached its threshold; switching in a fresh
Memtable at " + ctx);
             final Condition condition = submitFlush(oldMemtable);
-            memtable_ = new Memtable(table_, columnFamily_);
+            memtable_ = new Memtable(this);
             // a second executor that makes sure the onMemtableFlushes get called in the
right order,
             // while keeping the wait-for-flush (future.get) out of anything latency-sensitive.
             return commitLogUpdater_.submit(new WrappedRunnable()
@@ -616,22 +616,6 @@
         ssTables_.replace(sstables, replacements);
     }
 
-    public static List<Memtable> getUnflushedMemtables(String cfName)
-    {
-        return new ArrayList<Memtable>(getMemtablesPendingFlushNotNull(cfName));
-    }
-
-    static Set<Memtable> getMemtablesPendingFlushNotNull(String columnFamilyName)
-    {
-        Set<Memtable> memtables = memtablesPendingFlush.get(columnFamilyName);
-        if (memtables == null)
-        {
-            memtablesPendingFlush.putIfAbsent(columnFamilyName, new ConcurrentSkipListSet<Memtable>());
-            memtables = memtablesPendingFlush.get(columnFamilyName); // might not be the
object we just put, if there was a race!
-        }
-        return memtables;
-    }
-
     /**
      * submits flush sort on the flushSorter executor, which will in turn submit to flushWriter
when sorted.
      * TODO because our executors use CallerRunsPolicy, when flushSorter fills up, no writes
will proceed
@@ -854,8 +838,7 @@
             iterators.add(iter);
 
             /* add the memtables being flushed */
-            List<Memtable> memtables = getUnflushedMemtables(filter.getColumnFamilyName());
-            for (Memtable memtable:memtables)
+            for (Memtable memtable : getMemtablesPendingFlush())
             {
                 iter = filter.getMemColumnIterator(memtable, getComparator());
                 returnCF.delete(iter.getColumnFamily());
@@ -930,7 +913,7 @@
         // current memtable keys.  have to go through the CFS api for locking.
         iterators.add(Iterators.filter(memtableKeyIterator(startWith), p));
         // historical memtables
-        for (Memtable memtable : ColumnFamilyStore.getUnflushedMemtables(columnFamily_))
+        for (Memtable memtable : memtablesPendingFlush)
         {
             iterators.add(Iterators.filter(memtable.getKeyIterator(startWith), p));
         }
@@ -1179,4 +1162,10 @@
         memtable_.clearUnsafe();
         ssTables_.clearUnsafe();
     }
+
+
+    public Set<Memtable> getMemtablesPendingFlush()
+    {
+        return memtablesPendingFlush;
+    }
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java?rev=911224&r1=911223&r2=911224&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java Wed Feb 17 23:22:41
2010
@@ -51,16 +51,15 @@
     private final AtomicInteger currentThroughput = new AtomicInteger(0);
     private final AtomicInteger currentOperations = new AtomicInteger(0);
 
-    private final String table;
-    private final String columnfamilyName;
     private final long creationTime;
     private final ConcurrentNavigableMap<DecoratedKey, ColumnFamily> columnFamilies
= new ConcurrentSkipListMap<DecoratedKey, ColumnFamily>();
     private final IPartitioner partitioner = StorageService.getPartitioner();
+    private final ColumnFamilyStore cfs;
 
-    Memtable(String table, String cfName)
+    public Memtable(ColumnFamilyStore cfs)
     {
-        this.table = table;
-        columnfamilyName = cfName;
+
+        this.cfs = cfs;
         creationTime = System.currentTimeMillis();
     }
 
@@ -147,8 +146,7 @@
     private SSTableReader writeSortedContents() throws IOException
     {
         logger.info("Writing " + this);
-        ColumnFamilyStore cfStore = Table.open(table).getColumnFamilyStore(columnfamilyName);
-        SSTableWriter writer = new SSTableWriter(cfStore.getFlushPath(), columnFamilies.size(),
StorageService.getPartitioner());
+        SSTableWriter writer = new SSTableWriter(cfs.getFlushPath(), columnFamilies.size(),
StorageService.getPartitioner());
 
         DataOutputBuffer buffer = new DataOutputBuffer();
         for (Map.Entry<DecoratedKey, ColumnFamily> entry : columnFamilies.entrySet())
@@ -160,21 +158,20 @@
             writer.append(entry.getKey(), buffer);
         }
 
-        SSTableReader ssTable = writer.closeAndOpenReader(DatabaseDescriptor.getKeysCachedFraction(table,
columnfamilyName));
+        SSTableReader ssTable = writer.closeAndOpenReader(DatabaseDescriptor.getKeysCachedFraction(getTableName(),
cfs.getColumnFamilyName()));
         logger.info("Completed flushing " + ssTable.getFilename());
         return ssTable;
     }
 
     public void flushAndSignal(final Condition condition, ExecutorService sorter, final ExecutorService
writer)
     {
-        ColumnFamilyStore.getMemtablesPendingFlushNotNull(columnfamilyName).add(this); //
it's ok for the MT to briefly be both active and pendingFlush
+        cfs.getMemtablesPendingFlush().add(this); // it's ok for the MT to briefly be both
active and pendingFlush
         writer.submit(new WrappedRunnable()
         {
             public void runMayThrow() throws IOException
             {
-                ColumnFamilyStore cfs = Table.open(table).getColumnFamilyStore(columnfamilyName);
                 cfs.addSSTable(writeSortedContents());
-                ColumnFamilyStore.getMemtablesPendingFlushNotNull(columnfamilyName).remove(Memtable.this);
+                cfs.getMemtablesPendingFlush().remove(Memtable.this);
                 condition.signalAll();
             }
         });
@@ -182,7 +179,7 @@
 
     public String toString()
     {
-        return "Memtable(" + columnfamilyName + ")@" + hashCode();
+        return "Memtable(" + cfs.getColumnFamilyName() + ")@" + hashCode();
     }
 
     public Iterator<DecoratedKey> getKeyIterator(DecoratedKey startWith)
@@ -195,19 +192,24 @@
         return columnFamilies.isEmpty();
     }
 
+    private String getTableName()
+    {
+        return cfs.getTable().name;
+    }
+
     /**
      * obtain an iterator of columns in this memtable in the specified order starting from
a given column.
      */
     public ColumnIterator getSliceIterator(ColumnFamily cf, SliceQueryFilter filter, AbstractType
typeComparator)
     {
-        final ColumnFamily columnFamily = cf == null ? ColumnFamily.create(table, filter.getColumnFamilyName())
: cf.cloneMeShallow();
+        final ColumnFamily columnFamily = cf == null ? ColumnFamily.create(getTableName(),
filter.getColumnFamilyName()) : cf.cloneMeShallow();
 
         final IColumn columns[] = (cf == null ? columnFamily : cf).getSortedColumns().toArray(new
IColumn[columnFamily.getSortedColumns().size()]);
         // TODO if we are dealing with supercolumns, we need to clone them while we have
the read lock since they can be modified later
         if (filter.reversed)
             ArrayUtils.reverse(columns);
         IColumn startIColumn;
-        final boolean isStandard = DatabaseDescriptor.getColumnFamilyType(table, filter.getColumnFamilyName()).equals("Standard");
+        final boolean isStandard = DatabaseDescriptor.getColumnFamilyType(getTableName(),
filter.getColumnFamilyName()).equals("Standard");
         if (isStandard)
             startIColumn = new Column(filter.start);
         else
@@ -252,8 +254,8 @@
 
     public ColumnIterator getNamesIterator(final ColumnFamily cf, final NamesQueryFilter
filter)
     {
-        final ColumnFamily columnFamily = cf == null ? ColumnFamily.create(table, filter.getColumnFamilyName())
: cf.cloneMeShallow();
-        final boolean isStandard = DatabaseDescriptor.getColumnFamilyType(table, filter.getColumnFamilyName()).equals("Standard");
+        final ColumnFamily columnFamily = cf == null ? ColumnFamily.create(getTableName(),
filter.getColumnFamilyName()) : cf.cloneMeShallow();
+        final boolean isStandard = DatabaseDescriptor.getColumnFamilyType(getTableName(),
filter.getColumnFamilyName()).equals("Standard");
 
         return new SimpleAbstractColumnIterator()
         {



Mime
View raw message