cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject svn commit: r1127179 - in /cassandra/trunk: ./ contrib/ examples/bmt/ interface/thrift/gen-java/org/apache/cassandra/thrift/ src/java/org/apache/cassandra/config/ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/tools/
Date Tue, 24 May 2011 17:43:33 GMT
Author: jbellis
Date: Tue May 24 17:43:32 2011
New Revision: 1127179

URL: http://svn.apache.org/viewvc?rev=1127179&view=rev
Log:
revert incomplete BMT-excision

Added:
    cassandra/trunk/examples/bmt/
      - copied from r1127164, cassandra/trunk/examples/bmt/
    cassandra/trunk/examples/bmt/CassandraBulkLoader.java
      - copied unchanged from r1127164, cassandra/trunk/examples/bmt/CassandraBulkLoader.java
    cassandra/trunk/examples/bmt/README.txt
      - copied unchanged from r1127164, cassandra/trunk/examples/bmt/README.txt
    cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java
      - copied unchanged from r1127164, cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java
    cassandra/trunk/src/java/org/apache/cassandra/db/IFlushable.java
      - copied unchanged from r1127164, cassandra/trunk/src/java/org/apache/cassandra/db/IFlushable.java
    cassandra/trunk/src/java/org/apache/cassandra/tools/GetVersion.java
      - copied unchanged from r1126728, cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/tools/GetVersion.java
Modified:
    cassandra/trunk/   (props changed)
    cassandra/trunk/contrib/   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java   (props changed)
    cassandra/trunk/src/java/org/apache/cassandra/config/Config.java
    cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
    cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java

Propchange: cassandra/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue May 24 17:43:32 2011
@@ -1,7 +1,7 @@
 /cassandra/branches/cassandra-0.6:922689-1052356,1052358-1053452,1053454,1053456-1081914,1083000
 /cassandra/branches/cassandra-0.7:1026516-1127143
 /cassandra/branches/cassandra-0.7.0:1053690-1055654
-/cassandra/branches/cassandra-0.8:1090934-1125013,1125019-1127170
+/cassandra/branches/cassandra-0.8:1090934-1125013,1125019-1127174
 /cassandra/branches/cassandra-0.8.0:1125021-1127038
 /cassandra/branches/cassandra-0.8.1:1101014-1125018
 /cassandra/tags/cassandra-0.7.0-rc3:1051699-1053689

Propchange: cassandra/trunk/contrib/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue May 24 17:43:32 2011
@@ -1,7 +1,7 @@
 /cassandra/branches/cassandra-0.6/contrib:922689-1052356,1052358-1053452,1053454,1053456-1068009
 /cassandra/branches/cassandra-0.7/contrib:1026516-1127143
 /cassandra/branches/cassandra-0.7.0/contrib:1053690-1055654
-/cassandra/branches/cassandra-0.8/contrib:1090934-1125013,1125019-1127170
+/cassandra/branches/cassandra-0.8/contrib:1090934-1125013,1125019-1127174
 /cassandra/branches/cassandra-0.8.0/contrib:1125021-1127038
 /cassandra/branches/cassandra-0.8.1/contrib:1101014-1125018
 /cassandra/tags/cassandra-0.7.0-rc3/contrib:1051699-1053689

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue May 24 17:43:32 2011
@@ -1,7 +1,7 @@
 /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1052356,1052358-1053452,1053454,1053456-1081914,1083000
 /cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1127143
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1053690-1055654
-/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1090934-1125013,1125019-1127170
+/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1090934-1125013,1125019-1127174
 /cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1125021-1127038
 /cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1101014-1125018
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1051699-1053689

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue May 24 17:43:32 2011
@@ -1,7 +1,7 @@
 /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1052356,1052358-1053452,1053454,1053456-1081914,1083000
 /cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1127143
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1053690-1055654
-/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1090934-1125013,1125019-1127170
+/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1090934-1125013,1125019-1127174
 /cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1125021-1127038
 /cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1101014-1125018
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1051699-1053689

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue May 24 17:43:32 2011
@@ -1,7 +1,7 @@
 /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1052356,1052358-1053452,1053454,1053456-1081914,1083000
 /cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1127143
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1053690-1055654
-/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1090934-1125013,1125019-1127170
+/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1090934-1125013,1125019-1127174
 /cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1125021-1127038
 /cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1101014-1125018
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1051699-1053689

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue May 24 17:43:32 2011
@@ -1,7 +1,7 @@
 /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1052356,1052358-1053452,1053454,1053456-1081914,1083000
 /cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1127143
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1053690-1055654
-/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1090934-1125013,1125019-1127170
+/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1090934-1125013,1125019-1127174
 /cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1125021-1127038
 /cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1101014-1125018
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1051699-1053689

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue May 24 17:43:32 2011
@@ -1,7 +1,7 @@
 /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1052356,1052358-1053452,1053454,1053456-1081914,1083000
 /cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1127143
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1053690-1055654
-/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1090934-1125013,1125019-1127170
+/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1090934-1125013,1125019-1127174
 /cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1125021-1127038
 /cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1101014-1125018
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1051699-1053689

