cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jasobr...@apache.org
Subject [4/6] cassandra git commit: Merge branch 'cassandra-3.0' into cassandra-3.11
Date Thu, 14 Dec 2017 03:54:32 GMT
Merge branch 'cassandra-3.0' into cassandra-3.11


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c000827a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c000827a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c000827a

Branch: refs/heads/trunk
Commit: c000827afd48b2dc9901c530d5b4118107114c8d
Parents: e646e50 db788fe
Author: Jason Brown <jasedbrown@gmail.com>
Authored: Wed Dec 13 19:51:53 2017 -0800
Committer: Jason Brown <jasedbrown@gmail.com>
Committed: Wed Dec 13 19:52:31 2017 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 conf/cassandra.yaml                             |   8 -
 .../org/apache/cassandra/config/Config.java     |   1 -
 .../cassandra/config/DatabaseDescriptor.java    |  10 -
 .../db/commitlog/AbstractCommitLogService.java  | 215 +++++++++++--------
 .../db/commitlog/PeriodicCommitLogService.java  |   3 +-
 .../commitlog/AbstractCommitLogServiceTest.java | 166 ++++++++++++++
 .../commitlog/CommitLogChainedMarkersTest.java  |   1 -
 .../CommitLogSegmentBackpressureTest.java       |   8 +-
 9 files changed, 295 insertions(+), 118 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/c000827a/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 60794f0,ee90a67..aa71620
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,15 -1,5 +1,16 @@@
 -3.0.16
 +3.11.2
 + * Prevent continuous schema exchange between 3.0 and 3.11 nodes (CASSANDRA-14109)
 + * Fix imbalanced disks when replacing node with same address with JBOD (CASSANDRA-14084)
 + * Reload compaction strategies when disk boundaries are invalidated (CASSANDRA-13948)
 + * Remove OpenJDK log warning (CASSANDRA-13916)
 + * Prevent compaction strategies from looping indefinitely (CASSANDRA-14079)
 + * Cache disk boundaries (CASSANDRA-13215)
 + * Add asm jar to build.xml for maven builds (CASSANDRA-11193)
 + * Round buffer size to powers of 2 for the chunk cache (CASSANDRA-13897)
 + * Update jackson JSON jars (CASSANDRA-13949)
 + * Avoid locks when checking LCS fanout and if we should defrag (CASSANDRA-13930)
 +Merged from 3.0:
+  * Improve commit log chain marker updating (CASSANDRA-14108)
   * Extra range tombstone bound creates double rows (CASSANDRA-14008)
   * Fix SStable ordering by max timestamp in SinglePartitionReadCommand (CASSANDRA-14010)
   * Accept role names containing forward-slash (CASSANDRA-14088)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c000827a/conf/cassandra.yaml
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c000827a/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/config/Config.java
index 5fe752e,64d41bb..a01203c
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@@ -198,9 -190,8 +198,8 @@@ public class Confi
      public String commitlog_directory;
      public Integer commitlog_total_space_in_mb;
      public CommitLogSync commitlog_sync;
 -    public Double commitlog_sync_batch_window_in_ms;
 -    public Integer commitlog_sync_period_in_ms;
 +    public double commitlog_sync_batch_window_in_ms = Double.NaN;
 +    public int commitlog_sync_period_in_ms;
-     public int commitlog_marker_period_in_ms = 0;
      public int commitlog_segment_size_in_mb = 32;
      public ParameterizedClass commitlog_compression;
      public int commitlog_max_compression_buffers_in_pool = 3;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c000827a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c000827a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
index 0410650,829530d..4571b54
--- a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
+++ b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
@@@ -17,21 -17,17 +17,21 @@@
   */
  package org.apache.cassandra.db.commitlog;
  
 +import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.atomic.AtomicLong;
 +import java.util.concurrent.locks.LockSupport;
 +
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import com.codahale.metrics.Timer.Context;
 +
  import org.apache.cassandra.concurrent.NamedThreadFactory;
  import org.apache.cassandra.config.Config;
- import org.apache.cassandra.config.DatabaseDescriptor;
  import org.apache.cassandra.db.commitlog.CommitLogSegment.Allocation;
