Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 1B42B200AE4 for ; Fri, 24 Jun 2016 18:58:47 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 19D34160A58; Fri, 24 Jun 2016 16:58:47 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 3D11B160A2E for ; Fri, 24 Jun 2016 18:58:46 +0200 (CEST) Received: (qmail 83275 invoked by uid 500); 24 Jun 2016 16:58:45 -0000 Mailing-List: contact commits-help@cassandra.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cassandra.apache.org Delivered-To: mailing list commits@cassandra.apache.org Received: (qmail 83264 invoked by uid 99); 24 Jun 2016 16:58:45 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 24 Jun 2016 16:58:45 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 13712E049D; Fri, 24 Jun 2016 16:58:45 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jmckenzie@apache.org To: commits@cassandra.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: cassandra git commit: Fix CommitLogStressTest Date: Fri, 24 Jun 2016 16:58:45 +0000 (UTC) archived-at: Fri, 24 Jun 2016 16:58:47 -0000 Repository: cassandra Updated Branches: refs/heads/trunk 33f2f844b -> a13add64f Fix CommitLogStressTest Patch by jmckenzie; reviewed by blambov for 12082 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a13add64 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a13add64 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a13add64 Branch: refs/heads/trunk Commit: a13add64fe586ba16041db71f0a200a52da924be Parents: 33f2f84 Author: Josh McKenzie Authored: Thu Jun 23 12:33:13 2016 -0400 Committer: Josh McKenzie Committed: Fri Jun 24 12:58:08 2016 -0400 ---------------------------------------------------------------------- .../apache/cassandra/config/DatabaseDescriptor.java | 2 +- .../commitlog/AbstractCommitLogSegmentManager.java | 14 ++++++-------- .../apache/cassandra/db/commitlog/CommitLog.java | 1 + .../cassandra/db/commitlog/CommitLogStressTest.java | 16 ++++++++++------ 4 files changed, 18 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/a13add64/src/java/org/apache/cassandra/config/DatabaseDescriptor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index e17a2bc..1375a39 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -1417,7 +1417,7 @@ public class DatabaseDescriptor * (one segment in compression, one written to, one in reserve); delays in compression may cause the log to use * more, depending on how soon the sync policy stops all writing threads. */ - public static int getCommitLogMaxCompressionBuffersPerPool() + public static int getCommitLogMaxCompressionBuffersInPool() { return conf.commitlog_max_compression_buffers_in_pool; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a13add64/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java index b8f0a4e..7ea7439 100644 --- a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java +++ b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java @@ -75,15 +75,12 @@ public abstract class AbstractCommitLogSegmentManager */ volatile boolean createReserveSegments = false; - // Used by tests to determine if segment manager is active or not. - volatile boolean processingTask = false; - private Thread managerThread; protected volatile boolean run = true; protected final CommitLog commitLog; private static final SimpleCachedBufferPool bufferPool = - new SimpleCachedBufferPool(DatabaseDescriptor.getCommitLogMaxCompressionBuffersPerPool(), DatabaseDescriptor.getCommitLogSegmentSize()); + new SimpleCachedBufferPool(DatabaseDescriptor.getCommitLogMaxCompressionBuffersInPool(), DatabaseDescriptor.getCommitLogSegmentSize()); AbstractCommitLogSegmentManager(final CommitLog commitLog, String storageDirectory) { @@ -103,7 +100,6 @@ public abstract class AbstractCommitLogSegmentManager try { Runnable task = segmentManagementTasks.poll(); - processingTask = true; if (task == null) { // if we have no more work to do, check if we should create a new segment @@ -139,7 +135,6 @@ public abstract class AbstractCommitLogSegmentManager // queue rather than looping, grabbing another null, and repeating the above work. try { - processingTask = false; task = segmentManagementTasks.take(); } catch (InterruptedException e) @@ -148,7 +143,6 @@ public abstract class AbstractCommitLogSegmentManager } } task.run(); - processingTask = false; } catch (Throwable t) { @@ -507,8 +501,12 @@ public abstract class AbstractCommitLogSegmentManager // Used by tests only. void awaitManagementTasksCompletion() { - while (segmentManagementTasks.size() > 0 || processingTask) + while (!segmentManagementTasks.isEmpty()) Thread.yield(); + // The last management task is not yet complete. Wait a while for it. + Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); + // TODO: If this functionality is required by anything other than tests, signalling must be used to ensure + // waiting completes correctly. } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/a13add64/src/java/org/apache/cassandra/db/commitlog/CommitLog.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java index b1f48b2..b66221c 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java @@ -109,6 +109,7 @@ public class CommitLog implements CommitLogMBean segmentManager = DatabaseDescriptor.isCDCEnabled() ? new CommitLogSegmentManagerCDC(this, DatabaseDescriptor.getCommitLogLocation()) : new CommitLogSegmentManagerStandard(this, DatabaseDescriptor.getCommitLogLocation()); + // register metrics metrics.attach(executor, segmentManager); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a13add64/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java ---------------------------------------------------------------------- diff --git a/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java b/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java index 04682fd..06c252f 100644 --- a/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java +++ b/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java @@ -25,6 +25,7 @@ import java.io.*; import java.nio.ByteBuffer; import java.util.*; import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import com.google.common.util.concurrent.RateLimiter; @@ -33,7 +34,6 @@ import org.junit.*; import org.apache.cassandra.*; import org.apache.cassandra.config.Config.CommitLogSync; import org.apache.cassandra.config.*; -import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.db.Mutation; import org.apache.cassandra.db.marshal.UTF8Type; import org.apache.cassandra.db.partitions.PartitionUpdate; @@ -135,6 +135,7 @@ public class CommitLogStressTest @Before public void cleanDir() throws IOException { + CommitLog.instance.stopUnsafe(true); File dir = new File(location); if (dir.isDirectory()) { @@ -208,6 +209,8 @@ public class CommitLogStressTest { DatabaseDescriptor.setCommitLogSync(sync); CommitLog commitLog = new CommitLog(CommitLogArchiver.disabled()).start(); + // Need to enable reserve segment creation as close to test start as possible to minimize race + commitLog.segmentManager.enableReserveSegmentCreation(); testLog(commitLog); assert !failed; } @@ -226,8 +229,7 @@ public class CommitLogStressTest commitLog.executor.getClass().getSimpleName(), randomSize ? " random size" : "", discardedRun ? " with discarded run" : ""); - CommitLog.instance.segmentManager.enableReserveSegmentCreation(); - + final List threads = new ArrayList<>(); ScheduledExecutorService scheduled = startThreads(commitLog, threads); @@ -310,8 +312,9 @@ public class CommitLogStressTest // (which shouldn't write anything) to make sure the first we triggered completes. // FIXME: The executor should give us a chance to await completion of the sync we requested. commitLog.executor.requestExtraSync().awaitUninterruptibly(); + // Wait for any pending deletes or segment allocations to complete. - CommitLog.instance.segmentManager.awaitManagementTasksCompletion(); + commitLog.segmentManager.awaitManagementTasksCompletion(); long combinedSize = 0; for (File f : new File(DatabaseDescriptor.getCommitLogLocation()).listFiles()) @@ -320,7 +323,7 @@ public class CommitLogStressTest List logFileNames = commitLog.getActiveSegmentNames(); Map ratios = commitLog.getActiveSegmentCompressionRatios(); - Collection segments = CommitLog.instance.segmentManager.getActiveSegments(); + Collection segments = commitLog.segmentManager.getActiveSegments(); for (CommitLogSegment segment : segments) { @@ -411,6 +414,7 @@ public class CommitLogStressTest int dataSize = 0; final CommitLog commitLog; final Random random; + final AtomicInteger threadID = new AtomicInteger(0); volatile CommitLogPosition clsp; @@ -422,6 +426,7 @@ public class CommitLogStressTest public void run() { + Thread.currentThread().setName("CommitLogThread-" + threadID.getAndIncrement()); RateLimiter rl = rateLimit != 0 ? RateLimiter.create(rateLimit) : null; final Random rand = random != null ? random : ThreadLocalRandom.current(); while (!stop) @@ -441,7 +446,6 @@ public class CommitLogStressTest dataSize += sz; } - Keyspace ks = Keyspace.open("Keyspace1"); clsp = commitLog.add(new Mutation(builder.build())); counter.incrementAndGet(); }