cassandra-commits mailing list archives

From: jbel...@apache.org
Subject: [1/3] git commit: merge from 1.1
Date: Mon, 09 Apr 2012 16:30:20 GMT
Updated Branches:
  refs/heads/cassandra-1.1 044e17a22 -> 5923d3295
  refs/heads/trunk 0cc97d91c -> 64305dd9f


merge from 1.1


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/64305dd9
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/64305dd9
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/64305dd9

Branch: refs/heads/trunk
Commit: 64305dd9f9e608b51ce521158269c6c7222def24
Parents: 0cc97d9 5923d32
Author: Jonathan Ellis <jbellis@apache.org>
Authored: Mon Apr 9 11:28:33 2012 -0500
Committer: Jonathan Ellis <jbellis@apache.org>
Committed: Mon Apr 9 11:29:49 2012 -0500

----------------------------------------------------------------------
 CHANGES.txt                                        |    2 +
 conf/cassandra.yaml                                |    8 +
 conf/commitlog_archiving.properties                |   37 ++
 src/java/org/apache/cassandra/config/Config.java   |    1 +
 .../cassandra/config/DatabaseDescriptor.java       |    8 +
 .../apache/cassandra/db/commitlog/CommitLog.java   |  253 ++------------
 .../cassandra/db/commitlog/CommitLogAllocator.java |   20 +-
 .../cassandra/db/commitlog/CommitLogArchiver.java  |  147 ++++++++
 .../cassandra/db/commitlog/CommitLogMBean.java     |   17 +
 .../cassandra/db/commitlog/CommitLogReplayer.java  |  269 +++++++++++++++
 .../cassandra/db/commitlog/CommitLogSegment.java   |   14 +-
 src/java/org/apache/cassandra/utils/CLibrary.java  |   33 +--
 .../org/apache/cassandra/utils/FBUtilities.java    |   34 ++-
 13 files changed, 580 insertions(+), 263 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/64305dd9/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 2c42ced,b80368a..eb97992
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,11 -1,6 +1,13 @@@
 +1.2-dev
 + * Track tombstone expiration and compact when tombstone content is
 +   higher than a configurable threshold, default 20% (CASSANDRA-3442)
 + * update MurmurHash to version 3 (CASSANDRA-2975)
 + * (CLI) track elapsed time for `delete' operation (CASSANDRA-4060)
 +
 +
  1.1.1-dev
+  * add support for commitlog archiving and point-in-time recovery
+    (CASSANDRA-3647)
   * update caches to use byte[] keys to reduce memory overhead (CASSANDRA-3966)
   * add column limit to cli (CASSANDRA-3012, 4098)
   * clean up and optimize DataOutputBuffer, used by CQL compression and

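The archiving half of CASSANDRA-3647 is driven by the new conf/commitlog_archiving.properties listed in the diffstat above. A minimal sketch of that file, assuming the property names CommitLogArchiver reads in this commit (archive_command, restore_command, restore_directories, restore_point_in_time) and the %path/%name/%from/%to substitution tokens; the example values are illustrative only:

    # executed for each segment when it is recycled; %path and %name
    # expand to the segment's full path and file name
    archive_command=/bin/cp %path /backup/commitlog/%name

    # executed for each archived segment during restore; %from and %to
    # expand to the archived file and its live commitlog destination
    restore_command=/bin/cp -f %from %to

    # directory scanned for archived segments when the node restarts
    restore_directories=/backup/commitlog

    # mutations whose newest column timestamp is past this point are
    # skipped during replay (see pointInTimeExceeded in CommitLogReplayer)
    restore_point_in_time=2012:04:09 11:00:00

The timestamp format shown (yyyy:MM:dd HH:mm:ss) is an assumption about how the archiver parses restore_point_in_time.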
http://git-wip-us.apache.org/repos/asf/cassandra/blob/64305dd9/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/64305dd9/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/64305dd9/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/CommitLog.java
index 5b3c7b0,3c34772..2305dd5
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
@@@ -398,8 -201,8 +200,8 @@@ public class CommitLog implements Commi
       */
      public void add(RowMutation rm) throws IOException
      {
 -        long totalSize = RowMutation.serializer().serializedSize(rm, MessagingService.version_) + CommitLogSegment.ENTRY_OVERHEAD_SIZE;
 +        long totalSize = RowMutation.serializer().serializedSize(rm, MessagingService.current_version) + CommitLogSegment.ENTRY_OVERHEAD_SIZE;
-         if (totalSize > CommitLog.SEGMENT_SIZE)
+         if (totalSize > DatabaseDescriptor.getCommitLogSegmentSize())
          {
              logger.warn("Skipping commitlog append of extremely large mutation ({} bytes)",
totalSize);
              return;

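This hunk also replaces the hard-coded CommitLog.SEGMENT_SIZE with DatabaseDescriptor.getCommitLogSegmentSize(), so the oversized-mutation guard now follows the operator's configuration. A sketch of the corresponding cassandra.yaml setting; the name and default are assumptions about the eight lines this commit adds to conf/cassandra.yaml, not a quote of them:

    # maximum commitlog segment size; a mutation whose serialized size
    # plus entry overhead exceeds this is skipped with a warning
    commitlog_segment_size_in_mb: 32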
http://git-wip-us.apache.org/repos/asf/cassandra/blob/64305dd9/src/java/org/apache/cassandra/db/commitlog/CommitLogAllocator.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/64305dd9/src/java/org/apache/cassandra/db/commitlog/CommitLogMBean.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/64305dd9/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
index 0000000,eb997fc..488af20
mode 000000,100644..100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@@ -1,0 -1,269 +1,269 @@@
+ package org.apache.cassandra.db.commitlog;
+ 
+ import java.io.DataInputStream;
+ import java.io.EOFException;
+ import java.io.File;
+ import java.io.IOException;
+ import java.util.ArrayList;
+ import java.util.HashMap;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Set;
+ import java.util.concurrent.Future;
+ import java.util.concurrent.atomic.AtomicInteger;
+ import java.util.zip.Checksum;
+ 
+ import org.apache.cassandra.concurrent.Stage;
+ import org.apache.cassandra.concurrent.StageManager;
+ import org.apache.cassandra.config.Schema;
+ import org.apache.cassandra.db.ColumnFamily;
+ import org.apache.cassandra.db.ColumnFamilyStore;
+ import org.apache.cassandra.db.RowMutation;
+ import org.apache.cassandra.db.Table;
+ import org.apache.cassandra.db.UnknownColumnFamilyException;
+ import org.apache.cassandra.io.IColumnSerializer;
+ import org.apache.cassandra.io.util.FastByteArrayInputStream;
+ import org.apache.cassandra.io.util.FileUtils;
+ import org.apache.cassandra.io.util.RandomAccessReader;
+ import org.apache.cassandra.net.MessagingService;
+ import org.apache.cassandra.utils.ByteBufferUtil;
+ import org.apache.cassandra.utils.FBUtilities;
+ import org.apache.cassandra.utils.PureJavaCrc32;
+ import org.apache.cassandra.utils.WrappedRunnable;
+ import org.apache.commons.lang.StringUtils;
+ import org.cliffc.high_scale_lib.NonBlockingHashSet;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ 
+ import com.google.common.collect.Ordering;
+ 
+ public class CommitLogReplayer
+ {
+     private static final Logger logger = LoggerFactory.getLogger(CommitLogReplayer.class);
+     private static final int MAX_OUTSTANDING_REPLAY_COUNT = 1024;
+ 
+     private final Set<Table> tablesRecovered;
+     private final List<Future<?>> futures;
+     private final Map<Integer, AtomicInteger> invalidMutations;
+     private final AtomicInteger replayedCount;
+     private final Map<Integer, ReplayPosition> cfPositions;
+     private final ReplayPosition globalPosition;
+     private final Checksum checksum;
+     private byte[] buffer;
+ 
+     public CommitLogReplayer()
+     {
+         this.tablesRecovered = new NonBlockingHashSet<Table>();
+         this.futures = new ArrayList<Future<?>>();
+         this.buffer = new byte[4096];
+         this.invalidMutations = new HashMap<Integer, AtomicInteger>();
+         // count the number of replayed mutations. We don't really care about atomicity, but we need it to be a reference.
+         this.replayedCount = new AtomicInteger();
+         // compute per-CF and global replay positions
+         this.cfPositions = new HashMap<Integer, ReplayPosition>();
+         for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
+         {
+             // it's important to call ReplayPosition.getReplayPosition per-cf, before aggregating all the positions with the Ordering.min call
+             // below: getReplayPosition will return NONE if there are no flushed sstables, which is important to have in the
+             // list (otherwise we'll just start replay from the first flush position that we do have, which is not correct).
+             ReplayPosition rp = ReplayPosition.getReplayPosition(cfs.getSSTables());
+             cfPositions.put(cfs.metadata.cfId, rp);
+         }
+         this.globalPosition = Ordering.from(ReplayPosition.comparator).min(cfPositions.values());
+         this.checksum = new PureJavaCrc32();
+     }
+ 
+     public void recover(File[] clogs) throws IOException
+     {
+         for (final File file : clogs)
+             recover(file);
+     }
+ 
+     public int blockForWrites() throws IOException
+     {
+         for (Map.Entry<Integer, AtomicInteger> entry : invalidMutations.entrySet())
+             logger.info(String.format("Skipped %d mutations from unknown (probably removed) CF with id %d", entry.getValue().intValue(), entry.getKey()));
+ 
+         // wait for all the writes to finish on the mutation stage
+         FBUtilities.waitOnFutures(futures);
+         logger.debug("Finished waiting on mutations from recovery");
+ 
+         // flush replayed tables
+         futures.clear();
+         for (Table table : tablesRecovered)
+             futures.addAll(table.flush());
+         FBUtilities.waitOnFutures(futures);
+         return replayedCount.get();
+     }
+ 
+     public void recover(File file) throws IOException
+     {
+         logger.info("Replaying " + file.getPath());
+         final long segment = CommitLogSegment.idFromFilename(file.getName());
+         RandomAccessReader reader = RandomAccessReader.open(new File(file.getAbsolutePath()), true);
+         assert reader.length() <= Integer.MAX_VALUE;
+         try
+         {
+             int replayPosition;
+             if (globalPosition.segment < segment)
+                 replayPosition = 0;
+             else if (globalPosition.segment == segment)
+                 replayPosition = globalPosition.position;
+             else
+                 replayPosition = (int) reader.length();
+ 
+             if (replayPosition < 0 || replayPosition >= reader.length())
+             {
+                 // replayPosition > reader.length() can happen if some data gets flushed before it is written to the commitlog
+                 // (see https://issues.apache.org/jira/browse/CASSANDRA-2285)
+                 logger.debug("skipping replay of fully-flushed {}", file);
+                 return;
+             }
+ 
+             reader.seek(replayPosition);
+ 
+             if (logger.isDebugEnabled())
+                 logger.debug("Replaying " + file + " starting at " + reader.getFilePointer());
+ 
+             /* read the logs populate RowMutation and apply */
+             while (!reader.isEOF())
+             {
+                 if (logger.isDebugEnabled())
+                     logger.debug("Reading mutation at " + reader.getFilePointer());
+ 
+                 long claimedCRC32;
+                 int serializedSize;
+                 try
+                 {
+                     // any of the reads may hit EOF
+                     serializedSize = reader.readInt();
+                     if (serializedSize == CommitLog.END_OF_SEGMENT_MARKER)
+                     {
+                         logger.debug("Encountered end of segment marker at " + reader.getFilePointer());
+                         break;
+                     }
+ 
+                     // RowMutation must be at LEAST 10 bytes:
+                     // 3 each for a non-empty Table and Key (including the
+                     // 2-byte length from writeUTF/writeWithShortLength) and 4 bytes for column count.
+                     // This prevents the CRC from being fooled by special-case garbage in the file; see CASSANDRA-2128
+                     if (serializedSize < 10)
+                         break;
+                     long claimedSizeChecksum = reader.readLong();
+                     checksum.reset();
+                     checksum.update(serializedSize);
+                     if (checksum.getValue() != claimedSizeChecksum)
+                         break; // entry wasn't synced correctly/fully. that's ok.
+ 
+                     if (serializedSize > buffer.length)
+                         buffer = new byte[(int) (1.2 * serializedSize)];
+                     reader.readFully(buffer, 0, serializedSize);
+                     claimedCRC32 = reader.readLong();
+                 }
+                 catch (EOFException eof)
+                 {
+                     break; // last CL entry didn't get completely written. that's ok.
+                 }
+ 
+                 checksum.update(buffer, 0, serializedSize);
+                 if (claimedCRC32 != checksum.getValue())
+                 {
+                     // this entry must not have been fsynced. probably the rest is bad too,
+                     // but just in case, there is no harm in trying them (since we still read on an entry boundary)
+                     continue;
+                 }
+ 
+                 /* deserialize the commit log entry */
+                 FastByteArrayInputStream bufIn = new FastByteArrayInputStream(buffer, 0, serializedSize);
+                 RowMutation rm;
+                 try
+                 {
+                     // assuming version here. We've gone to lengths to make sure what gets written to the CL is in
+                     // the current version, so do make sure the CL is drained prior to upgrading a node.
 -                    rm = RowMutation.serializer().deserialize(new DataInputStream(bufIn), MessagingService.version_, IColumnSerializer.Flag.LOCAL);
++                    rm = RowMutation.serializer().deserialize(new DataInputStream(bufIn), MessagingService.current_version, IColumnSerializer.Flag.LOCAL);
+                 }
+                 catch (UnknownColumnFamilyException ex)
+                 {
+                     AtomicInteger i = invalidMutations.get(ex.cfId);
+                     if (i == null)
+                     {
+                         i = new AtomicInteger(1);
+                         invalidMutations.put(ex.cfId, i);
+                     }
+                     else
+                         i.incrementAndGet();
+                     continue;
+                 }
+ 
+                 if (logger.isDebugEnabled())
+                     logger.debug(String.format("replaying mutation for %s.%s: %s", rm.getTable(), ByteBufferUtil.bytesToHex(rm.key()), "{" + StringUtils.join(rm.getColumnFamilies().iterator(), ", ") + "}"));
+ 
+                 final long entryLocation = reader.getFilePointer();
+                 final RowMutation frm = rm;
+                 Runnable runnable = new WrappedRunnable()
+                 {
+                     public void runMayThrow() throws IOException
+                     {
+                         if (Schema.instance.getKSMetaData(frm.getTable()) == null)
+                             return;
+                         if (pointInTimeExceeded(frm))
+                             return;
+ 
+                         final Table table = Table.open(frm.getTable());
+                         RowMutation newRm = new RowMutation(frm.getTable(), frm.key());
+ 
+                         // Rebuild the row mutation, omitting column families that
+                         // a) have already been flushed,
+                         // b) are part of a cf that was dropped. Keep in mind that cf.name() is suspect; do everything based on the cfId instead.
+                         for (ColumnFamily columnFamily : frm.getColumnFamilies())
+                         {
+                             if (Schema.instance.getCF(columnFamily.id()) == null)
+                                 // null means the cf has been dropped
+                                 continue;
+ 
+                             ReplayPosition rp = cfPositions.get(columnFamily.id());
+ 
+                             // replay if the current segment is newer than the last flushed one, or if it is
+                             // the last known segment and we are past the replay position
+                             if (segment > rp.segment || (segment == rp.segment && entryLocation > rp.position))
+                             {
+                                 newRm.add(columnFamily);
+                                 replayedCount.incrementAndGet();
+                             }
+                         }
+                         if (!newRm.isEmpty())
+                         {
+                             Table.open(newRm.getTable()).apply(newRm, false);
+                             tablesRecovered.add(table);
+                         }
+                     }
+                 };
+                 futures.add(StageManager.getStage(Stage.MUTATION).submit(runnable));
+                 if (futures.size() > MAX_OUTSTANDING_REPLAY_COUNT)
+                 {
+                     FBUtilities.waitOnFutures(futures);
+                     futures.clear();
+                 }
+             }
+         }
+         finally
+         {
+             FileUtils.closeQuietly(reader);
+             logger.info("Finished reading " + file);
+         }
+     }
+ 
+     protected boolean pointInTimeExceeded(RowMutation frm)
+     {
+         long restoreTarget = CommitLog.instance.archiver.restorePointInTime;
+ 
+         for (ColumnFamily families : frm.getColumnFamilies())
+         {
+             if (families.maxTimestamp() > restoreTarget)
+                 return true;
+         }
+         return false;
+     }
+ }

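Taken together, the reads in recover() pin down the on-disk framing of each entry: a 4-byte size (doubling as the end-of-segment marker), a CRC over that size field, the serialized mutation, then a CRC continued over the payload. A write-side sketch of that framing, for illustration only -- the class and method names are invented, END_OF_SEGMENT_MARKER = 0 is an assumption, and java.util.zip.CRC32 stands in for the PureJavaCrc32 the replayer uses:

    import java.io.DataOutput;
    import java.io.IOException;
    import java.util.zip.CRC32;

    // Illustrative counterpart to CommitLogReplayer.recover(); not the committed writer.
    final class CommitLogEntryFraming
    {
        static final int END_OF_SEGMENT_MARKER = 0; // assumed sentinel value

        static void writeEntry(DataOutput out, byte[] serializedMutation) throws IOException
        {
            CRC32 crc = new CRC32();

            // size, then a checksum over the size field alone; recover()
            // treats a mismatch here as a partially-synced entry and stops
            out.writeInt(serializedMutation.length);
            crc.update(serializedMutation.length); // update(int) consumes one byte, matching the replayer
            out.writeLong(crc.getValue());         // read back as claimedSizeChecksum

            // payload, then the same checksum continued over it -- recover()
            // never resets the Checksum between the two comparisons
            out.write(serializedMutation);
            crc.update(serializedMutation, 0, serializedMutation.length);
            out.writeLong(crc.getValue());         // read back as claimedCRC32
        }

        static void writeEndOfSegment(DataOutput out) throws IOException
        {
            out.writeInt(END_OF_SEGMENT_MARKER); // recover() breaks out of its loop on this size
        }
    }

The 4 + 8 + 8 bytes of framing are presumably what CommitLogSegment.ENTRY_OVERHEAD_SIZE accounts for in the add() hunk earlier in this commit.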
http://git-wip-us.apache.org/repos/asf/cassandra/blob/64305dd9/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/64305dd9/src/java/org/apache/cassandra/utils/CLibrary.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/64305dd9/src/java/org/apache/cassandra/utils/FBUtilities.java
----------------------------------------------------------------------