+ import org.apache.cassandra.utils.Clock;
  import org.apache.cassandra.utils.NoSpamLogger;
  import org.apache.cassandra.utils.concurrent.WaitQueue;
 -import org.slf4j.*;
 -
 -import java.util.concurrent.Semaphore;
 -import java.util.concurrent.TimeUnit;
 -import java.util.concurrent.atomic.AtomicLong;
  
  public abstract class AbstractCommitLogService
  {
@@@ -54,13 -57,13 +60,13 @@@
      /**
       * The duration between syncs to disk.
       */
-     private final long syncIntervalNanos;
 -    final long syncIntervalMillis;
++    final long syncIntervalNanos;
  
      /**
      * The duration between updating the chained markers in the commit log file. This
value should be
 -     * 0 < {@link #markerIntervalMillis} <= {@link #syncIntervalMillis}.
 +     * 0 < {@link #markerIntervalNanos} <= {@link #syncIntervalNanos}.
       */
-     private final long markerIntervalNanos;
 -    final long markerIntervalMillis;
++    final long markerIntervalNanos;
  
      /**
       * A flag that callers outside of the sync thread can use to signal they want the commitlog
segments
@@@ -92,121 -97,156 +100,142 @@@
      {
          this.commitLog = commitLog;
          this.name = name;
-         this.syncIntervalNanos = TimeUnit.NANOSECONDS.convert(syncIntervalMillis, TimeUnit.MILLISECONDS);
  
-         // if we are not using periodic mode, or we using compression/encryption, we shouldn't
update the chained markers
-         // faster than the sync interval
-         if (DatabaseDescriptor.getCommitLogSync() != Config.CommitLogSync.periodic || commitLog.configuration.useCompression()
|| commitLog.configuration.useEncryption())
-             markerIntervalMillis = syncIntervalMillis;
++        final long markerIntervalMillis;
+         if (markHeadersFaster && syncIntervalMillis > DEFAULT_MARKER_INTERVAL_MILLIS)
+         {
+             markerIntervalMillis = DEFAULT_MARKER_INTERVAL_MILLIS;
+             long modulo = syncIntervalMillis % markerIntervalMillis;
+             if (modulo != 0)
+             {
+                 // quantize syncIntervalMillis to a multiple of markerIntervalMillis
+                 syncIntervalMillis -= modulo;
  
-         // apply basic bounds checking on the marker interval
-         if (markerIntervalMillis <= 0 || markerIntervalMillis > syncIntervalMillis)
+                 if (modulo >= markerIntervalMillis / 2)
+                     syncIntervalMillis += markerIntervalMillis;
+             }
+             logger.debug("Will update the commitlog markers every {}ms and flush every {}ms",
markerIntervalMillis, syncIntervalMillis);
+         }
+         else
          {
-             logger.debug("commit log marker interval {} is less than zero or above the sync
interval {}; setting value to sync interval",
-                         markerIntervalMillis, syncIntervalMillis);
              markerIntervalMillis = syncIntervalMillis;
          }
--
+         assert syncIntervalMillis % markerIntervalMillis == 0;
 -        this.syncIntervalMillis = syncIntervalMillis;
 +        this.markerIntervalNanos = TimeUnit.NANOSECONDS.convert(markerIntervalMillis, TimeUnit.MILLISECONDS);
++        this.syncIntervalNanos = TimeUnit.NANOSECONDS.convert(syncIntervalMillis, TimeUnit.MILLISECONDS);
      }
  
      // Separated into individual method to ensure relevant objects are constructed before
this is started.
      void start()
      {
 -        if (syncIntervalMillis < 1)
 -            throw new IllegalArgumentException(String.format("Commit log flush interval
must be positive: %dms",
 -                                                             syncIntervalMillis));
 +        if (syncIntervalNanos < 1)
 +            throw new IllegalArgumentException(String.format("Commit log flush interval
must be positive: %fms",
 +                                                             syncIntervalNanos * 1e-6));
+         shutdown = false;
+         Runnable runnable = new SyncRunnable(new Clock());
 -        thread = new Thread(NamedThreadFactory.threadLocalDeallocator(runnable), name);
++        thread = NamedThreadFactory.createThread(runnable, name);
+         thread.start();
+     }
  
-         Runnable runnable = new Runnable()
+     class SyncRunnable implements Runnable
+     {
 -        final Clock clock;
 -        long firstLagAt = 0;
 -        long totalSyncDuration = 0; // total time spent syncing since firstLagAt
 -        long syncExceededIntervalBy = 0; // time that syncs exceeded pollInterval since
firstLagAt
 -        int lagCount = 0;
 -        int syncCount = 0;
++        private final Clock clock;
++        private long firstLagAt = 0;
++        private long totalSyncDuration = 0; // total time spent syncing since firstLagAt
++        private long syncExceededIntervalBy = 0; // time that syncs exceeded pollInterval
since firstLagAt
++        private int lagCount = 0;
++        private int syncCount = 0;
+ 
+         SyncRunnable(Clock clock)
          {
-             public void run()
+             this.clock = clock;
+         }
+ 
+         public void run()
+         {
+             while (true)
              {
-                 long firstLagAt = 0;
-                 long totalSyncDuration = 0; // total time spent syncing since firstLagAt
-                 long syncExceededIntervalBy = 0; // time that syncs exceeded pollInterval
since firstLagAt
-                 int lagCount = 0;
-                 int syncCount = 0;
+                 if (!sync())
+                     break;
+             }
+         }
+ 
+         boolean sync()
+         {
++            // always run once after shutdown signalled
++            boolean shutdownRequested = shutdown;
 +
-                 while (true)
+             try
+             {
 -                // always run once after shutdown signalled
 -                boolean run = !shutdown;
 -
+                 // sync and signal
 -                long pollStarted = clock.currentTimeMillis();
 -                if (lastSyncedAt + syncIntervalMillis <= pollStarted || shutdown || syncRequested)
++                long pollStarted = clock.nanoTime();
++                if (lastSyncedAt + syncIntervalNanos <= pollStarted || shutdownRequested
|| syncRequested)
+                 {
+                     // in this branch, we want to flush the commit log to disk
 -                    commitLog.sync(shutdown, true);
++                    commitLog.sync(true);
+                     syncRequested = false;
+                     lastSyncedAt = pollStarted;
+                     syncComplete.signalAll();
++                    syncCount++;
+                 }
+                 else
                  {
-                     // always run once after shutdown signalled
-                     boolean shutdownRequested = shutdown;
+                     // in this branch, just update the commit log sync headers
 -                    commitLog.sync(false, false);
++                    commitLog.sync(false);
+                 }
  
-                     try
+                 // sleep any time we have left before the next one is due
 -                long now = clock.currentTimeMillis();
 -                long sleep = pollStarted + markerIntervalMillis - now;
 -                if (sleep < 0)
++                long now = clock.nanoTime();
++                long wakeUpAt = pollStarted + markerIntervalNanos;
++                if (wakeUpAt < now)
+                 {
+                     // if we have lagged noticeably, update our lag counter
+                     if (firstLagAt == 0)
                      {
-                         // sync and signal
-                         long pollStarted = System.nanoTime();
-                         if (lastSyncedAt + syncIntervalNanos <= pollStarted || shutdownRequested
|| syncRequested)
-                         {
-                             // in this branch, we want to flush the commit log to disk
-                             commitLog.sync(true);
-                             syncRequested = false;
-                             lastSyncedAt = pollStarted;
-                             syncComplete.signalAll();
-                         }
-                         else
-                         {
-                             // in this branch, just update the commit log sync headers
-                             commitLog.sync(false);
-                         }
- 
-                         // sleep any time we have left before the next one is due
-                         long now = System.nanoTime();
-                         long wakeUpAt = pollStarted + markerIntervalNanos;
-                         if (wakeUpAt < now)
-                         {
-                             // if we have lagged noticeably, update our lag counter
-                             if (firstLagAt == 0)
-                             {
-                                 firstLagAt = now;
-                                 totalSyncDuration = syncExceededIntervalBy = syncCount =
lagCount = 0;
-                             }
-                             syncExceededIntervalBy += now - wakeUpAt;
-                             lagCount++;
-                         }
-                         syncCount++;
-                         totalSyncDuration += now - pollStarted;
- 
-                         if (firstLagAt > 0)
-                         {
-                             //Only reset the lag tracking if it actually logged this time
-                             boolean logged = NoSpamLogger.log(logger,
-                                                               NoSpamLogger.Level.WARN,
-                                                               5,
-                                                               TimeUnit.MINUTES,
-                                                               "Out of {} commit log syncs
over the past {}s with average duration of {}ms, {} have exceeded the configured commit interval
by an average of {}ms",
-                                                               syncCount,
-                                                               String.format("%.2f", (now
- firstLagAt) * 1e-9d),
-                                                               String.format("%.2f", totalSyncDuration
* 1e-6d / syncCount),
-                                                               lagCount,
-                                                               String.format("%.2f", syncExceededIntervalBy
* 1e-6d / lagCount));
-                            if (logged)
-                                firstLagAt = 0;
-                         }
- 
-                         if (shutdownRequested)
-                             return;
- 
-                         if (wakeUpAt > now)
-                             LockSupport.parkNanos(wakeUpAt - now);
+                         firstLagAt = now;
+                         totalSyncDuration = syncExceededIntervalBy = syncCount = lagCount
= 0;
                      }
-                     catch (Throwable t)
-                     {
-                         if (!CommitLog.handleCommitError("Failed to persist commits to disk",
t))
-                             break;
 -                    syncExceededIntervalBy -= sleep;
++                    syncExceededIntervalBy += now - wakeUpAt;
+                     lagCount++;
+                 }
 -                syncCount++;
+                 totalSyncDuration += now - pollStarted;
  
-                         // sleep for full poll-interval after an error, so we don't spam
the log file
-                         LockSupport.parkNanos(markerIntervalNanos);
-                     }
+                 if (firstLagAt > 0)
+                 {
+                     //Only reset the lag tracking if it actually logged this time
 -                    boolean logged = NoSpamLogger.log(
 -                    logger,
 -                    NoSpamLogger.Level.WARN,
 -                    5,
 -                    TimeUnit.MINUTES,
 -                    "Out of {} commit log syncs over the past {}s with average duration
of {}ms, {} have exceeded the configured commit interval by an average of {}ms",
 -                    syncCount, (now - firstLagAt) / 1000, String.format("%.2f", (double)
totalSyncDuration / syncCount), lagCount, String.format("%.2f", (double) syncExceededIntervalBy
/ lagCount));
++                    boolean logged = NoSpamLogger.log(logger,
++                                                      NoSpamLogger.Level.WARN,
++                                                      5,
++                                                      TimeUnit.MINUTES,
++                                                      "Out of {} commit log syncs over the
past {}s with average duration of {}ms, {} have exceeded the configured commit interval by
an average of {}ms",
++                                                      syncCount,
++                                                      String.format("%.2f", (now - firstLagAt)
* 1e-9d),
++                                                      String.format("%.2f", totalSyncDuration
* 1e-6d / syncCount),
++                                                      lagCount,
++                                                      String.format("%.2f", syncExceededIntervalBy
* 1e-6d / lagCount));
+                     if (logged)
+                         firstLagAt = 0;
                  }
+ 
 -                if (!run)
++                if (shutdownRequested)
+                     return false;
+ 
 -                // if we have lagged this round, we probably have work to do already so
we don't sleep
 -                if (sleep < 0)
 -                    return true;
 -
 -                try
 -                {
 -                    haveWork.tryAcquire(sleep, TimeUnit.MILLISECONDS);
 -                    haveWork.drainPermits();
 -                }
 -                catch (InterruptedException e)
 -                {
 -                    throw new AssertionError();
 -                }
++                if (wakeUpAt > now)
++                    LockSupport.parkNanos(wakeUpAt - now);
              }
-         };
+             catch (Throwable t)
+             {
+                 if (!CommitLog.handleCommitError("Failed to persist commits to disk", t))
+                     return false;
  
-         shutdown = false;
-         thread = NamedThreadFactory.createThread(runnable, name);
-         thread.start();
+                 // sleep for full poll-interval after an error, so we don't spam the log
file
 -                try
 -                {
 -                    haveWork.tryAcquire(markerIntervalMillis, TimeUnit.MILLISECONDS);
 -                }
 -                catch (InterruptedException e)
 -                {
 -                    throw new AssertionError();
 -                }
++                LockSupport.parkNanos(markerIntervalNanos);
+             }
++
+             return true;
+         }
      }
  
 -
      /**
       * Block for @param alloc to be sync'd as necessary, and handle bookkeeping
       */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c000827a/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogService.java
index bf6eb49,7a09de0..efd3394
--- a/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogService.java
+++ b/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogService.java
@@@ -25,7 -26,8 +25,8 @@@ class PeriodicCommitLogService extends 
  
      public PeriodicCommitLogService(final CommitLog commitLog)
      {
-         super(commitLog, "PERIODIC-COMMIT-LOG-SYNCER", DatabaseDescriptor.getCommitLogSyncPeriod(),
DatabaseDescriptor.getCommitLogMarkerPeriod());
+         super(commitLog, "PERIODIC-COMMIT-LOG-SYNCER", DatabaseDescriptor.getCommitLogSyncPeriod(),
 -              !commitLog.configuration.useCompression());
++              !(commitLog.configuration.useCompression() || commitLog.configuration.useEncryption()));
      }
  
      protected void maybeWaitForSync(CommitLogSegment.Allocation alloc)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c000827a/test/unit/org/apache/cassandra/db/commitlog/AbstractCommitLogServiceTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/commitlog/AbstractCommitLogServiceTest.java
