cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject svn commit: r908830 - in /incubator/cassandra/trunk: src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/db/commitlog/ test/unit/org/apache/cassandra/db/
Date Thu, 11 Feb 2010 03:49:50 GMT
Author: jbellis
Date: Thu Feb 11 03:49:49 2010
New Revision: 908830

URL: http://svn.apache.org/viewvc?rev=908830&view=rev
Log:
encapsulate commitlog file operations in CommitLogSegment
patch by jbellis; reviewed by junrao for CASSANDRA-783

Added:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
  (with props)
Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RecoveryManager.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogExecutorService.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogHeader.java
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/CommitLogTest.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=908830&r1=908829&r2=908830&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Thu Feb 11 03:49:49 2010
@@ -40,6 +40,7 @@
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.commitlog.CommitLog;
+import org.apache.cassandra.db.commitlog.CommitLogSegment;
 import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.dht.Bounds;
 import org.apache.cassandra.dht.Range;
@@ -366,14 +367,14 @@
         Table.flusherLock.writeLock().lock();
         try
         {
-            final CommitLog.CommitLogContext ctx = CommitLog.instance().getContext(); //
this is harmless if !writeCommitLog
-
             if (oldMemtable.isFrozen())
             {
                 return null;
             }
-            logger_.info(columnFamily_ + " has reached its threshold; switching in a fresh
Memtable");
             oldMemtable.freeze();
+
+            final CommitLogSegment.CommitLogContext ctx = writeCommitLog ? CommitLog.instance().getContext() : null;
+            logger_.info(columnFamily_ + " has reached its threshold; switching in a fresh Memtable at " + ctx);
             final Condition condition = submitFlush(oldMemtable);
             memtable_ = new Memtable(table_, columnFamily_);
             // a second executor that makes sure the onMemtableFlushes get called in the
right order,
@@ -387,7 +388,7 @@
                     {
                         // if we're not writing to the commit log, we are replaying the log,
so marking
                         // the log header with "you can discard anything written before the
context" is not valid
-                        onMemtableFlush(ctx);
+                        CommitLog.instance().discardCompletedSegments(table_, columnFamily_, ctx);
                     }
                 }
             });
