cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From marc...@apache.org
Subject git commit: Add commit_failure_policy.
Date Tue, 11 Feb 2014 12:14:52 GMT
Updated Branches:
  refs/heads/cassandra-2.0 55b5605b7 -> 9381b8d56


Add commit_failure_policy.

Patch by belliottsmith, reviewed by marcuse for CASSANDRA-6364

CASSANDRA-6364


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9381b8d5
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9381b8d5
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9381b8d5

Branch: refs/heads/cassandra-2.0
Commit: 9381b8d569ae17cf2760bca266b5253e4bcd6ac2
Parents: 55b5605
Author: Marcus Eriksson <marcuse@apache.org>
Authored: Tue Feb 11 13:13:37 2014 +0100
Committer: Marcus Eriksson <marcuse@apache.org>
Committed: Tue Feb 11 13:13:37 2014 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +-
 conf/cassandra.yaml                             |  8 +++++
 .../org/apache/cassandra/config/Config.java     |  8 +++++
 .../cassandra/config/DatabaseDescriptor.java    | 11 ++++++
 .../BatchCommitLogExecutorService.java          | 17 +++++++--
 .../cassandra/db/commitlog/CommitLog.java       | 24 +++++++++++++
 .../db/commitlog/CommitLogAllocator.java        | 37 +++++++++++++-------
 .../PeriodicCommitLogExecutorService.java       | 26 ++++++++++++--
 .../org/apache/cassandra/io/util/FileUtils.java | 20 ++---------
 .../cassandra/service/StorageService.java       | 19 ++++++++++
 .../org/apache/cassandra/db/CommitLogTest.java  | 32 +++++++++++++++++
 11 files changed, 169 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/9381b8d5/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 93552ef..a8114a3 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,13 +9,13 @@
  * Account for range/row tombstones in tombstone drop
    time histogram (CASSANDRA-6522)
  * Stop CommitLogSegment.close() from calling sync() (CASSANDRA-6652)
+ * Make commitlog failure handling configurable (CASSANDRA-6364)
 Merged from 1.2:
  * Fix upgradesstables NPE for non-CF-based indexes (CASSANDRA-6645)
  * Fix partition and range deletes not triggering flush (CASSANDRA-6655)
  * Fix mean cells and mean row size per sstable calculations (CASSANDRA-6667)
  * Compact hints after partial replay to clean out tombstones (CASSANDRA-6666)
 
-
 2.0.5
  * Reduce garbage generated by bloom filter lookups (CASSANDRA-6609)
  * Add ks.cf names to tombstone logging (CASSANDRA-6597)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9381b8d5/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index db924bb..bfe60c4 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -121,6 +121,14 @@ commitlog_directory: /var/lib/cassandra/commitlog
 # ignore: ignore fatal errors and let requests fail, as in pre-1.2 Cassandra
 disk_failure_policy: stop
 
+# policy for commit disk failures:
+# stop: shut down gossip and Thrift, leaving the node effectively dead, but
+#       can still be inspected via JMX.
+# stop_commit: shutdown the commit log, letting writes collect but 
+#              continuing to service reads, as in pre-2.0.5 Cassandra
+# ignore: ignore fatal errors and let the batches fail
+commit_failure_policy: stop
+
 # Maximum size of the key cache in memory.
 #
 # Each key cache hit saves 1 seek and each row cache hit saves 2 seeks at the

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9381b8d5/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index a4e4e92..2fa49f3 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -45,6 +45,7 @@ public class Config
     public DiskAccessMode disk_access_mode = DiskAccessMode.auto;
 
     public DiskFailurePolicy disk_failure_policy = DiskFailurePolicy.ignore;
+    public CommitFailurePolicy commit_failure_policy = CommitFailurePolicy.stop;
 
     /* initial token in the ring */
     public String initial_token;
@@ -230,6 +231,13 @@ public class Config
         ignore,
     }
 
+    public static enum CommitFailurePolicy
+    {
+        stop,
+        stop_commit,
+        ignore,
+    }
+
     public static enum RequestSchedulerId
     {
         keyspace

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9381b8d5/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index bd5db69..e1a95ab 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -194,6 +194,7 @@ public class DatabaseDescriptor
         }
 
         logger.info("disk_failure_policy is " + conf.disk_failure_policy);
+        logger.info("commit_failure_policy is " + conf.commit_failure_policy);
 
         /* Authentication and authorization backend, implementing IAuthenticator and IAuthorizer */
         if (conf.authenticator != null)
@@ -1082,6 +1083,16 @@ public class DatabaseDescriptor
         return conf.disk_failure_policy;
     }
 