Modified: cassandra/trunk/src/java/org/apache/cassandra/config/Config.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/Config.java?rev=1127179&r1=1127178&r2=1127179&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/Config.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/Config.java Tue May 24 17:43:32 2011
@@ -77,6 +77,8 @@ public class Config
     public Boolean snapshot_before_compaction = false;
     public Integer compaction_thread_priority = Thread.MIN_PRIORITY;
     
+    public Integer binary_memtable_throughput_in_mb = 256;
+    
     /* if the size of columns or super-columns are more than this, indexing will kick in */
     public Integer column_index_size_in_kb = 64;
     public Integer in_memory_compaction_limit_in_mb = 256;

Modified: cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=1127179&r1=1127178&r2=1127179&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java Tue May 24 17:43:32 2011
@@ -937,6 +937,11 @@ public class DatabaseDescriptor
         return conf.sliced_buffer_size_in_kb;
     }
 
+    public static int getBMTThreshold()
+    {
+        return conf.binary_memtable_throughput_in_mb;
+    }
+
     public static int getCompactionThreadPriority()
     {
         return conf.compaction_thread_priority;

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1127179&r1=1127178&r2=1127179&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Tue May 24 17:43:32 2011
@@ -25,6 +25,7 @@ import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.regex.Pattern;
@@ -71,10 +72,13 @@ public class ColumnFamilyStore implement
     private static Logger logger = LoggerFactory.getLogger(ColumnFamilyStore.class);
 
     /*
-     * maybeSwitchMemtable puts Memtable.getSortedContents on the writer executor.  When the write is complete,
+     * submitFlush first puts [Binary]Memtable.getSortedContents on the flushSorter executor,
+     * which then puts the sorted results on the writer executor.  This is because sorting is CPU-bound,
+     * and writing is disk-bound; we want to be able to do both at once.  When the write is complete,
      * we turn the writer into an SSTableReader and add it to ssTables_ where it is available for reads.
      *
-     * There are two other things that maybeSwitchMemtable does.
+     * For BinaryMemtable that's about all that happens.  For live Memtables there are two other things
+     * that switchMemtable does (which should be the only caller of submitFlush in this case).
      * First, it puts the Memtable into memtablesPendingFlush, where it stays until the flush is complete
      * and it's been added as an SSTableReader to ssTables_.  Second, it adds an entry to commitLogUpdater
      * that waits for the flush to complete, then calls onMemtableFlush.  This allows multiple flushes
@@ -82,6 +86,13 @@ public class ColumnFamilyStore implement
      * which is necessary for replay in case of a restart since CommitLog assumes that when onMF is
      * called, all data up to the given context has been persisted to SSTables.
      */
+    private static final ExecutorService flushSorter
+            = new JMXEnabledThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),
+                                               StageManager.KEEPALIVE,
+                                               TimeUnit.SECONDS,
+                                               new LinkedBlockingQueue<Runnable>(Runtime.getRuntime().availableProcessors()),
+                                               new NamedThreadFactory("FlushSorter"),
+                                               "internal");
     private static final ExecutorService flushWriter
             = new JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getFlushWriters(),
                                                StageManager.KEEPALIVE,
@@ -123,6 +134,9 @@ public class ColumnFamilyStore implement
 
     private final ConcurrentSkipListMap<ByteBuffer, ColumnFamilyStore> indexedColumns;
 
+    // TODO binarymemtable ops are not threadsafe (do they need to be?)
+    private AtomicReference<BinaryMemtable> binaryMemtable;
+
     private LatencyTracker readStats = new LatencyTracker();
     private LatencyTracker writeStats = new LatencyTracker();
 
@@ -243,6 +257,7 @@ public class ColumnFamilyStore implement
         this.keyCacheSaveInSeconds = new DefaultInteger(metadata.getKeyCacheSavePeriodInSeconds());
         this.partitioner = partitioner;
         fileIndexGenerator.set(generation);