@@ -533,19 +534,6 @@
     }
 
     /*
-     * This method is called when the Memtable is frozen and ready to be flushed
-     * to disk. This method informs the CommitLog that a particular ColumnFamily
-     * is being flushed to disk.
-     */
-    void onMemtableFlush(CommitLog.CommitLogContext cLogCtx) throws IOException
-    {
-        if (cLogCtx.isValidContext())
-        {
-            CommitLog.instance().onMemtableFlush(table_, columnFamily_, cLogCtx);
-        }
-    }
-
-    /*
      * Called after the Memtable flushes its in-memory data, or we add a file
      * via bootstrap. This information is
      * cached in the ColumnFamilyStore. This is useful for reads because the

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RecoveryManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RecoveryManager.java?rev=908830&r1=908829&r2=908830&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RecoveryManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RecoveryManager.java Thu Feb 11 03:49:49 2010
@@ -53,7 +53,7 @@
 
         Arrays.sort(files, new FileUtils.FileComparator());
         logger_.info("Replaying " + StringUtils.join(files, ", "));
-        CommitLog.instance().recover(files);
+        CommitLog.recover(files);
         FileUtils.delete(files);
         logger_.info("Log replay complete");
     }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=908830&r1=908829&r2=908830&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Thu Feb 11 03:49:49 2010
@@ -28,6 +28,7 @@
 import com.google.common.collect.Iterables;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.commitlog.CommitLog;
+import org.apache.cassandra.db.commitlog.CommitLogSegment;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.io.SSTableDeletingReference;
 import org.apache.cassandra.io.SSTableReader;
@@ -396,7 +397,7 @@
         {
             if (writeCommitLog)
             {
-                Future<CommitLog.CommitLogContext> future = CommitLog.instance().add(mutation,
serializedMutation);
+                Future<CommitLogSegment.CommitLogContext> future = CommitLog.instance().add(mutation, serializedMutation);
                 if (waitForCommitLog)
                 {
                     try

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java?rev=908830&r1=908829&r2=908830&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java Thu Feb 11 03:49:49 2010
@@ -23,9 +23,7 @@
 import org.apache.cassandra.db.RowMutation;
 import org.apache.cassandra.db.Table;
 import org.apache.cassandra.io.util.BufferedRandomAccessFile;
-import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.io.DeletionService;
-import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.WrappedRunnable;
 import org.apache.cassandra.concurrent.StageManager;
 
@@ -68,15 +66,12 @@
  * inherited the old's dirty bitflags, getting a zero for any given bit in the anding
  * means that either the CF was clean in the old CL or it has been flushed since the
  * switch in the new.)
- *
- * The CommitLog class itself is "mostly a singleton."  open() always returns one
- * instance, but log replay will bypass that.
  */
 public class CommitLog
 {
     private static volatile int SEGMENT_SIZE = 128*1024*1024; // roll after log gets this
big
+
     private static final Logger logger = Logger.getLogger(CommitLog.class);
-    private static final Map<String, CommitLogHeader> clHeaders = new HashMap<String,
CommitLogHeader>();
 
     public static CommitLog instance()
     {
@@ -88,81 +83,21 @@
         public static final CommitLog instance = new CommitLog();
     }
 
-    public static class CommitLogContext
-    {
-        /* Commit Log associated with this operation */
-        public final String file;
-        /* Offset within the Commit Log where this row as added */
-        public final long position;
-
-        public CommitLogContext(String file, long position)
-        {
-            this.file = file;
-            this.position = position;
-        }
-
-        public boolean isValidContext()
-        {
-            return (position != -1L);
-        }
-
-        @Override
-        public String toString()
-        {
-            return "CommitLogContext(" +
-                   "file='" + file + '\'' +
-                   ", position=" + position +
-                   ')';
-        }
-    }
-
-    public static class CommitLogFileComparator implements Comparator<String>
-    {
-        public int compare(String f, String f2)
-        {
-            return (int)(getCreationTime(f) - getCreationTime(f2));
-        }
-    }
+    private final Deque<CommitLogSegment> segments = new ArrayDeque<CommitLogSegment>();
 
     public static void setSegmentSize(int size)
     {
         SEGMENT_SIZE = size;
     }
 
-    public static int getSegmentCount()
+    public int getSegmentCount()
     {
-        return clHeaders.size();
-    }
-
-    static long getCreationTime(String file)
-    {
-        String[] entries = FBUtilities.strip(file, "-.");
-        return Long.parseLong(entries[entries.length - 2]);
-    }
-
-    private static BufferedRandomAccessFile createWriter(String file) throws IOException
-    {        
-        return new BufferedRandomAccessFile(file, "rw");
+        return segments.size();
     }
 
-    /* Current commit log file */
-    private String logFile;
-    /* header for current commit log */
-    private CommitLogHeader clHeader;
-    private BufferedRandomAccessFile logWriter;
     private final ExecutorService executor = new CommitLogExecutorService();
 
-    /*
-     * Generates a file name of the format CommitLog-<table>-<timestamp>.log
in the
-     * directory specified by the Database Descriptor.
-    */
-    private void setNextFileName()
-    {
-        logFile = DatabaseDescriptor.getLogFileLocation() + File.separator +
-                   "CommitLog-" + System.currentTimeMillis() + ".log";
-    }
-
-    /*
+    /**
      * param @ table - name of table for which we are maintaining
      *                 this commit log.
      * param @ recoverymode - is commit log being instantiated in
@@ -170,17 +105,11 @@
     */
     private CommitLog()
     {
-        setNextFileName();
-        try
-        {
-            logWriter = CommitLog.createWriter(logFile);
-            writeCommitLogHeader();
-        }
-        catch (IOException e)
-        {
-            throw new IOError(e);
-        }
-
+        // all old segments are recovered and deleted before CommitLog is instantiated.
+        // All we need to do is create a new one.
+        int cfSize = Table.TableMetadata.getColumnFamilyCount();
+        segments.add(new CommitLogSegment(cfSize));
+        
         if (DatabaseDescriptor.getCommitLogSync() == DatabaseDescriptor.CommitLogSync.periodic)
         {
             final Runnable syncer = new WrappedRunnable()
@@ -216,50 +145,7 @@
         }
     }
 
-    String getLogFile()
-    {
-        return logFile;
-    }
-    
-    private CommitLogHeader readCommitLogHeader(BufferedRandomAccessFile logReader) throws
IOException
-    {
-        int size = (int)logReader.readLong();
-        byte[] bytes = new byte[size];
-        logReader.read(bytes);
-        ByteArrayInputStream byteStream = new ByteArrayInputStream(bytes);
-        return CommitLogHeader.serializer().deserialize(new DataInputStream(byteStream));
-    }
-
-    /*
-     * This is invoked on startup via the ctor. It basically
-     * writes a header with all bits set to zero.
-    */
-    private void writeCommitLogHeader() throws IOException
-    {
-        int cfSize = Table.TableMetadata.getColumnFamilyCount();
-        clHeader = new CommitLogHeader(cfSize);
-        writeCommitLogHeader(logWriter, clHeader.toByteArray());
-    }
-
-    /** writes header at the beginning of the file, then seeks back to current position */
-    private void seekAndWriteCommitLogHeader(byte[] bytes) throws IOException
-    {
-        long currentPos = logWriter.getFilePointer();
-        logWriter.seek(0);
-
-        writeCommitLogHeader(logWriter, bytes);
-
-        logWriter.seek(currentPos);
-    }
-
-    private static void writeCommitLogHeader(BufferedRandomAccessFile logWriter, byte[] bytes)
throws IOException
-    {
-        logWriter.writeLong(bytes.length);
-        logWriter.write(bytes);
-        logWriter.sync();
-    }
-
-    public void recover(File[] clogs) throws IOException
+    public static void recover(File[] clogs) throws IOException
     {
         Set<Table> tablesRecovered = new HashSet<Table>();
         assert StageManager.getStage(StageManager.MUTATION_STAGE).getCompletedTaskCount()
== 0;
@@ -268,7 +154,7 @@
         {
             int bufferSize = (int)Math.min(file.length(), 32 * 1024 * 1024);
             BufferedRandomAccessFile reader = new BufferedRandomAccessFile(file.getAbsolutePath(),
"r", bufferSize);
-            final CommitLogHeader clHeader = readCommitLogHeader(reader);
+            final CommitLogHeader clHeader = CommitLogHeader.readCommitLogHeader(reader);
             /* seek to the lowest position where any CF has non-flushed data */
             int lowPos = CommitLogHeader.getLowestPosition(clHeader);
             if (lowPos == 0)
@@ -357,13 +243,13 @@
             }
         }
 
-        // flush replayed tables, allowing commitlog segments to be removed
+        // flush replayed tables
         List<Future<?>> futures = new ArrayList<Future<?>>();
         for (Table table : tablesRecovered)
         {
             futures.addAll(table.flush());
         }
-        // wait for flushes to finish before continuing with startup
+        // wait for flushes to finish
         for (Future<?> future : futures)
         {
             try
@@ -377,31 +263,18 @@
         }
     }
 