+    public static void setCommitFailurePolicy(Config.CommitFailurePolicy policy)
+    {
+        conf.commit_failure_policy = policy;
+    }
+
+    public static Config.CommitFailurePolicy getCommitFailurePolicy()
+    {
+        return conf.commit_failure_policy;
+    }
+
     public static boolean isSnapshotBeforeCompaction()
     {
         return conf.snapshot_before_compaction;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9381b8d5/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogExecutorService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogExecutorService.java b/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogExecutorService.java
index d985f1f..9c2e2ac 100644
--- a/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogExecutorService.java
+++ b/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogExecutorService.java
@@ -20,12 +20,17 @@ package org.apache.cassandra.db.commitlog;
 import java.util.ArrayList;
 import java.util.concurrent.*;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.io.FSError;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.WrappedRunnable;
 
 class BatchCommitLogExecutorService extends AbstractCommitLogExecutorService
 {
+
     private final BlockingQueue<CheaterFutureTask> queue;
     private final Thread appendingThread;
     private volatile boolean run = true;
@@ -44,8 +49,16 @@ class BatchCommitLogExecutorService extends AbstractCommitLogExecutorService
             {
                 while (run)
                 {
-                    if (processWithSyncBatch())
-                        completedTaskCount++;
+                    try
+                    {
+                        if (processWithSyncBatch())
+                            completedTaskCount++;
+                    }
+                    catch (Throwable t)
+                    {
+                        if (!CommitLog.handleCommitError("Failed to persist commits to disk", t))
+                            return;
+                    }
                 }
             }
         };

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9381b8d5/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
index e9507da..4bab83f 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
@@ -21,10 +21,13 @@ import java.io.*;
 import java.lang.management.ManagementFactory;
 import java.util.*;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
+import com.google.common.util.concurrent.Uninterruptibles;
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -32,9 +35,11 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.config.Config;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.*;
+import org.apache.cassandra.io.FSError;
 import org.apache.cassandra.io.FSWriteError;
 import org.apache.cassandra.metrics.CommitLogMetrics;
 import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
 
 /*
@@ -363,4 +368,23 @@ public class CommitLog implements CommitLogMBean
             return null;
         }
     }
+
+    static boolean handleCommitError(String message, Throwable t)
+    {
+        switch (DatabaseDescriptor.getCommitFailurePolicy())
+        {
+            case stop:
+                StorageService.instance.stopTransports();
+            case stop_commit:
+                logger.error(String.format("%s. Commit disk failure policy is %s; terminating thread", message, DatabaseDescriptor.getCommitFailurePolicy()), t);
+                return false;
+            case ignore:
+                logger.error(message, t);
+                Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
+                return true;
+            default:
+                throw new AssertionError(DatabaseDescriptor.getCommitFailurePolicy());
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9381b8d5/src/java/org/apache/cassandra/db/commitlog/CommitLogAllocator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogAllocator.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogAllocator.java
index 706cf9e..575e3c3 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogAllocator.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogAllocator.java
@@ -36,6 +36,7 @@ import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.io.FSError;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.StorageService;
@@ -88,22 +89,32 @@ public class CommitLogAllocator
             {
                 while (run)
                 {
-                    Runnable r = queue.poll(TICK_CYCLE_TIME, TimeUnit.MILLISECONDS);
-
-                    if (r != null)
-                    {
-                        r.run();
-                    }
-                    else
+                    try
                     {
-                        // no job, so we're clear to check to see if we're out of segments
-                        // and ready a new one if needed. has the effect of ensuring there's
-                        // almost always a segment available when it's needed.
-                        if (availableSegments.isEmpty() && (activeSegments.isEmpty() || createReserveSegments))
+
+                        Runnable r = queue.poll(TICK_CYCLE_TIME, TimeUnit.MILLISECONDS);
+
+                        if (r != null)
                         {
-                            logger.debug("No segments in reserve; creating a fresh one");
-                            createFreshSegment();
+                            r.run();
                         }
+                        else
+                        {
+                            // no job, so we're clear to check to see if we're out of segments
+                            // and ready a new one if needed. has the effect of ensuring there's
+                            // almost always a segment available when it's needed.
+                            if (availableSegments.isEmpty() && (activeSegments.isEmpty() || createReserveSegments))
+                            {
+                                logger.debug("No segments in reserve; creating a fresh one");
+                                createFreshSegment();
+                            }
+                        }
+
+                    }
+                    catch (Throwable t)
+                    {
+                        if (!CommitLog.handleCommitError("Failed to allocate new commit log segments", t))
+                            return;
                     }
                 }
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9381b8d5/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java b/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java
index 30f33b6..00507c2 100644
--- a/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java
+++ b/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java
@@ -25,9 +25,12 @@ import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.WrappedRunnable;
 
 import com.google.common.util.concurrent.Uninterruptibles;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 class PeriodicCommitLogExecutorService implements ICommitLogExecutorService
 {
+
     private final BlockingQueue<Runnable> queue;
     protected volatile long completedTaskCount = 0;
     private final Thread appendingThread;
@@ -69,8 +72,27 @@ class PeriodicCommitLogExecutorService implements ICommitLogExecutorService
             {
                 while (run)
                 {
-                    FBUtilities.waitOnFuture(submit(syncer));
-                    Uninterruptibles.sleepUninterruptibly(DatabaseDescriptor.getCommitLogSyncPeriod(), TimeUnit.MILLISECONDS);
+                    try
+                    {
+                        FBUtilities.waitOnFuture(submit(syncer));
+                        Uninterruptibles.sleepUninterruptibly(DatabaseDescriptor.getCommitLogSyncPeriod(), TimeUnit.MILLISECONDS);
+                    }
+                    catch (Throwable t)
+                    {
+                        if (!CommitLog.handleCommitError("Failed to persist commits to disk", t))
+                        {
+                            PeriodicCommitLogExecutorService.this.run = false;
+                            try
+                            {
+                                appendingThread.join();
+                            }
+                            catch (InterruptedException e)
+                            {
+                                throw new IllegalStateException();
+                            }
+                            return;
+                        }
+                    }
                 }
             }
         }, "PERIODIC-COMMIT-LOG-SYNCER").start();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9381b8d5/src/java/org/apache/cassandra/io/util/FileUtils.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/FileUtils.java b/src/java/org/apache/cassandra/io/util/FileUtils.java
index 0d8538e..e091465 100644
--- a/src/java/org/apache/cassandra/io/util/FileUtils.java
+++ b/src/java/org/apache/cassandra/io/util/FileUtils.java
@@ -31,6 +31,7 @@ import java.util.Arrays;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.config.Config;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.BlacklistedDirectories;
 import org.apache.cassandra.db.Keyspace;
@@ -412,23 +413,7 @@ public class FileUtils
         switch (DatabaseDescriptor.getDiskFailurePolicy())
         {
             case stop:
-                if (StorageService.instance.isInitialized())
-                {
-                    logger.error("Stopping gossiper");
-                    StorageService.instance.stopGossiping();
-                }
-
-                if (StorageService.instance.isRPCServerRunning())
-                {
-                    logger.error("Stopping RPC server");
-                    StorageService.instance.stopRPCServer();
-                }
-
-                if (StorageService.instance.isNativeTransportRunning())
-                {
-                    logger.error("Stopping native transport");
-                    StorageService.instance.stopNativeTransport();
-                }
+                StorageService.instance.stopTransports();
                 break;
             case best_effort:
                 // for both read and write errors mark the path as unwritable.
@@ -447,4 +432,5 @@ public class FileUtils
                 throw new IllegalStateException();
         }
     }
+
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9381b8d5/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index e181c44..09b93d7 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -357,6 +357,25 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         return daemon.nativeServer.isRunning();
     }
 
+    public void stopTransports()
+    {
+        if (isInitialized())
+        {
+            logger.error("Stopping gossiper");
+            stopGossiping();
+        }
+        if (isRPCServerRunning())
+        {
+            logger.error("Stopping RPC server");
+            stopRPCServer();
+        }
+        if (isNativeTransportRunning())
+        {
+            logger.error("Stopping native transport");
+            stopNativeTransport();
+        }
+    }
+
     private void shutdownClientServers()
     {
         stopRPCServer();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9381b8d5/test/unit/org/apache/cassandra/db/CommitLogTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/CommitLogTest.java b/test/unit/org/apache/cassandra/db/CommitLogTest.java
index 8e5f418..036ce15 100644
--- a/test/unit/org/apache/cassandra/db/CommitLogTest.java
+++ b/test/unit/org/apache/cassandra/db/CommitLogTest.java
@@ -22,17 +22,21 @@ package org.apache.cassandra.db;
 import java.io.*;
 import java.nio.ByteBuffer;
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 import java.util.zip.CRC32;
 import java.util.zip.Checksum;
 
+import com.google.common.util.concurrent.Uninterruptibles;
 import org.junit.Assert;
 import org.junit.Test;
 
 import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.Config;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.cassandra.db.commitlog.CommitLogDescriptor;
 import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.StorageService;
 
 import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
 
@@ -225,4 +229,32 @@ public class CommitLogTest extends SchemaLoader
         String newCLName = "CommitLog-" + CommitLogDescriptor.current_version + "-1340512736956320000.log";
         Assert.assertEquals(MessagingService.current_version, CommitLogDescriptor.fromFileName(newCLName).getMessagingVersion());
     }
+
+    @Test
+    public void testCommitFailurePolicy_stop()
+    {
+        File commitDir = new File(DatabaseDescriptor.getCommitLogLocation());
+
+        try
+        {
+
+            DatabaseDescriptor.setCommitFailurePolicy(Config.CommitFailurePolicy.stop);
+            commitDir.setWritable(false);
+            RowMutation rm = new RowMutation("Keyspace1", bytes("k"));
+            rm.add("Standard1", bytes("c1"), ByteBuffer.allocate(100), 0);
+
+            // Adding it twice (won't change segment)
+            CommitLog.instance.add(rm);
+            Uninterruptibles.sleepUninterruptibly((int) DatabaseDescriptor.getCommitLogSyncBatchWindow(), TimeUnit.MILLISECONDS);
+            Assert.assertFalse(StorageService.instance.isRPCServerRunning());
+            Assert.assertFalse(StorageService.instance.isNativeTransportRunning());
+            Assert.assertFalse(StorageService.instance.isInitialized());
+
+        }
+        finally
+        {
+            commitDir.setWritable(true);
+        }
+    }
+
 }


Mime
View raw message