index 0000000,5a46e5f..18f15fa
mode 000000,100644..100644
--- a/test/unit/org/apache/cassandra/db/commitlog/AbstractCommitLogServiceTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/AbstractCommitLogServiceTest.java
@@@ -1,0 -1,176 +1,166 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.cassandra.db.commitlog;
+ 
+ import java.util.concurrent.TimeUnit;
+ import java.util.concurrent.atomic.AtomicInteger;
+ 
+ import org.junit.Assert;
+ import org.junit.BeforeClass;
 -import org.junit.Ignore;
+ import org.junit.Test;
+ 
+ import org.apache.cassandra.config.Config;
+ import org.apache.cassandra.config.DatabaseDescriptor;
+ import org.apache.cassandra.utils.Clock;
+ import org.apache.cassandra.utils.FreeRunningClock;
+ 
+ import static org.apache.cassandra.db.commitlog.AbstractCommitLogService.DEFAULT_MARKER_INTERVAL_MILLIS;
+ 
+ public class AbstractCommitLogServiceTest
+ {
+     @BeforeClass
+     public static void before()
+     {
++        DatabaseDescriptor.daemonInitialization();
+         DatabaseDescriptor.setCommitLogSync(Config.CommitLogSync.periodic);
+         DatabaseDescriptor.setCommitLogSyncPeriod(10 * 1000);
+     }
+ 
+     @Test
+     public void testConstructorSyncIsQuantized()
+     {
+         long syncTimeMillis = 10 * 1000;
+         FakeCommitLogService commitLogService = new FakeCommitLogService(syncTimeMillis);
 -        Assert.assertEquals(DEFAULT_MARKER_INTERVAL_MILLIS, commitLogService.markerIntervalMillis);
 -        Assert.assertEquals(syncTimeMillis, commitLogService.syncIntervalMillis);
++        Assert.assertEquals(toNanos(DEFAULT_MARKER_INTERVAL_MILLIS), commitLogService.markerIntervalNanos);
++        Assert.assertEquals(toNanos(syncTimeMillis), commitLogService.syncIntervalNanos);
+     }
+ 
+     @Test
+     public void testConstructorSyncEqualsMarkerDefault()
+     {
+         long syncTimeMillis = 100;
+         FakeCommitLogService commitLogService = new FakeCommitLogService(syncTimeMillis);
 -        Assert.assertEquals(DEFAULT_MARKER_INTERVAL_MILLIS, commitLogService.markerIntervalMillis);
 -        Assert.assertEquals(syncTimeMillis, commitLogService.syncIntervalMillis);
 -        Assert.assertEquals(commitLogService.markerIntervalMillis, commitLogService.syncIntervalMillis);
++        Assert.assertEquals(toNanos(DEFAULT_MARKER_INTERVAL_MILLIS), commitLogService.markerIntervalNanos);
++        Assert.assertEquals(toNanos(syncTimeMillis), commitLogService.syncIntervalNanos);
++        Assert.assertEquals(commitLogService.markerIntervalNanos, commitLogService.syncIntervalNanos);
+     }
+ 
+     @Test
+     public void testConstructorSyncShouldRoundUp()
+     {
+         long syncTimeMillis = 151;
+         long expectedMillis = 200;
+         FakeCommitLogService commitLogService = new FakeCommitLogService(syncTimeMillis);
 -        Assert.assertEquals(DEFAULT_MARKER_INTERVAL_MILLIS, commitLogService.markerIntervalMillis);
 -        Assert.assertEquals(expectedMillis, commitLogService.syncIntervalMillis);
++        Assert.assertEquals(toNanos(DEFAULT_MARKER_INTERVAL_MILLIS), commitLogService.markerIntervalNanos);
++        Assert.assertEquals(toNanos(expectedMillis), commitLogService.syncIntervalNanos);
+     }
+ 
+     @Test
+     public void testConstructorSyncShouldRoundDown()
+     {
+         long syncTimeMillis = 121;
+         long expectedMillis = 100;
+         FakeCommitLogService commitLogService = new FakeCommitLogService(syncTimeMillis);
 -        Assert.assertEquals(DEFAULT_MARKER_INTERVAL_MILLIS, commitLogService.markerIntervalMillis);
 -        Assert.assertEquals(expectedMillis, commitLogService.syncIntervalMillis);
++        Assert.assertEquals(toNanos(DEFAULT_MARKER_INTERVAL_MILLIS), commitLogService.markerIntervalNanos);
++        Assert.assertEquals(toNanos(expectedMillis), commitLogService.syncIntervalNanos);
+     }
+ 
+     @Test
+     public void testConstructorSyncTinyValue()
+     {
+         long syncTimeMillis = 10;
 -        long expectedNanos = syncTimeMillis;
++        long expectedNanos = toNanos(syncTimeMillis);
+         FakeCommitLogService commitLogService = new FakeCommitLogService(syncTimeMillis);
 -        Assert.assertEquals(expectedNanos, commitLogService.markerIntervalMillis);
 -        Assert.assertEquals(expectedNanos, commitLogService.syncIntervalMillis);
++        Assert.assertEquals(expectedNanos, commitLogService.markerIntervalNanos);
++        Assert.assertEquals(expectedNanos, commitLogService.syncIntervalNanos);
++    }
++
++    private static long toNanos(long millis)
++    {
++        return TimeUnit.MILLISECONDS.toNanos(millis);
+     }
+ 
+     private static class FakeCommitLogService extends AbstractCommitLogService
+     {
+         FakeCommitLogService(long syncIntervalMillis)
+         {
+             super(new FakeCommitLog(), "This is not a real commit log", syncIntervalMillis,
true);
+             lastSyncedAt = 0;
+         }
+ 
 -        @Override
 -        void start()
 -        {
 -            // nop
 -        }
 -
+         protected void maybeWaitForSync(CommitLogSegment.Allocation alloc)
+         {
+             // nop
+         }
+     }
+ 
+     @Test
+     public void testSync()
+     {
+         long syncTimeMillis = AbstractCommitLogService.DEFAULT_MARKER_INTERVAL_MILLIS *
2;
+         FreeRunningClock clock = new FreeRunningClock();
+         FakeCommitLogService commitLogService = new FakeCommitLogService(syncTimeMillis);
+         AbstractCommitLogService.SyncRunnable syncRunnable = commitLogService.new SyncRunnable(clock);
+         FakeCommitLog commitLog = (FakeCommitLog) commitLogService.commitLog;
+ 
+         // at time 0
+         Assert.assertTrue(syncRunnable.sync());
+         Assert.assertEquals(1, commitLog.markCount.get());
+         Assert.assertEquals(0, commitLog.syncCount.get());
+ 
+         // at time DEFAULT_MARKER_INTERVAL_MILLIS
+         clock.advance(DEFAULT_MARKER_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
+         Assert.assertTrue(syncRunnable.sync());
+         Assert.assertEquals(2, commitLog.markCount.get());
+         Assert.assertEquals(0, commitLog.syncCount.get());
+ 
+         // at time DEFAULT_MARKER_INTERVAL_MILLIS * 2
+         clock.advance(DEFAULT_MARKER_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
+         Assert.assertTrue(syncRunnable.sync());
+         Assert.assertEquals(2, commitLog.markCount.get());
+         Assert.assertEquals(1, commitLog.syncCount.get());
+ 
+         // at time DEFAULT_MARKER_INTERVAL_MILLIS * 3, but with shutdown!
+         clock.advance(DEFAULT_MARKER_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
+         commitLogService.shutdown();
+         Assert.assertFalse(syncRunnable.sync());
+         Assert.assertEquals(2, commitLog.markCount.get());
+         Assert.assertEquals(2, commitLog.syncCount.get());
+     }
+ 
+     private static class FakeCommitLog extends CommitLog
+     {
+         private final AtomicInteger markCount = new AtomicInteger();
+         private final AtomicInteger syncCount = new AtomicInteger();
+ 
+         FakeCommitLog()
+         {
 -            super(DatabaseDescriptor.getCommitLogLocation(), null);
 -        }
 -
 -        @Override
 -        CommitLog start()
 -        {
 -            // this is a bit dicey. we need to start the allocator, but starting the parent's
executor will muck things
 -            // up as it is pointing to a different executor service, not the fake one in
this test class.
 -            allocator.start();
 -            return this;
++            super(null);
+         }
+ 
+         @Override
 -        public void sync(boolean syncAllSegments, boolean flush)
++        public void sync(boolean flush)
+         {
+             if (flush)
+                 syncCount.incrementAndGet();
+             else
+                 markCount.incrementAndGet();
+         }
+     }
+ }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c000827a/test/unit/org/apache/cassandra/db/commitlog/CommitLogChainedMarkersTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c000827a/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentBackpressureTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentBackpressureTest.java
index 46a3fb0,c615880..0076eb6
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentBackpressureTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentBackpressureTest.java
@@@ -65,14 -64,14 +65,14 @@@ public class CommitLogSegmentBackpressu
  
      @Test
      @BMRules(rules = {@BMRule(name = "Acquire Semaphore before sync",
-                               targetClass = "AbstractCommitLogService$1",
-                               targetMethod = "run",
+                               targetClass = "AbstractCommitLogService$SyncRunnable",
+                               targetMethod = "sync",
 -                              targetLocation = "AT INVOKE org.apache.cassandra.db.commitlog.CommitLog.sync(boolean,
boolean)",
 +                              targetLocation = "AT INVOKE org.apache.cassandra.db.commitlog.CommitLog.sync(boolean)",
                                action = "org.apache.cassandra.db.commitlog.CommitLogSegmentBackpressureTest.allowSync.acquire()"),
                        @BMRule(name = "Release Semaphore after sync",
-                               targetClass = "AbstractCommitLogService$1",
-                               targetMethod = "run",
+                               targetClass = "AbstractCommitLogService$SyncRunnable",
+                               targetMethod = "sync",
 -                              targetLocation = "AFTER INVOKE org.apache.cassandra.db.commitlog.CommitLog.sync(boolean,
boolean)",
 +                              targetLocation = "AFTER INVOKE org.apache.cassandra.db.commitlog.CommitLog.sync(boolean)",
                                action = "org.apache.cassandra.db.commitlog.CommitLogSegmentBackpressureTest.allowSync.release()")})
      public void testCompressedCommitLogBackpressure() throws Throwable
      {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


Mime
View raw message