cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jmcken...@apache.org
Subject cassandra git commit: Fix CommitLogStressTest
Date Fri, 24 Jun 2016 16:58:45 GMT
Repository: cassandra
Updated Branches:
  refs/heads/trunk 33f2f844b -> a13add64f


Fix CommitLogStressTest

Patch by jmckenzie; reviewed by blambov for 12082


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a13add64
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a13add64
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a13add64

Branch: refs/heads/trunk
Commit: a13add64fe586ba16041db71f0a200a52da924be
Parents: 33f2f84
Author: Josh McKenzie <jmckenzie@apache.org>
Authored: Thu Jun 23 12:33:13 2016 -0400
Committer: Josh McKenzie <jmckenzie@apache.org>
Committed: Fri Jun 24 12:58:08 2016 -0400

----------------------------------------------------------------------
 .../apache/cassandra/config/DatabaseDescriptor.java |  2 +-
 .../commitlog/AbstractCommitLogSegmentManager.java  | 14 ++++++--------
 .../apache/cassandra/db/commitlog/CommitLog.java    |  1 +
 .../cassandra/db/commitlog/CommitLogStressTest.java | 16 ++++++++++------
 4 files changed, 18 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/a13add64/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index e17a2bc..1375a39 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -1417,7 +1417,7 @@ public class DatabaseDescriptor
     * (one segment in compression, one written to, one in reserve); delays in compression
may cause the log to use
     * more, depending on how soon the sync policy stops all writing threads.
     */