-    /*
-     * Update the header of the commit log if a new column family
-     * is encountered for the first time.
-    */
-    private void maybeUpdateHeader(RowMutation rm) throws IOException
+    private CommitLogSegment currentSegment()
     {
-        Table table = Table.open(rm.getTable());
-        for (ColumnFamily columnFamily : rm.getColumnFamilies())
-        {
-            int id = table.getColumnFamilyId(columnFamily.name());
-            if (!clHeader.isDirty(id))
-            {
-                clHeader.turnOn(id, logWriter.getFilePointer());
-                seekAndWriteCommitLogHeader(clHeader.toByteArray());
-            }
-        }
+        return segments.getLast();
     }
     
-    public CommitLogContext getContext() throws IOException
+    public CommitLogSegment.CommitLogContext getContext() throws IOException
     {
-        Callable<CommitLogContext> task = new Callable<CommitLogContext>()
+        Callable<CommitLogSegment.CommitLogContext> task = new Callable<CommitLogSegment.CommitLogContext>()
         {
-            public CommitLogContext call() throws Exception
+            public CommitLogSegment.CommitLogContext call() throws Exception
             {
-                return new CommitLogContext(logFile, logWriter.getFilePointer());
+                return currentSegment().getContext();
             }
         };
         try
@@ -424,9 +297,9 @@
      * of any problems. This way we can assume that the subsequent commit log
      * entry will override the garbage left over by the previous write.
     */
-    public Future<CommitLogContext> add(RowMutation rowMutation, Object serializedRow)
throws IOException
+    public Future<CommitLogSegment.CommitLogContext> add(RowMutation rowMutation, Object
serializedRow) throws IOException
     {
-        Callable<CommitLogContext> task = new LogRecordAdder(rowMutation, serializedRow);
+        Callable<CommitLogSegment.CommitLogContext> task = new LogRecordAdder(rowMutation,
serializedRow);
         return executor.submit(task);
     }
 
@@ -436,15 +309,14 @@
      * The bit flag associated with this column family is set in the
      * header and this is used to decide if the log file can be deleted.
     */
-    public void onMemtableFlush(final String tableName, final String cf, final CommitLog.CommitLogContext
cLogCtx) throws IOException
+    public void discardCompletedSegments(final String tableName, final String cf, final CommitLogSegment.CommitLogContext
context) throws IOException
     {
         Callable task = new Callable()
         {
             public Object call() throws IOException
             {
-                Table table = Table.open(tableName);
-                int id = table.getColumnFamilyId(cf);
-                discardCompletedSegments(cLogCtx, id);
+                int id = Table.open(tableName).getColumnFamilyId(cf);
+                discardCompletedSegmentsInternal(context, id);
                 return null;
             }
         };
@@ -462,41 +334,23 @@
         }
     }
 