+        binaryMemtable = new AtomicReference<BinaryMemtable>(new BinaryMemtable(this));
 
         if (logger.isDebugEnabled())
             logger.debug("Starting CFS {}", columnFamily);
@@ -643,11 +658,7 @@ public class ColumnFamilyStore implement
             }
             final CountDownLatch latch = new CountDownLatch(icc.size());
             for (ColumnFamilyStore cfs : icc)
-            {
-                Memtable memtable = cfs.data.switchMemtable();
-                logger.info("Enqueuing flush of {}", memtable);
-                memtable.flushAndSignal(latch, flushWriter, ctx);
-            }
+                submitFlush(cfs.data.switchMemtable(), latch, ctx);
 
             // we marked our memtable as frozen as part of the concurrency control,
             // so even if there was nothing to flush we need to switch it out
@@ -688,6 +699,12 @@ public class ColumnFamilyStore implement
                : DatabaseDescriptor.getCFMetaData(metadata.cfId) == null;
     }
 
+    void switchBinaryMemtable(DecoratedKey key, ByteBuffer buffer)
+    {
+        binaryMemtable.set(new BinaryMemtable(this));
+        binaryMemtable.get().put(key, buffer);
+    }
+
     public void forceFlushIfExpired()
     {
         if (getMemtableThreadSafe().isExpired())
@@ -718,6 +735,14 @@ public class ColumnFamilyStore implement
             future.get();
     }
 
+    public void forceFlushBinary()
+    {
+        if (binaryMemtable.get().isClean())
+            return;
+
+        submitFlush(binaryMemtable.get(), new CountDownLatch(1), null);
+    }
+
     public void updateRowCache(DecoratedKey key, ColumnFamily columnFamily)
     {
         if (rowCache.isPutCopying())
@@ -768,6 +793,20 @@ public class ColumnFamilyStore implement
         return flushRequested ? mt : null;
     }
 
+    /*
+     * Insert/Update the column family for this key.
+     * Caller is responsible for acquiring Table.flusherLock!
+     * param @ lock - lock that needs to be used.
+     * param @ key - key for update/insert
+     * param @ columnFamily - columnFamily changes
+     */
+    void applyBinary(DecoratedKey key, ByteBuffer buffer)
+    {
+        long start = System.nanoTime();
+        binaryMemtable.get().put(key, buffer);
+        writeStats.addNano(System.nanoTime() - start);
+    }
+
     public static ColumnFamily removeDeletedCF(ColumnFamily cf, int gcBefore)
     {
         // in case of a timestamp tie, tombstones get priority over non-tombstones.
@@ -959,6 +998,21 @@ public class ColumnFamilyStore implement
         }
     }
 
+    /**
+     * submits flush sort on the flushSorter executor, which will in turn submit to flushWriter when sorted.
+     * TODO because our executors use CallerRunsPolicy, when flushSorter fills up, no writes will proceed
+     * because the next flush will start executing on the caller, mutation-stage thread that has the
+     * flush write lock held.  (writes acquire this as a read lock before proceeding.)
+     * This is good, because it backpressures flushes, but bad, because we can't write until that last
+     * flushing thread finishes sorting, which will almost always be longer than any of the flushSorter threads proper
+     * (since, by definition, it started last).
+     */
+    void submitFlush(IFlushable flushable, CountDownLatch latch, ReplayPosition context)
+    {
+        logger.info("Enqueuing flush of {}", flushable);
+        flushable.flushAndSignal(latch, flushSorter, flushWriter, context);
+    }
+
     public long getMemtableColumnsCount()
     {
         return getMemtableThreadSafe().getOperations();

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java?rev=1127179&r1=1127178&r2=1127179&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java Tue May 24 17:43:32 2011
@@ -46,7 +46,7 @@ import org.apache.cassandra.io.sstable.S
 import org.apache.cassandra.utils.WrappedRunnable;
 import org.github.jamm.MemoryMeter;
 
-public class Memtable implements Comparable<Memtable>
+public class Memtable implements Comparable<Memtable>, IFlushable
 {
     private static final Logger logger = LoggerFactory.getLogger(Memtable.class);
 
@@ -256,7 +256,7 @@ public class Memtable implements Compara
         return ssTable;
     }
 
-    public void flushAndSignal(final CountDownLatch latch, ExecutorService writer, final ReplayPosition context)
+    public void flushAndSignal(final CountDownLatch latch, ExecutorService sorter, final ExecutorService writer, final ReplayPosition context)
     {
         writer.execute(new WrappedRunnable()
         {



Mime
View raw message