-    public static int getCommitLogMaxCompressionBuffersPerPool()
+    public static int getCommitLogMaxCompressionBuffersInPool()
     {
         return conf.commitlog_max_compression_buffers_in_pool;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a13add64/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java
b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java
index b8f0a4e..7ea7439 100644
--- a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java
+++ b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java
@@ -75,15 +75,12 @@ public abstract class AbstractCommitLogSegmentManager
      */
     volatile boolean createReserveSegments = false;
 
-    // Used by tests to determine if segment manager is active or not.
-    volatile boolean processingTask = false;
-
     private Thread managerThread;
     protected volatile boolean run = true;
     protected final CommitLog commitLog;
 
     private static final SimpleCachedBufferPool bufferPool =
-        new SimpleCachedBufferPool(DatabaseDescriptor.getCommitLogMaxCompressionBuffersPerPool(),
DatabaseDescriptor.getCommitLogSegmentSize());
+        new SimpleCachedBufferPool(DatabaseDescriptor.getCommitLogMaxCompressionBuffersInPool(),
DatabaseDescriptor.getCommitLogSegmentSize());
 
     AbstractCommitLogSegmentManager(final CommitLog commitLog, String storageDirectory)
     {
@@ -103,7 +100,6 @@ public abstract class AbstractCommitLogSegmentManager
                     try
                     {
                         Runnable task = segmentManagementTasks.poll();
-                        processingTask = true;
                         if (task == null)
                         {
                             // if we have no more work to do, check if we should create a
new segment
@@ -139,7 +135,6 @@ public abstract class AbstractCommitLogSegmentManager
                             // queue rather than looping, grabbing another null, and repeating
the above work.
                             try
                             {
-                                processingTask = false;
                                 task = segmentManagementTasks.take();
                             }
                             catch (InterruptedException e)
@@ -148,7 +143,6 @@ public abstract class AbstractCommitLogSegmentManager
                             }
                         }
                         task.run();
-                        processingTask = false;
                     }
                     catch (Throwable t)
                     {
@@ -507,8 +501,12 @@ public abstract class AbstractCommitLogSegmentManager
     // Used by tests only.
     void awaitManagementTasksCompletion()
     {
-        while (segmentManagementTasks.size() > 0 || processingTask)
+        while (!segmentManagementTasks.isEmpty())
             Thread.yield();
+        // The last management task is not yet complete. Wait a while for it.
+        Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+        // TODO: If this functionality is required by anything other than tests, signalling
must be used to ensure
+        // waiting completes correctly.
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a13add64/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
index b1f48b2..b66221c 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
@@ -109,6 +109,7 @@ public class CommitLog implements CommitLogMBean
         segmentManager = DatabaseDescriptor.isCDCEnabled()
                          ? new CommitLogSegmentManagerCDC(this, DatabaseDescriptor.getCommitLogLocation())
                          : new CommitLogSegmentManagerStandard(this, DatabaseDescriptor.getCommitLogLocation());
+
         // register metrics
         metrics.attach(executor, segmentManager);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a13add64/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java b/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
index 04682fd..06c252f 100644
--- a/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
+++ b/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
@@ -25,6 +25,7 @@ import java.io.*;
 import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 import com.google.common.util.concurrent.RateLimiter;
@@ -33,7 +34,6 @@ import org.junit.*;
 import org.apache.cassandra.*;
 import org.apache.cassandra.config.Config.CommitLogSync;
 import org.apache.cassandra.config.*;
-import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.db.marshal.UTF8Type;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
@@ -135,6 +135,7 @@ public class CommitLogStressTest
     @Before
     public void cleanDir() throws IOException
     {
+        CommitLog.instance.stopUnsafe(true);
         File dir = new File(location);
         if (dir.isDirectory())
         {
@@ -208,6 +209,8 @@ public class CommitLogStressTest
             {
                 DatabaseDescriptor.setCommitLogSync(sync);
                 CommitLog commitLog = new CommitLog(CommitLogArchiver.disabled()).start();
+                // Need to enable reserve segment creation as close to test start as possible
to minimize race
+                commitLog.segmentManager.enableReserveSegmentCreation();
                 testLog(commitLog);
                 assert !failed;
             }
@@ -226,8 +229,7 @@ public class CommitLogStressTest
                            commitLog.executor.getClass().getSimpleName(),
                            randomSize ? " random size" : "",
                            discardedRun ? " with discarded run" : "");
-        CommitLog.instance.segmentManager.enableReserveSegmentCreation();
-        
+
         final List<CommitlogThread> threads = new ArrayList<>();
         ScheduledExecutorService scheduled = startThreads(commitLog, threads);
 
@@ -310,8 +312,9 @@ public class CommitLogStressTest
         // (which shouldn't write anything) to make sure the first we triggered completes.
         // FIXME: The executor should give us a chance to await completion of the sync we
requested.
         commitLog.executor.requestExtraSync().awaitUninterruptibly();
+
         // Wait for any pending deletes or segment allocations to complete.
-        CommitLog.instance.segmentManager.awaitManagementTasksCompletion();
+        commitLog.segmentManager.awaitManagementTasksCompletion();
 
         long combinedSize = 0;
         for (File f : new File(DatabaseDescriptor.getCommitLogLocation()).listFiles())
@@ -320,7 +323,7 @@ public class CommitLogStressTest
 
         List<String> logFileNames = commitLog.getActiveSegmentNames();
         Map<String, Double> ratios = commitLog.getActiveSegmentCompressionRatios();
-        Collection<CommitLogSegment> segments = CommitLog.instance.segmentManager.getActiveSegments();
+        Collection<CommitLogSegment> segments = commitLog.segmentManager.getActiveSegments();
 
         for (CommitLogSegment segment : segments)
         {
@@ -411,6 +414,7 @@ public class CommitLogStressTest
         int dataSize = 0;
         final CommitLog commitLog;
         final Random random;
+        final AtomicInteger threadID = new AtomicInteger(0);
 
         volatile CommitLogPosition clsp;
 
@@ -422,6 +426,7 @@ public class CommitLogStressTest
 
         public void run()
         {
+            Thread.currentThread().setName("CommitLogThread-" + threadID.getAndIncrement());
             RateLimiter rl = rateLimit != 0 ? RateLimiter.create(rateLimit) : null;
             final Random rand = random != null ? random : ThreadLocalRandom.current();
             while (!stop)
@@ -441,7 +446,6 @@ public class CommitLogStressTest
                     dataSize += sz;
                 }
 
-                Keyspace ks = Keyspace.open("Keyspace1");
                 clsp = commitLog.add(new Mutation(builder.build()));
                 counter.incrementAndGet();
             }


Mime
View raw message