-    /*
-     * Delete log segments whose contents have been turned into SSTables.
+    /**
+     * Delete log segments whose contents have been turned into SSTables. NOT threadsafe.
      *
-     * param @ cLogCtx The commitLog context .
+     * param @ context The commitLog context .
      * param @ id id of the columnFamily being flushed to disk.
      *
     */
-    private void discardCompletedSegments(CommitLog.CommitLogContext cLogCtx, int id) throws
IOException
+    private void discardCompletedSegmentsInternal(CommitLogSegment.CommitLogContext context,
int id) throws IOException
     {
         if (logger.isDebugEnabled())
-            logger.debug("discard completed log segments for " + cLogCtx + ", column family
" + id + ". CFIDs are " + Table.TableMetadata.getColumnFamilyIDString());
-        /* retrieve the commit log header associated with the file in the context */
-        if (clHeaders.get(cLogCtx.file) == null)
-        {
-            if (logFile.equals(cLogCtx.file))
-            {
-                /* this means we are dealing with the current commit log. */
-                clHeaders.put(cLogCtx.file, clHeader);
-            }
-            else
-            {
-                logger.error("Unknown commitlog file " + cLogCtx.file);
-                return;
-            }
-        }
+            logger.debug("discard completed log segments for " + context + ", column family
" + id + ". CFIDs are " + Table.TableMetadata.getColumnFamilyIDString());
 
         /*
          * log replay assumes that we only have to look at entries past the last
          * flush position, so verify that this flush happens after the last.
         */
-        assert cLogCtx.position >= clHeaders.get(cLogCtx.file).getPosition(id);
-
-        /* Sort the commit logs based on creation time */
-        List<String> oldFiles = new ArrayList<String>(clHeaders.keySet());
-        Collections.sort(oldFiles, new CommitLogFileComparator());
+        assert context.position > context.getSegment().getHeader().getPosition(id) : "discard
called on obsolete context " + context;
 
         /*
          * Loop through all the commit log files in the history. Now process
@@ -504,78 +358,47 @@
          * these files the header needs to modified by resetting the dirty
          * bit corresponding to the flushed CF.
         */
-        for (String oldFile : oldFiles)
+        for (CommitLogSegment segment : segments)
         {
-            CommitLogHeader header = clHeaders.get(oldFile);
-            if (oldFile.equals(cLogCtx.file))
+            CommitLogHeader header = segment.getHeader();
+            if (segment.equals(context.getSegment()))
             {
                 // we can't just mark the segment where the flush happened clean,
                 // since there may have been writes to it between when the flush
                 // started and when it finished. so mark the flush position as
                 // the replay point for this CF, instead.
                 if (logger.isDebugEnabled())
-                    logger.debug("Marking replay position " + cLogCtx.position + " on commit
log " + oldFile);
-                header.turnOn(id, cLogCtx.position);
-                if (oldFile.equals(logFile))
-                {
-                    seekAndWriteCommitLogHeader(header.toByteArray());
-                }
-                else
-                {
-                    writeOldCommitLogHeader(oldFile, header);
-                }
+                    logger.debug("Marking replay position " + context.position + " on commit
log " + segment);
+                header.turnOn(id, context.position);
+                segment.writeHeader();
                 break;
             }
 
             header.turnOff(id);
             if (header.isSafeToDelete())
             {
-                logger.info("Deleting obsolete commit log:" + oldFile);
-                DeletionService.submitDelete(oldFile);
-                clHeaders.remove(oldFile);
+                logger.info("Discarding obsolete commit log:" + segment);
+                segment.close();
+                DeletionService.submitDelete(segment.getPath());
+                // usually this will be the first (remaining) segment, but not always, if
segment A contains
+                // writes to a CF that is unflushed but is followed by segment B whose CFs
are all flushed.
+                segments.remove(segment);
             }
             else
             {
                 if (logger.isDebugEnabled())
-                    logger.debug("Not safe to delete commit log " + oldFile + "; dirty is
" + header.dirtyString());
-                writeOldCommitLogHeader(oldFile, header);
+                    logger.debug("Not safe to delete commit log " + segment + "; dirty is
" + header.dirtyString());
+                segment.writeHeader();
             }
         }
     }
 
-    private void writeOldCommitLogHeader(String oldFile, CommitLogHeader header) throws IOException
-    {
-        BufferedRandomAccessFile logWriter = CommitLog.createWriter(oldFile);
-        writeCommitLogHeader(logWriter, header.toByteArray());
-        logWriter.close();
-    }
-
-    private boolean maybeRollLog() throws IOException
-    {
-        if (logWriter.length() >= SEGMENT_SIZE)
-        {
-            /* Rolls the current log file over to a new one. */
-            setNextFileName();
-            String oldLogFile = logWriter.getPath();
-            logWriter.close();
-
-            /* point reader/writer to a new commit log file. */
-            logWriter = CommitLog.createWriter(logFile);
-            /* squirrel away the old commit log header */
-            clHeaders.put(oldLogFile, new CommitLogHeader(clHeader));
-            clHeader.clear();
-            writeCommitLogHeader(logWriter, clHeader.toByteArray());
-            return true;
-        }
-        return false;
-    }
-
     void sync() throws IOException
     {
-        logWriter.sync();
+        currentSegment().sync();
     }
 
-    class LogRecordAdder implements Callable<CommitLog.CommitLogContext>
+    class LogRecordAdder implements Callable<CommitLogSegment.CommitLogContext>
     {
         final RowMutation rowMutation;
         final Object serializedRow;
@@ -586,40 +409,18 @@
             this.serializedRow = serializedRow;
         }
 
-        public CommitLog.CommitLogContext call() throws Exception
+        public CommitLogSegment.CommitLogContext call() throws Exception
         {
-            long currentPosition = -1L;
-            try
+            CommitLogSegment.CommitLogContext context = currentSegment().write(rowMutation,
serializedRow);
+
+            // roll log if necessary
+            if (currentSegment().length() >= SEGMENT_SIZE)
             {
-                currentPosition = logWriter.getFilePointer();
-                CommitLogContext cLogCtx = new CommitLogContext(logFile, currentPosition);
-                maybeUpdateHeader(rowMutation);
-                Checksum checkum = new CRC32();
-                if (serializedRow instanceof DataOutputBuffer)
-                {
-                    DataOutputBuffer buffer = (DataOutputBuffer) serializedRow;
-                    logWriter.writeLong(buffer.getLength());
-                    logWriter.write(buffer.getData(), 0, buffer.getLength());
-                    checkum.update(buffer.getData(), 0, buffer.getLength());
-                }
-                else
-                {
-                    assert serializedRow instanceof byte[];
-                    byte[] bytes = (byte[]) serializedRow;
-                    logWriter.writeLong(bytes.length);
-                    logWriter.write(bytes);
-                    checkum.update(bytes, 0, bytes.length);
-                }
-                logWriter.writeLong(checkum.getValue());
-                maybeRollLog();
-                return cLogCtx;
-            }
-            catch (IOException e)
-            {
-                if ( currentPosition != -1 )
-                    logWriter.seek(currentPosition);
-                throw e;
+                sync();
+                segments.add(new CommitLogSegment(currentSegment().getHeader().getColumnFamilyCount()));
             }
+
+            return context;
         }
     }
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogExecutorService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogExecutorService.java?rev=908830&r1=908829&r2=908830&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogExecutorService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogExecutorService.java Thu Feb 11 03:49:49 2010
@@ -11,9 +11,9 @@
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.utils.WrappedRunnable;
 
-public class CommitLogExecutorService extends AbstractExecutorService implements CommitLogExecutorServiceMBean
+class CommitLogExecutorService extends AbstractExecutorService implements CommitLogExecutorServiceMBean
 {
-    BlockingQueue<CheaterFutureTask> queue;
+    private final BlockingQueue<CheaterFutureTask> queue;
 
     private volatile long completedTaskCount = 0;
 
@@ -92,8 +92,8 @@
         queue.take().run();
     }
 
-    private ArrayList<CheaterFutureTask> incompleteTasks = new ArrayList<CheaterFutureTask>();
-    private ArrayList taskValues = new ArrayList(); // TODO not sure how to generify this
+    private final ArrayList<CheaterFutureTask> incompleteTasks = new ArrayList<CheaterFutureTask>();
+    private final ArrayList taskValues = new ArrayList(); // TODO not sure how to generify
this
     private void processWithSyncBatch() throws Exception
     {
         CheaterFutureTask firstTask = queue.take();
@@ -188,26 +188,26 @@
     {
         throw new UnsupportedOperationException();
     }
-}
-
-class CheaterFutureTask<V> extends FutureTask<V>
-{
-    private Callable rawCallable;
 
-    public CheaterFutureTask(Callable<V> callable)
+    private static class CheaterFutureTask<V> extends FutureTask<V>
     {
-        super(callable);
-        rawCallable = callable;
-    }
+        private final Callable rawCallable;
 
-    public Callable getRawCallable()
-    {
-        return rawCallable;
-    }
+        public CheaterFutureTask(Callable<V> callable)
+        {
+            super(callable);
+            rawCallable = callable;
+        }
 
-    @Override
-    public void set(V v)
-    {
-        super.set(v);
+        public Callable getRawCallable()
+        {
+            return rawCallable;
+        }
+
+        @Override
+        public void set(V v)
+        {
+            super.set(v);
+        }
     }
-}
\ No newline at end of file
+}

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogHeader.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogHeader.java?rev=908830&r1=908829&r2=908830&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogHeader.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogHeader.java Thu Feb 11 03:49:49 2010
@@ -24,6 +24,7 @@
 
 import org.apache.cassandra.db.Table;
 import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.io.util.BufferedRandomAccessFile;
 import org.apache.cassandra.utils.BitSetSerializer;
 
 class CommitLogHeader
@@ -70,14 +71,7 @@
         this.dirty = dirty;
         this.lastFlushedAt = lastFlushedAt;
     }
-    
-    CommitLogHeader(CommitLogHeader clHeader)
-    {
-        dirty = (BitSet)clHeader.dirty.clone();
-        lastFlushedAt = new int[clHeader.lastFlushedAt.length];
-        System.arraycopy(clHeader.lastFlushedAt, 0, lastFlushedAt, 0, lastFlushedAt.length);
-    }
-    
+        
     boolean isDirty(int index)
     {
         return dirty.get(index);
@@ -105,12 +99,6 @@
         return dirty.isEmpty();
     }
 
-    void clear()
-    {
-        dirty.clear();
-        Arrays.fill(lastFlushedAt, 0);
-    }
-        
     byte[] toByteArray() throws IOException
     {
         ByteArrayOutputStream bos = new ByteArrayOutputStream();
@@ -152,6 +140,20 @@
         return sb.toString();
     }
 
+    static CommitLogHeader readCommitLogHeader(BufferedRandomAccessFile logReader) throws
IOException
+    {
+        int size = (int)logReader.readLong();
+        byte[] bytes = new byte[size];
+        logReader.read(bytes);
+        ByteArrayInputStream byteStream = new ByteArrayInputStream(bytes);
+        return serializer().deserialize(new DataInputStream(byteStream));
+    }
+
+    public int getColumnFamilyCount()
+    {
+        return lastFlushedAt.length;
+    }
+
     static class CommitLogHeaderSerializer implements ICompactSerializer<CommitLogHeader>
     {
         public void serialize(CommitLogHeader clHeader, DataOutputStream dos) throws IOException

Added: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java?rev=908830&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java (added)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java Thu Feb 11 03:49:49 2010
@@ -0,0 +1,193 @@
+package org.apache.cassandra.db.commitlog;
+
+import java.io.File;
+import java.io.IOError;
+import java.io.IOException;
+import java.util.zip.CRC32;
+import java.util.zip.Checksum;
+
+import org.apache.log4j.Logger;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.io.util.BufferedRandomAccessFile;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+
+public class CommitLogSegment
+{
+    private static final Logger logger = Logger.getLogger(CommitLogSegment.class);
+
+    private final BufferedRandomAccessFile logWriter;
+    private final CommitLogHeader header;
+
+    public CommitLogSegment(int cfCount)
+    {
+        this.header = new CommitLogHeader(cfCount);
+        String logFile = DatabaseDescriptor.getLogFileLocation() + File.separator + "CommitLog-" + System.currentTimeMillis() + ".log";
+        logger.info("Creating new commitlog segment " + logFile);
+
+        try
+        {
+            logWriter = createWriter(logFile);
+            writeCommitLogHeader(header.toByteArray());
+        }
+        catch (IOException e)
+        {
+            throw new IOError(e);
+        }
+    }
+
+    public void writeHeader() throws IOException
+    {
+        seekAndWriteCommitLogHeader(header.toByteArray());
+    }
+
+    /** writes header at the beginning of the file, then seeks back to current position */
+    void seekAndWriteCommitLogHeader(byte[] bytes) throws IOException
+    {
+        long currentPos = logWriter.getFilePointer();
+        logWriter.seek(0);
+
+        writeCommitLogHeader(bytes);
+
+        logWriter.seek(currentPos);
+    }
+
+    private void writeCommitLogHeader(byte[] bytes) throws IOException
+    {
+        logWriter.writeLong(bytes.length);
+        logWriter.write(bytes);
+        logWriter.sync();
+    }
+
+    private static BufferedRandomAccessFile createWriter(String file) throws IOException
+    {
+        return new BufferedRandomAccessFile(file, "rw", 128 * 1024);
+    }
+
+    public CommitLogSegment.CommitLogContext write(RowMutation rowMutation, Object serializedRow) throws IOException
+    {
+        long currentPosition = -1L;
+        try
+        {
+            currentPosition = logWriter.getFilePointer();
+            CommitLogSegment.CommitLogContext cLogCtx = new CommitLogSegment.CommitLogContext(currentPosition);
+            Table table = Table.open(rowMutation.getTable());
+
+            // update header
+            for (ColumnFamily columnFamily : rowMutation.getColumnFamilies())
+            {
+                int id = table.getColumnFamilyId(columnFamily.name());
+                if (!header.isDirty(id))
+                {
+                    header.turnOn(id, logWriter.getFilePointer());
+                    seekAndWriteCommitLogHeader(header.toByteArray());
+                }
+            }
+
+            // write mutation, w/ checksum
+            Checksum checkum = new CRC32();
+            if (serializedRow instanceof DataOutputBuffer)
+            {
+                DataOutputBuffer buffer = (DataOutputBuffer) serializedRow;
+                logWriter.writeLong(buffer.getLength());
+                logWriter.write(buffer.getData(), 0, buffer.getLength());
+                checkum.update(buffer.getData(), 0, buffer.getLength());
+            }
+            else
+            {
+                assert serializedRow instanceof byte[];
+                byte[] bytes = (byte[]) serializedRow;
+                logWriter.writeLong(bytes.length);
+                logWriter.write(bytes);
+                checkum.update(bytes, 0, bytes.length);
+            }
+            logWriter.writeLong(checkum.getValue());
+
+            return cLogCtx;
+        }
+        catch (IOException e)
+        {
+            if (currentPosition != -1)
+                logWriter.seek(currentPosition);
+            throw e;
+        }
+    }
+
+    public void sync() throws IOException
+    {
+        logWriter.sync();
+    }
+
+    public CommitLogContext getContext()
+    {
+        return new CommitLogContext(logWriter.getFilePointer());
+    }
+
+    public CommitLogHeader getHeader()
+    {
+        return header;
+    }
+
+    public String getPath()
+    {
+        return logWriter.getPath();
+    }
+
+    public long length()
+    {
+        try
+        {
+            return logWriter.length();
+        }
+        catch (IOException e)
+        {
+            throw new IOError(e);
+        }
+    }
+
+    public void close()
+    {
+        try
+        {
+            logWriter.close();
+        }
+        catch (IOException e)
+        {
+            throw new IOError(e);
+        }
+    }
+
+    @Override
+    public String toString()
+    {
+        return "CommitLogSegment(" + logWriter.getPath() + ')';
+    }
+
+    public class CommitLogContext
+    {
+        public final long position;
+
+        public CommitLogContext(long position)
+        {
+            assert position >= 0;
+            this.position = position;
+        }
+
+        public CommitLogSegment getSegment()
+        {
+            return CommitLogSegment.this;
+        }
+
+        @Override
+        public String toString()
+        {
+            return "CommitLogContext(" +
+                   "file='" + logWriter.getPath() + '\'' +
+                   ", position=" + position +
+                   ')';
+        }
+    }
+}

Propchange: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/CommitLogTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/CommitLogTest.java?rev=908830&r1=908829&r2=908830&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/CommitLogTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/CommitLogTest.java Thu Feb 11 03:49:49 2010
@@ -32,7 +32,7 @@
     @Test
     public void testCleanup() throws IOException, ExecutionException, InterruptedException
     {
-        assert CommitLog.getSegmentCount() == 0;
+        assert CommitLog.instance().getSegmentCount() == 1;
         CommitLog.setSegmentSize(1000);
 
         Table table = Table.open("Keyspace1");
@@ -49,14 +49,14 @@
             rm.add(new QueryPath("Standard2", null, "Column1".getBytes()), value, 0);
             rm.apply();
         }
-        assert CommitLog.getSegmentCount() > 1;
+        assert CommitLog.instance().getSegmentCount() > 1;
 
         // nothing should get removed after flushing just Standard1
         store1.forceBlockingFlush();
-        assert CommitLog.getSegmentCount() > 1;
+        assert CommitLog.instance().getSegmentCount() > 1;
 
         // after flushing Standard2 we should be able to clean out all segments
         store2.forceBlockingFlush();
-        assert CommitLog.getSegmentCount() == 1;
+        assert CommitLog.instance().getSegmentCount() == 1;
     }
 }



Mime
View raw message