cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From stefa...@apache.org
Subject [1/2] cassandra git commit: Support optional backpressure strategies at the coordinator
Date Tue, 20 Sep 2016 01:08:45 GMT
Repository: cassandra
Updated Branches:
  refs/heads/trunk 560faba2f -> d43b9ce50


http://git-wip-us.apache.org/repos/asf/cassandra/blob/d43b9ce5/test/unit/org/apache/cassandra/net/RateBasedBackPressureTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/net/RateBasedBackPressureTest.java b/test/unit/org/apache/cassandra/net/RateBasedBackPressureTest.java
new file mode 100644
index 0000000..b94b6ee
--- /dev/null
+++ b/test/unit/org/apache/cassandra/net/RateBasedBackPressureTest.java
@@ -0,0 +1,409 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.cassandra.net;
+
+import java.net.InetAddress;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.RateLimiter;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.utils.TestTimeSource;
+import org.apache.cassandra.utils.TimeSource;
+
+import static org.apache.cassandra.net.RateBasedBackPressure.FACTOR;
+import static org.apache.cassandra.net.RateBasedBackPressure.FLOW;
+import static org.apache.cassandra.net.RateBasedBackPressure.HIGH_RATIO;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class RateBasedBackPressureTest
+{
+    @Test(expected = IllegalArgumentException.class)
+    public void testAcceptsNoLessThanThreeArguments() throws Exception
+    {
+        new RateBasedBackPressure(ImmutableMap.of(HIGH_RATIO, "1"), new TestTimeSource(),
10);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testHighRatioMustBeBiggerThanZero() throws Exception
+    {
+        new RateBasedBackPressure(ImmutableMap.of(HIGH_RATIO, "0", FACTOR, "2", FLOW, "FAST"),
new TestTimeSource(), 10);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testHighRatioMustBeSmallerEqualThanOne() throws Exception
+    {
+        new RateBasedBackPressure(ImmutableMap.of(HIGH_RATIO, "2", FACTOR, "2", FLOW, "FAST"),
new TestTimeSource(), 10);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testFactorMustBeBiggerEqualThanOne() throws Exception
+    {
+        new RateBasedBackPressure(ImmutableMap.of(HIGH_RATIO, "0.9", FACTOR, "0", FLOW, "FAST"),
new TestTimeSource(), 10);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testWindowSizeMustBeBiggerEqualThanTen() throws Exception
+    {
+        new RateBasedBackPressure(ImmutableMap.of(HIGH_RATIO, "0.9", FACTOR, "5", FLOW, "FAST"),
new TestTimeSource(), 1);
+    }
+
+    @Test
+    public void testFlowMustBeEitherFASTorSLOW() throws Exception
+    {
+        new RateBasedBackPressure(ImmutableMap.of(HIGH_RATIO, "0.9", FACTOR, "1", FLOW, "FAST"),
new TestTimeSource(), 10);
+        new RateBasedBackPressure(ImmutableMap.of(HIGH_RATIO, "0.9", FACTOR, "1", FLOW, "SLOW"),
new TestTimeSource(), 10);
+        try
+        {
+            new RateBasedBackPressure(ImmutableMap.of(HIGH_RATIO, "0.9", FACTOR, "1", FLOW,
"WRONG"), new TestTimeSource(), 10);
+            fail("Expected to fail with wrong flow type.");
+        }
+        catch (Exception ex)
+        {
+        }
+    }
+
+    @Test
+    public void testBackPressureStateUpdates()
+    {
+        long windowSize = 6000;
+        TestTimeSource timeSource = new TestTimeSource();
+        RateBasedBackPressure strategy = new RateBasedBackPressure(ImmutableMap.of(HIGH_RATIO,
"0.9", FACTOR, "10", FLOW, "FAST"), timeSource, windowSize);
+
+        RateBasedBackPressureState state = strategy.newState(InetAddress.getLoopbackAddress());
+        state.onMessageSent(null);
+        assertEquals(0, state.incomingRate.size());
+        assertEquals(0, state.outgoingRate.size());
+
+        state = strategy.newState(InetAddress.getLoopbackAddress());
+        state.onResponseReceived();
+        assertEquals(1, state.incomingRate.size());
+        assertEquals(1, state.outgoingRate.size());
+
+        state = strategy.newState(InetAddress.getLoopbackAddress());
+        state.onResponseTimeout();
+        assertEquals(0, state.incomingRate.size());
+        assertEquals(1, state.outgoingRate.size());
+    }
+
+    @Test
+    public void testBackPressureIsNotUpdatedBeyondInfinity() throws Exception
+    {
+        long windowSize = 6000;
+        TestTimeSource timeSource = new TestTimeSource();
+        RateBasedBackPressure strategy = new RateBasedBackPressure(ImmutableMap.of(HIGH_RATIO,
"0.9", FACTOR, "10", FLOW, "FAST"), timeSource, windowSize);
+        RateBasedBackPressureState state = strategy.newState(InetAddress.getLoopbackAddress());
+
+        // Get initial rate:
+        double initialRate = state.rateLimiter.getRate();
+        assertEquals(Double.POSITIVE_INFINITY, initialRate, 0.0);
+
+        // Update incoming and outgoing rate equally:
+        state.incomingRate.update(1);
+        state.outgoingRate.update(1);
+
+        // Move time ahead:
+        timeSource.sleep(windowSize, TimeUnit.MILLISECONDS);
+
+        // Verify the rate doesn't change because already at infinity:
+        strategy.apply(Sets.newHashSet(state), 1, TimeUnit.SECONDS);
+        assertEquals(initialRate, state.rateLimiter.getRate(), 0.0);
+    }
+
+    @Test
+    public void testBackPressureIsUpdatedOncePerWindowSize() throws Exception
+    {
+        long windowSize = 6000;
+        TestTimeSource timeSource = new TestTimeSource();
+        RateBasedBackPressure strategy = new RateBasedBackPressure(ImmutableMap.of(HIGH_RATIO,
"0.9", FACTOR, "10", FLOW, "FAST"), timeSource, windowSize);
+        RateBasedBackPressureState state = strategy.newState(InetAddress.getLoopbackAddress());
+
+        // Get initial time:
+        long current = state.getLastIntervalAcquire();
+        assertEquals(0, current);
+
+        // Update incoming and outgoing rate:
+        state.incomingRate.update(1);
+        state.outgoingRate.update(1);
+
+        // Move time ahead by window size:
+        timeSource.sleep(windowSize, TimeUnit.MILLISECONDS);
+
+        // Verify the timestamp changed:
+        strategy.apply(Sets.newHashSet(state), 1, TimeUnit.SECONDS);
+        current = state.getLastIntervalAcquire();
+        assertEquals(timeSource.currentTimeMillis(), current);
+
+        // Move time ahead by less than interval:
+        long previous = current;
+        timeSource.sleep(windowSize / 2, TimeUnit.MILLISECONDS);
+
+        // Verify the last timestamp didn't change because below the window size:
+        strategy.apply(Sets.newHashSet(state), 1, TimeUnit.SECONDS);
+        current = state.getLastIntervalAcquire();
+        assertEquals(previous, current);
+    }
+
+    @Test
+    public void testBackPressureWhenBelowHighRatio() throws Exception
+    {
+        long windowSize = 6000;
+        TestTimeSource timeSource = new TestTimeSource();
+        RateBasedBackPressure strategy = new RateBasedBackPressure(ImmutableMap.of(HIGH_RATIO,
"0.9", FACTOR, "10", FLOW, "FAST"), timeSource, windowSize);
+        RateBasedBackPressureState state = strategy.newState(InetAddress.getLoopbackAddress());
+
+        // Update incoming and outgoing rate so that the ratio is 0.5:
+        state.incomingRate.update(50);
+        state.outgoingRate.update(100);
+
+        // Move time ahead:
+        timeSource.sleep(windowSize, TimeUnit.MILLISECONDS);
+
+        // Verify the rate is decreased by factor:
+        strategy.apply(Sets.newHashSet(state), 1, TimeUnit.SECONDS);
+        assertEquals(7.4, state.rateLimiter.getRate(), 0.1);
+    }
+
+    @Test
+    public void testBackPressureRateLimiterIsIncreasedAfterGoingAgainAboveHighRatio() throws
Exception
+    {
+        long windowSize = 6000;
+        TestTimeSource timeSource = new TestTimeSource();
+        RateBasedBackPressure strategy = new RateBasedBackPressure(ImmutableMap.of(HIGH_RATIO,
"0.9", FACTOR, "10", FLOW, "FAST"), timeSource, windowSize);
+        RateBasedBackPressureState state = strategy.newState(InetAddress.getLoopbackAddress());
+
+        // Update incoming and outgoing rate so that the ratio is 0.5:
+        state.incomingRate.update(50);
+        state.outgoingRate.update(100);
+
+        // Move time ahead:
+        timeSource.sleep(windowSize, TimeUnit.MILLISECONDS);
+
+        // Verify the rate decreased:
+        strategy.apply(Sets.newHashSet(state), 1, TimeUnit.SECONDS);
+        assertEquals(7.4, state.rateLimiter.getRate(), 0.1);
+
+        // Update incoming and outgoing rate back above high rate:
+        state.incomingRate.update(50);
+        state.outgoingRate.update(50);
+
+        // Move time ahead:
+        timeSource.sleep(windowSize, TimeUnit.MILLISECONDS);
+
+        // Verify rate limiter is increased by factor:
+        strategy.apply(Sets.newHashSet(state), 1, TimeUnit.SECONDS);
+        assertEquals(8.25, state.rateLimiter.getRate(), 0.1);
+
+        // Update incoming and outgoing rate to keep it below the limiter rate:
+        state.incomingRate.update(1);
+        state.outgoingRate.update(1);
+
+        // Move time ahead:
+        timeSource.sleep(windowSize, TimeUnit.MILLISECONDS);
+
+        // Verify rate limiter is not increased as already higher than the actual rate:
+        strategy.apply(Sets.newHashSet(state), 1, TimeUnit.SECONDS);
+        assertEquals(8.25, state.rateLimiter.getRate(), 0.1);
+    }
+
+    @Test
+    public void testBackPressureFastFlow() throws Exception
+    {
+        long windowSize = 6000;
+        TestTimeSource timeSource = new TestTimeSource();
+        TestableBackPressure strategy = new TestableBackPressure(ImmutableMap.of(HIGH_RATIO,
"0.9", FACTOR, "10", FLOW, "FAST"), timeSource, windowSize);
+        RateBasedBackPressureState state1 = strategy.newState(InetAddress.getByName("127.0.0.1"));
+        RateBasedBackPressureState state2 = strategy.newState(InetAddress.getByName("127.0.0.2"));
+        RateBasedBackPressureState state3 = strategy.newState(InetAddress.getByName("127.0.0.3"));
+
+        // Update incoming and outgoing rates:
+        state1.incomingRate.update(50);
+        state1.outgoingRate.update(100);
+        state2.incomingRate.update(80); // fast
+        state2.outgoingRate.update(100);
+        state3.incomingRate.update(20);
+        state3.outgoingRate.update(100);
+
+        // Move time ahead:
+        timeSource.sleep(windowSize, TimeUnit.MILLISECONDS);
+
+        // Verify the fast replica rate limiting has been applied:
+        Set<RateBasedBackPressureState> replicaGroup = Sets.newHashSet(state1, state2,
state3);
+        strategy.apply(replicaGroup, 1, TimeUnit.SECONDS);
+        assertTrue(strategy.checkAcquired());
+        assertTrue(strategy.checkApplied());
+        assertEquals(12.0, strategy.getRateLimiterForReplicaGroup(replicaGroup).getRate(),
0.1);
+    }
+
+    @Test
+    public void testBackPressureSlowFlow() throws Exception
+    {
+        long windowSize = 6000;
+        TestTimeSource timeSource = new TestTimeSource();
+        TestableBackPressure strategy = new TestableBackPressure(ImmutableMap.of(HIGH_RATIO,
"0.9", FACTOR, "10", FLOW, "SLOW"), timeSource, windowSize);
+        RateBasedBackPressureState state1 = strategy.newState(InetAddress.getByName("127.0.0.1"));
+        RateBasedBackPressureState state2 = strategy.newState(InetAddress.getByName("127.0.0.2"));
+        RateBasedBackPressureState state3 = strategy.newState(InetAddress.getByName("127.0.0.3"));
+
+        // Update incoming and outgoing rates:
+        state1.incomingRate.update(50);
+        state1.outgoingRate.update(100);
+        state2.incomingRate.update(100);
+        state2.outgoingRate.update(100);
+        state3.incomingRate.update(20); // slow
+        state3.outgoingRate.update(100);
+
+        // Move time ahead:
+        timeSource.sleep(windowSize, TimeUnit.MILLISECONDS);
+
+        // Verify the slow replica rate limiting has been applied:
+        Set<RateBasedBackPressureState> replicaGroup = Sets.newHashSet(state1, state2,
state3);
+        strategy.apply(replicaGroup, 1, TimeUnit.SECONDS);
+        assertTrue(strategy.checkAcquired());
+        assertTrue(strategy.checkApplied());
+        assertEquals(3.0, strategy.getRateLimiterForReplicaGroup(replicaGroup).getRate(),
0.1);
+    }
+
+    @Test
+    public void testBackPressureWithDifferentGroups() throws Exception
+    {
+        long windowSize = 6000;
+        TestTimeSource timeSource = new TestTimeSource();
+        TestableBackPressure strategy = new TestableBackPressure(ImmutableMap.of(HIGH_RATIO,
"0.9", FACTOR, "10", FLOW, "SLOW"), timeSource, windowSize);
+        RateBasedBackPressureState state1 = strategy.newState(InetAddress.getByName("127.0.0.1"));
+        RateBasedBackPressureState state2 = strategy.newState(InetAddress.getByName("127.0.0.2"));
+        RateBasedBackPressureState state3 = strategy.newState(InetAddress.getByName("127.0.0.3"));
+        RateBasedBackPressureState state4 = strategy.newState(InetAddress.getByName("127.0.0.4"));
+
+        // Update incoming and outgoing rates:
+        state1.incomingRate.update(50); // this
+        state1.outgoingRate.update(100);
+        state2.incomingRate.update(100);
+        state2.outgoingRate.update(100);
+        state3.incomingRate.update(20); // this
+        state3.outgoingRate.update(100);
+        state4.incomingRate.update(80);
+        state4.outgoingRate.update(100);
+
+        // Move time ahead:
+        timeSource.sleep(windowSize, TimeUnit.MILLISECONDS);
+
+        // Verify the first group:
+        Set<RateBasedBackPressureState> replicaGroup = Sets.newHashSet(state1, state2);
+        strategy.apply(replicaGroup, 1, TimeUnit.SECONDS);
+        assertTrue(strategy.checkAcquired());
+        assertTrue(strategy.checkApplied());
+        assertEquals(7.4, strategy.getRateLimiterForReplicaGroup(replicaGroup).getRate(),
0.1);
+
+        // Verify the second group:
+        replicaGroup = Sets.newHashSet(state3, state4);
+        strategy.apply(replicaGroup, 1, TimeUnit.SECONDS);
+        assertTrue(strategy.checkAcquired());
+        assertTrue(strategy.checkApplied());
+        assertEquals(3.0, strategy.getRateLimiterForReplicaGroup(replicaGroup).getRate(),
0.1);
+    }
+
+    @Test
+    public void testBackPressurePastTimeout() throws Exception
+    {
+        long windowSize = 10000;
+        TestTimeSource timeSource = new TestTimeSource();
+        TestableBackPressure strategy = new TestableBackPressure(ImmutableMap.of(HIGH_RATIO,
"0.9", FACTOR, "10", FLOW, "SLOW"), timeSource, windowSize);
+        RateBasedBackPressureState state1 = strategy.newState(InetAddress.getByName("127.0.0.1"));
+        RateBasedBackPressureState state2 = strategy.newState(InetAddress.getByName("127.0.0.2"));
+        RateBasedBackPressureState state3 = strategy.newState(InetAddress.getByName("127.0.0.3"));
+
+        // Update incoming and outgoing rates:
+        state1.incomingRate.update(5); // slow
+        state1.outgoingRate.update(100);
+        state2.incomingRate.update(100);
+        state2.outgoingRate.update(100);
+        state3.incomingRate.update(100);
+        state3.outgoingRate.update(100);
+
+        // Move time ahead:
+        timeSource.sleep(windowSize, TimeUnit.MILLISECONDS);
+
+        // Verify the slow replica rate limiting has been applied:
+        Set<RateBasedBackPressureState> replicaGroup = Sets.newHashSet(state1, state2,
state3);
+        strategy.apply(replicaGroup, 4, TimeUnit.SECONDS);
+        assertTrue(strategy.checkAcquired());
+        assertTrue(strategy.checkApplied());
+        assertEquals(0.5, strategy.getRateLimiterForReplicaGroup(replicaGroup).getRate(),
0.1);
+
+        // Make one more apply call to saturate the rate limit timeout (0.5 requests per
second means 2 requests span
+        // 4 seconds, but we can only make one as we have to subtract the incoming response
time):
+        strategy.apply(replicaGroup, 4, TimeUnit.SECONDS);
+
+        // Now verify another call to apply doesn't acquire the rate limit because of the
max timeout of 4 seconds minus
+        // 2 seconds of response time, so the time source itself sleeps two second:
+        long start = timeSource.currentTimeMillis();
+        strategy.apply(replicaGroup, 4, TimeUnit.SECONDS);
+        assertFalse(strategy.checkAcquired());
+        assertTrue(strategy.checkApplied());
+        assertEquals(TimeUnit.NANOSECONDS.convert(2, TimeUnit.SECONDS),
+                     strategy.timeout);
+        assertEquals(strategy.timeout,
+                     TimeUnit.NANOSECONDS.convert(timeSource.currentTimeMillis() - start,
TimeUnit.MILLISECONDS));
+    }
+
+    public static class TestableBackPressure extends RateBasedBackPressure
+    {
+        public volatile boolean acquired = false;
+        public volatile boolean applied = false;
+        public volatile long timeout;
+
+        public TestableBackPressure(Map<String, Object> args, TimeSource timeSource,
long windowSize)
+        {
+            super(args, timeSource, windowSize);
+        }
+
+        @Override
+        public boolean doRateLimit(RateLimiter rateLimiter, long timeoutInNanos)
+        {
+            acquired = super.doRateLimit(rateLimiter, timeoutInNanos);
+            applied = true;
+            timeout = timeoutInNanos;
+            return acquired;
+        }
+
+        public boolean checkAcquired()
+        {
+            boolean checked = acquired;
+            acquired = false;
+            return checked;
+        }
+
+        public boolean checkApplied()
+        {
+            boolean checked = applied;
+            applied = false;
+            return checked;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d43b9ce5/test/unit/org/apache/cassandra/utils/SlidingTimeRateTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/utils/SlidingTimeRateTest.java b/test/unit/org/apache/cassandra/utils/SlidingTimeRateTest.java
new file mode 100644
index 0000000..8c11f9d
--- /dev/null
+++ b/test/unit/org/apache/cassandra/utils/SlidingTimeRateTest.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.utils;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class SlidingTimeRateTest
+{
+    @Test
+    public void testUpdateAndGet()
+    {
+        SlidingTimeRate rate = new SlidingTimeRate(new TestTimeSource(), 10, 1, TimeUnit.SECONDS);
+        int updates = 100;
+        for (int i = 0; i < updates; i++)
+        {
+            rate.update(1);
+        }
+        Assert.assertEquals(updates, rate.get(TimeUnit.SECONDS), 0.0);
+    }
+
+    @Test
+    public void testUpdateAndGetBetweenWindows() throws InterruptedException
+    {
+        TestTimeSource time = new TestTimeSource();
+        SlidingTimeRate rate = new SlidingTimeRate(time, 5, 1, TimeUnit.SECONDS);
+        int updates = 100;
+        for (int i = 0; i < updates; i++)
+        {
+            rate.update(1);
+            time.sleep(100, TimeUnit.MILLISECONDS);
+        }
+        Assert.assertEquals(10, rate.get(TimeUnit.SECONDS), 0.0);
+    }
+
+    @Test
+    public void testUpdateAndGetPastWindowSize() throws InterruptedException
+    {
+        TestTimeSource time = new TestTimeSource();
+        SlidingTimeRate rate = new SlidingTimeRate(time, 5, 1, TimeUnit.SECONDS);
+        int updates = 100;
+        for (int i = 0; i < updates; i++)
+        {
+            rate.update(1);
+        }
+
+        time.sleep(6, TimeUnit.SECONDS);
+
+        Assert.assertEquals(0, rate.get(TimeUnit.SECONDS), 0.0);
+    }
+
+    @Test
+    public void testUpdateAndGetToPointInTime() throws InterruptedException
+    {
+        TestTimeSource time = new TestTimeSource();
+        SlidingTimeRate rate = new SlidingTimeRate(time, 5, 1, TimeUnit.SECONDS);
+        int updates = 10;
+        for (int i = 0; i < updates; i++)
+        {
+            rate.update(1);
+            time.sleep(100, TimeUnit.MILLISECONDS);
+        }
+
+        time.sleep(1, TimeUnit.SECONDS);
+
+        Assert.assertEquals(5, rate.get(TimeUnit.SECONDS), 0.0);
+        Assert.assertEquals(10, rate.get(1, TimeUnit.SECONDS), 0.0);
+    }
+
+    @Test
+    public void testDecay() throws InterruptedException
+    {
+        TestTimeSource time = new TestTimeSource();
+        SlidingTimeRate rate = new SlidingTimeRate(time, 5, 1, TimeUnit.SECONDS);
+        int updates = 10;
+        for (int i = 0; i < updates; i++)
+        {
+            rate.update(1);
+            time.sleep(100, TimeUnit.MILLISECONDS);
+        }
+        Assert.assertEquals(10, rate.get(TimeUnit.SECONDS), 0.0);
+
+        time.sleep(1, TimeUnit.SECONDS);
+
+        Assert.assertEquals(5, rate.get(TimeUnit.SECONDS), 0.0);
+
+        time.sleep(2, TimeUnit.SECONDS);
+
+        Assert.assertEquals(2.5, rate.get(TimeUnit.SECONDS), 0.0);
+    }
+
+    @Test
+    public void testPruning() throws InterruptedException
+    {
+        TestTimeSource time = new TestTimeSource();
+        SlidingTimeRate rate = new SlidingTimeRate(time, 5, 1, TimeUnit.SECONDS);
+
+        rate.update(1);
+        Assert.assertEquals(1, rate.size());
+
+        time.sleep(6, TimeUnit.SECONDS);
+
+        rate.prune();
+        Assert.assertEquals(0, rate.size());
+    }
+
+    @Test
+    public void testConcurrentUpdateAndGet() throws InterruptedException
+    {
+        final ExecutorService executor = Executors.newFixedThreadPool(FBUtilities.getAvailableProcessors());
+        final TestTimeSource time = new TestTimeSource();
+        final SlidingTimeRate rate = new SlidingTimeRate(time, 5, 1, TimeUnit.SECONDS);
+        int updates = 100000;
+        for (int i = 0; i < updates; i++)
+        {
+            executor.submit(() -> {
+                time.sleep(1, TimeUnit.MILLISECONDS);
+                rate.update(1);
+            });
+        }
+
+        executor.shutdown();
+
+        Assert.assertTrue(executor.awaitTermination(1, TimeUnit.MINUTES));
+        Assert.assertEquals(1000, rate.get(TimeUnit.SECONDS), 100.0);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d43b9ce5/test/unit/org/apache/cassandra/utils/TestTimeSource.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/utils/TestTimeSource.java b/test/unit/org/apache/cassandra/utils/TestTimeSource.java
new file mode 100644
index 0000000..4ecd086
--- /dev/null
+++ b/test/unit/org/apache/cassandra/utils/TestTimeSource.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.utils;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class TestTimeSource implements TimeSource // Manually advanced clock for tests: the time only changes when sleep() is called.
+{
+    private final AtomicLong timeInMillis = new AtomicLong(System.currentTimeMillis()); // simulated "now", seeded once from the real clock
+
+    @Override
+    public long currentTimeMillis() // current simulated time; the real clock is not consulted again
+    {
+        return timeInMillis.get();
+    }
+
+    @Override
+    public long nanoTime() // simulated time scaled to nanoseconds, so it only ever changes in whole-millisecond steps
+    {
+        return timeInMillis.get() * 1_000_000;
+    }
+
+    @Override
+    public TimeSource sleep(long sleepFor, TimeUnit unit) // advances the simulated clock instead of blocking; returns this
+    {
+        long current = timeInMillis.get();
+        long sleepInMillis = TimeUnit.MILLISECONDS.convert(sleepFor, unit); // conversion truncates sub-millisecond amounts
+        boolean elapsed;
+        do
+        {
+            long newTime = current + sleepInMillis;
+            elapsed = timeInMillis.compareAndSet(current, newTime); // succeeds only if the clock still holds the value read as `current`
+            if (!elapsed)
+            {
+                long updated = timeInMillis.get();
+                if (updated - current >= sleepInMillis) // the clock has already moved by at least the remaining amount
+                {
+                    elapsed = true;
+                }
+                else
+                {
+                    sleepInMillis -= updated - current; // count the advance already observed towards this sleep
+                    current = updated;                  // and retry from the newly observed time
+                }
+            }
+        }
+        while (!elapsed);
+        return this;
+    }
+
+    @Override
+    public TimeSource sleepUninterruptibly(long sleepFor, TimeUnit unit) // plain delegation to sleep()
+    {
+        return sleep(sleepFor, unit);
+    }
+}


Mime
View raw message